Coverage for app/logic/certification.py: 96%

91 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2025-05-02 15:35 +0000

1from peewee import JOIN, DoesNotExist, Case 

2from flask import g 

3from app.models.event import Event 

4from app.models.term import Term 

5from app.models.certification import Certification 

6from app.models.certificationRequirement import CertificationRequirement 

7from app.models.requirementMatch import RequirementMatch 

8from app.models.eventParticipant import EventParticipant 

9from app.models.user import User 

10 

def termsAttended(certification=None, username=None):
    """
    Retrieve the term descriptions of the events a user attended toward a requirement.

    Keyword arguments:
    certification -- despite its name, this value is compared against
                     RequirementMatch.requirement_id, so callers pass a requirement id
    username -- the user whose event participation is looked up

    Returns:
    A list containing the term description of every matched event the user
    participated in. The list is empty when no username is given.
    NOTE(review): descriptions are not de-duplicated, so two attended events in the
    same term yield the same description twice -- confirm that is intended, since
    termsMissed() subtracts the length of this list.
    """
    # Without a user there is nothing to look up; skip the query entirely.
    if not username:
        return []

    attendance = (RequirementMatch.select()
                                  .join(EventParticipant, JOIN.LEFT_OUTER, on=(RequirementMatch.event == EventParticipant.event))
                                  .where(RequirementMatch.requirement_id == certification)
                                  .where(EventParticipant.user == username))

    # Iterate the query directly instead of indexing it by position.
    return [match.event.term.description for match in attendance]

24 

def termsMissed(certification=None, username=None):
    """
    Calculate how many certification-eligible terms a student has missed based on their
    class level and attendance record.

    Logic:
        - Each class level is expected to participate in 2 terms per year, so a student at
          position `level` in Freshman..Senior (0-based) is expected to have had
          (level + 1) * 2 terms when the current term is the Spring term.
        - In any other term they have had one fewer term to attend: ((level + 1) * 2) - 1.
        - When the current term is a summer term, the Fall term of the same year is used
          as the current term instead.
        - If the user's class level is None (or any value outside Freshman..Senior),
          assume just 1 expected term. Unrecognized values previously raised an
          UnboundLocalError.
        - Subtract the number of terms the student has attended from the expected total
          to get the missed count.

    Keyword arguments:
    certification -- the requirement id forwarded to termsAttended()
    username -- the username of the student

    Returns:
    The expected number of terms minus the number of attended terms. The result is not
    clamped, so it can be negative if more attendances than expected terms are recorded.

    Raises:
    DoesNotExist -- if the user cannot be found, or if the current term is a summer term
                    and no matching Fall term record exists.
    """
    classLevel = ["Freshman", "Sophomore", "Junior", "Senior"]
    currentTerm = g.current_term

    # During summer, evaluate the student against the upcoming Fall term of the same year.
    if currentTerm.isSummer:
        currentTerm = Term.select(Term).where(Term.description == f'Fall {currentTerm.year}').get()
    isSpring = currentTerm.description == f'Spring {currentTerm.year}'

    # The user does not change between class levels, so look them up exactly once.
    user = User.select().where(User.username == username).get()
    rawClassLevel = user.rawClassLevel

    if rawClassLevel in classLevel:
        expectedTerms = (classLevel.index(rawClassLevel) + 1) * 2
        if not isSpring:
            expectedTerms -= 1
    else:
        # No classification, or one we do not track: expect a single term.
        expectedTerms = 1

    attendedTerms = termsAttended(certification, username)

    return expectedTerms - len(attendedTerms)

62 

def getCertRequirementsWithCompletion(*, certification, username):
    """
    Keyword-only entry point for fetching requirements together with completion data.

    Both arguments are mandatory here, which distinguishes this call from a plain
    getCertRequirements() lookup; the work itself is delegated to getCertRequirements().
    """
    return getCertRequirements(certification=certification, username=username)

68 

