Coverage for app/logic/certification.py: 93%
133 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-12-18 20:14 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-12-18 20:14 +0000
1from peewee import JOIN, fn, DoesNotExist, Case
2from flask import g
3from app.models.event import Event
4from app.models.term import Term
5from app.models.certification import Certification
6from app.models.certificationRequirement import CertificationRequirement
7from app.models.requirementMatch import RequirementMatch
8from app.models.eventParticipant import EventParticipant
9from app.models.user import User
10import math
def termsAttended(certification, username):
    """
    Return the non-summer terms in which a user attended an event matched to a requirement.

    Only terms that also appear in `termsInTotal(username)` are kept, so attendance in a
    term outside the set expected for the user is ignored.

    Arguments:
        certification - the id compared against `RequirementMatch.requirement_id`
        username - the user whose event participation is checked

    Returns:
        A set of term descriptions.
    """
    # The WHERE clause on EventParticipant.user discards matches the user did not attend.
    attendance = (RequirementMatch.select()
                  .join(EventParticipant, JOIN.LEFT_OUTER, on=(RequirementMatch.event == EventParticipant.event))
                  .where(RequirementMatch.requirement_id == certification)
                  .where(EventParticipant.user == username))

    # Build the lookup once so every membership test in the loop is O(1).
    expectedTerms = set(termsInTotal(username))

    attendedTerms = set()
    for match in attendance:
        term = match.event.term
        if not term.isSummer and term.description in expectedTerms:
            attendedTerms.add(term.description)
    return attendedTerms
def termsInTotal(username):
    """
    List the non-summer terms a student is expected to have, oldest first.

    The list is derived from the user's `rawClassLevel` and `g.current_term`:
      * Freshman through Senior: alternating Fall/Spring terms, starting with a Fall term
        and ending at the current term.
      * No class level, or "NULL", "Graduating", "Non-Degree": the eight terms ending at
        the current term.
      * Any other class level: an empty list.

    When the current term is a summer term it is treated as "Fall <year>" for a Freshman
    and as "Spring <year>" for everyone else.

    Arguments:
        username - the username of the student to look up

    Returns:
        A list of strings such as "Fall 2024".
    """
    currentTerm = g.current_term
    student = User.select().where(User.username == username).get()
    rawLevel = student.rawClassLevel

    # Pick the term description that the counting is anchored on.
    if not currentTerm.isSummer:
        anchorDesc = currentTerm.description
    elif rawLevel == "Freshman":
        anchorDesc = f"Fall {currentTerm.year}"
    else:
        anchorDesc = f"Spring {currentTerm.year}"

    expectedTerms = []
    levelOrder = ["Freshman", "Sophomore", "Junior", "Senior"]
    if rawLevel in levelOrder:
        yearsCompleted = levelOrder.index(rawLevel)

        # Two terms per class year; a Fall anchor means this year's Spring has not happened yet.
        termCount = (yearsCompleted + 1) * 2
        if anchorDesc.startswith("Fall"):
            termCount -= 1

        # A Spring anchor belongs to the academic year that began the previous Fall.
        firstFallYear = currentTerm.year - yearsCompleted
        if anchorDesc.startswith("Spring"):
            firstFallYear -= 1

        for position in range(termCount):
            yearOffset, isSpring = divmod(position, 2)
            if isSpring:
                expectedTerms.append(f"Spring {firstFallYear + yearOffset + 1}")
            else:
                expectedTerms.append(f"Fall {firstFallYear + yearOffset}")

    if rawLevel is None or rawLevel in ["NULL", "Graduating", "Non-Degree"]:
        season = "Fall" if "Fall" in anchorDesc else "Spring"
        year = currentTerm.year
        # Walk backwards from the anchor term, eight terms in all.
        for _ in range(8):
            expectedTerms.append(f"{season} {year}")
            if season == "Fall":
                season = "Spring"
            else:
                season = "Fall"
                year -= 1
        # The walk above produced newest-first; flip it to oldest-first.
        expectedTerms.reverse()

    return expectedTerms
