Coverage for app/logic/participants.py: 80%
173 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-09-02 19:10 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-09-02 19:10 +0000
1from flask import g
2from peewee import fn, JOIN
3from datetime import date
4from app.models.user import User
5from app.models.event import Event
6from app.models.term import Term
7from app.models.eventRsvp import EventRsvp
8from app.models.program import Program
9from app.models.eventParticipant import EventParticipant
10from app.logic.users import isEligibleForProgram
11from app.logic.sharedLogic import getEventLengthInHours
12from app.logic.events import getEventRsvpCountsForTerm
13from app.logic.createLogs import createRsvpLog
14from collections import defaultdict
15from app.models.backgroundCheck import BackgroundCheck
16from app.models.programManager import ProgramManager
17from datetime import datetime, date
18from app.logic.createLogs import createActivityLog
20# ---------------------- Volunteer Stuff ----------------------
22def trainedParticipants(programID, targetTerm):
23 """
24 This function tracks the users who have attended every Prerequisite
25 event and adds them to a list that will not flag them when tracking hours.
26 Returns a list of user objects who've completed all training events.
27 """
29 # Reset program eligibility each term for all other trainings
30 isRelevantAllVolunteer = (Event.isAllVolunteerTraining) & (Event.term.academicYear == targetTerm.academicYear)
31 isRelevantProgramTraining = (Event.program == programID) & (Event.term == targetTerm) & (Event.isTraining)
32 allTrainings = (Event.select()
33 .join(Term)
34 .where(isRelevantAllVolunteer | isRelevantProgramTraining,
35 Event.isCanceled == False))
37 fullyTrainedUsers = (User.select()
38 .join(EventParticipant)
39 .where(EventParticipant.event.in_(allTrainings))
40 .group_by(EventParticipant.user)
41 .having(fn.Count(EventParticipant.user) == len(allTrainings)).order_by(User.username))
43 return list(fullyTrainedUsers)
45def checkUserRsvp(user, event):
46 return EventRsvp.select().where(EventRsvp.user==user, EventRsvp.event == event).exists()
48def unattendedRequiredEvents(program, user):
50 # Check for events that are prerequisite for program
51 requiredEvents = (Event.select(Event)
52 .where(Event.isTraining == True, Event.program == program))
54 if requiredEvents:
55 attendedRequiredEventsList = []
56 for event in requiredEvents:
57 attendedRequirement = (EventParticipant.select().where(EventParticipant.user == user, EventParticipant.event == event))
58 if not attendedRequirement:
59 attendedRequiredEventsList.append(event.name)
60 if attendedRequiredEventsList is not None:
61 return attendedRequiredEventsList
62 else:
63 return []
64def getParticipationStatusForTrainings(program, userList, term):
65 """
66 This function returns a dictionary of all trainings for a program and
67 whether the current user participated in them.
69 :returns: trainings for program and if the user participated
70 """
71 isRelevantTraining = ((Event.isAllVolunteerTraining | ((Event.isTraining) & (Event.program == program))) &
72 (Event.term.academicYear == term.academicYear))
73 programTrainings = (Event.select(Event, Term, EventParticipant, EventRsvp)
74 .join(EventParticipant, JOIN.LEFT_OUTER).switch()
75 .join(EventRsvp, JOIN.LEFT_OUTER).switch()
76 .join(Term)
77 .where(isRelevantTraining, (Event.isCanceled != True)).order_by(Event.startDate))
79 # Create a dictionary where the keys are trainings and values are a set of those who attended
80 trainingData = defaultdict(set)
81 for training in programTrainings:
82 try:
83 if training.isPastStart:
84 trainingData[training].add(training.eventparticipant.user_id)
85 else: # The training has yet to happen
86 trainingData[training].add(training.eventrsvp.user_id)
87 except AttributeError:
88 pass
89 # Create a dictionary binding usernames to a list of [training, hasAttended] pairs. The tuples consist of the training (event object) and whether or not they attended it (bool)
91 # Necessarily complex algorithm to merge the attendances of trainings which have the same name
92 # Structure of userParticipationStatus for a single user:
93 # {user.username: {training1.name: [EventObject, hasAttended], training2.name: [EventObject, hasAttended]}, ...}
94 userParticipationStatus = {user.username: {} for user in userList}
95 for training, attendeeList in trainingData.items():
96 for user in userList:
97 if training.name not in userParticipationStatus[user.username] or user.username in attendeeList:
98 userParticipationStatus[user.username][training.name] = [training, user.username in attendeeList]
100 return {user.username: list(userParticipationStatus[user.username].values()) for user in userList}
102def sortParticipants(event, isLabor):
103 """
104 Takes in an event object, queries all participants, and then filters those
105 participants by their attendee status.
107 return: a list of participants who didn't attend, a list of participants who are waitlisted,
108 a list of participants who attended, and a list of all participants who have some status for the
109 event.
110 """
111 if not isLabor:
112 eventVolunteers = getEventParticipants(event, False)
114 # get all RSVPs for event and filter out those that did not attend into separate list
115 eventRsvpData = list(EventRsvp.select(EventRsvp, User).join(User).where(EventRsvp.event==event).order_by(EventRsvp.rsvpTime))
116 eventNonAttendedData = [rsvp for rsvp in eventRsvpData if rsvp.user not in eventVolunteers]
118 if event.isPastStart:
119 eventVolunteerData = eventVolunteers
121 # if the event date has passed disregard the waitlist
122 eventWaitlistData = []
123 else:
124 # if rsvp is required for the event, grab all volunteers that are in the waitlist
125 eventWaitlistData = [volunteer for volunteer in (eventVolunteers + eventRsvpData) if volunteer.rsvpWaitlist and event.isRsvpRequired]
127 # put the rest of the users that are not on the waitlist into the volunteer data
128 eventVolunteerData = [volunteer for volunteer in eventNonAttendedData if volunteer not in eventWaitlistData]
129 eventNonAttendedData = []
131 return eventNonAttendedData, eventWaitlistData, eventVolunteerData, eventVolunteers
132 else:
133 eventLabor = getEventParticipants(event, True)
135 eventLaborData = eventLabor
137 return eventLaborData, eventLabor
139def updateEventVolunteers(participantData):
140 """
141 Create new entry in event participant table if user does not exist. Otherwise, updates the record.
143 param: participantData- an ImmutableMultiDict that contains data from every row of the page along with the associated username.
144 """
145 event = Event.get_or_none(Event.id==participantData['event'])
146 if not event:
147 raise Exception("Event does not exist.")
149 for username in participantData.getlist("username"):
150 userObject = User.get_or_none(User.username==username)
151 eventParticipant = EventParticipant.get_or_none(user=userObject, event=participantData['event'])
152 if userObject:
153 if participantData.get(f'checkbox_{username}'): #if the user is marked as present
154 inputHours = participantData.get(f'inputHours_{username}')
155 hoursEarned = float(inputHours) if inputHours else 0
156 if eventParticipant:
157 ((EventParticipant.update({EventParticipant.hoursEarned: hoursEarned})
158 .where(EventParticipant.event==event.id, EventParticipant.user==userObject.username))
159 .execute())
160 else:
161 EventParticipant.create(user=userObject, event=event, hoursEarned=hoursEarned)
162 else:
163 ((EventParticipant.delete()
164 .where(EventParticipant.user==userObject.username, EventParticipant.event==event.id))
165 .execute())
166 else:
167 return False
168 return True
170# ---------------------- Mutual Stuff ----------------------
172def addBnumberAsParticipant(bnumber, eventId, isLabor):
173 """Accepts scan input and signs in the user. If user exists or is already
174 signed in will return user and login status"""
175 try:
176 kioskUser = User.get(User.bnumber == bnumber)
177 except Exception as e:
178 print(e)
179 return None, "does not exist"
181 event = Event.get_by_id(eventId)
182 if not isEligibleForProgram(event.program, kioskUser):
183 userStatus = "banned"
185 elif checkUserParticipant(kioskUser, event):
186 userStatus = "already signed in"
188 else:
189 # We are not using addVolunteerToEvent to do this because
190 # that function checks if the event is in the past, but
191 # someone could start signing people up via the kiosk
192 # before an event has started
193 userStatus = "success"
194 if isLabor == True:
195 EventParticipant.create(user=kioskUser, event=event, didWork = False, isLabor = True)
196 else:
197 totalHours = getEventLengthInHours(event.timeStart, event.timeEnd, event.startDate)
198 EventParticipant.create (user=kioskUser, event=event, hoursEarned=totalHours, didWork = True, isLabor = False)
200 return kioskUser, userStatus
202def checkUserParticipant(user, event):
203 return EventParticipant.select().where(EventParticipant.user == user, EventParticipant.event == event).exists()
205def getEventParticipants(event, laborCheck):
206 if laborCheck == True:
207 eventVolunteers = (EventParticipant.select(EventParticipant, User)
208 .join(User)
209 .where((EventParticipant.event == event), (EventParticipant.isLabor == True)))
210 else:
211 eventVolunteers = (EventParticipant.select(EventParticipant, User)
212 .join(User)
213 .where((EventParticipant.event == event) & (EventParticipant.isLabor == False)))
215 return [p for p in eventVolunteers]
217def addUserBackgroundCheck(user, bgType, bgStatus, dateCompleted):
218 """
219 Changes the status of a users background check depending on what was marked
220 on their volunteer profile.
221 """
222 today = date.today()
223 user = User.get_by_id(user)
224 if bgStatus == '' and dateCompleted == '':
225 createActivityLog(f"Marked {user.firstName} {user.lastName}'s background check for {bgType} as 'Draft'.")
226 else:
227 if not dateCompleted:
228 dateCompleted = None
229 update = BackgroundCheck.create(user=user, type=bgType, backgroundCheckStatus=bgStatus, dateCompleted=dateCompleted)
230 if bgStatus == 'Submitted':
231 createActivityLog(f"Marked {user.firstName} {user.lastName}'s background check for {bgType} as submitted.")
232 elif bgStatus == 'Passed':
233 createActivityLog(f"Marked {user.firstName} {user.lastName}'s background check for {bgType} as passed.")
234 else:
235 createActivityLog(f"Marked {user.firstName} {user.lastName}'s background check for {bgType} as failed.")
237def setProgramManager(username, program_id, action):
238 '''
239 Assigns or removes a user as a student manager for a program.
241 param: username - a string
242 program_id - id
243 action: add, remove
245 '''
246 programManager = User.get(User.username==username)
247 if action == "add":
248 programManager.addProgramManager(program_id)
249 elif action == "remove":
250 programManager.removeProgramManager(program_id)
252def deleteUserBackgroundCheck(bgCheckId, user):
253 """
254 Deletes the user's background check by marking it as deleted with a timestamp and user information.
255 """
256 bgCheck = BackgroundCheck.get_or_none(BackgroundCheck.id == bgCheckId)
258 if bgCheck:
259 (BackgroundCheck.update({BackgroundCheck.deletionDate: datetime.now(), BackgroundCheck.deletedBy: user})
260 .where(BackgroundCheck.id == bgCheck.id)
261 .execute())
263def addParticipantToEvent(user, event, isLabor):
264 """
265 Add a user to an event.
266 If the event is in the past, add the user as a volunteer or laborer (EventParticipant) including hours worked.
267 If the event is in the future, rsvp for the user (EventRsvp)
269 Returns True if the operation was successful, false otherwise
270 """
271 try:
272 participantExists = checkUserParticipant(user, event)
273 rsvpExists = checkUserRsvp(user, event)
274 if not participantExists:
275 if not isLabor:
276 if event.isPastStart:
277 if not participantExists:
278 # We duplicate these two lines in addBnumberAsParticipant
279 eventHours = getEventLengthInHours(event.timeStart, event.timeEnd, event.startDate)
280 EventParticipant.create(user = user, event = event, hoursEarned = eventHours)
281 else:
282 if not rsvpExists:
283 currentRsvp = getEventRsvpCountsForTerm(event.term)
284 waitlist = currentRsvp[event.id] >= event.rsvpLimit if event.rsvpLimit is not None else 0
285 EventRsvp.create(user = user, event = event, rsvpWaitlist = waitlist)
287 targetList = "the waitlist" if waitlist else "the RSVP list"
288 if g.current_user.username == user.username:
289 createRsvpLog(event.id, f"{user.fullName} joined {targetList}.")
290 else:
291 createRsvpLog(event.id, f"Added {user.fullName} to {targetList}.")
292 else:
293 EventParticipant.create(user=user, event=event, didWork=False, isLabor=True)
294 elif participantExists or rsvpExists:
295 return "already in"
296 except Exception as e:
297 print(e)
298 return False
300 return True
302# ---------------------- Labor Stuff ----------------------
304def updateEventLabor(participantData):
305 """
306 Create new entry in event labor table if user does not exist. Otherwise, updates the record.
308 param: participantData- an ImmutableMultiDict that contains data from every row of the page along with the associated username.
309 """
310 event = Event.get_or_none(Event.id == participantData['event'])
311 if not event:
312 raise Exception("Event does not exist.")
314 for username in participantData.getlist("username"):
315 userObject = User.get_or_none(User.username == username)
316 if not userObject:
317 continue
319 eventLabor = EventParticipant.get_or_none(user=userObject, event=event)
320 checkbox_value = participantData.get(f'checkbox_{username}', 'off')
321 didWork = checkbox_value == "on"
323 if eventLabor:
324 (EventParticipant.update({
325 EventParticipant.didWork: didWork
326 })
327 .where(EventParticipant.event == event.id, EventParticipant.user == userObject.username)
328 .execute())
329 else:
330 EventParticipant.create(
331 user=userObject,
332 event=event,
333 didWork=didWork
334 )
336 return True