Coverage for app/logic/events.py: 92%
231 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-01-29 16:34 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2024-01-29 16:34 +0000
1from flask import url_for
2from peewee import DoesNotExist, fn, JOIN
3from dateutil import parser
4from datetime import timedelta, date
5import datetime
6from werkzeug.datastructures import MultiDict
7from app.models import mainDB
8from app.models.user import User
9from app.models.event import Event
10from app.models.eventParticipant import EventParticipant
11from app.models.program import Program
12from app.models.term import Term
13from app.models.programBan import ProgramBan
14from app.models.interest import Interest
15from app.models.eventRsvp import EventRsvp
16from app.models.requirementMatch import RequirementMatch
17from app.models.certificationRequirement import CertificationRequirement
18from app.models.eventViews import EventView
20from app.logic.createLogs import createAdminLog
21from app.logic.utils import format24HourTime
22from app.logic.fileHandler import FileHandler
23from app.logic.certification import updateCertRequirementForEvent
def cancelEvent(eventId):
    """
    Cancels an event.

    Marks the event as canceled, saves it, and records the cancellation in the
    admin log. Does nothing when no event with the given id exists.

    :param eventId: id of the event to cancel
    """
    event = Event.get_or_none(Event.id == eventId)
    if not event:
        # Unknown id: there is nothing to cancel and nothing to log.
        return

    event.isCanceled = True
    event.save()

    eventLink = f"<a href= \"{url_for('admin.eventDisplay', eventId = event)}\" >{event.name}</a>"
    startDate = datetime.datetime.strftime(event.startDate, '%m/%d/%Y')

    program = event.program
    if program:
        createAdminLog(f"Canceled {eventLink} for {program.programName}, which had a start date of {startDate}.")
    else:
        # Mirrors deleteEvent: an event without a program has no programName to report.
        createAdminLog(f"Canceled a non-program event, {eventLink}, which had a start date of {startDate}.")
def deleteEvent(eventId):
    """
    Deletes a single event and writes an admin log entry about it.

    If the event belongs to a recurring series, every later event in the series
    (ordered by id) takes over the name of the event before it, so that there
    is no gap in weeks once this event is gone.

    Does nothing when no event with the given id exists.
    """
    event = Event.get_or_none(Event.id == eventId)
    if not event:
        return

    if event.recurringId:
        # ordered by id so the renaming is deterministic (the tests rely on it)
        seriesEvents = list(Event.select()
                                 .where(Event.recurringId == event.recurringId)
                                 .order_by(Event.id))
        pastDeletedEvent = False

        for seriesEvent in seriesEvents:
            if pastDeletedEvent:
                # hand the previous event's original name down to this event
                Event.update({Event.name: inheritedName}).where(Event.id == seriesEvent.id).execute()
                inheritedName = seriesEvent.name

            if seriesEvent == event:
                inheritedName = seriesEvent.name
                pastDeletedEvent = True

    program = event.program
    formattedStartDate = datetime.datetime.strftime(event.startDate, '%m/%d/%Y')

    if program:
        createAdminLog(f"Deleted \"{event.name}\" for {program.programName}, which had a start date of {formattedStartDate}.")
    else:
        createAdminLog(f"Deleted a non-program event, \"{event.name}\", which had a start date of {formattedStartDate}.")

    event.delete_instance(recursive=True, delete_nullable=True)
def deleteEventAndAllFollowing(eventId):
    """
    Deletes a recurring event together with every event of the same series
    whose start date is on or after this event's start date.

    Nothing is deleted when the event does not exist or is not recurring.
    """
    event = Event.get_or_none(Event.id == eventId)
    if not (event and event.recurringId):
        return

    inSameSeries = (Event.recurringId == event.recurringId)
    startsOnOrAfter = (Event.startDate >= event.startDate)

    # materialize the rows first so we are not deleting while the query is still being read
    eventsToDelete = list(Event.select().where(inSameSeries & startsOnOrAfter))
    for eventToDelete in eventsToDelete:
        eventToDelete.delete_instance(recursive=True)
def deleteAllRecurringEvents(eventId):
    """
    Deletes every event that shares a recurring series with the given event.

    Nothing is deleted when the event does not exist or is not recurring.
    """
    event = Event.get_or_none(Event.id == eventId)
    if not (event and event.recurringId):
        return

    # materialize the rows first so we are not deleting while the query is still being read
    wholeSeries = list(Event.select().where(Event.recurringId == event.recurringId))
    for seriesEvent in wholeSeries:
        seriesEvent.delete_instance(recursive=True)
