Coverage for app/controllers/admin/routes.py: 24%

352 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-04 20:38 +0000

1from flask import request, render_template, url_for, g, redirect 

2from flask import flash, abort, jsonify, session, send_file 

3from peewee import DoesNotExist, fn, IntegrityError 

4from playhouse.shortcuts import model_to_dict 

5import json 

6from datetime import datetime 

7import os 

8 

9from app import app 

10from app.models.program import Program 

11from app.models.event import Event 

12from app.models.eventRsvp import EventRsvp 

13from app.models.eventParticipant import EventParticipant 

14from app.models.user import User 

15from app.models.eventTemplate import EventTemplate 

16from app.models.adminLog import AdminLog 

17from app.models.eventRsvpLog import EventRsvpLog 

18from app.models.attachmentUpload import AttachmentUpload 

19from app.models.bonnerCohort import BonnerCohort 

20from app.models.certification import Certification 

21from app.models.user import User 

22from app.models.term import Term 

23from app.models.eventViews import EventView 

24from app.models.courseStatus import CourseStatus 

25 

26from app.logic.userManagement import getAllowedPrograms, getAllowedTemplates 

27from app.logic.createLogs import createAdminLog 

28from app.logic.certification import getCertRequirements, updateCertRequirements 

29from app.logic.utils import selectSurroundingTerms, getFilesFromRequest, getRedirectTarget, setRedirectTarget 

30from app.logic.events import cancelEvent, deleteEvent, attemptSaveEvent, preprocessEventData, calculateRecurringEventFrequency, deleteEventAndAllFollowing, deleteAllRecurringEvents, getBonnerEvents,addEventView, getEventRsvpCount, copyRsvpToNewEvent 

31from app.logic.participants import getParticipationStatusForTrainings, checkUserRsvp 

32from app.logic.minor import getMinorInterest 

33from app.logic.fileHandler import FileHandler 

34from app.logic.bonner import getBonnerCohorts, makeBonnerXls, rsvpForBonnerCohort 

35from app.logic.serviceLearningCourses import parseUploadedFile, saveCourseParticipantsToDatabase, unapprovedCourses, approvedCourses, getInstructorCourses 

36 

37from app.controllers.admin import admin_bp 

38 

39@admin_bp.route('/switch_user', methods=['POST']) 

40def switchUser(): 

41 if app.env == "production": 

42 print(f"An attempt was made to switch to another user by {g.current_user.username}!") 

43 abort(403) 

44 

45 print(f"Switching user from {g.current_user} to",request.form['newuser']) 

46 session['current_user'] = model_to_dict(User.get_by_id(request.form['newuser'])) 

47 

48 return redirect(request.referrer) 

49 

50 

51@admin_bp.route('/eventTemplates') 

52def templateSelect(): 

53 if g.current_user.isCeltsAdmin or g.current_user.isCeltsStudentStaff: 

54 allprograms = getAllowedPrograms(g.current_user) 

55 visibleTemplates = getAllowedTemplates(g.current_user) 

56 return render_template("/events/template_selector.html", 

57 programs=allprograms, 

58 celtsSponsoredProgram = Program.get(Program.isOtherCeltsSponsored), 

59 templates=visibleTemplates) 

60 else: 

61 abort(403) 

62 

63 

64@admin_bp.route('/eventTemplates/<templateid>/<programid>/create', methods=['GET','POST']) 

65def createEvent(templateid, programid): 

66 if not (g.current_user.isAdmin or g.current_user.isProgramManagerFor(programid)): 

67 abort(403) 

68 

69 # Validate given URL 

70 program = None 

71 try: 

72 template = EventTemplate.get_by_id(templateid) 

73 if programid: 

74 program = Program.get_by_id(programid) 

75 except DoesNotExist as e: 

76 print("Invalid template or program id:", e) 

77 flash("There was an error with your selection. Please try again or contact Systems Support.", "danger") 

78 return redirect(url_for("admin.program_picker")) 

79 

80 # Get the data from the form or from the template 

81 eventData = template.templateData 

82 

83 eventData['program'] = program 

84 

85 if request.method == "GET": 

86 eventData['contactName'] = "CELTS Admin" 

87 eventData['contactEmail'] = app.config['celts_admin_contact'] 

88 if program: 

89 eventData['location'] = program.defaultLocation 

90 if program.contactName: 

91 eventData['contactName'] = program.contactName 

92 if program.contactEmail: 

93 eventData['contactEmail'] = program.contactEmail 

94 

95 # Try to save the form 

96 if request.method == "POST": 

97 eventData.update(request.form.copy()) 

98 try: 

99 savedEvents, validationErrorMessage = attemptSaveEvent(eventData, getFilesFromRequest(request)) 

100 

101 except Exception as e: 

102 print("Error saving event:", e) 

103 savedEvents = False 

104 validationErrorMessage = "Unknown Error Saving Event. Please try again" 

105 

106 if savedEvents: 

107 rsvpcohorts = request.form.getlist("cohorts[]") 

108 for year in rsvpcohorts: 

109 rsvpForBonnerCohort(int(year), savedEvents[0].id) 

110 

111 noun = (eventData['isRecurring'] == 'on' and "Events" or "Event") # pluralize 

112 flash(f"{noun} successfully created!", 'success') 

113 

114 if program: 

115 if len(savedEvents) > 1: 

116 createAdminLog(f"Created a recurring event, <a href=\"{url_for('admin.eventDisplay', eventId = savedEvents[0].id)}\">{savedEvents[0].name}</a>, for {program.programName}, with a start date of {datetime.strftime(eventData['startDate'], '%m/%d/%Y')}. The last event in the series will be on {datetime.strftime(savedEvents[-1].startDate, '%m/%d/%Y')}.") 

117 else: 

118 createAdminLog(f"Created <a href=\"{url_for('admin.eventDisplay', eventId = savedEvents[0].id)}\">{savedEvents[0].name}</a> for {program.programName}, with a start date of {datetime.strftime(eventData['startDate'], '%m/%d/%Y')}.") 

119 else: 

120 createAdminLog(f"Created a non-program event, <a href=\"{url_for('admin.eventDisplay', eventId = savedEvents[0].id)}\">{savedEvents[0].name}</a>, with a start date of {datetime.strftime(eventData['startDate'], '%m/%d/%Y')}.") 

121 

122 return redirect(url_for("admin.eventDisplay", eventId = savedEvents[0].id)) 

123 else: 

124 flash(validationErrorMessage, 'warning') 

125 

126 # make sure our data is the same regardless of GET or POST 

127 preprocessEventData(eventData) 

128 isProgramManager = g.current_user.isProgramManagerFor(programid) 

129 

130 futureTerms = selectSurroundingTerms(g.current_term, prevTerms=0) 

131 

132 requirements, bonnerCohorts = [], [] 

133 if eventData['program'] is not None and eventData['program'].isBonnerScholars: 

134 requirements = getCertRequirements(Certification.BONNER) 

135 bonnerCohorts = getBonnerCohorts(limit=5) 

136 return render_template(f"/admin/{template.templateFile}", 

137 template = template, 

138 eventData = eventData, 

139 futureTerms = futureTerms, 

140 requirements = requirements, 

141 bonnerCohorts = bonnerCohorts, 

142 isProgramManager = isProgramManager) 

143 

144 

145@admin_bp.route('/event/<eventId>/rsvp', methods=['GET']) 

146def rsvpLogDisplay(eventId): 

147 event = Event.get_by_id(eventId) 

148 if g.current_user.isCeltsAdmin or (g.current_user.isCeltsStudentStaff and g.current_user.isProgramManagerFor(event.program)): 

149 allLogs = EventRsvpLog.select(EventRsvpLog, User).join(User).where(EventRsvpLog.event_id == eventId).order_by(EventRsvpLog.createdOn.desc()) 

150 return render_template("/events/rsvpLog.html", 

151 event = event, 

152 allLogs = allLogs) 

153 else: 

154 abort(403) 

155 

156@admin_bp.route('/event/<eventId>/renew', methods=['POST']) 

157def renewEvent(eventId): 

158 try: 

159 formData = request.form 

160 try: 

161 assert formData['timeStart'] < formData['timeEnd'] 

162 except AssertionError: 

163 flash("End time must be after start time", 'warning') 

164 return redirect(url_for('admin.eventDisplay', eventId = eventId)) 

165 

166 try: 

167 if formData.get('dateEnd'): 

168 assert formData['dateStart'] < formData['dateEnd'] 

169 except AssertionError: 

170 flash("End date must be after start date", 'warning') 

171 return redirect(url_for('admin.eventDisplay', eventId = eventId)) 

172 

173 

174 priorEvent = model_to_dict(Event.get_by_id(eventId)) 

175 newEventDict = priorEvent.copy() 

176 newEventDict.pop('id') 

177 newEventDict.update({ 

178 'program': int(priorEvent['program']['id']), 

179 'term': int(priorEvent['term']['id']), 

180 'timeStart': formData['timeStart'], 

181 'timeEnd': formData['timeEnd'], 

182 'location': formData['location'], 

183 'startDate': f'{formData["startDate"][-4:]}-{formData["startDate"][0:-5]}', 

184 'endDate': f'{formData["endDate"][-4:]}-{formData["endDate"][0:-5]}', 

185 'isRecurring': bool(priorEvent['recurringId']) 

186 }) 

187 newEvent, message = attemptSaveEvent(newEventDict, renewedEvent = True) 

188 if message: 

189 flash(message, "danger") 

190 return redirect(url_for('admin.eventDisplay', eventId = eventId)) 

191 

192 copyRsvpToNewEvent(priorEvent, newEvent[0]) 

193 createAdminLog(f"Renewed {priorEvent['name']} as <a href='event/{newEvent[0].id}/view'>{newEvent[0].name}</a>.") 

194 flash("Event successfully renewed.", "success") 

195 return redirect(url_for('admin.eventDisplay', eventId = newEvent[0].id)) 

196 

197 

198 except Exception as e: 

199 print("Error while trying to renew event:", e) 

200 flash("There was an error renewing the event. Please try again or contact Systems Support.", 'danger') 

201 return redirect(url_for('admin.eventDisplay', eventId = eventId)) 

202 

203 

204 

205@admin_bp.route('/event/<eventId>/view', methods=['GET']) 

206@admin_bp.route('/event/<eventId>/edit', methods=['GET','POST']) 

207def eventDisplay(eventId): 

208 pageViewsCount = EventView.select().where(EventView.event == eventId).count() 

209 if request.method == 'GET' and request.path == f'/event/{eventId}/view': 

210 viewer = g.current_user 

211 event = Event.get_by_id(eventId) 

212 addEventView(viewer,event) 

213 # Validate given URL 

214 try: 

215 event = Event.get_by_id(eventId) 

216 except DoesNotExist as e: 

217 print(f"Unknown event: {eventId}") 

218 abort(404) 

219 

220 notPermitted = not (g.current_user.isCeltsAdmin or g.current_user.isProgramManagerForEvent(event)) 

221 if 'edit' in request.url_rule.rule and notPermitted: 

222 abort(403) 

223 

224 eventData = model_to_dict(event, recurse=False) 

225 associatedAttachments = AttachmentUpload.select().where(AttachmentUpload.event == event) 

226 filepaths = FileHandler(eventId=event.id).retrievePath(associatedAttachments) 

227 

228 image = None 

229 picurestype = [".jpeg", ".png", ".gif", ".jpg", ".svg", ".webp"] 

230 for attachment in associatedAttachments: 

231 for extension in picurestype: 

232 if (attachment.fileName.endswith(extension) and attachment.isDisplayed == True): 

233 image = filepaths[attachment.fileName][0] 

234 if image: 

235 break 

236 

237 

238 if request.method == "POST": # Attempt to save form 

239 eventData = request.form.copy() 

240 try: 

241 savedEvents, validationErrorMessage = attemptSaveEvent(eventData, getFilesFromRequest(request)) 

242 

243 except Exception as e: 

