Coverage for app/controllers/admin/routes.py: 24%

346 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-03-20 17:02 +0000

1from flask import request, render_template, url_for, g, redirect 

2from flask import flash, abort, jsonify, session, send_file 

3from peewee import DoesNotExist, fn, IntegrityError 

4from playhouse.shortcuts import model_to_dict 

5import json 

6from datetime import datetime 

7import os 

8 

9from app import app 

10from app.models.program import Program 

11from app.models.event import Event 

12from app.models.eventRsvp import EventRsvp 

13from app.models.eventParticipant import EventParticipant 

14from app.models.user import User 

15from app.models.eventTemplate import EventTemplate 

16from app.models.adminLog import AdminLog 

17from app.models.eventRsvpLog import EventRsvpLog 

18from app.models.attachmentUpload import AttachmentUpload 

19from app.models.bonnerCohort import BonnerCohort 

20from app.models.certification import Certification 

21from app.models.user import User 

22from app.models.term import Term 

23from app.models.eventViews import EventView 

24from app.models.courseStatus import CourseStatus 

25 

26from app.logic.userManagement import getAllowedPrograms, getAllowedTemplates 

27from app.logic.createLogs import createAdminLog 

28from app.logic.certification import getCertRequirements, updateCertRequirements 

29from app.logic.utils import selectSurroundingTerms, getFilesFromRequest, getRedirectTarget, setRedirectTarget 

30from app.logic.events import cancelEvent, deleteEvent, attemptSaveEvent, preprocessEventData, calculateRecurringEventFrequency, deleteEventAndAllFollowing, deleteAllRecurringEvents, getBonnerEvents,addEventView, getEventRsvpCount, copyRsvpToNewEvent 

31from app.logic.participants import getParticipationStatusForTrainings, checkUserRsvp 

32 

33from app.logic.fileHandler import FileHandler 

34from app.logic.bonner import getBonnerCohorts, makeBonnerXls, rsvpForBonnerCohort 

35from app.controllers.admin import admin_bp 

36from app.logic.serviceLearningCourses import parseUploadedFile, saveCourseParticipantsToDatabase, unapprovedCourses, approvedCourses, getInstructorCourses 

37 

38 

39 

40@admin_bp.route('/switch_user', methods=['POST']) 

41def switchUser(): 

42 if app.env == "production": 

43 print(f"An attempt was made to switch to another user by {g.current_user.username}!") 

44 abort(403) 

45 

46 print(f"Switching user from {g.current_user} to",request.form['newuser']) 

47 session['current_user'] = model_to_dict(User.get_by_id(request.form['newuser'])) 

48 

49 return redirect(request.referrer) 

50 

51 

52@admin_bp.route('/eventTemplates') 

53def templateSelect(): 

54 if g.current_user.isCeltsAdmin or g.current_user.isCeltsStudentStaff: 

55 allprograms = getAllowedPrograms(g.current_user) 

56 visibleTemplates = getAllowedTemplates(g.current_user) 

57 return render_template("/events/template_selector.html", 

58 programs=allprograms, 

59 celtsSponsoredProgram = Program.get(Program.isOtherCeltsSponsored), 

60 templates=visibleTemplates) 

61 else: 

62 abort(403) 

63 

64 

65@admin_bp.route('/eventTemplates/<templateid>/<programid>/create', methods=['GET','POST']) 

66def createEvent(templateid, programid): 

67 if not (g.current_user.isAdmin or g.current_user.isProgramManagerFor(programid)): 

68 abort(403) 

69 

70 # Validate given URL 

71 program = None 

72 try: 

73 template = EventTemplate.get_by_id(templateid) 

74 if programid: 

75 program = Program.get_by_id(programid) 

76 except DoesNotExist as e: 

77 print("Invalid template or program id:", e) 

78 flash("There was an error with your selection. Please try again or contact Systems Support.", "danger") 

79 return redirect(url_for("admin.program_picker")) 

80 

81 # Get the data from the form or from the template 

