Coverage for app/logic/participants.py: 75%
125 statements
« prev ^ index » next coverage.py v7.10.2, created at 2026-03-06 19:06 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2026-03-06 19:06 +0000
1from flask import g
2from peewee import fn, JOIN
3from datetime import date
4from app.models.user import User
5from app.models.event import Event
6from app.models.term import Term
7from app.models.eventRsvp import EventRsvp
8from app.models.program import Program
9from app.models.eventParticipant import EventParticipant
10from app.logic.users import isEligibleForProgram
11from app.logic.volunteers import getEventLengthInHours
12from app.logic.events import getEventRsvpCountsForTerm
13from app.logic.createLogs import createRsvpLog
14from collections import defaultdict
def trainedParticipants(programID, targetTerm):
    """
    Find the users who have attended every prerequisite training for a program.

    The required trainings are the non-canceled events that are either
    all-volunteer trainings in targetTerm's academic year, or trainings for
    the given program in targetTerm itself.

    Returns a list of User objects, ordered by username, who have an
    EventParticipant record for each of those trainings.
    """
    # All-volunteer trainings count for the whole academic year, while
    # program trainings only count for the target term.
    inTargetAcademicYear = Event.term.academicYear == targetTerm.academicYear
    allVolunteerTraining = (Event.isAllVolunteerTraining) & inTargetAcademicYear
    programTraining = (Event.program == programID) & (Event.term == targetTerm) & (Event.isTraining)

    requiredTrainings = (Event.select()
                              .join(Term)
                              .where(allVolunteerTraining | programTraining,
                                     Event.isCanceled == False))

    # A user is fully trained when their attendance count over the required
    # trainings equals the number of required trainings.
    requiredCount = len(requiredTrainings)
    attendanceCount = fn.Count(EventParticipant.user)

    trainedUsers = (User.select()
                        .join(EventParticipant)
                        .where(EventParticipant.event.in_(requiredTrainings))
                        .group_by(EventParticipant.user)
                        .having(attendanceCount == requiredCount)
                        .order_by(User.username))

    return list(trainedUsers)
def addBnumberAsParticipant(bnumber, eventId):
    """
    Sign a user in to an event from a scanned B-number.

    Returns a (user, status) tuple. The status is one of:
      "does not exist"    - no user could be loaded for the bnumber (user is None)
      "banned"            - the user is not eligible for the event's program
      "already signed in" - the user already has an EventParticipant record
      "success"           - every other case
    """
    try:
        scannedUser = User.get(User.bnumber == bnumber)
    except Exception as error:
        print(error)
        return None, "does not exist"

    event = Event.get_by_id(eventId)

    if not isEligibleForProgram(event.program, scannedUser):
        return scannedUser, "banned"

    if checkUserVolunteer(scannedUser, event):
        return scannedUser, "already signed in"

    if not event.isRsvpRequired or event.isPastStart:
        # Non-RSVP events always mark the scan as attendance, regardless of
        # timing; RSVP events do so only once the event has started.
        hoursEarned = getEventLengthInHours(event.timeStart, event.timeEnd, event.startDate)
        EventParticipant.create(user=scannedUser, event=event, hoursEarned=hoursEarned)

    elif not checkUserRsvp(scannedUser, event):
        # RSVP event that has not started yet: the scan becomes an RSVP,
        # waitlisted when the RSVP limit has been reached.
        rsvpCounts = getEventRsvpCountsForTerm(event.term)
        if event.rsvpLimit is None:
            onWaitlist = False
        else:
            onWaitlist = rsvpCounts[event.id] >= event.rsvpLimit

        EventRsvp.create(user=scannedUser, event=event, rsvpWaitlist=onWaitlist)

        targetList = "the waitlist" if onWaitlist else "the RSVP list"
        # Logging is best-effort: any failure while writing the RSVP log is ignored.
        try:
            if g.current_user.username == scannedUser.username:
                logMessage = f"{scannedUser.fullName} joined {targetList}."
            else:
                logMessage = f"Added {scannedUser.fullName} to {targetList}."
            createRsvpLog(event.id, logMessage)
        except Exception:
            pass

    return scannedUser, "success"
