Coverage for app/logic/participants.py: 73%

135 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2026-04-29 19:36 +0000

1from flask import g 

2from peewee import fn, JOIN 

3from datetime import date 

4from app.models.user import User 

5from app.models.event import Event 

6from app.models.term import Term 

7from app.models.eventRsvp import EventRsvp 

8from app.models.program import Program 

9from app.models.eventParticipant import EventParticipant 

10from app.logic.users import isEligibleForProgram 

11from app.logic.volunteers import getEventLengthInHours 

12from app.logic.events import getEventRsvpCountsForTerm 

13from app.logic.createLogs import createRsvpLog 

14from collections import defaultdict 

15 

def trainedParticipants(programID, targetTerm):
    """
    Return the users who have attended every required training for a program.

    The required trainings are the non-canceled events that are either
    all-volunteer trainings in the same academic year as targetTerm, or
    trainings for programID held in targetTerm itself. A user qualifies when
    their number of participant rows for those events equals the number of
    required trainings.

    Returns a list of User objects ordered by username.
    """
    # All-volunteer trainings count for the whole academic year, while
    # program-specific trainings only count for the term they were held in.
    allVolunteerFilter = (Event.isAllVolunteerTraining) & (Event.term.academicYear == targetTerm.academicYear)
    programFilter = (Event.program == programID) & (Event.term == targetTerm) & (Event.isTraining)

    requiredTrainings = Event.select().join(Term).where(
        allVolunteerFilter | programFilter,
        Event.isCanceled == False)

    # len() runs the trainings query so the count can be used in HAVING below.
    requiredCount = len(requiredTrainings)
    attendedEveryTraining = fn.Count(EventParticipant.user) == requiredCount

    trainedUsers = User.select().join(EventParticipant)
    trainedUsers = trainedUsers.where(EventParticipant.event.in_(requiredTrainings))
    trainedUsers = trainedUsers.group_by(EventParticipant.user)
    trainedUsers = trainedUsers.having(attendedEveryTraining).order_by(User.username)

    return list(trainedUsers)

38 

def addBnumberAsParticipant(bnumber, eventId):
    """
    Sign a user into an event from a scanned B-number.

    Returns a (user, status) tuple where status is one of:
      - "does not exist": no user could be loaded for the B-number (user is None)
      - "banned": the user is not eligible for the event's program
      - "already signed in": the user already has a participant record
      - "success": every other case
    """
    try:
        scannedUser = User.get(User.bnumber == bnumber)
    except Exception as e:
        print(e)
        return None, "does not exist"

    event = Event.get_by_id(eventId)

    if not isEligibleForProgram(event.program, scannedUser):
        return scannedUser, "banned"

    if checkUserVolunteer(scannedUser, event):
        return scannedUser, "already signed in"

    if event.isRsvpRequired and not event.isPastStart:
        # RSVP event that has not started yet: the scan records an RSVP
        # (or a waitlist spot) unless the user already has one.
        if not checkUserRsvp(scannedUser, event):
            rsvpCounts = getEventRsvpCountsForTerm(event.term)
            if event.rsvpLimit is not None:
                isWaitlisted = rsvpCounts[event.id] >= event.rsvpLimit
            else:
                isWaitlisted = False
            EventRsvp.create(user=scannedUser, event=event, rsvpWaitlist=isWaitlisted)
            listName = getTargetList(event, isWaitlisted)
            # Any failure while writing the RSVP log is deliberately ignored;
            # the RSVP itself has already been saved.
            try:
                if g.current_user.username == scannedUser.username:
                    logMessage = f"{scannedUser.fullName} joined {listName}."
                else:
                    logMessage = f"Added {scannedUser.fullName} to {listName}."
                createRsvpLog(event.id, logMessage)
            except Exception:
                pass
    else:
        # Either an RSVP event that has already started, or a non-RSVP event
        # (where a scan always counts as attendance regardless of timing).
        hoursEarned = getEventLengthInHours(event.timeStart, event.timeEnd, event.startDate)
        EventParticipant.create(user=scannedUser, event=event, hoursEarned=hoursEarned)

    return scannedUser, "success"

82 