82 eventData = template.templateData 

83 

84 eventData['program'] = program 

85 

86 if request.method == "GET": 

87 eventData['contactName'] = "CELTS Admin" 

88 eventData['contactEmail'] = app.config['celts_admin_contact'] 

89 if program: 

90 eventData['location'] = program.defaultLocation 

91 if program.contactName: 

92 eventData['contactName'] = program.contactName 

93 if program.contactEmail: 

94 eventData['contactEmail'] = program.contactEmail 

95 

96 # Try to save the form 

97 if request.method == "POST": 

98 eventData.update(request.form.copy()) 

99 try: 

100 savedEvents, validationErrorMessage = attemptSaveEvent(eventData, getFilesFromRequest(request)) 

101 

102 except Exception as e: 

103 print("Error saving event:", e) 

104 savedEvents = False 

105 validationErrorMessage = "Unknown Error Saving Event. Please try again" 

106 

107 if savedEvents: 

108 rsvpcohorts = request.form.getlist("cohorts[]") 

109 for year in rsvpcohorts: 

110 rsvpForBonnerCohort(int(year), savedEvents[0].id) 

111 

112 noun = (eventData['isRecurring'] == 'on' and "Events" or "Event") # pluralize 

113 flash(f"{noun} successfully created!", 'success') 

114 

115 if program: 

116 if len(savedEvents) > 1: 

117 createAdminLog(f"Created a recurring event, <a href=\"{url_for('admin.eventDisplay', eventId = savedEvents[0].id)}\">{savedEvents[0].name}</a>, for {program.programName}, with a start date of {datetime.strftime(eventData['startDate'], '%m/%d/%Y')}. The last event in the series will be on {datetime.strftime(savedEvents[-1].startDate, '%m/%d/%Y')}.") 

118 else: 

119 createAdminLog(f"Created <a href=\"{url_for('admin.eventDisplay', eventId = savedEvents[0].id)}\">{savedEvents[0].name}</a> for {program.programName}, with a start date of {datetime.strftime(eventData['startDate'], '%m/%d/%Y')}.") 

120 else: 

121 createAdminLog(f"Created a non-program event, <a href=\"{url_for('admin.eventDisplay', eventId = savedEvents[0].id)}\">{savedEvents[0].name}</a>, with a start date of {datetime.strftime(eventData['startDate'], '%m/%d/%Y')}.") 

122 

123 return redirect(url_for("admin.eventDisplay", eventId = savedEvents[0].id)) 

124 else: 

125 flash(validationErrorMessage, 'warning') 

126 

127 # make sure our data is the same regardless of GET or POST 

128 preprocessEventData(eventData) 

129 isProgramManager = g.current_user.isProgramManagerFor(programid) 

130 

131 futureTerms = selectSurroundingTerms(g.current_term, prevTerms=0) 

132 

133 requirements, bonnerCohorts = [], [] 

134 if eventData['program'] is not None and eventData['program'].isBonnerScholars: 

135 requirements = getCertRequirements(Certification.BONNER) 

136 bonnerCohorts = getBonnerCohorts(limit=5) 

137 return render_template(f"/admin/{template.templateFile}", 

138 template = template, 

139 eventData = eventData, 

140 futureTerms = futureTerms, 

141 requirements = requirements, 

142 bonnerCohorts = bonnerCohorts, 

143 isProgramManager = isProgramManager) 

144 

145 

146@admin_bp.route('/event/<eventId>/rsvp', methods=['GET']) 

147def rsvpLogDisplay(eventId): 

148 event = Event.get_by_id(eventId) 

149 if g.current_user.isCeltsAdmin or (g.current_user.isCeltsStudentStaff and g.current_user.isProgramManagerFor(event.program)): 

150 allLogs = EventRsvpLog.select(EventRsvpLog, User).join(User).where(EventRsvpLog.event_id == eventId).order_by(EventRsvpLog.createdOn.desc()) 

