Coverage for app/logic/participants.py: 75%

125 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2026-03-06 19:06 +0000

1from flask import g 

2from peewee import fn, JOIN 

3from datetime import date 

4from app.models.user import User 

5from app.models.event import Event 

6from app.models.term import Term 

7from app.models.eventRsvp import EventRsvp 

8from app.models.program import Program 

9from app.models.eventParticipant import EventParticipant 

10from app.logic.users import isEligibleForProgram 

11from app.logic.volunteers import getEventLengthInHours 

12from app.logic.events import getEventRsvpCountsForTerm 

13from app.logic.createLogs import createRsvpLog 

14from collections import defaultdict 

15 

16def trainedParticipants(programID, targetTerm): 

17 """ 

18 This function tracks the users who have attended every Prerequisite 

19 event and adds them to a list that will not flag them when tracking hours. 

20 Returns a list of user objects who've completed all training events. 

21 """ 

22 

23 # Reset program eligibility each term for all other trainings 

24 isRelevantAllVolunteer = (Event.isAllVolunteerTraining) & (Event.term.academicYear == targetTerm.academicYear) 

25 isRelevantProgramTraining = (Event.program == programID) & (Event.term == targetTerm) & (Event.isTraining) 

26 allTrainings = (Event.select() 

27 .join(Term) 

28 .where(isRelevantAllVolunteer | isRelevantProgramTraining, 

29 Event.isCanceled == False)) 

30 

31 fullyTrainedUsers = (User.select() 

32 .join(EventParticipant) 

33 .where(EventParticipant.event.in_(allTrainings)) 

34 .group_by(EventParticipant.user) 

35 .having(fn.Count(EventParticipant.user) == len(allTrainings)).order_by(User.username)) 

36 

37 return list(fullyTrainedUsers) 

38 

39def addBnumberAsParticipant(bnumber, eventId): 

40 """Accepts scan input and signs in the user. If user exists or is already 

41 signed in will return user and login status""" 

42 try: 

43 kioskUser = User.get(User.bnumber == bnumber) 

44 except Exception as e: 

45 print(e) 

46 return None, "does not exist" 

47 

48 event = Event.get_by_id(eventId) 

49 if not isEligibleForProgram(event.program, kioskUser): 

50 userStatus = "banned" 

51 

52 elif checkUserVolunteer(kioskUser, event): 

53 userStatus = "already signed in" 

54 

55 else: 

56 # Non-RSVP and RSVP event handling 

57 userStatus = "success" 

58 if event.isRsvpRequired: 

59 # RSVP event: standard logic (RSVP before event, attend after) 

60 if event.isPastStart: 

61 totalHours = getEventLengthInHours(event.timeStart, event.timeEnd, event.startDate) 

62 EventParticipant.create(user=kioskUser, event=event, hoursEarned=totalHours) 

63 else: 

64 if not checkUserRsvp(kioskUser, event): 

65 currentRsvp = getEventRsvpCountsForTerm(event.term) 

66 waitlist = currentRsvp[event.id] >= event.rsvpLimit if event.rsvpLimit is not None else False 

67 EventRsvp.create(user=kioskUser, event=event, rsvpWaitlist=waitlist) 

68 targetList = "the waitlist" if waitlist else "the RSVP list" 

69 try: 

70 if g.current_user.username == kioskUser.username: 

71 createRsvpLog(event.id, f"{kioskUser.fullName} joined {targetList}.") 

72 else: 

73 createRsvpLog(event.id, f"Added {kioskUser.fullName} to {targetList}.") 

74 except Exception: 

75 pass 

76 else: 

77 # Non-RSVP event: scanner entry ALWAYS marks as attended regardless of timing 

78 totalHours = getEventLengthInHours(event.timeStart, event.timeEnd, event.startDate) 

79 EventParticipant.create(user=kioskUser, event=event, hoursEarned=totalHours) 

80 

81 return kioskUser, userStatus 

82 

83def checkUserRsvp(user, event): 

84 return EventRsvp.select().where(EventRsvp.user==user, EventRsvp.event == event).exists() 

85 

86def checkUserVolunteer(user, event): 

87 return EventParticipant.select().where(EventParticipant.user == user, EventParticipant.event == event).exists() 

88 

89def addPersonToEvent(user, event): 

90 """ 

91 Add a user to an event. 

92 If the event is in the past, add the user as a volunteer (EventParticipant) including hours worked. 

93 If the event is in the future, rsvp for the user (EventRsvp) 

94 

95 Returns True if the operation was successful, false otherwise 

96 """ 

97 try: 

98 volunteerExists = checkUserVolunteer(user, event) 

99 rsvpExists = checkUserRsvp(user, event) 

100 

101 if event.isRsvpRequired: 

102 # RSVP event logic 

103 if event.isPastStart: 

104 if not volunteerExists: 

105 eventHours = getEventLengthInHours(event.timeStart, event.timeEnd, event.startDate) 

106 EventParticipant.create(user = user, event = event, hoursEarned = eventHours) 

107 else: 

108 if not rsvpExists: 

109 currentRsvp = getEventRsvpCountsForTerm(event.term) 

110 waitlist = currentRsvp[event.id] >= event.rsvpLimit if event.rsvpLimit is not None else 0 

111 EventRsvp.create(user = user, event = event, rsvpWaitlist = waitlist) 

112 targetList = "the waitlist" if waitlist else "the RSVP list" 

113 if g.current_user.username == user.username: 

