Files
evotraders/backend/cli.py
cillin 9bcc4221a4 refactor: remove mock trading functionality from backend and frontend
Removes all mock price simulation features:
- Delete MockPriceManager from backend/data/
- Remove mock_mode, enable_mock, is_mock_mode flags from services
- Remove mock CLI options and config
- Remove mock mode UI components and state from frontend
- Update tests to remove mock references

Now system supports only live and backtest modes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-26 13:38:51 +08:00

1339 lines
41 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
EvoTraders CLI - Command-line interface for the EvoTraders trading system.
This module provides easy-to-use commands for running backtest, live trading,
and frontend development server.
"""
# flake8: noqa: E501
# pylint: disable=R0912, R0915
import logging
import os
import shutil
import subprocess
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
from zoneinfo import ZoneInfo
logger = logging.getLogger(__name__)
import typer
import yaml
from rich.console import Console
from rich.panel import Panel
from rich.prompt import Confirm
from rich.table import Table
from dotenv import load_dotenv
from backend.agents.agent_workspace import load_agent_workspace_config
from backend.agents.prompt_loader import get_prompt_loader
from backend.agents.skills_manager import SkillsManager
from backend.agents.team_pipeline_config import (
ensure_team_pipeline_config,
load_team_pipeline_config,
)
from backend.agents.workspace_manager import WorkspaceManager
from backend.config.constants import ANALYST_TYPES
from backend.data.market_ingest import ingest_symbols
from backend.data.market_store import MarketStore
from backend.enrich.llm_enricher import get_explain_model_info, llm_enrichment_enabled
from backend.enrich.news_enricher import enrich_symbols
# Root Typer application. The sub-applications created below are mounted on it
# as command groups via ``add_typer``.
app = typer.Typer(
name="evotraders",
help="EvoTraders: A self-evolving multi-agent trading system",
add_completion=False,
)
# ``ingest`` command group (full / update / enrich / report).
ingest_app = typer.Typer(help="Ingest Polygon market data into the research warehouse.")
app.add_typer(ingest_app, name="ingest")
# ``skills`` command group (list / enable / disable / install).
skills_app = typer.Typer(help="Inspect and manage per-agent skills.")
app.add_typer(skills_app, name="skills")
# ``team`` command group (show).
team_app = typer.Typer(help="Inspect and manage run-scoped team pipeline config.")
app.add_typer(team_app, name="team")
# Shared Rich console used for all user-facing output in this module.
console = Console()
# Prompt loader obtained once at import time; this module uses it to load the
# analyst persona YAML when initializing workspaces.
_prompt_loader = get_prompt_loader()
# Import-time side effect: load variables from a .env file (python-dotenv).
# NOTE(review): this runs after the backend imports and get_prompt_loader();
# confirm none of them need .env values at import time.
load_dotenv()
def _normalize_typer_value(value, default):
    """Resolve a command parameter to a plain value.

    Command functions may be called directly (tests, internal code) instead of
    through the CLI parser.  In that case a parameter can still hold an object
    that exposes a ``default`` attribute, or be ``None``.

    Args:
        value: The raw parameter value.
        default: Fallback used when *value* is ``None``.

    Returns:
        ``value.default`` if *value* has such an attribute, *default* if
        *value* is ``None``, otherwise *value* unchanged.
    """
    try:
        return value.default
    except AttributeError:
        # Plain values (int, str, None, ...) have no ``default`` attribute.
        pass
    if value is None:
        return default
    return value
def get_project_root() -> Path:
    """Return the project root directory.

    The root is taken to be the grandparent of this file's path, which relies
    on this module living directly inside ``backend/``.
    """
    package_dir = Path(__file__).parent
    return package_dir.parent
def _print_run_dir_summary(run_dir: Path) -> None:
    """Print the total size and last state-update time of an existing run dir."""
    # The size is informational only, so any failure is logged and ignored.
    try:
        total_size = sum(
            entry.stat().st_size
            for entry in run_dir.rglob("*")
            if entry.is_file()
        )
        size_mb = total_size / (1024 * 1024)
        if size_mb < 1:
            console.print(
                f" Directory size: [cyan]{total_size / 1024:.1f} KB[/cyan]",
            )
        else:
            console.print(f" Directory size: [cyan]{size_mb:.1f} MB[/cyan]")
    except Exception as exc:
        logger.debug("Could not calculate directory size: %s", exc)

    # "Last updated" is the newest mtime among state/*.json files, if any.
    state_dir = run_dir / "state"
    if state_dir.exists():
        state_files = list(state_dir.glob("*.json"))
        if state_files:
            last_modified = max(f.stat().st_mtime for f in state_files)
            last_modified_str = datetime.fromtimestamp(last_modified).strftime(
                "%Y-%m-%d %H:%M:%S",
            )
            console.print(f" Last updated: [cyan]{last_modified_str}[/cyan]")


def _backup_run_config_files(run_dir: Path, config_name: str) -> None:
    """Copy ``.env`` / ``config.json`` from *run_dir* into a timestamped sibling dir."""
    backed_up = []
    backup_dir = None
    for backup_file in (".env", "config.json"):
        file_path = run_dir / backup_file
        if not file_path.exists():
            continue
        if backup_dir is None:
            # Create the backup directory lazily, only if something is saved.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_dir = run_dir.parent / f"{config_name}_backup_{timestamp}"
            backup_dir.mkdir(parents=True, exist_ok=True)
        shutil.copy(file_path, backup_dir / backup_file)
        backed_up.append(backup_file)
    if backed_up:
        console.print(
            f" 💾 Backed up config files to: [cyan]{backup_dir}[/cyan]",
        )
        console.print(f" Files: {', '.join(backed_up)}")


def handle_history_cleanup(config_name: str, auto_clean: bool = False) -> None:
    """
    Handle cleanup of historical data for a given config.

    If the run directory is missing or empty nothing happens.  Otherwise a
    short summary is printed and the directory is removed when the user
    confirms (or when *auto_clean* is set), after backing up ``.env`` and
    ``config.json`` if present.

    Args:
        config_name: Configuration name for the run
        auto_clean: If True, skip confirmation and clean automatically

    Raises:
        typer.Exit: With code 1 when the run directory cannot be removed.
    """
    workspace_manager = WorkspaceManager(project_root=get_project_root())
    base_data_dir = workspace_manager.get_run_dir(config_name)

    # Nothing to clean when the run directory is absent or empty.
    if not base_data_dir.exists() or not any(base_data_dir.iterdir()):
        console.print(
            f"\n[dim]No historical data found for config '{config_name}'[/dim]",
        )
        console.print("[dim] Will start from scratch[/dim]\n")
        return

    console.print("\n[bold yellow]Detected existing run data:[/bold yellow]")
    console.print(f" Data directory: [cyan]{base_data_dir}[/cyan]")
    _print_run_dir_summary(base_data_dir)
    console.print()

    if auto_clean:
        console.print("[yellow]⚠️ Auto-clean enabled (--clean flag)[/yellow]")
        should_clean = True
    else:
        should_clean = Confirm.ask(
            " ﹂ Clear historical data and start fresh?",
            default=False,
        )

    if not should_clean:
        console.print(
            "\n[dim] Continuing with existing historical data[/dim]\n",
        )
        return

    console.print("\n[yellow]▩ Cleaning historical data...[/yellow]")
    _backup_run_config_files(base_data_dir, config_name)
    try:
        shutil.rmtree(base_data_dir)
    except Exception as exc:
        console.print(f" [red]✗ Error clearing data: {exc}[/red]\n")
        # Chain the cause so the original failure is not lost.
        raise typer.Exit(1) from exc
    console.print(" ✔ Historical data cleared\n")
def run_data_updater(project_root: Path) -> None:
    """Run the historical data updater as a best-effort startup step.

    The updater module is first probed with ``--help``; only if that exits
    with code 0 is the real update started.  Failures are reported to the
    console and never raised, so startup continues with existing data.

    Args:
        project_root: Working directory for both the probe and the update, so
            that ``-m backend.data.ret_data_updater`` resolves the same way
            in both invocations.
    """
    console.print("\n[bold]Checking historical data update...[/bold]")
    updater_cmd = [sys.executable, "-m", "backend.data.ret_data_updater"]
    try:
        # Probe from the project root too; without cwd the probe could report
        # the module as unavailable when the CLI is launched elsewhere.
        probe = subprocess.run(
            [*updater_cmd, "--help"],
            cwd=project_root,
            capture_output=True,
            timeout=5,
            check=False,
        )
        if probe.returncode != 0:
            console.print(
                "[yellow] Data updater module not available, skipping update[/yellow]\n",
            )
            return

        console.print("[cyan]Updating historical data...[/cyan]")
        update_result = subprocess.run(
            updater_cmd,
            cwd=project_root,
            check=False,
        )
        if update_result.returncode == 0:
            console.print(
                "[green]✔ Historical data updated successfully[/green]\n",
            )
        else:
            console.print(
                "[yellow] Data update failed (might be weekend/holiday)[/yellow]",
            )
            console.print(
                "[dim] Will continue with existing data[/dim]\n",
            )
    except Exception as exc:
        # Includes subprocess.TimeoutExpired from the probe; deliberately
        # non-fatal.
        logger.debug("Data updater check failed: %s", exc)
        console.print(
            "[yellow] Data updater check failed, skipping update[/yellow]\n",
        )
def initialize_workspace(config_name: str) -> Path:
    """Create the run-scoped workspace assets for *config_name*.

    Initializes default assets for the fixed set of agents listed below, using
    the analyst personas loaded through the module prompt loader, and returns
    the run directory reported by the workspace manager.
    """
    default_agent_ids = [
        "fundamentals_analyst",
        "technical_analyst",
        "sentiment_analyst",
        "valuation_analyst",
        "risk_manager",
        "portfolio_manager",
    ]
    manager = WorkspaceManager(project_root=get_project_root())
    personas = _prompt_loader.load_yaml_config("analyst", "personas")
    manager.initialize_default_assets(
        config_name=config_name,
        agent_ids=default_agent_ids,
        analyst_personas=personas,
    )
    run_dir = manager.get_run_dir(config_name)
    return run_dir
def _require_agent_asset_dir(config_name: str, agent_id: str) -> Path:
    """Initialize default assets for one agent and return its asset directory.

    Default assets are (re)initialized for just *agent_id* before the
    directory is looked up through the workspace's skills manager.
    """
    workspace = WorkspaceManager(project_root=get_project_root())
    personas = _prompt_loader.load_yaml_config("analyst", "personas")
    workspace.initialize_default_assets(
        config_name=config_name,
        agent_ids=[agent_id],
        analyst_personas=personas,
    )
    asset_dir = workspace.skills_manager.get_agent_asset_dir(config_name, agent_id)
    return asset_dir
def _split_symbols(raw: str) -> list[str]:
    """Split a comma-separated ticker string into unique upper-case symbols."""
    cleaned = (
        # Tolerate list brackets / quotes around items, e.g. "[AAPL, 'MSFT']".
        piece.strip().strip("[]'\"").strip().upper()
        for piece in raw.split(",")
    )
    # dict.fromkeys drops duplicates while keeping first-seen order.
    return list(dict.fromkeys(symbol for symbol in cleaned if symbol))


def _resolve_symbols(raw_tickers: Optional[str], config_name: Optional[str] = None) -> list[str]:
    """Resolve symbols from explicit input or runtime bootstrap config.

    Args:
        raw_tickers: Comma-separated tickers given explicitly; takes priority
            when non-blank.
        config_name: Run config whose ``BOOTSTRAP.md`` is searched for a
            ``tickers:`` line when no explicit tickers are given; falls back
            to ``"default"``.

    Returns:
        Upper-cased symbols in first-seen order without duplicates, or an
        empty list when nothing can be resolved.
    """
    if raw_tickers and raw_tickers.strip():
        return _split_symbols(raw_tickers)

    workspace_manager = WorkspaceManager(project_root=get_project_root())
    bootstrap_path = workspace_manager.get_run_dir(config_name or "default") / "BOOTSTRAP.md"
    if not bootstrap_path.exists():
        return []

    for line in bootstrap_path.read_text(encoding="utf-8").splitlines():
        # Only the first "tickers:" line is honoured.
        if line.strip().startswith("tickers:"):
            return _split_symbols(line.split(":", 1)[1])
    return []
def _filter_problematic_report_rows(rows: list[dict]) -> list[dict]:
    """Return the report rows that need attention.

    A row is kept when its ``coverage_pct`` is below 100 or its ``llm_count``
    is zero; missing or falsy values count as 0 for both fields.
    """

    def _needs_attention(row: dict) -> bool:
        coverage = float(row.get("coverage_pct") or 0.0)
        if coverage < 100.0:
            return True
        # Coverage is complete: flag the row only if nothing was LLM-enriched.
        return int(row.get("llm_count") or 0) == 0

    return list(filter(_needs_attention, rows))
def auto_update_market_store(
    config_name: str,
    *,
    end_date: Optional[str] = None,
) -> None:
    """Incrementally refresh the Polygon market store for a config's watchlist.

    Best-effort startup step: it is skipped when ``POLYGON_API_KEY`` is unset
    or no tickers can be resolved for *config_name*, and an ingestion failure
    is reported without being raised.

    Args:
        config_name: Run config whose watchlist is refreshed.
        end_date: Ingestion end date (ISO string); defaults to today's local
            date.
    """
    if not os.getenv("POLYGON_API_KEY", "").strip():
        console.print(
            "[dim]Skipping Polygon market store update: POLYGON_API_KEY not set[/dim]",
        )
        return

    watchlist = _resolve_symbols(None, config_name)
    if not watchlist:
        console.print(
            f"[dim]Skipping Polygon market store update: no tickers found for config '{config_name}'[/dim]",
        )
        return

    target_end = end_date if end_date else datetime.now().date().isoformat()
    console.print(
        f"[cyan]Updating Polygon market store for {', '.join(watchlist)} -> {target_end}[/cyan]",
    )
    try:
        summaries = ingest_symbols(
            watchlist,
            mode="incremental",
            end_date=target_end,
        )
    except Exception as exc:
        console.print(
            f"[yellow]Polygon market store update failed, continuing startup: {exc}[/yellow]",
        )
    else:
        # One summary line per ingested symbol.
        for summary in summaries:
            console.print(
                f"[green]{summary['symbol']}[/green] "
                f"prices={summary['prices']} news={summary['news']} aligned={summary['aligned']}"
            )
def auto_prepare_backtest_market_store(
    config_name: str,
    *,
    start_date: str,
    end_date: str,
) -> None:
    """Run a full Polygon ingest for the backtest window of a config's watchlist.

    Best-effort startup step: it is skipped when ``POLYGON_API_KEY`` is unset
    or no tickers can be resolved, and an ingestion failure is reported
    without being raised.

    Args:
        config_name: Run config whose watchlist is ingested.
        start_date: First date of the backtest window.
        end_date: Last date of the backtest window.
    """
    if not os.getenv("POLYGON_API_KEY", "").strip():
        console.print(
            "[dim]Skipping Polygon backtest preload: POLYGON_API_KEY not set[/dim]",
        )
        return

    watchlist = _resolve_symbols(None, config_name)
    if not watchlist:
        console.print(
            f"[dim]Skipping Polygon backtest preload: no tickers found for config '{config_name}'[/dim]",
        )
        return

    joined = ", ".join(watchlist)
    console.print(
        f"[cyan]Preparing Polygon market store for backtest {start_date} -> {end_date} ({joined})[/cyan]",
    )
    try:
        summaries = ingest_symbols(
            watchlist,
            mode="full",
            start_date=start_date,
            end_date=end_date,
        )
    except Exception as exc:
        console.print(
            f"[yellow]Polygon backtest preload failed, continuing startup: {exc}[/yellow]",
        )
    else:
        # One summary line per ingested symbol.
        for summary in summaries:
            console.print(
                f"[green]{summary['symbol']}[/green] "
                f"prices={summary['prices']} news={summary['news']} aligned={summary['aligned']}"
            )
def auto_enrich_market_store(
    config_name: str,
    *,
    end_date: Optional[str] = None,
    lookback_days: int = 120,
    force: bool = False,
) -> None:
    """Refresh explain-oriented enriched news for a config's watchlist.

    Best-effort startup step: it is skipped when no tickers can be resolved or
    the end date is not ``YYYY-MM-DD``, and an enrichment failure is reported
    without being raised.

    Args:
        config_name: Run config whose watchlist is enriched.
        end_date: Last date of the enrichment window; defaults to today's
            local date.
        lookback_days: Window length in days before *end_date* (minimum 1).
        force: When True, already enriched rows are not skipped.
    """
    watchlist = _resolve_symbols(None, config_name)
    if not watchlist:
        console.print(
            f"[dim]Skipping explain enrich: no tickers found for config '{config_name}'[/dim]",
        )
        return

    target_end = end_date if end_date else datetime.now().date().isoformat()
    try:
        window_end = datetime.strptime(target_end, "%Y-%m-%d")
    except ValueError:
        console.print(
            f"[yellow]Skipping explain enrich: invalid end date {target_end}[/yellow]",
        )
        return

    window_days = max(1, lookback_days)
    window_start = (window_end - timedelta(days=window_days)).date().isoformat()
    console.print(
        f"[cyan]Refreshing explain enrich for {', '.join(watchlist)} -> {target_end}[/cyan]",
    )
    store = MarketStore()
    try:
        summaries = enrich_symbols(
            store,
            watchlist,
            start_date=window_start,
            end_date=target_end,
            limit=300,
            skip_existing=not force,
        )
    except Exception as exc:
        console.print(
            f"[yellow]Explain enrich failed, continuing startup: {exc}[/yellow]",
        )
    else:
        # One summary line per enriched symbol.
        for summary in summaries:
            console.print(
                f"[green]{summary['symbol']}[/green] "
                f"news={summary['news_count']} queued={summary['queued_count']} analyzed={summary['analyzed']} "
                f"skipped={summary['skipped_existing_count']} deduped={summary['deduped_count']} "
                f"llm={summary['llm_count']} local={summary['local_count']}"
            )
@app.command("init-workspace")
def init_workspace(
    config_name: str = typer.Option(
        "default",
        "--config-name",
        "-c",
        help="Configuration name for the workspace",
    ),
):
    """Initialize run-scoped BOOTSTRAP and agent prompt asset files."""
    # Delegate to the shared initializer, then confirm the location to the user.
    run_dir = initialize_workspace(config_name)
    message = f"[bold green]Workspace initialized[/bold green]\n[cyan]{run_dir}[/cyan]"
    banner = Panel.fit(message, border_style="green")
    console.print(banner)
@ingest_app.command("full")
def ingest_full(
    tickers: Optional[str] = typer.Option(
        None,
        "--tickers",
        "-t",
        help="Comma-separated tickers to ingest",
    ),
    start: Optional[str] = typer.Option(
        None,
        "--start",
        help="Start date for full ingestion (YYYY-MM-DD)",
    ),
    end: Optional[str] = typer.Option(
        None,
        "--end",
        help="End date for ingestion (YYYY-MM-DD)",
    ),
    config_name: str = typer.Option(
        "default",
        "--config-name",
        "-c",
        help="Fallback config to read tickers from BOOTSTRAP.md",
    ),
):
    """Run full Polygon ingestion for the specified symbols."""
    # Explicit --tickers win; otherwise fall back to the config's BOOTSTRAP.md.
    symbols = _resolve_symbols(tickers, config_name)
    if not symbols:
        console.print("[red]No tickers provided and none found in BOOTSTRAP.md[/red]")
        raise typer.Exit(1)

    joined = ", ".join(symbols)
    console.print(f"[cyan]Starting full Polygon ingest for {joined}[/cyan]")
    summaries = ingest_symbols(symbols, mode="full", start_date=start, end_date=end)
    for summary in summaries:
        symbol = summary["symbol"]
        prices = summary["prices"]
        news = summary["news"]
        aligned = summary["aligned"]
        console.print(
            f"[green]{symbol}[/green] prices={prices} news={news} aligned={aligned}"
        )
@ingest_app.command("update")
def ingest_update(
    tickers: Optional[str] = typer.Option(
        None,
        "--tickers",
        "-t",
        help="Comma-separated tickers to update",
    ),
    end: Optional[str] = typer.Option(
        None,
        "--end",
        help="Optional end date override (YYYY-MM-DD)",
    ),
    config_name: str = typer.Option(
        "default",
        "--config-name",
        "-c",
        help="Fallback config to read tickers from BOOTSTRAP.md",
    ),
):
    """Run incremental Polygon ingestion using stored watermarks."""
    # Explicit --tickers win; otherwise fall back to the config's BOOTSTRAP.md.
    symbols = _resolve_symbols(tickers, config_name)
    if not symbols:
        console.print("[red]No tickers provided and none found in BOOTSTRAP.md[/red]")
        raise typer.Exit(1)

    joined = ", ".join(symbols)
    console.print(f"[cyan]Starting incremental Polygon ingest for {joined}[/cyan]")
    summaries = ingest_symbols(symbols, mode="incremental", end_date=end)
    for summary in summaries:
        symbol = summary["symbol"]
        prices = summary["prices"]
        news = summary["news"]
        aligned = summary["aligned"]
        console.print(
            f"[green]{symbol}[/green] prices={prices} news={news} aligned={aligned}"
        )
@ingest_app.command("enrich")
def ingest_enrich(
    tickers: Optional[str] = typer.Option(
        None,
        "--tickers",
        "-t",
        help="Comma-separated tickers to enrich",
    ),
    start: Optional[str] = typer.Option(
        None,
        "--start",
        help="Optional start date for enrichment window (YYYY-MM-DD)",
    ),
    end: Optional[str] = typer.Option(
        None,
        "--end",
        help="Optional end date for enrichment window (YYYY-MM-DD)",
    ),
    limit: int = typer.Option(
        300,
        "--limit",
        help="Maximum raw news rows per ticker to analyze",
    ),
    force: bool = typer.Option(
        False,
        "--force",
        help="Re-analyze already enriched news instead of only missing rows",
    ),
    config_name: str = typer.Option(
        "default",
        "--config-name",
        "-c",
        help="Fallback config to read tickers from BOOTSTRAP.md",
    ),
):
    """Run explain-oriented news enrichment for symbols already in the market store."""
    # Explicit --tickers win; otherwise fall back to the config's BOOTSTRAP.md.
    symbols = _resolve_symbols(tickers, config_name)
    if not symbols:
        console.print("[red]No tickers provided and none found in BOOTSTRAP.md[/red]")
        raise typer.Exit(1)

    joined = ", ".join(symbols)
    console.print(f"[cyan]Starting explain enrich for {joined}[/cyan]")
    store = MarketStore()
    # The per-ticker limit is floored at 10 rows regardless of --limit.
    effective_limit = max(10, limit)
    summaries = enrich_symbols(
        store,
        symbols,
        start_date=start,
        end_date=end,
        limit=effective_limit,
        skip_existing=not force,
    )
    for summary in summaries:
        counts = (
            f"news={summary['news_count']} queued={summary['queued_count']} analyzed={summary['analyzed']} "
            f"skipped={summary['skipped_existing_count']} deduped={summary['deduped_count']} "
            f"llm={summary['llm_count']} local={summary['local_count']}"
        )
        console.print(f"[green]{summary['symbol']}[/green] " + counts)
@ingest_app.command("report")
def ingest_report(
    tickers: Optional[str] = typer.Option(
        None,
        "--tickers",
        "-t",
        help="Optional comma-separated tickers to report",
    ),
    start: Optional[str] = typer.Option(
        None,
        "--start",
        help="Optional start date for report window (YYYY-MM-DD)",
    ),
    end: Optional[str] = typer.Option(
        None,
        "--end",
        help="Optional end date for report window (YYYY-MM-DD)",
    ),
    config_name: str = typer.Option(
        "default",
        "--config-name",
        "-c",
        help="Fallback config to read tickers from BOOTSTRAP.md",
    ),
    only_problematic: bool = typer.Option(
        False,
        "--only-problematic",
        help="Only show tickers with incomplete coverage or no LLM-enriched news",
    ),
):
    """Show explain enrichment coverage and freshness per ticker."""
    symbols = _resolve_symbols(tickers, config_name)
    store = MarketStore()
    # An empty symbol list is passed as None, i.e. no symbol filter.
    report_rows = store.get_enrich_report(
        symbols=symbols or None,
        start_date=start,
        end_date=end,
    )
    if only_problematic:
        report_rows = _filter_problematic_report_rows(report_rows)

    if not report_rows:
        if only_problematic:
            console.print("[green]No problematic enrich report rows found for the requested scope[/green]")
        else:
            console.print("[yellow]No enrich report rows found for the requested scope[/yellow]")
        raise typer.Exit(0)

    model_info = get_explain_model_info()
    model_label = model_info["label"] if llm_enrichment_enabled() else "disabled"

    table = Table(title="Explain Enrichment Report")
    table.add_column("Ticker", style="cyan")
    for numeric_header in ("Raw News", "Analyzed", "Coverage", "LLM", "Local"):
        table.add_column(numeric_header, justify="right")
    table.add_column("Latest Trade Date")
    table.add_column("Latest Analysis")
    table.caption = f"Explain LLM: {model_label}"

    for row in report_rows:
        # Treat a missing/None coverage as 0.0, matching
        # _filter_problematic_report_rows, instead of failing on formatting.
        coverage = float(row.get("coverage_pct") or 0.0)
        table.add_row(
            row["symbol"],
            str(row["raw_news_count"]),
            str(row["analyzed_news_count"]),
            f"{coverage:.1f}%",
            str(row["llm_count"]),
            str(row["local_count"]),
            str(row["latest_trade_date"] or "-"),
            str(row["latest_analysis_at"] or "-"),
        )
    console.print(table)
@skills_app.command("list")
def skills_list(
    config_name: str = typer.Option(
        "default",
        "--config-name",
        "-c",
        help="Run config name.",
    ),
    agent_id: Optional[str] = typer.Option(
        None,
        "--agent-id",
        "-a",
        help="Optional agent id to show resolved status for.",
    ),
):
    """List available skills and optional agent-level enablement state."""
    skills_manager = SkillsManager(project_root=get_project_root())
    if agent_id:
        catalog = skills_manager.list_agent_skill_catalog(config_name, agent_id)
    else:
        catalog = skills_manager.list_skill_catalog()
    if not catalog:
        console.print("[yellow]No skills found[/yellow]")
        raise typer.Exit(0)

    # Agent-specific state is only gathered when an agent id was given.
    agent_config = None
    resolved_skills = set()
    if agent_id:
        asset_dir = _require_agent_asset_dir(config_name, agent_id)
        agent_config = load_agent_workspace_config(asset_dir / "agent.yaml")
        resolved_names = skills_manager.resolve_agent_skill_names(
            config_name=config_name,
            agent_id=agent_id,
            default_skills=[],
        )
        resolved_skills = set(resolved_names)

    table = Table(title="Skill Catalog")
    table.add_column("Skill", style="cyan")
    table.add_column("Source")
    table.add_column("Description")
    if agent_id:
        table.add_column("Status")

    if agent_config:
        enabled = set(agent_config.enabled_skills)
        disabled = set(agent_config.disabled_skills)
    else:
        enabled = set()
        disabled = set()

    def _status_of(skill_name):
        # Precedence: explicit disable, explicit enable, then resolved.
        if skill_name in disabled:
            return "disabled"
        if skill_name in enabled:
            return "enabled"
        if skill_name in resolved_skills:
            return "active"
        return "-"

    for entry in catalog:
        cells = [entry.skill_name, entry.source, entry.description or "-"]
        if agent_id:
            cells.append(_status_of(entry.skill_name))
        table.add_row(*cells)
    console.print(table)
@skills_app.command("enable")
def skills_enable(
    agent_id: str = typer.Option(..., "--agent-id", "-a", help="Agent id."),
    skill: str = typer.Option(..., "--skill", "-s", help="Skill name."),
    config_name: str = typer.Option(
        "default",
        "--config-name",
        "-c",
        help="Run config name.",
    ),
):
    """Enable a skill for one agent in agent.yaml."""
    # Local import: only this output path needs Rich's markup escaping.
    from rich.markup import escape

    asset_dir = _require_agent_asset_dir(config_name, agent_id)
    skills_manager = SkillsManager(project_root=get_project_root())

    # Reject names that are not in this agent's skill catalog.
    known_skills = {
        item.skill_name
        for item in skills_manager.list_agent_skill_catalog(config_name, agent_id)
    }
    if skill not in known_skills:
        console.print(f"[red]Unknown skill: {skill}[/red]")
        raise typer.Exit(1)

    result = skills_manager.update_agent_skill_overrides(
        config_name=config_name,
        agent_id=agent_id,
        enable=[skill],
    )

    # "[<path>]" would be parsed as a Rich markup tag: an absolute POSIX path
    # looks like a closing tag and raises MarkupError, a relative one is
    # silently dropped. Escape it so the bracketed path prints literally.
    location = escape(f"[{asset_dir / 'agent.yaml'}]")
    console.print(
        f"[green]Enabled[/green] `{skill}` for `{agent_id}` "
        f"({location})",
    )
    console.print(f"Enabled skills: {', '.join(result['enabled_skills']) or '-'}")
    console.print(f"Disabled skills: {', '.join(result['disabled_skills']) or '-'}")
@skills_app.command("disable")
def skills_disable(
    agent_id: str = typer.Option(..., "--agent-id", "-a", help="Agent id."),
    skill: str = typer.Option(..., "--skill", "-s", help="Skill name."),
    config_name: str = typer.Option(
        "default",
        "--config-name",
        "-c",
        help="Run config name.",
    ),
):
    """Disable a skill for one agent in agent.yaml."""
    # Local import: only this output path needs Rich's markup escaping.
    from rich.markup import escape

    asset_dir = _require_agent_asset_dir(config_name, agent_id)
    skills_manager = SkillsManager(project_root=get_project_root())
    # Unlike `skills enable`, the skill name is not checked against the catalog.
    result = skills_manager.update_agent_skill_overrides(
        config_name=config_name,
        agent_id=agent_id,
        disable=[skill],
    )

    # "[<path>]" would be parsed as a Rich markup tag: an absolute POSIX path
    # looks like a closing tag and raises MarkupError, a relative one is
    # silently dropped. Escape it so the bracketed path prints literally.
    location = escape(f"[{asset_dir / 'agent.yaml'}]")
    console.print(
        f"[yellow]Disabled[/yellow] `{skill}` for `{agent_id}` "
        f"({location})",
    )
    console.print(f"Enabled skills: {', '.join(result['enabled_skills']) or '-'}")
    console.print(f"Disabled skills: {', '.join(result['disabled_skills']) or '-'}")
@skills_app.command("install")
def skills_install(
    agent_id: str = typer.Option(..., "--agent-id", "-a", help="Target agent id."),
    source: str = typer.Option(
        ...,
        "--source",
        "-s",
        help="External skill source: directory path, zip path, or http(s) zip URL.",
    ),
    config_name: str = typer.Option(
        "default",
        "--config-name",
        "-c",
        help="Run config name.",
    ),
    name: Optional[str] = typer.Option(
        None,
        "--name",
        help="Optional override skill name.",
    ),
    activate: bool = typer.Option(
        True,
        "--activate/--no-activate",
        help="Enable the skill for this agent immediately.",
    ),
):
    """Install an external skill into one agent's local skill directory."""
    # Called for its side effect: the agent's default assets are initialized.
    _require_agent_asset_dir(config_name, agent_id)

    manager = SkillsManager(project_root=get_project_root())
    outcome = manager.install_external_skill_for_agent(
        config_name=config_name,
        agent_id=agent_id,
        source=source,
        skill_name=name,
        activate=activate,
    )

    installed_name = outcome["skill_name"]
    console.print(f"[green]Installed[/green] `{installed_name}` to `{agent_id}`")
    console.print(f"Path: {outcome['target_dir']}")
    console.print(f"Activated: {outcome['activated']}")

    # Warnings are optional in the result; print them only when present.
    warnings = outcome.get("warnings") or []
    if not warnings:
        return
    console.print(f"Warnings: {'; '.join(warnings)}")