244 print("Error saving event:", e) 

245 savedEvents = False 

246 validationErrorMessage = "Unknown Error Saving Event. Please try again" 

247 

248 

249 if savedEvents: 

250 rsvpcohorts = request.form.getlist("cohorts[]") 

251 for year in rsvpcohorts: 

252 rsvpForBonnerCohort(int(year), event.id) 

253 

254 flash("Event successfully updated!", "success") 

255 return redirect(url_for("admin.eventDisplay", eventId = event.id)) 

256 else: 

257 flash(validationErrorMessage, 'warning') 

258 

259 # make sure our data is the same regardless of GET and POST 

260 preprocessEventData(eventData) 

261 eventData['program'] = event.program 

262 futureTerms = selectSurroundingTerms(g.current_term) 

263 userHasRSVPed = checkUserRsvp(g.current_user, event) 

264 filepaths = FileHandler(eventId=event.id).retrievePath(associatedAttachments) 

265 isProgramManager = g.current_user.isProgramManagerFor(eventData['program']) 

266 requirements, bonnerCohorts = [], [] 

267 

268 if eventData['program'] and eventData['program'].isBonnerScholars: 

269 requirements = getCertRequirements(Certification.BONNER) 

270 bonnerCohorts = getBonnerCohorts(limit=5) 

271 

272 rule = request.url_rule 

273 

274 # Event Edit 

275 if 'edit' in rule.rule: 

276 return render_template("admin/createEvent.html", 

277 eventData = eventData, 

278 futureTerms=futureTerms, 

279 event = event, 

280 requirements = requirements, 

281 bonnerCohorts = bonnerCohorts, 

282 userHasRSVPed = userHasRSVPed, 

283 isProgramManager = isProgramManager, 

284 filepaths = filepaths) 

285 # Event View 

286 else: 

287 # get text representations of dates 

288 eventData['timeStart'] = event.timeStart.strftime("%-I:%M %p") 

289 eventData['timeEnd'] = event.timeEnd.strftime("%-I:%M %p") 

290 eventData["startDate"] = event.startDate.strftime("%m/%d/%Y") 

291 

292 # Identify the next event in a recurring series 

293 if event.recurringId: 

294 eventSeriesList = list(Event.select().where(Event.recurringId == event.recurringId) 

295 .where((Event.isCanceled == False) | (Event.id == event.id)) 

296 .order_by(Event.startDate)) 

297 eventIndex = eventSeriesList.index(event) 

298 if len(eventSeriesList) != (eventIndex + 1): 

299 eventData["nextRecurringEvent"] = eventSeriesList[eventIndex + 1] 

300 

301 currentEventRsvpAmount = getEventRsvpCount(event.id) 

302 

303 userParticipatedTrainingEvents = getParticipationStatusForTrainings(eventData['program'], [g.current_user], g.current_term) 

304 

305 return render_template("eventView.html", 

306 eventData = eventData, 

307 event = event, 

308 userHasRSVPed = userHasRSVPed, 

309 programTrainings = userParticipatedTrainingEvents, 

310 currentEventRsvpAmount = currentEventRsvpAmount, 

311 isProgramManager = isProgramManager, 

312 filepaths = filepaths, 

313 image = image, 

314 pageViewsCount= pageViewsCount) 

315 

316 

317@admin_bp.route('/event/<eventId>/cancel', methods=['POST']) 

318def cancelRoute(eventId): 

319 if g.current_user.isAdmin: 

320 try: 

321 cancelEvent(eventId) 

322 return redirect(request.referrer) 

323 

324 except Exception as e: 

325 print('Error while canceling event:', e) 

326 return "", 500 

327 

328 else: 

329 abort(403) 

330 

331@admin_bp.route('/event/<eventId>/delete', methods=['POST']) 

332def deleteRoute(eventId): 

333 try: 

334 deleteEvent(eventId) 

335 flash("Event successfully deleted.", "success") 

336 return redirect(url_for("main.events", selectedTerm=g.current_term)) 

337 

338 except Exception as e: 

339 print('Error while canceling event:', e) 