def checkUserRsvp(user, event):
    """Return True when an EventRsvp row exists for the given user and event."""
    matchingRsvps = EventRsvp.select().where(EventRsvp.user == user, EventRsvp.event == event)
    return matchingRsvps.exists()

85 

def checkUserVolunteer(user, event):
    """Return True when an EventParticipant row exists for the given user and event."""
    matchingParticipants = EventParticipant.select().where(EventParticipant.user == user,
                                                           EventParticipant.event == event)
    return matchingParticipants.exists()

88 

def getTargetList(event, waitlist=False):
    """
    Return the display name of the list a user is being added to.

    A truthy waitlist always wins; otherwise the name depends on whether the
    event requires an RSVP.
    """
    if waitlist:
        return "the waitlist"
    if event.isRsvpRequired:
        return "the RSVP list"
    return "the Invited list"

91 

def addPersonToEvent(user, event):
    """
    Add a user to an event.

    If the event has already started, the user is recorded as a volunteer
    (EventParticipant) with the event's length as hours earned, unless a
    participant record already exists.

    If the event has not started yet, an EventRsvp is created unless one
    already exists. For events that require an RSVP the user is placed on the
    waitlist when the RSVP limit has been reached, and the action is written
    to the RSVP log. For other events the RSVP is created with
    rsvpWaitlist=False and nothing is logged.

    Returns:
        "already in" if a participant or RSVP record for this user and event
            existed before the call (a missing participant record may still
            have been created during the call),
        True if no such record existed and the operation completed,
        False if any exception was raised (the exception is printed).
    """
    try:
        volunteerExists = checkUserVolunteer(user, event)
        rsvpExists = checkUserRsvp(user, event)

        if event.isPastStart:
            # After the start time the handling is the same whether or not
            # the event requires an RSVP: mark the user as attended.
            if not volunteerExists:
                eventHours = getEventLengthInHours(event.timeStart, event.timeEnd, event.startDate)
                EventParticipant.create(user=user, event=event, hoursEarned=eventHours)
                # Logging the attendance is best-effort.
                try:
                    createRsvpLog(event.id, f"Marked {user.fullName} as attended.")
                except Exception:
                    pass
        elif not rsvpExists:
            if event.isRsvpRequired:
                currentRsvp = getEventRsvpCountsForTerm(event.term)
                # No limit means nobody is ever waitlisted.
                waitlist = currentRsvp[event.id] >= event.rsvpLimit if event.rsvpLimit is not None else False
                EventRsvp.create(user=user, event=event, rsvpWaitlist=waitlist)

                # NOTE(review): unlike the attendance log above, a failure in
                # this logging step is caught by the outer handler, so the
                # function returns False even though the RSVP row was created.
                targetList = getTargetList(event, waitlist)
                if g.current_user.username == user.username:
                    createRsvpLog(event.id, f"{user.fullName} joined {targetList}.")
                else:
                    createRsvpLog(event.id, f"Added {user.fullName} to {targetList}.")
            else:
                # Non-RSVP event before its start: record the user as invited.
                EventRsvp.create(user=user, event=event, rsvpWaitlist=False)

        if volunteerExists or rsvpExists:
            return "already in"
    except Exception as e:
        print(e)
        return False

    return True

147 

def unattendedRequiredEvents(program, user):
    """
    List the training events of a program that the user has not attended.

    An event counts as attended when an EventParticipant row exists for the
    user and that event.

    Returns a list of event names. The list is empty when the program has no
    training events or the user has attended all of them.
    """
    # Events that are prerequisites (trainings) for the program
    requiredEvents = (Event.select(Event)
                           .where(Event.isTraining == True, Event.program == program))

    return [event.name for event in requiredEvents if not checkUserVolunteer(user, event)]

164 

165 

def getEventParticipants(event):
    """Return a list of the EventParticipant rows for the event, each selected together with its User."""
    participantQuery = EventParticipant.select(EventParticipant, User).join(User)
    participantQuery = participantQuery.where(EventParticipant.event == event)

    return list(participantQuery)

172 

