Coverage for app/logic/events.py: 91%

213 statements  

« prev     ^ index     » next       coverage.py v7.2.5, created at 2023-05-24 14:13 +0000

1from peewee import DoesNotExist, fn, JOIN 

2from dateutil import parser 

3from datetime import timedelta, date 

4import datetime 

5from werkzeug.datastructures import MultiDict 

6from app.models import mainDB 

7from app.models.user import User 

8from app.models.event import Event 

9from app.models.eventParticipant import EventParticipant 

10from app.models.program import Program 

11from app.models.programEvent import ProgramEvent 

12from app.models.term import Term 

13from app.models.programBan import ProgramBan 

14from app.models.interest import Interest 

15from app.models.eventRsvp import EventRsvp 

16from app.models.programEvent import ProgramEvent 

17from app.models.requirementMatch import RequirementMatch 

18from app.models.certificationRequirement import CertificationRequirement 

19from app.models.eventViews import EventView 

20 

21from app.logic.adminLogs import createLog 

22from app.logic.utils import format24HourTime 

23from app.logic.fileHandler import FileHandler 

24from app.logic.certification import updateCertRequirementForEvent 

25 

def getEvents(program_id=None):
    """
    Return a query of events.

    With no program_id, every event is selected. With a program_id, only the
    distinct events linked to that program through ProgramEvent are selected.
    Program.get_by_id raises if the program does not exist.
    """
    if not program_id:
        return Event.select()

    # Lookup is only for its side effect: it raises when the program is missing.
    Program.get_by_id(program_id)

    eventsForProgram = (Event.select(Event)
                             .join(ProgramEvent)
                             .where(ProgramEvent.program == program_id))
    return eventsForProgram.distinct()

34 

def deleteEvent(eventId):
    """
    Delete one event and write an admin log entry about it.

    If the event is part of a recurring series, each event after it (in id order)
    is renamed to the name of the event before it, so the names in the series
    keep no gap. Does nothing when no event has the given id.
    """
    event = Event.get_or_none(Event.id == eventId)
    if not event:
        return

    if event.recurringId:
        # Ordered by id so the renaming walks the series in a stable order (relied on by tests).
        seriesEvents = list(Event.select()
                                 .where(Event.recurringId == event.recurringId)
                                 .order_by(Event.id))
        reachedDeleted = False

        for seriesEvent in seriesEvents:
            if reachedDeleted:
                # Shift the previous event's name onto this one, then remember this
                # event's old (in-memory) name for the next event in the series.
                Event.update({Event.name: nameToPassOn}).where(Event.id == seriesEvent.id).execute()
                nameToPassOn = seriesEvent.name

            if seriesEvent == event:
                nameToPassOn = seriesEvent.name
                reachedDeleted = True

    program = event.singleProgram
    if program:
        createLog(f"Deleted \"{event.name}\" for {program.programName}, which had a start date of {datetime.datetime.strftime(event.startDate, '%m/%d/%Y')}.")
    else:
        createLog(f"Deleted a non-program event, \"{event.name}\", which had a start date of {datetime.datetime.strftime(event.startDate, '%m/%d/%Y')}.")

    event.delete_instance(recursive = True, delete_nullable = True)

66 

def deleteEventAndAllFollowing(eventId):
    """
    Delete a recurring event together with every event in the same series whose
    start date is on or after this event's start date.

    Does nothing when the event does not exist or is not part of a recurring series.
    """
    event = Event.get_or_none(Event.id == eventId)
    if not (event and event.recurringId):
        return

    sameSeries = (Event.recurringId == event.recurringId)
    thisOrLater = (Event.startDate >= event.startDate)
    eventsToRemove = list(Event.select().where(sameSeries & thisOrLater))

    for eventToRemove in eventsToRemove:
        eventToRemove.delete_instance(recursive = True)

78 

