Coverage for app/logic/participants.py: 80%

173 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-09-02 19:10 +0000

1from flask import g 

2from peewee import fn, JOIN 

3from datetime import date 

4from app.models.user import User 

5from app.models.event import Event 

6from app.models.term import Term 

7from app.models.eventRsvp import EventRsvp 

8from app.models.program import Program 

9from app.models.eventParticipant import EventParticipant 

10from app.logic.users import isEligibleForProgram 

11from app.logic.sharedLogic import getEventLengthInHours 

12from app.logic.events import getEventRsvpCountsForTerm 

13from app.logic.createLogs import createRsvpLog 

14from collections import defaultdict 

15from app.models.backgroundCheck import BackgroundCheck 

16from app.models.programManager import ProgramManager 

17from datetime import datetime, date 

18from app.logic.createLogs import createActivityLog 

19 

20# ---------------------- Volunteer Stuff ---------------------- 

21 

def trainedParticipants(programID, targetTerm):
    """
    Find every user who has attended all of the trainings that apply to a
    program for the given term.

    The relevant trainings are the non-canceled all-volunteer trainings in
    targetTerm's academic year, plus the non-canceled trainings of programID
    that belong to targetTerm itself.

    Returns a list of User objects, ordered by username.
    """
    # All-volunteer trainings count for the whole academic year, while
    # program-specific trainings only count for the term they belong to.
    inSameAcademicYear = (Event.term.academicYear == targetTerm.academicYear)
    allVolunteerCondition = (Event.isAllVolunteerTraining) & inSameAcademicYear
    programCondition = (Event.program == programID) & (Event.term == targetTerm) & (Event.isTraining)

    requiredTrainings = (Event.select()
                              .join(Term)
                              .where(allVolunteerCondition | programCondition,
                                     Event.isCanceled == False))

    # A user is fully trained when the number of their participation rows
    # within the required trainings equals the number of required trainings.
    attendedRequired = EventParticipant.event.in_(requiredTrainings)
    requiredCount = len(requiredTrainings)
    trainedQuery = (User.select()
                        .join(EventParticipant)
                        .where(attendedRequired)
                        .group_by(EventParticipant.user)
                        .having(fn.Count(EventParticipant.user) == requiredCount)
                        .order_by(User.username))

    return list(trainedQuery)

44 

def checkUserRsvp(user, event):
    """
    Report whether an EventRsvp record exists for the given user and event.

    Returns True if at least one matching RSVP row exists, False otherwise.
    """
    matchingRsvps = EventRsvp.select().where(EventRsvp.user == user, EventRsvp.event == event)
    return matchingRsvps.exists()

47 

def unattendedRequiredEvents(program, user):
    """
    List the training events of a program that the user has no participant
    record for.

    param: program - the program (or program id) whose trainings are checked
           user - the user (or username) whose attendance is checked

    Returns a list of event names; the list is empty when the program has no
    training events or the user has attended all of them.
    """
    # Trainings are the prerequisite events for the program
    requiredEvents = (Event.select(Event)
                           .where(Event.isTraining == True, Event.program == program))

    # checkUserParticipant runs an EXISTS query, so no participant rows are
    # loaded just to find out whether the user attended.
    return [event.name for event in requiredEvents if not checkUserParticipant(user, event)]

