Files
WerkoutAPI/generator/services/exercise_selector.py
Trey t c80c66c2e5 Codebase hardening: 102 fixes across 35+ files
Deep audit identified 106 findings; 102 fixed, 4 deferred. Covers 8 areas:

- Settings & deploy: env-gated DEBUG/SECRET_KEY, HTTPS headers, gunicorn, celery worker
- Auth (registered_user): password write_only, request.data fixes, transaction safety, proper HTTP status codes
- Workout app: IDOR protection, get_object_or_404, prefetch_related N+1 fixes, transaction.atomic
- Video/scripts: path traversal sanitization, HLS trigger guard, auth on cache wipe
- Models (exercise/equipment/muscle/superset): null-safe __str__, stable IDs, prefetch support
- Generator views: helper for registered_user lookup, logger.exception, bulk_update, transaction wrapping
- Generator core (rules/selector/generator): push-pull ratio, type affinity normalization, modality checks, side-pair exact match, word-boundary regex, equipment cache clearing
- Generator services (plan_builder/analyzer/normalizer): transaction.atomic, muscle cache, bulk_update, glutes classification fix

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 22:29:14 -06:00

1679 lines
70 KiB
Python

import random
import logging
import re
from collections import Counter
from django.db.models import Q, Count
from exercise.models import Exercise
from muscle.models import Muscle, ExerciseMuscle
from equipment.models import Equipment, WorkoutEquipment
from generator.services.muscle_normalizer import (
normalize_muscle_name,
get_muscles_for_exercise,
classify_split_type,
MUSCLE_GROUP_CATEGORIES,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Movement family deduplication constants
# ---------------------------------------------------------------------------
# Ordered (keyword, family_tag) pairs — longer/more-specific keywords first
# so that "hang clean" matches before generic "clean".
# Ordered (keyword, family_tag) pairs consumed by extract_movement_families().
# The matcher claims character spans greedily in LIST ORDER, so any keyword
# that is a substring of another keyword MUST appear after its superstring
# (e.g. 'split squat' before 'squat', 'hang clean' before 'clean'); otherwise
# the shorter keyword claims the span first and the longer one is discarded.
# FIX: 'split squat' was previously listed after 'squat', so names like
# "Bulgarian Split Squat" were tagged family 'squat' instead of 'lunge'.
MOVEMENT_FAMILY_KEYWORDS = [
    # Olympic — specific before general
    ('clean and jerk', 'clean_and_jerk'), ('hang clean', 'clean'),
    ('clean pull', 'clean'), ('high pull', 'clean'),
    ('power clean', 'clean'), ('clean', 'clean'),
    ('snatch', 'snatch'),
    # Vertical pull
    ('chin-up', 'chin_up'), ('chin up', 'chin_up'),
    ('pull-up', 'pull_up'), ('pull up', 'pull_up'),
    ('lat pulldown', 'lat_pulldown'), ('pulldown', 'lat_pulldown'),
    # Horizontal press
    ('bench press', 'bench_press'), ('chest press', 'bench_press'),
    ('push-up', 'push_up'), ('push up', 'push_up'),
    # Overhead press
    ('overhead press', 'overhead_press'), ('shoulder press', 'overhead_press'),
    ('military press', 'overhead_press'), ('push press', 'push_press'),
    # Lower body — 'split squat' must precede generic 'squat'
    ('split squat', 'lunge'), ('squat', 'squat'),
    ('deadlift', 'deadlift'),
    ('hip thrust', 'hip_thrust'),
    ('lunge', 'lunge'),
    ('step up', 'step_up'), ('step-up', 'step_up'),
    # Row
    ('row', 'row'),
    # Arms
    ('bicep curl', 'bicep_curl'), ('hammer curl', 'bicep_curl'), ('curl', 'bicep_curl'),
    ('tricep extension', 'tricep_extension'), ('skull crusher', 'tricep_extension'),
    # Shoulders
    ('lateral raise', 'lateral_raise'), ('front raise', 'front_raise'),
    ('rear delt', 'rear_delt'), ('face pull', 'face_pull'), ('shrug', 'shrug'),
    # Other
    ('carry', 'carry'), ('farmer', 'carry'), ('dip', 'dip'),
    ('burpee', 'burpee'), ('thruster', 'thruster'),
    ('turkish', 'turkish_getup'),
]
# Super-families: families that are too similar for the same superset
FAMILY_GROUPS = {
'vertical_pull': {'pull_up', 'chin_up', 'lat_pulldown'},
'olympic_pull': {'clean', 'snatch', 'clean_and_jerk'},
'horizontal_press': {'bench_press', 'push_up'},
}
# Narrow families — max 1 per entire workout
NARROW_FAMILIES = {
'clean', 'snatch', 'clean_and_jerk', 'push_press',
'thruster', 'turkish_getup', 'burpee',
}
# Everything else defaults to max 2 per workout
# Precomputed reverse map: family -> group name
_FAMILY_TO_GROUP = {}
for _group, _members in FAMILY_GROUPS.items():
for _member in _members:
_FAMILY_TO_GROUP[_member] = _group
_LEFT_SIDE_VALUES = {'left', 'left_arm', 'left_leg', 'left_side'}
_RIGHT_SIDE_VALUES = {'right', 'right_arm', 'right_leg', 'right_side'}
def extract_movement_families(exercise_name):
    """Extract movement family tags from an exercise name.

    Walks MOVEMENT_FAMILY_KEYWORDS in list order; each matched keyword
    claims its character span, and any later keyword whose first occurrence
    overlaps an already-claimed span is ignored. Because the keyword list
    orders specific phrases before generic ones, "hang clean" wins over
    plain "clean". Returns a (possibly empty) set of family strings.
    """
    if not exercise_name:
        return set()
    haystack = exercise_name.lower().strip()
    families = set()
    claimed = []  # (start, end) spans already consumed by a match
    for keyword, family in MOVEMENT_FAMILY_KEYWORDS:
        start = haystack.find(keyword)
        if start < 0:
            continue
        end = start + len(keyword)
        # Accept only if the span is disjoint from every claimed span.
        if all(end <= c_start or start >= c_end for c_start, c_end in claimed):
            families.add(family)
            claimed.append((start, end))
    return families
class ExerciseSelector:
    """
    Smart exercise selection service that picks exercises based on user
    preferences, available equipment, target muscle groups, and variety.

    Stateful: tracks per-workout usage (cleared by ``reset``) and
    week-scoped usage (cleared only by ``reset_week``).
    """
    # Bodyweight equipment names to fall back to when equipment-filtered
    # results are too sparse.
    BODYWEIGHT_KEYWORDS = ['bodyweight', 'body weight', 'none', 'no equipment']
    # Movement patterns considered too complex for beginners
    # (matched case-insensitively against Exercise.movement_patterns).
    ADVANCED_PATTERNS = ['olympic', 'plyometric']
    # Movement patterns considered appropriate for warm-up / cool-down
    WARMUP_PATTERNS = [
        'dynamic stretch', 'mobility - dynamic', 'activation', 'warm up',
        'warmup', 'cardio/locomotion', 'balance',
    ]
    COOLDOWN_PATTERNS = [
        'static stretch', 'stretch', 'cool down', 'cooldown',
        'mobility', 'foam roll', 'yoga',
    ]
    # Movement patterns explicitly forbidden in cooldowns
    COOLDOWN_EXCLUDED_PATTERNS = [
        'plyometric', 'combat', 'cardio/locomotion', 'olympic',
    ]
    # Warm-up must avoid working-set patterns.
    WARMUP_EXCLUDED_PATTERNS = [
        'upper push', 'upper pull', 'olympic', 'combat', 'arms',
    ]
    # Similarity thresholds to prevent near-duplicate selections.
    # NOTE(review): consumed by _weighted_pick / similarity machinery
    # outside this view — semantics assumed, confirm there.
    SIMILARITY_HARD_THRESHOLD = 0.80
    SIMILARITY_SOFT_THRESHOLD = 0.65
    # Recovery/stretch movements should not appear in working sets.
    WORKING_EXCLUDED_PATTERNS = [
        'mobility - static', 'static stretch', 'cool down', 'cooldown',
        'yoga', 'breathing', 'massage',
    ]
def __init__(self, user_preference, recently_used_ids=None, hard_exclude_ids=None):
self.user_preference = user_preference
self.used_exercise_ids = set() # tracks within a single workout
self.used_exercise_names = set() # tracks names for cross-superset dedup
self.recently_used_ids = recently_used_ids or set()
self.hard_exclude_ids = hard_exclude_ids or set() # Phase 6: hard exclude recent exercises
self.used_movement_patterns = Counter() # Phase 11: track patterns for variety
self.used_movement_families = Counter() # Movement family dedup across workout
self.used_working_similarity_profiles = []
self.last_working_similarity_profiles = []
self._exercise_profile_cache = {}
self.warnings = [] # Phase 13: generation warnings
self.progression_boost_ids = set() # IDs of exercises that are progressions of recently done ones
# Week-scoped state for cross-day dedup (NOT cleared by reset())
self.week_used_exercise_ids = set()
self.week_used_movement_families = Counter()
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def reset(self):
"""Reset used exercises for a new workout (preserves week-scoped state)."""
self.used_exercise_ids = set()
self.used_exercise_names = set()
self.used_movement_patterns = Counter()
self.used_movement_families = Counter()
self.used_working_similarity_profiles = []
self.last_working_similarity_profiles = []
self._exercise_profile_cache = {}
self.warnings = []
# Clear per-queryset caches so equipment/exclusion changes take effect
if hasattr(self, '_equipment_map_cache'):
del self._equipment_map_cache
if hasattr(self, '_bodyweight_ids_cache'):
del self._bodyweight_ids_cache
if hasattr(self, '_warned_small_pool'):
del self._warned_small_pool
if hasattr(self, '_warned_no_equipment'):
del self._warned_no_equipment
if hasattr(self, '_relaxed_hard_exclude_ids'):
del self._relaxed_hard_exclude_ids
if hasattr(self, '_injury_warnings_emitted'):
del self._injury_warnings_emitted
def reset_week(self):
"""Reset all state including week-scoped tracking. Call at start of a new week."""
self.reset()
self.week_used_exercise_ids = set()
self.week_used_movement_families = Counter()
def accumulate_week_state(self, exercise_ids, exercise_names):
    """Fold one finished day's selections into the week-scoped trackers.

    Parameters
    ----------
    exercise_ids : set[int]
        Primary keys of exercises used in the day's workout.
    exercise_names : set[str]
        Exercise names (used for movement-family extraction).
    """
    self.week_used_exercise_ids.update(exercise_ids)
    family_counts = self.week_used_movement_families
    for exercise_name in exercise_names:
        for family in extract_movement_families(exercise_name):
            family_counts[family] += 1
def _get_week_family_limit(self, family):
    """Weekly usage cap for a movement family.

    Wider than the per-workout caps: narrow families get 2 uses per
    week, everything else gets 4.
    """
    return 2 if family in NARROW_FAMILIES else 4
def select_exercises(
    self,
    muscle_groups,
    count,
    is_duration_based=False,
    movement_pattern_preference=None,
    prefer_weighted=False,
    superset_position=None,
    allow_cross_modality=False,
):
    """
    Select *count* exercises matching the given criteria.

    Parameters
    ----------
    muscle_groups : list[str]
        Canonical muscle group names (e.g. ['chest', 'triceps']).
    count : int
        How many exercises to return.
    is_duration_based : bool
        When True, prefer exercises whose ``is_duration`` flag is set.
    movement_pattern_preference : list[str] | None
        Optional list of preferred movement patterns to favour.
    prefer_weighted : bool
        When True (R6), boost is_weight=True exercises in selection.
    superset_position : optional
        Passed through to ``_weighted_pick``; semantics defined there
        (not visible in this block — presumably positional weighting).
    allow_cross_modality : bool
        When True, don't hard-filter by modality — instead use soft
        preference so duration-only exercises (carries, planks) can
        land in rep-based supersets and vice versa.

    Returns
    -------
    list[Exercise]

    Side effects: records selections into per-workout trackers
    (used IDs/names, movement patterns/families, similarity profiles)
    and may append human-readable entries to ``self.warnings``.
    """
    if count <= 0:
        return []
    fitness_level = getattr(self.user_preference, 'fitness_level', None)
    # When cross-modality is allowed, skip the hard modality filter
    # so duration-only exercises can appear in rep supersets and vice versa.
    modality_for_filter = None if allow_cross_modality else is_duration_based
    preferred_modality = 'duration' if is_duration_based else 'reps'
    qs = self._get_filtered_queryset(
        muscle_groups=muscle_groups,
        is_duration_based=modality_for_filter,
        fitness_level=fitness_level,
    )
    # Working supersets should not contain stretch/recovery exercises.
    # Use regex word boundary to avoid over-matching (e.g. "Stretch Band Row"
    # should NOT be excluded, but "Hamstring Stretch" should).
    excluded_q = Q(name__iregex=r'\bstretch(ing|es|ed)?\b')
    for pat in self.WORKING_EXCLUDED_PATTERNS:
        excluded_q |= Q(movement_patterns__icontains=pat)
    qs = qs.exclude(excluded_q)
    # Guard against low-quality rows causing misclassification/selection drift:
    # drop rows with empty movement_patterns or muscle_groups entirely.
    qs = qs.exclude(Q(movement_patterns__isnull=True) | Q(movement_patterns=''))
    qs = qs.exclude(Q(muscle_groups__isnull=True) | Q(muscle_groups=''))
    # Build preferred/other pools. Exactly one of the three branches runs:
    # advanced-user compound boost, explicit pattern preference, or no boost.
    # For advanced/elite, boost compound movements
    if fitness_level and fitness_level >= 3 and not movement_pattern_preference:
        compound_qs = qs.filter(is_compound=True)
        if compound_qs.exists():
            preferred_qs = compound_qs
            other_qs = qs.exclude(pk__in=compound_qs.values_list('pk', flat=True))
        else:
            preferred_qs = qs.none()
            other_qs = qs
    elif movement_pattern_preference:
        # Optionally boost exercises whose movement_patterns match a preference
        pattern_q = Q()
        for pat in movement_pattern_preference:
            pattern_q |= Q(movement_patterns__icontains=pat)
        preferred_qs = qs.filter(pattern_q)
        other_qs = qs.exclude(pk__in=preferred_qs.values_list('pk', flat=True))
    else:
        preferred_qs = qs.none()
        other_qs = qs
    # R6: For strength workouts, boost is_weight=True exercises by merging
    # them into whatever preferred pool the branch above produced.
    if prefer_weighted:
        weighted_qs = qs.filter(is_weight=True)
        if weighted_qs.exists():
            # Merge weighted exercises into preferred pool
            combined_preferred_ids = set(preferred_qs.values_list('pk', flat=True)) | set(weighted_qs.values_list('pk', flat=True))
            preferred_qs = qs.filter(pk__in=combined_preferred_ids)
            other_qs = qs.exclude(pk__in=combined_preferred_ids)
    selected = self._weighted_pick(
        preferred_qs,
        other_qs,
        count,
        superset_position=superset_position,
        similarity_scope='working',
        preferred_modality=preferred_modality if allow_cross_modality else None,
    )
    # Sort selected exercises by tier: primary first, then secondary, then accessory.
    # Unknown tiers sort with accessories (default 2).
    TIER_ORDER = {'primary': 0, 'secondary': 1, 'accessory': 2, None: 2}
    selected.sort(key=lambda ex: TIER_ORDER.get(ex.exercise_tier, 2))
    # Ensure target muscle groups have coverage: if a requested muscle has no
    # selected exercise hitting it, swap in a replacement for an accessory.
    if muscle_groups and selected:
        from muscle.models import ExerciseMuscle
        # Batch-load muscles for all selected exercises (avoid N+1)
        selected_ids = {ex.pk for ex in selected}
        ex_muscle_rows = ExerciseMuscle.objects.filter(
            exercise_id__in=selected_ids
        ).values_list('exercise_id', 'muscle__name')
        from collections import defaultdict
        ex_muscle_map = defaultdict(set)
        for ex_id, muscle_name in ex_muscle_rows:
            ex_muscle_map[ex_id].add(normalize_muscle_name(muscle_name))
        covered_muscles = set()
        for ex in selected:
            covered_muscles.update(ex_muscle_map.get(ex.pk, set()))
        normalized_targets = {normalize_muscle_name(mg) for mg in muscle_groups}
        uncovered = normalized_targets - covered_muscles
        if uncovered and len(selected) > 1:
            # Track swapped indices to avoid overwriting previous swaps
            swapped_indices = set()
            for missing_muscle in uncovered:
                replacement_qs = self._get_filtered_queryset(
                    muscle_groups=[missing_muscle],
                    is_duration_based=modality_for_filter,
                    fitness_level=fitness_level,
                ).exclude(pk__in={e.pk for e in selected})
                # Validate modality: ensure replacement matches expected modality
                # (skip when cross-modality is allowed)
                if not allow_cross_modality:
                    if is_duration_based:
                        replacement_qs = replacement_qs.filter(is_duration=True)
                    elif is_duration_based is False:
                        replacement_qs = replacement_qs.filter(is_reps=True)
                replacement = list(replacement_qs[:1])
                if replacement:
                    # Find last unswapped accessory (scan from the end so the
                    # lowest-priority pick is sacrificed first).
                    swap_idx = None
                    for i in range(len(selected) - 1, -1, -1):
                        if i in swapped_indices:
                            continue
                        if getattr(selected[i], 'exercise_tier', None) == 'accessory':
                            swap_idx = i
                            break
                    # Fallback: any unswapped non-primary
                    if swap_idx is None:
                        for i in range(len(selected) - 1, -1, -1):
                            if i in swapped_indices:
                                continue
                            if getattr(selected[i], 'exercise_tier', None) != 'primary':
                                swap_idx = i
                                break
                    if swap_idx is not None:
                        selected[swap_idx] = replacement[0]
                        swapped_indices.add(swap_idx)
    # If we couldn't get enough with equipment filters, widen to bodyweight.
    # NOTE(review): the fallback passes raw is_duration_based rather than
    # modality_for_filter, so cross-modality relaxation is NOT applied here —
    # confirm this asymmetry is intentional.
    if len(selected) < count:
        fallback_qs = self._get_bodyweight_queryset(
            muscle_groups=muscle_groups,
            is_duration_based=is_duration_based,
            fitness_level=fitness_level,
        )
        fallback_qs = fallback_qs.exclude(excluded_q)
        fallback_qs = fallback_qs.exclude(Q(movement_patterns__isnull=True) | Q(movement_patterns=''))
        fallback_qs = fallback_qs.exclude(Q(muscle_groups__isnull=True) | Q(muscle_groups=''))
        still_needed = count - len(selected)
        already_ids = {e.pk for e in selected}
        fallback_qs = fallback_qs.exclude(pk__in=already_ids)
        mg_label = ', '.join(muscle_groups[:3]) if muscle_groups else 'target muscles'
        extras = self._weighted_pick(
            fallback_qs,
            Exercise.objects.none(),
            still_needed,
            similarity_scope='working',
        )
        if extras:
            self.warnings.append(
                f'Used bodyweight fallback for {mg_label} '
                f'({len(extras)} exercises) due to limited equipment matches.'
            )
            selected.extend(extras)
        if len(selected) < count:
            self.warnings.append(
                f'Could only find {len(selected)}/{count} exercises '
                f'for {mg_label}.'
            )
    # Handle side-specific pairing: if an exercise has a side value,
    # look for the matching opposite-side exercise so they appear together.
    selected = self._pair_sided_exercises(selected, qs)
    selected = self._ensure_side_pair_integrity(
        selected,
        qs,
        count=count,
        similarity_scope='working',
        superset_position=superset_position,
    )
    # Mark everything we just selected as used and track patterns
    for ex in selected:
        self.used_exercise_ids.add(ex.pk)
        self.used_exercise_names.add((ex.name or '').lower().strip())
        patterns = getattr(ex, 'movement_patterns', '') or ''
        if patterns:
            # movement_patterns is a comma-separated string per the splits here.
            for pat in [p.strip().lower() for p in patterns.split(',') if p.strip()]:
                self.used_movement_patterns[pat] += 1
    self._track_families(selected)
    self._track_similarity_profiles(selected, scope='working')
    return self._trim_preserving_pairs(selected, count)
def select_warmup_exercises(self, target_muscles, count=5):
    """Select duration-based exercises suitable for warm-up.

    Picks only from warmup-pattern exercises (STRICT — no generic pool),
    widening to any muscle group if the targeted pool is too small.
    Records selections into the per-workout used-ID/name trackers.
    """
    fitness_level = getattr(self.user_preference, 'fitness_level', None)
    qs = self._get_filtered_queryset(
        muscle_groups=target_muscles,
        is_duration_based=True,
        fitness_level=fitness_level,
    )
    # Prefer exercises whose movement_patterns overlap with warmup keywords
    warmup_q = Q()
    for kw in self.WARMUP_PATTERNS:
        warmup_q |= Q(movement_patterns__icontains=kw)
    # Warm-up should be dynamic movement prep, not loaded working sets.
    qs = qs.exclude(is_weight=True)
    # Exclude heavy compounds (no barbell squats in warmup)
    qs = qs.exclude(is_compound=True)
    # Exclude primary-tier exercises (no primary lifts in warmup)
    qs = qs.exclude(exercise_tier='primary')
    # Exclude technically complex movements
    qs = qs.exclude(complexity_rating__gte=4)
    # Exclude common working-set movement families from warmup.
    warmup_exclude_q = Q()
    for pat in self.WARMUP_EXCLUDED_PATTERNS:
        warmup_exclude_q |= Q(movement_patterns__icontains=pat)
    qs = qs.exclude(warmup_exclude_q)
    # Tightened HR filter for warmup (1-4 instead of 2-5); rows with no
    # rating are still allowed via the isnull branch below.
    hr_warmup_q = Q(hr_elevation_rating__gte=1, hr_elevation_rating__lte=4)
    preferred = qs.filter(warmup_q).filter(
        hr_warmup_q | Q(hr_elevation_rating__isnull=True)
    )
    # STRICT: warmup must come from warmup-pattern exercises only
    # (the "other" pool passed to _weighted_pick is deliberately empty).
    selected = self._weighted_pick(preferred, preferred.none(), count)
    # Fallback: if not enough duration-based warmup exercises, widen to
    # any duration exercise regardless of muscle group
    if len(selected) < count:
        wide_qs = self._get_filtered_queryset(
            muscle_groups=None,
            is_duration_based=True,
            fitness_level=fitness_level,
        ).exclude(pk__in={e.pk for e in selected})
        # Apply same warmup safety exclusions
        wide_qs = wide_qs.exclude(is_weight=True)
        wide_qs = wide_qs.exclude(is_compound=True)
        wide_qs = wide_qs.exclude(exercise_tier='primary')
        wide_qs = wide_qs.exclude(complexity_rating__gte=4)
        wide_qs = wide_qs.exclude(warmup_exclude_q)
        wide_preferred = wide_qs.filter(warmup_q).filter(
            hr_warmup_q | Q(hr_elevation_rating__isnull=True)
        )
        selected.extend(
            self._weighted_pick(wide_preferred, wide_preferred.none(), count - len(selected))
        )
    # Record usage so later selections in this workout dedup against warmup.
    for ex in selected:
        self.used_exercise_ids.add(ex.pk)
        self.used_exercise_names.add((ex.name or '').lower().strip())
    self._track_families(selected)
    selected = self._ensure_side_pair_integrity(selected, qs, count=count)
    selected = self._trim_preserving_pairs(selected, count)
    return self._order_side_pairs_adjacent(selected)
def select_cooldown_exercises(self, target_muscles, count=4):
    """
    Select duration-based exercises suitable for cool-down.

    R11: Excludes is_weight=True exercises that don't match cooldown
    movement patterns (stretch/mobility only).
    Also enforces low HR elevation (<=3) for proper cool-down.
    Records selections into the per-workout used-ID/name trackers.
    """
    fitness_level = getattr(self.user_preference, 'fitness_level', None)
    qs = self._get_filtered_queryset(
        muscle_groups=target_muscles,
        is_duration_based=True,
        fitness_level=fitness_level,
    )
    cooldown_q = Q()
    for kw in self.COOLDOWN_PATTERNS:
        cooldown_q |= Q(movement_patterns__icontains=kw)
    # Exclude dangerous movement patterns from cooldowns entirely
    exclude_q = Q()
    for pat in self.COOLDOWN_EXCLUDED_PATTERNS:
        exclude_q |= Q(movement_patterns__icontains=pat)
    # Also exclude compound push/pull exercises
    exclude_q |= (Q(movement_patterns__icontains='push') | Q(movement_patterns__icontains='pull')) & Q(is_compound=True)
    qs = qs.exclude(exclude_q)
    # R11: Exclude weighted exercises that aren't cooldown-pattern exercises
    weighted_non_cooldown = qs.filter(is_weight=True).exclude(cooldown_q)
    qs = qs.exclude(pk__in=weighted_non_cooldown.values_list('pk', flat=True))
    # Cooldown HR ceiling: only low HR exercises (<=3) for proper cool-down;
    # rows with no rating are still allowed via the isnull branch.
    qs = qs.filter(Q(hr_elevation_rating__lte=3) | Q(hr_elevation_rating__isnull=True))
    # STRICT: Only use cooldown-pattern exercises (no 'other' pool)
    preferred = qs.filter(cooldown_q)
    selected = self._weighted_pick(preferred, preferred.none(), count)
    # Fallback: widen to any duration exercise with cooldown patterns (no muscle filter)
    if len(selected) < count:
        wide_qs = self._get_filtered_queryset(
            muscle_groups=None,
            is_duration_based=True,
            fitness_level=fitness_level,
        ).exclude(pk__in={e.pk for e in selected})
        # Apply same exclusions
        wide_qs = wide_qs.exclude(exclude_q)
        # R11: also apply weight filter on wide fallback
        wide_weighted_non_cooldown = wide_qs.filter(is_weight=True).exclude(cooldown_q)
        wide_qs = wide_qs.exclude(pk__in=wide_weighted_non_cooldown.values_list('pk', flat=True))
        # HR ceiling on fallback too
        wide_qs = wide_qs.filter(Q(hr_elevation_rating__lte=3) | Q(hr_elevation_rating__isnull=True))
        # STRICT: Only cooldown-pattern exercises even in fallback
        wide_preferred = wide_qs.filter(cooldown_q)
        selected.extend(
            self._weighted_pick(wide_preferred, wide_preferred.none(), count - len(selected))
        )
    # Record usage for cross-section dedup within this workout.
    for ex in selected:
        self.used_exercise_ids.add(ex.pk)
        self.used_exercise_names.add((ex.name or '').lower().strip())
    self._track_families(selected)
    selected = self._ensure_side_pair_integrity(selected, qs, count=count)
    selected = self._trim_preserving_pairs(selected, count)
    return self._order_side_pairs_adjacent(selected)
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _get_family_limit(self, family):
    """Per-workout usage cap for a movement family.

    Narrow families (Olympic lifts, burpees, etc.) allow 1 appearance
    per workout; everything else allows 2.
    """
    return 1 if family in NARROW_FAMILIES else 2
def _track_families(self, exercises):
    """Bump the per-workout counter for every movement family seen in *exercises*."""
    counter = self.used_movement_families
    for exercise in exercises:
        for family in extract_movement_families(exercise.name):
            counter[family] += 1
def _track_similarity_profiles(self, exercises, scope='working'):
"""Record similarity profiles so later supersets can avoid near-duplicates."""
if scope != 'working':
return
profiles = [self._build_similarity_profile(ex) for ex in exercises]
self.last_working_similarity_profiles = profiles
self.used_working_similarity_profiles.extend(profiles)
def _get_filtered_queryset(self, muscle_groups=None, is_duration_based=None, fitness_level=None):
    """
    Build a base Exercise queryset filtered by:
    - User's available equipment (through WorkoutEquipment)
    - Excluded exercises from user preferences
    - Already-used exercises in the current workout
    - Target muscle groups (through ExerciseMuscle)
    - Optionally, duration-based flag (None = no modality filter)
    - Fitness level (excludes complex patterns for beginners)

    May mutate instance state: sets per-instance caches
    (_equipment_map_cache, _bodyweight_ids_cache), relaxation flags,
    and can append to self.warnings.
    """
    qs = Exercise.objects.all()
    # ---- Exclude exercises the user has explicitly blacklisted ----
    excluded_ids = set(
        self.user_preference.excluded_exercises.values_list('pk', flat=True)
    )
    if excluded_ids:
        qs = qs.exclude(pk__in=excluded_ids)
    # ---- Exclude already-used exercises in this workout ----
    if self.used_exercise_ids:
        qs = qs.exclude(pk__in=self.used_exercise_ids)
    # ---- Exclude exercises with same name (cross-superset dedup) ----
    if self.used_exercise_names:
        name_exclude_q = Q()
        for name in self.used_exercise_names:
            if name:
                name_exclude_q |= Q(name__iexact=name)
        if name_exclude_q:
            qs = qs.exclude(name_exclude_q)
    # ---- Hard exclude exercises from recent workouts (Phase 6) ----
    # Adaptive: if pool would be too small (< 10 rows), relax hard exclude
    # to a soft penalty. Uses a local merged set to avoid permanently
    # polluting recently_used_ids.
    if self.hard_exclude_ids:
        test_qs = qs.exclude(pk__in=self.hard_exclude_ids)
        if test_qs.count() >= 10:
            qs = test_qs
        else:
            # Pool too small — treat hard excludes as soft penalty for this
            # queryset only (don't mutate the original recently_used_ids).
            # NOTE(review): _relaxed_hard_exclude_ids is set here but its
            # consumer is not visible in this block — presumably read by
            # _weighted_pick; confirm.
            if not hasattr(self, '_relaxed_hard_exclude_ids'):
                self._relaxed_hard_exclude_ids = set(self.hard_exclude_ids)
            if not hasattr(self, '_warned_small_pool'):
                self.warnings.append(
                    'Exercise pool too small for full variety rotation — '
                    'relaxed recent exclusion to soft penalty.'
                )
                self._warned_small_pool = True
    # ---- Filter by user's available equipment ----
    available_equipment_ids = set(
        self.user_preference.available_equipment.values_list('pk', flat=True)
    )
    if not available_equipment_ids:
        # No equipment set in preferences — all exercises are available (no filtering).
        pass
    elif available_equipment_ids:
        # Cache equipment map on instance to avoid rebuilding per call
        # (cache is invalidated by reset()).
        if not hasattr(self, '_equipment_map_cache'):
            from collections import defaultdict
            exercise_equipment_map = defaultdict(set)
            for ex_id, eq_id in WorkoutEquipment.objects.values_list('exercise_id', 'equipment_id'):
                exercise_equipment_map[ex_id].add(eq_id)
            self._equipment_map_cache = dict(exercise_equipment_map)
            # Exercises with no equipment rows at all are treated as bodyweight.
            self._bodyweight_ids_cache = set(
                Exercise.objects.exclude(
                    pk__in=set(exercise_equipment_map.keys())
                ).values_list('pk', flat=True)
            )
        exercise_equipment_map = self._equipment_map_cache
        bodyweight_ids = self._bodyweight_ids_cache
        # AND logic: only include exercises where ALL required equipment is available
        equipment_ok_ids = set()
        for ex_id, required_equip in exercise_equipment_map.items():
            if required_equip.issubset(available_equipment_ids):
                equipment_ok_ids.add(ex_id)
        allowed_ids = equipment_ok_ids | bodyweight_ids
        qs = qs.filter(pk__in=allowed_ids)
    # ---- Filter by muscle groups via ExerciseMuscle join ----
    if muscle_groups:
        normalized = [normalize_muscle_name(mg) for mg in muscle_groups]
        muscle_ids = set(
            Muscle.objects.filter(
                name__in=normalized
            ).values_list('pk', flat=True)
        )
        # Also try case-insensitive matching for robustness
        if not muscle_ids:
            q = Q()
            for name in normalized:
                q |= Q(name__iexact=name)
            muscle_ids = set(
                Muscle.objects.filter(q).values_list('pk', flat=True)
            )
        # If no muscle rows match at all, silently skip the muscle filter.
        if muscle_ids:
            exercise_ids = set(
                ExerciseMuscle.objects.filter(
                    muscle_id__in=muscle_ids
                ).values_list('exercise_id', flat=True)
            )
            qs = qs.filter(pk__in=exercise_ids)
    # ---- Duration bias (tri-state: True/False filter, None = no filter) ----
    if is_duration_based is True:
        qs = qs.filter(is_duration=True)
    elif is_duration_based is False:
        # Rep-based supersets must use rep-capable exercises only.
        qs = qs.filter(is_reps=True)
    # ---- Fitness-level filtering ----
    if fitness_level is not None and fitness_level <= 1:
        # Beginners: exclude exercises with complex movement patterns
        exclude_q = Q()
        for pat in self.ADVANCED_PATTERNS:
            exclude_q |= Q(movement_patterns__icontains=pat)
        qs = qs.exclude(exclude_q)
    # Exclude advanced exercises for beginners
    if fitness_level is not None and fitness_level <= 1:
        qs = qs.exclude(difficulty_level='advanced')
    # ---- Complexity cap by fitness level (null ratings always pass) ----
    if fitness_level is not None:
        complexity_caps = {1: 3, 2: 4, 3: 5, 4: 5}
        max_complexity = complexity_caps.get(fitness_level, 5)
        qs = qs.filter(
            Q(complexity_rating__lte=max_complexity) | Q(complexity_rating__isnull=True)
        )
    # ---- Injury-based filtering ----
    qs = self._apply_injury_filters(qs)
    return qs.distinct()
def _get_bodyweight_queryset(self, muscle_groups=None, is_duration_based=None, fitness_level=None):
    """
    Fallback queryset that only includes exercises with NO equipment
    requirement (bodyweight). Ignores user's equipment preferences but
    still applies safety filters (fitness level, injuries, complexity).

    Unlike _get_filtered_queryset, hard excludes are applied strictly
    here (no adaptive relaxation) and there is no name-based dedup.
    """
    # "Bodyweight" = no WorkoutEquipment rows at all for the exercise.
    exercises_with_equipment = set(
        WorkoutEquipment.objects.values_list('exercise_id', flat=True).distinct()
    )
    qs = Exercise.objects.exclude(pk__in=exercises_with_equipment)
    # Excluded exercises
    excluded_ids = set(
        self.user_preference.excluded_exercises.values_list('pk', flat=True)
    )
    if excluded_ids:
        qs = qs.exclude(pk__in=excluded_ids)
    # Already used
    if self.used_exercise_ids:
        qs = qs.exclude(pk__in=self.used_exercise_ids)
    # Hard exclude from recent workouts (Phase 6)
    if self.hard_exclude_ids:
        qs = qs.exclude(pk__in=self.hard_exclude_ids)
    # Muscle groups (same exact-then-iexact matching as the main queryset)
    if muscle_groups:
        normalized = [normalize_muscle_name(mg) for mg in muscle_groups]
        muscle_ids = set(
            Muscle.objects.filter(name__in=normalized).values_list('pk', flat=True)
        )
        if not muscle_ids:
            q = Q()
            for name in normalized:
                q |= Q(name__iexact=name)
            muscle_ids = set(
                Muscle.objects.filter(q).values_list('pk', flat=True)
            )
        if muscle_ids:
            exercise_ids = set(
                ExerciseMuscle.objects.filter(
                    muscle_id__in=muscle_ids
                ).values_list('exercise_id', flat=True)
            )
            qs = qs.filter(pk__in=exercise_ids)
    # Modality (tri-state: None applies no filter)
    if is_duration_based is True:
        qs = qs.filter(is_duration=True)
    elif is_duration_based is False:
        qs = qs.filter(is_reps=True)
    # ---- Safety: Fitness-level filtering (same as _get_filtered_queryset) ----
    if fitness_level is not None and fitness_level <= 1:
        exclude_q = Q()
        for pat in self.ADVANCED_PATTERNS:
            exclude_q |= Q(movement_patterns__icontains=pat)
        qs = qs.exclude(exclude_q)
        qs = qs.exclude(difficulty_level='advanced')
    # ---- Safety: Complexity cap by fitness level (null ratings pass) ----
    if fitness_level is not None:
        complexity_caps = {1: 3, 2: 4, 3: 5, 4: 5}
        max_complexity = complexity_caps.get(fitness_level, 5)
        qs = qs.filter(
            Q(complexity_rating__lte=max_complexity) | Q(complexity_rating__isnull=True)
        )
    # ---- Safety: Injury-based filtering ----
    qs = self._apply_injury_filters(qs)
    return qs.distinct()
def _apply_injury_filters(self, qs):
    """
    Apply injury-based exercise exclusions with severity levels.
    Supports both legacy format (list of strings) and new format
    (list of {"type": str, "severity": "mild|moderate|severe"}).
    Severity levels:
    - mild: only exclude exercises explicitly dangerous for that injury
    - moderate: current behavior (exclude high-impact, relevant patterns)
    - severe: aggressive exclusion (broader pattern exclusion)

    Args:
        qs: Exercise queryset to be narrowed.

    Returns:
        The queryset with injury-driven ``exclude()`` clauses applied.
        When ``user_preference.injury_types`` is empty, falls back to
        keyword-scanning the legacy free-text ``injuries_limitations``.

    Side effects: appends human-readable notes to ``self.warnings`` the
    first time structured injuries are processed (guarded by the
    ``_injury_warnings_emitted`` flag so repeated calls don't duplicate).
    """
    injury_types = getattr(self.user_preference, 'injury_types', None) or []
    if injury_types:
        # Normalize to dict format for backward compatibility:
        # bare strings default to 'moderate' severity.
        injury_map = {}
        for item in injury_types:
            if isinstance(item, str):
                injury_map[item] = 'moderate'
            elif isinstance(item, dict):
                injury_map[item.get('type', '')] = item.get('severity', 'moderate')

        def _is_at_least(injury_type, min_severity):
            """Check if an injury has at least the given severity."""
            levels = {'mild': 1, 'moderate': 2, 'severe': 3}
            # Unknown/absent severities rank 0, so they never pass a check.
            actual = injury_map.get(injury_type, '')
            return levels.get(actual, 0) >= levels.get(min_severity, 0)

        # Generate informational warnings about injury-based exclusions
        # (once per selector instance).
        if not hasattr(self, '_injury_warnings_emitted'):
            self._injury_warnings_emitted = True
            for inj_type, sev in injury_map.items():
                label = inj_type.replace('_', ' ').title()
                if sev == 'severe':
                    self.warnings.append(
                        f'Excluding high-impact and many weighted exercises due to severe {label.lower()} injury.'
                    )
                elif sev == 'moderate':
                    self.warnings.append(
                        f'Excluding high-impact exercises due to {label.lower()} injury.'
                    )
                else:
                    self.warnings.append(
                        f'Limiting certain movements due to mild {label.lower()} injury.'
                    )
        # High impact exclusion for lower body injuries (moderate+)
        lower_injuries = {'knee', 'ankle', 'hip', 'lower_back'}
        if any(_is_at_least(inj, 'moderate') for inj in lower_injuries & set(injury_map)):
            qs = qs.exclude(impact_level='high')
        # Severe: also exclude medium impact
        if any(_is_at_least(inj, 'severe') for inj in lower_injuries & set(injury_map)):
            qs = qs.exclude(impact_level='medium')
        # Plyometrics stress knees/ankles regardless of loading.
        if _is_at_least('knee', 'moderate') or _is_at_least('ankle', 'moderate'):
            qs = qs.exclude(movement_patterns__icontains='plyometric')
        # Severe knee/ankle: also exclude lunges
        if _is_at_least('knee', 'severe') or _is_at_least('ankle', 'severe'):
            qs = qs.exclude(movement_patterns__icontains='lunge')
        # Lower back: moderate drops only advanced weighted hinges;
        # severe drops every weighted hinge.
        if _is_at_least('lower_back', 'moderate'):
            qs = qs.exclude(
                Q(movement_patterns__icontains='hip hinge') &
                Q(is_weight=True) &
                Q(difficulty_level='advanced')
            )
        if _is_at_least('lower_back', 'severe'):
            qs = qs.exclude(
                Q(movement_patterns__icontains='hip hinge') &
                Q(is_weight=True)
            )
        # Upper back: same escalation pattern as lower back, on pulls.
        if _is_at_least('upper_back', 'moderate'):
            qs = qs.exclude(
                Q(movement_patterns__icontains='upper pull') &
                Q(is_weight=True) &
                Q(difficulty_level='advanced')
            )
        if _is_at_least('upper_back', 'severe'):
            qs = qs.exclude(
                Q(movement_patterns__icontains='upper pull') &
                Q(is_weight=True)
            )
        # Shoulder: even mild injuries block overhead (vertical) pressing.
        if _is_at_least('shoulder', 'mild'):
            qs = qs.exclude(movement_patterns__icontains='upper push - vertical')
        if _is_at_least('shoulder', 'severe'):
            qs = qs.exclude(
                Q(movement_patterns__icontains='upper push') &
                Q(is_weight=True)
            )
        # Hip: moderate drops advanced squats; severe drops all squats.
        if _is_at_least('hip', 'moderate'):
            qs = qs.exclude(
                Q(movement_patterns__icontains='lower push - squat') &
                Q(difficulty_level='advanced')
            )
        if _is_at_least('hip', 'severe'):
            qs = qs.exclude(movement_patterns__icontains='lower push - squat')
        # Wrist: weighted olympic lifts load the wrist in the catch.
        if _is_at_least('wrist', 'moderate'):
            qs = qs.exclude(
                Q(movement_patterns__icontains='olympic') &
                Q(is_weight=True)
            )
        # Neck: weighted overhead pressing loads the cervical spine.
        if _is_at_least('neck', 'moderate'):
            qs = qs.exclude(
                Q(movement_patterns__icontains='upper push - vertical') &
                Q(is_weight=True)
            )
    else:
        # Legacy: parse free-text injuries_limitations field
        injuries = getattr(self.user_preference, 'injuries_limitations', '') or ''
        if injuries:
            injuries_lower = injuries.lower()
            knee_keywords = ['knee', 'acl', 'mcl', 'meniscus', 'patella']
            back_keywords = ['back', 'spine', 'spinal', 'disc', 'herniat']
            shoulder_keywords = ['shoulder', 'rotator', 'labrum', 'impingement']
            if any(kw in injuries_lower for kw in knee_keywords):
                qs = qs.exclude(impact_level='high')
            if any(kw in injuries_lower for kw in back_keywords):
                qs = qs.exclude(impact_level='high')
                qs = qs.exclude(
                    Q(movement_patterns__icontains='hip hinge') &
                    Q(is_weight=True) &
                    Q(difficulty_level='advanced')
                )
            if any(kw in injuries_lower for kw in shoulder_keywords):
                qs = qs.exclude(movement_patterns__icontains='upper push - vertical')
    return qs
def _weighted_pick(
    self,
    preferred_qs,
    other_qs,
    count,
    superset_position=None,
    similarity_scope=None,
    preferred_modality=None,
):
    """
    Pick up to *count* exercises using weighted random selection.
    Preferred exercises are 3x more likely to be chosen than the
    general pool, ensuring variety while still favouring matches.
    Enforces movement-family deduplication:
    - Intra-superset: no two exercises from the same family group
    - Cross-workout: max N per family (1 for narrow, 2 for broad)
    superset_position: 'early', 'late', or None. When set, boosts
    exercises based on their exercise_tier (primary for early,
    accessory for late).
    preferred_modality: 'reps' or 'duration' or None. When set,
    exercises that don't match the preferred modality get 0.3x weight
    (cross-modality penalty). Dual-modality exercises always get full weight.

    similarity_scope: when 'working', candidates are additionally run
    through the Jaccard-based similarity blocker to avoid near-duplicates.

    Returns a list of at most *count* Exercise instances (may be fewer
    when the pool is small or heavily blocked). Weights are applied by
    repeating an exercise in the sampling pool w times.
    """
    if count <= 0:
        return []
    preferred_list = list(preferred_qs)
    other_list = list(other_qs)
    # Build a weighted pool: each preferred exercise appears 3 times
    pool = []
    weight_preferred = 3
    weight_other = 1

    def _tier_boost(ex, base_w):
        """Apply tier-based weighting based on superset position."""
        if not superset_position:
            return base_w
        tier = getattr(ex, 'exercise_tier', None)
        if superset_position == 'early' and tier == 'primary':
            return base_w * 2
        elif superset_position == 'late' and tier == 'accessory':
            return base_w * 2
        return base_w

    def _apply_week_penalty(ex, base_w):
        """Soft-penalize exercises already used earlier in the week."""
        w = base_w
        if self.week_used_exercise_ids and ex.pk in self.week_used_exercise_ids:
            w = max(1, w // 2)
        if self.week_used_movement_families:
            # Halve once if ANY family of this exercise is at its weekly cap.
            for fam in extract_movement_families(ex.name):
                if self.week_used_movement_families.get(fam, 0) >= self._get_week_family_limit(fam):
                    w = max(1, w // 2)
                    break
        return w

    def _apply_modality_penalty(ex, base_w):
        """Soft-penalize exercises that don't match the preferred modality.
        Dual-modality exercises (is_reps AND is_duration) get full weight.
        Cross-modality exercises get 0.3x weight (minimum 1).
        """
        if not preferred_modality:
            return base_w
        is_reps = getattr(ex, 'is_reps', False)
        is_dur = getattr(ex, 'is_duration', False)
        # Dual-modality: always full weight
        if is_reps and is_dur:
            return base_w
        if preferred_modality == 'reps' and is_reps:
            return base_w
        if preferred_modality == 'duration' and is_dur:
            return base_w
        # Cross-modality: reduce to ~30% of base weight
        return max(1, int(base_w * 0.3))

    # Build effective soft-penalty set: recently_used + any relaxed hard excludes
    _effective_recently_used = self.recently_used_ids
    if hasattr(self, '_relaxed_hard_exclude_ids') and self._relaxed_hard_exclude_ids:
        _effective_recently_used = self.recently_used_ids | self._relaxed_hard_exclude_ids
    for ex in preferred_list:
        w = weight_preferred
        # Boost exercises that are progressions of recently completed exercises
        if ex.pk in self.progression_boost_ids:
            w = w * 2
        if ex.pk in _effective_recently_used:
            w = 1  # Reduce weight for recently used
        # Penalize overused movement patterns for variety (Phase 11)
        # Fixed: check ALL comma-separated patterns, use max count
        if self.used_movement_patterns:
            ex_patterns = getattr(ex, 'movement_patterns', '') or ''
            if ex_patterns:
                max_pat_count = max(
                    (self.used_movement_patterns.get(p.strip().lower(), 0)
                     for p in ex_patterns.split(',') if p.strip()),
                    default=0,
                )
                if max_pat_count >= 3:
                    w = 1
                elif max_pat_count >= 2:
                    w = max(1, w - 1)
        w = _apply_week_penalty(ex, w)
        w = _apply_modality_penalty(ex, w)
        w = _tier_boost(ex, w)
        pool.extend([ex] * w)
    for ex in other_list:
        w = weight_other
        if ex.pk in _effective_recently_used:
            w = 1  # Already 1 but keep explicit
        w = _apply_week_penalty(ex, w)
        w = _apply_modality_penalty(ex, w)
        w = _tier_boost(ex, w)
        pool.extend([ex] * w)
    if not pool:
        return []
    selected = []
    selected_ids = set()
    # Track normalized names too, so DB duplicates with distinct pks
    # cannot both be picked.
    selected_names = set()
    # Intra-superset family tracking
    selected_family_groups = set()  # group names used in this superset
    selected_families = set()  # exact families used in this superset
    selected_family_counts = Counter()  # exact family counts in this superset
    selected_profiles = []
    # Shuffle to break any ordering bias
    random.shuffle(pool)
    attempts = 0
    max_attempts = len(pool) * 3  # avoid infinite loop on tiny pools
    while len(selected) < count and attempts < max_attempts:
        candidate = random.choice(pool)
        candidate_name = (candidate.name or '').lower().strip()
        if candidate.pk in selected_ids or candidate_name in selected_names:
            attempts += 1
            continue
        # --- Movement family blocking ---
        candidate_families = extract_movement_families(candidate.name)
        blocked = False
        for fam in candidate_families:
            # Cross-workout: check family count limit
            historical_count = self.used_movement_families.get(fam, 0)
            in_superset_count = selected_family_counts.get(fam, 0)
            if historical_count + in_superset_count >= self._get_family_limit(fam):
                blocked = True
                break
            # Intra-superset: avoid exact family duplicates entirely.
            if fam in selected_families:
                blocked = True
                break
            # Intra-superset: check family group overlap
            group = _FAMILY_TO_GROUP.get(fam)
            if group and group in selected_family_groups:
                blocked = True
                break
        if blocked:
            attempts += 1
            continue
        if similarity_scope == 'working':
            candidate_profile = self._build_similarity_profile(candidate)
            if self._is_similarity_blocked(candidate_profile, selected_profiles):
                attempts += 1
                continue
        selected.append(candidate)
        selected_ids.add(candidate.pk)
        selected_names.add(candidate_name)
        if similarity_scope == 'working':
            selected_profiles.append(candidate_profile)
        # Track family groups for intra-superset blocking
        for fam in candidate_families:
            selected_families.add(fam)
            selected_family_counts[fam] += 1
            group = _FAMILY_TO_GROUP.get(fam)
            if group:
                selected_family_groups.add(group)
        attempts += 1
    return selected
@staticmethod
def _tokenize_text(value):
"""Tokenize free text into normalized, low-noise tokens."""
if not value:
return set()
tokens = set(re.findall(r"[a-z0-9]+", value.lower()))
stop_words = {
'and', 'or', 'the', 'with', 'to', 'a', 'an', 'of',
'single', 'arm', 'double', 'alternating',
'barbell', 'dumbbell', 'kettlebell', 'machine', 'cable',
'bodyweight',
}
return {tok for tok in tokens if tok not in stop_words and len(tok) > 1}
@staticmethod
def _tokenize_csv(value):
"""Tokenize comma-separated categorical fields."""
if not value:
return set()
return {part.strip().lower() for part in value.split(',') if part and part.strip()}
def _build_similarity_profile(self, ex):
"""Create a cached token profile used by similarity scoring."""
cached = self._exercise_profile_cache.get(ex.pk)
if cached is not None:
return cached
profile = {
'id': ex.pk,
'movement': self._tokenize_csv(getattr(ex, 'movement_patterns', '') or ''),
'muscles': self._tokenize_csv(getattr(ex, 'muscle_groups', '') or ''),
'equipment': self._tokenize_csv(getattr(ex, 'equipment_required', '') or ''),
'name_tokens': self._tokenize_text(getattr(ex, 'name', '') or ''),
}
self._exercise_profile_cache[ex.pk] = profile
return profile
@staticmethod
def _jaccard_similarity(left, right):
"""Jaccard similarity between token sets."""
if not left and not right:
return 0.0
union = left | right
if not union:
return 0.0
return len(left & right) / len(union)
def _exercise_similarity_score(self, candidate_profile, existing_profile):
"""Weighted similarity score in [0,1]."""
movement = self._jaccard_similarity(
candidate_profile['movement'], existing_profile['movement']
)
muscles = self._jaccard_similarity(
candidate_profile['muscles'], existing_profile['muscles']
)
equipment = self._jaccard_similarity(
candidate_profile['equipment'], existing_profile['equipment']
)
name = self._jaccard_similarity(
candidate_profile['name_tokens'], existing_profile['name_tokens']
)
return (
(0.45 * movement)
+ (0.35 * muscles)
+ (0.10 * equipment)
+ (0.10 * name)
)
def _is_similarity_blocked(self, candidate_profile, selected_profiles):
"""Block near-duplicate exercises within the workout and adjacent sets."""
for existing_profile in self.used_working_similarity_profiles:
if (
self._exercise_similarity_score(candidate_profile, existing_profile)
>= self.SIMILARITY_HARD_THRESHOLD
):
return True
for existing_profile in selected_profiles:
if (
self._exercise_similarity_score(candidate_profile, existing_profile)
>= self.SIMILARITY_HARD_THRESHOLD
):
return True
for existing_profile in self.last_working_similarity_profiles:
if (
self._exercise_similarity_score(candidate_profile, existing_profile)
>= self.SIMILARITY_SOFT_THRESHOLD
):
return True
for existing_profile in selected_profiles:
if (
self._exercise_similarity_score(candidate_profile, existing_profile)
>= self.SIMILARITY_SOFT_THRESHOLD
):
return True
return False
def _pair_sided_exercises(self, selected, base_qs):
    """
    For exercises with a ``side`` value (e.g. 'Left', 'Right'), try
    to include the matching opposite-side exercise in the selection.
    This swaps out a non-sided exercise to keep the count stable, or
    simply appends if the list is short.

    Args:
        selected: list of Exercise instances already chosen.
        base_qs: unused here; partners are looked up globally.
            NOTE(review): parameter kept for interface stability —
            partner search goes against Exercise.objects, not base_qs.

    Returns:
        New list containing the originals plus any found partners,
        each partner inserted immediately after its sided counterpart.
    """
    paired = list(selected)
    paired_ids = {e.pk for e in paired}
    exercises_to_add = []
    # Phase 1: find an opposite-side partner for every sided exercise.
    for ex in list(paired):
        if ex.side and ex.side.strip():
            side_norm = self._normalize_side_value(ex.side)
            opposite_norm = self._opposite_side(side_norm)
            if not opposite_norm:
                continue
            # Find the matching partner by exact base-name match and opposite side.
            # Typically the name is identical except for side, e.g.
            # "Single Arm Row Left" / "Single Arm Row Right"
            base_name = self._strip_side_tokens(ex.name)
            # Use strict matching: find candidates with opposite side,
            # then filter in Python by exact base-name match to avoid
            # substring false positives (e.g. "L Sit" matching "Wall Sit").
            partner_candidates = (
                Exercise.objects
                .filter(self._side_values_q(opposite_norm))
                .exclude(pk__in=self.used_exercise_ids)
                .exclude(pk__in=paired_ids)
            )
            partner = None
            for candidate in partner_candidates:
                candidate_base = self._strip_side_tokens(candidate.name)
                if base_name.lower() == candidate_base.lower():
                    partner = candidate
                    break
            if partner and partner.pk not in paired_ids:
                exercises_to_add.append(partner)
                paired_ids.add(partner.pk)
    # Phase 2: insert partners right after their matching exercise
    final = []
    added_ids = set()
    for ex in paired:
        final.append(ex)
        added_ids.add(ex.pk)
        # Check if any partner should follow this exercise
        for partner in exercises_to_add:
            if partner.pk not in added_ids:
                # Check if partner is the pair for this exercise using exact base-name match
                if ex.side and ex.side.strip():
                    ex_base = self._strip_side_tokens(ex.name)
                    partner_base = self._strip_side_tokens(partner.name)
                    if ex_base.lower() == partner_base.lower():
                        final.append(partner)
                        added_ids.add(partner.pk)
    # Add any remaining partners that didn't get inserted
    for partner in exercises_to_add:
        if partner.pk not in added_ids:
            final.append(partner)
            added_ids.add(partner.pk)
    return final
def _trim_preserving_pairs(self, selected, count):
    """
    Trim selected exercises to count, but never split a Left/Right pair.
    If keeping a Left exercise, always keep its Right partner (and vice versa).

    May return up to count + 1 items when keeping a pair intact makes an
    exact trim impossible (the `> count + 1` guard below tolerates one
    item of overflow).
    """
    if len(selected) <= count:
        return selected
    # Identify paired indices: two sided exercises pair up when their
    # names match after Left/Right words are stripped.
    paired_indices = set()
    for i, ex in enumerate(selected):
        if self._normalize_side_value(getattr(ex, 'side', '')):
            # Find its partner in the list
            base_name = ex.name
            for side_word in ['Left', 'Right', 'left', 'right']:
                base_name = base_name.replace(side_word, '').strip()
            for j, other in enumerate(selected):
                if i != j and self._normalize_side_value(getattr(other, 'side', '')):
                    other_base = other.name
                    for side_word in ['Left', 'Right', 'left', 'right']:
                        other_base = other_base.replace(side_word, '').strip()
                    if base_name.lower() == other_base.lower():
                        paired_indices.add(i)
                        paired_indices.add(j)
    result = []
    for i, ex in enumerate(selected):
        if len(result) >= count and i not in paired_indices:
            continue
        # If this is part of a pair, include it even if over count
        if i in paired_indices or len(result) < count:
            result.append(ex)
    # If keeping pairs pushed us over count, remove non-paired exercises
    # from the end to compensate
    if len(result) > count + 1:
        excess = len(result) - count
        trimmed = []  # NOTE(review): unused local, kept as-is
        removed = 0
        # Build paired set for result indices (recomputed because the
        # original indices no longer line up after the first pass).
        result_paired = set()
        for i, ex in enumerate(result):
            if self._normalize_side_value(getattr(ex, 'side', '')):
                base_name = ex.name
                for side_word in ['Left', 'Right', 'left', 'right']:
                    base_name = base_name.replace(side_word, '').strip()
                for j, other in enumerate(result):
                    if i != j and self._normalize_side_value(getattr(other, 'side', '')):
                        other_base = other.name
                        for side_word in ['Left', 'Right', 'left', 'right']:
                            other_base = other_base.replace(side_word, '').strip()
                        if base_name.lower() == other_base.lower():
                            result_paired.add(i)
                            result_paired.add(j)
        # Pop unpaired items from the tail until within the tolerance.
        for i in range(len(result) - 1, -1, -1):
            if removed >= excess:
                break
            if i not in result_paired:
                result.pop(i)
                removed += 1
    return result
def _order_side_pairs_adjacent(self, selected):
    """
    Keep left/right variants adjacent in list order.
    This is primarily for warm-up/cool-down UX so side-specific movements
    render one after another instead of grouped by side.

    Preserves the relative order of first encounters: when a sided
    exercise is reached, its opposite-side partner (matched by
    side-stripped name) is pulled up to sit immediately after it.
    Non-sided exercises keep their positions; any stragglers are
    appended at the end.
    """
    if len(selected) < 2:
        return selected
    # Index sided exercises by stripped base name -> {'left': [...], 'right': [...]}
    side_map = {}
    for ex in selected:
        side_val = self._normalize_side_value(getattr(ex, 'side', ''))
        if side_val not in ('left', 'right'):
            continue
        key = self._strip_side_tokens(getattr(ex, 'name', ''))
        side_map.setdefault(key, {'left': [], 'right': []})
        side_map[key][side_val].append(ex)
    ordered = []
    used_ids = set()
    for ex in selected:
        if ex.pk in used_ids:
            continue
        side_val = self._normalize_side_value(getattr(ex, 'side', ''))
        if side_val in ('left', 'right'):
            key = self._strip_side_tokens(getattr(ex, 'name', ''))
            opposite = self._opposite_side(side_val)
            opposite_ex = None
            # First not-yet-placed opposite-side variant of the same base name.
            for candidate in side_map.get(key, {}).get(opposite, []):
                if candidate.pk not in used_ids:
                    opposite_ex = candidate
                    break
            if opposite_ex:
                ordered.append(ex)
                ordered.append(opposite_ex)
                used_ids.add(ex.pk)
                used_ids.add(opposite_ex.pk)
                continue
        ordered.append(ex)
        used_ids.add(ex.pk)
    # Safety net: anything skipped above still ends up in the output.
    for ex in selected:
        if ex.pk not in used_ids:
            ordered.append(ex)
            used_ids.add(ex.pk)
    return ordered
def _strip_side_tokens(self, name):
"""Normalize a name by removing left/right tokens."""
base = name or ''
for side_word in [
'Left', 'Right', 'left', 'right',
'left arm', 'right arm', 'left leg', 'right leg',
'left side', 'right side',
]:
base = base.replace(side_word, '').strip()
return base.lower()
@staticmethod
def _normalize_side_value(side):
    """Map a raw DB ``side`` value onto a canonical token.

    Returns 'left' or 'right' when the trimmed, lowercased value is a
    known side token, otherwise None (including empty/None input).
    """
    token = (side or '').strip().lower()
    if token in _LEFT_SIDE_VALUES:
        return 'left'
    return 'right' if token in _RIGHT_SIDE_VALUES else None
@staticmethod
def _opposite_side(side_norm):
"""Return opposite canonical side for left/right."""
if side_norm == 'left':
return 'right'
if side_norm == 'right':
return 'left'
return None
@staticmethod
def _side_values_q(side_norm):
    """Build an OR'ed queryset filter matching every raw DB token
    for the given canonical side ('left' selects the left-token set,
    anything else the right-token set).
    """
    tokens = _LEFT_SIDE_VALUES if side_norm == 'left' else _RIGHT_SIDE_VALUES
    combined = Q()
    for token in tokens:
        combined = combined | Q(side__iexact=token)
    return combined
def _drop_unpaired_sided_exercises(self, selected):
"""Drop any left/right exercise that does not have its opposite side."""
side_groups = {}
for ex in selected:
side_val = self._normalize_side_value(getattr(ex, 'side', ''))
if side_val not in ('left', 'right'):
continue
key = self._strip_side_tokens(getattr(ex, 'name', ''))
side_groups.setdefault(key, {'left': [], 'right': []})
side_groups[key][side_val].append(ex.pk)
allowed_ids = set()
for key, sides in side_groups.items():
if sides['left'] and sides['right']:
allowed_ids.update(sides['left'])
allowed_ids.update(sides['right'])
filtered = []
removed_count = 0
for ex in selected:
side_val = self._normalize_side_value(getattr(ex, 'side', ''))
if side_val in ('left', 'right') and ex.pk not in allowed_ids:
removed_count += 1
continue
filtered.append(ex)
if removed_count:
self.warnings.append(
f'Removed {removed_count} unpaired side-specific exercises '
f'to enforce left/right pairing.'
)
return filtered
def _find_missing_side_partner(self, ex, base_qs, existing_ids):
    """
    Try hard to find opposite-side partner for a sided exercise.
    Search order:
    1) base_qs with strict name-base match
    2) global Exercise table with strict name-base match
    3) base_qs with relaxed icontains name-base match
    4) global Exercise table with relaxed icontains name-base match

    The first two passes also exclude exercises already used in this
    generation run; the relaxed passes only avoid ids currently in the
    selection, so an already-used counterpart can still complete a pair.

    Returns an Exercise instance or None when the input has no
    canonical side / empty base name or nothing matches.
    """
    side_norm = self._normalize_side_value(getattr(ex, 'side', ''))
    opposite_norm = self._opposite_side(side_norm)
    if not opposite_norm:
        return None
    base_name = self._strip_side_tokens(getattr(ex, 'name', ''))
    if not base_name:
        return None

    def _pick_from_queryset(qs, strict=True):
        # strict: exact side-stripped-name equality (Python-side to avoid
        # SQL substring false positives); relaxed: DB icontains match.
        candidates = qs.filter(self._side_values_q(opposite_norm))
        if strict:
            candidates = [
                c for c in candidates
                if self._strip_side_tokens(getattr(c, 'name', '')) == base_name
            ]
            return candidates[0] if candidates else None
        return candidates.filter(name__icontains=base_name).first()

    common_exclusions = Q(pk__in=existing_ids)
    # Prefer unused exercise ids, but do not hard-fail pairing if only used counterpart exists.
    preferred_exclusions = common_exclusions | Q(pk__in=self.used_exercise_ids)
    base_preferred = base_qs.exclude(preferred_exclusions)
    partner = _pick_from_queryset(base_preferred, strict=True)
    if partner:
        return partner
    global_preferred = Exercise.objects.exclude(preferred_exclusions)
    partner = _pick_from_queryset(global_preferred, strict=True)
    if partner:
        return partner
    # Relaxed pass still avoiding duplicates in the current selection.
    base_relaxed = base_qs.exclude(common_exclusions)
    partner = _pick_from_queryset(base_relaxed, strict=False)
    if partner:
        return partner
    global_relaxed = Exercise.objects.exclude(common_exclusions)
    return _pick_from_queryset(global_relaxed, strict=False)
def _ensure_side_pair_integrity(
    self,
    selected,
    base_qs,
    count,
    similarity_scope=None,
    superset_position=None,
):
    """
    Enforce strict left/right pairing:
    - First attempt to add missing opposite-side partners
    - Remove orphan left/right exercises only as a last resort
    - Backfill with non-sided exercises when possible

    Args:
        selected: current exercise picks.
        base_qs: queryset used for partner lookup and non-sided backfill.
        count: target list length (pairs may be kept over this target
            at the expense of non-sided fillers).
        similarity_scope / superset_position: forwarded to
            ``_weighted_pick`` for the backfill pass.

    Side effects: may append warnings about added partners (removal
    warnings come from ``_drop_unpaired_sided_exercises``).
    """
    balanced = list(selected)
    existing_ids = {ex.pk for ex in balanced}
    added_partners = 0
    # Phase 1: complete each sided exercise's left/right pair.
    for ex in list(balanced):
        side_val = self._normalize_side_value(getattr(ex, 'side', ''))
        if side_val not in ('left', 'right'):
            continue
        key = self._strip_side_tokens(getattr(ex, 'name', ''))
        has_left = any(
            self._normalize_side_value(getattr(other, 'side', '')) == 'left'
            and self._strip_side_tokens(getattr(other, 'name', '')) == key
            for other in balanced
        )
        has_right = any(
            self._normalize_side_value(getattr(other, 'side', '')) == 'right'
            and self._strip_side_tokens(getattr(other, 'name', '')) == key
            for other in balanced
        )
        if has_left and has_right:
            continue
        partner = self._find_missing_side_partner(ex, base_qs, existing_ids)
        if partner and partner.pk not in existing_ids:
            balanced.append(partner)
            existing_ids.add(partner.pk)
            added_partners += 1
    if added_partners:
        # Keep sided pairs by preferentially removing non-sided fillers.
        while len(balanced) > count:
            remove_idx = None
            # Scan from the tail for the last non-sided exercise.
            for idx in range(len(balanced) - 1, -1, -1):
                if self._normalize_side_value(getattr(balanced[idx], 'side', '')) not in ('left', 'right'):
                    remove_idx = idx
                    break
            if remove_idx is None:
                break
            balanced.pop(remove_idx)
        self.warnings.append(
            f'Added {added_partners} missing opposite-side exercise partners.'
        )
    # Phase 2: orphans that still couldn't be paired are dropped.
    balanced = self._drop_unpaired_sided_exercises(balanced)
    # Phase 3: backfill the deficit with non-sided exercises only.
    if len(balanced) < count:
        deficit = count - len(balanced)
        existing_ids = {ex.pk for ex in balanced}
        filler_qs = (
            base_qs.exclude(pk__in=existing_ids)
            .filter(Q(side__isnull=True) | Q(side=''))
        )
        extras = self._weighted_pick(
            filler_qs,
            Exercise.objects.none(),
            deficit,
            superset_position=superset_position,
            similarity_scope=similarity_scope,
        )
        balanced.extend(extras)
    return balanced
def balance_stretch_positions(self, selected, muscle_groups=None, fitness_level=None):
    """
    Improve stretch position variety for hypertrophy workouts.
    Ensures exercises within a superset cover multiple stretch
    positions (lengthened, mid, shortened) for more complete
    muscle stimulus. Swaps the last non-primary exercise if
    all exercises share the same stretch position.
    Prefers 'lengthened' replacements (greater mechanical tension
    at long muscle lengths = stronger hypertrophy stimulus).

    Mutates ``selected`` in place (the swapped item) and keeps
    ``self.used_exercise_ids`` in sync; also returns the list.
    No-op for fewer than 3 exercises or when stretch data is absent.
    """
    if len(selected) < 3:
        return selected
    # Tally stretch positions; exercises without data are ignored.
    position_counts = {}
    for ex in selected:
        pos = getattr(ex, 'stretch_position', None)
        if pos:
            position_counts[pos] = position_counts.get(pos, 0) + 1
    # Check if variety is sufficient (no single position >= 75%)
    if len(position_counts) >= 2:
        total_with_pos = sum(position_counts.values())
        max_count = max(position_counts.values())
        if total_with_pos > 0 and max_count / total_with_pos < 0.75:
            return selected  # Good variety, no dominant position
    dominant_position = max(position_counts, key=position_counts.get) if position_counts else None
    if not dominant_position:
        return selected  # No stretch data available
    # Find a replacement with a different stretch position
    desired_positions = {'lengthened', 'mid', 'shortened'} - {dominant_position}
    position_q = Q()
    for pos in desired_positions:
        position_q |= Q(stretch_position=pos)
    replacement_qs = self._get_filtered_queryset(
        muscle_groups=muscle_groups,
        fitness_level=fitness_level,
    ).filter(position_q).exclude(pk__in={e.pk for e in selected})
    replacements = list(replacement_qs[:5])
    if not replacements:
        return selected
    # Prefer 'lengthened' for hypertrophy (greater mechanical tension)
    lengthened = [r for r in replacements if r.stretch_position == 'lengthened']
    pick = lengthened[0] if lengthened else replacements[0]
    # Swap the last non-primary exercise (primary lifts are protected;
    # if every slot is primary, no swap happens).
    for i in range(len(selected) - 1, -1, -1):
        if getattr(selected[i], 'exercise_tier', None) != 'primary':
            old = selected[i]
            selected[i] = pick
            self.used_exercise_ids.discard(old.pk)
            self.used_exercise_ids.add(pick.pk)
            break
    return selected