feat(05-02): add individual record management commands

Add commands for managing individual CloudKit records:
- --get TYPE ID: Retrieve and display single record
- --list TYPE [--count]: List all recordNames for a type
- --update-record TYPE ID FIELD=VALUE: Update fields with conflict handling
- --delete-record TYPE ID [--force]: Delete with confirmation

Features:
- Type validation against VALID_RECORD_TYPES
- Triple lookup fallback: direct -> deterministic UUID -> canonicalId query
- Automatic type parsing for numeric field values
- Conflict detection with automatic forceReplace retry
- Deletion confirmation (skip with --force)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Trey t
2026-01-10 10:17:40 -06:00
parent 5763db4a61
commit 5a08659837

View File

@@ -1315,6 +1315,318 @@ def verify_sync(ck, data_dir, verbose=False, deep=False):
return total_mismatches == 0
# Valid record types for individual record management
VALID_RECORD_TYPES = ['Stadium', 'Team', 'Game', 'LeagueStructure', 'TeamAlias', 'StadiumAlias']
def validate_record_type(record_type):
"""Validate record type and return normalized version."""
# Allow case-insensitive matching
for valid_type in VALID_RECORD_TYPES:
if record_type.lower() == valid_type.lower():
return valid_type
return None
def get_record(ck, record_type, record_id, verbose=False):
"""Get and display a single record by ID."""
normalized_type = validate_record_type(record_type)
if not normalized_type:
print(f"Error: Unknown record type '{record_type}'. Valid types: {', '.join(VALID_RECORD_TYPES)}")
return False
print(f"\nLooking up {normalized_type} with id '{record_id}'...")
# Try to look up by recordName directly
records = ck.lookup(normalized_type, [record_id], verbose=verbose)
if isinstance(records, dict) and 'error' in records:
print(f"Error: {records['error']}")
return False
# Filter out records with errors (NOT_FOUND)
found_records = [r for r in records if 'serverErrorCode' not in r]
# If not found, try deterministic_uuid lookup
if not found_records:
uuid_name = deterministic_uuid(record_id)
records = ck.lookup(normalized_type, [uuid_name], verbose=verbose)
if isinstance(records, dict) and 'error' in records:
print(f"Error: {records['error']}")
return False
found_records = [r for r in records if 'serverErrorCode' not in r]
# If still not found, try query by canonicalId field
if not found_records:
if verbose:
print(f" Trying query by canonicalId field...")
# Query by canonicalId (works for Stadium, Team, Game)
query_records = ck.query(normalized_type, [['canonicalId', 'EQUALS', record_id]])
if isinstance(query_records, dict) and 'error' in query_records:
pass # Ignore error, will report not found below
elif query_records:
result_records = query_records.get('records', [])
found_records = [r for r in result_records if 'serverErrorCode' not in r]
if not found_records:
print(f"Error: No {normalized_type} with id '{record_id}' found in CloudKit")
return False
record = found_records[0]
# Display record
print(f"\n{'='*50}")
print(f"Record: {record.get('recordName', 'N/A')}")
print(f"Type: {record.get('recordType', 'N/A')}")
print(f"ChangeTag: {record.get('recordChangeTag', 'N/A')}")
print(f"{'='*50}")
print("Fields:")
fields = record.get('fields', {})
for field_name, field_data in sorted(fields.items()):
value = field_data.get('value', 'N/A')
field_type = field_data.get('type', '')
if field_type:
print(f" {field_name}: {value} ({field_type})")
else:
print(f" {field_name}: {value}")
print()
return True
def list_records(ck, record_type, count_only=False, verbose=False):
"""List all recordNames for a record type."""
normalized_type = validate_record_type(record_type)
if not normalized_type:
print(f"Error: Unknown record type '{record_type}'. Valid types: {', '.join(VALID_RECORD_TYPES)}")
return False
print(f"\nQuerying {normalized_type} records...")
records = ck.query_all(normalized_type, verbose=verbose)
if isinstance(records, dict) and 'error' in records:
print(f"Error: {records['error']}")
return False
if count_only:
print(f"\n{normalized_type}: {len(records)} records")
else:
print(f"\n{normalized_type} ({len(records)} records):")
print("-" * 40)
for record_name in sorted(records.keys()):
print(record_name)
return True
def update_record(ck, record_type, record_id, field_updates, verbose=False):
"""Update a single record with field changes."""
normalized_type = validate_record_type(record_type)
if not normalized_type:
print(f"Error: Unknown record type '{record_type}'. Valid types: {', '.join(VALID_RECORD_TYPES)}")
return False
if not field_updates:
print("Error: No field updates specified. Use format: field=value")
return False
# Parse field updates
updates = {}
for update in field_updates:
if '=' not in update:
print(f"Error: Invalid update format '{update}'. Use: field=value")
return False
field_name, value = update.split('=', 1)
# Try to parse value as number if possible
try:
if '.' in value:
value = float(value)
else:
value = int(value)
except ValueError:
pass # Keep as string
updates[field_name] = value
print(f"\nLooking up {normalized_type} with id '{record_id}'...")
# Look up record to get recordChangeTag
records = ck.lookup(normalized_type, [record_id], verbose=verbose)
if isinstance(records, dict) and 'error' in records:
print(f"Error: {records['error']}")
return False
# Filter out NOT_FOUND records
found_records = [r for r in records if 'serverErrorCode' not in r]
if not found_records:
# Try with deterministic_uuid
uuid_name = deterministic_uuid(record_id)
records = ck.lookup(normalized_type, [uuid_name], verbose=verbose)
if isinstance(records, dict) and 'error' in records:
print(f"Error: {records['error']}")
return False
found_records = [r for r in records if 'serverErrorCode' not in r]
if not found_records:
print(f"Error: No {normalized_type} with id '{record_id}' found in CloudKit")
return False
record_id = uuid_name
record = found_records[0]
record_change_tag = record.get('recordChangeTag', '')
print(f"Found record: {record_id}")
print(f"Current recordChangeTag: {record_change_tag}")
print(f"\nUpdating fields: {updates}")
# Build update operation
updated_record = {
'recordType': normalized_type,
'recordName': record_id,
'recordChangeTag': record_change_tag,
'fields': {field: {'value': value} for field, value in updates.items()}
}
result = ck.modify([{'operationType': 'update', 'record': updated_record}])
if 'error' in result:
print(f"Error: {result['error']}")
return False
result_records = result.get('records', [])
if not result_records:
print("Error: No response from CloudKit")
return False
result_record = result_records[0]
if 'serverErrorCode' in result_record:
error_code = result_record.get('serverErrorCode')
reason = result_record.get('reason', 'Unknown')
if error_code == 'CONFLICT':
print(f"\nConflict detected: Record was modified since lookup.")
print("Retrying with forceReplace...")
# Retry with forceReplace
updated_record.pop('recordChangeTag', None)
retry_result = ck.modify([{'operationType': 'forceReplace', 'record': updated_record}])
if 'error' in retry_result:
print(f"Error: {retry_result['error']}")
return False
retry_records = retry_result.get('records', [])
if retry_records and 'serverErrorCode' not in retry_records[0]:
print(f"\n✓ Record updated successfully (forceReplace)")
return True
else:
print(f"Error: {retry_records[0].get('reason', 'Unknown error')}")
return False
else:
print(f"Error: {error_code}: {reason}")
return False
print(f"\n✓ Record updated successfully")
print(f"New recordChangeTag: {result_record.get('recordChangeTag', 'N/A')}")
return True
def delete_record(ck, record_type, record_id, force=False, verbose=False):
"""Delete a single record by ID."""
normalized_type = validate_record_type(record_type)
if not normalized_type:
print(f"Error: Unknown record type '{record_type}'. Valid types: {', '.join(VALID_RECORD_TYPES)}")
return False
print(f"\nLooking up {normalized_type} with id '{record_id}'...")
# Look up record to get recordChangeTag
records = ck.lookup(normalized_type, [record_id], verbose=verbose)
if isinstance(records, dict) and 'error' in records:
print(f"Error: {records['error']}")
return False
# Filter out NOT_FOUND records
found_records = [r for r in records if 'serverErrorCode' not in r]
if not found_records:
# Try with deterministic_uuid
uuid_name = deterministic_uuid(record_id)
records = ck.lookup(normalized_type, [uuid_name], verbose=verbose)
if isinstance(records, dict) and 'error' in records:
print(f"Error: {records['error']}")
return False
found_records = [r for r in records if 'serverErrorCode' not in r]
if not found_records:
print(f"Error: No {normalized_type} with id '{record_id}' found in CloudKit")
return False
record_id = uuid_name
record = found_records[0]
record_change_tag = record.get('recordChangeTag', '')
print(f"Found record: {record_id}")
# Show record details
fields = record.get('fields', {})
print("\nRecord details:")
for field_name, field_data in list(fields.items())[:5]: # Show first 5 fields
print(f" {field_name}: {field_data.get('value', 'N/A')}")
if len(fields) > 5:
print(f" ... and {len(fields) - 5} more fields")
# Confirm deletion
if not force:
try:
confirm = input("\nAre you sure you want to delete this record? (yes/no): ").strip().lower()
if confirm not in ['yes', 'y']:
print("Deletion cancelled.")
return False
except (EOFError, KeyboardInterrupt):
print("\nDeletion cancelled.")
return False
print("\nDeleting record...")
# Build delete operation
delete_op = {
'operationType': 'delete',
'record': {
'recordName': record_id,
'recordType': normalized_type,
'recordChangeTag': record_change_tag
}
}
result = ck.modify([delete_op])
if 'error' in result:
print(f"Error: {result['error']}")
return False
result_records = result.get('records', [])
if not result_records:
print("Error: No response from CloudKit")
return False
result_record = result_records[0]
if 'serverErrorCode' in result_record:
error_code = result_record.get('serverErrorCode')
reason = result_record.get('reason', 'Unknown')
print(f"Error: {error_code}: {reason}")
return False
print(f"\n✓ Record deleted successfully")
return True
def main():
p = argparse.ArgumentParser(description='Import JSON to CloudKit')
p.add_argument('--key-id', default=DEFAULT_KEY_ID)
@@ -1336,6 +1648,13 @@ def main():
p.add_argument('--delete-orphans', action='store_true', help='With --smart-sync, also delete records not in local data')
p.add_argument('--verify', action='store_true', help='Verify CloudKit matches local data (quick: counts + spot-check)')
p.add_argument('--verify-deep', action='store_true', help='Verify CloudKit matches local data (deep: full field comparison)')
# Individual record management
p.add_argument('--get', nargs=2, metavar=('TYPE', 'ID'), help='Get a single record (e.g., --get Stadium stadium_nba_td_garden)')
p.add_argument('--list', metavar='TYPE', help='List all recordNames for a type (e.g., --list Stadium)')
p.add_argument('--count', action='store_true', help='With --list, show only the count')
p.add_argument('--update-record', nargs='+', metavar='ARG', help='Update a record: TYPE ID FIELD=VALUE [FIELD=VALUE ...] (e.g., --update-record Stadium id123 capacity=19156)')
p.add_argument('--delete-record', nargs=2, metavar=('TYPE', 'ID'), help='Delete a single record (e.g., --delete-record Game game_mlb_2025_xxx)')
p.add_argument('--force', action='store_true', help='Skip confirmation for --delete-record')
p.add_argument('--dry-run', action='store_true')
p.add_argument('--verbose', '-v', action='store_true')
p.add_argument('--interactive', '-i', action='store_true', help='Show interactive menu')
@@ -1346,7 +1665,7 @@ def main():
args.stadiums_only, args.games_only, args.games_files, args.league_structure_only,
args.team_aliases_only, args.stadium_aliases_only, args.canonical_only,
args.delete_all, args.delete_only, args.dry_run, args.diff, args.smart_sync,
args.verify, args.verify_deep
args.verify, args.verify_deep, args.get, args.list, args.update_record, args.delete_record
])
# Track selected game files (for option 4 or --games-files)
@@ -1491,6 +1810,67 @@ def main():
verify_sync(ck, args.data_dir, verbose=args.verbose, deep=args.verify_deep)
return
# Handle individual record operations
if args.get:
record_type, record_id = args.get
if not ck:
if not HAS_CRYPTO:
sys.exit("Error: pip install cryptography")
if not os.path.exists(args.key_file):
sys.exit(f"Error: Key file not found: {args.key_file}")
ck = CloudKit(args.key_id, open(args.key_file, 'rb').read(), args.container, args.env)
get_record(ck, record_type, record_id, verbose=args.verbose)
return
if args.list:
if not ck:
if not HAS_CRYPTO:
sys.exit("Error: pip install cryptography")
if not os.path.exists(args.key_file):
sys.exit(f"Error: Key file not found: {args.key_file}")
ck = CloudKit(args.key_id, open(args.key_file, 'rb').read(), args.container, args.env)
list_records(ck, args.list, count_only=args.count, verbose=args.verbose)
return
if args.update_record:
if len(args.update_record) < 3:
sys.exit("Error: --update-record requires TYPE ID FIELD=VALUE [FIELD=VALUE ...]")
record_type = args.update_record[0]
record_id = args.update_record[1]
field_updates = {}
for update in args.update_record[2:]:
if '=' not in update:
sys.exit(f"Error: Invalid field update '{update}'. Format: FIELD=VALUE")
field, value = update.split('=', 1)
# Try to parse as number
try:
value = int(value)
except ValueError:
try:
value = float(value)
except ValueError:
pass # Keep as string
field_updates[field] = value
if not ck:
if not HAS_CRYPTO:
sys.exit("Error: pip install cryptography")
if not os.path.exists(args.key_file):
sys.exit(f"Error: Key file not found: {args.key_file}")
ck = CloudKit(args.key_id, open(args.key_file, 'rb').read(), args.container, args.env)
update_record(ck, record_type, record_id, field_updates, verbose=args.verbose)
return
if args.delete_record:
record_type, record_id = args.delete_record
if not ck:
if not HAS_CRYPTO:
sys.exit("Error: pip install cryptography")
if not os.path.exists(args.key_file):
sys.exit(f"Error: Key file not found: {args.key_file}")
ck = CloudKit(args.key_id, open(args.key_file, 'rb').read(), args.container, args.env)
delete_record(ck, record_type, record_id, force=args.force, verbose=args.verbose)
return
# Handle smart sync mode (differential upload)
if args.smart_sync:
if not ck: