Coverage for app/logic/events.py: 91%
213 statements
« prev ^ index » next coverage.py v7.2.5, created at 2023-05-24 14:13 +0000
« prev ^ index » next coverage.py v7.2.5, created at 2023-05-24 14:13 +0000
1from peewee import DoesNotExist, fn, JOIN
2from dateutil import parser
3from datetime import timedelta, date
4import datetime
5from werkzeug.datastructures import MultiDict
6from app.models import mainDB
7from app.models.user import User
8from app.models.event import Event
9from app.models.eventParticipant import EventParticipant
10from app.models.program import Program
11from app.models.programEvent import ProgramEvent
12from app.models.term import Term
13from app.models.programBan import ProgramBan
14from app.models.interest import Interest
15from app.models.eventRsvp import EventRsvp
16from app.models.programEvent import ProgramEvent
17from app.models.requirementMatch import RequirementMatch
18from app.models.certificationRequirement import CertificationRequirement
19from app.models.eventViews import EventView
21from app.logic.adminLogs import createLog
22from app.logic.utils import format24HourTime
23from app.logic.fileHandler import FileHandler
24from app.logic.certification import updateCertRequirementForEvent
26def getEvents(program_id=None):
28 if program_id:
29 Program.get_by_id(program_id) # raises an exception if program doesn't exist
30 return (Event.select(Event).join(ProgramEvent)
31 .where(ProgramEvent.program == program_id).distinct())
32 else:
33 return Event.select()
35def deleteEvent(eventId):
36 """
37 Deletes an event, if it is a recurring event, rename all following events
38 to make sure there is no gap in weeks.
39 """
40 event = Event.get_or_none(Event.id == eventId)
42 if event:
43 if event.recurringId:
44 recurringId = event.recurringId
45 recurringEvents = list(Event.select().where(Event.recurringId==recurringId).order_by(Event.id)) # orders for tests
46 eventDeleted = False
48 # once the deleted event is detected, change all other names to the previous event's name
49 for recurringEvent in recurringEvents:
50 if eventDeleted:
51 Event.update({Event.name:newEventName}).where(Event.id==recurringEvent.id).execute()
52 newEventName = recurringEvent.name
54 if recurringEvent == event:
55 newEventName = recurringEvent.name
56 eventDeleted = True
58 program = event.singleProgram
60 if program:
61 createLog(f"Deleted \"{event.name}\" for {program.programName}, which had a start date of {datetime.datetime.strftime(event.startDate, '%m/%d/%Y')}.")
62 else:
63 createLog(f"Deleted a non-program event, \"{event.name}\", which had a start date of {datetime.datetime.strftime(event.startDate, '%m/%d/%Y')}.")
65 event.delete_instance(recursive = True, delete_nullable = True)
67def deleteEventAndAllFollowing(eventId):
68 """
69 Deletes a recurring event and all the recurring events after it.
70 """
71 event = Event.get_or_none(Event.id == eventId)
72 if event:
73 if event.recurringId:
74 recurringId = event.recurringId
75 recurringSeries = list(Event.select().where((Event.recurringId == recurringId) & (Event.startDate >= event.startDate)))
76 for seriesEvent in recurringSeries:
77 seriesEvent.delete_instance(recursive = True)
79def deleteAllRecurringEvents(eventId):
80 """
81 Deletes all recurring events.
82 """
83 event = Event.get_or_none(Event.id == eventId)
84 if event:
85 if event.recurringId:
86 recurringId = event.recurringId
87 allRecurringEvents = list(Event.select().where(Event.recurringId == recurringId))
88 for aRecurringEvent in allRecurringEvents:
89 aRecurringEvent.delete_instance(recursive = True)
92def attemptSaveEvent(eventData, attachmentFiles = None):
93 """
94 Tries to save an event to the database:
95 Checks that the event data is valid and if it is it continus to saves the new
96 event to the database and adds files if there are any.
97 If it is not valid it will return a validation error.
99 Returns:
100 Created events and an error message.
101 """
102 newEventData = preprocessEventData(eventData)
103 isValid, validationErrorMessage = validateNewEventData(newEventData)
105 if not isValid:
106 return False, validationErrorMessage
108 try:
109 events = saveEventToDb(newEventData)
110 if attachmentFiles:
111 for event in events:
112 addFile= FileHandler(attachmentFiles, eventId=event.id)
113 addFile.saveFiles()
114 return events, ""
115 except Exception as e:
116 print(e)
117 return False, e
119def saveEventToDb(newEventData):
121 if not newEventData.get('valid', False):
122 raise Exception("Unvalidated data passed to saveEventToDb")
124 isNewEvent = ('id' not in newEventData)
126 eventsToCreate = []
127 recurringSeriesId = None
128 if isNewEvent and newEventData['isRecurring']:
129 eventsToCreate = calculateRecurringEventFrequency(newEventData)
130 recurringSeriesId = calculateNewrecurringId()
131 else:
132 eventsToCreate.append({'name': f"{newEventData['name']}",
133 'date':newEventData['startDate'],
134 "week":1})
135 eventRecords = []
136 for eventInstance in eventsToCreate:
137 with mainDB.atomic():
138 eventData = {
139 "term": newEventData['term'],
140 "name": eventInstance['name'],
141 "description": newEventData['description'],
142 "timeStart": newEventData['timeStart'],
143 "timeEnd": newEventData['timeEnd'],
144 "location": newEventData['location'],
145 "recurringId": recurringSeriesId,
146 "isFoodProvided" : newEventData['isFoodProvided'],
147 "isTraining": newEventData['isTraining'],
148 "isRsvpRequired": newEventData['isRsvpRequired'],
149 "isService": newEventData['isService'],
150 "startDate": eventInstance['date'],
151 "isAllVolunteerTraining": newEventData['isAllVolunteerTraining'],
152 "endDate": eventInstance['date'],
153 "contactEmail": newEventData['contactEmail'],
154 "contactName": newEventData['contactName']
155 }
157 # Create or update the event
158 if isNewEvent:
159 eventRecord = Event.create(**eventData)
160 if 'program' in newEventData:
161 ProgramEvent.create(program=newEventData['program'], event=eventRecord)
162 else:
163 eventRecord = Event.get_by_id(newEventData['id'])
164 Event.update(**eventData).where(Event.id == eventRecord).execute()
166 if 'certRequirement' in newEventData and newEventData['certRequirement'] != "":
167 updateCertRequirementForEvent(eventRecord, newEventData['certRequirement'])
169 eventRecords.append(eventRecord)
171 return eventRecords
173def getStudentLedEvents(term):
175 studentLedEvents = list(Event.select(Event, Program, ProgramEvent)
176 .join(ProgramEvent, attr = 'programEvent')
177 .join(Program)
178 .where(Program.isStudentLed,
179 Event.term == term)
180 .order_by(Event.startDate, Event.timeStart)
181 .execute())
182 programs = {}
184 for event in studentLedEvents:
185 programs.setdefault(event.programEvent.program, []).append(event)
187 return programs
189def getTrainingEvents(term, user):
190 """
191 The allTrainingsEvent query is designed to select and count eventId's after grouping them
192 together by id's of similiar value. The query will then return the event that is associated
193 with the most programs (highest count) by doing this we can ensure that the event being
194 returned is the All Trainings Event.
195 term: expected to be the ID of a term
196 user: expected to be the current user
197 return: a list of all trainings the user can view
198 """
199 trainingQuery = (Event.select(Event).distinct()
200 .join(ProgramEvent, JOIN.LEFT_OUTER)
201 .join(Program, JOIN.LEFT_OUTER)
202 .where(Event.isTraining == True,
203 Event.term == term)
204 .order_by(Event.isAllVolunteerTraining.desc(), Event.startDate, Event.timeStart))
206 hideBonner = (not user.isAdmin) and not (user.isStudent and user.isBonnerScholar)
207 if hideBonner:
208 trainingQuery = trainingQuery.where(Program.isBonnerScholars == False)
210 return list(trainingQuery.execute())
212def getBonnerEvents(term):
214 bonnerScholarsEvents = list(Event.select(Event,ProgramEvent, Program.id.alias("program_id"))
215 .join(ProgramEvent)
216 .join(Program)
217 .where(Program.isBonnerScholars,
218 Event.term == term)
219 .order_by(Event.startDate, Event.timeStart)
220 .execute())
221 return bonnerScholarsEvents
223def getOtherEvents(term):
224 """
225 Get the list of the events not caught by other functions to be displayed in
226 the Other Events section of the Events List page.
227 :return: A list of Other Event objects
228 """
229 # Gets all events that are not associated with a program and are not trainings
230 # Gets all events that have a program but don't fit anywhere
231 otherEvents = list(Event.select(Event, Program)
232 .join(ProgramEvent, JOIN.LEFT_OUTER)
233 .join(Program, JOIN.LEFT_OUTER)
234 .where(Event.term == term,
235 Event.isTraining == False,
236 Event.isAllVolunteerTraining == False,
237 ((ProgramEvent.program == None) |
238 ((Program.isStudentLed == False) &
239 (Program.isBonnerScholars == False))))
240 .order_by(Event.startDate, Event.timeStart, Event.id)
241 .execute())
243 return otherEvents
245def getUpcomingEventsForUser(user, asOf=datetime.datetime.now(), program=None):
246 """
247 Get the list of upcoming events that the user is interested in as long
248 as they are not banned from the program that the event is a part of.
249 :param user: a username or User object
250 :param asOf: The date to use when determining future and past events.
251 Used in testing, defaults to the current timestamp.
252 :return: A list of Event objects
253 """
255 events = (Event.select().distinct()
256 .join(ProgramEvent, JOIN.LEFT_OUTER)
257 .join(ProgramBan, JOIN.LEFT_OUTER, on=((ProgramBan.program == ProgramEvent.program) & (ProgramBan.user == user)))
258 .join(Interest, JOIN.LEFT_OUTER, on=(ProgramEvent.program == Interest.program))
259 .join(EventRsvp, JOIN.LEFT_OUTER, on=(Event.id == EventRsvp.event))
260 .where(Event.startDate >= asOf,
261 (Interest.user == user) | (EventRsvp.user == user),
262 ProgramBan.user.is_null(True) | (ProgramBan.endDate < asOf)))
264 if program:
265 events = events.where(ProgramEvent.program == program)
267 events = events.order_by(Event.startDate, Event.name)
269 events_list = []
270 shown_recurring_event_list = []
272 # removes all recurring events except for the next upcoming one
273 for event in events:
274 if event.recurringId:
275 if event.recurringId not in shown_recurring_event_list:
276 events_list.append(event)
277 shown_recurring_event_list.append(event.recurringId)
279 else:
280 events_list.append(event)
282 return events_list
284def getParticipatedEventsForUser(user):
285 """
286 Get all the events a user has participated in.
287 :param user: a username or User object
288 :param asOf: The date to use when determining future and past events.
289 Used in testing, defaults to the current timestamp.
290 :return: A list of Event objects
291 """
293 participatedEvents = (Event.select(Event, Program.programName)
294 .join(ProgramEvent, JOIN.LEFT_OUTER)
295 .join(Program, JOIN.LEFT_OUTER).switch()
296 .join(EventParticipant)
297 .where(EventParticipant.user == user,
298 Event.isAllVolunteerTraining == False)
299 .order_by(Event.startDate, Event.name))
301 allVolunteer = (Event.select(Event, "")
302 .join(EventParticipant)
303 .where(Event.isAllVolunteerTraining == True,
304 EventParticipant.user == user))
305 union = participatedEvents.union_all(allVolunteer)
306 unionParticipationWithVolunteer = list(union.select_from(union.c.id, union.c.programName, union.c.startDate, union.c.name).order_by(union.c.startDate, union.c.name).execute())
308 return unionParticipationWithVolunteer
310def validateNewEventData(data):
311 """
312 Confirm that the provided data is valid for an event.
314 Assumes the event data has been processed with `preprocessEventData`. NOT raw form data
316 Returns 3 values: (boolean success, the validation error message, the data object)
317 """
319 if 'on' in [data['isFoodProvided'], data['isRsvpRequired'], data['isTraining'], data['isService'], data['isRecurring']]:
320 return (False, "Raw form data passed to validate method. Preprocess first.")
322 if data['isRecurring'] and data['endDate'] < data['startDate']:
323 return (False, "Event start date is after event end date.")
325 if data['timeEnd'] <= data['timeStart']:
326 return (False, "Event end time must be after start time.")
328 # Validation if we are inserting a new event
329 if 'id' not in data:
331 event = (Event.select()
332 .where((Event.name == data['name']) &
333 (Event.location == data['location']) &
334 (Event.startDate == data['startDate']) &
335 (Event.timeStart == data['timeStart'])))
337 try:
338 Term.get_by_id(data['term'])
339 except DoesNotExist as e:
340 return (False, f"Not a valid term: {data['term']}")
342 if event.exists():
343 return (False, "This event already exists")
345 data['valid'] = True
346 return (True, "All inputs are valid.")
348def calculateNewrecurringId():
349 """
350 Gets the highest recurring Id so that a new recurring Id can be assigned
351 """
352 recurringId = Event.select(fn.MAX(Event.recurringId)).scalar()
353 if recurringId:
354 return recurringId + 1
355 else:
356 return 1
358def getPreviousRecurringEventData(recurringId):
359 """
360 Joins the User db table and Event Participant db table so that we can get the information of a participant if they attended an event
361 """
362 previousEventVolunteers = (User.select(User).distinct()
363 .join(EventParticipant)
364 .join(Event)
365 .where(Event.recurringId==recurringId))
366 return previousEventVolunteers
368def calculateRecurringEventFrequency(event):
369 """
370 Calculate the events to create based on a recurring event start and end date. Takes a
371 dictionary of event data.
373 Assumes that the data has been processed with `preprocessEventData`. NOT raw form data.
375 Return a list of events to create from the event data.
376 """
377 if not isinstance(event['endDate'], datetime.date) or not isinstance(event['startDate'], datetime.date):
378 raise Exception("startDate and endDate must be datetime.date objects.")
380 if event['endDate'] == event['startDate']:
381 raise Exception("This event is not a recurring event")
382 return [ {'name': f"{event['name']} Week {counter+1}",
383 'date': event['startDate'] + datetime.timedelta(days=7*counter),
384 "week": counter+1}
385 for counter in range(0, ((event['endDate']-event['startDate']).days//7)+1)]
387def preprocessEventData(eventData):
388 """
389 Ensures that the event data dictionary is consistent before it reaches the template or event logic.
391 - dates should exist and be date objects if there is a value
392 - checkbaxes should be True or False
393 - if term is given, convert it to a model object
394 - times should exist be strings in 24 hour format example: 14:40
395 - Look up matching certification requirement if necessary
396 """
397 ## Process checkboxes
398 eventCheckBoxes = ['isFoodProvided', 'isRsvpRequired', 'isService', 'isTraining', 'isRecurring', 'isAllVolunteerTraining']
400 for checkBox in eventCheckBoxes:
401 if checkBox not in eventData:
402 eventData[checkBox] = False
403 else:
404 eventData[checkBox] = bool(eventData[checkBox])
406 ## Process dates
407 eventDates = ['startDate', 'endDate']
408 for date in eventDates:
409 if date not in eventData:
410 eventData[date] = ''
411 elif type(eventData[date]) is str and eventData[date]:
412 eventData[date] = parser.parse(eventData[date])
413 elif not isinstance(eventData[date], datetime.date):
414 eventData[date] = ''
416 # If we aren't recurring, all of our events are single-day
417 if not eventData['isRecurring']:
418 eventData['endDate'] = eventData['startDate']
420 # Process terms
421 if 'term' in eventData:
422 try:
423 eventData['term'] = Term.get_by_id(eventData['term'])
424 except DoesNotExist:
425 eventData['term'] = ''
427 # Process requirement
428 if 'certRequirement' in eventData:
429 try:
430 eventData['certRequirement'] = CertificationRequirement.get_by_id(eventData['certRequirement'])
431 except DoesNotExist:
432 eventData['certRequirement'] = ''
433 elif 'id' in eventData:
434 # look up requirement
435 match = RequirementMatch.get_or_none(event=eventData['id'])
436 if match:
437 eventData['certRequirement'] = match.requirement
439 if 'timeStart' in eventData:
440 eventData['timeStart'] = format24HourTime(eventData['timeStart'])
442 if 'timeEnd' in eventData:
443 eventData['timeEnd'] = format24HourTime(eventData['timeEnd'])
445 return eventData
447def getTomorrowsEvents():
448 """Grabs each event that occurs tomorrow"""
449 tomorrowDate = date.today() + timedelta(days=1)
450 events = list(Event.select().where(Event.startDate==tomorrowDate))
451 return events
452def addEventView(viewer,event):
453 """This checks if the current user already viewed the event. If not, insert a recored to EventView table"""
454 if not viewer.isCeltsAdmin:
455 EventView.get_or_create(user = viewer, event = event)