114 createRsvpLog(event.id, f"{user.fullName} joined {targetList}.") 

115 else: 

116 createRsvpLog(event.id, f"Added {user.fullName} to {targetList}.") 

117 else: 

118 # Non-RSVP event logic 

119 if event.isPastStart: 

120 # After event: create EventParticipant (attended) 

121 if not volunteerExists: 

122 eventHours = getEventLengthInHours(event.timeStart, event.timeEnd, event.startDate) 

123 EventParticipant.create(user = user, event = event, hoursEarned = eventHours) 

124 else: 

125 # Before event: create EventRsvp (invited status) 

126 if not rsvpExists: 

127 EventRsvp.create(user = user, event = event, rsvpWaitlist = False) 

128 

129 if volunteerExists or rsvpExists: 

130 return "already in" 

131 except Exception as e: 

132 print(e) 

133 return False 

134 

135 return True 

136 

137def unattendedRequiredEvents(program, user): 

138 

139 # Check for events that are prerequisite for program 

140 requiredEvents = (Event.select(Event) 

141 .where(Event.isTraining == True, Event.program == program)) 

142 

143 if requiredEvents: 

144 attendedRequiredEventsList = [] 

145 for event in requiredEvents: 

146 attendedRequirement = (EventParticipant.select().where(EventParticipant.user == user, EventParticipant.event == event)) 

147 if not attendedRequirement: 

148 attendedRequiredEventsList.append(event.name) 

149 if attendedRequiredEventsList is not None: 

150 return attendedRequiredEventsList 

151 else: 

152 return [] 

153 

154 

155def getEventParticipants(event): 

156 eventParticipants = (EventParticipant.select(EventParticipant, User) 

157 .join(User) 

158 .where(EventParticipant.event == event)) 

159 

160 return [p for p in eventParticipants] 

161 

162def getParticipationStatusForTrainings(program, userList, term): 

163 """ 

164 This function returns a dictionary of all trainings for a program and 

165 whether the current user participated in them. 

166 

167 :returns: trainings for program and if the user participated 

168 """ 

169 isRelevantTraining = ((Event.isAllVolunteerTraining | ((Event.isTraining) & (Event.program == program))) & 

170 (Event.term.academicYear == term.academicYear)) 

171 programTrainings = (Event.select(Event, Term, EventParticipant, EventRsvp) 

172 .join(EventParticipant, JOIN.LEFT_OUTER).switch() 

173 .join(EventRsvp, JOIN.LEFT_OUTER).switch() 

174 .join(Term) 

175 .where(isRelevantTraining, (Event.isCanceled != True)).order_by(Event.startDate)) 

176 

177 # Create a dictionary where the keys are trainings and values are a set of those who attended 

178 trainingData = defaultdict(set) 

179 for training in programTrainings: 

180 try: 

181 if training.isPastStart: 

182 trainingData[training].add(training.eventparticipant.user_id) 

183 else: # The training has yet to happen 

184 trainingData[training].add(training.eventrsvp.user_id) 

185 except AttributeError: 

186 pass 

187 # Create a dictionary binding usernames to a list of [training, hasAttended] pairs. The tuples consist of the training (event object) and whether or not they attended it (bool) 

188 

189 # Necessarily complex algorithm to merge the attendances of trainings which have the same name 

190 # Structure of userParticipationStatus for a single user: 

191 # {user.username: {training1.name: [EventObject, hasAttended], training2.name: [EventObject, hasAttended]}, ...} 

192 userParticipationStatus = {user.username: {} for user in userList} 

193 for training, attendeeList in trainingData.items(): 

194 for user in userList: 

195 if training.name not in userParticipationStatus[user.username] or user.username in attendeeList: 

196 userParticipationStatus[user.username][training.name] = [training, user.username in attendeeList] 

197 

198 return {user.username: list(userParticipationStatus[user.username].values()) for user in userList} 

199 

200 

201def sortParticipantsByStatus(event): 

202 """ 

203 Takes in an event object, queries all participants, and then filters those 

204 participants by their attendee status. 

205 

206 return: a list of participants who didn't attend, a list of participants who are waitlisted, 

207 a list of participants who attended, and a list of all participants who have some status for the  

208 event. 

209 """ 

210 eventParticipants = getEventParticipants(event) 

211 

212 # get all RSVPs for event and filter out those that did not attend into separate list 

213 eventRsvpData = list(EventRsvp.select(EventRsvp, User).join(User).where(EventRsvp.event==event).order_by(EventRsvp.rsvpTime)) 

214 eventNonAttendedData = [rsvp for rsvp in eventRsvpData if rsvp.user not in eventParticipants] 

215 

216 if event.isPastStart: 

217 eventVolunteerData = eventParticipants 

218 

219 # if the event date has passed disregard the waitlist 

220 eventWaitlistData = [] 

221 else: 

222 # if rsvp is required for the event, grab all volunteers that are in the waitlist 

223 eventWaitlistData = [volunteer for volunteer in (eventParticipants + eventRsvpData) if volunteer.rsvpWaitlist and event.isRsvpRequired] 

224 

225 # put all participants and non-waitlisted RSVPs into the volunteer data 

226 eventVolunteerData = [volunteer for volunteer in (eventParticipants + eventNonAttendedData) if volunteer not in eventWaitlistData] 

227 eventNonAttendedData = [] 

228 

229 return eventNonAttendedData, eventWaitlistData, eventVolunteerData, eventParticipants