@team_app.command("show")
def team_show(
    config_name: str = typer.Option(
        "default",
        "--config-name",
        "-c",
        help="Run config name.",
    ),
):
    """Show TEAM_PIPELINE.yaml for one run."""
    root = get_project_root()
    # Ensure the config exists (analyst types as default analysts) before loading.
    analyst_names = list(ANALYST_TYPES.keys())
    ensure_team_pipeline_config(
        project_root=root,
        config_name=config_name,
        default_analysts=analyst_names,
    )
    pipeline_config = load_team_pipeline_config(root, config_name)

    # Dump in declaration order, keeping non-ASCII characters readable.
    rendered = yaml.safe_dump(pipeline_config, allow_unicode=True, sort_keys=False)
    panel = Panel.fit(
        rendered,
        title=f"TEAM_PIPELINE ({config_name})",
        border_style="cyan",
    )
    console.print(panel)
@app.command()
def backtest(
    start: Optional[str] = typer.Option(
        None,
        "--start",
        "-s",
        help="Start date for backtest (YYYY-MM-DD)",
    ),
    end: Optional[str] = typer.Option(
        None,
        "--end",
        "-e",
        help="End date for backtest (YYYY-MM-DD)",
    ),
    config_name: str = typer.Option(
        "backtest",
        "--config-name",
        "-c",
        help="Configuration name for this backtest run",
    ),
    host: str = typer.Option(
        "0.0.0.0",
        "--host",
        help="WebSocket server host",
    ),
    port: int = typer.Option(
        8765,
        "--port",
        "-p",
        help="WebSocket server port",
    ),
    poll_interval: int = typer.Option(
        10,
        "--poll-interval",
        help="Price polling interval in seconds",
    ),
    clean: bool = typer.Option(
        False,
        "--clean",
        help="Clear historical data before starting",
    ),
    enable_memory: bool = typer.Option(
        False,
        "--enable-memory",
        help="Enable ReMeTaskLongTermMemory for agents (requires MEMORY_API_KEY)",
    ),
):
    """
    Run backtest mode with historical data.
    Example:
    evotraders backtest --start 2025-11-01 --end 2025-12-01
    evotraders backtest --config-name my_strategy --port 9000
    evotraders backtest --clean # Clear historical data before starting
    evotraders backtest --enable-memory # Enable long-term memory
    """
    console.print(
        Panel.fit(
            "[bold cyan]EvoTraders Backtest Mode[/bold cyan]",
            border_style="cyan",
        ),
    )
    poll_interval = int(_normalize_typer_value(poll_interval, 10))

    # Validate dates - required for backtest. All validation happens before
    # the (potentially destructive) history cleanup below.
    if not start or not end:
        console.print(
            "[red]✗ Both --start and --end dates are required for backtest mode[/red]",
        )
        raise typer.Exit(1)
    parsed_dates = {}
    for label, raw_date in (("start", start), ("end", end)):
        try:
            parsed_dates[label] = datetime.strptime(raw_date, "%Y-%m-%d")
        except ValueError as exc:
            console.print(
                f"[red]✗ Invalid {label} date format. Use YYYY-MM-DD[/red]",
            )
            raise typer.Exit(1) from exc
    # An inverted window cannot produce a meaningful backtest; a single-day
    # window (start == end) is still accepted.
    if parsed_dates["end"] < parsed_dates["start"]:
        console.print(
            "[red]✗ End date must not be earlier than start date[/red]",
        )
        raise typer.Exit(1)

    # Handle historical data cleanup
    handle_history_cleanup(config_name, auto_clean=clean)

    # Display configuration
    console.print("\n[bold]Configuration:[/bold]")
    console.print(" Mode: Backtest")
    console.print(f" Config: {config_name}")
    console.print(f" Period: {start} -> {end}")
    console.print(f" Server: {host}:{port}")
    console.print(f" Poll Interval: {poll_interval}s")
    console.print(
        f" Long-term Memory: {'enabled' if enable_memory else 'disabled'}",
    )
    console.print("\nAccess frontend at: [cyan]http://localhost:5173[/cyan]")
    console.print("Press Ctrl+C to stop\n")

    # Change to project root so `-m backend...` modules resolve.
    project_root = get_project_root()
    os.chdir(project_root)

    # Best-effort data preparation; none of these steps abort startup.
    run_data_updater(project_root)
    auto_prepare_backtest_market_store(
        config_name,
        start_date=start,
        end_date=end,
    )
    auto_enrich_market_store(
        config_name,
        end_date=end,
        force=False,
    )

    # Build command using backend.main
    cmd = [
        sys.executable,
        "-u",
        "-m",
        "backend.main",
        "--mode",
        "backtest",
        "--config-name",
        config_name,
        "--host",
        host,
        "--port",
        str(port),
        "--poll-interval",
        str(poll_interval),
        "--start-date",
        start,
        "--end-date",
        end,
    ]
    if enable_memory:
        cmd.append("--enable-memory")

    try:
        subprocess.run(cmd, check=True)
    except KeyboardInterrupt:
        console.print("\n\n[yellow]Backtest stopped by user[/yellow]")
    except subprocess.CalledProcessError as e:
        console.print(
            f"\n[red]Backtest failed with exit code {e.returncode}[/red]",
        )
        # Chain the cause, as the `live` command does.
        raise typer.Exit(1) from e