340 return "", 500 

341@admin_bp.route('/event/<eventId>/deleteEventAndAllFollowing', methods=['POST']) 

342def deleteEventAndAllFollowingRoute(eventId): 

343 try: 

344 deleteEventAndAllFollowing(eventId) 

345 flash("Events successfully deleted.", "success") 

346 return redirect(url_for("main.events", selectedTerm=g.current_term)) 

347 

348 except Exception as e: 

349 print('Error while canceling event:', e) 

350 return "", 500 

351@admin_bp.route('/event/<eventId>/deleteAllRecurring', methods=['POST']) 

352def deleteAllRecurringEventsRoute(eventId): 

353 try: 

354 deleteAllRecurringEvents(eventId) 

355 flash("Events successfully deleted.", "success") 

356 return redirect(url_for("main.events", selectedTerm=g.current_term)) 

357 

358 except Exception as e: 

359 print('Error while canceling event:', e) 

360 return "", 500 

361 

362@admin_bp.route('/makeRecurringEvents', methods=['POST']) 

363def addRecurringEvents(): 

364 recurringEvents = calculateRecurringEventFrequency(preprocessEventData(request.form.copy())) 

365 return json.dumps(recurringEvents, default=str) 

366 

367 

368@admin_bp.route('/userProfile', methods=['POST']) 

369def userProfile(): 

370 volunteerName= request.form.copy() 

371 if volunteerName['searchStudentsInput']: 

372 username = volunteerName['searchStudentsInput'].strip("()") 

373 user=username.split('(')[-1] 

374 return redirect(url_for('main.viewUsersProfile', username=user)) 

375 else: 

376 flash(f"Please enter the first name or the username of the student you would like to search for.", category='danger') 

377 return redirect(url_for('admin.studentSearchPage')) 

378 

379@admin_bp.route('/search_student', methods=['GET']) 

380def studentSearchPage(): 

381 if g.current_user.isAdmin: 

382 return render_template("/admin/searchStudentPage.html") 

383 abort(403) 

384 

385@admin_bp.route('/addParticipants', methods = ['GET']) 

386def addParticipants(): 

387 '''Renders the page, will be removed once merged with full page''' 

388 

389 return render_template('addParticipants.html', 

390 title="Add Participants") 

391 

392@admin_bp.route('/adminLogs', methods = ['GET', 'POST']) 

393def adminLogs(): 

394 if g.current_user.isCeltsAdmin: 

395 allLogs = AdminLog.select(AdminLog, User).join(User).order_by(AdminLog.createdOn.desc()) 

396 return render_template("/admin/adminLogs.html", 

397 allLogs = allLogs) 

398 else: 

399 abort(403) 

400 

401@admin_bp.route("/deleteEventFile", methods=["POST"]) 

402def deleteEventFile(): 

403 fileData= request.form 

404 eventfile=FileHandler(eventId=fileData["databaseId"]) 

405 eventfile.deleteFile(fileData["fileId"]) 

406 return "" 

407 

408@admin_bp.route("/uploadCourseParticipant", methods= ["POST"]) 

409def addCourseFile(): 

410 fileData = request.files['addCourseParticipants'] 

411 filePath = os.path.join(app.config["files"]["base_path"], fileData.filename) 

412 fileData.save(filePath) 

413 (session['cpPreview'], session['cpErrors']) = parseUploadedFile(filePath) 

414 os.remove(filePath) 

415 return redirect(url_for("admin.manageServiceLearningCourses")) 

416 

417@admin_bp.route('/manageServiceLearning', methods = ['GET', 'POST']) 

418@admin_bp.route('/manageServiceLearning/<term>', methods = ['GET', 'POST']) 

419def manageServiceLearningCourses(term=None): 

420 """ 

421 The SLC management page for admins 

422 """ 

423 if not g.current_user.isCeltsAdmin: 

424 abort(403) 

425 

426 if request.method == 'POST' and "submitParticipant" in request.form: 

427 saveCourseParticipantsToDatabase(session.pop('cpPreview', {})) 