def deleteAllRecurringEvents(eventId):
    """
    Delete every event that shares a recurring series with the given event.

    Does nothing when the event does not exist or is not part of a recurring series.
    """
    event = Event.get_or_none(Event.id == eventId)
    if not (event and event.recurringId):
        return

    wholeSeries = list(Event.select().where(Event.recurringId == event.recurringId))
    for seriesEvent in wholeSeries:
        seriesEvent.delete_instance(recursive = True)

90 

91 

def attemptSaveEvent(eventData, attachmentFiles = None):
    """
    Preprocess, validate and then save event data, attaching files if any are given.

    Returns a pair:
        (list of saved events, "")      when the save succeeds
        (False, validation message)     when the data fails validation
        (False, the raised exception)   when saving or file handling raises
    """
    processedData = preprocessEventData(eventData)
    dataIsValid, validationMessage = validateNewEventData(processedData)
    if not dataIsValid:
        return False, validationMessage

    try:
        savedEvents = saveEventToDb(processedData)
        # The same attachments are saved once for every event that was created.
        for savedEvent in (savedEvents if attachmentFiles else ()):
            fileHandler = FileHandler(attachmentFiles, eventId=savedEvent.id)
            fileHandler.saveFiles()
        return savedEvents, ""
    except Exception as e:
        print(e)
        return False, e

118 

def saveEventToDb(newEventData):
    """
    Create or update event rows from validated event data.

    The data must carry a truthy 'valid' flag (set by validateNewEventData),
    otherwise an Exception is raised. Data without an 'id' creates events: one per
    week for recurring data, a single event otherwise. Data with an 'id' updates
    that event. Each event is written inside its own mainDB.atomic() block.

    Returns the list of event records that were created or looked up for update.
    """
    if not newEventData.get('valid', False):
        raise Exception("Unvalidated data passed to saveEventToDb")

    isNewEvent = 'id' not in newEventData

    if isNewEvent and newEventData['isRecurring']:
        instancesToSave = calculateRecurringEventFrequency(newEventData)
        seriesId = calculateNewrecurringId()
    else:
        instancesToSave = [{'name': f"{newEventData['name']}",
                            'date': newEventData['startDate'],
                            "week": 1}]
        seriesId = None

    # Fields copied unchanged from the submitted data onto every event instance.
    sharedFields = ("term", "description", "timeStart", "timeEnd", "location",
                    "isFoodProvided", "isTraining", "isRsvpRequired", "isService",
                    "isAllVolunteerTraining", "contactEmail", "contactName")

    savedRecords = []
    for instance in instancesToSave:
        with mainDB.atomic():
            eventData = {field: newEventData[field] for field in sharedFields}
            eventData["name"] = instance['name']
            eventData["recurringId"] = seriesId
            # Every instance is a single-day event: start and end are the same date.
            eventData["startDate"] = instance['date']
            eventData["endDate"] = instance['date']

            if isNewEvent:
                record = Event.create(**eventData)
                if 'program' in newEventData:
                    ProgramEvent.create(program=newEventData['program'], event=record)
            else:
                record = Event.get_by_id(newEventData['id'])
                Event.update(**eventData).where(Event.id == record).execute()

            if 'certRequirement' in newEventData and newEventData['certRequirement'] != "":
                updateCertRequirementForEvent(record, newEventData['certRequirement'])

            savedRecords.append(record)

    return savedRecords

172 

def getStudentLedEvents(term):
    """
    Group the events of student-led programs in the given term by program.

    Returns a dict keyed by the program on each event's ProgramEvent row; each value
    is the list of that program's events, in start date and start time order.
    """
    query = (Event.select(Event, Program, ProgramEvent)
                  .join(ProgramEvent, attr = 'programEvent')
                  .join(Program)
                  .where(Program.isStudentLed,
                         Event.term == term)
                  .order_by(Event.startDate, Event.timeStart))

    eventsByProgram = {}
    for studentLedEvent in list(query.execute()):
        program = studentLedEvent.programEvent.program
        if program not in eventsByProgram:
            eventsByProgram[program] = []
        eventsByProgram[program].append(studentLedEvent)

    return eventsByProgram

188 

def getTrainingEvents(term, user):
    """
    Get the training events of a term that the given user can view.

    All-volunteer trainings are ordered first, then events are ordered by start date
    and start time. Unless the user is an admin, or a student who is a Bonner
    Scholar, the query is further restricted to programs that are not Bonner Scholars.

    term: expected to be the ID of a term
    user: expected to be the current user
    return: a list of all trainings the user can view
    """
    trainings = (Event.select(Event).distinct()
                      .join(ProgramEvent, JOIN.LEFT_OUTER)
                      .join(Program, JOIN.LEFT_OUTER)
                      .where(Event.isTraining == True,
                             Event.term == term)
                      .order_by(Event.isAllVolunteerTraining.desc(), Event.startDate, Event.timeStart))

    mayViewBonner = user.isAdmin or (user.isStudent and user.isBonnerScholar)
    if not mayViewBonner:
        # NOTE(review): Program is LEFT OUTER joined, so for a training with no program
        # this comparison is against NULL and the row is presumably dropped -- confirm intended.
        trainings = trainings.where(Program.isBonnerScholars == False)

    return list(trainings.execute())

211 

def getBonnerEvents(term):
    """
    Get the events in the given term that belong to a Bonner Scholars program,
    ordered by start date and start time. The program id is selected under the
    alias "program_id".
    """
    bonnerQuery = (Event.select(Event, ProgramEvent, Program.id.alias("program_id"))
                        .join(ProgramEvent)
                        .join(Program)
                        .where(Program.isBonnerScholars,
                               Event.term == term)
                        .order_by(Event.startDate, Event.timeStart))

    return list(bonnerQuery.execute())

222 

def getOtherEvents(term):
    """
    Get the events shown in the Other Events section of the Events List page:
    the events of a term that no other event-list function picks up.
    :return: A list of Other Event objects
    """
    # Either the event has no program at all ...
    hasNoProgram = (ProgramEvent.program == None)
    # ... or its program is neither student-led nor Bonner Scholars.
    programFitsNowhereElse = ((Program.isStudentLed == False) &
                              (Program.isBonnerScholars == False))

    otherQuery = (Event.select(Event, Program)
                       .join(ProgramEvent, JOIN.LEFT_OUTER)
                       .join(Program, JOIN.LEFT_OUTER)
                       .where(Event.term == term,
                              Event.isTraining == False,
                              Event.isAllVolunteerTraining == False,
                              (hasNoProgram | programFitsNowhereElse))
                       .order_by(Event.startDate, Event.timeStart, Event.id))

    return list(otherQuery.execute())

244 

def getUpcomingEventsForUser(user, asOf=None, program=None):
    """
    Get the list of upcoming events that the user is interested in (or has an RSVP
    for) as long as they are not banned from the program that the event is a part of.
    For a recurring series only the next upcoming event is returned.

    :param user: a username or User object
    :param asOf: The date to use when determining future and past events.
                 Used in testing; when omitted (None) the current timestamp at
                 call time is used.
    :param program: optional program; when given, only that program's events are returned
    :return: A list of Event objects ordered by start date, then name
    """
    # Resolve "now" on every call. A default of datetime.datetime.now() in the
    # signature is evaluated once, when the module is imported, and then goes stale
    # for the lifetime of the process.
    if asOf is None:
        asOf = datetime.datetime.now()

    events = (Event.select().distinct()
                   .join(ProgramEvent, JOIN.LEFT_OUTER)
                   .join(ProgramBan, JOIN.LEFT_OUTER, on=((ProgramBan.program == ProgramEvent.program) & (ProgramBan.user == user)))
                   .join(Interest, JOIN.LEFT_OUTER, on=(ProgramEvent.program == Interest.program))
                   .join(EventRsvp, JOIN.LEFT_OUTER, on=(Event.id == EventRsvp.event))
                   .where(Event.startDate >= asOf,
                          (Interest.user == user) | (EventRsvp.user == user),
                          ProgramBan.user.is_null(True) | (ProgramBan.endDate < asOf)))

    if program:
        events = events.where(ProgramEvent.program == program)

    events = events.order_by(Event.startDate, Event.name)

    events_list = []
    shown_recurring_ids = set()

    # removes all recurring events except for the next upcoming one
    for event in events:
        if event.recurringId:
            if event.recurringId in shown_recurring_ids:
                continue
            shown_recurring_ids.add(event.recurringId)
        events_list.append(event)

    return events_list