def getParticipationStatusForTrainings(program, userList, term):
    """
    Build, for every user in userList, the list of trainings relevant to a
    program together with whether that user is counted as attending each one.

    Relevant trainings are the non-canceled all-volunteer trainings and the
    trainings of the given program, within the academic year of term. For a
    training that is past its start, attendance comes from EventParticipant
    records; for one that has not started yet it comes from EventRsvp records.

    :returns: a dictionary {username: [[training, hasAttended], ...]} holding
              one entry per distinct training name
    """
    programOrAllVolunteer = (Event.isAllVolunteerTraining | ((Event.isTraining) & (Event.program == program)))
    isRelevantTraining = (programOrAllVolunteer &
                          (Event.term.academicYear == term.academicYear))
    trainingRows = (Event.select(Event, Term, EventParticipant, EventRsvp)
                         .join(EventParticipant, JOIN.LEFT_OUTER).switch()
                         .join(EventRsvp, JOIN.LEFT_OUTER).switch()
                         .join(Term)
                         .where(isRelevantTraining, (Event.isCanceled != True))
                         .order_by(Event.startDate))

    # Map each training to the set of user ids recorded for it
    attendeesByTraining = defaultdict(set)
    for training in trainingRows:
        try:
            if not training.isPastStart:
                # The training has yet to happen, so RSVPs stand in for attendance
                attendeesByTraining[training].add(training.eventrsvp.user_id)
            else:
                attendeesByTraining[training].add(training.eventparticipant.user_id)
        except AttributeError:
            # The defaultdict lookup runs before the attribute access, so a
            # training whose row lacks the joined record still gets an
            # (empty) entry and is reported as not attended.
            pass

    # Trainings that share a name are merged into a single entry per user:
    # the first training seen under a name is kept unless a later one with
    # the same name was attended, in which case the attended one replaces it.
    # Shape: {username: {trainingName: [EventObject, hasAttended], ...}, ...}
    statusByUser = {user.username: {} for user in userList}
    for training, attendees in attendeesByTraining.items():
        for user in userList:
            didAttend = user.username in attendees
            userStatuses = statusByUser[user.username]
            if didAttend or training.name not in userStatuses:
                userStatuses[training.name] = [training, didAttend]

    return {user.username: list(statusByUser[user.username].values()) for user in userList}

101 

def sortParticipants(event, isLabor):
    """
    Takes in an event object, queries all participants, and then filters those
    participants by their attendee status.

    param: event - the event whose participants are sorted
           isLabor - when truthy, only labor records are returned

    return: when isLabor is falsy, a 4-tuple: a list of RSVPs whose user
    didn't attend, a list of waitlisted entries, a list of volunteer entries,
    and a list of all volunteer participant records for the event.
    When isLabor is truthy, a 2-tuple holding the list of labor participant
    records twice.
    """
    if isLabor:
        eventLabor = getEventParticipants(event, True)
        return eventLabor, eventLabor

    eventVolunteers = getEventParticipants(event, False)

    # get all RSVPs for event and filter out those that did not attend into separate list
    eventRsvpData = list(EventRsvp.select(EventRsvp, User)
                                  .join(User)
                                  .where(EventRsvp.event == event)
                                  .order_by(EventRsvp.rsvpTime))

    # eventVolunteers holds EventParticipant rows, so RSVPs have to be matched
    # to them by user id; testing `rsvp.user in eventVolunteers` compares a
    # User against EventParticipant objects and would never match.
    attendedUserIds = {participant.user_id for participant in eventVolunteers}
    eventNonAttendedData = [rsvp for rsvp in eventRsvpData if rsvp.user_id not in attendedUserIds]

    if event.isPastStart:
        eventVolunteerData = eventVolunteers

        # if the event date has passed disregard the waitlist
        eventWaitlistData = []
    else:
        # if rsvp is required for the event, grab all volunteers that are in the waitlist
        # NOTE(review): this reads rsvpWaitlist from participant rows as well as
        # RSVP rows - confirm EventParticipant exposes that attribute.
        eventWaitlistData = [volunteer for volunteer in (eventVolunteers + eventRsvpData)
                             if volunteer.rsvpWaitlist and event.isRsvpRequired]

        # put the rest of the users that are not on the waitlist into the volunteer data
        eventVolunteerData = [volunteer for volunteer in eventNonAttendedData
                              if volunteer not in eventWaitlistData]
        eventNonAttendedData = []

    return eventNonAttendedData, eventWaitlistData, eventVolunteerData, eventVolunteers

138 

def updateEventVolunteers(participantData):
    """
    Create new entry in event participant table if user does not exist. Otherwise, updates the record.
    Users whose checkbox is not set have their participant record for the event deleted.

    param: participantData - an ImmutableMultiDict that contains data from every row of the page along with the associated username.

    Returns True when every username was processed, or False as soon as a
    username does not match a user (rows handled before that one stay saved).
    Raises Exception if the event does not exist.
    """
    event = Event.get_or_none(Event.id==participantData['event'])
    if not event:
        raise Exception("Event does not exist.")

    for username in participantData.getlist("username"):
        userObject = User.get_or_none(User.username==username)
        if not userObject:
            # Validate the user before touching participant records, so no
            # lookup is ever made with a missing user.
            return False

        if not participantData.get(f'checkbox_{username}'):
            # the user is not marked as present: drop any existing record
            (EventParticipant.delete()
                             .where(EventParticipant.user==userObject.username, EventParticipant.event==event.id)
                             .execute())
            continue

        # the user is marked as present
        inputHours = participantData.get(f'inputHours_{username}')
        hoursEarned = float(inputHours) if inputHours else 0

        eventParticipant = EventParticipant.get_or_none(user=userObject, event=event)
        if eventParticipant:
            (EventParticipant.update({EventParticipant.hoursEarned: hoursEarned})
                             .where(EventParticipant.event==event.id, EventParticipant.user==userObject.username)
                             .execute())
        else:
            EventParticipant.create(user=userObject, event=event, hoursEarned=hoursEarned)

    return True

