Coverage for app/logic/participants.py: 90%

103 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2025-07-22 20:03 +0000

1from flask import g 

2from peewee import fn, JOIN 

3from datetime import date 

4from app.models.user import User 

5from app.models.event import Event 

6from app.models.term import Term 

7from app.models.eventRsvp import EventRsvp 

8from app.models.program import Program 

9from app.models.eventParticipant import EventParticipant 

10from app.logic.users import isEligibleForProgram 

11from app.logic.volunteers import getEventLengthInHours 

12from app.logic.events import getEventRsvpCountsForTerm 

13from app.logic.createLogs import createRsvpLog 

14from collections import defaultdict 

15 

16def trainedParticipants(programID, targetTerm): 

17 """ 

18 This function tracks the users who have attended every Prerequisite 

19 event and adds them to a list that will not flag them when tracking hours. 

20 Returns a list of user objects who've completed all training events. 

21 """ 

22 

23 # Reset program eligibility each term for all other trainings 

24 isRelevantAllVolunteer = (Event.isAllVolunteerTraining) & (Event.term.academicYear == targetTerm.academicYear) 

25 isRelevantProgramTraining = (Event.program == programID) & (Event.term == targetTerm) & (Event.isTraining) 

26 allTrainings = (Event.select() 

27 .join(Term) 

28 .where(isRelevantAllVolunteer | isRelevantProgramTraining, 

29 Event.isCanceled == False)) 

30 

31 fullyTrainedUsers = (User.select() 

32 .join(EventParticipant) 

33 .where(EventParticipant.event.in_(allTrainings)) 

34 .group_by(EventParticipant.user) 

35 .having(fn.Count(EventParticipant.user) == len(allTrainings)).order_by(User.username)) 

36 

37 return list(fullyTrainedUsers) 

38 

39def addBnumberAsParticipant(bnumber, eventId): 

40 """Accepts scan input and signs in the user. If user exists or is already 

41 signed in will return user and login status""" 

42 try: 

43 kioskUser = User.get(User.bnumber == bnumber) 

44 except Exception as e: 

45 print(e) 

46 return None, "does not exist" 

47 

48 event = Event.get_by_id(eventId) 

49 if not isEligibleForProgram(event.program, kioskUser): 

50 userStatus = "banned" 

51 

52 elif checkUserVolunteer(kioskUser, event): 

53 userStatus = "already signed in" 

54 

55 else: 

56 userStatus = "success" 

57 # We are not using addPersonToEvent to do this because  

58 # that function checks if the event is in the past, but 

59 # someone could start signing people up via the kiosk 

60 # before an event has started 

61 totalHours = getEventLengthInHours(event.timeStart, event.timeEnd, event.startDate) 

62 EventParticipant.create (user=kioskUser, event=event, hoursEarned=totalHours) 

63 

64 return kioskUser, userStatus 

65 

66def checkUserRsvp(user, event): 

67 return EventRsvp.select().where(EventRsvp.user==user, EventRsvp.event == event).exists() 

68 

69def checkUserVolunteer(user, event): 

70 return EventParticipant.select().where(EventParticipant.user == user, EventParticipant.event == event).exists() 

71 

72def addPersonToEvent(user, event): 

73 """ 

74 Add a user to an event. 

75 If the event is in the past, add the user as a volunteer (EventParticipant) including hours worked. 

76 If the event is in the future, rsvp for the user (EventRsvp) 

77 

78 Returns True if the operation was successful, false otherwise 

79 """ 

80 try: 

81 volunteerExists = checkUserVolunteer(user, event) 

82 rsvpExists = checkUserRsvp(user, event) 

83 if event.isPastStart: 

84 if not volunteerExists: 

85 # We duplicate these two lines in addBnumberAsParticipant 

86 eventHours = getEventLengthInHours(event.timeStart, event.timeEnd, event.startDate) 

87 EventParticipant.create(user = user, event = event, hoursEarned = eventHours) 

88 else: 

89 if not rsvpExists: 

90 currentRsvp = getEventRsvpCountsForTerm(event.term) 

91 waitlist = currentRsvp[event.id] >= event.rsvpLimit if event.rsvpLimit is not None else 0 

92 EventRsvp.create(user = user, event = event, rsvpWaitlist = waitlist) 

93 

94 targetList = "the waitlist" if waitlist else "the RSVP list" 

95 if g.current_user.username == user.username: 

96 createRsvpLog(event.id, f"{user.fullName} joined {targetList}.") 

97 else: 

98 createRsvpLog(event.id, f"Added {user.fullName} to {targetList}.") 

99 

100 if volunteerExists or rsvpExists: 