283 

def getParticipatedEventsForUser(user):
    """
    Get all the events a user has participated in.

    Events that are not all-volunteer trainings are selected with their program
    name; all-volunteer trainings are selected with an empty program name. The two
    selections are combined with UNION ALL and ordered by start date, then name.

    :param user: a username or User object
    :return: A list of rows carrying id, programName, startDate and name
    """
    programParticipation = (Event.select(Event, Program.programName)
                                 .join(ProgramEvent, JOIN.LEFT_OUTER)
                                 .join(Program, JOIN.LEFT_OUTER).switch()
                                 .join(EventParticipant)
                                 .where(EventParticipant.user == user,
                                        Event.isAllVolunteerTraining == False)
                                 .order_by(Event.startDate, Event.name))

    allVolunteerParticipation = (Event.select(Event, "")
                                      .join(EventParticipant)
                                      .where(Event.isAllVolunteerTraining == True,
                                             EventParticipant.user == user))

    union = programParticipation.union_all(allVolunteerParticipation)
    combined = (union.select_from(union.c.id, union.c.programName, union.c.startDate, union.c.name)
                     .order_by(union.c.startDate, union.c.name))

    return list(combined.execute())

309 

def validateNewEventData(data):
    """
    Confirm that the provided data is valid for an event.

    Assumes the event data has been processed with `preprocessEventData`. NOT raw form data

    Returns 2 values: (boolean success, a message describing the result).
    On success data['valid'] is set to True.
    """
    # A raw HTML checkbox submits the string 'on'; seeing one means preprocessing was skipped.
    checkboxValues = [data['isFoodProvided'], data['isRsvpRequired'], data['isTraining'], data['isService'], data['isRecurring']]
    if 'on' in checkboxValues:
        return (False, "Raw form data passed to validate method. Preprocess first.")

    if data['isRecurring'] and data['endDate'] < data['startDate']:
        return (False, "Event start date is after event end date.")

    if data['timeEnd'] <= data['timeStart']:
        return (False, "Event end time must be after start time.")

    # These checks only apply when inserting a new event (no id yet).
    isNewEvent = 'id' not in data
    if isNewEvent:
        sameName = (Event.name == data['name'])
        sameLocation = (Event.location == data['location'])
        sameStartDate = (Event.startDate == data['startDate'])
        sameStartTime = (Event.timeStart == data['timeStart'])
        duplicateEvents = Event.select().where(sameName & sameLocation & sameStartDate & sameStartTime)

        try:
            Term.get_by_id(data['term'])
        except DoesNotExist:
            return (False, f"Not a valid term: {data['term']}")

        if duplicateEvents.exists():
            return (False, "This event already exists")

    data['valid'] = True
    return (True, "All inputs are valid.")

347 

def calculateNewrecurringId():
    """
    Return the next unused recurring id: one more than the highest recurringId
    stored on any event, or 1 when there is no (non-zero) highest value.
    """
    highestRecurringId = Event.select(fn.MAX(Event.recurringId)).scalar()
    return highestRecurringId + 1 if highestRecurringId else 1

357 

def getPreviousRecurringEventData(recurringId):
    """
    Build a query for the distinct users who are event participants of any event
    in the recurring series with the given recurringId.

    Joins User to EventParticipant to Event; the query is returned unexecuted.
    """
    participants = User.select(User).distinct()
    participantsOfEvents = participants.join(EventParticipant).join(Event)
    return participantsOfEvents.where(Event.recurringId==recurringId)

367 