169 

170# ---------------------- Mutual Stuff ---------------------- 

171 

def addBnumberAsParticipant(bnumber, eventId, isLabor):
    """
    Sign a user in to an event from scanned B-number input.

    Returns a (user, status) tuple. The status is "does not exist" (with the
    user as None) when the B-number lookup fails, "banned" when the user is
    not eligible for the event's program, "already signed in" when a
    participant record already exists, and "success" once a new participant
    record has been created.
    """
    try:
        kioskUser = User.get(User.bnumber == bnumber)
    except Exception as lookupError:
        print(lookupError)
        return None, "does not exist"

    event = Event.get_by_id(eventId)

    if not isEligibleForProgram(event.program, kioskUser):
        return kioskUser, "banned"

    if checkUserParticipant(kioskUser, event):
        return kioskUser, "already signed in"

    # The record is created directly here instead of going through the
    # regular add-to-event helper: that path checks whether the event is in
    # the past, but kiosk sign-in can begin before the event has started.
    if isLabor == True:
        participantFields = {"didWork": False, "isLabor": True}
    else:
        totalHours = getEventLengthInHours(event.timeStart, event.timeEnd, event.startDate)
        participantFields = {"hoursEarned": totalHours, "didWork": True, "isLabor": False}

    EventParticipant.create(user=kioskUser, event=event, **participantFields)
    return kioskUser, "success"

201 

def checkUserParticipant(user, event):
    """
    Report whether an EventParticipant record exists for the given user and event.

    Returns True if at least one matching participant row exists, False otherwise.
    """
    matchingParticipants = (EventParticipant.select()
                                            .where(EventParticipant.user == user,
                                                   EventParticipant.event == event))
    return matchingParticipants.exists()

204 

def getEventParticipants(event, laborCheck):
    """
    Fetch the EventParticipant records of an event, each selected together
    with its User.

    When laborCheck equals True only labor records (isLabor is True) are
    returned; for any other value only records with isLabor False are returned.

    Returns a list of EventParticipant objects.
    """
    wantLabor = True if laborCheck == True else False

    participantQuery = (EventParticipant.select(EventParticipant, User)
                                        .join(User)
                                        .where(EventParticipant.event == event,
                                               EventParticipant.isLabor == wantLabor))

    return list(participantQuery)

216 

def addUserBackgroundCheck(user, bgType, bgStatus, dateCompleted):
    """
    Changes the status of a users background check depending on what was marked
    on their volunteer profile.

    param: user - id of the user the background check belongs to
           bgType - the background check type
           bgStatus - the submitted status; 'Submitted' and 'Passed' are logged
                      as such, any other non-draft value is logged as failed
           dateCompleted - completion date, stored as None when empty

    When both bgStatus and dateCompleted are empty strings nothing is stored
    and only a 'Draft' activity log entry is written. Otherwise a new
    BackgroundCheck record is created and the change is logged.
    """
    user = User.get_by_id(user)

    if bgStatus == '' and dateCompleted == '':
        createActivityLog(f"Marked {user.firstName} {user.lastName}'s background check for {bgType} as 'Draft'.")
        return

    # An empty completion date is stored as None
    BackgroundCheck.create(user=user,
                           type=bgType,
                           backgroundCheckStatus=bgStatus,
                           dateCompleted=dateCompleted or None)

    if bgStatus == 'Submitted':
        createActivityLog(f"Marked {user.firstName} {user.lastName}'s background check for {bgType} as submitted.")
    elif bgStatus == 'Passed':
        createActivityLog(f"Marked {user.firstName} {user.lastName}'s background check for {bgType} as passed.")
    else:
        createActivityLog(f"Marked {user.firstName} {user.lastName}'s background check for {bgType} as failed.")