def attemptSaveEvent(eventData, attachmentFiles = None):
    """
    Tries to save an event to the database.

    The event data is preprocessed and validated first. If it is valid, the
    event(s) are saved and any attachment files are added to each of them.

    Returns:
        On success: the saved events and a blank (" ") message.
        On failure: False and either the validation error message or the
        exception that was raised while saving.
    """
    # An empty RSVP limit would otherwise be changed from "" to 0, so it is
    # explicitly recorded as None.
    if eventData["rsvpLimit"] == "":
        eventData["rsvpLimit"] = None

    processedData = preprocessEventData(eventData)
    dataIsValid, validationMessage = validateNewEventData(processedData)
    if not dataIsValid:
        return False, validationMessage

    try:
        savedEvents = saveEventToDb(processedData)
        if attachmentFiles:
            for savedEvent in savedEvents:
                fileHandler = FileHandler(attachmentFiles, eventId=savedEvent.id)
                fileHandler.saveFiles(saveOriginalFile=savedEvents[0])
    except Exception as e:
        print(f'Failed attemptSaveEvent() with Exception:{e}')
        return False, e
    else:
        return savedEvents, " "
def saveEventToDb(newEventData):
    """
    Creates or updates event records from validated event data.

    Data without an 'id' is treated as a new event: one record is created, or
    one record per week when the event is recurring. Data with an 'id' updates
    that existing event. Each record is written inside its own transaction.

    Raises an Exception when the data has not been marked valid.

    Returns the list of event records that were created or updated.
    """
    if not newEventData.get('valid', False):
        raise Exception("Unvalidated data passed to saveEventToDb")

    isNewEvent = 'id' not in newEventData

    if isNewEvent and newEventData['isRecurring']:
        instancesToSave = calculateRecurringEventFrequency(newEventData)
        recurringSeriesId = calculateNewrecurringId()
    else:
        instancesToSave = [{'name': f"{newEventData['name']}",
                            'date': newEventData['startDate'],
                            "week": 1}]
        recurringSeriesId = None

    # Fields that are copied unchanged from the form data onto every record.
    copiedFields = ("term", "description", "timeStart", "timeEnd", "location",
                    "isFoodProvided", "isTraining", "isRsvpRequired", "isService",
                    "rsvpLimit", "contactEmail", "contactName")

    savedRecords = []
    for instance in instancesToSave:
        with mainDB.atomic():
            recordFields = {field: newEventData[field] for field in copiedFields}
            recordFields["name"] = instance['name']
            # every saved record is a single-day event
            recordFields["startDate"] = instance['date']
            recordFields["endDate"] = instance['date']

            if isNewEvent:
                # These three fields are only relevant during event creation,
                # so they are only set when it is a new event.
                recordFields['program'] = newEventData['program']
                recordFields['recurringId'] = recurringSeriesId
                recordFields["isAllVolunteerTraining"] = newEventData['isAllVolunteerTraining']
                record = Event.create(**recordFields)
            else:
                record = Event.get_by_id(newEventData['id'])
                Event.update(**recordFields).where(Event.id == record).execute()

            requirement = newEventData.get('certRequirement', "")
            if requirement != "":
                updateCertRequirementForEvent(record, requirement)

            savedRecords.append(record)

    return savedRecords
def getStudentLedEvents(term):
    """
    Get the events of student-led programs in the given term, grouped by program.

    :return: A dictionary mapping each program to the list of its events,
             ordered by start date and start time.
    """
    eventQuery = (Event.select(Event, Program)
                       .join(Program)
                       .where(Program.isStudentLed,
                              Event.term == term)
                       .order_by(Event.startDate, Event.timeStart))

    eventsByProgram = {}
    for event in list(eventQuery.execute()):
        if event.program not in eventsByProgram:
            eventsByProgram[event.program] = []
        eventsByProgram[event.program].append(event)

    return eventsByProgram
