feat(05-01): add differential sync with smart-sync flag

- sync_diff() for differential uploads
- update operation with recordChangeTag conflict handling
- --smart-sync and --delete-orphans flags
- Menu options 12-13 for smart sync

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Trey t
2026-01-10 10:08:04 -06:00
parent 0c74495ee5
commit d9a6aa4fe4

View File

@@ -135,17 +135,19 @@ def show_menu():
print(" 9. Delete all then import")
print(" 10. Delete only (no import)")
print(" 11. Dry run (preview only)")
print(" 12. Smart sync (diff-based, only upload changes)")
print(" 13. Smart sync + delete orphans")
print(" 0. Exit")
print()
while True:
try:
choice = input("Enter choice [1-11, 0 to exit]: ").strip()
choice = input("Enter choice [1-13, 0 to exit]: ").strip()
if choice == '0':
return None
if choice in ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11']:
if choice in ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13']:
return int(choice)
print("Invalid choice. Please enter 1-11 or 0.")
print("Invalid choice. Please enter 1-13 or 0.")
except (EOFError, KeyboardInterrupt):
print("\nExiting.")
return None
@@ -693,6 +695,362 @@ def show_diff_report(ck, data_dir, verbose=False):
return all_diffs
def sync_diff(ck, diff, record_type, dry_run=False, verbose=False, delete_orphans=False):
    """
    Sync only changed records based on diff.

    Args:
        ck: CloudKit client exposing modify(ops) -> response dict.
        diff: dict with keys 'new', 'updated', 'deleted', 'unchanged'; each
            item carries a CloudKit-shaped 'record' (and, for 'updated' /
            'deleted' items, the server's 'recordChangeTag').
        record_type: CloudKit record type name (used in messages and deletes).
        dry_run: report what would happen without calling CloudKit.
        verbose: print per-record failure details.
        delete_orphans: actually delete cloud records missing locally;
            otherwise only warn about them.

    Returns counts: created, updated, deleted, skipped, errors.
    """
    stats = {'created': 0, 'updated': 0, 'deleted': 0, 'skipped': 0, 'errors': 0}

    # Handle new records (forceReplace creates them regardless of server state)
    new_records = [item['record'] for item in diff['new']]
    if new_records:
        if dry_run:
            print(f" [DRY RUN] Would create {len(new_records)} new {record_type}")
            stats['created'] = len(new_records)
        else:
            for i in range(0, len(new_records), BATCH_SIZE):
                batch = new_records[i:i+BATCH_SIZE]
                ops = [{'operationType': 'forceReplace', 'record': r} for r in batch]
                result = ck.modify(ops)
                if 'error' in result:
                    # Whole-request failure: count the entire batch as errors.
                    print(f" Create error: {result['error']}")
                    stats['errors'] += len(batch)
                else:
                    result_records = result.get('records', [])
                    successful = [r for r in result_records if 'serverErrorCode' not in r]
                    failed = [r for r in result_records if 'serverErrorCode' in r]
                    stats['created'] += len(successful)
                    stats['errors'] += len(failed)
                    if failed and verbose:
                        print(f" Create failed: {failed[0].get('serverErrorCode')}: {failed[0].get('reason')}")
                time.sleep(0.3)  # throttle between batches

    # Handle updated records (update with recordChangeTag for optimistic locking)
    updated_items = diff['updated']
    if updated_items:
        if dry_run:
            print(f" [DRY RUN] Would update {len(updated_items)} {record_type}")
            if verbose:
                # Show a small sample of what changed.
                for item in updated_items[:5]:
                    print(f" - {item['record'].get('recordName')}: {', '.join(item.get('changed_fields', []))}")
                if len(updated_items) > 5:
                    print(f" ... and {len(updated_items) - 5} more")
            stats['updated'] = len(updated_items)
        else:
            for i in range(0, len(updated_items), BATCH_SIZE):
                batch = updated_items[i:i+BATCH_SIZE]
                ops = []
                for item in batch:
                    # Copy so we don't mutate the caller's diff structure.
                    record = item['record'].copy()
                    record['recordChangeTag'] = item['recordChangeTag']
                    ops.append({'operationType': 'update', 'record': record})
                result = ck.modify(ops)
                if 'error' in result:
                    print(f" Update error: {result['error']}")
                    stats['errors'] += len(batch)
                else:
                    result_records = result.get('records', [])
                    successful = [r for r in result_records if 'serverErrorCode' not in r]
                    failed = [r for r in result_records if 'serverErrorCode' in r]
                    stats['updated'] += len(successful)
                    # Handle conflicts (409): record changed server-side since our query.
                    conflicts = [r for r in failed if r.get('serverErrorCode') == 'CONFLICT']
                    other_errors = [r for r in failed if r.get('serverErrorCode') != 'CONFLICT']
                    if conflicts:
                        print(f" {len(conflicts)} conflicts detected (records modified since query)")
                        # Re-try with forceReplace for conflicts (data loss is acceptable as we have source of truth)
                        conflict_records = [item['record'] for item in batch if any(
                            c.get('recordName') == item['record'].get('recordName') for c in conflicts
                        )]
                        resolved = 0
                        if conflict_records:
                            retry_ops = [{'operationType': 'forceReplace', 'record': r} for r in conflict_records]
                            retry_result = ck.modify(retry_ops)
                            if 'error' not in retry_result:
                                retry_records = retry_result.get('records', [])
                                retry_success = [r for r in retry_records if 'serverErrorCode' not in r]
                                resolved = len(retry_success)
                                stats['updated'] += resolved
                        # BUGFIX: conflicts were never added to stats['errors'],
                        # yet the old code subtracted retry successes from it,
                        # which could drive the count negative and hid conflicts
                        # whose retry failed. Count only unresolved conflicts.
                        stats['errors'] += len(conflicts) - resolved
                    if other_errors:
                        stats['errors'] += len(other_errors)
                        if verbose:
                            print(f" Update failed: {other_errors[0].get('serverErrorCode')}: {other_errors[0].get('reason')}")
                time.sleep(0.3)  # throttle between batches

    # Handle deleted records (only if delete_orphans is True)
    deleted_items = diff['deleted']
    if deleted_items:
        if delete_orphans:
            if dry_run:
                print(f" [DRY RUN] Would delete {len(deleted_items)} orphan {record_type}")
                stats['deleted'] = len(deleted_items)
            else:
                for i in range(0, len(deleted_items), BATCH_SIZE):
                    batch = deleted_items[i:i+BATCH_SIZE]
                    ops = [{
                        'operationType': 'delete',
                        'record': {
                            'recordName': item['record'].get('recordName'),
                            'recordType': record_type,
                            'recordChangeTag': item['recordChangeTag']
                        }
                    } for item in batch]
                    result = ck.modify(ops)
                    if 'error' in result:
                        print(f" Delete error: {result['error']}")
                        stats['errors'] += len(batch)
                    else:
                        result_records = result.get('records', [])
                        successful = [r for r in result_records if 'serverErrorCode' not in r]
                        failed = [r for r in result_records if 'serverErrorCode' in r]
                        stats['deleted'] += len(successful)
                        stats['errors'] += len(failed)
                    time.sleep(0.3)  # throttle between batches
        else:
            print(f" Warning: {len(deleted_items)} orphan {record_type} in CloudKit (use --delete-orphans to remove)")

    # Count unchanged as skipped
    stats['skipped'] = len(diff['unchanged'])
    return stats