def termsMissed(certification, username):
    """
    List the expected terms in which a student has no matching attendance.

    Arguments:
        certification - the id passed through to `termsAttended`
        username - the student to check

    Returns:
        The entries of `termsInTotal(username)`, in their original order, that are not
        in `termsAttended(certification, username)`.
    """
    expectedTerms = termsInTotal(username)
    attended = termsAttended(certification, username)

    missed = []
    for term in expectedTerms:
        if term in attended:
            continue
        missed.append(term)
    return missed
def getCertRequirementsWithCompletion(*, certification, username):
    """
    Fetch requirements with completion checking enabled.

    Thin wrapper around `getCertRequirements` that always passes `reqCheck=True`.
    Both arguments are keyword-only.
    """
    return getCertRequirements(certification=certification, username=username, reqCheck=True)
def _latestAttendedTerm(requirementId, username):
    """
    Return the description of the latest-year term in which the user attended an event
    matched to the requirement, or None when there is no such event.
    """
    termRecord = (RequirementMatch
                  .select(RequirementMatch, Event, Term)
                  .join(Event)
                  .join(Term)
                  .switch(RequirementMatch)
                  .join(EventParticipant, on=(RequirementMatch.event == EventParticipant.event))
                  # Restrict to events this user took part in; without the user filter the
                  # newest matched event wins even if the user never attended it.
                  .where(RequirementMatch.requirement == requirementId,
                         EventParticipant.user == username)
                  .order_by(Term.year.desc())  # latest term first
                  .first())
    if termRecord is None:
        return None
    return termRecord.event.term.description


def _dedupeRequirements(requirements, reqCheck):
    """
    Collapse requirements that share an id, keeping first-seen order.

    With `reqCheck` set, a completed 'once' requirement replaces an earlier copy of the
    same requirement that was not completed.
    """
    uniqueById = {}
    for req in requirements:
        kept = uniqueById.get(req.id)
        if kept is None:
            uniqueById[req.id] = req
        elif reqCheck and req.frequency == "once" and req.completed and not kept.completed:
            # Re-assigning an existing key keeps its original position in the dict.
            uniqueById[req.id] = req
    return list(uniqueById.values())


