Codebase hardening: 102 fixes across 35+ files

Deep audit identified 106 findings; 102 fixed, 4 deferred. Covers 8 areas:

- Settings & deploy: env-gated DEBUG/SECRET_KEY, HTTPS headers, gunicorn, celery worker
- Auth (registered_user): password write_only, request.data fixes, transaction safety, proper HTTP status codes
- Workout app: IDOR protection, get_object_or_404, prefetch_related N+1 fixes, transaction.atomic
- Video/scripts: path traversal sanitization, HLS trigger guard, auth on cache wipe
- Models (exercise/equipment/muscle/superset): null-safe __str__, stable IDs, prefetch support
- Generator views: helper for registered_user lookup, logger.exception, bulk_update, transaction wrapping
- Generator core (rules/selector/generator): push-pull ratio, type affinity normalization, modality checks, side-pair exact match, word-boundary regex, equipment cache clearing
- Generator services (plan_builder/analyzer/normalizer): transaction.atomic, muscle cache, bulk_update, glutes classification fix

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Trey t
2026-02-27 22:29:14 -06:00
parent 63b57a83ab
commit c80c66c2e5
58 changed files with 3363 additions and 1049 deletions

View File

@@ -160,13 +160,16 @@ class ExerciseSelector:
self._exercise_profile_cache = {}
self.warnings = [] # Phase 13: generation warnings
self.progression_boost_ids = set() # IDs of exercises that are progressions of recently done ones
# Week-scoped state for cross-day dedup (NOT cleared by reset())
self.week_used_exercise_ids = set()
self.week_used_movement_families = Counter()
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def reset(self):
"""Reset used exercises for a new workout."""
"""Reset used exercises for a new workout (preserves week-scoped state)."""
self.used_exercise_ids = set()
self.used_exercise_names = set()
self.used_movement_patterns = Counter()
@@ -175,6 +178,49 @@ class ExerciseSelector:
self.last_working_similarity_profiles = []
self._exercise_profile_cache = {}
self.warnings = []
# Clear per-queryset caches so equipment/exclusion changes take effect
if hasattr(self, '_equipment_map_cache'):
del self._equipment_map_cache
if hasattr(self, '_bodyweight_ids_cache'):
del self._bodyweight_ids_cache
if hasattr(self, '_warned_small_pool'):
del self._warned_small_pool
if hasattr(self, '_warned_no_equipment'):
del self._warned_no_equipment
if hasattr(self, '_relaxed_hard_exclude_ids'):
del self._relaxed_hard_exclude_ids
if hasattr(self, '_injury_warnings_emitted'):
del self._injury_warnings_emitted
def reset_week(self):
"""Reset all state including week-scoped tracking. Call at start of a new week."""
self.reset()
self.week_used_exercise_ids = set()
self.week_used_movement_families = Counter()
def accumulate_week_state(self, exercise_ids, exercise_names):
"""Record a completed day's exercises into week-scoped tracking.
Parameters
----------
exercise_ids : set[int]
Primary keys of exercises used in the day's workout.
exercise_names : set[str]
Exercise names (used for family extraction).
"""
self.week_used_exercise_ids.update(exercise_ids)
for name in exercise_names:
for fam in extract_movement_families(name):
self.week_used_movement_families[fam] += 1
def _get_week_family_limit(self, family):
"""Max allowed uses of a movement family across the whole week.
Wider than per-workout limits: narrow families = 2/week, broad = 4/week.
"""
if family in NARROW_FAMILIES:
return 2
return 4
def select_exercises(
self,
@@ -184,6 +230,7 @@ class ExerciseSelector:
movement_pattern_preference=None,
prefer_weighted=False,
superset_position=None,
allow_cross_modality=False,
):
"""
Select *count* exercises matching the given criteria.
@@ -200,6 +247,10 @@ class ExerciseSelector:
Optional list of preferred movement patterns to favour.
prefer_weighted : bool
When True (R6), boost is_weight=True exercises in selection.
allow_cross_modality : bool
When True, don't hard-filter by modality — instead use soft
preference so duration-only exercises (carries, planks) can
land in rep-based supersets and vice versa.
Returns
-------
@@ -209,13 +260,19 @@ class ExerciseSelector:
return []
fitness_level = getattr(self.user_preference, 'fitness_level', None)
# When cross-modality is allowed, skip the hard modality filter
# so duration-only exercises can appear in rep supersets and vice versa.
modality_for_filter = None if allow_cross_modality else is_duration_based
preferred_modality = 'duration' if is_duration_based else 'reps'
qs = self._get_filtered_queryset(
muscle_groups=muscle_groups,
is_duration_based=is_duration_based,
is_duration_based=modality_for_filter,
fitness_level=fitness_level,
)
# Working supersets should not contain stretch/recovery exercises.
excluded_q = Q(name__icontains='stretch')
# Use regex word boundary to avoid over-matching (e.g. "Stretch Band Row"
# should NOT be excluded, but "Hamstring Stretch" should).
excluded_q = Q(name__iregex=r'\bstretch(ing|es|ed)?\b')
for pat in self.WORKING_EXCLUDED_PATTERNS:
excluded_q |= Q(movement_patterns__icontains=pat)
qs = qs.exclude(excluded_q)
@@ -258,6 +315,7 @@ class ExerciseSelector:
count,
superset_position=superset_position,
similarity_scope='working',
preferred_modality=preferred_modality if allow_cross_modality else None,
)
# Sort selected exercises by tier: primary first, then secondary, then accessory
@@ -288,14 +346,16 @@ class ExerciseSelector:
for missing_muscle in uncovered:
replacement_qs = self._get_filtered_queryset(
muscle_groups=[missing_muscle],
is_duration_based=is_duration_based,
is_duration_based=modality_for_filter,
fitness_level=fitness_level,
).exclude(pk__in={e.pk for e in selected})
# Validate modality: ensure replacement matches expected modality
if is_duration_based:
replacement_qs = replacement_qs.filter(is_duration=True)
elif is_duration_based is False:
replacement_qs = replacement_qs.filter(is_reps=True)
# (skip when cross-modality is allowed)
if not allow_cross_modality:
if is_duration_based:
replacement_qs = replacement_qs.filter(is_duration=True)
elif is_duration_based is False:
replacement_qs = replacement_qs.filter(is_reps=True)
replacement = list(replacement_qs[:1])
if replacement:
# Find last unswapped accessory
@@ -382,8 +442,6 @@ class ExerciseSelector:
is_duration_based=True,
fitness_level=fitness_level,
)
# Avoid duplicate-looking left/right variants in recovery sections.
qs = qs.filter(Q(side__isnull=True) | Q(side=''))
# Prefer exercises whose movement_patterns overlap with warmup keywords
warmup_q = Q()
@@ -420,7 +478,6 @@ class ExerciseSelector:
is_duration_based=True,
fitness_level=fitness_level,
).exclude(pk__in={e.pk for e in selected})
wide_qs = wide_qs.filter(Q(side__isnull=True) | Q(side=''))
# Apply same warmup safety exclusions
wide_qs = wide_qs.exclude(is_weight=True)
wide_qs = wide_qs.exclude(is_compound=True)
@@ -440,7 +497,8 @@ class ExerciseSelector:
self._track_families(selected)
selected = self._ensure_side_pair_integrity(selected, qs, count=count)
return self._trim_preserving_pairs(selected, count)
selected = self._trim_preserving_pairs(selected, count)
return self._order_side_pairs_adjacent(selected)
def select_cooldown_exercises(self, target_muscles, count=4):
"""
@@ -456,8 +514,6 @@ class ExerciseSelector:
is_duration_based=True,
fitness_level=fitness_level,
)
# Avoid duplicate-looking left/right variants in recovery sections.
qs = qs.filter(Q(side__isnull=True) | Q(side=''))
cooldown_q = Q()
for kw in self.COOLDOWN_PATTERNS:
@@ -489,7 +545,6 @@ class ExerciseSelector:
is_duration_based=True,
fitness_level=fitness_level,
).exclude(pk__in={e.pk for e in selected})
wide_qs = wide_qs.filter(Q(side__isnull=True) | Q(side=''))
# Apply same exclusions
wide_qs = wide_qs.exclude(exclude_q)
# R11: also apply weight filter on wide fallback
@@ -509,7 +564,8 @@ class ExerciseSelector:
self._track_families(selected)
selected = self._ensure_side_pair_integrity(selected, qs, count=count)
return self._trim_preserving_pairs(selected, count)
selected = self._trim_preserving_pairs(selected, count)
return self._order_side_pairs_adjacent(selected)
# ------------------------------------------------------------------
# Internal helpers
@@ -568,37 +624,31 @@ class ExerciseSelector:
qs = qs.exclude(name_exclude_q)
# ---- Hard exclude exercises from recent workouts (Phase 6) ----
# Adaptive: if pool would be too small, relax hard exclude to soft penalty
# Adaptive: if pool would be too small, relax hard exclude to soft penalty.
# Use a local merged set to avoid permanently polluting recently_used_ids.
if self.hard_exclude_ids:
test_qs = qs.exclude(pk__in=self.hard_exclude_ids)
if test_qs.count() >= 10:
qs = test_qs
else:
# Pool too small — convert hard exclude to soft penalty instead
self.recently_used_ids = self.recently_used_ids | self.hard_exclude_ids
if not hasattr(self, '_warned_small_pool'):
self.warnings.append(
'Exercise pool too small for full variety rotation — '
'relaxed recent exclusion to soft penalty.'
)
self._warned_small_pool = True
# Pool too small — treat hard excludes as soft penalty for this
# queryset only (don't mutate the original recently_used_ids).
if not hasattr(self, '_relaxed_hard_exclude_ids'):
self._relaxed_hard_exclude_ids = set(self.hard_exclude_ids)
if not hasattr(self, '_warned_small_pool'):
self.warnings.append(
'Exercise pool too small for full variety rotation — '
'relaxed recent exclusion to soft penalty.'
)
self._warned_small_pool = True
# ---- Filter by user's available equipment ----
available_equipment_ids = set(
self.user_preference.available_equipment.values_list('pk', flat=True)
)
if not available_equipment_ids:
# No equipment set: only allow bodyweight exercises (no WorkoutEquipment entries)
exercises_with_equipment = set(
WorkoutEquipment.objects.values_list('exercise_id', flat=True).distinct()
)
qs = qs.exclude(pk__in=exercises_with_equipment)
if not hasattr(self, '_warned_no_equipment'):
self.warnings.append(
'No equipment set — using bodyweight exercises only. '
'Update your equipment preferences for more variety.'
)
self._warned_no_equipment = True
# No equipment set in preferences — all exercises are available (no filtering).
pass
elif available_equipment_ids:
# Cache equipment map on instance to avoid rebuilding per call
if not hasattr(self, '_equipment_map_cache'):
@@ -895,6 +945,7 @@ class ExerciseSelector:
count,
superset_position=None,
similarity_scope=None,
preferred_modality=None,
):
"""
Pick up to *count* exercises using weighted random selection.
@@ -909,6 +960,10 @@ class ExerciseSelector:
superset_position: 'early', 'late', or None. When set, boosts
exercises based on their exercise_tier (primary for early,
accessory for late).
preferred_modality: 'reps' or 'duration' or None. When set,
exercises that don't match the preferred modality get 0.3x weight
(cross-modality penalty). Dual-modality exercises always get full weight.
"""
if count <= 0:
return []
@@ -932,12 +987,49 @@ class ExerciseSelector:
return base_w * 2
return base_w
def _apply_week_penalty(ex, base_w):
"""Soft-penalize exercises already used earlier in the week."""
w = base_w
if self.week_used_exercise_ids and ex.pk in self.week_used_exercise_ids:
w = max(1, w // 2)
if self.week_used_movement_families:
for fam in extract_movement_families(ex.name):
if self.week_used_movement_families.get(fam, 0) >= self._get_week_family_limit(fam):
w = max(1, w // 2)
break
return w
def _apply_modality_penalty(ex, base_w):
"""Soft-penalize exercises that don't match the preferred modality.
Dual-modality exercises (is_reps AND is_duration) get full weight.
Cross-modality exercises get 0.3x weight (minimum 1).
"""
if not preferred_modality:
return base_w
is_reps = getattr(ex, 'is_reps', False)
is_dur = getattr(ex, 'is_duration', False)
# Dual-modality: always full weight
if is_reps and is_dur:
return base_w
if preferred_modality == 'reps' and is_reps:
return base_w
if preferred_modality == 'duration' and is_dur:
return base_w
# Cross-modality: reduce to ~30% of base weight
return max(1, int(base_w * 0.3))
# Build effective soft-penalty set: recently_used + any relaxed hard excludes
_effective_recently_used = self.recently_used_ids
if hasattr(self, '_relaxed_hard_exclude_ids') and self._relaxed_hard_exclude_ids:
_effective_recently_used = self.recently_used_ids | self._relaxed_hard_exclude_ids
for ex in preferred_list:
w = weight_preferred
# Boost exercises that are progressions of recently completed exercises
if ex.pk in self.progression_boost_ids:
w = w * 2
if ex.pk in self.recently_used_ids:
if ex.pk in _effective_recently_used:
w = 1 # Reduce weight for recently used
# Penalize overused movement patterns for variety (Phase 11)
# Fixed: check ALL comma-separated patterns, use max count
@@ -953,12 +1045,16 @@ class ExerciseSelector:
w = 1
elif max_pat_count >= 2:
w = max(1, w - 1)
w = _apply_week_penalty(ex, w)
w = _apply_modality_penalty(ex, w)
w = _tier_boost(ex, w)
pool.extend([ex] * w)
for ex in other_list:
w = weight_other
if ex.pk in self.recently_used_ids:
if ex.pk in _effective_recently_used:
w = 1 # Already 1 but keep explicit
w = _apply_week_penalty(ex, w)
w = _apply_modality_penalty(ex, w)
w = _tier_boost(ex, w)
pool.extend([ex] * w)
@@ -1153,23 +1249,26 @@ class ExerciseSelector:
if not opposite_norm:
continue
# Find the matching partner by name similarity and opposite side
# Find the matching partner by exact base-name match and opposite side.
# Typically the name is identical except for side, e.g.
# "Single Arm Row Left" / "Single Arm Row Right"
base_name = ex.name
for side_word in ['Left', 'Right', 'left', 'right']:
base_name = base_name.replace(side_word, '').strip()
base_name = self._strip_side_tokens(ex.name)
partner = (
# Use strict matching: find candidates with opposite side,
# then filter in Python by exact base-name match to avoid
# substring false positives (e.g. "L Sit" matching "Wall Sit").
partner_candidates = (
Exercise.objects
.filter(
name__icontains=base_name,
)
.filter(self._side_values_q(opposite_norm))
.exclude(pk__in=self.used_exercise_ids)
.exclude(pk__in=paired_ids)
.first()
)
partner = None
for candidate in partner_candidates:
candidate_base = self._strip_side_tokens(candidate.name)
if base_name.lower() == candidate_base.lower():
partner = candidate
break
if partner and partner.pk not in paired_ids:
exercises_to_add.append(partner)
@@ -1184,12 +1283,11 @@ class ExerciseSelector:
# Check if any partner should follow this exercise
for partner in exercises_to_add:
if partner.pk not in added_ids:
# Check if partner is the pair for this exercise
# Check if partner is the pair for this exercise using exact base-name match
if ex.side and ex.side.strip():
base_name = ex.name
for side_word in ['Left', 'Right', 'left', 'right']:
base_name = base_name.replace(side_word, '').strip()
if base_name.lower() in partner.name.lower():
ex_base = self._strip_side_tokens(ex.name)
partner_base = self._strip_side_tokens(partner.name)
if ex_base.lower() == partner_base.lower():
final.append(partner)
added_ids.add(partner.pk)
@@ -1265,6 +1363,57 @@ class ExerciseSelector:
return result
def _order_side_pairs_adjacent(self, selected):
"""
Keep left/right variants adjacent in list order.
This is primarily for warm-up/cool-down UX so side-specific movements
render one after another instead of grouped by side.
"""
if len(selected) < 2:
return selected
side_map = {}
for ex in selected:
side_val = self._normalize_side_value(getattr(ex, 'side', ''))
if side_val not in ('left', 'right'):
continue
key = self._strip_side_tokens(getattr(ex, 'name', ''))
side_map.setdefault(key, {'left': [], 'right': []})
side_map[key][side_val].append(ex)
ordered = []
used_ids = set()
for ex in selected:
if ex.pk in used_ids:
continue
side_val = self._normalize_side_value(getattr(ex, 'side', ''))
if side_val in ('left', 'right'):
key = self._strip_side_tokens(getattr(ex, 'name', ''))
opposite = self._opposite_side(side_val)
opposite_ex = None
for candidate in side_map.get(key, {}).get(opposite, []):
if candidate.pk not in used_ids:
opposite_ex = candidate
break
if opposite_ex:
ordered.append(ex)
ordered.append(opposite_ex)
used_ids.add(ex.pk)
used_ids.add(opposite_ex.pk)
continue
ordered.append(ex)
used_ids.add(ex.pk)
for ex in selected:
if ex.pk not in used_ids:
ordered.append(ex)
used_ids.add(ex.pk)
return ordered
def _strip_side_tokens(self, name):
"""Normalize a name by removing left/right tokens."""
base = name or ''

View File

@@ -157,7 +157,7 @@ MUSCLE_GROUP_CATEGORIES: dict[str, list[str]] = {
'traps', 'forearms', 'rotator cuff',
],
'lower_push': [
'quads', 'calves', 'glutes', 'hip abductors', 'hip adductors',
'quads', 'calves', 'hip abductors', 'hip adductors',
],
'lower_pull': [
'hamstrings', 'glutes', 'lower back', 'hip flexors',
@@ -202,6 +202,9 @@ def normalize_muscle_name(name: Optional[str]) -> Optional[str]:
return key
_muscle_cache: dict[int, Set[str]] = {}
def get_muscles_for_exercise(exercise) -> Set[str]:
"""
Return the set of normalized muscle names for a given Exercise instance.
@@ -209,7 +212,12 @@ def get_muscles_for_exercise(exercise) -> Set[str]:
Uses the ExerciseMuscle join table (exercise.exercise_muscle_exercise).
Falls back to the comma-separated Exercise.muscle_groups field if no
ExerciseMuscle rows exist.
Results are cached per exercise ID to avoid repeated DB queries.
"""
if exercise.id in _muscle_cache:
return _muscle_cache[exercise.id]
from muscle.models import ExerciseMuscle
muscles: Set[str] = set()
@@ -229,9 +237,15 @@ def get_muscles_for_exercise(exercise) -> Set[str]:
if normalized:
muscles.add(normalized)
_muscle_cache[exercise.id] = muscles
return muscles
def clear_muscle_cache() -> None:
"""Clear the muscle cache (useful for testing or re-analysis)."""
_muscle_cache.clear()
def get_movement_patterns_for_exercise(exercise) -> List[str]:
"""
Parse the comma-separated movement_patterns CharField on Exercise and

View File

@@ -1,5 +1,7 @@
import logging
from django.db import transaction
from workout.models import Workout
from superset.models import Superset, SupersetExercise
@@ -55,88 +57,87 @@ class PlanBuilder:
Workout
The fully-persisted Workout instance with all child objects.
"""
# ---- 1. Create the Workout ----
workout = Workout.objects.create(
name=workout_spec.get('name', 'Generated Workout'),
description=workout_spec.get('description', ''),
registered_user=self.registered_user,
)
workout.save()
workout_total_time = 0
superset_order = 1
# ---- 2. Create each Superset ----
for ss_spec in workout_spec.get('supersets', []):
ss_name = ss_spec.get('name', f'Set {superset_order}')
rounds = ss_spec.get('rounds', 1)
exercises = ss_spec.get('exercises', [])
superset = Superset.objects.create(
workout=workout,
name=ss_name,
rounds=rounds,
order=superset_order,
rest_between_rounds=ss_spec.get('rest_between_rounds', 45),
with transaction.atomic():
# ---- 1. Create the Workout ----
workout = Workout.objects.create(
name=workout_spec.get('name', 'Generated Workout'),
description=workout_spec.get('description', ''),
registered_user=self.registered_user,
)
superset.save()
superset_total_time = 0
workout_total_time = 0
superset_order = 1
# ---- 3. Create each SupersetExercise ----
for ex_spec in exercises:
exercise_obj = ex_spec.get('exercise')
if exercise_obj is None:
logger.warning(
"Skipping exercise entry with no exercise object in "
"superset '%s'", ss_name,
)
continue
# ---- 2. Create each Superset ----
for ss_spec in workout_spec.get('supersets', []):
ss_name = ss_spec.get('name', f'Set {superset_order}')
rounds = ss_spec.get('rounds', 1)
exercises = ss_spec.get('exercises', [])
order = ex_spec.get('order', 1)
superset_exercise = SupersetExercise.objects.create(
superset=superset,
exercise=exercise_obj,
order=order,
superset = Superset.objects.create(
workout=workout,
name=ss_name,
rounds=rounds,
order=superset_order,
rest_between_rounds=ss_spec.get('rest_between_rounds', 45),
)
# Assign optional fields exactly like add_workout does
if ex_spec.get('weight') is not None:
superset_exercise.weight = ex_spec['weight']
superset_total_time = 0
if ex_spec.get('reps') is not None:
superset_exercise.reps = ex_spec['reps']
rep_duration = exercise_obj.estimated_rep_duration or 3.0
superset_total_time += ex_spec['reps'] * rep_duration
# ---- 3. Create each SupersetExercise ----
for ex_spec in exercises:
exercise_obj = ex_spec.get('exercise')
if exercise_obj is None:
logger.warning(
"Skipping exercise entry with no exercise object in "
"superset '%s'", ss_name,
)
continue
if ex_spec.get('duration') is not None:
superset_exercise.duration = ex_spec['duration']
superset_total_time += ex_spec['duration']
order = ex_spec.get('order', 1)
superset_exercise.save()
superset_exercise = SupersetExercise.objects.create(
superset=superset,
exercise=exercise_obj,
order=order,
)
# ---- 4. Update superset estimated_time ----
# Store total time including all rounds and rest between rounds
rest_between_rounds = ss_spec.get('rest_between_rounds', 45)
rest_time = rest_between_rounds * max(0, rounds - 1)
superset.estimated_time = (superset_total_time * rounds) + rest_time
superset.save()
# Assign optional fields exactly like add_workout does
if ex_spec.get('weight') is not None:
superset_exercise.weight = ex_spec['weight']
# Accumulate into workout total (use the already-calculated superset time)
workout_total_time += superset.estimated_time
superset_order += 1
if ex_spec.get('reps') is not None:
superset_exercise.reps = ex_spec['reps']
rep_duration = exercise_obj.estimated_rep_duration or 3.0
superset_total_time += ex_spec['reps'] * rep_duration
# Add transition time between supersets
# (matches GENERATION_RULES['rest_between_supersets'] in workout_generator)
superset_count = superset_order - 1
if superset_count > 1:
rest_between_supersets = 30
workout_total_time += rest_between_supersets * (superset_count - 1)
if ex_spec.get('duration') is not None:
superset_exercise.duration = ex_spec['duration']
superset_total_time += ex_spec['duration']
# ---- 5. Update workout estimated_time ----
workout.estimated_time = workout_total_time
workout.save()
superset_exercise.save()
# ---- 4. Update superset estimated_time ----
# Store total time including all rounds and rest between rounds
rest_between_rounds = ss_spec.get('rest_between_rounds', 45)
rest_time = rest_between_rounds * max(0, rounds - 1)
superset.estimated_time = (superset_total_time * rounds) + rest_time
superset.save()
# Accumulate into workout total (use the already-calculated superset time)
workout_total_time += superset.estimated_time
superset_order += 1
# Add transition time between supersets
# (matches GENERATION_RULES['rest_between_supersets'] in workout_generator)
superset_count = superset_order - 1
if superset_count > 1:
rest_between_supersets = 30
workout_total_time += rest_between_supersets * (superset_count - 1)
# ---- 5. Update workout estimated_time ----
workout.estimated_time = workout_total_time
workout.save()
logger.info(
"Created workout '%s' (id=%s) with %d supersets, est. %ds",

View File

@@ -27,6 +27,7 @@ from typing import Dict, List, Optional, Set, Tuple
import numpy as np
from django.db import transaction
from django.db.models import Count, Prefetch, Q
from exercise.models import Exercise
@@ -225,14 +226,15 @@ class WorkoutAnalyzer:
print(' Workout Analyzer - ML Pattern Extraction')
print('=' * 64)
self._clear_existing_patterns()
self._step1_populate_workout_types()
self._step2_extract_workout_data()
self._step3_extract_muscle_group_splits()
self._step4_extract_weekly_split_patterns()
self._step5_extract_workout_structure_rules()
self._step6_extract_movement_pattern_ordering()
self._step7_ensure_full_rule_coverage()
with transaction.atomic():
self._clear_existing_patterns()
self._step1_populate_workout_types()
self._step2_extract_workout_data()
self._step3_extract_muscle_group_splits()
self._step4_extract_weekly_split_patterns()
self._step5_extract_workout_structure_rules()
self._step6_extract_movement_pattern_ordering()
self._step7_ensure_full_rule_coverage()
print('\n' + '=' * 64)
print(' Analysis complete.')
@@ -1325,16 +1327,19 @@ class WorkoutAnalyzer:
},
}
# Prefetch all existing rules into an in-memory set to avoid
# N exists() queries (one per workout_type x section x goal combination).
existing_rules = set(
WorkoutStructureRule.objects.values_list(
'workout_type_id', 'section_type', 'goal_type'
)
)
created = 0
for wt in workout_types:
for section in all_sections:
for goal in all_goals:
exists = WorkoutStructureRule.objects.filter(
workout_type=wt,
section_type=section,
goal_type=goal,
).exists()
if not exists:
if (wt.pk, section, goal) not in existing_rules:
defaults = dict(section_defaults[section])
# Apply goal adjustments
base_params = {

View File

@@ -0,0 +1,6 @@
"""Pure workout generation utilities.
These helpers are intentionally side-effect free so they can be tested
independently from Django models and service orchestration.
"""

View File

@@ -0,0 +1,39 @@
import math
import random
def pick_reps_for_exercise(exercise, wt_params: dict, tier_ranges: dict, rng=random) -> int:
"""Pick reps from tier-specific range, then fallback to generic wt params."""
tier = (getattr(exercise, 'exercise_tier', None) or 'accessory').lower()
selected_range = tier_ranges.get(tier) or (wt_params['rep_min'], wt_params['rep_max'])
low, high = int(selected_range[0]), int(selected_range[1])
if low > high:
low, high = high, low
return rng.randint(low, high)
def apply_rep_volume_floor(entries: list[dict], rounds: int, min_volume: int) -> None:
"""Mutate entries in-place so reps*rounds meets the minimum volume floor."""
if rounds <= 0:
return
for entry in entries:
reps = entry.get('reps')
if reps and reps * rounds < min_volume:
entry['reps'] = max(reps, math.ceil(min_volume / rounds))
def working_rest_seconds(rest_override, default_rest: int, minimum_rest: int = 15) -> int:
"""Return guarded positive working rest in seconds."""
rest = rest_override or default_rest or 45
return max(minimum_rest, int(rest))
def sort_entries_by_hr(entries: list[dict], is_early_block: bool) -> None:
"""Sort entries by HR elevation and re-number order."""
entries.sort(
key=lambda e: getattr(e.get('exercise'), 'hr_elevation_rating', 5) or 5,
reverse=is_early_block,
)
for idx, entry in enumerate(entries, start=1):
entry['order'] = idx

View File

@@ -0,0 +1,41 @@
from typing import Optional
from generator.services.exercise_selector import extract_movement_families
def focus_key_for_exercise(exercise) -> Optional[str]:
"""Classify exercise into a coarse focus key used for variety checks."""
if exercise is None:
return None
families = sorted(extract_movement_families(getattr(exercise, 'name', '') or ''))
if families:
return families[0]
patterns = (getattr(exercise, 'movement_patterns', '') or '').lower()
for token in ('upper pull', 'upper push', 'hip hinge', 'squat', 'lunge', 'core', 'carry'):
if token in patterns:
return token
return None
def has_duplicate_focus(exercises: list) -> bool:
"""True when two exercises in one superset map to the same focus key."""
seen = set()
for ex in exercises or []:
key = focus_key_for_exercise(ex)
if not key:
continue
if key in seen:
return True
seen.add(key)
return False
def focus_keys_for_exercises(exercises: list) -> set:
"""Return non-empty focus keys for a list of exercises."""
keys = set()
for ex in exercises or []:
key = focus_key_for_exercise(ex)
if key:
keys.add(key)
return keys

View File

@@ -0,0 +1,53 @@
import math
import random
from typing import Optional
def clamp_duration_bias(duration_bias: float, duration_bias_range: Optional[tuple]) -> float:
"""Clamp duration bias to [0,1] or workout-type specific range."""
if not duration_bias_range:
return max(0.0, min(1.0, duration_bias))
low, high = duration_bias_range
return max(float(low), min(float(high), duration_bias))
def plan_superset_modalities(
*,
num_supersets: int,
duration_bias: float,
duration_bias_range: Optional[tuple],
is_strength_workout: bool,
rng=random,
) -> list[bool]:
"""Plan per-superset modality (True=duration, False=reps)."""
if num_supersets <= 0:
return []
if is_strength_workout:
return [False] * num_supersets
if duration_bias_range:
low, high = duration_bias_range
target_bias = (float(low) + float(high)) / 2.0
min_duration_sets = max(0, math.ceil(num_supersets * float(low)))
max_duration_sets = min(num_supersets, math.floor(num_supersets * float(high)))
else:
target_bias = max(0.0, min(1.0, duration_bias))
min_duration_sets = max(0, math.floor(num_supersets * max(0.0, target_bias - 0.15)))
max_duration_sets = min(num_supersets, math.ceil(num_supersets * min(1.0, target_bias + 0.15)))
duration_sets = int(round(num_supersets * target_bias))
duration_sets = max(min_duration_sets, min(max_duration_sets, duration_sets))
if num_supersets > 1 and duration_sets == num_supersets and max_duration_sets < num_supersets:
duration_sets = max_duration_sets
if num_supersets > 1 and duration_sets == 0 and min_duration_sets > 0:
duration_sets = min_duration_sets
modalities = [False] * num_supersets
if duration_sets > 0:
positions = list(range(num_supersets))
rng.shuffle(positions)
for idx in positions[:duration_sets]:
modalities[idx] = True
return modalities

View File

@@ -0,0 +1,26 @@
def working_position_label(ss_idx: int, num_supersets: int) -> str:
"""Return early/middle/late position label for a working superset index."""
if num_supersets <= 1 or ss_idx == 0:
return 'early'
if ss_idx >= num_supersets - 1:
return 'late'
return 'middle'
def merge_pattern_preferences(position_patterns, rule_patterns):
"""Combine positional and structure-rule pattern preferences."""
if rule_patterns and position_patterns:
overlap = [p for p in position_patterns if p in rule_patterns]
return overlap or rule_patterns[:3]
if rule_patterns:
return rule_patterns[:3]
return position_patterns
def rotated_muscle_subset(target_muscles: list[str], ss_idx: int) -> list[str]:
"""Rotate target muscle emphasis between supersets."""
if len(target_muscles) <= 1:
return target_muscles
start = ss_idx % len(target_muscles)
return target_muscles[start:] + target_muscles[:start]

View File

@@ -0,0 +1,14 @@
def is_recovery_exercise(ex) -> bool:
"""True for warmup/cooldown-style recovery/stretch exercises."""
if ex is None:
return False
name = (getattr(ex, 'name', '') or '').lower()
patterns = (getattr(ex, 'movement_patterns', '') or '').lower()
if 'stretch' in name:
return True
blocked = (
'mobility - static', 'static stretch', 'yoga',
'cool down', 'cooldown', 'breathing', 'massage',
)
return any(token in patterns for token in blocked)

View File

@@ -0,0 +1,31 @@
def apply_fitness_scaling(
params: dict,
*,
fitness_level: int,
scaling_config: dict,
min_reps: int,
min_reps_strength: int,
is_strength: bool = False,
) -> dict:
"""Scale workout params based on fitness level."""
out = dict(params)
level = fitness_level or 2
scaling = scaling_config.get(level, scaling_config[2])
rep_floor = min_reps_strength if is_strength else min_reps
out['rep_min'] = max(rep_floor, int(out['rep_min'] * scaling['rep_min_mult']))
out['rep_max'] = max(out['rep_min'], int(out['rep_max'] * scaling['rep_max_mult']))
rounds_min, rounds_max = out['rounds']
rounds_min = max(1, rounds_min + scaling['rounds_adj'])
rounds_max = max(rounds_min, rounds_max + scaling['rounds_adj'])
out['rounds'] = (rounds_min, rounds_max)
rest = out.get('rest_between_rounds', 45)
out['rest_between_rounds'] = max(15, rest + scaling['rest_adj'])
if level <= 1 and is_strength:
out['rep_min'] = max(5, out['rep_min'])
out['rep_max'] = max(out['rep_min'], out['rep_max'])
return out

View File

@@ -0,0 +1,68 @@
import random
from typing import Iterable, Optional
def section_exercise_count(section: str, fitness_level: int, rng=random) -> int:
"""Return section exercise count range by fitness level."""
level = fitness_level or 2
if section == 'warmup':
if level <= 1:
return rng.randint(5, 7)
if level >= 3:
return rng.randint(3, 5)
return rng.randint(4, 6)
if section == 'cooldown':
if level <= 1:
return rng.randint(4, 5)
if level >= 3:
return rng.randint(2, 3)
return rng.randint(3, 4)
raise ValueError(f'Unknown section: {section}')
def rounded_duration(
raw_duration: int,
*,
min_duration: int,
duration_multiple: int,
) -> int:
"""Round duration to configured multiple and clamp to minimum."""
return max(min_duration, round(raw_duration / duration_multiple) * duration_multiple)
def build_duration_entries(
exercises: Iterable,
*,
duration_min: int,
duration_max: int,
min_duration: int,
duration_multiple: int,
rng=random,
) -> list[dict]:
"""Build ordered duration entries from exercises."""
entries = []
for idx, ex in enumerate(exercises, start=1):
duration = rng.randint(duration_min, duration_max)
entries.append({
'exercise': ex,
'duration': rounded_duration(
duration,
min_duration=min_duration,
duration_multiple=duration_multiple,
),
'order': idx,
})
return entries
def build_section_superset(name: str, entries: list[dict]) -> Optional[dict]:
"""Build a single-round warmup/cooldown superset payload."""
if not entries:
return None
return {
'name': name,
'rounds': 1,
'rest_between_rounds': 0,
'exercises': entries,
}

File diff suppressed because it is too large Load Diff