151 return render_template("/events/rsvpLog.html", 

152 event = event, 

153 allLogs = allLogs) 

154 else: 

155 abort(403) 

156 

157@admin_bp.route('/event/<eventId>/renew', methods=['POST']) 

158def renewEvent(eventId): 

159 try: 

160 formData = request.form 

161 try: 

162 assert formData['timeStart'] < formData['timeEnd'] 

163 except AssertionError: 

164 flash("End time must be after start time", 'warning') 

165 return redirect(url_for('admin.eventDisplay', eventId = eventId)) 

166 

167 try: 

168 if formData.get('dateEnd'): 

169 assert formData['dateStart'] < formData['dateEnd'] 

170 except AssertionError: 

171 flash("End date must be after start date", 'warning') 

172 return redirect(url_for('admin.eventDisplay', eventId = eventId)) 

173 

174 

175 priorEvent = model_to_dict(Event.get_by_id(eventId)) 

176 newEventDict = priorEvent.copy() 

177 newEventDict.pop('id') 

178 newEventDict.update({ 

179 'program': int(priorEvent['program']['id']), 

180 'term': int(priorEvent['term']['id']), 

181 'timeStart': formData['timeStart'], 

182 'timeEnd': formData['timeEnd'], 

183 'location': formData['location'], 

184 'startDate': f'{formData["startDate"][-4:]}-{formData["startDate"][0:-5]}', 

185 'endDate': f'{formData["endDate"][-4:]}-{formData["endDate"][0:-5]}', 

186 'isRecurring': bool(priorEvent['recurringId']) 

187 }) 

188 newEvent, message = attemptSaveEvent(newEventDict, renewedEvent = True) 

189 if message: 

190 flash(message, "danger") 

191 return redirect(url_for('admin.eventDisplay', eventId = eventId)) 

192 

193 copyRsvpToNewEvent(priorEvent, newEvent[0]) 

194 createAdminLog(f"Renewed {priorEvent['name']} as <a href='event/{newEvent[0].id}/view'>{newEvent[0].name}</a>.") 

195 flash("Event successfully renewed.", "success") 

196 return redirect(url_for('admin.eventDisplay', eventId = newEvent[0].id)) 

197 

198 

199 except Exception as e: 

200 print("Error while trying to renew event:", e) 

201 flash("There was an error renewing the event. Please try again or contact Systems Support.", 'danger') 

202 return redirect(url_for('admin.eventDisplay', eventId = eventId)) 

203 

204 

205 

206@admin_bp.route('/event/<eventId>/view', methods=['GET']) 

207@admin_bp.route('/event/<eventId>/edit', methods=['GET','POST']) 

208def eventDisplay(eventId): 

209 pageViewsCount = EventView.select().where(EventView.event == eventId).count() 

210 if request.method == 'GET' and request.path == f'/event/{eventId}/view': 

211 viewer = g.current_user 

212 event = Event.get_by_id(eventId) 

213 addEventView(viewer,event) 

214 # Validate given URL 

215 try: 

216 event = Event.get_by_id(eventId) 

217 except DoesNotExist as e: 

218 print(f"Unknown event: {eventId}") 

219 abort(404) 

220 

221 notPermitted = not (g.current_user.isCeltsAdmin or g.current_user.isProgramManagerForEvent(event)) 

222 if 'edit' in request.url_rule.rule and notPermitted: 

223 abort(403) 

224 

225 eventData = model_to_dict(event, recurse=False) 

226 associatedAttachments = AttachmentUpload.select().where(AttachmentUpload.event == event) 

227 filepaths = FileHandler(eventId=event.id).retrievePath(associatedAttachments) 

228 

229 image = None 

230 picurestype = [".jpeg", ".png", ".gif", ".jpg", ".svg", ".webp"] 

231 for attachment in associatedAttachments: 

232 for extension in picurestype: 

233 if (attachment.fileName.endswith(extension) and attachment.isDisplayed == True): 

234 image = filepaths[attachment.fileName][0] 

