stock/demo_langgraph_workflow.py
ZhangPeng 9aecdd036c Initial commit: OpenClaw Trading - AI多智能体量化交易系统
- 添加项目核心代码和配置
- 添加前端界面 (Next.js)
- 添加单元测试
- 更新 .gitignore 排除缓存和依赖
2026-02-27 03:47:40 +08:00

205 lines
6.6 KiB
Python

"""Demo script for LangGraph-based trading workflow.
This script demonstrates how to use the LangGraph trading workflow
to orchestrate multi-agent trading analysis.
Usage:
python demo_langgraph_workflow.py [SYMBOL]
Example:
python demo_langgraph_workflow.py AAPL
python demo_langgraph_workflow.py TSLA
"""
import asyncio
import sys
from typing import Any, Dict
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.tree import Tree
from openclaw.workflow.trading_workflow import TradingWorkflow
from openclaw.workflow.state import get_state_summary
console = Console()
def display_workflow_graph():
"""Display the workflow graph structure."""
console.print("\n[bold cyan]LangGraph Workflow Structure:[/bold cyan]\n")
tree = Tree("[bold green]START[/bold green]")
# Parallel analysis branch
parallel = tree.add("[yellow]Parallel Analysis Phase[/yellow]")
parallel.add("MarketAnalysis (Technical)")
parallel.add("SentimentAnalysis (News/Social)")
parallel.add("FundamentalAnalysis (Financial)")
# Sequential phases
tree.add("BullBearDebate (Generate bull/bear cases)")
tree.add("DecisionFusion (Combine all signals)")
tree.add("RiskAssessment (Position sizing & approval)")
tree.add("[bold red]END[/bold red]")
console.print(tree)
def display_final_decision(decision: Dict[str, Any]):
"""Display the final trading decision."""
if not decision:
console.print("[red]No decision generated[/red]")
return
action = decision.get("action", "UNKNOWN")
confidence = decision.get("confidence", 0.0)
position_size = decision.get("position_size", 0.0)
approved = decision.get("approved", False)
risk_level = decision.get("risk_level", "unknown")
var_95 = decision.get("var_95", 0.0)
# Color based on action
action_color = {
"BUY": "green",
"SELL": "red",
"HOLD": "yellow",
}.get(action, "white")
table = Table(title="Final Trading Decision", show_header=False)
table.add_column("Field", style="cyan")
table.add_column("Value", style="white")
table.add_row("Symbol", decision.get("symbol", "N/A"))
table.add_row("Action", f"[{action_color}]{action}[/{action_color}]")
table.add_row("Confidence", f"{confidence:.1%}")
table.add_row("Position Size", f"${position_size:,.2f}")
table.add_row("Approved", "✓ Yes" if approved else "✗ No")
table.add_row("Risk Level", risk_level.upper())
table.add_row("VaR (95%)", f"${var_95:,.2f}")
console.print(table)
# Show warnings if any
warnings = decision.get("warnings", [])
if warnings:
console.print("\n[bold yellow]Risk Warnings:[/bold yellow]")
for warning in warnings:
console.print(f" ⚠️ {warning}")
def display_state_summary(state):
"""Display workflow execution summary."""
summary = get_state_summary(state)
table = Table(title="Workflow Execution Summary", show_header=False)
table.add_column("Phase", style="cyan")
table.add_column("Status", style="green")
table.add_row("Symbol", summary["symbol"])
table.add_row("Current Step", summary["current_step"])
table.add_row("Completed Steps", str(len(summary["completed_steps"])))
# Reports generated
table.add_row("Technical Report", "" if summary["has_technical"] else "")
table.add_row("Sentiment Report", "" if summary["has_sentiment"] else "")
table.add_row("Fundamental Report", "" if summary["has_fundamental"] else "")
table.add_row("Bull Report", "" if summary["has_bull"] else "")
table.add_row("Bear Report", "" if summary["has_bear"] else "")
table.add_row("Fused Decision", "" if summary["has_fusion"] else "")
table.add_row("Risk Report", "" if summary["has_risk"] else "")
table.add_row("Final Decision", "" if summary["has_final"] else "")
if summary["error_count"] > 0:
table.add_row("Errors", f"[red]{summary['error_count']}[/red]")
console.print(table)
async def run_demo(symbol: str):
"""Run the LangGraph workflow demo."""
console.print(Panel.fit(
f"[bold blue]OpenClaw LangGraph Trading Workflow Demo[/bold blue]\n"
f"Symbol: [bold green]{symbol}[/bold green]",
border_style="blue"
))
# Display workflow structure
display_workflow_graph()
# Create and run workflow
console.print(f"\n[bold]Initializing workflow for {symbol}...[/bold]")
workflow = TradingWorkflow(symbol=symbol, initial_capital=1000.0)
# Show workflow visualization
console.print("\n[dim]Workflow Graph (Mermaid):[/dim]")
console.print(workflow.visualize())
# Run workflow with progress tracking
console.print(f"\n[bold cyan]Executing workflow...[/bold cyan]\n")
async for update in workflow.astream(debug=True):
# Log state updates
for node_name, node_state in update.items():
if isinstance(node_state, dict):
step = node_state.get("current_step", "unknown")
console.print(f" [dim]→ {node_name}: {step}[/dim]")
# Get final state
final_state = await workflow.run()
# Display results
console.print("\n" + "=" * 60)
console.print("[bold green]WORKFLOW COMPLETED[/bold green]")
console.print("=" * 60)
display_state_summary(final_state)
# Display final decision
decision = workflow.get_final_decision(final_state)
if decision:
console.print()
display_final_decision(decision)
# Show completed steps
console.print(f"\n[bold]Completed Steps:[/bold]")
for step in final_state.get("completed_steps", []):
console.print(f"{step}")
# Show any errors
errors = final_state.get("errors", [])
if errors:
console.print(f"\n[bold red]Errors:[/bold red]")
for error in errors:
console.print(f"{error}")
return decision
def main():
"""Main entry point."""
# Get symbol from command line or use default
symbol = sys.argv[1] if len(sys.argv) > 1 else "AAPL"
try:
decision = asyncio.run(run_demo(symbol))
# Exit with success
if decision and decision.get("approved"):
console.print(f"\n[bold green]✓ Trade approved for {symbol}![/bold green]")
sys.exit(0)
else:
console.print(f"\n[bold yellow]⚠ Trade not approved for {symbol}[/bold yellow]")
sys.exit(1)
except Exception as e:
console.print(f"\n[bold red]Error: {e}[/bold red]")
import traceback
console.print(traceback.format_exc())
sys.exit(2)
if __name__ == "__main__":
main()