def run_smart_sync(ck, data_dir, dry_run=False, verbose=False, delete_orphans=False):
    """Run differential sync for all record types.

    Loads the local canonical JSON datasets, builds CloudKit-shaped records
    for each record type, diffs them against the records currently in
    CloudKit, then uploads only the changes via sync_diff().

    Args:
        ck: CloudKit client (query_all / modify).
        data_dir: directory containing the canonical JSON files.
        dry_run: preview changes without modifying CloudKit.
        verbose: print per-record details.
        delete_orphans: delete cloud records that no longer exist locally.

    Returns:
        Accumulated stats dict: created, updated, deleted, skipped, errors.
    """
    from pathlib import Path
    data_dir = Path(data_dir)

    def _load_json(path):
        # Load a JSON file, closing the handle promptly. The previous
        # json.load(open(...)) calls leaked file objects until GC.
        with open(path) as f:
            return json.load(f)

    print("\n" + "="*50)
    print(f"CloudKit Smart Sync {'(DRY RUN)' if dry_run else ''}")
    print("="*50 + "\n")

    # Load local data; a missing file is treated as an empty dataset.
    stadiums = _load_json(data_dir / 'stadiums_canonical.json') if (data_dir / 'stadiums_canonical.json').exists() else []
    teams = _load_json(data_dir / 'teams_canonical.json') if (data_dir / 'teams_canonical.json').exists() else []
    # Load games from canonical/games/*.json (one file per league/season)
    canonical_games_dir = data_dir / 'canonical' / 'games'
    games = []
    if canonical_games_dir.exists():
        for games_file in sorted(canonical_games_dir.glob('*.json')):
            games.extend(_load_json(games_file))
    league_structure = _load_json(data_dir / 'league_structure.json') if (data_dir / 'league_structure.json').exists() else []
    team_aliases = _load_json(data_dir / 'team_aliases.json') if (data_dir / 'team_aliases.json').exists() else []
    stadium_aliases = _load_json(data_dir / 'stadium_aliases.json') if (data_dir / 'stadium_aliases.json').exists() else []

    print(f"Local data: {len(stadiums)} stadiums, {len(teams)} teams, {len(games)} games")
    print(f" {len(league_structure)} league structures, {len(team_aliases)} team aliases, {len(stadium_aliases)} stadium aliases\n")

    # Build local record maps (reuse from show_diff_report)
    def build_stadium_records(stadiums):
        # Map recordName -> CloudKit Stadium record built from canonical data.
        records = {}
        for s in stadiums:
            stadium_id = s.get('canonical_id', s.get('id', ''))
            record_name = deterministic_uuid(stadium_id)
            team_abbrevs = s.get('primary_team_abbrevs', s.get('team_abbrevs', []))
            fields = {
                'stadiumId': {'value': record_name},
                'canonicalId': {'value': stadium_id},
                'name': {'value': s['name']},
                'city': {'value': s['city']},
                'state': {'value': s.get('state', '')},
                'sport': {'value': s['sport']},
                'source': {'value': s.get('source', 'canonical')},
                'teamAbbrevs': {'value': team_abbrevs},
            }
            # NOTE(review): truthiness check skips latitude/capacity of 0 —
            # harmless for real stadium data, but confirm if that changes.
            if s.get('latitude'):
                fields['location'] = {'value': {'latitude': s['latitude'], 'longitude': s['longitude']}}
            if s.get('capacity'):
                fields['capacity'] = {'value': s['capacity']}
            records[record_name] = {'recordType': 'Stadium', 'recordName': record_name, 'fields': fields}
        return records

    def build_team_records(teams):
        # Map recordName -> CloudKit Team record built from canonical data.
        records = {}
        for t in teams:
            team_id = t.get('canonical_id', '')
            record_name = deterministic_uuid(team_id)
            fields = {
                'teamId': {'value': record_name},
                'canonicalId': {'value': team_id},
                'abbreviation': {'value': t['abbreviation']},
                'name': {'value': t['name']},
                'city': {'value': t['city']},
                'sport': {'value': t['sport']},
                'stadiumCanonicalId': {'value': t.get('stadium_canonical_id', '')},
            }
            if t.get('conference_id'):
                fields['conferenceId'] = {'value': t['conference_id']}
            if t.get('division_id'):
                fields['divisionId'] = {'value': t['division_id']}
            records[record_name] = {'recordType': 'Team', 'recordName': record_name, 'fields': fields}
        return records

    def build_game_records(games, stadiums):
        # Map recordName -> CloudKit Game record; de-dupes by canonical id
        # and resolves team/stadium references via deterministic UUIDs.
        records = {}
        stadium_id_map = {s.get('canonical_id', s.get('id', '')): deterministic_uuid(s.get('canonical_id', s.get('id', ''))) for s in stadiums}
        seen_ids = set()
        for g in games:
            game_id = g.get('canonical_id', g.get('id', ''))
            if game_id in seen_ids:
                continue
            seen_ids.add(game_id)
            game_uuid = deterministic_uuid(game_id)
            sport = g['sport']
            fields = {
                'gameId': {'value': game_uuid},
                'canonicalId': {'value': game_id},
                'sport': {'value': sport},
                'season': {'value': g.get('season', '')},
                'source': {'value': g.get('source', 'canonical')},
            }
            if g.get('date'):
                try:
                    # Parse times like "7:00p" / "1:05 PM"; default 7:00 PM.
                    time_str = g.get('time', '7:00p')
                    hour, minute = 19, 0
                    if time_str:
                        clean_time = time_str.lower().replace(' ', '')
                        is_pm = 'p' in clean_time
                        time_parts = clean_time.replace('p', '').replace('a', '').split(':')
                        if time_parts:
                            hour = int(time_parts[0])
                            if is_pm and hour != 12:
                                hour += 12
                            elif not is_pm and hour == 12:
                                hour = 0
                            if len(time_parts) > 1:
                                minute = int(time_parts[1])
                    dt = datetime.strptime(f"{g['date']} {hour:02d}:{minute:02d}", '%Y-%m-%d %H:%M')
                    fields['dateTime'] = {'value': int(dt.timestamp() * 1000), 'type': 'TIMESTAMP'}
                except (ValueError, OverflowError, OSError):
                    # Unparseable date/time: omit dateTime rather than abort.
                    # (Was a bare `except:` that also swallowed SystemExit and
                    # KeyboardInterrupt.)
                    pass
            home_team_canonical_id = g.get('home_team_canonical_id', '')
            away_team_canonical_id = g.get('away_team_canonical_id', '')
            home_team_uuid = deterministic_uuid(home_team_canonical_id)
            away_team_uuid = deterministic_uuid(away_team_canonical_id)
            fields['homeTeamRef'] = {'value': {'recordName': home_team_uuid, 'action': 'NONE'}}
            fields['awayTeamRef'] = {'value': {'recordName': away_team_uuid, 'action': 'NONE'}}
            if g.get('stadium_canonical_id'):
                stadium_canonical_id = g['stadium_canonical_id']
                stadium_uuid = stadium_id_map.get(stadium_canonical_id)
                if stadium_uuid:
                    fields['stadiumRef'] = {'value': {'recordName': stadium_uuid, 'action': 'NONE'}}
                fields['stadiumCanonicalId'] = {'value': stadium_canonical_id}
            records[game_uuid] = {'recordType': 'Game', 'recordName': game_uuid, 'fields': fields}
        return records

    def build_league_structure_records(league_structure):
        # Map recordName -> CloudKit LeagueStructure record.
        records = {}
        for ls in league_structure:
            record_name = ls['id']
            fields = {
                'structureId': {'value': ls['id']},
                'sport': {'value': ls['sport']},
                'type': {'value': ls['type']},
                'name': {'value': ls['name']},
                'displayOrder': {'value': ls['display_order']},
                'schemaVersion': {'value': 1},
            }
            if ls.get('abbreviation'):
                fields['abbreviation'] = {'value': ls['abbreviation']}
            if ls.get('parent_id'):
                fields['parentId'] = {'value': ls['parent_id']}
            records[record_name] = {'recordType': 'LeagueStructure', 'recordName': record_name, 'fields': fields}
        return records

    def build_team_alias_records(team_aliases):
        # Map recordName -> CloudKit TeamAlias record.
        records = {}
        for ta in team_aliases:
            record_name = ta['id']
            fields = {
                'aliasId': {'value': ta['id']},
                'teamCanonicalId': {'value': ta['team_canonical_id']},
                'aliasType': {'value': ta['alias_type']},
                'aliasValue': {'value': ta['alias_value']},
                'schemaVersion': {'value': 1},
            }
            records[record_name] = {'recordType': 'TeamAlias', 'recordName': record_name, 'fields': fields}
        return records

    def build_stadium_alias_records(stadium_aliases):
        # Map recordName -> CloudKit StadiumAlias record. The recordName is
        # "{sport}_{alias}" derived from the stadium canonical id.
        records = {}
        for sa in stadium_aliases:
            stadium_id = sa['stadium_canonical_id']
            sport = stadium_id.split('_')[1] if '_' in stadium_id else 'unknown'
            record_name = f"{sport}_{sa['alias_name'].lower()}"
            fields = {
                'aliasName': {'value': sa['alias_name'].lower()},
                'stadiumCanonicalId': {'value': sa['stadium_canonical_id']},
                'schemaVersion': {'value': 1},
            }
            records[record_name] = {'recordType': 'StadiumAlias', 'recordName': record_name, 'fields': fields}
        return records

    # Sync each record type: (CloudKit type, local dataset, record builder)
    record_types = [
        ('Stadium', stadiums, build_stadium_records),
        ('Team', teams, build_team_records),
        ('Game', games, lambda g: build_game_records(g, stadiums)),
        ('LeagueStructure', league_structure, build_league_structure_records),
        ('TeamAlias', team_aliases, build_team_alias_records),
        ('StadiumAlias', stadium_aliases, build_stadium_alias_records),
    ]
    total_stats = {'created': 0, 'updated': 0, 'deleted': 0, 'skipped': 0, 'errors': 0}
    for record_type, data, builder in record_types:
        if not data:
            print(f"{record_type}: No local data, skipping")
            continue
        print(f"\n--- {record_type} ---")
        local_records = builder(data)
        # Query cloud records
        print(f" Querying CloudKit...")
        cloud_records = ck.query_all(record_type, verbose=verbose)
        print(f" Found {len(cloud_records)} cloud records, {len(local_records)} local records")
        # Compute diff
        diff = compute_diff(local_records, cloud_records, verbose=verbose)
        print(f" Diff: {len(diff['new'])} new, {len(diff['updated'])} updated, {len(diff['unchanged'])} unchanged, {len(diff['deleted'])} orphans")
        # Sync
        stats = sync_diff(ck, diff, record_type, dry_run=dry_run, verbose=verbose, delete_orphans=delete_orphans)
        # Accumulate stats
        for key in total_stats:
            total_stats[key] += stats[key]
        print(f" Result: {stats['created']} created, {stats['updated']} updated, {stats['deleted']} deleted, {stats['skipped']} skipped")
        if stats['errors']:
            print(f" Errors: {stats['errors']}")

    # Summary
    print("\n" + "="*50)
    print("Smart Sync Summary")
    print("="*50)
    print(f" Created: {total_stats['created']}")
    print(f" Updated: {total_stats['updated']}")
    print(f" Deleted: {total_stats['deleted']}")
    print(f" Skipped (unchanged): {total_stats['skipped']}")
    if total_stats['errors']:
        print(f" Errors: {total_stats['errors']}")
    if dry_run:
        print("\n[DRY RUN - no changes made]")
    return total_stats