def getUpcomingStudentLedCount(term, currentTime):
    """
    Count the upcoming, non-canceled events of each student-led program in a term.

    An event counts as upcoming when its end date is after currentTime, or its
    end date equals currentTime and its end time has not passed yet.
    NOTE(review): currentTime is compared against both a date column and a time
    column; presumably it is a datetime -- confirm with the callers.

    :return: A dictionary mapping program id to its upcoming event count.
             Programs without upcoming events are not included.
    """
    endsOnLaterDay = (Event.endDate > currentTime)
    endsLaterToday = ((Event.endDate == currentTime) & (Event.timeEnd >= currentTime))

    countQuery = (Program.select(Program.id, fn.COUNT(Event.id).alias("eventCount"))
                         .join(Event, on=(Program.id == Event.program_id))
                         .where(Program.isStudentLed,
                                Event.term == term,
                                endsOnLaterDay | endsLaterToday,
                                Event.isCanceled == False)
                         .group_by(Program.id))

    return {countRow.id: countRow.eventCount for countRow in countQuery}
def getTrainingEvents(term, user):
    """
    Get the training events of a term that the given user is allowed to see.

    Events flagged as all-volunteer training are listed first; the rest are
    ordered by start date and start time. Unless the user is an admin or a
    student who is a Bonner Scholar, only events whose program is not a
    Bonner Scholars program are returned.

    term: expected to be the ID of a term
    user: expected to be the current user
    return: a list of all trainings the user can view
    """
    trainings = (Event.select(Event).distinct()
                      .join(Program, JOIN.LEFT_OUTER)
                      .where(Event.isTraining == True,
                             Event.term == term)
                      .order_by(Event.isAllVolunteerTraining.desc(), Event.startDate, Event.timeStart))

    canSeeBonnerEvents = user.isAdmin or (user.isStudent and user.isBonnerScholar)
    if not canSeeBonnerEvents:
        trainings = trainings.where(Program.isBonnerScholars == False)

    return list(trainings.execute())
def getBonnerEvents(term):
    """
    Get the events of Bonner Scholars programs in the given term.

    :return: A list of Event objects ordered by start date and start time.
    """
    bonnerQuery = (Event.select(Event, Program.id.alias("program_id"))
                        .join(Program)
                        .where(Program.isBonnerScholars,
                               Event.term == term)
                        .order_by(Event.startDate, Event.timeStart))

    return list(bonnerQuery.execute())
def getOtherEvents(term):
    """
    Get the list of the events not caught by other functions to be displayed in
    the Other Events section of the Events List page.

    These are the non-training events of the term whose program is either
    "other CELTS sponsored" or is neither student-led nor Bonner Scholars.
    NOTE(review): the previous comment said events without any program are
    included as well; confirm how rows with no joined program pass this filter.

    :return: A list of Other Event objects
    """
    isOtherSponsored = Program.isOtherCeltsSponsored
    fitsNowhereElse = ((Program.isStudentLed == False) &
                       (Program.isBonnerScholars == False))

    otherQuery = (Event.select(Event, Program)
                       .join(Program, JOIN.LEFT_OUTER)
                       .where(Event.term == term,
                              Event.isTraining == False,
                              Event.isAllVolunteerTraining == False,
                              (isOtherSponsored | fitsNowhereElse))
                       .order_by(Event.startDate, Event.timeStart, Event.id))

    return list(otherQuery.execute())
def getUpcomingEventsForUser(user, asOf=None, program=None):
    """
    Get the list of upcoming events that the user is interested in (or has
    RSVP'd to) as long as they are not banned from the program that the event
    is a part of. A ban only counts while its end date has not passed.

    Canceled events are left out, and for each recurring series only the next
    upcoming event is returned.

    :param user: a username or User object
    :param asOf: The date to use when determining future and past events.
                 Used in testing, defaults to the current timestamp.
    :param program: optional program; when given, only events of that program are returned
    :return: A list of Event objects ordered by start date and name
    """
    # Resolve the default at call time. A datetime.now() default in the
    # signature is evaluated only once, when the module is imported.
    if asOf is None:
        asOf = datetime.datetime.now()

    events = (Event.select().distinct()
                   .join(ProgramBan, JOIN.LEFT_OUTER, on=((ProgramBan.program == Event.program) & (ProgramBan.user == user)))
                   .join(Interest, JOIN.LEFT_OUTER, on=(Event.program == Interest.program))
                   .join(EventRsvp, JOIN.LEFT_OUTER, on=(Event.id == EventRsvp.event))
                   .where(Event.startDate >= asOf,
                          (Interest.user == user) | (EventRsvp.user == user),
                          ProgramBan.user.is_null(True) | (ProgramBan.endDate < asOf)))

    if program:
        events = events.where(Event.program == program)

    events = events.order_by(Event.startDate, Event.name)

    upcomingEvents = []
    shownRecurringIds = set()

    # removes all recurring events except for the next upcoming one
    for event in events:
        if event.isCanceled:
            continue
        if event.recurringId:
            if event.recurringId in shownRecurringIds:
                continue
            shownRecurringIds.add(event.recurringId)
        upcomingEvents.append(event)

    return upcomingEvents