@app.command()
def live(
    config_name: str = typer.Option(
        "live",
        "--config-name",
        "-c",
        help="Configuration name for this live run",
    ),
    host: str = typer.Option(
        "0.0.0.0",
        "--host",
        help="WebSocket server host",
    ),
    port: int = typer.Option(
        8765,
        "--port",
        "-p",
        help="WebSocket server port",
    ),
    schedule_mode: str = typer.Option(
        "daily",
        "--schedule-mode",
        help="Scheduler mode: 'daily' or 'intraday'",
    ),
    trigger_time: str = typer.Option(
        "now",
        "--trigger-time",
        "-t",
        help="Trigger time in LOCAL timezone (HH:MM), or 'now' to run immediately",
    ),
    interval_minutes: int = typer.Option(
        60,
        "--interval-minutes",
        help="When schedule-mode=intraday, run every N minutes",
    ),
    poll_interval: int = typer.Option(
        10,
        "--poll-interval",
        help="Price polling interval in seconds",
    ),
    clean: bool = typer.Option(
        False,
        "--clean",
        help="Clear historical data before starting",
    ),
    enable_memory: bool = typer.Option(
        False,
        "--enable-memory",
        help="Enable ReMeTaskLongTermMemory for agents (requires MEMORY_API_KEY)",
    ),
):
    """
    Run live trading mode with real-time data.
    Example:
    evotraders live # Run immediately (default)
    evotraders live -t 22:30 # Run at 22:30 local time daily
    evotraders live --schedule-mode intraday --interval-minutes 60
    evotraders live --trigger-time now # Run immediately
    evotraders live --clean # Clear historical data before starting
    """
    schedule_mode = str(_normalize_typer_value(schedule_mode, "daily"))
    interval_minutes = int(_normalize_typer_value(interval_minutes, 60))
    console.print(
        Panel.fit(
            "[bold cyan]EvoTraders LIVE Mode[/bold cyan]",
            border_style="cyan",
        ),
    )

    # Check for required API key in live mode: without a .env file the command
    # creates one from the template (when available) and then exits so the
    # user can fill it in.
    env_file = get_project_root() / ".env"
    if not env_file.exists():
        console.print("\n[yellow]Warning: .env file not found[/yellow]")
        console.print("Creating from template...\n")
        template = get_project_root() / "env.template"
        if template.exists():
            shutil.copy(template, env_file)
            console.print("[green].env file created[/green]")
            console.print(
                "\n[red]Error: Please edit .env and set FINNHUB_API_KEY[/red]",
            )
            console.print(
                "Get your free API key at: https://finnhub.io/register\n",
            )
        else:
            console.print("[red]Error: env.template not found[/red]")
        raise typer.Exit(1)

    # Validate every argument BEFORE the history cleanup, so that a typo in a
    # flag can never cost the user their existing run data.
    if schedule_mode not in {"daily", "intraday"}:
        console.print(
            f"[red]Error: unsupported schedule mode '{schedule_mode}'[/red]",
        )
        raise typer.Exit(1)
    if interval_minutes <= 0:
        console.print("[red]Error: --interval-minutes must be > 0[/red]")
        raise typer.Exit(1)
    # Intraday mode and the literal 'now' both mean "trigger immediately";
    # only a daily HH:MM trigger needs parsing.
    local_trigger = None
    if schedule_mode != "intraday" and trigger_time.lower() != "now":
        try:
            local_trigger = datetime.strptime(trigger_time, "%H:%M")
        except ValueError as exc:
            console.print(
                f"[red]Error: invalid --trigger-time '{trigger_time}'. Use HH:MM or 'now'[/red]",
            )
            raise typer.Exit(1) from exc

    # Handle historical data cleanup
    handle_history_cleanup(config_name, auto_clean=clean)

    # Convert local time to NYSE time
    nyse_tz = ZoneInfo("America/New_York")
    local_tz = datetime.now().astimezone().tzinfo
    local_now = datetime.now()
    nyse_now = datetime.now(nyse_tz)
    if local_trigger is None:
        nyse_trigger_time = "now"
    else:
        # Today's date at the requested local wall-clock time, expressed in
        # New York time; only HH:MM is forwarded to the backend.
        local_trigger_dt = local_now.replace(
            hour=local_trigger.hour,
            minute=local_trigger.minute,
            second=0,
            microsecond=0,
        )
        local_trigger_aware = local_trigger_dt.astimezone(local_tz)
        nyse_trigger_dt = local_trigger_aware.astimezone(nyse_tz)
        nyse_trigger_time = nyse_trigger_dt.strftime("%H:%M")

    # Display time info
    console.print("\n[bold]Time Info:[/bold]")
    console.print(f" Local Time: {local_now.strftime('%Y-%m-%d %H:%M:%S')}")
    console.print(
        f" NYSE Time: {nyse_now.strftime('%Y-%m-%d %H:%M:%S %Z')}",
    )
    console.print(f" Schedule: {schedule_mode}")
    if schedule_mode == "intraday":
        console.print(f" Interval: every {interval_minutes} minute(s)")
    elif nyse_trigger_time == "now":
        console.print(" Trigger: [green]NOW (immediate)[/green]")
    else:
        console.print(
            f" Trigger: {trigger_time} local = {nyse_trigger_time} NYSE",
        )

    # Display configuration
    console.print("\n[bold]Configuration:[/bold]")
    console.print(
        " Mode: [green]LIVE[/green] (Real-time prices via Finnhub)",
    )
    console.print(f" Config: {config_name}")
    console.print(f" Server: {host}:{port}")
    console.print(f" Poll Interval: {poll_interval}s")
    console.print(
        f" Long-term Memory: {'enabled' if enable_memory else 'disabled'}",
    )
    console.print("\nAccess frontend at: [cyan]http://localhost:5173[/cyan]")
    console.print("Press Ctrl+C to stop\n")

    # Change to project root so `-m backend...` modules resolve.
    project_root = get_project_root()
    os.chdir(project_root)

    # Best-effort data refresh up to today's New York date; none of these
    # steps abort startup.
    run_data_updater(project_root)
    nyse_today = nyse_now.date().isoformat()
    auto_update_market_store(
        config_name,
        end_date=nyse_today,
    )
    auto_enrich_market_store(
        config_name,
        end_date=nyse_today,
        force=False,
    )

    # Build command using backend.main
    cmd = [
        sys.executable,
        "-u",
        "-m",
        "backend.main",
        "--mode",
        "live",
        "--config-name",
        config_name,
        "--host",
        host,
        "--port",
        str(port),
        "--schedule-mode",
        schedule_mode,
        "--poll-interval",
        str(poll_interval),
        "--trigger-time",
        nyse_trigger_time,
        "--interval-minutes",
        str(interval_minutes),
    ]
    if enable_memory:
        cmd.append("--enable-memory")

    try:
        subprocess.run(cmd, check=True)
    except KeyboardInterrupt:
        console.print("\n\n[yellow]Live server stopped by user[/yellow]")
    except subprocess.CalledProcessError as e:
        console.print(
            f"\n[red]Live server failed with exit code {e.returncode}[/red]",
        )
        raise typer.Exit(1) from e
