Coverage for app/logic/certification.py: 56%

62 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-28 18:58 +0000

1from flask import abort 

2from peewee import JOIN, DoesNotExist, Case 

3 

4from app.models.certification import Certification 

5from app.models.certificationRequirement import CertificationRequirement 

6from app.models.requirementMatch import RequirementMatch 

7from app.models.eventParticipant import EventParticipant 

8 

9def getCertRequirementsWithCompletion(*, certification, username): 

10 """ 

11 Function to differentiate between simple requirements and requirements completion checking. 

12 See: `getCertRequirements` 

13 """ 

14 return getCertRequirements(certification, username) 

15 

16def getCertRequirements(certification=None, username=None): 

17 """ 

18 Return the requirements for all certifications, or for one if requested. 

19 

20 Keyword arguments: 

21 certification -- The id or object for a certification to request 

22 username -- The username to check for completion 

23 

24 Returns: 

25 A list of dictionaries with all certification data and requirements. If `certification` 

26 is given, returns only a list of requirement objects for the given certification. If  

27 `username` is given, the requirement objects have a `completed` attribute. 

28 """ 

29 reqList = (Certification.select(Certification, CertificationRequirement) 

30 .join(CertificationRequirement, JOIN.LEFT_OUTER, attr="requirement") 

31 .order_by(Certification.id, CertificationRequirement.order.asc(nulls="LAST"))) 

32 

33 if certification: 

34 if username: 

35 completedCase = Case(None, ((EventParticipant.user_id.is_null(True), 0),), 1) 

36 reqList = (Certification 

37 .select(Certification, CertificationRequirement, completedCase.alias("completed")) 

38 .join(CertificationRequirement, JOIN.LEFT_OUTER, attr="requirement") 

39 .join(RequirementMatch, JOIN.LEFT_OUTER) 

40 .join(EventParticipant, JOIN.LEFT_OUTER, on=(RequirementMatch.event == EventParticipant.event)) 

41 .where(EventParticipant.user.is_null(True) | (EventParticipant.user == username)) 

42 .order_by(Certification.id, CertificationRequirement.order.asc(nulls="LAST"))) 

43 reqList = reqList.where(Certification.id == certification, CertificationRequirement.id.is_null(False)) 

44 reqList = reqList.distinct() 

45 

46 certs = [] 

47 for cert in reqList: 

48 if username: 

49 cert.requirement.completed = bool(cert.__dict__['completed']) 

50 certs.append(cert.requirement) 

51 return certs 

52 

53 certs = {} 

54 for cert in reqList: 

55 if cert.id not in certs.keys(): 

56 certs[cert.id] = {"data": cert, "requirements": []} 

57 

58 if getattr(cert, 'requirement', None): 

59 certs[cert.id]["requirements"].append(cert.requirement) 

60 

61 return certs 

62 

63def deleteRequirement(rowID): 

64 try: 

65 req_id = CertificationRequirement.get_by_id(rowID) 

66 req_id.delete_instance() 

67 return True 

68 except DoesNotExist: 

69 return False 

70 

71def updateCertRequirements(newRequirements, certId=Certification.BONNER): 

72 """ 

73 Update the certification requirements in the database to match the provided list of requirement data. 

74 

75 The order of the list matters. Any ids that are in the database and not in `newRequirements` will be  

76 removed. IDs that do not exist in the database will be created (and given a new, auto-generated ID). 

77 

78 Arguments: 

79 certId - The id of the certification whose requirements we are updating 

80 newRequirements - a dictionary of dictionaries. Each dictionary needs 'id', 'required', 'frequency', and 'name'. 

81 

82 Returns: 

83 A list of CertificationRequirement objects corresponding to the given `newRequirements` list. 

84 """ 

85 # update existing and add new requirements 

86 

87 if newRequirements.get("save-new",None) is not None: 

88 newRequirement = CertificationRequirement() 

89 actualRequirements = newRequirements['save-new'] 

90 newRequirement.certification = certId 

91 newRequirement.isRequired = bool(actualRequirements.get('required', False)) 

92 newRequirement.frequency = actualRequirements['frequency'] 

93 newRequirement.name =actualRequirements['name'] 

94 newRequirement.save() 

95 return True 

96 

97 else: 

98 certKey = list(newRequirements.keys())[0] 

99 try: 

100 certRequirement = CertificationRequirement.get(CertificationRequirement.id == certKey) 

101 except DoesNotExist: 

102 abort(403) 

103 

104 requirement_info = newRequirements[certKey] 

105 certRequirement.certification = certId 

106 certRequirement.isRequired = bool(requirement_info.get('required', False)) 

107 certRequirement.frequency = requirement_info['frequency'] 

108 certRequirement.name =requirement_info['name'] 

109 certRequirement.save() 

110 return True 

111 

112def updateCertRequirementForEvent(event, requirement): 

113 """ 

114 Add a certification requirement to an event.  

115 Replaces the requirement for an event if the event already exists. 

116 

117 Arguments: 

118 event - an Event object or id 

119 requirement - a CertificationRequirement object or id 

120 """ 

121 # delete existing matches for our event 

122 for match in RequirementMatch.select().where(RequirementMatch.event == event): 

123 match.delete_instance() 

124 

125 RequirementMatch.create(event=event, requirement=requirement)