235 if image: 

236 break 

237 

238 

239 if request.method == "POST": # Attempt to save form 

240 eventData = request.form.copy() 

241 try: 

242 savedEvents, validationErrorMessage = attemptSaveEvent(eventData, getFilesFromRequest(request)) 

243 

244 except Exception as e: 

245 print("Error saving event:", e) 

246 savedEvents = False 

247 validationErrorMessage = "Unknown Error Saving Event. Please try again" 

248 

249 

250 if savedEvents: 

251 rsvpcohorts = request.form.getlist("cohorts[]") 

252 for year in rsvpcohorts: 

253 rsvpForBonnerCohort(int(year), event.id) 

254 

255 flash("Event successfully updated!", "success") 

256 return redirect(url_for("admin.eventDisplay", eventId = event.id)) 

257 else: 

258 flash(validationErrorMessage, 'warning') 

259 

260 # make sure our data is the same regardless of GET and POST 

261 preprocessEventData(eventData) 

262 eventData['program'] = event.program 

263 futureTerms = selectSurroundingTerms(g.current_term) 

264 userHasRSVPed = checkUserRsvp(g.current_user, event) 

265 filepaths = FileHandler(eventId=event.id).retrievePath(associatedAttachments) 

266 isProgramManager = g.current_user.isProgramManagerFor(eventData['program']) 

267 requirements, bonnerCohorts = [], [] 

268 

269 if eventData['program'] and eventData['program'].isBonnerScholars: 

270 requirements = getCertRequirements(Certification.BONNER) 

271 bonnerCohorts = getBonnerCohorts(limit=5) 

272 

273 rule = request.url_rule 

274 

275 # Event Edit 

276 if 'edit' in rule.rule: 

277 return render_template("admin/createEvent.html", 

278 eventData = eventData, 

279 futureTerms=futureTerms, 

280 event = event, 

281 requirements = requirements, 

282 bonnerCohorts = bonnerCohorts, 

283 userHasRSVPed = userHasRSVPed, 

284 isProgramManager = isProgramManager, 

285 filepaths = filepaths) 

286 # Event View 

287 else: 

288 # get text representations of dates 

289 eventData['timeStart'] = event.timeStart.strftime("%-I:%M %p") 

290 eventData['timeEnd'] = event.timeEnd.strftime("%-I:%M %p") 

291 eventData["startDate"] = event.startDate.strftime("%m/%d/%Y") 

292 

293 # Identify the next event in a recurring series 

294 if event.recurringId: 

295 eventSeriesList = list(Event.select().where(Event.recurringId == event.recurringId) 

296 .where((Event.isCanceled == False) | (Event.id == event.id)) 

297 .order_by(Event.startDate)) 

298 eventIndex = eventSeriesList.index(event) 

299 if len(eventSeriesList) != (eventIndex + 1): 

300 eventData["nextRecurringEvent"] = eventSeriesList[eventIndex + 1] 

301 

302 currentEventRsvpAmount = getEventRsvpCount(event.id) 

303 

304 userParticipatedTrainingEvents = getParticipationStatusForTrainings(eventData['program'], [g.current_user], g.current_term) 

305 

306 return render_template("eventView.html", 

307 eventData = eventData, 

308 event = event, 

309 userHasRSVPed = userHasRSVPed, 

310 programTrainings = userParticipatedTrainingEvents, 

311 currentEventRsvpAmount = currentEventRsvpAmount, 

312 isProgramManager = isProgramManager, 

313 filepaths = filepaths, 

314 image = image, 

315 pageViewsCount= pageViewsCount) 

316 

317 

318@admin_bp.route('/event/<eventId>/cancel', methods=['POST']) 

319def cancelRoute(eventId): 

320 if g.current_user.isAdmin: 

321 try: 

322 cancelEvent(eventId) 

323 return redirect(request.referrer) 

324 

325 except Exception as e: 

326 print('Error while canceling event:', e) 

327 return "", 500 

328 