def getParticipatedEventsForUser(user):
    """
    Get all the events a user has participated in.

    Combines the user's regular events (selected with their program name) and
    the all-volunteer trainings they attended (selected with an empty program
    name), ordered by start date and then name.

    :param user: a username or User object
    :return: A list of result rows carrying id, programName, startDate and name
    """
    programEvents = (Event.select(Event, Program.programName)
                          .join(Program, JOIN.LEFT_OUTER).switch()
                          .join(EventParticipant)
                          .where(EventParticipant.user == user,
                                 Event.isAllVolunteerTraining == False)
                          .order_by(Event.startDate, Event.name))

    allVolunteerTrainings = (Event.select(Event, "")
                                  .join(EventParticipant)
                                  .where(Event.isAllVolunteerTraining == True,
                                         EventParticipant.user == user))

    combined = programEvents.union_all(allVolunteerTrainings)
    orderedRows = (combined.select_from(combined.c.id, combined.c.programName, combined.c.startDate, combined.c.name)
                           .order_by(combined.c.startDate, combined.c.name))

    return list(orderedRows.execute())
def validateNewEventData(data):
    """
    Confirm that the provided data is valid for an event.

    Assumes the event data has been processed with `preprocessEventData`. NOT raw form data

    When every check passes, the data is marked as validated by setting
    data['valid'] = True.

    Returns 2 values: (boolean success, the validation message)
    """
    checkBoxValues = [data['isFoodProvided'], data['isRsvpRequired'], data['isTraining'],
                      data['isService'], data['isRecurring']]
    if 'on' in checkBoxValues:
        return (False, "Raw form data passed to validate method. Preprocess first.")

    if data['isRecurring']:
        try:
            endsBeforeStart = data['endDate'] < data['startDate']
        except TypeError:
            # A missing date is left as '' by preprocessing and cannot be compared
            # with a date; report that as a validation failure instead of crashing.
            return (False, "Recurring events need a valid start date and end date.")
        if endsBeforeStart:
            return (False, "Event start date is after event end date.")

    if data['timeEnd'] <= data['timeStart']:
        return (False, "Event end time must be after start time.")

    # Validation if we are inserting a new event
    if 'id' not in data:
        duplicateEvent = (Event.select()
                               .where((Event.name == data['name']) &
                                      (Event.location == data['location']) &
                                      (Event.startDate == data['startDate']) &
                                      (Event.timeStart == data['timeStart'])))

        try:
            Term.get_by_id(data['term'])
        except DoesNotExist:
            return (False, f"Not a valid term: {data['term']}")

        if duplicateEvent.exists():
            return (False, "This event already exists")

    data['valid'] = True
    return (True, "All inputs are valid.")
def calculateNewrecurringId():
    """
    Returns the recurring Id to assign to a new recurring series: one more than
    the highest recurring Id in use, or 1 when there is none yet.
    """
    highestRecurringId = Event.select(fn.MAX(Event.recurringId)).scalar()
    return highestRecurringId + 1 if highestRecurringId else 1
def getPreviousRecurringEventData(recurringId):
    """
    Get the distinct users who are recorded as participants of any event in the
    given recurring series, by joining User to EventParticipant to Event.

    Returns the (unexecuted) query of User objects.
    """
    seriesParticipants = (User.select(User).distinct()
                              .join(EventParticipant)
                              .join(Event)
                              .where(Event.recurringId == recurringId))
    return seriesParticipants