236 

def setProgramManager(username, program_id, action):
    '''
    Adds or removes a user as a manager of a program.

    param: username - a string, the username of the user to change
           program_id - id of the program
           action - "add" or "remove"; any other value leaves the user unchanged
    '''
    targetUser = User.get(User.username==username)

    if action == "add":
        targetUser.addProgramManager(program_id)
        return

    if action == "remove":
        targetUser.removeProgramManager(program_id)

251 

def deleteUserBackgroundCheck(bgCheckId, user):
    """
    Soft-deletes a background check: the record is kept, but its deletion
    date is set to the current time and the deleting user is recorded.

    Does nothing when no background check has the given id.
    """
    bgCheck = BackgroundCheck.get_or_none(BackgroundCheck.id == bgCheckId)
    if not bgCheck:
        return

    deletionFields = {
        BackgroundCheck.deletionDate: datetime.now(),
        BackgroundCheck.deletedBy: user,
    }
    markDeleted = BackgroundCheck.update(deletionFields).where(BackgroundCheck.id == bgCheck.id)
    markDeleted.execute()

262 

def addParticipantToEvent(user, event, isLabor):
    """
    Add a user to an event.
    If isLabor is set, add the user as a laborer (EventParticipant).
    Otherwise, if the event is in the past, add the user as a volunteer (EventParticipant) including hours worked.
    If the event is in the future, rsvp for the user (EventRsvp) unless an RSVP already exists.

    Returns "already in" if the user already has a participant record for the
    event, False if any step raised an exception, and True otherwise
    (including when the user already had an RSVP and nothing was added).
    """
    try:
        participantExists = checkUserParticipant(user, event)
        rsvpExists = checkUserRsvp(user, event)

        if participantExists:
            return "already in"

        if isLabor:
            EventParticipant.create(user=user, event=event, didWork=False, isLabor=True)

        elif event.isPastStart:
            # We duplicate these two lines in addBnumberAsParticipant
            eventHours = getEventLengthInHours(event.timeStart, event.timeEnd, event.startDate)
            EventParticipant.create(user=user, event=event, hoursEarned=eventHours)

        elif not rsvpExists:
            currentRsvp = getEventRsvpCountsForTerm(event.term)
            # Only events with an RSVP limit can put a user on the waitlist
            if event.rsvpLimit is not None:
                waitlist = currentRsvp[event.id] >= event.rsvpLimit
            else:
                waitlist = False
            EventRsvp.create(user=user, event=event, rsvpWaitlist=waitlist)

            targetList = "the waitlist" if waitlist else "the RSVP list"
            if g.current_user.username == user.username:
                createRsvpLog(event.id, f"{user.fullName} joined {targetList}.")
            else:
                createRsvpLog(event.id, f"Added {user.fullName} to {targetList}.")

    except Exception as e:
        # Best effort: any failure is printed and reported as False
        print(e)
        return False

    return True

301 

302# ---------------------- Labor Stuff ---------------------- 

303 

def updateEventLabor(participantData):
    """
    Create new labor entry in the event participant table if the user has no record for the event.
    Otherwise, updates the didWork flag of the existing record.

    param: participantData - an ImmutableMultiDict that contains data from every row of the page along with the associated username.

    Usernames that do not match a user are skipped.
    Returns True once every username has been processed.
    Raises Exception if the event does not exist.
    """
    event = Event.get_or_none(Event.id == participantData['event'])
    if not event:
        raise Exception("Event does not exist.")

    for username in participantData.getlist("username"):
        userObject = User.get_or_none(User.username == username)
        if not userObject:
            continue

        # A checkbox that is missing from the form counts as not having worked
        didWork = participantData.get(f'checkbox_{username}', 'off') == "on"

        eventLabor = EventParticipant.get_or_none(user=userObject, event=event)
        if eventLabor:
            (EventParticipant.update({EventParticipant.didWork: didWork})
                             .where(EventParticipant.event == event.id, EventParticipant.user == userObject.username)
                             .execute())
        else:
            # Mark the new record as labor, as the other labor sign-in paths
            # in this module do, so it is returned by labor queries.
            EventParticipant.create(user=userObject, event=event, didWork=didWork, isLabor=True)

    return True