def getCertRequirements(certification=None, username=None):
    """
    Return the requirements for all certifications, or for one if requested.

    Keyword arguments:
    certification -- The id or object for a certification to request
    username -- The username to check for completion. Only used when `certification`
                is also given.

    Returns:
    Without `certification`: a dictionary keyed by certification id, where each value is
    {"data": <Certification>, "requirements": [<CertificationRequirement>, ...]}.

    With `certification`: a list of the unique requirement objects for that certification,
    in query order. If `username` is also given, each requirement object has a `completed`
    attribute, and requirements whose frequency is "term" additionally get `missedTerms`,
    `attendedTerms` (a count) and `attendedDescriptions` (the term descriptions).
    """
    reqList = (Certification.select(Certification, CertificationRequirement)
                            .join(CertificationRequirement, JOIN.LEFT_OUTER, attr="requirement")
                            .order_by(Certification.id, CertificationRequirement.order.asc(nulls="LAST")))

    if not certification:
        # Group every certification with whatever requirements it has.
        certificationDict = {}
        for cert in reqList:
            if cert.id not in certificationDict:
                certificationDict[cert.id] = {"data": cert, "requirements": []}
            if getattr(cert, 'requirement', None):
                certificationDict[cert.id]["requirements"].append(cert.requirement)

        return certificationDict

    if username:
        # The completion flag is an extra selected column, so the query is rebuilt with it.
        # A row counts as completed when it has a matching EventParticipant record.
        completedCase = Case(None, ((EventParticipant.user_id.is_null(True), 0),), 1)
        reqList = (Certification
                        .select(Certification, CertificationRequirement, completedCase.alias("completed"))
                        .join(CertificationRequirement, JOIN.LEFT_OUTER, attr="requirement")
                        .join(RequirementMatch, JOIN.LEFT_OUTER)
                        .join(EventParticipant, JOIN.LEFT_OUTER, on=(RequirementMatch.event == EventParticipant.event))
                        .where(EventParticipant.user.is_null(True) | (EventParticipant.user == username))
                        .order_by(Certification.id, CertificationRequirement.order.asc(nulls="LAST")))

    # we have to add the is not null check so that `cert.requirement` always exists
    reqList = reqList.where(Certification.id == certification, CertificationRequirement.id.is_null(False))

    # The joins can produce several rows for the same requirement, so keep only the first
    # object seen for each one (insertion order of the dict preserves the query order).
    uniqueRequirements = {}
    for cert in reqList:
        requirement = cert.requirement
        if username:
            requirement.completed = bool(cert.__dict__['completed'])

        kept = uniqueRequirements.setdefault(requirement, requirement)
        # If any duplicate row shows the requirement as completed, the kept object must
        # say so too; otherwise the flag would depend on which row happened to come first.
        if username and kept is not requirement and requirement.completed:
            kept.completed = True

    certificationList = list(uniqueRequirements.values())

    if username:
        # Term-frequency requirements also report attended/missed term information.
        # This runs once per unique requirement, with a single attendance lookup each.
        for requirement in certificationList:
            if requirement.frequency == "term":
                attendedDescriptions = termsAttended(requirement.id, username)
                requirement.missedTerms = termsMissed(requirement.id, username)
                requirement.attendedTerms = len(attendedDescriptions)
                requirement.attendedDescriptions = attendedDescriptions

    return certificationList

134 

def updateCertRequirements(certId, newRequirements):
    """
    Bring a certification's stored requirements in line with the given list of requirement data.

    List position determines each requirement's `order`. Requirements of this certification
    whose ids are absent from `newRequirements` are deleted. Entries whose id is not found
    in the database are saved as new requirements (receiving an auto-generated id).

    Arguments:
        certId - The id of the certification whose requirements we are updating
        newRequirements - a list of dictionaries. Each dictionary needs 'id', 'required', 'frequency', and 'name'.

    Returns:
        A list of CertificationRequirement objects, one per entry of `newRequirements`, in the same order.
    """
    # Remove this certification's requirements that were not submitted.
    keptIds = [entry['id'] for entry in newRequirements]
    removeStale = CertificationRequirement.delete().where(
        CertificationRequirement.certification_id == certId,
        CertificationRequirement.id.not_in(keptIds))
    removeStale.execute()

    # Save each submitted entry, reusing the existing row when its id is known.
    savedRequirements = []
    for position, entry in enumerate(newRequirements):
        try:
            record = CertificationRequirement.get_by_id(entry['id'])
        except DoesNotExist:
            record = CertificationRequirement()

        isRequired = bool(entry['required'])
        frequency = entry['frequency']
        name = entry['name']

        record.certification = certId
        record.isRequired = isRequired
        record.frequency = frequency
        record.name = name
        record.order = position
        record.save()

        savedRequirements.append(record)

    return savedRequirements

171 

172def updateCertRequirementForEvent(event, requirement): 

173 """ 

174 Add a certification requirement to an event.  

175 Replaces the requirement for an event if the event already exists. 

176 

177 Arguments: 

178 event - an Event object or id 

179 requirement - a CertificationRequirement object or id 

180 """ 

181 # delete existing matches for our event 

182 for match in RequirementMatch.select().where(RequirementMatch.event == event): 

183 match.delete_instance() 

184 

185 RequirementMatch.create(event=event, requirement=requirement)