"""add canvas metadata table Revision ID: add_canvas_metadata Revises: bfac9b8e32f5 Create Date: 2026-01-17 10:00:00.000000 """ from typing import Sequence, Union from alembic import op import sqlalchemy as sa import sqlmodel import json import uuid from datetime import datetime # revision identifiers, used by Alembic. revision: str = 'add_canvas_metadata' down_revision: Union[str, Sequence[str], None] = ('add_progress_tracking', 'add_prompt_fields') branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None def upgrade() -> None: """Upgrade schema.""" # 1. 创建 canvas_metadata 表 op.create_table( 'canvas_metadata', sa.Column('id', sqlmodel.sql.sqltypes.AutoString(), nullable=False), sa.Column('project_id', sqlmodel.sql.sqltypes.AutoString(), nullable=False), sa.Column('canvas_type', sqlmodel.sql.sqltypes.AutoString(), nullable=False), sa.Column('related_entity_type', sqlmodel.sql.sqltypes.AutoString(), nullable=True), sa.Column('related_entity_id', sqlmodel.sql.sqltypes.AutoString(), nullable=True), sa.Column('name', sqlmodel.sql.sqltypes.AutoString(), nullable=False), sa.Column('description', sqlmodel.sql.sqltypes.AutoString(), nullable=True), sa.Column('order_index', sa.Integer(), nullable=False, server_default='0'), sa.Column('is_pinned', sa.Boolean(), nullable=False, server_default='0'), sa.Column('tags', sa.JSON(), nullable=True), sa.Column('node_count', sa.Integer(), nullable=False, server_default='0'), sa.Column('last_accessed_at', sa.Float(), nullable=True), sa.Column('access_count', sa.Integer(), nullable=False, server_default='0'), sa.Column('created_at', sa.Float(), nullable=False), sa.Column('updated_at', sa.Float(), nullable=False), sa.Column('deleted_at', sa.Float(), nullable=True), sa.Column('legacy_id', sqlmodel.sql.sqltypes.AutoString(), nullable=True), sa.PrimaryKeyConstraint('id'), sa.ForeignKeyConstraint(['project_id'], ['projects.id']) ) # 2. 创建索引 op.create_index('ix_canvas_metadata_project_id', 'canvas_metadata', ['project_id']) op.create_index('ix_canvas_metadata_canvas_type', 'canvas_metadata', ['canvas_type']) op.create_index('ix_canvas_metadata_related_entity_id', 'canvas_metadata', ['related_entity_id']) op.create_index('ix_canvas_metadata_legacy_id', 'canvas_metadata', ['legacy_id']) op.create_index('ix_canvas_metadata_project_type', 'canvas_metadata', ['project_id', 'canvas_type']) op.create_index('ix_canvas_metadata_type_entity', 'canvas_metadata', ['canvas_type', 'related_entity_id']) # 3. 迁移数据 migrate_general_canvases() migrate_asset_canvases() migrate_storyboard_canvases() def migrate_general_canvases(): """迁移通用画布数据""" conn = op.get_bind() # 获取所有项目的 general_canvases try: projects = conn.execute(sa.text("SELECT id, general_canvases FROM projects")).fetchall() except: # 如果 general_canvases 列不存在,跳过 return for project in projects: project_id = project[0] general_canvases_json = project[1] if not general_canvases_json: continue try: canvases = json.loads(general_canvases_json) if isinstance(general_canvases_json, str) else general_canvases_json except: continue if not isinstance(canvases, list): continue for idx, canvas in enumerate(canvases): canvas_id = canvas.get('id') if not canvas_id: continue # 插入到 canvas_metadata conn.execute(sa.text(""" INSERT INTO canvas_metadata ( id, project_id, canvas_type, name, order_index, created_at, updated_at ) VALUES ( :id, :project_id, 'general', :name, :order_index, :created_at, :updated_at ) """), { 'id': canvas_id, 'project_id': project_id, 'name': canvas.get('name', f'Canvas {idx + 1}'), 'order_index': idx, 'created_at': canvas.get('createdAt', datetime.now().timestamp()), 'updated_at': canvas.get('updatedAt', datetime.now().timestamp()) }) conn.commit() def migrate_asset_canvases(): """迁移素材画布数据""" conn = op.get_bind() # 查找所有以 canvas-asset- 开头的画布 try: canvases = conn.execute(sa.text(""" SELECT id, project_id, updated_at FROM canvases WHERE id LIKE 'canvas-asset-%' """)).fetchall() except: return for canvas in canvases: old_id = canvas[0] project_id = canvas[1] updated_at = canvas[2] # 提取 asset_id asset_id = old_id.replace('canvas-asset-', '') # 查找对应的 asset try: asset = conn.execute(sa.text(""" SELECT name FROM assets WHERE id = :asset_id """), {'asset_id': asset_id}).fetchone() except: continue if not asset: continue # 生成新 UUID new_id = str(uuid.uuid4()) # 插入元数据 conn.execute(sa.text(""" INSERT INTO canvas_metadata ( id, project_id, canvas_type, related_entity_type, related_entity_id, name, created_at, updated_at, legacy_id ) VALUES ( :id, :project_id, 'asset', 'asset', :asset_id, :name, :created_at, :updated_at, :legacy_id ) """), { 'id': new_id, 'project_id': project_id, 'asset_id': asset_id, 'name': asset[0], 'created_at': updated_at, 'updated_at': updated_at, 'legacy_id': old_id }) # 更新 canvases 表的 ID conn.execute(sa.text(""" UPDATE canvases SET id = :new_id WHERE id = :old_id """), {'new_id': new_id, 'old_id': old_id}) conn.commit() def migrate_storyboard_canvases(): """迁移分镜画布数据""" conn = op.get_bind() # 查找所有以 canvas-storyboard- 开头的画布 try: canvases = conn.execute(sa.text(""" SELECT id, project_id, updated_at FROM canvases WHERE id LIKE 'canvas-storyboard-%' """)).fetchall() except: return for canvas in canvases: old_id = canvas[0] project_id = canvas[1] updated_at = canvas[2] storyboard_id = old_id.replace('canvas-storyboard-', '') try: storyboard = conn.execute(sa.text(""" SELECT shot FROM storyboards WHERE id = :storyboard_id """), {'storyboard_id': storyboard_id}).fetchone() except: continue if not storyboard: continue new_id = str(uuid.uuid4()) conn.execute(sa.text(""" INSERT INTO canvas_metadata ( id, project_id, canvas_type, related_entity_type, related_entity_id, name, created_at, updated_at, legacy_id ) VALUES ( :id, :project_id, 'storyboard', 'storyboard', :storyboard_id, :name, :created_at, :updated_at, :legacy_id ) """), { 'id': new_id, 'project_id': project_id, 'storyboard_id': storyboard_id, 'name': storyboard[0], 'created_at': updated_at, 'updated_at': updated_at, 'legacy_id': old_id }) conn.execute(sa.text(""" UPDATE canvases SET id = :new_id WHERE id = :old_id """), {'new_id': new_id, 'old_id': old_id}) conn.commit() def downgrade() -> None: """Downgrade schema.""" # 回滚操作 op.drop_index('ix_canvas_metadata_type_entity', 'canvas_metadata') op.drop_index('ix_canvas_metadata_project_type', 'canvas_metadata') op.drop_index('ix_canvas_metadata_legacy_id', 'canvas_metadata') op.drop_index('ix_canvas_metadata_related_entity_id', 'canvas_metadata') op.drop_index('ix_canvas_metadata_canvas_type', 'canvas_metadata') op.drop_index('ix_canvas_metadata_project_id', 'canvas_metadata') op.drop_table('canvas_metadata')