def checkUserRsvp(user, event):
    """Return True if an EventRsvp record exists for this user and event."""
    matchingRsvps = EventRsvp.select().where(EventRsvp.user == user, EventRsvp.event == event)
    return matchingRsvps.exists()
def checkUserVolunteer(user, event):
    """Return True if an EventParticipant record exists for this user and event."""
    matchingParticipants = EventParticipant.select().where(EventParticipant.user == user,
                                                           EventParticipant.event == event)
    return matchingParticipants.exists()
def addPersonToEvent(user, event):
    """
    Add a user to an event.

    If the event has already started, add the user as a volunteer
    (EventParticipant) with the event's length as their hours earned.
    If the event has not started, RSVP for the user (EventRsvp). For events
    that require an RSVP the user is waitlisted once the RSVP limit has been
    reached, and an RSVP log entry is written.

    Returns:
        "already in" if the user had a participant or RSVP record for the
            event before this call,
        True if the call completed and the user had neither record,
        False if any exception was raised (the exception is printed).
    """
    try:
        volunteerExists = checkUserVolunteer(user, event)
        rsvpExists = checkUserRsvp(user, event)

        if event.isPastStart:
            # After the start: record attendance, whether or not an RSVP was required.
            if not volunteerExists:
                eventHours = getEventLengthInHours(event.timeStart, event.timeEnd, event.startDate)
                EventParticipant.create(user=user, event=event, hoursEarned=eventHours)

        elif not rsvpExists:
            if event.isRsvpRequired:
                currentRsvp = getEventRsvpCountsForTerm(event.term)
                # No limit means nobody is waitlisted; keep this a bool so it
                # matches how addBnumberAsParticipant fills rsvpWaitlist.
                waitlist = currentRsvp[event.id] >= event.rsvpLimit if event.rsvpLimit is not None else False
                EventRsvp.create(user=user, event=event, rsvpWaitlist=waitlist)

                targetList = "the waitlist" if waitlist else "the RSVP list"
                if g.current_user.username == user.username:
                    createRsvpLog(event.id, f"{user.fullName} joined {targetList}.")
                else:
                    createRsvpLog(event.id, f"Added {user.fullName} to {targetList}.")
            else:
                # Non-RSVP event before the start: record an un-waitlisted RSVP (invited status).
                EventRsvp.create(user=user, event=event, rsvpWaitlist=False)

        # NOTE(review): a user with an RSVP but no participant record who is
        # added after the event started gets an EventParticipant created above
        # and still reports "already in". Kept as-is for callers; confirm intent.
        if volunteerExists or rsvpExists:
            return "already in"
    except Exception as e:
        print(e)
        return False

    return True
def unattendedRequiredEvents(program, user):
    """
    List the training events of a program that the user has not attended.

    :param program: the program whose training events are required
    :param user: the user whose attendance is checked
    :returns: a list of event names, one for each training event of the
              program for which the user has no EventParticipant record.
              The list is empty when the program has no training events or
              the user has attended all of them.
    """
    # Events that are prerequisites for the program
    requiredEvents = (Event.select(Event)
                           .where(Event.isTraining == True, Event.program == program))

    # Always return a list so callers never have to handle None.
    return [event.name for event in requiredEvents if not checkUserVolunteer(user, event)]
def getEventParticipants(event):
    """
    Return a list of the EventParticipant records for the given event, each
    selected together with its joined User.
    """
    participantQuery = (EventParticipant.select(EventParticipant, User)
                                        .join(User)
                                        .where(EventParticipant.event == event))

    return list(participantQuery)
def getParticipationStatusForTrainings(program, userList, term):
    """
    Build, for each user in userList, the list of relevant trainings and
    whether that user participated in each of them.

    Relevant trainings are non-canceled events in the academic year of `term`
    that are either all-volunteer trainings or trainings for `program`.
    For a training that has already started, participation means an
    EventParticipant record; for an upcoming training it means an EventRsvp.

    :returns: a dictionary {username: [[training, hasParticipated], ...]}
              with one entry per distinct training name.
    """
    isProgramTraining = (Event.isTraining) & (Event.program == program)
    inAcademicYear = Event.term.academicYear == term.academicYear
    isRelevantTraining = (Event.isAllVolunteerTraining | isProgramTraining) & inAcademicYear

    trainingRows = (Event.select(Event, Term, EventParticipant, EventRsvp)
                         .join(EventParticipant, JOIN.LEFT_OUTER).switch()
                         .join(EventRsvp, JOIN.LEFT_OUTER).switch()
                         .join(Term)
                         .where(isRelevantTraining, (Event.isCanceled != True))
                         .order_by(Event.startDate))

    # Map each training to the set of user ids found on its joined rows.
    attendeesByTraining = defaultdict(set)
    for row in trainingRows:
        try:
            hasStarted = row.isPastStart
            # Look the training up BEFORE touching the joined record so that
            # the training gets an (empty) entry even when the row has no
            # participant/RSVP attached and the attribute access fails.
            attendeeIds = attendeesByTraining[row]
            joinedRecord = row.eventparticipant if hasStarted else row.eventrsvp
            attendeeIds.add(joinedRecord.user_id)
        except AttributeError:
            pass

    # Merge trainings that share a name. Per user, the structure is:
    #   {username: {trainingName: [EventObject, hasParticipated], ...}, ...}
    # An existing entry for a name is only replaced by a training the user
    # actually participated in.
    statusByUser = {person.username: {} for person in userList}
    for training, attendeeIds in attendeesByTraining.items():
        for person in userList:
            participated = person.username in attendeeIds
            statusByName = statusByUser[person.username]
            if participated or training.name not in statusByName:
                statusByName[training.name] = [training, participated]

    return {person.username: list(statusByUser[person.username].values()) for person in userList}
def sortParticipantsByStatus(event):
    """
    Takes in an event object, queries all participants and RSVPs, and then
    sorts them by their status for the event.

    return: a 4-tuple of lists
        - RSVPs whose user did not attend (empty until the event has started)
        - waitlisted entries (empty once the event has started)
        - volunteers: the participants once the event has started; before
          that, participants plus RSVPs that are not on the waitlist
        - all EventParticipant records for the event
    """
    eventParticipants = getEventParticipants(event)

    # All RSVPs for the event, oldest first
    eventRsvpData = list(EventRsvp.select(EventRsvp, User)
                                  .join(User)
                                  .where(EventRsvp.event == event)
                                  .order_by(EventRsvp.rsvpTime))

    # Compare user ids: eventParticipants holds EventParticipant rows, so
    # testing `rsvp.user in eventParticipants` can never match a User and
    # would leave every RSVP flagged as not attended.
    attendedUserIds = {participant.user_id for participant in eventParticipants}
    eventNonAttendedData = [rsvp for rsvp in eventRsvpData if rsvp.user_id not in attendedUserIds]

    if event.isPastStart:
        # Once the event has started the waitlist is disregarded.
        return eventNonAttendedData, [], eventParticipants, eventParticipants

    # Only events that require an RSVP have a waitlist.
    # NOTE(review): this reads rsvpWaitlist from EventParticipant rows as well
    # as RSVPs - confirm that model exposes the attribute.
    eventWaitlistData = [volunteer for volunteer in (eventParticipants + eventRsvpData)
                         if volunteer.rsvpWaitlist and event.isRsvpRequired]

    # Participants plus non-waitlisted RSVPs make up the volunteer list
    eventVolunteerData = [volunteer for volunteer in (eventParticipants + eventNonAttendedData)
                          if volunteer not in eventWaitlistData]

    # Nobody can be marked as "did not attend" before the event starts.
    return [], eventWaitlistData, eventVolunteerData, eventParticipants