329 else: 

330 abort(403) 

331 

332@admin_bp.route('/event/<eventId>/delete', methods=['POST']) 

333def deleteRoute(eventId): 

334 try: 

335 deleteEvent(eventId) 

336 flash("Event successfully deleted.", "success") 

337 return redirect(url_for("main.events", selectedTerm=g.current_term)) 

338 

339 except Exception as e: 

340 print('Error while canceling event:', e) 

341 return "", 500 

342@admin_bp.route('/event/<eventId>/deleteEventAndAllFollowing', methods=['POST']) 

343def deleteEventAndAllFollowingRoute(eventId): 

344 try: 

345 deleteEventAndAllFollowing(eventId) 

346 flash("Events successfully deleted.", "success") 

347 return redirect(url_for("main.events", selectedTerm=g.current_term)) 

348 

349 except Exception as e: 

350 print('Error while canceling event:', e) 

351 return "", 500 

352@admin_bp.route('/event/<eventId>/deleteAllRecurring', methods=['POST']) 

353def deleteAllRecurringEventsRoute(eventId): 

354 try: 

355 deleteAllRecurringEvents(eventId) 

356 flash("Events successfully deleted.", "success") 

357 return redirect(url_for("main.events", selectedTerm=g.current_term)) 

358 

359 except Exception as e: 

360 print('Error while canceling event:', e) 

361 return "", 500 

362 

363@admin_bp.route('/makeRecurringEvents', methods=['POST']) 

364def addRecurringEvents(): 

365 recurringEvents = calculateRecurringEventFrequency(preprocessEventData(request.form.copy())) 

366 return json.dumps(recurringEvents, default=str) 

367 

368 

369@admin_bp.route('/userProfile', methods=['POST']) 

370def userProfile(): 

371 volunteerName= request.form.copy() 

372 if volunteerName['searchStudentsInput']: 

373 username = volunteerName['searchStudentsInput'].strip("()") 

374 user=username.split('(')[-1] 

375 return redirect(url_for('main.viewUsersProfile', username=user)) 

376 else: 

377 flash(f"Please enter the first name or the username of the student you would like to search for.", category='danger') 

378 return redirect(url_for('admin.studentSearchPage')) 

379 

380@admin_bp.route('/search_student', methods=['GET']) 

381def studentSearchPage(): 

382 if g.current_user.isAdmin: 

383 return render_template("/admin/searchStudentPage.html") 

384 abort(403) 

385 

386@admin_bp.route('/addParticipants', methods = ['GET']) 

387def addParticipants(): 

388 '''Renders the page, will be removed once merged with full page''' 

389 

390 return render_template('addParticipants.html', 

391 title="Add Participants") 

392 

393@admin_bp.route('/adminLogs', methods = ['GET', 'POST']) 

394def adminLogs(): 

395 if g.current_user.isCeltsAdmin: 

396 allLogs = AdminLog.select(AdminLog, User).join(User).order_by(AdminLog.createdOn.desc()) 

397 return render_template("/admin/adminLogs.html", 

398 allLogs = allLogs) 

399 else: 

400 abort(403) 

401 

402@admin_bp.route("/deleteEventFile", methods=["POST"]) 

403def deleteEventFile(): 

404 fileData= request.form 

405 eventfile=FileHandler(eventId=fileData["databaseId"]) 

406 eventfile.deleteFile(fileData["fileId"]) 

407 return "" 

408 

409@admin_bp.route("/uploadCourseParticipant", methods= ["POST"]) 

410def addCourseFile(): 

411 fileData = request.files['addCourseParticipants'] 

412 filePath = os.path.join(app.config["files"]["base_path"], fileData.filename) 

413 fileData.save(filePath) 

414 (session['cpPreview'], session['cpErrors']) = parseUploadedFile(filePath) 

415 os.remove(filePath) 

416 return redirect(url_for("admin.manageServiceLearningCourses")) 

417 

418@admin_bp.route('/manageServiceLearning', methods = ['GET', 'POST']) 

