import random import logging import re from collections import Counter from django.db.models import Q, Count from exercise.models import Exercise from muscle.models import Muscle, ExerciseMuscle from equipment.models import Equipment, WorkoutEquipment from generator.services.muscle_normalizer import ( normalize_muscle_name, get_muscles_for_exercise, classify_split_type, MUSCLE_GROUP_CATEGORIES, ) logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Movement family deduplication constants # --------------------------------------------------------------------------- # Ordered (keyword, family_tag) pairs — longer/more-specific keywords first # so that "hang clean" matches before generic "clean". MOVEMENT_FAMILY_KEYWORDS = [ # Olympic — specific before general ('clean and jerk', 'clean_and_jerk'), ('hang clean', 'clean'), ('clean pull', 'clean'), ('high pull', 'clean'), ('power clean', 'clean'), ('clean', 'clean'), ('snatch', 'snatch'), # Vertical pull ('chin-up', 'chin_up'), ('chin up', 'chin_up'), ('pull-up', 'pull_up'), ('pull up', 'pull_up'), ('lat pulldown', 'lat_pulldown'), ('pulldown', 'lat_pulldown'), # Horizontal press ('bench press', 'bench_press'), ('chest press', 'bench_press'), ('push-up', 'push_up'), ('push up', 'push_up'), # Overhead press ('overhead press', 'overhead_press'), ('shoulder press', 'overhead_press'), ('military press', 'overhead_press'), ('push press', 'push_press'), # Lower body ('squat', 'squat'), ('deadlift', 'deadlift'), ('hip thrust', 'hip_thrust'), ('lunge', 'lunge'), ('split squat', 'lunge'), ('step up', 'step_up'), ('step-up', 'step_up'), # Row ('row', 'row'), # Arms ('bicep curl', 'bicep_curl'), ('hammer curl', 'bicep_curl'), ('curl', 'bicep_curl'), ('tricep extension', 'tricep_extension'), ('skull crusher', 'tricep_extension'), # Shoulders ('lateral raise', 'lateral_raise'), ('front raise', 'front_raise'), ('rear delt', 'rear_delt'), ('face pull', 'face_pull'), ('shrug', 'shrug'), # Other ('carry', 'carry'), ('farmer', 'carry'), ('dip', 'dip'), ('burpee', 'burpee'), ('thruster', 'thruster'), ('turkish', 'turkish_getup'), ] # Super-families: families that are too similar for the same superset FAMILY_GROUPS = { 'vertical_pull': {'pull_up', 'chin_up', 'lat_pulldown'}, 'olympic_pull': {'clean', 'snatch', 'clean_and_jerk'}, 'horizontal_press': {'bench_press', 'push_up'}, } # Narrow families — max 1 per entire workout NARROW_FAMILIES = { 'clean', 'snatch', 'clean_and_jerk', 'push_press', 'thruster', 'turkish_getup', 'burpee', } # Everything else defaults to max 2 per workout # Precomputed reverse map: family -> group name _FAMILY_TO_GROUP = {} for _group, _members in FAMILY_GROUPS.items(): for _member in _members: _FAMILY_TO_GROUP[_member] = _group _LEFT_SIDE_VALUES = {'left', 'left_arm', 'left_leg', 'left_side'} _RIGHT_SIDE_VALUES = {'right', 'right_arm', 'right_leg', 'right_side'} def extract_movement_families(exercise_name): """Extract movement family tags from an exercise name. Returns a set of family strings. Uses longest-match-first to avoid partial overlaps (e.g. "hang clean" matches before "clean"). """ if not exercise_name: return set() name_lower = exercise_name.lower().strip() families = set() matched_spans = [] for keyword, family in MOVEMENT_FAMILY_KEYWORDS: idx = name_lower.find(keyword) if idx >= 0: span = (idx, idx + len(keyword)) # Skip if this span overlaps an already-matched span overlaps = any( not (span[1] <= ms[0] or span[0] >= ms[1]) for ms in matched_spans ) if not overlaps: families.add(family) matched_spans.append(span) return families class ExerciseSelector: """ Smart exercise selection service that picks exercises based on user preferences, available equipment, target muscle groups, and variety. """ # Bodyweight equipment names to fall back to when equipment-filtered # results are too sparse. BODYWEIGHT_KEYWORDS = ['bodyweight', 'body weight', 'none', 'no equipment'] # Movement patterns considered too complex for beginners ADVANCED_PATTERNS = ['olympic', 'plyometric'] # Movement patterns considered appropriate for warm-up / cool-down WARMUP_PATTERNS = [ 'dynamic stretch', 'mobility - dynamic', 'activation', 'warm up', 'warmup', 'cardio/locomotion', 'balance', ] COOLDOWN_PATTERNS = [ 'static stretch', 'stretch', 'cool down', 'cooldown', 'mobility', 'foam roll', 'yoga', ] # Movement patterns explicitly forbidden in cooldowns COOLDOWN_EXCLUDED_PATTERNS = [ 'plyometric', 'combat', 'cardio/locomotion', 'olympic', ] # Warm-up must avoid working-set patterns. WARMUP_EXCLUDED_PATTERNS = [ 'upper push', 'upper pull', 'olympic', 'combat', 'arms', ] # Similarity thresholds to prevent near-duplicate selections. SIMILARITY_HARD_THRESHOLD = 0.80 SIMILARITY_SOFT_THRESHOLD = 0.65 # Recovery/stretch movements should not appear in working sets. WORKING_EXCLUDED_PATTERNS = [ 'mobility - static', 'static stretch', 'cool down', 'cooldown', 'yoga', 'breathing', 'massage', ] def __init__(self, user_preference, recently_used_ids=None, hard_exclude_ids=None): self.user_preference = user_preference self.used_exercise_ids = set() # tracks within a single workout self.used_exercise_names = set() # tracks names for cross-superset dedup self.recently_used_ids = recently_used_ids or set() self.hard_exclude_ids = hard_exclude_ids or set() # Phase 6: hard exclude recent exercises self.used_movement_patterns = Counter() # Phase 11: track patterns for variety self.used_movement_families = Counter() # Movement family dedup across workout self.used_working_similarity_profiles = [] self.last_working_similarity_profiles = [] self._exercise_profile_cache = {} self.warnings = [] # Phase 13: generation warnings self.progression_boost_ids = set() # IDs of exercises that are progressions of recently done ones # Week-scoped state for cross-day dedup (NOT cleared by reset()) self.week_used_exercise_ids = set() self.week_used_movement_families = Counter() # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def reset(self): """Reset used exercises for a new workout (preserves week-scoped state).""" self.used_exercise_ids = set() self.used_exercise_names = set() self.used_movement_patterns = Counter() self.used_movement_families = Counter() self.used_working_similarity_profiles = [] self.last_working_similarity_profiles = [] self._exercise_profile_cache = {} self.warnings = [] # Clear per-queryset caches so equipment/exclusion changes take effect if hasattr(self, '_equipment_map_cache'): del self._equipment_map_cache if hasattr(self, '_bodyweight_ids_cache'): del self._bodyweight_ids_cache if hasattr(self, '_warned_small_pool'): del self._warned_small_pool if hasattr(self, '_warned_no_equipment'): del self._warned_no_equipment if hasattr(self, '_relaxed_hard_exclude_ids'): del self._relaxed_hard_exclude_ids if hasattr(self, '_injury_warnings_emitted'): del self._injury_warnings_emitted def reset_week(self): """Reset all state including week-scoped tracking. Call at start of a new week.""" self.reset() self.week_used_exercise_ids = set() self.week_used_movement_families = Counter() def accumulate_week_state(self, exercise_ids, exercise_names): """Record a completed day's exercises into week-scoped tracking. Parameters ---------- exercise_ids : set[int] Primary keys of exercises used in the day's workout. exercise_names : set[str] Exercise names (used for family extraction). """ self.week_used_exercise_ids.update(exercise_ids) for name in exercise_names: for fam in extract_movement_families(name): self.week_used_movement_families[fam] += 1 def _get_week_family_limit(self, family): """Max allowed uses of a movement family across the whole week. Wider than per-workout limits: narrow families = 2/week, broad = 4/week. """ if family in NARROW_FAMILIES: return 2 return 4 def select_exercises( self, muscle_groups, count, is_duration_based=False, movement_pattern_preference=None, prefer_weighted=False, superset_position=None, allow_cross_modality=False, ): """ Select *count* exercises matching the given criteria. Parameters ---------- muscle_groups : list[str] Canonical muscle group names (e.g. ['chest', 'triceps']). count : int How many exercises to return. is_duration_based : bool When True, prefer exercises whose ``is_duration`` flag is set. movement_pattern_preference : list[str] | None Optional list of preferred movement patterns to favour. prefer_weighted : bool When True (R6), boost is_weight=True exercises in selection. allow_cross_modality : bool When True, don't hard-filter by modality — instead use soft preference so duration-only exercises (carries, planks) can land in rep-based supersets and vice versa. Returns ------- list[Exercise] """ if count <= 0: return [] fitness_level = getattr(self.user_preference, 'fitness_level', None) # When cross-modality is allowed, skip the hard modality filter # so duration-only exercises can appear in rep supersets and vice versa. modality_for_filter = None if allow_cross_modality else is_duration_based preferred_modality = 'duration' if is_duration_based else 'reps' qs = self._get_filtered_queryset( muscle_groups=muscle_groups, is_duration_based=modality_for_filter, fitness_level=fitness_level, ) # Working supersets should not contain stretch/recovery exercises. # Use regex word boundary to avoid over-matching (e.g. "Stretch Band Row" # should NOT be excluded, but "Hamstring Stretch" should). excluded_q = Q(name__iregex=r'\bstretch(ing|es|ed)?\b') for pat in self.WORKING_EXCLUDED_PATTERNS: excluded_q |= Q(movement_patterns__icontains=pat) qs = qs.exclude(excluded_q) # Guard against low-quality rows causing misclassification/selection drift. qs = qs.exclude(Q(movement_patterns__isnull=True) | Q(movement_patterns='')) qs = qs.exclude(Q(muscle_groups__isnull=True) | Q(muscle_groups='')) # For advanced/elite, boost compound movements if fitness_level and fitness_level >= 3 and not movement_pattern_preference: compound_qs = qs.filter(is_compound=True) if compound_qs.exists(): preferred_qs = compound_qs other_qs = qs.exclude(pk__in=compound_qs.values_list('pk', flat=True)) else: preferred_qs = qs.none() other_qs = qs elif movement_pattern_preference: # Optionally boost exercises whose movement_patterns match a preference pattern_q = Q() for pat in movement_pattern_preference: pattern_q |= Q(movement_patterns__icontains=pat) preferred_qs = qs.filter(pattern_q) other_qs = qs.exclude(pk__in=preferred_qs.values_list('pk', flat=True)) else: preferred_qs = qs.none() other_qs = qs # R6: For strength workouts, boost is_weight=True exercises if prefer_weighted: weighted_qs = qs.filter(is_weight=True) if weighted_qs.exists(): # Merge weighted exercises into preferred pool combined_preferred_ids = set(preferred_qs.values_list('pk', flat=True)) | set(weighted_qs.values_list('pk', flat=True)) preferred_qs = qs.filter(pk__in=combined_preferred_ids) other_qs = qs.exclude(pk__in=combined_preferred_ids) selected = self._weighted_pick( preferred_qs, other_qs, count, superset_position=superset_position, similarity_scope='working', preferred_modality=preferred_modality if allow_cross_modality else None, ) # Sort selected exercises by tier: primary first, then secondary, then accessory TIER_ORDER = {'primary': 0, 'secondary': 1, 'accessory': 2, None: 2} selected.sort(key=lambda ex: TIER_ORDER.get(ex.exercise_tier, 2)) # Ensure target muscle groups have coverage if muscle_groups and selected: from muscle.models import ExerciseMuscle # Batch-load muscles for all selected exercises (avoid N+1) selected_ids = {ex.pk for ex in selected} ex_muscle_rows = ExerciseMuscle.objects.filter( exercise_id__in=selected_ids ).values_list('exercise_id', 'muscle__name') from collections import defaultdict ex_muscle_map = defaultdict(set) for ex_id, muscle_name in ex_muscle_rows: ex_muscle_map[ex_id].add(normalize_muscle_name(muscle_name)) covered_muscles = set() for ex in selected: covered_muscles.update(ex_muscle_map.get(ex.pk, set())) normalized_targets = {normalize_muscle_name(mg) for mg in muscle_groups} uncovered = normalized_targets - covered_muscles if uncovered and len(selected) > 1: # Track swapped indices to avoid overwriting previous swaps swapped_indices = set() for missing_muscle in uncovered: replacement_qs = self._get_filtered_queryset( muscle_groups=[missing_muscle], is_duration_based=modality_for_filter, fitness_level=fitness_level, ).exclude(pk__in={e.pk for e in selected}) # Validate modality: ensure replacement matches expected modality # (skip when cross-modality is allowed) if not allow_cross_modality: if is_duration_based: replacement_qs = replacement_qs.filter(is_duration=True) elif is_duration_based is False: replacement_qs = replacement_qs.filter(is_reps=True) replacement = list(replacement_qs[:1]) if replacement: # Find last unswapped accessory swap_idx = None for i in range(len(selected) - 1, -1, -1): if i in swapped_indices: continue if getattr(selected[i], 'exercise_tier', None) == 'accessory': swap_idx = i break # Fallback: any unswapped non-primary if swap_idx is None: for i in range(len(selected) - 1, -1, -1): if i in swapped_indices: continue if getattr(selected[i], 'exercise_tier', None) != 'primary': swap_idx = i break if swap_idx is not None: selected[swap_idx] = replacement[0] swapped_indices.add(swap_idx) # If we couldn't get enough with equipment filters, widen to bodyweight if len(selected) < count: fallback_qs = self._get_bodyweight_queryset( muscle_groups=muscle_groups, is_duration_based=is_duration_based, fitness_level=fitness_level, ) fallback_qs = fallback_qs.exclude(excluded_q) fallback_qs = fallback_qs.exclude(Q(movement_patterns__isnull=True) | Q(movement_patterns='')) fallback_qs = fallback_qs.exclude(Q(muscle_groups__isnull=True) | Q(muscle_groups='')) still_needed = count - len(selected) already_ids = {e.pk for e in selected} fallback_qs = fallback_qs.exclude(pk__in=already_ids) mg_label = ', '.join(muscle_groups[:3]) if muscle_groups else 'target muscles' extras = self._weighted_pick( fallback_qs, Exercise.objects.none(), still_needed, similarity_scope='working', ) if extras: self.warnings.append( f'Used bodyweight fallback for {mg_label} ' f'({len(extras)} exercises) due to limited equipment matches.' ) selected.extend(extras) if len(selected) < count: self.warnings.append( f'Could only find {len(selected)}/{count} exercises ' f'for {mg_label}.' ) # Handle side-specific pairing: if an exercise has a side value, # look for the matching opposite-side exercise so they appear together. selected = self._pair_sided_exercises(selected, qs) selected = self._ensure_side_pair_integrity( selected, qs, count=count, similarity_scope='working', superset_position=superset_position, ) # Mark everything we just selected as used and track patterns for ex in selected: self.used_exercise_ids.add(ex.pk) self.used_exercise_names.add((ex.name or '').lower().strip()) patterns = getattr(ex, 'movement_patterns', '') or '' if patterns: for pat in [p.strip().lower() for p in patterns.split(',') if p.strip()]: self.used_movement_patterns[pat] += 1 self._track_families(selected) self._track_similarity_profiles(selected, scope='working') return self._trim_preserving_pairs(selected, count) def select_warmup_exercises(self, target_muscles, count=5): """Select duration-based exercises suitable for warm-up.""" fitness_level = getattr(self.user_preference, 'fitness_level', None) qs = self._get_filtered_queryset( muscle_groups=target_muscles, is_duration_based=True, fitness_level=fitness_level, ) # Prefer exercises whose movement_patterns overlap with warmup keywords warmup_q = Q() for kw in self.WARMUP_PATTERNS: warmup_q |= Q(movement_patterns__icontains=kw) # Warm-up should be dynamic movement prep, not loaded working sets. qs = qs.exclude(is_weight=True) # Exclude heavy compounds (no barbell squats in warmup) qs = qs.exclude(is_compound=True) # Exclude primary-tier exercises (no primary lifts in warmup) qs = qs.exclude(exercise_tier='primary') # Exclude technically complex movements qs = qs.exclude(complexity_rating__gte=4) # Exclude common working-set movement families from warmup. warmup_exclude_q = Q() for pat in self.WARMUP_EXCLUDED_PATTERNS: warmup_exclude_q |= Q(movement_patterns__icontains=pat) qs = qs.exclude(warmup_exclude_q) # Tightened HR filter for warmup (1-4 instead of 2-5) hr_warmup_q = Q(hr_elevation_rating__gte=1, hr_elevation_rating__lte=4) preferred = qs.filter(warmup_q).filter( hr_warmup_q | Q(hr_elevation_rating__isnull=True) ) # STRICT: warmup must come from warmup-pattern exercises only. selected = self._weighted_pick(preferred, preferred.none(), count) # Fallback: if not enough duration-based warmup exercises, widen to # any duration exercise regardless of muscle group if len(selected) < count: wide_qs = self._get_filtered_queryset( muscle_groups=None, is_duration_based=True, fitness_level=fitness_level, ).exclude(pk__in={e.pk for e in selected}) # Apply same warmup safety exclusions wide_qs = wide_qs.exclude(is_weight=True) wide_qs = wide_qs.exclude(is_compound=True) wide_qs = wide_qs.exclude(exercise_tier='primary') wide_qs = wide_qs.exclude(complexity_rating__gte=4) wide_qs = wide_qs.exclude(warmup_exclude_q) wide_preferred = wide_qs.filter(warmup_q).filter( hr_warmup_q | Q(hr_elevation_rating__isnull=True) ) selected.extend( self._weighted_pick(wide_preferred, wide_preferred.none(), count - len(selected)) ) for ex in selected: self.used_exercise_ids.add(ex.pk) self.used_exercise_names.add((ex.name or '').lower().strip()) self._track_families(selected) selected = self._ensure_side_pair_integrity(selected, qs, count=count) selected = self._trim_preserving_pairs(selected, count) return self._order_side_pairs_adjacent(selected) def select_cooldown_exercises(self, target_muscles, count=4): """ Select duration-based exercises suitable for cool-down. R11: Excludes is_weight=True exercises that don't match cooldown movement patterns (stretch/mobility only). Also enforces low HR elevation (<=3) for proper cool-down. """ fitness_level = getattr(self.user_preference, 'fitness_level', None) qs = self._get_filtered_queryset( muscle_groups=target_muscles, is_duration_based=True, fitness_level=fitness_level, ) cooldown_q = Q() for kw in self.COOLDOWN_PATTERNS: cooldown_q |= Q(movement_patterns__icontains=kw) # Exclude dangerous movement patterns from cooldowns entirely exclude_q = Q() for pat in self.COOLDOWN_EXCLUDED_PATTERNS: exclude_q |= Q(movement_patterns__icontains=pat) # Also exclude compound push/pull exercises exclude_q |= (Q(movement_patterns__icontains='push') | Q(movement_patterns__icontains='pull')) & Q(is_compound=True) qs = qs.exclude(exclude_q) # R11: Exclude weighted exercises that aren't cooldown-pattern exercises weighted_non_cooldown = qs.filter(is_weight=True).exclude(cooldown_q) qs = qs.exclude(pk__in=weighted_non_cooldown.values_list('pk', flat=True)) # Cooldown HR ceiling: only low HR exercises (<=3) for proper cool-down qs = qs.filter(Q(hr_elevation_rating__lte=3) | Q(hr_elevation_rating__isnull=True)) # STRICT: Only use cooldown-pattern exercises (no 'other' pool) preferred = qs.filter(cooldown_q) selected = self._weighted_pick(preferred, preferred.none(), count) # Fallback: widen to any duration exercise with cooldown patterns (no muscle filter) if len(selected) < count: wide_qs = self._get_filtered_queryset( muscle_groups=None, is_duration_based=True, fitness_level=fitness_level, ).exclude(pk__in={e.pk for e in selected}) # Apply same exclusions wide_qs = wide_qs.exclude(exclude_q) # R11: also apply weight filter on wide fallback wide_weighted_non_cooldown = wide_qs.filter(is_weight=True).exclude(cooldown_q) wide_qs = wide_qs.exclude(pk__in=wide_weighted_non_cooldown.values_list('pk', flat=True)) # HR ceiling on fallback too wide_qs = wide_qs.filter(Q(hr_elevation_rating__lte=3) | Q(hr_elevation_rating__isnull=True)) # STRICT: Only cooldown-pattern exercises even in fallback wide_preferred = wide_qs.filter(cooldown_q) selected.extend( self._weighted_pick(wide_preferred, wide_preferred.none(), count - len(selected)) ) for ex in selected: self.used_exercise_ids.add(ex.pk) self.used_exercise_names.add((ex.name or '').lower().strip()) self._track_families(selected) selected = self._ensure_side_pair_integrity(selected, qs, count=count) selected = self._trim_preserving_pairs(selected, count) return self._order_side_pairs_adjacent(selected) # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _get_family_limit(self, family): """Max allowed uses of a movement family across the whole workout.""" if family in NARROW_FAMILIES: return 1 return 2 def _track_families(self, exercises): """Record movement families for a list of selected exercises.""" for ex in exercises: for fam in extract_movement_families(ex.name): self.used_movement_families[fam] += 1 def _track_similarity_profiles(self, exercises, scope='working'): """Record similarity profiles so later supersets can avoid near-duplicates.""" if scope != 'working': return profiles = [self._build_similarity_profile(ex) for ex in exercises] self.last_working_similarity_profiles = profiles self.used_working_similarity_profiles.extend(profiles) def _get_filtered_queryset(self, muscle_groups=None, is_duration_based=None, fitness_level=None): """ Build a base Exercise queryset filtered by: - User's available equipment (through WorkoutEquipment) - Excluded exercises from user preferences - Already-used exercises in the current workout - Target muscle groups (through ExerciseMuscle) - Optionally, duration-based flag - Fitness level (excludes complex patterns for beginners) """ qs = Exercise.objects.all() # ---- Exclude exercises the user has explicitly blacklisted ---- excluded_ids = set( self.user_preference.excluded_exercises.values_list('pk', flat=True) ) if excluded_ids: qs = qs.exclude(pk__in=excluded_ids) # ---- Exclude already-used exercises in this workout ---- if self.used_exercise_ids: qs = qs.exclude(pk__in=self.used_exercise_ids) # ---- Exclude exercises with same name (cross-superset dedup) ---- if self.used_exercise_names: name_exclude_q = Q() for name in self.used_exercise_names: if name: name_exclude_q |= Q(name__iexact=name) if name_exclude_q: qs = qs.exclude(name_exclude_q) # ---- Hard exclude exercises from recent workouts (Phase 6) ---- # Adaptive: if pool would be too small, relax hard exclude to soft penalty. # Use a local merged set to avoid permanently polluting recently_used_ids. if self.hard_exclude_ids: test_qs = qs.exclude(pk__in=self.hard_exclude_ids) if test_qs.count() >= 10: qs = test_qs else: # Pool too small — treat hard excludes as soft penalty for this # queryset only (don't mutate the original recently_used_ids). if not hasattr(self, '_relaxed_hard_exclude_ids'): self._relaxed_hard_exclude_ids = set(self.hard_exclude_ids) if not hasattr(self, '_warned_small_pool'): self.warnings.append( 'Exercise pool too small for full variety rotation — ' 'relaxed recent exclusion to soft penalty.' ) self._warned_small_pool = True # ---- Filter by user's available equipment ---- available_equipment_ids = set( self.user_preference.available_equipment.values_list('pk', flat=True) ) if not available_equipment_ids: # No equipment set in preferences — all exercises are available (no filtering). pass elif available_equipment_ids: # Cache equipment map on instance to avoid rebuilding per call if not hasattr(self, '_equipment_map_cache'): from collections import defaultdict exercise_equipment_map = defaultdict(set) for ex_id, eq_id in WorkoutEquipment.objects.values_list('exercise_id', 'equipment_id'): exercise_equipment_map[ex_id].add(eq_id) self._equipment_map_cache = dict(exercise_equipment_map) self._bodyweight_ids_cache = set( Exercise.objects.exclude( pk__in=set(exercise_equipment_map.keys()) ).values_list('pk', flat=True) ) exercise_equipment_map = self._equipment_map_cache bodyweight_ids = self._bodyweight_ids_cache # AND logic: only include exercises where ALL required equipment is available equipment_ok_ids = set() for ex_id, required_equip in exercise_equipment_map.items(): if required_equip.issubset(available_equipment_ids): equipment_ok_ids.add(ex_id) allowed_ids = equipment_ok_ids | bodyweight_ids qs = qs.filter(pk__in=allowed_ids) # ---- Filter by muscle groups via ExerciseMuscle join ---- if muscle_groups: normalized = [normalize_muscle_name(mg) for mg in muscle_groups] muscle_ids = set( Muscle.objects.filter( name__in=normalized ).values_list('pk', flat=True) ) # Also try case-insensitive matching for robustness if not muscle_ids: q = Q() for name in normalized: q |= Q(name__iexact=name) muscle_ids = set( Muscle.objects.filter(q).values_list('pk', flat=True) ) if muscle_ids: exercise_ids = set( ExerciseMuscle.objects.filter( muscle_id__in=muscle_ids ).values_list('exercise_id', flat=True) ) qs = qs.filter(pk__in=exercise_ids) # ---- Duration bias ---- if is_duration_based is True: qs = qs.filter(is_duration=True) elif is_duration_based is False: # Rep-based supersets must use rep-capable exercises only. qs = qs.filter(is_reps=True) # ---- Fitness-level filtering ---- if fitness_level is not None and fitness_level <= 1: # Beginners: exclude exercises with complex movement patterns exclude_q = Q() for pat in self.ADVANCED_PATTERNS: exclude_q |= Q(movement_patterns__icontains=pat) qs = qs.exclude(exclude_q) # Exclude advanced exercises for beginners if fitness_level is not None and fitness_level <= 1: qs = qs.exclude(difficulty_level='advanced') # ---- Complexity cap by fitness level ---- if fitness_level is not None: complexity_caps = {1: 3, 2: 4, 3: 5, 4: 5} max_complexity = complexity_caps.get(fitness_level, 5) qs = qs.filter( Q(complexity_rating__lte=max_complexity) | Q(complexity_rating__isnull=True) ) # ---- Injury-based filtering ---- qs = self._apply_injury_filters(qs) return qs.distinct() def _get_bodyweight_queryset(self, muscle_groups=None, is_duration_based=None, fitness_level=None): """ Fallback queryset that only includes exercises with NO equipment requirement (bodyweight). Ignores user's equipment preferences but still applies safety filters (fitness level, injuries, complexity). """ exercises_with_equipment = set( WorkoutEquipment.objects.values_list('exercise_id', flat=True).distinct() ) qs = Exercise.objects.exclude(pk__in=exercises_with_equipment) # Excluded exercises excluded_ids = set( self.user_preference.excluded_exercises.values_list('pk', flat=True) ) if excluded_ids: qs = qs.exclude(pk__in=excluded_ids) # Already used if self.used_exercise_ids: qs = qs.exclude(pk__in=self.used_exercise_ids) # Hard exclude from recent workouts (Phase 6) if self.hard_exclude_ids: qs = qs.exclude(pk__in=self.hard_exclude_ids) # Muscle groups if muscle_groups: normalized = [normalize_muscle_name(mg) for mg in muscle_groups] muscle_ids = set( Muscle.objects.filter(name__in=normalized).values_list('pk', flat=True) ) if not muscle_ids: q = Q() for name in normalized: q |= Q(name__iexact=name) muscle_ids = set( Muscle.objects.filter(q).values_list('pk', flat=True) ) if muscle_ids: exercise_ids = set( ExerciseMuscle.objects.filter( muscle_id__in=muscle_ids ).values_list('exercise_id', flat=True) ) qs = qs.filter(pk__in=exercise_ids) if is_duration_based is True: qs = qs.filter(is_duration=True) elif is_duration_based is False: qs = qs.filter(is_reps=True) # ---- Safety: Fitness-level filtering (same as _get_filtered_queryset) ---- if fitness_level is not None and fitness_level <= 1: exclude_q = Q() for pat in self.ADVANCED_PATTERNS: exclude_q |= Q(movement_patterns__icontains=pat) qs = qs.exclude(exclude_q) qs = qs.exclude(difficulty_level='advanced') # ---- Safety: Complexity cap by fitness level ---- if fitness_level is not None: complexity_caps = {1: 3, 2: 4, 3: 5, 4: 5} max_complexity = complexity_caps.get(fitness_level, 5) qs = qs.filter( Q(complexity_rating__lte=max_complexity) | Q(complexity_rating__isnull=True) ) # ---- Safety: Injury-based filtering ---- qs = self._apply_injury_filters(qs) return qs.distinct() def _apply_injury_filters(self, qs): """ Apply injury-based exercise exclusions with severity levels. Supports both legacy format (list of strings) and new format (list of {"type": str, "severity": "mild|moderate|severe"}). Severity levels: - mild: only exclude exercises explicitly dangerous for that injury - moderate: current behavior (exclude high-impact, relevant patterns) - severe: aggressive exclusion (broader pattern exclusion) """ injury_types = getattr(self.user_preference, 'injury_types', None) or [] if injury_types: # Normalize to dict format for backward compatibility injury_map = {} for item in injury_types: if isinstance(item, str): injury_map[item] = 'moderate' elif isinstance(item, dict): injury_map[item.get('type', '')] = item.get('severity', 'moderate') def _is_at_least(injury_type, min_severity): """Check if an injury has at least the given severity.""" levels = {'mild': 1, 'moderate': 2, 'severe': 3} actual = injury_map.get(injury_type, '') return levels.get(actual, 0) >= levels.get(min_severity, 0) # Generate informational warnings about injury-based exclusions if not hasattr(self, '_injury_warnings_emitted'): self._injury_warnings_emitted = True for inj_type, sev in injury_map.items(): label = inj_type.replace('_', ' ').title() if sev == 'severe': self.warnings.append( f'Excluding high-impact and many weighted exercises due to severe {label.lower()} injury.' ) elif sev == 'moderate': self.warnings.append( f'Excluding high-impact exercises due to {label.lower()} injury.' ) else: self.warnings.append( f'Limiting certain movements due to mild {label.lower()} injury.' ) # High impact exclusion for lower body injuries (moderate+) lower_injuries = {'knee', 'ankle', 'hip', 'lower_back'} if any(_is_at_least(inj, 'moderate') for inj in lower_injuries & set(injury_map)): qs = qs.exclude(impact_level='high') # Severe: also exclude medium impact if any(_is_at_least(inj, 'severe') for inj in lower_injuries & set(injury_map)): qs = qs.exclude(impact_level='medium') if _is_at_least('knee', 'moderate') or _is_at_least('ankle', 'moderate'): qs = qs.exclude(movement_patterns__icontains='plyometric') # Severe knee/ankle: also exclude lunges if _is_at_least('knee', 'severe') or _is_at_least('ankle', 'severe'): qs = qs.exclude(movement_patterns__icontains='lunge') if _is_at_least('lower_back', 'moderate'): qs = qs.exclude( Q(movement_patterns__icontains='hip hinge') & Q(is_weight=True) & Q(difficulty_level='advanced') ) if _is_at_least('lower_back', 'severe'): qs = qs.exclude( Q(movement_patterns__icontains='hip hinge') & Q(is_weight=True) ) if _is_at_least('upper_back', 'moderate'): qs = qs.exclude( Q(movement_patterns__icontains='upper pull') & Q(is_weight=True) & Q(difficulty_level='advanced') ) if _is_at_least('upper_back', 'severe'): qs = qs.exclude( Q(movement_patterns__icontains='upper pull') & Q(is_weight=True) ) if _is_at_least('shoulder', 'mild'): qs = qs.exclude(movement_patterns__icontains='upper push - vertical') if _is_at_least('shoulder', 'severe'): qs = qs.exclude( Q(movement_patterns__icontains='upper push') & Q(is_weight=True) ) if _is_at_least('hip', 'moderate'): qs = qs.exclude( Q(movement_patterns__icontains='lower push - squat') & Q(difficulty_level='advanced') ) if _is_at_least('hip', 'severe'): qs = qs.exclude(movement_patterns__icontains='lower push - squat') if _is_at_least('wrist', 'moderate'): qs = qs.exclude( Q(movement_patterns__icontains='olympic') & Q(is_weight=True) ) if _is_at_least('neck', 'moderate'): qs = qs.exclude( Q(movement_patterns__icontains='upper push - vertical') & Q(is_weight=True) ) else: # Legacy: parse free-text injuries_limitations field injuries = getattr(self.user_preference, 'injuries_limitations', '') or '' if injuries: injuries_lower = injuries.lower() knee_keywords = ['knee', 'acl', 'mcl', 'meniscus', 'patella'] back_keywords = ['back', 'spine', 'spinal', 'disc', 'herniat'] shoulder_keywords = ['shoulder', 'rotator', 'labrum', 'impingement'] if any(kw in injuries_lower for kw in knee_keywords): qs = qs.exclude(impact_level='high') if any(kw in injuries_lower for kw in back_keywords): qs = qs.exclude(impact_level='high') qs = qs.exclude( Q(movement_patterns__icontains='hip hinge') & Q(is_weight=True) & Q(difficulty_level='advanced') ) if any(kw in injuries_lower for kw in shoulder_keywords): qs = qs.exclude(movement_patterns__icontains='upper push - vertical') return qs def _weighted_pick( self, preferred_qs, other_qs, count, superset_position=None, similarity_scope=None, preferred_modality=None, ): """ Pick up to *count* exercises using weighted random selection. Preferred exercises are 3x more likely to be chosen than the general pool, ensuring variety while still favouring matches. Enforces movement-family deduplication: - Intra-superset: no two exercises from the same family group - Cross-workout: max N per family (1 for narrow, 2 for broad) superset_position: 'early', 'late', or None. When set, boosts exercises based on their exercise_tier (primary for early, accessory for late). preferred_modality: 'reps' or 'duration' or None. When set, exercises that don't match the preferred modality get 0.3x weight (cross-modality penalty). Dual-modality exercises always get full weight. """ if count <= 0: return [] preferred_list = list(preferred_qs) other_list = list(other_qs) # Build a weighted pool: each preferred exercise appears 3 times pool = [] weight_preferred = 3 weight_other = 1 def _tier_boost(ex, base_w): """Apply tier-based weighting based on superset position.""" if not superset_position: return base_w tier = getattr(ex, 'exercise_tier', None) if superset_position == 'early' and tier == 'primary': return base_w * 2 elif superset_position == 'late' and tier == 'accessory': return base_w * 2 return base_w def _apply_week_penalty(ex, base_w): """Soft-penalize exercises already used earlier in the week.""" w = base_w if self.week_used_exercise_ids and ex.pk in self.week_used_exercise_ids: w = max(1, w // 2) if self.week_used_movement_families: for fam in extract_movement_families(ex.name): if self.week_used_movement_families.get(fam, 0) >= self._get_week_family_limit(fam): w = max(1, w // 2) break return w def _apply_modality_penalty(ex, base_w): """Soft-penalize exercises that don't match the preferred modality. Dual-modality exercises (is_reps AND is_duration) get full weight. Cross-modality exercises get 0.3x weight (minimum 1). """ if not preferred_modality: return base_w is_reps = getattr(ex, 'is_reps', False) is_dur = getattr(ex, 'is_duration', False) # Dual-modality: always full weight if is_reps and is_dur: return base_w if preferred_modality == 'reps' and is_reps: return base_w if preferred_modality == 'duration' and is_dur: return base_w # Cross-modality: reduce to ~30% of base weight return max(1, int(base_w * 0.3)) # Build effective soft-penalty set: recently_used + any relaxed hard excludes _effective_recently_used = self.recently_used_ids if hasattr(self, '_relaxed_hard_exclude_ids') and self._relaxed_hard_exclude_ids: _effective_recently_used = self.recently_used_ids | self._relaxed_hard_exclude_ids for ex in preferred_list: w = weight_preferred # Boost exercises that are progressions of recently completed exercises if ex.pk in self.progression_boost_ids: w = w * 2 if ex.pk in _effective_recently_used: w = 1 # Reduce weight for recently used # Penalize overused movement patterns for variety (Phase 11) # Fixed: check ALL comma-separated patterns, use max count if self.used_movement_patterns: ex_patterns = getattr(ex, 'movement_patterns', '') or '' if ex_patterns: max_pat_count = max( (self.used_movement_patterns.get(p.strip().lower(), 0) for p in ex_patterns.split(',') if p.strip()), default=0, ) if max_pat_count >= 3: w = 1 elif max_pat_count >= 2: w = max(1, w - 1) w = _apply_week_penalty(ex, w) w = _apply_modality_penalty(ex, w) w = _tier_boost(ex, w) pool.extend([ex] * w) for ex in other_list: w = weight_other if ex.pk in _effective_recently_used: w = 1 # Already 1 but keep explicit w = _apply_week_penalty(ex, w) w = _apply_modality_penalty(ex, w) w = _tier_boost(ex, w) pool.extend([ex] * w) if not pool: return [] selected = [] selected_ids = set() selected_names = set() # Intra-superset family tracking selected_family_groups = set() # group names used in this superset selected_families = set() # exact families used in this superset selected_family_counts = Counter() # exact family counts in this superset selected_profiles = [] # Shuffle to break any ordering bias random.shuffle(pool) attempts = 0 max_attempts = len(pool) * 3 # avoid infinite loop on tiny pools while len(selected) < count and attempts < max_attempts: candidate = random.choice(pool) candidate_name = (candidate.name or '').lower().strip() if candidate.pk in selected_ids or candidate_name in selected_names: attempts += 1 continue # --- Movement family blocking --- candidate_families = extract_movement_families(candidate.name) blocked = False for fam in candidate_families: # Cross-workout: check family count limit historical_count = self.used_movement_families.get(fam, 0) in_superset_count = selected_family_counts.get(fam, 0) if historical_count + in_superset_count >= self._get_family_limit(fam): blocked = True break # Intra-superset: avoid exact family duplicates entirely. if fam in selected_families: blocked = True break # Intra-superset: check family group overlap group = _FAMILY_TO_GROUP.get(fam) if group and group in selected_family_groups: blocked = True break if blocked: attempts += 1 continue if similarity_scope == 'working': candidate_profile = self._build_similarity_profile(candidate) if self._is_similarity_blocked(candidate_profile, selected_profiles): attempts += 1 continue selected.append(candidate) selected_ids.add(candidate.pk) selected_names.add(candidate_name) if similarity_scope == 'working': selected_profiles.append(candidate_profile) # Track family groups for intra-superset blocking for fam in candidate_families: selected_families.add(fam) selected_family_counts[fam] += 1 group = _FAMILY_TO_GROUP.get(fam) if group: selected_family_groups.add(group) attempts += 1 return selected @staticmethod def _tokenize_text(value): """Tokenize free text into normalized, low-noise tokens.""" if not value: return set() tokens = set(re.findall(r"[a-z0-9]+", value.lower())) stop_words = { 'and', 'or', 'the', 'with', 'to', 'a', 'an', 'of', 'single', 'arm', 'double', 'alternating', 'barbell', 'dumbbell', 'kettlebell', 'machine', 'cable', 'bodyweight', } return {tok for tok in tokens if tok not in stop_words and len(tok) > 1} @staticmethod def _tokenize_csv(value): """Tokenize comma-separated categorical fields.""" if not value: return set() return {part.strip().lower() for part in value.split(',') if part and part.strip()} def _build_similarity_profile(self, ex): """Create a cached token profile used by similarity scoring.""" cached = self._exercise_profile_cache.get(ex.pk) if cached is not None: return cached profile = { 'id': ex.pk, 'movement': self._tokenize_csv(getattr(ex, 'movement_patterns', '') or ''), 'muscles': self._tokenize_csv(getattr(ex, 'muscle_groups', '') or ''), 'equipment': self._tokenize_csv(getattr(ex, 'equipment_required', '') or ''), 'name_tokens': self._tokenize_text(getattr(ex, 'name', '') or ''), } self._exercise_profile_cache[ex.pk] = profile return profile @staticmethod def _jaccard_similarity(left, right): """Jaccard similarity between token sets.""" if not left and not right: return 0.0 union = left | right if not union: return 0.0 return len(left & right) / len(union) def _exercise_similarity_score(self, candidate_profile, existing_profile): """Weighted similarity score in [0,1].""" movement = self._jaccard_similarity( candidate_profile['movement'], existing_profile['movement'] ) muscles = self._jaccard_similarity( candidate_profile['muscles'], existing_profile['muscles'] ) equipment = self._jaccard_similarity( candidate_profile['equipment'], existing_profile['equipment'] ) name = self._jaccard_similarity( candidate_profile['name_tokens'], existing_profile['name_tokens'] ) return ( (0.45 * movement) + (0.35 * muscles) + (0.10 * equipment) + (0.10 * name) ) def _is_similarity_blocked(self, candidate_profile, selected_profiles): """Block near-duplicate exercises within the workout and adjacent sets.""" for existing_profile in self.used_working_similarity_profiles: if ( self._exercise_similarity_score(candidate_profile, existing_profile) >= self.SIMILARITY_HARD_THRESHOLD ): return True for existing_profile in selected_profiles: if ( self._exercise_similarity_score(candidate_profile, existing_profile) >= self.SIMILARITY_HARD_THRESHOLD ): return True for existing_profile in self.last_working_similarity_profiles: if ( self._exercise_similarity_score(candidate_profile, existing_profile) >= self.SIMILARITY_SOFT_THRESHOLD ): return True for existing_profile in selected_profiles: if ( self._exercise_similarity_score(candidate_profile, existing_profile) >= self.SIMILARITY_SOFT_THRESHOLD ): return True return False def _pair_sided_exercises(self, selected, base_qs): """ For exercises with a ``side`` value (e.g. 'Left', 'Right'), try to include the matching opposite-side exercise in the selection. This swaps out a non-sided exercise to keep the count stable, or simply appends if the list is short. """ paired = list(selected) paired_ids = {e.pk for e in paired} exercises_to_add = [] for ex in list(paired): if ex.side and ex.side.strip(): side_norm = self._normalize_side_value(ex.side) opposite_norm = self._opposite_side(side_norm) if not opposite_norm: continue # Find the matching partner by exact base-name match and opposite side. # Typically the name is identical except for side, e.g. # "Single Arm Row Left" / "Single Arm Row Right" base_name = self._strip_side_tokens(ex.name) # Use strict matching: find candidates with opposite side, # then filter in Python by exact base-name match to avoid # substring false positives (e.g. "L Sit" matching "Wall Sit"). partner_candidates = ( Exercise.objects .filter(self._side_values_q(opposite_norm)) .exclude(pk__in=self.used_exercise_ids) .exclude(pk__in=paired_ids) ) partner = None for candidate in partner_candidates: candidate_base = self._strip_side_tokens(candidate.name) if base_name.lower() == candidate_base.lower(): partner = candidate break if partner and partner.pk not in paired_ids: exercises_to_add.append(partner) paired_ids.add(partner.pk) # Insert partners right after their matching exercise final = [] added_ids = set() for ex in paired: final.append(ex) added_ids.add(ex.pk) # Check if any partner should follow this exercise for partner in exercises_to_add: if partner.pk not in added_ids: # Check if partner is the pair for this exercise using exact base-name match if ex.side and ex.side.strip(): ex_base = self._strip_side_tokens(ex.name) partner_base = self._strip_side_tokens(partner.name) if ex_base.lower() == partner_base.lower(): final.append(partner) added_ids.add(partner.pk) # Add any remaining partners that didn't get inserted for partner in exercises_to_add: if partner.pk not in added_ids: final.append(partner) added_ids.add(partner.pk) return final def _trim_preserving_pairs(self, selected, count): """ Trim selected exercises to count, but never split a Left/Right pair. If keeping a Left exercise, always keep its Right partner (and vice versa). """ if len(selected) <= count: return selected # Identify paired indices paired_indices = set() for i, ex in enumerate(selected): if self._normalize_side_value(getattr(ex, 'side', '')): # Find its partner in the list base_name = ex.name for side_word in ['Left', 'Right', 'left', 'right']: base_name = base_name.replace(side_word, '').strip() for j, other in enumerate(selected): if i != j and self._normalize_side_value(getattr(other, 'side', '')): other_base = other.name for side_word in ['Left', 'Right', 'left', 'right']: other_base = other_base.replace(side_word, '').strip() if base_name.lower() == other_base.lower(): paired_indices.add(i) paired_indices.add(j) result = [] for i, ex in enumerate(selected): if len(result) >= count and i not in paired_indices: continue # If this is part of a pair, include it even if over count if i in paired_indices or len(result) < count: result.append(ex) # If keeping pairs pushed us over count, remove non-paired exercises # from the end to compensate if len(result) > count + 1: excess = len(result) - count trimmed = [] removed = 0 # Build paired set for result indices result_paired = set() for i, ex in enumerate(result): if self._normalize_side_value(getattr(ex, 'side', '')): base_name = ex.name for side_word in ['Left', 'Right', 'left', 'right']: base_name = base_name.replace(side_word, '').strip() for j, other in enumerate(result): if i != j and self._normalize_side_value(getattr(other, 'side', '')): other_base = other.name for side_word in ['Left', 'Right', 'left', 'right']: other_base = other_base.replace(side_word, '').strip() if base_name.lower() == other_base.lower(): result_paired.add(i) result_paired.add(j) for i in range(len(result) - 1, -1, -1): if removed >= excess: break if i not in result_paired: result.pop(i) removed += 1 return result def _order_side_pairs_adjacent(self, selected): """ Keep left/right variants adjacent in list order. This is primarily for warm-up/cool-down UX so side-specific movements render one after another instead of grouped by side. """ if len(selected) < 2: return selected side_map = {} for ex in selected: side_val = self._normalize_side_value(getattr(ex, 'side', '')) if side_val not in ('left', 'right'): continue key = self._strip_side_tokens(getattr(ex, 'name', '')) side_map.setdefault(key, {'left': [], 'right': []}) side_map[key][side_val].append(ex) ordered = [] used_ids = set() for ex in selected: if ex.pk in used_ids: continue side_val = self._normalize_side_value(getattr(ex, 'side', '')) if side_val in ('left', 'right'): key = self._strip_side_tokens(getattr(ex, 'name', '')) opposite = self._opposite_side(side_val) opposite_ex = None for candidate in side_map.get(key, {}).get(opposite, []): if candidate.pk not in used_ids: opposite_ex = candidate break if opposite_ex: ordered.append(ex) ordered.append(opposite_ex) used_ids.add(ex.pk) used_ids.add(opposite_ex.pk) continue ordered.append(ex) used_ids.add(ex.pk) for ex in selected: if ex.pk not in used_ids: ordered.append(ex) used_ids.add(ex.pk) return ordered def _strip_side_tokens(self, name): """Normalize a name by removing left/right tokens.""" base = name or '' for side_word in [ 'Left', 'Right', 'left', 'right', 'left arm', 'right arm', 'left leg', 'right leg', 'left side', 'right side', ]: base = base.replace(side_word, '').strip() return base.lower() @staticmethod def _normalize_side_value(side): """Map DB side values to canonical left/right tokens.""" value = (side or '').strip().lower() if value in _LEFT_SIDE_VALUES: return 'left' if value in _RIGHT_SIDE_VALUES: return 'right' return None @staticmethod def _opposite_side(side_norm): """Return opposite canonical side for left/right.""" if side_norm == 'left': return 'right' if side_norm == 'right': return 'left' return None @staticmethod def _side_values_q(side_norm): """Build a queryset filter matching any DB side token for a canonical side.""" q = Q() values = _LEFT_SIDE_VALUES if side_norm == 'left' else _RIGHT_SIDE_VALUES for side_value in values: q |= Q(side__iexact=side_value) return q def _drop_unpaired_sided_exercises(self, selected): """Drop any left/right exercise that does not have its opposite side.""" side_groups = {} for ex in selected: side_val = self._normalize_side_value(getattr(ex, 'side', '')) if side_val not in ('left', 'right'): continue key = self._strip_side_tokens(getattr(ex, 'name', '')) side_groups.setdefault(key, {'left': [], 'right': []}) side_groups[key][side_val].append(ex.pk) allowed_ids = set() for key, sides in side_groups.items(): if sides['left'] and sides['right']: allowed_ids.update(sides['left']) allowed_ids.update(sides['right']) filtered = [] removed_count = 0 for ex in selected: side_val = self._normalize_side_value(getattr(ex, 'side', '')) if side_val in ('left', 'right') and ex.pk not in allowed_ids: removed_count += 1 continue filtered.append(ex) if removed_count: self.warnings.append( f'Removed {removed_count} unpaired side-specific exercises ' f'to enforce left/right pairing.' ) return filtered def _find_missing_side_partner(self, ex, base_qs, existing_ids): """ Try hard to find opposite-side partner for a sided exercise. Search order: 1) base_qs with strict name-base match 2) global Exercise table with strict name-base match 3) base_qs with relaxed icontains name-base match 4) global Exercise table with relaxed icontains name-base match """ side_norm = self._normalize_side_value(getattr(ex, 'side', '')) opposite_norm = self._opposite_side(side_norm) if not opposite_norm: return None base_name = self._strip_side_tokens(getattr(ex, 'name', '')) if not base_name: return None def _pick_from_queryset(qs, strict=True): candidates = qs.filter(self._side_values_q(opposite_norm)) if strict: candidates = [ c for c in candidates if self._strip_side_tokens(getattr(c, 'name', '')) == base_name ] return candidates[0] if candidates else None return candidates.filter(name__icontains=base_name).first() common_exclusions = Q(pk__in=existing_ids) # Prefer unused exercise ids, but do not hard-fail pairing if only used counterpart exists. preferred_exclusions = common_exclusions | Q(pk__in=self.used_exercise_ids) base_preferred = base_qs.exclude(preferred_exclusions) partner = _pick_from_queryset(base_preferred, strict=True) if partner: return partner global_preferred = Exercise.objects.exclude(preferred_exclusions) partner = _pick_from_queryset(global_preferred, strict=True) if partner: return partner # Relaxed pass still avoiding duplicates in the current selection. base_relaxed = base_qs.exclude(common_exclusions) partner = _pick_from_queryset(base_relaxed, strict=False) if partner: return partner global_relaxed = Exercise.objects.exclude(common_exclusions) return _pick_from_queryset(global_relaxed, strict=False) def _ensure_side_pair_integrity( self, selected, base_qs, count, similarity_scope=None, superset_position=None, ): """ Enforce strict left/right pairing: - First attempt to add missing opposite-side partners - Remove orphan left/right exercises only as a last resort - Backfill with non-sided exercises when possible """ balanced = list(selected) existing_ids = {ex.pk for ex in balanced} added_partners = 0 for ex in list(balanced): side_val = self._normalize_side_value(getattr(ex, 'side', '')) if side_val not in ('left', 'right'): continue key = self._strip_side_tokens(getattr(ex, 'name', '')) has_left = any( self._normalize_side_value(getattr(other, 'side', '')) == 'left' and self._strip_side_tokens(getattr(other, 'name', '')) == key for other in balanced ) has_right = any( self._normalize_side_value(getattr(other, 'side', '')) == 'right' and self._strip_side_tokens(getattr(other, 'name', '')) == key for other in balanced ) if has_left and has_right: continue partner = self._find_missing_side_partner(ex, base_qs, existing_ids) if partner and partner.pk not in existing_ids: balanced.append(partner) existing_ids.add(partner.pk) added_partners += 1 if added_partners: # Keep sided pairs by preferentially removing non-sided fillers. while len(balanced) > count: remove_idx = None for idx in range(len(balanced) - 1, -1, -1): if self._normalize_side_value(getattr(balanced[idx], 'side', '')) not in ('left', 'right'): remove_idx = idx break if remove_idx is None: break balanced.pop(remove_idx) self.warnings.append( f'Added {added_partners} missing opposite-side exercise partners.' ) balanced = self._drop_unpaired_sided_exercises(balanced) if len(balanced) < count: deficit = count - len(balanced) existing_ids = {ex.pk for ex in balanced} filler_qs = ( base_qs.exclude(pk__in=existing_ids) .filter(Q(side__isnull=True) | Q(side='')) ) extras = self._weighted_pick( filler_qs, Exercise.objects.none(), deficit, superset_position=superset_position, similarity_scope=similarity_scope, ) balanced.extend(extras) return balanced def balance_stretch_positions(self, selected, muscle_groups=None, fitness_level=None): """ Improve stretch position variety for hypertrophy workouts. Ensures exercises within a superset cover multiple stretch positions (lengthened, mid, shortened) for more complete muscle stimulus. Swaps the last non-primary exercise if all exercises share the same stretch position. Prefers 'lengthened' replacements (greater mechanical tension at long muscle lengths = stronger hypertrophy stimulus). """ if len(selected) < 3: return selected position_counts = {} for ex in selected: pos = getattr(ex, 'stretch_position', None) if pos: position_counts[pos] = position_counts.get(pos, 0) + 1 # Check if variety is sufficient (no single position >= 75%) if len(position_counts) >= 2: total_with_pos = sum(position_counts.values()) max_count = max(position_counts.values()) if total_with_pos > 0 and max_count / total_with_pos < 0.75: return selected # Good variety, no dominant position dominant_position = max(position_counts, key=position_counts.get) if position_counts else None if not dominant_position: return selected # No stretch data available # Find a replacement with a different stretch position desired_positions = {'lengthened', 'mid', 'shortened'} - {dominant_position} position_q = Q() for pos in desired_positions: position_q |= Q(stretch_position=pos) replacement_qs = self._get_filtered_queryset( muscle_groups=muscle_groups, fitness_level=fitness_level, ).filter(position_q).exclude(pk__in={e.pk for e in selected}) replacements = list(replacement_qs[:5]) if not replacements: return selected # Prefer 'lengthened' for hypertrophy (greater mechanical tension) lengthened = [r for r in replacements if r.stretch_position == 'lengthened'] pick = lengthened[0] if lengthened else replacements[0] # Swap the last non-primary exercise for i in range(len(selected) - 1, -1, -1): if getattr(selected[i], 'exercise_tier', None) != 'primary': old = selected[i] selected[i] = pick self.used_exercise_ids.discard(old.pk) self.used_exercise_ids.add(pick.pk) break return selected