def getCertRequirements(certification=None, username=None, reqCheck=False):
    """
    Return the requirements for all certifications, or for one if requested.

    Keyword arguments:
        certification -- The id or object for a certification to request
        username -- The username to check for completion (only used together with
                    `certification`)
        reqCheck -- When True, a completed 'once' requirement overrides an incomplete
                    duplicate of the same requirement

    Returns:
        Without `certification`: a dictionary keyed by certification id whose values are
        `{"data": <Certification>, "requirements": [<CertificationRequirement>, ...]}`.

        With `certification`: a list of unique requirement objects for that certification.
        If `username` is also given, each requirement gets `completed`, `attendedTerms` and
        `attendedDescriptions`, plus these frequency-specific attributes:
            "term"   -- `missedTerms`, `missedDescriptions`, `totalTerms`
            "annual" -- `attendedAnnual`, `totalAnnual`
            "once"   -- `attendedTerm` (only when completed)
    """
    reqList = (Certification.select(Certification, CertificationRequirement)
               .join(CertificationRequirement, JOIN.LEFT_OUTER, attr="requirement")
               .order_by(Certification.id, CertificationRequirement.order.asc(nulls="LAST")))

    if not certification:
        certificationDict = {}
        for cert in reqList:
            entry = certificationDict.setdefault(cert.id, {"data": cert, "requirements": []})
            if getattr(cert, 'requirement', None):
                entry["requirements"].append(cert.requirement)
        return certificationDict

    if username:
        # The select list cannot be extended in place, so the whole query is rebuilt with
        # an extra `completed` column: 1 when a participation row for this user exists.
        completedCase = Case(None, ((EventParticipant.user_id.is_null(True), 0),), 1)
        reqList = (Certification
                   .select(Certification, CertificationRequirement, completedCase.alias("completed"))
                   .join(CertificationRequirement, JOIN.LEFT_OUTER, attr="requirement")
                   .join(RequirementMatch, JOIN.LEFT_OUTER)
                   .join(EventParticipant, JOIN.LEFT_OUTER, on=(RequirementMatch.event == EventParticipant.event) & (EventParticipant.user == username))
                   .order_by(Certification.id, CertificationRequirement.order.asc(nulls="LAST")))

    # The is-not-null check guarantees that `cert.requirement` always exists.
    reqList = reqList.where(Certification.id == certification, CertificationRequirement.id.is_null(False))

    # The expected-term count depends only on the user, so it is computed at most once.
    totalTermCount = None
    certificationList = []
    for cert in reqList:
        req = cert.requirement
        if username:
            req.completed = bool(cert.__dict__['completed'])

            # Each helper call runs queries, so call once per row and reuse the result.
            attended = termsAttended(req.id, username)
            req.attendedTerms = len(attended)
            req.attendedDescriptions = attended

            if req.frequency in ("term", "annual") and totalTermCount is None:
                totalTermCount = len(termsInTotal(username))

            if req.frequency == "term":
                missed = termsMissed(req.id, username)
                req.missedTerms = len(missed)
                req.missedDescriptions = missed
                req.totalTerms = totalTermCount
            elif req.frequency == "annual":
                req.attendedAnnual = len(attended)
                # Two terms per year, rounded up; always an int.
                req.totalAnnual = math.ceil(totalTermCount / 2)
            elif req.frequency == "once" and req.completed:
                req.attendedTerm = _latestAttendedTerm(req.id, username)
        certificationList.append(req)

    # The joins can yield one row per matched event, so the same requirement may repeat.
    return _dedupeRequirements(certificationList, reqCheck)
def updateCertRequirements(certId, newRequirements):
    """
    Make the stored requirements of a certification match the provided list.

    List order matters: each requirement is saved with its list index as its `order`.
    Requirements stored for `certId` whose id is not in `newRequirements` are deleted.
    Entries whose id is not found in the database are created as new rows.

    Arguments:
        certId - The id of the certification whose requirements we are updating
        newRequirements - a list of dictionaries. Each dictionary needs 'id', 'required',
                          'frequency', and 'name'.

    Returns:
        A list of CertificationRequirement objects, one per entry of `newRequirements`
        and in the same order.
    """
    # Drop stored requirements that the new list no longer mentions.
    keptIds = [entry['id'] for entry in newRequirements]
    removeStale = CertificationRequirement.delete().where(
        CertificationRequirement.certification_id == certId,
        CertificationRequirement.id.not_in(keptIds))
    removeStale.execute()

    # Update the rows that exist and create the ones that do not.
    savedRequirements = []
    for position, entry in enumerate(newRequirements):
        try:
            requirement = CertificationRequirement.get_by_id(entry['id'])
        except DoesNotExist:
            requirement = CertificationRequirement()

        requirement.certification = certId
        requirement.isRequired = bool(entry['required'])
        requirement.frequency = entry['frequency']
        requirement.name = entry['name']
        requirement.order = position
        requirement.save()

        savedRequirements.append(requirement)

    return savedRequirements
def updateCertRequirementForEvent(event, requirement):
    """
    Set the certification requirement for an event.

    Any requirement matches the event already has are removed first, so the event ends
    up with exactly one match.

    Arguments:
        event - an Event object or id
        requirement - a CertificationRequirement object or id
    """
    # Clear existing matches for the event in one statement instead of one per row.
    RequirementMatch.delete().where(RequirementMatch.event == event).execute()

    RequirementMatch.create(event=event, requirement=requirement)