419@admin_bp.route('/manageServiceLearning/<term>', methods = ['GET', 'POST']) 

420def manageServiceLearningCourses(term=None): 

421 """ 

422 The SLC management page for admins 

423 """ 

424 if not g.current_user.isCeltsAdmin: 

425 abort(403) 

426 

427 if request.method == 'POST' and "submitParticipant" in request.form: 

428 saveCourseParticipantsToDatabase(session.pop('cpPreview', {})) 

429 flash('Courses and participants saved successfully!', 'success') 

430 return redirect(url_for('admin.manageServiceLearningCourses')) 

431 

432 manageTerm = Term.get_or_none(Term.id == term) or g.current_term 

433 

434 setRedirectTarget(request.full_path) 

435 

436 return render_template('/admin/manageServiceLearningFaculty.html', 

437 courseInstructors = getInstructorCourses(), 

438 unapprovedCourses = unapprovedCourses(manageTerm), 

439 approvedCourses = approvedCourses(manageTerm), 

440 terms = selectSurroundingTerms(g.current_term), 

441 term = manageTerm, 

442 cpPreview= session.get('cpPreview',{}), 

443 cpPreviewErrors = session.get('cpErrors',[]) 

444 ) 

445 

446@admin_bp.route("/deleteUploadedFile", methods= ["POST"]) 

447def removeFromSession(): 

448 try: 

449 session.pop('cpPreview') 

450 except KeyError: 

451 pass 

452 

453 return "" 

454 

455@admin_bp.route("/manageBonner") 

456def manageBonner(): 

457 if not g.current_user.isCeltsAdmin: 

458 abort(403) 

459 

460 return render_template("/admin/bonnerManagement.html", 

461 cohorts=getBonnerCohorts(), 

462 events=getBonnerEvents(g.current_term), 

463 requirements = getCertRequirements(certification=Certification.BONNER)) 

464 

465@admin_bp.route("/bonner/<year>/<method>/<username>", methods=["POST"]) 

466def updatecohort(year, method, username): 

467 if not g.current_user.isCeltsAdmin: 

468 abort(403) 

469 

470 try: 

471 user = User.get_by_id(username) 

472 except: 

473 abort(500) 

474 

475 if method == "add": 

476 try: 

477 BonnerCohort.create(year=year, user=user) 

478 flash(f"Successfully added {user.fullName} to {year} Bonner Cohort.", "success") 

479 except IntegrityError as e: 

480 # if they already exist, ignore the error 

481 flash(f'Error: {user.fullName} already added.', "danger") 

482 pass 

483 

484 elif method == "remove": 

485 BonnerCohort.delete().where(BonnerCohort.user == user, BonnerCohort.year == year).execute() 

486 flash(f"Successfully removed {user.fullName} from {year} Bonner Cohort.", "success") 

487 else: 

488 flash(f"Error: {user.fullName} can't be added.", "danger") 

489 abort(500) 

490 

491 return "" 

492 

493@admin_bp.route("/bonnerxls") 

494def bonnerxls(): 

495 if not g.current_user.isCeltsAdmin: 

496 abort(403) 

497 

498 newfile = makeBonnerXls() 

499 return send_file(open(newfile, 'rb'), download_name='BonnerStudents.xlsx', as_attachment=True) 

500 

501@admin_bp.route("/saveRequirements/<certid>", methods=["POST"]) 

502def saveRequirements(certid): 

503 if not g.current_user.isCeltsAdmin: 

504 abort(403) 

505 

506 newRequirements = updateCertRequirements(certid, request.get_json()) 

507 

508 return jsonify([requirement.id for requirement in newRequirements]) 

509 

510 

511@admin_bp.route("/displayEventFile", methods=["POST"]) 

512def displayEventFile(): 

513 fileData= request.form 

514 eventfile=FileHandler(eventId=fileData["id"]) 

515 eventfile.changeDisplay(fileData['id']) 

516 return ""