@app.command()
def frontend(
    port: int = typer.Option(
        8765,
        "--ws-port",
        "-p",
        help="WebSocket server port to connect to",
    ),
    host_mode: bool = typer.Option(
        False,
        "--host",
        help="Allow external access (default: localhost only)",
    ),
):
    """
    Start the frontend development server.
    Example:
    evotraders frontend
    evotraders frontend --ws-port 8765
    evotraders frontend --ws-port 8765 --host
    """
    console.print(
        Panel.fit(
            "[bold cyan]EvoTraders Frontend[/bold cyan]",
            border_style="cyan",
        ),
    )
    project_root = get_project_root()
    frontend_dir = project_root / "frontend"

    # Check if frontend directory exists
    if not frontend_dir.exists():
        console.print(
            f"\n[red]Error: Frontend directory not found: {frontend_dir}[/red]",
        )
        raise typer.Exit(1)

    # Resolve npm up front. A missing executable would otherwise surface as an
    # uncaught FileNotFoundError, and resolving the full path also finds
    # wrappers such as npm.cmd on Windows.
    npm_executable = shutil.which("npm")
    if npm_executable is None:
        console.print("\n[red]Error: npm executable not found on PATH[/red]")
        console.print("Make sure Node.js and npm are installed")
        raise typer.Exit(1)

    # Install dependencies on first run (no node_modules yet).
    node_modules = frontend_dir / "node_modules"
    if not node_modules.exists():
        console.print("\n[yellow]Installing frontend dependencies...[/yellow]")
        try:
            subprocess.run(
                [npm_executable, "install"],
                cwd=frontend_dir,
                check=True,
            )
            console.print("[green]Dependencies installed[/green]\n")
        except subprocess.CalledProcessError as exc:
            console.print("\n[red]Error: Failed to install dependencies[/red]")
            console.print("Make sure Node.js and npm are installed")
            raise typer.Exit(1) from exc

    # Set WebSocket URL environment variable for the dev server process only.
    ws_url = f"ws://localhost:{port}"
    env = os.environ.copy()
    env["VITE_WS_URL"] = ws_url

    # Display configuration
    console.print("\n[bold]Configuration:[/bold]")
    console.print(f" WebSocket URL: {ws_url}")
    console.print(" Frontend Port: 5173 (Vite default)")
    if host_mode:
        console.print(" Access: External allowed")
    else:
        console.print(" Access: Localhost only")
    console.print("\nAccess at: [cyan]http://localhost:5173[/cyan]")
    console.print("Press Ctrl+C to stop\n")

    # Choose npm script: "dev:host" when --host is given, otherwise "dev".
    npm_cmd = [npm_executable, "run", "dev:host" if host_mode else "dev"]
    try:
        subprocess.run(
            npm_cmd,
            cwd=frontend_dir,
            env=env,
            check=True,
        )
    except KeyboardInterrupt:
        console.print("\n\n[yellow]Frontend stopped by user[/yellow]")
    except subprocess.CalledProcessError as e:
        console.print(
            f"\n[red]Frontend failed with exit code {e.returncode}[/red]",
        )
        # Chain the cause, as the `live` command does.
        raise typer.Exit(1) from e
@app.command()
def version():
    """Show the version of EvoTraders."""
    # The version string is hard-coded in this banner.
    banner = "\n[bold cyan]EvoTraders[/bold cyan] version [green]0.1.0[/green]\n"
    console.print(banner)
@app.callback()
def main():
    """
    EvoTraders: A self-evolving multi-agent trading system
    Use 'evotraders --help' to see available commands.
    """
    # Intentionally empty: the callback exists only to carry the docstring
    # above for the root application.
    return None


# Run the Typer application when this module is executed as a script.
if __name__ == "__main__":
    app()