def calculateRecurringEventFrequency(event):
    """
    Work out the weekly events to create for a recurring event, from its start
    date up to (at most) its end date. Takes a dictionary of event data.

    Assumes that the data has been processed with `preprocessEventData`. NOT raw form data.

    Raises an Exception when either date is not a datetime.date, or when both
    dates are the same (the event is not recurring).

    Returns a list of dictionaries, one per week, each with the event 'name'
    (suffixed with "Week N"), its 'date', and its 1-based 'week' number.
    """
    for dateKey in ('endDate', 'startDate'):
        if not isinstance(event[dateKey], datetime.date):
            raise Exception("startDate and endDate must be datetime.date objects.")

    if event['endDate'] == event['startDate']:
        raise Exception("This event is not a recurring event")

    # one event for the start date plus one for every full week that fits before the end date
    numberOfWeeks = ((event['endDate'] - event['startDate']).days // 7) + 1

    weeklyEvents = []
    for weekNumber in range(1, numberOfWeeks + 1):
        weeklyEvents.append({'name': f"{event['name']} Week {weekNumber}",
                             'date': event['startDate'] + datetime.timedelta(days=7 * (weekNumber - 1)),
                             "week": weekNumber})
    return weeklyEvents
def preprocessEventData(eventData):
    """
    Ensures that the event data dictionary is consistent before it reaches the template or event logic.

    - dates should exist and be date objects if there is a value
    - checkboxes should be True or False
    - if term is given, convert it to a model object
    - times should exist be strings in 24 hour format example: 14:40
    - Look up matching certification requirement if necessary

    The dictionary is modified in place and also returned.
    """
    ## Process checkboxes
    eventCheckBoxes = ['isFoodProvided', 'isRsvpRequired', 'isService', 'isTraining', 'isRecurring', 'isAllVolunteerTraining']

    for checkBox in eventCheckBoxes:
        # a checkbox that is missing from the data counts as unchecked
        eventData[checkBox] = bool(eventData[checkBox]) if checkBox in eventData else False

    ## Process dates
    for dateField in ('startDate', 'endDate'):
        if dateField not in eventData:
            eventData[dateField] = ''
        elif isinstance(eventData[dateField], str) and eventData[dateField]:
            eventData[dateField] = parser.parse(eventData[dateField])
        elif not isinstance(eventData[dateField], datetime.date):
            # anything that is neither a non-empty string nor a date is treated as "no date"
            eventData[dateField] = ''

    # If we aren't recurring, all of our events are single-day
    if not eventData['isRecurring']:
        eventData['endDate'] = eventData['startDate']

    # Process terms
    if 'term' in eventData:
        try:
            eventData['term'] = Term.get_by_id(eventData['term'])
        except DoesNotExist:
            eventData['term'] = ''

    # Process requirement
    if 'certRequirement' in eventData:
        try:
            eventData['certRequirement'] = CertificationRequirement.get_by_id(eventData['certRequirement'])
        except DoesNotExist:
            eventData['certRequirement'] = ''
    elif 'id' in eventData:
        # look up the requirement already matched to this existing event
        match = RequirementMatch.get_or_none(event=eventData['id'])
        if match:
            eventData['certRequirement'] = match.requirement

    if 'timeStart' in eventData:
        eventData['timeStart'] = format24HourTime(eventData['timeStart'])

    if 'timeEnd' in eventData:
        eventData['timeEnd'] = format24HourTime(eventData['timeEnd'])

    return eventData
def getTomorrowsEvents():
    """Get the list of events whose start date is tomorrow."""
    tomorrow = date.today() + timedelta(days=1)
    startsTomorrow = (Event.startDate == tomorrow)
    return list(Event.select().where(startsTomorrow))
def addEventView(viewer,event):
    """
    Record that the viewer has viewed the event, unless the viewer is a CELTS admin.
    An EventView row is only inserted if one does not already exist for this viewer and event.
    """
    if viewer.isCeltsAdmin:
        return
    EventView.get_or_create(user=viewer, event=event)
def getEventRsvpCountsForTerm(term):
    """
    Get the number of RSVPs for every event that exists in the term.

    Returns a dictionary with the event id as the key and the amount of
    current RSVPs to that event as the value. Events without RSVPs are
    included (left outer join).
    """
    rsvpCountQuery = (Event.select(Event, fn.COUNT(EventRsvp.event_id).alias('count'))
                           .join(EventRsvp, JOIN.LEFT_OUTER)
                           .where(Event.term == term)
                           .group_by(Event.id))

    rsvpCounts = {}
    for eventRow in rsvpCountQuery:
        rsvpCounts[eventRow.id] = eventRow.count

    return rsvpCounts
def getEventRsvpCount(eventId):
    """
    Returns the number of RSVP'd participants for a given eventId.
    """
    # Let the database do the counting instead of loading every RSVP row just to measure the result.
    return EventRsvp.select().where(EventRsvp.event_id == eventId).count()