101 return "already in" 

102 except Exception as e: 

103 print(e) 

104 return False 

105 

106 return True 

107 

108def unattendedRequiredEvents(program, user): 

109 

110 # Check for events that are prerequisite for program 

111 requiredEvents = (Event.select(Event) 

112 .where(Event.isTraining == True, Event.program == program)) 

113 

114 if requiredEvents: 

115 attendedRequiredEventsList = [] 

116 for event in requiredEvents: 

117 attendedRequirement = (EventParticipant.select().where(EventParticipant.user == user, EventParticipant.event == event)) 

118 if not attendedRequirement: 

119 attendedRequiredEventsList.append(event.name) 

120 if attendedRequiredEventsList is not None: 

121 return attendedRequiredEventsList 

122 else: 

123 return [] 

124 

125 

126def getEventParticipants(event): 

127 eventParticipants = (EventParticipant.select(EventParticipant, User) 

128 .join(User) 

129 .where(EventParticipant.event == event)) 

130 

131 return [p for p in eventParticipants] 

132 

133def getParticipationStatusForTrainings(program, userList, term): 

134 """ 

135 This function returns a dictionary of all trainings for a program and 

136 whether the current user participated in them. 

137 

138 :returns: trainings for program and if the user participated 

139 """ 

140 isRelevantTraining = ((Event.isAllVolunteerTraining | ((Event.isTraining) & (Event.program == program))) & 

141 (Event.term.academicYear == term.academicYear)) 

142 programTrainings = (Event.select(Event, Term, EventParticipant, EventRsvp) 

143 .join(EventParticipant, JOIN.LEFT_OUTER).switch() 

144 .join(EventRsvp, JOIN.LEFT_OUTER).switch() 

145 .join(Term) 

146 .where(isRelevantTraining, (Event.isCanceled != True)).order_by(Event.startDate)) 

147 

148 # Create a dictionary where the keys are trainings and values are a set of those who attended 

149 trainingData = defaultdict(set) 

150 for training in programTrainings: 

151 try: 

152 if training.isPastStart: 

153 trainingData[training].add(training.eventparticipant.user_id) 

154 else: # The training has yet to happen 

155 trainingData[training].add(training.eventrsvp.user_id) 

156 except AttributeError: 

157 pass 

158 # Create a dictionary binding usernames to a list of [training, hasAttended] pairs. The tuples consist of the training (event object) and whether or not they attended it (bool) 

159 

160 # Necessarily complex algorithm to merge the attendances of trainings which have the same name 

161 # Structure of userParticipationStatus for a single user: 

162 # {user.username: {training1.name: [EventObject, hasAttended], training2.name: [EventObject, hasAttended]}, ...} 

163 userParticipationStatus = {user.username: {} for user in userList} 

164 for training, attendeeList in trainingData.items(): 

165 for user in userList: 

166 if training.name not in userParticipationStatus[user.username] or user.username in attendeeList: 

167 userParticipationStatus[user.username][training.name] = [training, user.username in attendeeList] 

168 

169 return {user.username: list(userParticipationStatus[user.username].values()) for user in userList} 

170 

171 

172def sortParticipantsByStatus(event): 

173 """ 

174 Takes in an event object, queries all participants, and then filters those 

175 participants by their attendee status. 

176 

177 return: a list of participants who didn't attend, a list of participants who are waitlisted, 

178 a list of participants who attended, and a list of all participants who have some status for the  

179 event. 

180 """ 

181 eventParticipants = getEventParticipants(event) 

182 

183 # get all RSVPs for event and filter out those that did not attend into separate list 

184 eventRsvpData = list(EventRsvp.select(EventRsvp, User).join(User).where(EventRsvp.event==event).order_by(EventRsvp.rsvpTime)) 

185 eventNonAttendedData = [rsvp for rsvp in eventRsvpData if rsvp.user not in eventParticipants] 

186 

187 if event.isPastStart: 

188 eventVolunteerData = eventParticipants 

189 

190 # if the event date has passed disregard the waitlist 

191 eventWaitlistData = [] 

192 else: 

193 # if rsvp is required for the event, grab all volunteers that are in the waitlist 

194 eventWaitlistData = [volunteer for volunteer in (eventParticipants + eventRsvpData) if volunteer.rsvpWaitlist and event.isRsvpRequired] 

195 

196 # put the rest of the users that are not on the waitlist into the volunteer data 

197 eventVolunteerData = [volunteer for volunteer in eventNonAttendedData if volunteer not in eventWaitlistData] 

198 eventNonAttendedData = [] 

199 

200 return eventNonAttendedData, eventWaitlistData, eventVolunteerData, eventParticipants