def main():
p = argparse.ArgumentParser(description='Import JSON to CloudKit')
p.add_argument('--key-id', default=DEFAULT_KEY_ID)
@@ -710,6 +1068,8 @@ def main():
p.add_argument('--delete-all', action='store_true', help='Delete all records before importing')
p.add_argument('--delete-only', action='store_true', help='Only delete records, do not import')
p.add_argument('--diff', action='store_true', help='Show diff between local and CloudKit without importing')
p.add_argument('--smart-sync', action='store_true', help='Differential sync: only upload new/changed records')
p.add_argument('--delete-orphans', action='store_true', help='With --smart-sync, also delete records not in local data')
p.add_argument('--dry-run', action='store_true')
p.add_argument('--verbose', '-v', action='store_true')
p.add_argument('--interactive', '-i', action='store_true', help='Show interactive menu')
@@ -719,7 +1079,7 @@ def main():
has_action_flag = any([
args.stadiums_only, args.games_only, args.games_files, args.league_structure_only,
args.team_aliases_only, args.stadium_aliases_only, args.canonical_only,
args.delete_all, args.delete_only, args.dry_run, args.diff
args.delete_all, args.delete_only, args.dry_run, args.diff, args.smart_sync
])
# Track selected game files (for option 4 or --games-files)
@@ -761,6 +1121,11 @@ def main():
args.delete_only = True
elif choice == 11: # Dry run
args.dry_run = True
elif choice == 12: # Smart sync
args.smart_sync = True
elif choice == 13: # Smart sync + delete orphans
args.smart_sync = True
args.delete_orphans = True
print(f"\n{'='*50}")
print(f"CloudKit Import {'(DRY RUN)' if args.dry_run else ''}")
@@ -843,6 +1208,18 @@ def main():
show_diff_report(ck, args.data_dir, verbose=args.verbose)
return
# Handle smart sync mode (differential upload)
if args.smart_sync:
if not ck:
# Need CloudKit connection for smart sync
if not HAS_CRYPTO:
sys.exit("Error: pip install cryptography")
if not os.path.exists(args.key_file):
sys.exit(f"Error: Key file not found: {args.key_file}")
ck = CloudKit(args.key_id, open(args.key_file, 'rb').read(), args.container, args.env)
run_smart_sync(ck, args.data_dir, dry_run=args.dry_run, verbose=args.verbose, delete_orphans=args.delete_orphans)
return
# Handle deletion
if args.delete_all or args.delete_only:
if not ck: