Coverage for app/logic/participants.py: 90%
103 statements
« prev ^ index » next coverage.py v7.2.7, created at 2025-07-22 20:03 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2025-07-22 20:03 +0000
1from flask import g
2from peewee import fn, JOIN
3from datetime import date
4from app.models.user import User
5from app.models.event import Event
6from app.models.term import Term
7from app.models.eventRsvp import EventRsvp
8from app.models.program import Program
9from app.models.eventParticipant import EventParticipant
10from app.logic.users import isEligibleForProgram
11from app.logic.volunteers import getEventLengthInHours
12from app.logic.events import getEventRsvpCountsForTerm
13from app.logic.createLogs import createRsvpLog
14from collections import defaultdict
16def trainedParticipants(programID, targetTerm):
17 """
18 This function tracks the users who have attended every Prerequisite
19 event and adds them to a list that will not flag them when tracking hours.
20 Returns a list of user objects who've completed all training events.
21 """
23 # Reset program eligibility each term for all other trainings
24 isRelevantAllVolunteer = (Event.isAllVolunteerTraining) & (Event.term.academicYear == targetTerm.academicYear)
25 isRelevantProgramTraining = (Event.program == programID) & (Event.term == targetTerm) & (Event.isTraining)
26 allTrainings = (Event.select()
27 .join(Term)
28 .where(isRelevantAllVolunteer | isRelevantProgramTraining,
29 Event.isCanceled == False))
31 fullyTrainedUsers = (User.select()
32 .join(EventParticipant)
33 .where(EventParticipant.event.in_(allTrainings))
34 .group_by(EventParticipant.user)
35 .having(fn.Count(EventParticipant.user) == len(allTrainings)).order_by(User.username))
37 return list(fullyTrainedUsers)
39def addBnumberAsParticipant(bnumber, eventId):
40 """Accepts scan input and signs in the user. If user exists or is already
41 signed in will return user and login status"""
42 try:
43 kioskUser = User.get(User.bnumber == bnumber)
44 except Exception as e:
45 print(e)
46 return None, "does not exist"
48 event = Event.get_by_id(eventId)
49 if not isEligibleForProgram(event.program, kioskUser):
50 userStatus = "banned"
52 elif checkUserVolunteer(kioskUser, event):
53 userStatus = "already signed in"
55 else:
56 userStatus = "success"
57 # We are not using addPersonToEvent to do this because
58 # that function checks if the event is in the past, but
59 # someone could start signing people up via the kiosk
60 # before an event has started
61 totalHours = getEventLengthInHours(event.timeStart, event.timeEnd, event.startDate)
62 EventParticipant.create (user=kioskUser, event=event, hoursEarned=totalHours)
64 return kioskUser, userStatus
66def checkUserRsvp(user, event):
67 return EventRsvp.select().where(EventRsvp.user==user, EventRsvp.event == event).exists()
69def checkUserVolunteer(user, event):
70 return EventParticipant.select().where(EventParticipant.user == user, EventParticipant.event == event).exists()
72def addPersonToEvent(user, event):
73 """
74 Add a user to an event.
75 If the event is in the past, add the user as a volunteer (EventParticipant) including hours worked.
76 If the event is in the future, rsvp for the user (EventRsvp)
78 Returns True if the operation was successful, false otherwise
79 """
80 try:
81 volunteerExists = checkUserVolunteer(user, event)
82 rsvpExists = checkUserRsvp(user, event)
83 if event.isPastStart:
84 if not volunteerExists:
85 # We duplicate these two lines in addBnumberAsParticipant
86 eventHours = getEventLengthInHours(event.timeStart, event.timeEnd, event.startDate)
87 EventParticipant.create(user = user, event = event, hoursEarned = eventHours)
88 else:
89 if not rsvpExists:
90 currentRsvp = getEventRsvpCountsForTerm(event.term)
91 waitlist = currentRsvp[event.id] >= event.rsvpLimit if event.rsvpLimit is not None else 0
92 EventRsvp.create(user = user, event = event, rsvpWaitlist = waitlist)
94 targetList = "the waitlist" if waitlist else "the RSVP list"
95 if g.current_user.username == user.username:
96 createRsvpLog(event.id, f"{user.fullName} joined {targetList}.")
97 else:
98 createRsvpLog(event.id, f"Added {user.fullName} to {targetList}.")
100 if volunteerExists or rsvpExists:
101 return "already in"
102 except Exception as e:
103 print(e)
104 return False
106 return True
108def unattendedRequiredEvents(program, user):
110 # Check for events that are prerequisite for program
111 requiredEvents = (Event.select(Event)
112 .where(Event.isTraining == True, Event.program == program))
114 if requiredEvents:
115 attendedRequiredEventsList = []
116 for event in requiredEvents:
117 attendedRequirement = (EventParticipant.select().where(EventParticipant.user == user, EventParticipant.event == event))
118 if not attendedRequirement:
119 attendedRequiredEventsList.append(event.name)
120 if attendedRequiredEventsList is not None:
121 return attendedRequiredEventsList
122 else:
123 return []
126def getEventParticipants(event):
127 eventParticipants = (EventParticipant.select(EventParticipant, User)
128 .join(User)
129 .where(EventParticipant.event == event))
131 return [p for p in eventParticipants]
133def getParticipationStatusForTrainings(program, userList, term):
134 """
135 This function returns a dictionary of all trainings for a program and
136 whether the current user participated in them.
138 :returns: trainings for program and if the user participated
139 """
140 isRelevantTraining = ((Event.isAllVolunteerTraining | ((Event.isTraining) & (Event.program == program))) &
141 (Event.term.academicYear == term.academicYear))
142 programTrainings = (Event.select(Event, Term, EventParticipant, EventRsvp)
143 .join(EventParticipant, JOIN.LEFT_OUTER).switch()
144 .join(EventRsvp, JOIN.LEFT_OUTER).switch()
145 .join(Term)
146 .where(isRelevantTraining, (Event.isCanceled != True)).order_by(Event.startDate))
148 # Create a dictionary where the keys are trainings and values are a set of those who attended
149 trainingData = defaultdict(set)
150 for training in programTrainings:
151 try:
152 if training.isPastStart:
153 trainingData[training].add(training.eventparticipant.user_id)
154 else: # The training has yet to happen
155 trainingData[training].add(training.eventrsvp.user_id)
156 except AttributeError:
157 pass
158 # Create a dictionary binding usernames to a list of [training, hasAttended] pairs. The tuples consist of the training (event object) and whether or not they attended it (bool)
160 # Necessarily complex algorithm to merge the attendances of trainings which have the same name
161 # Structure of userParticipationStatus for a single user:
162 # {user.username: {training1.name: [EventObject, hasAttended], training2.name: [EventObject, hasAttended]}, ...}
163 userParticipationStatus = {user.username: {} for user in userList}
164 for training, attendeeList in trainingData.items():
165 for user in userList:
166 if training.name not in userParticipationStatus[user.username] or user.username in attendeeList:
167 userParticipationStatus[user.username][training.name] = [training, user.username in attendeeList]
169 return {user.username: list(userParticipationStatus[user.username].values()) for user in userList}
172def sortParticipantsByStatus(event):
173 """
174 Takes in an event object, queries all participants, and then filters those
175 participants by their attendee status.
177 return: a list of participants who didn't attend, a list of participants who are waitlisted,
178 a list of participants who attended, and a list of all participants who have some status for the
179 event.
180 """
181 eventParticipants = getEventParticipants(event)
183 # get all RSVPs for event and filter out those that did not attend into separate list
184 eventRsvpData = list(EventRsvp.select(EventRsvp, User).join(User).where(EventRsvp.event==event).order_by(EventRsvp.rsvpTime))
185 eventNonAttendedData = [rsvp for rsvp in eventRsvpData if rsvp.user not in eventParticipants]
187 if event.isPastStart:
188 eventVolunteerData = eventParticipants
190 # if the event date has passed disregard the waitlist
191 eventWaitlistData = []
192 else:
193 # if rsvp is required for the event, grab all volunteers that are in the waitlist
194 eventWaitlistData = [volunteer for volunteer in (eventParticipants + eventRsvpData) if volunteer.rsvpWaitlist and event.isRsvpRequired]
196 # put the rest of the users that are not on the waitlist into the volunteer data
197 eventVolunteerData = [volunteer for volunteer in eventNonAttendedData if volunteer not in eventWaitlistData]
198 eventNonAttendedData = []
200 return eventNonAttendedData, eventWaitlistData, eventVolunteerData, eventParticipants