1530 lines
64 KiB
Python
1530 lines
64 KiB
Python
import random
|
|
import logging
|
|
import re
|
|
from collections import Counter
|
|
|
|
from django.db.models import Q, Count
|
|
|
|
from exercise.models import Exercise
|
|
from muscle.models import Muscle, ExerciseMuscle
|
|
from equipment.models import Equipment, WorkoutEquipment
|
|
from generator.services.muscle_normalizer import (
|
|
normalize_muscle_name,
|
|
get_muscles_for_exercise,
|
|
classify_split_type,
|
|
MUSCLE_GROUP_CATEGORIES,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Movement family deduplication constants
|
|
# ---------------------------------------------------------------------------
|
|
# Ordered (keyword, family_tag) pairs — longer/more-specific keywords first
|
|
# so that "hang clean" matches before generic "clean".
|
|
MOVEMENT_FAMILY_KEYWORDS = [
|
|
# Olympic — specific before general
|
|
('clean and jerk', 'clean_and_jerk'), ('hang clean', 'clean'),
|
|
('clean pull', 'clean'), ('high pull', 'clean'),
|
|
('power clean', 'clean'), ('clean', 'clean'),
|
|
('snatch', 'snatch'),
|
|
# Vertical pull
|
|
('chin-up', 'chin_up'), ('chin up', 'chin_up'),
|
|
('pull-up', 'pull_up'), ('pull up', 'pull_up'),
|
|
('lat pulldown', 'lat_pulldown'), ('pulldown', 'lat_pulldown'),
|
|
# Horizontal press
|
|
('bench press', 'bench_press'), ('chest press', 'bench_press'),
|
|
('push-up', 'push_up'), ('push up', 'push_up'),
|
|
# Overhead press
|
|
('overhead press', 'overhead_press'), ('shoulder press', 'overhead_press'),
|
|
('military press', 'overhead_press'), ('push press', 'push_press'),
|
|
# Lower body
|
|
('squat', 'squat'), ('deadlift', 'deadlift'),
|
|
('hip thrust', 'hip_thrust'),
|
|
('lunge', 'lunge'), ('split squat', 'lunge'),
|
|
('step up', 'step_up'), ('step-up', 'step_up'),
|
|
# Row
|
|
('row', 'row'),
|
|
# Arms
|
|
('bicep curl', 'bicep_curl'), ('hammer curl', 'bicep_curl'), ('curl', 'bicep_curl'),
|
|
('tricep extension', 'tricep_extension'), ('skull crusher', 'tricep_extension'),
|
|
# Shoulders
|
|
('lateral raise', 'lateral_raise'), ('front raise', 'front_raise'),
|
|
('rear delt', 'rear_delt'), ('face pull', 'face_pull'), ('shrug', 'shrug'),
|
|
# Other
|
|
('carry', 'carry'), ('farmer', 'carry'), ('dip', 'dip'),
|
|
('burpee', 'burpee'), ('thruster', 'thruster'),
|
|
('turkish', 'turkish_getup'),
|
|
]
|
|
|
|
# Super-families: families that are too similar for the same superset
|
|
FAMILY_GROUPS = {
|
|
'vertical_pull': {'pull_up', 'chin_up', 'lat_pulldown'},
|
|
'olympic_pull': {'clean', 'snatch', 'clean_and_jerk'},
|
|
'horizontal_press': {'bench_press', 'push_up'},
|
|
}
|
|
|
|
# Narrow families — max 1 per entire workout
|
|
NARROW_FAMILIES = {
|
|
'clean', 'snatch', 'clean_and_jerk', 'push_press',
|
|
'thruster', 'turkish_getup', 'burpee',
|
|
}
|
|
# Everything else defaults to max 2 per workout
|
|
|
|
# Precomputed reverse map: family -> group name
|
|
_FAMILY_TO_GROUP = {}
|
|
for _group, _members in FAMILY_GROUPS.items():
|
|
for _member in _members:
|
|
_FAMILY_TO_GROUP[_member] = _group
|
|
|
|
_LEFT_SIDE_VALUES = {'left', 'left_arm', 'left_leg', 'left_side'}
|
|
_RIGHT_SIDE_VALUES = {'right', 'right_arm', 'right_leg', 'right_side'}
|
|
|
|
|
|
def extract_movement_families(exercise_name):
|
|
"""Extract movement family tags from an exercise name.
|
|
|
|
Returns a set of family strings. Uses longest-match-first to avoid
|
|
partial overlaps (e.g. "hang clean" matches before "clean").
|
|
"""
|
|
if not exercise_name:
|
|
return set()
|
|
name_lower = exercise_name.lower().strip()
|
|
families = set()
|
|
matched_spans = []
|
|
for keyword, family in MOVEMENT_FAMILY_KEYWORDS:
|
|
idx = name_lower.find(keyword)
|
|
if idx >= 0:
|
|
span = (idx, idx + len(keyword))
|
|
# Skip if this span overlaps an already-matched span
|
|
overlaps = any(
|
|
not (span[1] <= ms[0] or span[0] >= ms[1])
|
|
for ms in matched_spans
|
|
)
|
|
if not overlaps:
|
|
families.add(family)
|
|
matched_spans.append(span)
|
|
return families
|
|
|
|
|
|
class ExerciseSelector:
|
|
"""
|
|
Smart exercise selection service that picks exercises based on user
|
|
preferences, available equipment, target muscle groups, and variety.
|
|
"""
|
|
|
|
# Bodyweight equipment names to fall back to when equipment-filtered
|
|
# results are too sparse.
|
|
BODYWEIGHT_KEYWORDS = ['bodyweight', 'body weight', 'none', 'no equipment']
|
|
|
|
# Movement patterns considered too complex for beginners
|
|
ADVANCED_PATTERNS = ['olympic', 'plyometric']
|
|
|
|
# Movement patterns considered appropriate for warm-up / cool-down
|
|
WARMUP_PATTERNS = [
|
|
'dynamic stretch', 'mobility - dynamic', 'activation', 'warm up',
|
|
'warmup', 'cardio/locomotion', 'balance',
|
|
]
|
|
COOLDOWN_PATTERNS = [
|
|
'static stretch', 'stretch', 'cool down', 'cooldown',
|
|
'mobility', 'foam roll', 'yoga',
|
|
]
|
|
|
|
# Movement patterns explicitly forbidden in cooldowns
|
|
COOLDOWN_EXCLUDED_PATTERNS = [
|
|
'plyometric', 'combat', 'cardio/locomotion', 'olympic',
|
|
]
|
|
# Warm-up must avoid working-set patterns.
|
|
WARMUP_EXCLUDED_PATTERNS = [
|
|
'upper push', 'upper pull', 'olympic', 'combat', 'arms',
|
|
]
|
|
# Similarity thresholds to prevent near-duplicate selections.
|
|
SIMILARITY_HARD_THRESHOLD = 0.80
|
|
SIMILARITY_SOFT_THRESHOLD = 0.65
|
|
# Recovery/stretch movements should not appear in working sets.
|
|
WORKING_EXCLUDED_PATTERNS = [
|
|
'mobility - static', 'static stretch', 'cool down', 'cooldown',
|
|
'yoga', 'breathing', 'massage',
|
|
]
|
|
|
|
def __init__(self, user_preference, recently_used_ids=None, hard_exclude_ids=None):
|
|
self.user_preference = user_preference
|
|
self.used_exercise_ids = set() # tracks within a single workout
|
|
self.used_exercise_names = set() # tracks names for cross-superset dedup
|
|
self.recently_used_ids = recently_used_ids or set()
|
|
self.hard_exclude_ids = hard_exclude_ids or set() # Phase 6: hard exclude recent exercises
|
|
self.used_movement_patterns = Counter() # Phase 11: track patterns for variety
|
|
self.used_movement_families = Counter() # Movement family dedup across workout
|
|
self.used_working_similarity_profiles = []
|
|
self.last_working_similarity_profiles = []
|
|
self._exercise_profile_cache = {}
|
|
self.warnings = [] # Phase 13: generation warnings
|
|
self.progression_boost_ids = set() # IDs of exercises that are progressions of recently done ones
|
|
|
|
# ------------------------------------------------------------------
|
|
# Public API
|
|
# ------------------------------------------------------------------
|
|
|
|
def reset(self):
|
|
"""Reset used exercises for a new workout."""
|
|
self.used_exercise_ids = set()
|
|
self.used_exercise_names = set()
|
|
self.used_movement_patterns = Counter()
|
|
self.used_movement_families = Counter()
|
|
self.used_working_similarity_profiles = []
|
|
self.last_working_similarity_profiles = []
|
|
self._exercise_profile_cache = {}
|
|
self.warnings = []
|
|
|
|
def select_exercises(
|
|
self,
|
|
muscle_groups,
|
|
count,
|
|
is_duration_based=False,
|
|
movement_pattern_preference=None,
|
|
prefer_weighted=False,
|
|
superset_position=None,
|
|
):
|
|
"""
|
|
Select *count* exercises matching the given criteria.
|
|
|
|
Parameters
|
|
----------
|
|
muscle_groups : list[str]
|
|
Canonical muscle group names (e.g. ['chest', 'triceps']).
|
|
count : int
|
|
How many exercises to return.
|
|
is_duration_based : bool
|
|
When True, prefer exercises whose ``is_duration`` flag is set.
|
|
movement_pattern_preference : list[str] | None
|
|
Optional list of preferred movement patterns to favour.
|
|
prefer_weighted : bool
|
|
When True (R6), boost is_weight=True exercises in selection.
|
|
|
|
Returns
|
|
-------
|
|
list[Exercise]
|
|
"""
|
|
if count <= 0:
|
|
return []
|
|
|
|
fitness_level = getattr(self.user_preference, 'fitness_level', None)
|
|
qs = self._get_filtered_queryset(
|
|
muscle_groups=muscle_groups,
|
|
is_duration_based=is_duration_based,
|
|
fitness_level=fitness_level,
|
|
)
|
|
# Working supersets should not contain stretch/recovery exercises.
|
|
excluded_q = Q(name__icontains='stretch')
|
|
for pat in self.WORKING_EXCLUDED_PATTERNS:
|
|
excluded_q |= Q(movement_patterns__icontains=pat)
|
|
qs = qs.exclude(excluded_q)
|
|
# Guard against low-quality rows causing misclassification/selection drift.
|
|
qs = qs.exclude(Q(movement_patterns__isnull=True) | Q(movement_patterns=''))
|
|
qs = qs.exclude(Q(muscle_groups__isnull=True) | Q(muscle_groups=''))
|
|
|
|
# For advanced/elite, boost compound movements
|
|
if fitness_level and fitness_level >= 3 and not movement_pattern_preference:
|
|
compound_qs = qs.filter(is_compound=True)
|
|
if compound_qs.exists():
|
|
preferred_qs = compound_qs
|
|
other_qs = qs.exclude(pk__in=compound_qs.values_list('pk', flat=True))
|
|
else:
|
|
preferred_qs = qs.none()
|
|
other_qs = qs
|
|
elif movement_pattern_preference:
|
|
# Optionally boost exercises whose movement_patterns match a preference
|
|
pattern_q = Q()
|
|
for pat in movement_pattern_preference:
|
|
pattern_q |= Q(movement_patterns__icontains=pat)
|
|
preferred_qs = qs.filter(pattern_q)
|
|
other_qs = qs.exclude(pk__in=preferred_qs.values_list('pk', flat=True))
|
|
else:
|
|
preferred_qs = qs.none()
|
|
other_qs = qs
|
|
|
|
# R6: For strength workouts, boost is_weight=True exercises
|
|
if prefer_weighted:
|
|
weighted_qs = qs.filter(is_weight=True)
|
|
if weighted_qs.exists():
|
|
# Merge weighted exercises into preferred pool
|
|
combined_preferred_ids = set(preferred_qs.values_list('pk', flat=True)) | set(weighted_qs.values_list('pk', flat=True))
|
|
preferred_qs = qs.filter(pk__in=combined_preferred_ids)
|
|
other_qs = qs.exclude(pk__in=combined_preferred_ids)
|
|
|
|
selected = self._weighted_pick(
|
|
preferred_qs,
|
|
other_qs,
|
|
count,
|
|
superset_position=superset_position,
|
|
similarity_scope='working',
|
|
)
|
|
|
|
# Sort selected exercises by tier: primary first, then secondary, then accessory
|
|
TIER_ORDER = {'primary': 0, 'secondary': 1, 'accessory': 2, None: 2}
|
|
selected.sort(key=lambda ex: TIER_ORDER.get(ex.exercise_tier, 2))
|
|
|
|
# Ensure target muscle groups have coverage
|
|
if muscle_groups and selected:
|
|
from muscle.models import ExerciseMuscle
|
|
# Batch-load muscles for all selected exercises (avoid N+1)
|
|
selected_ids = {ex.pk for ex in selected}
|
|
ex_muscle_rows = ExerciseMuscle.objects.filter(
|
|
exercise_id__in=selected_ids
|
|
).values_list('exercise_id', 'muscle__name')
|
|
from collections import defaultdict
|
|
ex_muscle_map = defaultdict(set)
|
|
for ex_id, muscle_name in ex_muscle_rows:
|
|
ex_muscle_map[ex_id].add(normalize_muscle_name(muscle_name))
|
|
covered_muscles = set()
|
|
for ex in selected:
|
|
covered_muscles.update(ex_muscle_map.get(ex.pk, set()))
|
|
|
|
normalized_targets = {normalize_muscle_name(mg) for mg in muscle_groups}
|
|
uncovered = normalized_targets - covered_muscles
|
|
if uncovered and len(selected) > 1:
|
|
# Track swapped indices to avoid overwriting previous swaps
|
|
swapped_indices = set()
|
|
for missing_muscle in uncovered:
|
|
replacement_qs = self._get_filtered_queryset(
|
|
muscle_groups=[missing_muscle],
|
|
is_duration_based=is_duration_based,
|
|
fitness_level=fitness_level,
|
|
).exclude(pk__in={e.pk for e in selected})
|
|
# Validate modality: ensure replacement matches expected modality
|
|
if is_duration_based:
|
|
replacement_qs = replacement_qs.filter(is_duration=True)
|
|
elif is_duration_based is False:
|
|
replacement_qs = replacement_qs.filter(is_reps=True)
|
|
replacement = list(replacement_qs[:1])
|
|
if replacement:
|
|
# Find last unswapped accessory
|
|
swap_idx = None
|
|
for i in range(len(selected) - 1, -1, -1):
|
|
if i in swapped_indices:
|
|
continue
|
|
if getattr(selected[i], 'exercise_tier', None) == 'accessory':
|
|
swap_idx = i
|
|
break
|
|
# Fallback: any unswapped non-primary
|
|
if swap_idx is None:
|
|
for i in range(len(selected) - 1, -1, -1):
|
|
if i in swapped_indices:
|
|
continue
|
|
if getattr(selected[i], 'exercise_tier', None) != 'primary':
|
|
swap_idx = i
|
|
break
|
|
if swap_idx is not None:
|
|
selected[swap_idx] = replacement[0]
|
|
swapped_indices.add(swap_idx)
|
|
|
|
# If we couldn't get enough with equipment filters, widen to bodyweight
|
|
if len(selected) < count:
|
|
fallback_qs = self._get_bodyweight_queryset(
|
|
muscle_groups=muscle_groups,
|
|
is_duration_based=is_duration_based,
|
|
fitness_level=fitness_level,
|
|
)
|
|
fallback_qs = fallback_qs.exclude(excluded_q)
|
|
fallback_qs = fallback_qs.exclude(Q(movement_patterns__isnull=True) | Q(movement_patterns=''))
|
|
fallback_qs = fallback_qs.exclude(Q(muscle_groups__isnull=True) | Q(muscle_groups=''))
|
|
still_needed = count - len(selected)
|
|
already_ids = {e.pk for e in selected}
|
|
fallback_qs = fallback_qs.exclude(pk__in=already_ids)
|
|
mg_label = ', '.join(muscle_groups[:3]) if muscle_groups else 'target muscles'
|
|
extras = self._weighted_pick(
|
|
fallback_qs,
|
|
Exercise.objects.none(),
|
|
still_needed,
|
|
similarity_scope='working',
|
|
)
|
|
if extras:
|
|
self.warnings.append(
|
|
f'Used bodyweight fallback for {mg_label} '
|
|
f'({len(extras)} exercises) due to limited equipment matches.'
|
|
)
|
|
selected.extend(extras)
|
|
if len(selected) < count:
|
|
self.warnings.append(
|
|
f'Could only find {len(selected)}/{count} exercises '
|
|
f'for {mg_label}.'
|
|
)
|
|
|
|
# Handle side-specific pairing: if an exercise has a side value,
|
|
# look for the matching opposite-side exercise so they appear together.
|
|
selected = self._pair_sided_exercises(selected, qs)
|
|
selected = self._ensure_side_pair_integrity(
|
|
selected,
|
|
qs,
|
|
count=count,
|
|
similarity_scope='working',
|
|
superset_position=superset_position,
|
|
)
|
|
|
|
# Mark everything we just selected as used and track patterns
|
|
for ex in selected:
|
|
self.used_exercise_ids.add(ex.pk)
|
|
self.used_exercise_names.add((ex.name or '').lower().strip())
|
|
patterns = getattr(ex, 'movement_patterns', '') or ''
|
|
if patterns:
|
|
for pat in [p.strip().lower() for p in patterns.split(',') if p.strip()]:
|
|
self.used_movement_patterns[pat] += 1
|
|
self._track_families(selected)
|
|
self._track_similarity_profiles(selected, scope='working')
|
|
|
|
return self._trim_preserving_pairs(selected, count)
|
|
|
|
def select_warmup_exercises(self, target_muscles, count=5):
|
|
"""Select duration-based exercises suitable for warm-up."""
|
|
fitness_level = getattr(self.user_preference, 'fitness_level', None)
|
|
qs = self._get_filtered_queryset(
|
|
muscle_groups=target_muscles,
|
|
is_duration_based=True,
|
|
fitness_level=fitness_level,
|
|
)
|
|
# Avoid duplicate-looking left/right variants in recovery sections.
|
|
qs = qs.filter(Q(side__isnull=True) | Q(side=''))
|
|
|
|
# Prefer exercises whose movement_patterns overlap with warmup keywords
|
|
warmup_q = Q()
|
|
for kw in self.WARMUP_PATTERNS:
|
|
warmup_q |= Q(movement_patterns__icontains=kw)
|
|
|
|
# Warm-up should be dynamic movement prep, not loaded working sets.
|
|
qs = qs.exclude(is_weight=True)
|
|
# Exclude heavy compounds (no barbell squats in warmup)
|
|
qs = qs.exclude(is_compound=True)
|
|
# Exclude primary-tier exercises (no primary lifts in warmup)
|
|
qs = qs.exclude(exercise_tier='primary')
|
|
# Exclude technically complex movements
|
|
qs = qs.exclude(complexity_rating__gte=4)
|
|
# Exclude common working-set movement families from warmup.
|
|
warmup_exclude_q = Q()
|
|
for pat in self.WARMUP_EXCLUDED_PATTERNS:
|
|
warmup_exclude_q |= Q(movement_patterns__icontains=pat)
|
|
qs = qs.exclude(warmup_exclude_q)
|
|
|
|
# Tightened HR filter for warmup (1-4 instead of 2-5)
|
|
hr_warmup_q = Q(hr_elevation_rating__gte=1, hr_elevation_rating__lte=4)
|
|
preferred = qs.filter(warmup_q).filter(
|
|
hr_warmup_q | Q(hr_elevation_rating__isnull=True)
|
|
)
|
|
# STRICT: warmup must come from warmup-pattern exercises only.
|
|
selected = self._weighted_pick(preferred, preferred.none(), count)
|
|
|
|
# Fallback: if not enough duration-based warmup exercises, widen to
|
|
# any duration exercise regardless of muscle group
|
|
if len(selected) < count:
|
|
wide_qs = self._get_filtered_queryset(
|
|
muscle_groups=None,
|
|
is_duration_based=True,
|
|
fitness_level=fitness_level,
|
|
).exclude(pk__in={e.pk for e in selected})
|
|
wide_qs = wide_qs.filter(Q(side__isnull=True) | Q(side=''))
|
|
# Apply same warmup safety exclusions
|
|
wide_qs = wide_qs.exclude(is_weight=True)
|
|
wide_qs = wide_qs.exclude(is_compound=True)
|
|
wide_qs = wide_qs.exclude(exercise_tier='primary')
|
|
wide_qs = wide_qs.exclude(complexity_rating__gte=4)
|
|
wide_qs = wide_qs.exclude(warmup_exclude_q)
|
|
wide_preferred = wide_qs.filter(warmup_q).filter(
|
|
hr_warmup_q | Q(hr_elevation_rating__isnull=True)
|
|
)
|
|
selected.extend(
|
|
self._weighted_pick(wide_preferred, wide_preferred.none(), count - len(selected))
|
|
)
|
|
|
|
for ex in selected:
|
|
self.used_exercise_ids.add(ex.pk)
|
|
self.used_exercise_names.add((ex.name or '').lower().strip())
|
|
self._track_families(selected)
|
|
|
|
selected = self._ensure_side_pair_integrity(selected, qs, count=count)
|
|
return self._trim_preserving_pairs(selected, count)
|
|
|
|
def select_cooldown_exercises(self, target_muscles, count=4):
|
|
"""
|
|
Select duration-based exercises suitable for cool-down.
|
|
|
|
R11: Excludes is_weight=True exercises that don't match cooldown
|
|
movement patterns (stretch/mobility only).
|
|
Also enforces low HR elevation (<=3) for proper cool-down.
|
|
"""
|
|
fitness_level = getattr(self.user_preference, 'fitness_level', None)
|
|
qs = self._get_filtered_queryset(
|
|
muscle_groups=target_muscles,
|
|
is_duration_based=True,
|
|
fitness_level=fitness_level,
|
|
)
|
|
# Avoid duplicate-looking left/right variants in recovery sections.
|
|
qs = qs.filter(Q(side__isnull=True) | Q(side=''))
|
|
|
|
cooldown_q = Q()
|
|
for kw in self.COOLDOWN_PATTERNS:
|
|
cooldown_q |= Q(movement_patterns__icontains=kw)
|
|
|
|
# Exclude dangerous movement patterns from cooldowns entirely
|
|
exclude_q = Q()
|
|
for pat in self.COOLDOWN_EXCLUDED_PATTERNS:
|
|
exclude_q |= Q(movement_patterns__icontains=pat)
|
|
# Also exclude compound push/pull exercises
|
|
exclude_q |= (Q(movement_patterns__icontains='push') | Q(movement_patterns__icontains='pull')) & Q(is_compound=True)
|
|
qs = qs.exclude(exclude_q)
|
|
|
|
# R11: Exclude weighted exercises that aren't cooldown-pattern exercises
|
|
weighted_non_cooldown = qs.filter(is_weight=True).exclude(cooldown_q)
|
|
qs = qs.exclude(pk__in=weighted_non_cooldown.values_list('pk', flat=True))
|
|
|
|
# Cooldown HR ceiling: only low HR exercises (<=3) for proper cool-down
|
|
qs = qs.filter(Q(hr_elevation_rating__lte=3) | Q(hr_elevation_rating__isnull=True))
|
|
|
|
# STRICT: Only use cooldown-pattern exercises (no 'other' pool)
|
|
preferred = qs.filter(cooldown_q)
|
|
selected = self._weighted_pick(preferred, preferred.none(), count)
|
|
|
|
# Fallback: widen to any duration exercise with cooldown patterns (no muscle filter)
|
|
if len(selected) < count:
|
|
wide_qs = self._get_filtered_queryset(
|
|
muscle_groups=None,
|
|
is_duration_based=True,
|
|
fitness_level=fitness_level,
|
|
).exclude(pk__in={e.pk for e in selected})
|
|
wide_qs = wide_qs.filter(Q(side__isnull=True) | Q(side=''))
|
|
# Apply same exclusions
|
|
wide_qs = wide_qs.exclude(exclude_q)
|
|
# R11: also apply weight filter on wide fallback
|
|
wide_weighted_non_cooldown = wide_qs.filter(is_weight=True).exclude(cooldown_q)
|
|
wide_qs = wide_qs.exclude(pk__in=wide_weighted_non_cooldown.values_list('pk', flat=True))
|
|
# HR ceiling on fallback too
|
|
wide_qs = wide_qs.filter(Q(hr_elevation_rating__lte=3) | Q(hr_elevation_rating__isnull=True))
|
|
# STRICT: Only cooldown-pattern exercises even in fallback
|
|
wide_preferred = wide_qs.filter(cooldown_q)
|
|
selected.extend(
|
|
self._weighted_pick(wide_preferred, wide_preferred.none(), count - len(selected))
|
|
)
|
|
|
|
for ex in selected:
|
|
self.used_exercise_ids.add(ex.pk)
|
|
self.used_exercise_names.add((ex.name or '').lower().strip())
|
|
self._track_families(selected)
|
|
|
|
selected = self._ensure_side_pair_integrity(selected, qs, count=count)
|
|
return self._trim_preserving_pairs(selected, count)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
def _get_family_limit(self, family):
|
|
"""Max allowed uses of a movement family across the whole workout."""
|
|
if family in NARROW_FAMILIES:
|
|
return 1
|
|
return 2
|
|
|
|
def _track_families(self, exercises):
|
|
"""Record movement families for a list of selected exercises."""
|
|
for ex in exercises:
|
|
for fam in extract_movement_families(ex.name):
|
|
self.used_movement_families[fam] += 1
|
|
|
|
def _track_similarity_profiles(self, exercises, scope='working'):
|
|
"""Record similarity profiles so later supersets can avoid near-duplicates."""
|
|
if scope != 'working':
|
|
return
|
|
profiles = [self._build_similarity_profile(ex) for ex in exercises]
|
|
self.last_working_similarity_profiles = profiles
|
|
self.used_working_similarity_profiles.extend(profiles)
|
|
|
|
def _get_filtered_queryset(self, muscle_groups=None, is_duration_based=None, fitness_level=None):
|
|
"""
|
|
Build a base Exercise queryset filtered by:
|
|
- User's available equipment (through WorkoutEquipment)
|
|
- Excluded exercises from user preferences
|
|
- Already-used exercises in the current workout
|
|
- Target muscle groups (through ExerciseMuscle)
|
|
- Optionally, duration-based flag
|
|
- Fitness level (excludes complex patterns for beginners)
|
|
"""
|
|
qs = Exercise.objects.all()
|
|
|
|
# ---- Exclude exercises the user has explicitly blacklisted ----
|
|
excluded_ids = set(
|
|
self.user_preference.excluded_exercises.values_list('pk', flat=True)
|
|
)
|
|
if excluded_ids:
|
|
qs = qs.exclude(pk__in=excluded_ids)
|
|
|
|
# ---- Exclude already-used exercises in this workout ----
|
|
if self.used_exercise_ids:
|
|
qs = qs.exclude(pk__in=self.used_exercise_ids)
|
|
|
|
# ---- Exclude exercises with same name (cross-superset dedup) ----
|
|
if self.used_exercise_names:
|
|
name_exclude_q = Q()
|
|
for name in self.used_exercise_names:
|
|
if name:
|
|
name_exclude_q |= Q(name__iexact=name)
|
|
if name_exclude_q:
|
|
qs = qs.exclude(name_exclude_q)
|
|
|
|
# ---- Hard exclude exercises from recent workouts (Phase 6) ----
|
|
# Adaptive: if pool would be too small, relax hard exclude to soft penalty
|
|
if self.hard_exclude_ids:
|
|
test_qs = qs.exclude(pk__in=self.hard_exclude_ids)
|
|
if test_qs.count() >= 10:
|
|
qs = test_qs
|
|
else:
|
|
# Pool too small — convert hard exclude to soft penalty instead
|
|
self.recently_used_ids = self.recently_used_ids | self.hard_exclude_ids
|
|
if not hasattr(self, '_warned_small_pool'):
|
|
self.warnings.append(
|
|
'Exercise pool too small for full variety rotation — '
|
|
'relaxed recent exclusion to soft penalty.'
|
|
)
|
|
self._warned_small_pool = True
|
|
|
|
# ---- Filter by user's available equipment ----
|
|
available_equipment_ids = set(
|
|
self.user_preference.available_equipment.values_list('pk', flat=True)
|
|
)
|
|
if not available_equipment_ids:
|
|
# No equipment set: only allow bodyweight exercises (no WorkoutEquipment entries)
|
|
exercises_with_equipment = set(
|
|
WorkoutEquipment.objects.values_list('exercise_id', flat=True).distinct()
|
|
)
|
|
qs = qs.exclude(pk__in=exercises_with_equipment)
|
|
if not hasattr(self, '_warned_no_equipment'):
|
|
self.warnings.append(
|
|
'No equipment set — using bodyweight exercises only. '
|
|
'Update your equipment preferences for more variety.'
|
|
)
|
|
self._warned_no_equipment = True
|
|
elif available_equipment_ids:
|
|
# Cache equipment map on instance to avoid rebuilding per call
|
|
if not hasattr(self, '_equipment_map_cache'):
|
|
from collections import defaultdict
|
|
exercise_equipment_map = defaultdict(set)
|
|
for ex_id, eq_id in WorkoutEquipment.objects.values_list('exercise_id', 'equipment_id'):
|
|
exercise_equipment_map[ex_id].add(eq_id)
|
|
self._equipment_map_cache = dict(exercise_equipment_map)
|
|
self._bodyweight_ids_cache = set(
|
|
Exercise.objects.exclude(
|
|
pk__in=set(exercise_equipment_map.keys())
|
|
).values_list('pk', flat=True)
|
|
)
|
|
exercise_equipment_map = self._equipment_map_cache
|
|
bodyweight_ids = self._bodyweight_ids_cache
|
|
|
|
# AND logic: only include exercises where ALL required equipment is available
|
|
equipment_ok_ids = set()
|
|
for ex_id, required_equip in exercise_equipment_map.items():
|
|
if required_equip.issubset(available_equipment_ids):
|
|
equipment_ok_ids.add(ex_id)
|
|
|
|
allowed_ids = equipment_ok_ids | bodyweight_ids
|
|
qs = qs.filter(pk__in=allowed_ids)
|
|
|
|
# ---- Filter by muscle groups via ExerciseMuscle join ----
|
|
if muscle_groups:
|
|
normalized = [normalize_muscle_name(mg) for mg in muscle_groups]
|
|
muscle_ids = set(
|
|
Muscle.objects.filter(
|
|
name__in=normalized
|
|
).values_list('pk', flat=True)
|
|
)
|
|
# Also try case-insensitive matching for robustness
|
|
if not muscle_ids:
|
|
q = Q()
|
|
for name in normalized:
|
|
q |= Q(name__iexact=name)
|
|
muscle_ids = set(
|
|
Muscle.objects.filter(q).values_list('pk', flat=True)
|
|
)
|
|
if muscle_ids:
|
|
exercise_ids = set(
|
|
ExerciseMuscle.objects.filter(
|
|
muscle_id__in=muscle_ids
|
|
).values_list('exercise_id', flat=True)
|
|
)
|
|
qs = qs.filter(pk__in=exercise_ids)
|
|
|
|
# ---- Duration bias ----
|
|
if is_duration_based is True:
|
|
qs = qs.filter(is_duration=True)
|
|
elif is_duration_based is False:
|
|
# Rep-based supersets must use rep-capable exercises only.
|
|
qs = qs.filter(is_reps=True)
|
|
|
|
# ---- Fitness-level filtering ----
|
|
if fitness_level is not None and fitness_level <= 1:
|
|
# Beginners: exclude exercises with complex movement patterns
|
|
exclude_q = Q()
|
|
for pat in self.ADVANCED_PATTERNS:
|
|
exclude_q |= Q(movement_patterns__icontains=pat)
|
|
qs = qs.exclude(exclude_q)
|
|
|
|
# Exclude advanced exercises for beginners
|
|
if fitness_level is not None and fitness_level <= 1:
|
|
qs = qs.exclude(difficulty_level='advanced')
|
|
|
|
# ---- Complexity cap by fitness level ----
|
|
if fitness_level is not None:
|
|
complexity_caps = {1: 3, 2: 4, 3: 5, 4: 5}
|
|
max_complexity = complexity_caps.get(fitness_level, 5)
|
|
qs = qs.filter(
|
|
Q(complexity_rating__lte=max_complexity) | Q(complexity_rating__isnull=True)
|
|
)
|
|
|
|
# ---- Injury-based filtering ----
|
|
qs = self._apply_injury_filters(qs)
|
|
|
|
return qs.distinct()
|
|
|
|
def _get_bodyweight_queryset(self, muscle_groups=None, is_duration_based=None, fitness_level=None):
|
|
"""
|
|
Fallback queryset that only includes exercises with NO equipment
|
|
requirement (bodyweight). Ignores user's equipment preferences but
|
|
still applies safety filters (fitness level, injuries, complexity).
|
|
"""
|
|
exercises_with_equipment = set(
|
|
WorkoutEquipment.objects.values_list('exercise_id', flat=True).distinct()
|
|
)
|
|
qs = Exercise.objects.exclude(pk__in=exercises_with_equipment)
|
|
|
|
# Excluded exercises
|
|
excluded_ids = set(
|
|
self.user_preference.excluded_exercises.values_list('pk', flat=True)
|
|
)
|
|
if excluded_ids:
|
|
qs = qs.exclude(pk__in=excluded_ids)
|
|
|
|
# Already used
|
|
if self.used_exercise_ids:
|
|
qs = qs.exclude(pk__in=self.used_exercise_ids)
|
|
|
|
# Hard exclude from recent workouts (Phase 6)
|
|
if self.hard_exclude_ids:
|
|
qs = qs.exclude(pk__in=self.hard_exclude_ids)
|
|
|
|
# Muscle groups
|
|
if muscle_groups:
|
|
normalized = [normalize_muscle_name(mg) for mg in muscle_groups]
|
|
muscle_ids = set(
|
|
Muscle.objects.filter(name__in=normalized).values_list('pk', flat=True)
|
|
)
|
|
if not muscle_ids:
|
|
q = Q()
|
|
for name in normalized:
|
|
q |= Q(name__iexact=name)
|
|
muscle_ids = set(
|
|
Muscle.objects.filter(q).values_list('pk', flat=True)
|
|
)
|
|
if muscle_ids:
|
|
exercise_ids = set(
|
|
ExerciseMuscle.objects.filter(
|
|
muscle_id__in=muscle_ids
|
|
).values_list('exercise_id', flat=True)
|
|
)
|
|
qs = qs.filter(pk__in=exercise_ids)
|
|
|
|
if is_duration_based is True:
|
|
qs = qs.filter(is_duration=True)
|
|
elif is_duration_based is False:
|
|
qs = qs.filter(is_reps=True)
|
|
|
|
# ---- Safety: Fitness-level filtering (same as _get_filtered_queryset) ----
|
|
if fitness_level is not None and fitness_level <= 1:
|
|
exclude_q = Q()
|
|
for pat in self.ADVANCED_PATTERNS:
|
|
exclude_q |= Q(movement_patterns__icontains=pat)
|
|
qs = qs.exclude(exclude_q)
|
|
qs = qs.exclude(difficulty_level='advanced')
|
|
|
|
# ---- Safety: Complexity cap by fitness level ----
|
|
if fitness_level is not None:
|
|
complexity_caps = {1: 3, 2: 4, 3: 5, 4: 5}
|
|
max_complexity = complexity_caps.get(fitness_level, 5)
|
|
qs = qs.filter(
|
|
Q(complexity_rating__lte=max_complexity) | Q(complexity_rating__isnull=True)
|
|
)
|
|
|
|
# ---- Safety: Injury-based filtering ----
|
|
qs = self._apply_injury_filters(qs)
|
|
|
|
return qs.distinct()
|
|
|
|
def _apply_injury_filters(self, qs):
|
|
"""
|
|
Apply injury-based exercise exclusions with severity levels.
|
|
|
|
Supports both legacy format (list of strings) and new format
|
|
(list of {"type": str, "severity": "mild|moderate|severe"}).
|
|
|
|
Severity levels:
|
|
- mild: only exclude exercises explicitly dangerous for that injury
|
|
- moderate: current behavior (exclude high-impact, relevant patterns)
|
|
- severe: aggressive exclusion (broader pattern exclusion)
|
|
"""
|
|
injury_types = getattr(self.user_preference, 'injury_types', None) or []
|
|
|
|
if injury_types:
|
|
# Normalize to dict format for backward compatibility
|
|
injury_map = {}
|
|
for item in injury_types:
|
|
if isinstance(item, str):
|
|
injury_map[item] = 'moderate'
|
|
elif isinstance(item, dict):
|
|
injury_map[item.get('type', '')] = item.get('severity', 'moderate')
|
|
|
|
def _is_at_least(injury_type, min_severity):
|
|
"""Check if an injury has at least the given severity."""
|
|
levels = {'mild': 1, 'moderate': 2, 'severe': 3}
|
|
actual = injury_map.get(injury_type, '')
|
|
return levels.get(actual, 0) >= levels.get(min_severity, 0)
|
|
|
|
# Generate informational warnings about injury-based exclusions
|
|
if not hasattr(self, '_injury_warnings_emitted'):
|
|
self._injury_warnings_emitted = True
|
|
for inj_type, sev in injury_map.items():
|
|
label = inj_type.replace('_', ' ').title()
|
|
if sev == 'severe':
|
|
self.warnings.append(
|
|
f'Excluding high-impact and many weighted exercises due to severe {label.lower()} injury.'
|
|
)
|
|
elif sev == 'moderate':
|
|
self.warnings.append(
|
|
f'Excluding high-impact exercises due to {label.lower()} injury.'
|
|
)
|
|
else:
|
|
self.warnings.append(
|
|
f'Limiting certain movements due to mild {label.lower()} injury.'
|
|
)
|
|
|
|
# High impact exclusion for lower body injuries (moderate+)
|
|
lower_injuries = {'knee', 'ankle', 'hip', 'lower_back'}
|
|
if any(_is_at_least(inj, 'moderate') for inj in lower_injuries & set(injury_map)):
|
|
qs = qs.exclude(impact_level='high')
|
|
# Severe: also exclude medium impact
|
|
if any(_is_at_least(inj, 'severe') for inj in lower_injuries & set(injury_map)):
|
|
qs = qs.exclude(impact_level='medium')
|
|
|
|
if _is_at_least('knee', 'moderate') or _is_at_least('ankle', 'moderate'):
|
|
qs = qs.exclude(movement_patterns__icontains='plyometric')
|
|
# Severe knee/ankle: also exclude lunges
|
|
if _is_at_least('knee', 'severe') or _is_at_least('ankle', 'severe'):
|
|
qs = qs.exclude(movement_patterns__icontains='lunge')
|
|
|
|
if _is_at_least('lower_back', 'moderate'):
|
|
qs = qs.exclude(
|
|
Q(movement_patterns__icontains='hip hinge') &
|
|
Q(is_weight=True) &
|
|
Q(difficulty_level='advanced')
|
|
)
|
|
if _is_at_least('lower_back', 'severe'):
|
|
qs = qs.exclude(
|
|
Q(movement_patterns__icontains='hip hinge') &
|
|
Q(is_weight=True)
|
|
)
|
|
|
|
if _is_at_least('upper_back', 'moderate'):
|
|
qs = qs.exclude(
|
|
Q(movement_patterns__icontains='upper pull') &
|
|
Q(is_weight=True) &
|
|
Q(difficulty_level='advanced')
|
|
)
|
|
if _is_at_least('upper_back', 'severe'):
|
|
qs = qs.exclude(
|
|
Q(movement_patterns__icontains='upper pull') &
|
|
Q(is_weight=True)
|
|
)
|
|
|
|
if _is_at_least('shoulder', 'mild'):
|
|
qs = qs.exclude(movement_patterns__icontains='upper push - vertical')
|
|
if _is_at_least('shoulder', 'severe'):
|
|
qs = qs.exclude(
|
|
Q(movement_patterns__icontains='upper push') &
|
|
Q(is_weight=True)
|
|
)
|
|
|
|
if _is_at_least('hip', 'moderate'):
|
|
qs = qs.exclude(
|
|
Q(movement_patterns__icontains='lower push - squat') &
|
|
Q(difficulty_level='advanced')
|
|
)
|
|
if _is_at_least('hip', 'severe'):
|
|
qs = qs.exclude(movement_patterns__icontains='lower push - squat')
|
|
|
|
if _is_at_least('wrist', 'moderate'):
|
|
qs = qs.exclude(
|
|
Q(movement_patterns__icontains='olympic') &
|
|
Q(is_weight=True)
|
|
)
|
|
|
|
if _is_at_least('neck', 'moderate'):
|
|
qs = qs.exclude(
|
|
Q(movement_patterns__icontains='upper push - vertical') &
|
|
Q(is_weight=True)
|
|
)
|
|
else:
|
|
# Legacy: parse free-text injuries_limitations field
|
|
injuries = getattr(self.user_preference, 'injuries_limitations', '') or ''
|
|
if injuries:
|
|
injuries_lower = injuries.lower()
|
|
knee_keywords = ['knee', 'acl', 'mcl', 'meniscus', 'patella']
|
|
back_keywords = ['back', 'spine', 'spinal', 'disc', 'herniat']
|
|
shoulder_keywords = ['shoulder', 'rotator', 'labrum', 'impingement']
|
|
|
|
if any(kw in injuries_lower for kw in knee_keywords):
|
|
qs = qs.exclude(impact_level='high')
|
|
if any(kw in injuries_lower for kw in back_keywords):
|
|
qs = qs.exclude(impact_level='high')
|
|
qs = qs.exclude(
|
|
Q(movement_patterns__icontains='hip hinge') &
|
|
Q(is_weight=True) &
|
|
Q(difficulty_level='advanced')
|
|
)
|
|
if any(kw in injuries_lower for kw in shoulder_keywords):
|
|
qs = qs.exclude(movement_patterns__icontains='upper push - vertical')
|
|
|
|
return qs
|
|
|
|
def _weighted_pick(
|
|
self,
|
|
preferred_qs,
|
|
other_qs,
|
|
count,
|
|
superset_position=None,
|
|
similarity_scope=None,
|
|
):
|
|
"""
|
|
Pick up to *count* exercises using weighted random selection.
|
|
|
|
Preferred exercises are 3x more likely to be chosen than the
|
|
general pool, ensuring variety while still favouring matches.
|
|
|
|
Enforces movement-family deduplication:
|
|
- Intra-superset: no two exercises from the same family group
|
|
- Cross-workout: max N per family (1 for narrow, 2 for broad)
|
|
|
|
superset_position: 'early', 'late', or None. When set, boosts
|
|
exercises based on their exercise_tier (primary for early,
|
|
accessory for late).
|
|
"""
|
|
if count <= 0:
|
|
return []
|
|
|
|
preferred_list = list(preferred_qs)
|
|
other_list = list(other_qs)
|
|
|
|
# Build a weighted pool: each preferred exercise appears 3 times
|
|
pool = []
|
|
weight_preferred = 3
|
|
weight_other = 1
|
|
|
|
def _tier_boost(ex, base_w):
|
|
"""Apply tier-based weighting based on superset position."""
|
|
if not superset_position:
|
|
return base_w
|
|
tier = getattr(ex, 'exercise_tier', None)
|
|
if superset_position == 'early' and tier == 'primary':
|
|
return base_w * 2
|
|
elif superset_position == 'late' and tier == 'accessory':
|
|
return base_w * 2
|
|
return base_w
|
|
|
|
for ex in preferred_list:
|
|
w = weight_preferred
|
|
# Boost exercises that are progressions of recently completed exercises
|
|
if ex.pk in self.progression_boost_ids:
|
|
w = w * 2
|
|
if ex.pk in self.recently_used_ids:
|
|
w = 1 # Reduce weight for recently used
|
|
# Penalize overused movement patterns for variety (Phase 11)
|
|
# Fixed: check ALL comma-separated patterns, use max count
|
|
if self.used_movement_patterns:
|
|
ex_patterns = getattr(ex, 'movement_patterns', '') or ''
|
|
if ex_patterns:
|
|
max_pat_count = max(
|
|
(self.used_movement_patterns.get(p.strip().lower(), 0)
|
|
for p in ex_patterns.split(',') if p.strip()),
|
|
default=0,
|
|
)
|
|
if max_pat_count >= 3:
|
|
w = 1
|
|
elif max_pat_count >= 2:
|
|
w = max(1, w - 1)
|
|
w = _tier_boost(ex, w)
|
|
pool.extend([ex] * w)
|
|
for ex in other_list:
|
|
w = weight_other
|
|
if ex.pk in self.recently_used_ids:
|
|
w = 1 # Already 1 but keep explicit
|
|
w = _tier_boost(ex, w)
|
|
pool.extend([ex] * w)
|
|
|
|
if not pool:
|
|
return []
|
|
|
|
selected = []
|
|
selected_ids = set()
|
|
selected_names = set()
|
|
# Intra-superset family tracking
|
|
selected_family_groups = set() # group names used in this superset
|
|
selected_families = set() # exact families used in this superset
|
|
selected_family_counts = Counter() # exact family counts in this superset
|
|
selected_profiles = []
|
|
|
|
# Shuffle to break any ordering bias
|
|
random.shuffle(pool)
|
|
|
|
attempts = 0
|
|
max_attempts = len(pool) * 3 # avoid infinite loop on tiny pools
|
|
|
|
while len(selected) < count and attempts < max_attempts:
|
|
candidate = random.choice(pool)
|
|
candidate_name = (candidate.name or '').lower().strip()
|
|
|
|
if candidate.pk in selected_ids or candidate_name in selected_names:
|
|
attempts += 1
|
|
continue
|
|
|
|
# --- Movement family blocking ---
|
|
candidate_families = extract_movement_families(candidate.name)
|
|
blocked = False
|
|
|
|
for fam in candidate_families:
|
|
# Cross-workout: check family count limit
|
|
historical_count = self.used_movement_families.get(fam, 0)
|
|
in_superset_count = selected_family_counts.get(fam, 0)
|
|
if historical_count + in_superset_count >= self._get_family_limit(fam):
|
|
blocked = True
|
|
break
|
|
|
|
# Intra-superset: avoid exact family duplicates entirely.
|
|
if fam in selected_families:
|
|
blocked = True
|
|
break
|
|
|
|
# Intra-superset: check family group overlap
|
|
group = _FAMILY_TO_GROUP.get(fam)
|
|
if group and group in selected_family_groups:
|
|
blocked = True
|
|
break
|
|
|
|
if blocked:
|
|
attempts += 1
|
|
continue
|
|
|
|
if similarity_scope == 'working':
|
|
candidate_profile = self._build_similarity_profile(candidate)
|
|
if self._is_similarity_blocked(candidate_profile, selected_profiles):
|
|
attempts += 1
|
|
continue
|
|
|
|
selected.append(candidate)
|
|
selected_ids.add(candidate.pk)
|
|
selected_names.add(candidate_name)
|
|
if similarity_scope == 'working':
|
|
selected_profiles.append(candidate_profile)
|
|
# Track family groups for intra-superset blocking
|
|
for fam in candidate_families:
|
|
selected_families.add(fam)
|
|
selected_family_counts[fam] += 1
|
|
group = _FAMILY_TO_GROUP.get(fam)
|
|
if group:
|
|
selected_family_groups.add(group)
|
|
attempts += 1
|
|
|
|
return selected
|
|
|
|
@staticmethod
|
|
def _tokenize_text(value):
|
|
"""Tokenize free text into normalized, low-noise tokens."""
|
|
if not value:
|
|
return set()
|
|
tokens = set(re.findall(r"[a-z0-9]+", value.lower()))
|
|
stop_words = {
|
|
'and', 'or', 'the', 'with', 'to', 'a', 'an', 'of',
|
|
'single', 'arm', 'double', 'alternating',
|
|
'barbell', 'dumbbell', 'kettlebell', 'machine', 'cable',
|
|
'bodyweight',
|
|
}
|
|
return {tok for tok in tokens if tok not in stop_words and len(tok) > 1}
|
|
|
|
@staticmethod
|
|
def _tokenize_csv(value):
|
|
"""Tokenize comma-separated categorical fields."""
|
|
if not value:
|
|
return set()
|
|
return {part.strip().lower() for part in value.split(',') if part and part.strip()}
|
|
|
|
def _build_similarity_profile(self, ex):
|
|
"""Create a cached token profile used by similarity scoring."""
|
|
cached = self._exercise_profile_cache.get(ex.pk)
|
|
if cached is not None:
|
|
return cached
|
|
profile = {
|
|
'id': ex.pk,
|
|
'movement': self._tokenize_csv(getattr(ex, 'movement_patterns', '') or ''),
|
|
'muscles': self._tokenize_csv(getattr(ex, 'muscle_groups', '') or ''),
|
|
'equipment': self._tokenize_csv(getattr(ex, 'equipment_required', '') or ''),
|
|
'name_tokens': self._tokenize_text(getattr(ex, 'name', '') or ''),
|
|
}
|
|
self._exercise_profile_cache[ex.pk] = profile
|
|
return profile
|
|
|
|
@staticmethod
|
|
def _jaccard_similarity(left, right):
|
|
"""Jaccard similarity between token sets."""
|
|
if not left and not right:
|
|
return 0.0
|
|
union = left | right
|
|
if not union:
|
|
return 0.0
|
|
return len(left & right) / len(union)
|
|
|
|
def _exercise_similarity_score(self, candidate_profile, existing_profile):
|
|
"""Weighted similarity score in [0,1]."""
|
|
movement = self._jaccard_similarity(
|
|
candidate_profile['movement'], existing_profile['movement']
|
|
)
|
|
muscles = self._jaccard_similarity(
|
|
candidate_profile['muscles'], existing_profile['muscles']
|
|
)
|
|
equipment = self._jaccard_similarity(
|
|
candidate_profile['equipment'], existing_profile['equipment']
|
|
)
|
|
name = self._jaccard_similarity(
|
|
candidate_profile['name_tokens'], existing_profile['name_tokens']
|
|
)
|
|
return (
|
|
(0.45 * movement)
|
|
+ (0.35 * muscles)
|
|
+ (0.10 * equipment)
|
|
+ (0.10 * name)
|
|
)
|
|
|
|
def _is_similarity_blocked(self, candidate_profile, selected_profiles):
|
|
"""Block near-duplicate exercises within the workout and adjacent sets."""
|
|
for existing_profile in self.used_working_similarity_profiles:
|
|
if (
|
|
self._exercise_similarity_score(candidate_profile, existing_profile)
|
|
>= self.SIMILARITY_HARD_THRESHOLD
|
|
):
|
|
return True
|
|
for existing_profile in selected_profiles:
|
|
if (
|
|
self._exercise_similarity_score(candidate_profile, existing_profile)
|
|
>= self.SIMILARITY_HARD_THRESHOLD
|
|
):
|
|
return True
|
|
|
|
for existing_profile in self.last_working_similarity_profiles:
|
|
if (
|
|
self._exercise_similarity_score(candidate_profile, existing_profile)
|
|
>= self.SIMILARITY_SOFT_THRESHOLD
|
|
):
|
|
return True
|
|
for existing_profile in selected_profiles:
|
|
if (
|
|
self._exercise_similarity_score(candidate_profile, existing_profile)
|
|
>= self.SIMILARITY_SOFT_THRESHOLD
|
|
):
|
|
return True
|
|
return False
|
|
|
|
def _pair_sided_exercises(self, selected, base_qs):
|
|
"""
|
|
For exercises with a ``side`` value (e.g. 'Left', 'Right'), try
|
|
to include the matching opposite-side exercise in the selection.
|
|
|
|
This swaps out a non-sided exercise to keep the count stable, or
|
|
simply appends if the list is short.
|
|
"""
|
|
paired = list(selected)
|
|
paired_ids = {e.pk for e in paired}
|
|
|
|
exercises_to_add = []
|
|
|
|
for ex in list(paired):
|
|
if ex.side and ex.side.strip():
|
|
side_norm = self._normalize_side_value(ex.side)
|
|
opposite_norm = self._opposite_side(side_norm)
|
|
if not opposite_norm:
|
|
continue
|
|
|
|
# Find the matching partner by name similarity and opposite side
|
|
# Typically the name is identical except for side, e.g.
|
|
# "Single Arm Row Left" / "Single Arm Row Right"
|
|
base_name = ex.name
|
|
for side_word in ['Left', 'Right', 'left', 'right']:
|
|
base_name = base_name.replace(side_word, '').strip()
|
|
|
|
partner = (
|
|
Exercise.objects
|
|
.filter(
|
|
name__icontains=base_name,
|
|
)
|
|
.filter(self._side_values_q(opposite_norm))
|
|
.exclude(pk__in=self.used_exercise_ids)
|
|
.exclude(pk__in=paired_ids)
|
|
.first()
|
|
)
|
|
|
|
if partner and partner.pk not in paired_ids:
|
|
exercises_to_add.append(partner)
|
|
paired_ids.add(partner.pk)
|
|
|
|
# Insert partners right after their matching exercise
|
|
final = []
|
|
added_ids = set()
|
|
for ex in paired:
|
|
final.append(ex)
|
|
added_ids.add(ex.pk)
|
|
# Check if any partner should follow this exercise
|
|
for partner in exercises_to_add:
|
|
if partner.pk not in added_ids:
|
|
# Check if partner is the pair for this exercise
|
|
if ex.side and ex.side.strip():
|
|
base_name = ex.name
|
|
for side_word in ['Left', 'Right', 'left', 'right']:
|
|
base_name = base_name.replace(side_word, '').strip()
|
|
if base_name.lower() in partner.name.lower():
|
|
final.append(partner)
|
|
added_ids.add(partner.pk)
|
|
|
|
# Add any remaining partners that didn't get inserted
|
|
for partner in exercises_to_add:
|
|
if partner.pk not in added_ids:
|
|
final.append(partner)
|
|
added_ids.add(partner.pk)
|
|
|
|
return final
|
|
|
|
def _trim_preserving_pairs(self, selected, count):
|
|
"""
|
|
Trim selected exercises to count, but never split a Left/Right pair.
|
|
If keeping a Left exercise, always keep its Right partner (and vice versa).
|
|
"""
|
|
if len(selected) <= count:
|
|
return selected
|
|
|
|
# Identify paired indices
|
|
paired_indices = set()
|
|
for i, ex in enumerate(selected):
|
|
if self._normalize_side_value(getattr(ex, 'side', '')):
|
|
# Find its partner in the list
|
|
base_name = ex.name
|
|
for side_word in ['Left', 'Right', 'left', 'right']:
|
|
base_name = base_name.replace(side_word, '').strip()
|
|
for j, other in enumerate(selected):
|
|
if i != j and self._normalize_side_value(getattr(other, 'side', '')):
|
|
other_base = other.name
|
|
for side_word in ['Left', 'Right', 'left', 'right']:
|
|
other_base = other_base.replace(side_word, '').strip()
|
|
if base_name.lower() == other_base.lower():
|
|
paired_indices.add(i)
|
|
paired_indices.add(j)
|
|
|
|
result = []
|
|
for i, ex in enumerate(selected):
|
|
if len(result) >= count and i not in paired_indices:
|
|
continue
|
|
# If this is part of a pair, include it even if over count
|
|
if i in paired_indices or len(result) < count:
|
|
result.append(ex)
|
|
|
|
# If keeping pairs pushed us over count, remove non-paired exercises
|
|
# from the end to compensate
|
|
if len(result) > count + 1:
|
|
excess = len(result) - count
|
|
trimmed = []
|
|
removed = 0
|
|
# Build paired set for result indices
|
|
result_paired = set()
|
|
for i, ex in enumerate(result):
|
|
if self._normalize_side_value(getattr(ex, 'side', '')):
|
|
base_name = ex.name
|
|
for side_word in ['Left', 'Right', 'left', 'right']:
|
|
base_name = base_name.replace(side_word, '').strip()
|
|
for j, other in enumerate(result):
|
|
if i != j and self._normalize_side_value(getattr(other, 'side', '')):
|
|
other_base = other.name
|
|
for side_word in ['Left', 'Right', 'left', 'right']:
|
|
other_base = other_base.replace(side_word, '').strip()
|
|
if base_name.lower() == other_base.lower():
|
|
result_paired.add(i)
|
|
result_paired.add(j)
|
|
|
|
for i in range(len(result) - 1, -1, -1):
|
|
if removed >= excess:
|
|
break
|
|
if i not in result_paired:
|
|
result.pop(i)
|
|
removed += 1
|
|
|
|
return result
|
|
|
|
def _strip_side_tokens(self, name):
|
|
"""Normalize a name by removing left/right tokens."""
|
|
base = name or ''
|
|
for side_word in [
|
|
'Left', 'Right', 'left', 'right',
|
|
'left arm', 'right arm', 'left leg', 'right leg',
|
|
'left side', 'right side',
|
|
]:
|
|
base = base.replace(side_word, '').strip()
|
|
return base.lower()
|
|
|
|
@staticmethod
|
|
def _normalize_side_value(side):
|
|
"""Map DB side values to canonical left/right tokens."""
|
|
value = (side or '').strip().lower()
|
|
if value in _LEFT_SIDE_VALUES:
|
|
return 'left'
|
|
if value in _RIGHT_SIDE_VALUES:
|
|
return 'right'
|
|
return None
|
|
|
|
@staticmethod
|
|
def _opposite_side(side_norm):
|
|
"""Return opposite canonical side for left/right."""
|
|
if side_norm == 'left':
|
|
return 'right'
|
|
if side_norm == 'right':
|
|
return 'left'
|
|
return None
|
|
|
|
@staticmethod
|
|
def _side_values_q(side_norm):
|
|
"""Build a queryset filter matching any DB side token for a canonical side."""
|
|
q = Q()
|
|
values = _LEFT_SIDE_VALUES if side_norm == 'left' else _RIGHT_SIDE_VALUES
|
|
for side_value in values:
|
|
q |= Q(side__iexact=side_value)
|
|
return q
|
|
|
|
def _drop_unpaired_sided_exercises(self, selected):
|
|
"""Drop any left/right exercise that does not have its opposite side."""
|
|
side_groups = {}
|
|
for ex in selected:
|
|
side_val = self._normalize_side_value(getattr(ex, 'side', ''))
|
|
if side_val not in ('left', 'right'):
|
|
continue
|
|
key = self._strip_side_tokens(getattr(ex, 'name', ''))
|
|
side_groups.setdefault(key, {'left': [], 'right': []})
|
|
side_groups[key][side_val].append(ex.pk)
|
|
|
|
allowed_ids = set()
|
|
for key, sides in side_groups.items():
|
|
if sides['left'] and sides['right']:
|
|
allowed_ids.update(sides['left'])
|
|
allowed_ids.update(sides['right'])
|
|
|
|
filtered = []
|
|
removed_count = 0
|
|
for ex in selected:
|
|
side_val = self._normalize_side_value(getattr(ex, 'side', ''))
|
|
if side_val in ('left', 'right') and ex.pk not in allowed_ids:
|
|
removed_count += 1
|
|
continue
|
|
filtered.append(ex)
|
|
|
|
if removed_count:
|
|
self.warnings.append(
|
|
f'Removed {removed_count} unpaired side-specific exercises '
|
|
f'to enforce left/right pairing.'
|
|
)
|
|
return filtered
|
|
|
|
def _find_missing_side_partner(self, ex, base_qs, existing_ids):
|
|
"""
|
|
Try hard to find opposite-side partner for a sided exercise.
|
|
|
|
Search order:
|
|
1) base_qs with strict name-base match
|
|
2) global Exercise table with strict name-base match
|
|
3) base_qs with relaxed icontains name-base match
|
|
4) global Exercise table with relaxed icontains name-base match
|
|
"""
|
|
side_norm = self._normalize_side_value(getattr(ex, 'side', ''))
|
|
opposite_norm = self._opposite_side(side_norm)
|
|
if not opposite_norm:
|
|
return None
|
|
|
|
base_name = self._strip_side_tokens(getattr(ex, 'name', ''))
|
|
if not base_name:
|
|
return None
|
|
|
|
def _pick_from_queryset(qs, strict=True):
|
|
candidates = qs.filter(self._side_values_q(opposite_norm))
|
|
if strict:
|
|
candidates = [
|
|
c for c in candidates
|
|
if self._strip_side_tokens(getattr(c, 'name', '')) == base_name
|
|
]
|
|
return candidates[0] if candidates else None
|
|
return candidates.filter(name__icontains=base_name).first()
|
|
|
|
common_exclusions = Q(pk__in=existing_ids)
|
|
# Prefer unused exercise ids, but do not hard-fail pairing if only used counterpart exists.
|
|
preferred_exclusions = common_exclusions | Q(pk__in=self.used_exercise_ids)
|
|
|
|
base_preferred = base_qs.exclude(preferred_exclusions)
|
|
partner = _pick_from_queryset(base_preferred, strict=True)
|
|
if partner:
|
|
return partner
|
|
|
|
global_preferred = Exercise.objects.exclude(preferred_exclusions)
|
|
partner = _pick_from_queryset(global_preferred, strict=True)
|
|
if partner:
|
|
return partner
|
|
|
|
# Relaxed pass still avoiding duplicates in the current selection.
|
|
base_relaxed = base_qs.exclude(common_exclusions)
|
|
partner = _pick_from_queryset(base_relaxed, strict=False)
|
|
if partner:
|
|
return partner
|
|
|
|
global_relaxed = Exercise.objects.exclude(common_exclusions)
|
|
return _pick_from_queryset(global_relaxed, strict=False)
|
|
|
|
def _ensure_side_pair_integrity(
|
|
self,
|
|
selected,
|
|
base_qs,
|
|
count,
|
|
similarity_scope=None,
|
|
superset_position=None,
|
|
):
|
|
"""
|
|
Enforce strict left/right pairing:
|
|
- First attempt to add missing opposite-side partners
|
|
- Remove orphan left/right exercises only as a last resort
|
|
- Backfill with non-sided exercises when possible
|
|
"""
|
|
balanced = list(selected)
|
|
existing_ids = {ex.pk for ex in balanced}
|
|
|
|
added_partners = 0
|
|
for ex in list(balanced):
|
|
side_val = self._normalize_side_value(getattr(ex, 'side', ''))
|
|
if side_val not in ('left', 'right'):
|
|
continue
|
|
key = self._strip_side_tokens(getattr(ex, 'name', ''))
|
|
has_left = any(
|
|
self._normalize_side_value(getattr(other, 'side', '')) == 'left'
|
|
and self._strip_side_tokens(getattr(other, 'name', '')) == key
|
|
for other in balanced
|
|
)
|
|
has_right = any(
|
|
self._normalize_side_value(getattr(other, 'side', '')) == 'right'
|
|
and self._strip_side_tokens(getattr(other, 'name', '')) == key
|
|
for other in balanced
|
|
)
|
|
if has_left and has_right:
|
|
continue
|
|
|
|
partner = self._find_missing_side_partner(ex, base_qs, existing_ids)
|
|
if partner and partner.pk not in existing_ids:
|
|
balanced.append(partner)
|
|
existing_ids.add(partner.pk)
|
|
added_partners += 1
|
|
|
|
if added_partners:
|
|
# Keep sided pairs by preferentially removing non-sided fillers.
|
|
while len(balanced) > count:
|
|
remove_idx = None
|
|
for idx in range(len(balanced) - 1, -1, -1):
|
|
if self._normalize_side_value(getattr(balanced[idx], 'side', '')) not in ('left', 'right'):
|
|
remove_idx = idx
|
|
break
|
|
if remove_idx is None:
|
|
break
|
|
balanced.pop(remove_idx)
|
|
self.warnings.append(
|
|
f'Added {added_partners} missing opposite-side exercise partners.'
|
|
)
|
|
|
|
balanced = self._drop_unpaired_sided_exercises(balanced)
|
|
|
|
if len(balanced) < count:
|
|
deficit = count - len(balanced)
|
|
existing_ids = {ex.pk for ex in balanced}
|
|
filler_qs = (
|
|
base_qs.exclude(pk__in=existing_ids)
|
|
.filter(Q(side__isnull=True) | Q(side=''))
|
|
)
|
|
extras = self._weighted_pick(
|
|
filler_qs,
|
|
Exercise.objects.none(),
|
|
deficit,
|
|
superset_position=superset_position,
|
|
similarity_scope=similarity_scope,
|
|
)
|
|
balanced.extend(extras)
|
|
|
|
return balanced
|
|
|
|
def balance_stretch_positions(self, selected, muscle_groups=None, fitness_level=None):
|
|
"""
|
|
Improve stretch position variety for hypertrophy workouts.
|
|
|
|
Ensures exercises within a superset cover multiple stretch
|
|
positions (lengthened, mid, shortened) for more complete
|
|
muscle stimulus. Swaps the last non-primary exercise if
|
|
all exercises share the same stretch position.
|
|
|
|
Prefers 'lengthened' replacements (greater mechanical tension
|
|
at long muscle lengths = stronger hypertrophy stimulus).
|
|
"""
|
|
if len(selected) < 3:
|
|
return selected
|
|
|
|
position_counts = {}
|
|
for ex in selected:
|
|
pos = getattr(ex, 'stretch_position', None)
|
|
if pos:
|
|
position_counts[pos] = position_counts.get(pos, 0) + 1
|
|
|
|
# Check if variety is sufficient (no single position >= 75%)
|
|
if len(position_counts) >= 2:
|
|
total_with_pos = sum(position_counts.values())
|
|
max_count = max(position_counts.values())
|
|
if total_with_pos > 0 and max_count / total_with_pos < 0.75:
|
|
return selected # Good variety, no dominant position
|
|
|
|
dominant_position = max(position_counts, key=position_counts.get) if position_counts else None
|
|
if not dominant_position:
|
|
return selected # No stretch data available
|
|
|
|
# Find a replacement with a different stretch position
|
|
desired_positions = {'lengthened', 'mid', 'shortened'} - {dominant_position}
|
|
position_q = Q()
|
|
for pos in desired_positions:
|
|
position_q |= Q(stretch_position=pos)
|
|
|
|
replacement_qs = self._get_filtered_queryset(
|
|
muscle_groups=muscle_groups,
|
|
fitness_level=fitness_level,
|
|
).filter(position_q).exclude(pk__in={e.pk for e in selected})
|
|
|
|
replacements = list(replacement_qs[:5])
|
|
if not replacements:
|
|
return selected
|
|
|
|
# Prefer 'lengthened' for hypertrophy (greater mechanical tension)
|
|
lengthened = [r for r in replacements if r.stretch_position == 'lengthened']
|
|
pick = lengthened[0] if lengthened else replacements[0]
|
|
|
|
# Swap the last non-primary exercise
|
|
for i in range(len(selected) - 1, -1, -1):
|
|
if getattr(selected[i], 'exercise_tier', None) != 'primary':
|
|
old = selected[i]
|
|
selected[i] = pick
|
|
self.used_exercise_ids.discard(old.pk)
|
|
self.used_exercise_ids.add(pick.pk)
|
|
break
|
|
|
|
return selected
|