def getParticipationStatusForTrainings(program, userList, term):
    """
    Report, for each user in userList, which trainings they have taken part in.

    The trainings considered are the non-canceled events in the same academic
    year as term that are either all-volunteer trainings or trainings for
    program. For a training that has already started, taking part means having
    a participant record; for one that has not, it means having an RSVP.
    Trainings that share a name are merged into a single entry per user, and
    an attended occurrence takes precedence over an unattended one.

    :returns: a dictionary mapping each username to a list of
              [training event, participated (bool)] pairs
    """
    programOrAllVolunteer = (Event.isAllVolunteerTraining | ((Event.isTraining) & (Event.program == program)))
    sameAcademicYear = (Event.term.academicYear == term.academicYear)
    trainingRows = (Event.select(Event, Term, EventParticipant, EventRsvp)
                         .join(EventParticipant, JOIN.LEFT_OUTER).switch()
                         .join(EventRsvp, JOIN.LEFT_OUTER).switch()
                         .join(Term)
                         .where(programOrAllVolunteer & sameAcademicYear, (Event.isCanceled != True))
                         .order_by(Event.startDate))

    # Map each training to the set of user ids recorded for it.
    attendeesByTraining = defaultdict(set)
    for row in trainingRows:
        try:
            hasStarted = row.isPastStart
            # Looking the training up first registers it in the dictionary
            # even when the row carries no participant / RSVP to add.
            attendees = attendeesByTraining[row]
            record = row.eventparticipant if hasStarted else row.eventrsvp
            attendees.add(record.user_id)
        except AttributeError:
            # The row has no joined participant / RSVP attribute.
            pass

    # Merge trainings with the same name. Per user the structure is
    # {training.name: [EventObject, hasAttended], ...}; an entry is written
    # when the name is new for the user or when this occurrence was attended.
    # NOTE(review): membership is tested with user.username, so the stored
    # user_id values are presumably usernames -- confirm against the User model.
    statusByUser = {user.username: {} for user in userList}
    for training, attendees in attendeesByTraining.items():
        for user in userList:
            hasAttended = user.username in attendees
            userEntries = statusByUser[user.username]
            if hasAttended or training.name not in userEntries:
                userEntries[training.name] = [training, hasAttended]

    return {user.username: list(statusByUser[user.username].values()) for user in userList}

210 

211 

def sortParticipantsByStatus(event):
    """
    Takes in an event object, queries all participants and RSVPs, and then
    sorts them by their status for the event.

    return: a tuple of four lists:
        - RSVPs whose user has no participant record (only filled once the
          event has started; empty before that),
        - waitlisted entries (only for RSVP-required events that have not
          started; empty otherwise),
        - the volunteer list: the participant records once the event has
          started, otherwise the participant records plus the RSVPs without a
          participant record, minus anything on the waitlist,
        - all participant records for the event.
    """
    eventParticipants = getEventParticipants(event)

    # eventParticipants holds EventParticipant rows, not users, so attendance
    # has to be matched on the user id rather than by testing whether the
    # RSVP's user object is in that list.
    attendedUserIds = {participant.user_id for participant in eventParticipants}

    # get all RSVPs for event and separate out those whose user did not attend
    eventRsvpData = list(EventRsvp.select(EventRsvp, User).join(User).where(EventRsvp.event == event).order_by(EventRsvp.rsvpTime))
    eventNonAttendedData = [rsvp for rsvp in eventRsvpData if rsvp.user_id not in attendedUserIds]

    if event.isPastStart:
        eventVolunteerData = eventParticipants

        # if the event date has passed disregard the waitlist
        eventWaitlistData = []
    else:
        # if rsvp is required for the event, grab all volunteers that are in the waitlist
        if event.isRsvpRequired:
            # NOTE(review): getattr is used because the EventParticipant model
            # is not visible here and may not define rsvpWaitlist -- confirm.
            eventWaitlistData = [volunteer for volunteer in (eventParticipants + eventRsvpData)
                                 if getattr(volunteer, "rsvpWaitlist", False)]
        else:
            eventWaitlistData = []

        # put all participants and non-waitlisted RSVPs into the volunteer data
        eventVolunteerData = [volunteer for volunteer in (eventParticipants + eventNonAttendedData) if volunteer not in eventWaitlistData]
        eventNonAttendedData = []

    return eventNonAttendedData, eventWaitlistData, eventVolunteerData, eventParticipants