428 flash('Courses and participants saved successfully!', 'success') 

429 return redirect(url_for('admin.manageServiceLearningCourses')) 

430 

431 manageTerm = Term.get_or_none(Term.id == term) or g.current_term 

432 

433 setRedirectTarget(request.full_path) 

434 

435 return render_template('/admin/manageServiceLearningFaculty.html', 

436 courseInstructors = getInstructorCourses(), 

437 unapprovedCourses = unapprovedCourses(manageTerm), 

438 approvedCourses = approvedCourses(manageTerm), 

439 terms = selectSurroundingTerms(g.current_term), 

440 term = manageTerm, 

441 cpPreview= session.get('cpPreview',{}), 

442 cpPreviewErrors = session.get('cpErrors',[]) 

443 ) 

444 

445@admin_bp.route('/admin/getSidebarInformation', methods=['GET']) 

446def getSidebarInformation() -> str: 

447 """ 

448 Get the count of unapproved courses and students interested in the minor for the current term  

449 to display in the admin sidebar. It must be returned as a string to be received by the 

450 ajax request. 

451 """ 

452 unapprovedCoursesCount: int = len(unapprovedCourses(g.current_term)) 

453 interestedStudentsCount: int = len(getMinorInterest()) 

454 return {"unapprovedCoursesCount": unapprovedCoursesCount, 

455 "interestedStudentsCount": interestedStudentsCount} 

456 

457@admin_bp.route("/deleteUploadedFile", methods= ["POST"]) 

458def removeFromSession(): 

459 try: 

460 session.pop('cpPreview') 

461 except KeyError: 

462 pass 

463 

464 return "" 

465 

466@admin_bp.route("/manageBonner") 

467def manageBonner(): 

468 if not g.current_user.isCeltsAdmin: 

469 abort(403) 

470 

471 return render_template("/admin/bonnerManagement.html", 

472 cohorts=getBonnerCohorts(), 

473 events=getBonnerEvents(g.current_term), 

474 requirements = getCertRequirements(certification=Certification.BONNER)) 

475 

476@admin_bp.route("/bonner/<year>/<method>/<username>", methods=["POST"]) 

477def updatecohort(year, method, username): 

478 if not g.current_user.isCeltsAdmin: 

479 abort(403) 

480 

481 try: 

482 user = User.get_by_id(username) 

483 except: 

484 abort(500) 

485 

486 if method == "add": 

487 try: 

488 BonnerCohort.create(year=year, user=user) 

489 flash(f"Successfully added {user.fullName} to {year} Bonner Cohort.", "success") 

490 except IntegrityError as e: 

491 # if they already exist, ignore the error 

492 flash(f'Error: {user.fullName} already added.', "danger") 

493 pass 

494 

495 elif method == "remove": 

496 BonnerCohort.delete().where(BonnerCohort.user == user, BonnerCohort.year == year).execute() 

497 flash(f"Successfully removed {user.fullName} from {year} Bonner Cohort.", "success") 

498 else: 

499 flash(f"Error: {user.fullName} can't be added.", "danger") 

500 abort(500) 

501 

502 return "" 

503 

504@admin_bp.route("/bonnerxls") 

505def bonnerxls(): 

506 if not g.current_user.isCeltsAdmin: 

507 abort(403) 

508 

509 newfile = makeBonnerXls() 

510 return send_file(open(newfile, 'rb'), download_name='BonnerStudents.xlsx', as_attachment=True) 

511 

512@admin_bp.route("/saveRequirements/<certid>", methods=["POST"]) 

513def saveRequirements(certid): 

514 if not g.current_user.isCeltsAdmin: 

515 abort(403) 

516 

517 newRequirements = updateCertRequirements(certid, request.get_json()) 

518 

519 return jsonify([requirement.id for requirement in newRequirements]) 

520 

521 

522@admin_bp.route("/displayEventFile", methods=["POST"]) 

523def displayEventFile(): 

524 fileData= request.form 

525 eventfile=FileHandler(eventId=fileData["id"]) 

526 eventfile.changeDisplay(fileData['id']) 

527 return ""