def calculateRecurringEventFrequency(event):
    """
    Work out the weekly events to create for a recurring event.

    Takes a dictionary of event data that has been processed with
    `preprocessEventData` (NOT raw form data); 'startDate' and 'endDate' must be
    datetime.date objects and must differ, otherwise an Exception is raised.

    Returns one dict per week from the start date up to the end date, each with the
    event name suffixed by " Week N", the date of that week's event, and the week number.
    """
    lastDate = event['endDate']
    firstDate = event['startDate']

    if not (isinstance(lastDate, datetime.date) and isinstance(firstDate, datetime.date)):
        raise Exception("startDate and endDate must be datetime.date objects.")

    if lastDate == firstDate:
        raise Exception("This event is not a recurring event")

    # Number of whole weeks spanned, counting the starting week itself.
    totalWeeks = (lastDate - firstDate).days // 7 + 1

    weeklyEvents = []
    for weekNumber in range(1, totalWeeks + 1):
        weeklyEvents.append({'name': f"{event['name']} Week {weekNumber}",
                             'date': firstDate + datetime.timedelta(days=7 * (weekNumber - 1)),
                             "week": weekNumber})
    return weeklyEvents

386 

def preprocessEventData(eventData):
    """
    Normalize an event data dictionary in place before it reaches the template or event logic.

    - checkboxes become True or False (missing means False)
    - dates always exist; non-empty strings are parsed, anything that is not a date becomes ''
    - a non-recurring event gets its end date set to its start date
    - if term is given, it is converted to a model object ('' if it does not exist)
    - a given certification requirement is converted to a model object ('' if it does
      not exist); otherwise, for data with an id, the matching requirement is looked up
    - times, when given, are passed through format24HourTime (24 hour format, example: 14:40)

    Returns the same dictionary that was passed in.
    """
    ## Process checkboxes
    for checkBox in ('isFoodProvided', 'isRsvpRequired', 'isService', 'isTraining', 'isRecurring', 'isAllVolunteerTraining'):
        eventData[checkBox] = bool(eventData[checkBox]) if checkBox in eventData else False

    ## Process dates
    for dateKey in ('startDate', 'endDate'):
        if dateKey not in eventData:
            eventData[dateKey] = ''
            continue

        rawDate = eventData[dateKey]
        if type(rawDate) is str and rawDate:
            eventData[dateKey] = parser.parse(rawDate)
        elif not isinstance(rawDate, datetime.date):
            # Covers the empty string as well as any other non-date value.
            eventData[dateKey] = ''

    # If we aren't recurring, all of our events are single-day
    if not eventData['isRecurring']:
        eventData['endDate'] = eventData['startDate']

    # Process terms
    if 'term' in eventData:
        try:
            eventData['term'] = Term.get_by_id(eventData['term'])
        except DoesNotExist:
            eventData['term'] = ''

    # Process requirement
    if 'certRequirement' in eventData:
        try:
            eventData['certRequirement'] = CertificationRequirement.get_by_id(eventData['certRequirement'])
        except DoesNotExist:
            eventData['certRequirement'] = ''
    elif 'id' in eventData:
        # No requirement submitted for an existing event: look up the stored one.
        existingMatch = RequirementMatch.get_or_none(event=eventData['id'])
        if existingMatch:
            eventData['certRequirement'] = existingMatch.requirement

    # Process times
    for timeKey in ('timeStart', 'timeEnd'):
        if timeKey in eventData:
            eventData[timeKey] = format24HourTime(eventData[timeKey])

    return eventData

446 

def getTomorrowsEvents():
    """Return a list of the events whose start date is tomorrow (today's date plus one day)."""
    tomorrow = date.today() + timedelta(days=1)
    startsTomorrow = (Event.startDate==tomorrow)
    return list(Event.select().where(startsTomorrow))

def addEventView(viewer,event):
    """
    Record that the viewer has viewed the event, unless the viewer is a CELTS admin.

    Uses get_or_create, so an existing EventView record for the same viewer and
    event is reused instead of inserting a second one.
    """
    if viewer.isCeltsAdmin:
        return

    EventView.get_or_create(user = viewer, event = event)