Coverage for app/controllers/admin/routes.py: 24%

353 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-24 15:19 +0000

1from flask import request, render_template, url_for, g, redirect 

2from flask import flash, abort, jsonify, session, send_file 

3from peewee import DoesNotExist, fn, IntegrityError 

4from playhouse.shortcuts import model_to_dict 

5import json 

6from datetime import datetime 

7import os 

8 

9from app import app 

10from app.models.program import Program 

11from app.models.event import Event 

12from app.models.eventRsvp import EventRsvp 

13from app.models.eventParticipant import EventParticipant 

14from app.models.user import User 

15from app.models.eventTemplate import EventTemplate 

16from app.models.adminLog import AdminLog 

17from app.models.eventRsvpLog import EventRsvpLog 

18from app.models.attachmentUpload import AttachmentUpload 

19from app.models.bonnerCohort import BonnerCohort 

20from app.models.certification import Certification 

21from app.models.user import User 

22from app.models.term import Term 

23from app.models.eventViews import EventView 

24from app.models.courseStatus import CourseStatus 

25 

26from app.logic.userManagement import getAllowedPrograms, getAllowedTemplates 

27from app.logic.createLogs import createAdminLog 

28from app.logic.certification import getCertRequirements, updateCertRequirements 

29from app.logic.utils import selectSurroundingTerms, getFilesFromRequest, getRedirectTarget, setRedirectTarget 

30from app.logic.events import cancelEvent, deleteEvent, attemptSaveEvent, preprocessEventData, calculateRecurringEventFrequency, deleteEventAndAllFollowing, deleteAllRecurringEvents, getBonnerEvents,addEventView, getEventRsvpCount, copyRsvpToNewEvent, getCountdownToEvent 

31from app.logic.participants import getParticipationStatusForTrainings, checkUserRsvp 

32from app.logic.minor import getMinorInterest 

33from app.logic.fileHandler import FileHandler 

34from app.logic.bonner import getBonnerCohorts, makeBonnerXls, rsvpForBonnerCohort 

35from app.logic.serviceLearningCourses import parseUploadedFile, saveCourseParticipantsToDatabase, unapprovedCourses, approvedCourses, getInstructorCourses 

36 

37from app.controllers.admin import admin_bp 

38 

@admin_bp.route('/switch_user', methods=['POST'])
def switchUser():
    """
    Replace the session's current user with the user named in the
    'newuser' form field, then send the browser back to the referring page.

    The switch is refused with a 403 (and the attempt is printed) when the
    app is running in the "production" environment.
    """
    inProduction = app.env == "production"
    if inProduction:
        print(f"An attempt was made to switch to another user by {g.current_user.username}!")
        abort(403)

    targetUser = request.form['newuser']
    print(f"Switching user from {g.current_user} to", targetUser)

    # The session stores a plain dict of the user rather than the model itself
    switchedUser = User.get_by_id(targetUser)
    session['current_user'] = model_to_dict(switchedUser)

    return redirect(request.referrer)

49 

50 

@admin_bp.route('/eventTemplates')
def templateSelect():
    """
    Render the event template selector for CELTS admins and student staff.

    Anyone else receives a 403.
    """
    viewer = g.current_user
    if not (viewer.isCeltsAdmin or viewer.isCeltsStudentStaff):
        abort(403)

    allowedPrograms = getAllowedPrograms(viewer)
    allowedTemplates = getAllowedTemplates(viewer)
    otherCeltsProgram = Program.get(Program.isOtherCeltsSponsored)
    return render_template("/events/template_selector.html",
                           programs=allowedPrograms,
                           celtsSponsoredProgram=otherCeltsProgram,
                           templates=allowedTemplates)

62 

63 

@admin_bp.route('/eventTemplates/<templateid>/<programid>/create', methods=['GET','POST'])
def createEvent(templateid, programid):
    """
    Show the event creation form for a template/program pair and, on POST,
    attempt to save the submitted event(s).

    Only admins or managers of the given program may use this route (403
    otherwise). An unknown template or program id flashes an error and
    redirects to the program picker. After a successful save, the selected
    Bonner cohorts are RSVPed to the first saved event, an admin log entry is
    written, and the user is redirected to the new event's page. After a
    failed save the validation message is flashed and the form is rendered
    again with the submitted data.
    """
    if not (g.current_user.isAdmin or g.current_user.isProgramManagerFor(programid)):
        abort(403)

    # Validate given URL
    program = None
    try:
        template = EventTemplate.get_by_id(templateid)
        if programid:
            program = Program.get_by_id(programid)
    except DoesNotExist as e:
        print("Invalid template or program id:", e)
        flash("There was an error with your selection. Please try again or contact Systems Support.", "danger")
        return redirect(url_for("admin.program_picker"))

    # Get the data from the form or from the template
    eventData = template.templateData
    eventData['program'] = program

    if request.method == "GET":
        # Start from the CELTS-wide contact, then prefer the program's own details
        eventData['contactName'] = "CELTS Admin"
        eventData['contactEmail'] = app.config['celts_admin_contact']
        if program:
            eventData['location'] = program.defaultLocation
            if program.contactName:
                eventData['contactName'] = program.contactName
            if program.contactEmail:
                eventData['contactEmail'] = program.contactEmail

    # Try to save the form
    if request.method == "POST":
        eventData.update(request.form.copy())
        try:
            savedEvents, validationErrorMessage = attemptSaveEvent(eventData, getFilesFromRequest(request))
        except Exception as e:
            print("Error saving event:", e)
            savedEvents = False
            validationErrorMessage = "Unknown Error Saving Event. Please try again"

        if savedEvents:
            firstEvent = savedEvents[0]
            for year in request.form.getlist("cohorts[]"):
                rsvpForBonnerCohort(int(year), firstEvent.id)

            # Pluralize on what was actually saved. The raw form value ('on')
            # is not a reliable signal here: eventData appears to be normalized
            # in place by attemptSaveEvent (startDate is used as a datetime
            # below) -- NOTE(review): confirm against attemptSaveEvent.
            isSeries = len(savedEvents) > 1
            noun = "Events" if isSeries else "Event"
            flash(f"{noun} successfully created!", 'success')

            eventLink = f"<a href=\"{url_for('admin.eventDisplay', eventId = firstEvent.id)}\">{firstEvent.name}</a>"
            startDate = eventData['startDate'].strftime('%m/%d/%Y')
            if program:
                if isSeries:
                    lastDate = savedEvents[-1].startDate.strftime('%m/%d/%Y')
                    createAdminLog(f"Created a recurring event, {eventLink}, for {program.programName}, with a start date of {startDate}. The last event in the series will be on {lastDate}.")
                else:
                    createAdminLog(f"Created {eventLink} for {program.programName}, with a start date of {startDate}.")
            else:
                createAdminLog(f"Created a non-program event, {eventLink}, with a start date of {startDate}.")

            return redirect(url_for("admin.eventDisplay", eventId = firstEvent.id))
        else:
            flash(validationErrorMessage, 'warning')

    # make sure our data is the same regardless of GET or POST
    preprocessEventData(eventData)
    isProgramManager = g.current_user.isProgramManagerFor(programid)

    futureTerms = selectSurroundingTerms(g.current_term, prevTerms=0)

    # Certification requirements and cohorts are only offered for Bonner Scholars events
    requirements, bonnerCohorts = [], []
    if eventData['program'] is not None and eventData['program'].isBonnerScholars:
        requirements = getCertRequirements(Certification.BONNER)
        bonnerCohorts = getBonnerCohorts(limit=5)

    return render_template(f"/admin/{template.templateFile}",
                           template = template,
                           eventData = eventData,
                           futureTerms = futureTerms,
                           requirements = requirements,
                           bonnerCohorts = bonnerCohorts,
                           isProgramManager = isProgramManager)

143 

144 

@admin_bp.route('/event/<eventId>/rsvp', methods=['GET'])
def rsvpLogDisplay(eventId):
    """
    Render the RSVP log for an event, newest entries first.

    Responds with 404 when the event does not exist, and with 403 unless the
    current user is a CELTS admin or a student staff member who manages the
    event's program.
    """
    # An unknown id is a client error, not a server failure
    try:
        event = Event.get_by_id(eventId)
    except DoesNotExist:
        print(f"Unknown event: {eventId}")
        abort(404)

    viewer = g.current_user
    canViewLog = viewer.isCeltsAdmin or (viewer.isCeltsStudentStaff and viewer.isProgramManagerFor(event.program))
    if not canViewLog:
        abort(403)

    allLogs = (EventRsvpLog.select(EventRsvpLog, User)
                           .join(User)
                           .where(EventRsvpLog.event_id == eventId)
                           .order_by(EventRsvpLog.createdOn.desc()))
    return render_template("/events/rsvpLog.html",
                           event = event,
                           allLogs = allLogs)

155 

@admin_bp.route('/event/<eventId>/renew', methods=['POST'])
def renewEvent(eventId):
    """
    Create a new event from an existing one using the times, location and
    dates submitted in the form, then copy the prior event's RSVPs to it.

    Every outcome redirects to an event page: the new event on success, or
    the original event (with a flashed message) when validation fails, the
    save reports a message, or any exception is raised along the way.
    """
    try:
        formData = request.form
        backToEvent = url_for('admin.eventDisplay', eventId = eventId)

        # Explicit checks rather than `assert`, which is removed under `python -O`
        if not formData['timeStart'] < formData['timeEnd']:
            flash("End time must be after start time", 'warning')
            return redirect(backToEvent)

        # NOTE(review): this check reads 'dateStart'/'dateEnd' while the values
        # saved below come from 'startDate'/'endDate'; confirm which field
        # names the renew form actually submits.
        if formData.get('dateEnd') and not formData['dateStart'] < formData['dateEnd']:
            flash("End date must be after start date", 'warning')
            return redirect(backToEvent)

        priorEvent = model_to_dict(Event.get_by_id(eventId))
        newEventDict = priorEvent.copy()
        newEventDict.pop('id')

        # Move the trailing four-character year to the front of each date
        startDate = f'{formData["startDate"][-4:]}-{formData["startDate"][0:-5]}'
        endDate = f'{formData["endDate"][-4:]}-{formData["endDate"][0:-5]}'
        newEventDict.update({
            'program': int(priorEvent['program']['id']),
            'term': int(priorEvent['term']['id']),
            'timeStart': formData['timeStart'],
            'timeEnd': formData['timeEnd'],
            'location': formData['location'],
            'startDate': startDate,
            'endDate': endDate,
            'isRecurring': bool(priorEvent['recurringId'])
        })

        newEvent, message = attemptSaveEvent(newEventDict, renewedEvent = True)
        if message:
            flash(message, "danger")
            return redirect(backToEvent)

        copyRsvpToNewEvent(priorEvent, newEvent[0])
        createAdminLog(f"Renewed {priorEvent['name']} as <a href='event/{newEvent[0].id}/view'>{newEvent[0].name}</a>.")
        flash("Event successfully renewed.", "success")
        return redirect(url_for('admin.eventDisplay', eventId = newEvent[0].id))

    except Exception as e:
        # Top-level boundary: report the failure and return the user to the event
        print("Error while trying to renew event:", e)
        flash("There was an error renewing the event. Please try again or contact Systems Support.", 'danger')
        return redirect(url_for('admin.eventDisplay', eventId = eventId))

202 

203 

204 

@admin_bp.route('/event/<eventId>/view', methods=['GET'])
@admin_bp.route('/event/<eventId>/edit', methods=['GET','POST'])
def eventDisplay(eventId):
    """
    Serve both the read-only view page and the edit form for an event.

    Responds with 404 for an unknown event id. The edit route is limited to
    CELTS admins and managers of the event's program (403 otherwise). A POST
    attempts to save the submitted form: on success the user is redirected
    back to the event, otherwise the validation message is flashed and the
    form is rendered again with the submitted data. A GET of the view URL
    also records a page view for the current user.
    """
    # Validate given URL before anything else uses the event
    try:
        event = Event.get_by_id(eventId)
    except DoesNotExist:
        print(f"Unknown event: {eventId}")
        abort(404)

    # Counted before this request's own view is recorded
    pageViewsCount = EventView.select().where(EventView.event == eventId).count()
    if request.method == 'GET' and request.path == f'/event/{eventId}/view':
        addEventView(g.current_user, event)

    isEditRoute = 'edit' in request.url_rule.rule
    notPermitted = not (g.current_user.isCeltsAdmin or g.current_user.isProgramManagerForEvent(event))
    if isEditRoute and notPermitted:
        abort(403)

    eventData = model_to_dict(event, recurse=False)
    associatedAttachments = AttachmentUpload.select().where(AttachmentUpload.event == event)
    filepaths = FileHandler(eventId=event.id).retrievePath(associatedAttachments)

    # Use the first displayed attachment with an image extension as the event image
    image = None
    pictureExtensions = (".jpeg", ".png", ".gif", ".jpg", ".svg", ".webp")
    for attachment in associatedAttachments:
        if attachment.fileName.endswith(pictureExtensions) and attachment.isDisplayed == True:
            image = filepaths[attachment.fileName][0]
            if image:
                break

    if request.method == "POST": # Attempt to save form
        eventData = request.form.copy()
        try:
            savedEvents, validationErrorMessage = attemptSaveEvent(eventData, getFilesFromRequest(request))
        except Exception as e:
            print("Error saving event:", e)
            savedEvents = False
            validationErrorMessage = "Unknown Error Saving Event. Please try again"

        if savedEvents:
            for year in request.form.getlist("cohorts[]"):
                rsvpForBonnerCohort(int(year), event.id)

            flash("Event successfully updated!", "success")
            return redirect(url_for("admin.eventDisplay", eventId = event.id))
        else:
            flash(validationErrorMessage, 'warning')

    # make sure our data is the same regardless of GET and POST
    preprocessEventData(eventData)
    eventData['program'] = event.program
    futureTerms = selectSurroundingTerms(g.current_term)
    userHasRSVPed = checkUserRsvp(g.current_user, event)
    # Looked up again after the save attempt, as the original flow does
    filepaths = FileHandler(eventId=event.id).retrievePath(associatedAttachments)
    isProgramManager = g.current_user.isProgramManagerFor(eventData['program'])

    requirements, bonnerCohorts = [], []
    if eventData['program'] and eventData['program'].isBonnerScholars:
        requirements = getCertRequirements(Certification.BONNER)
        bonnerCohorts = getBonnerCohorts(limit=5)

    # Event Edit
    if isEditRoute:
        return render_template("admin/createEvent.html",
                               eventData = eventData,
                               futureTerms=futureTerms,
                               event = event,
                               requirements = requirements,
                               bonnerCohorts = bonnerCohorts,
                               userHasRSVPed = userHasRSVPed,
                               isProgramManager = isProgramManager,
                               filepaths = filepaths)

    # Event View
    # get text representations of dates for html
    eventData['timeStart'] = event.timeStart.strftime("%-I:%M %p")
    eventData['timeEnd'] = event.timeEnd.strftime("%-I:%M %p")
    eventData['startDate'] = event.startDate.strftime("%m/%d/%Y")
    eventCountdown = getCountdownToEvent(event)

    # Identify the next event in a recurring series
    if event.recurringId:
        eventSeriesList = list(Event.select().where(Event.recurringId == event.recurringId)
                                    .where((Event.isCanceled == False) | (Event.id == event.id))
                                    .order_by(Event.startDate))
        eventIndex = eventSeriesList.index(event)
        if len(eventSeriesList) != (eventIndex + 1):
            eventData["nextRecurringEvent"] = eventSeriesList[eventIndex + 1]

    currentEventRsvpAmount = getEventRsvpCount(event.id)

    userParticipatedTrainingEvents = getParticipationStatusForTrainings(eventData['program'], [g.current_user], g.current_term)

    return render_template("eventView.html",
                           eventData=eventData,
                           event=event,
                           userHasRSVPed=userHasRSVPed,
                           programTrainings=userParticipatedTrainingEvents,
                           currentEventRsvpAmount=currentEventRsvpAmount,
                           isProgramManager=isProgramManager,
                           filepaths=filepaths,
                           image=image,
                           pageViewsCount=pageViewsCount,
                           eventCountdown=eventCountdown)

318 

319 

320 

@admin_bp.route('/event/<eventId>/cancel', methods=['POST'])
def cancelRoute(eventId):
    """
    Cancel an event on behalf of an admin and return to the referring page.

    Non-admins receive a 403. Any error raised while canceling is printed and
    answered with an empty 500 response.
    """
    if not g.current_user.isAdmin:
        abort(403)

    try:
        cancelEvent(eventId)
        return redirect(request.referrer)
    except Exception as error:
        print('Error while canceling event:', error)
        return "", 500

334 

@admin_bp.route('/event/<eventId>/delete', methods=['POST'])
def deleteRoute(eventId):
    """
    Delete a single event and redirect to the events list for the current term.

    Any error raised while deleting is printed and answered with an empty
    500 response.
    """
    # NOTE(review): unlike the cancel route, no permission check is visible
    # here; confirm authorization is enforced elsewhere.
    try:
        deleteEvent(eventId)
        flash("Event successfully deleted.", "success")
        return redirect(url_for("main.events", selectedTerm=g.current_term))

    except Exception as e:
        print('Error while deleting event:', e)
        return "", 500

@admin_bp.route('/event/<eventId>/deleteEventAndAllFollowing', methods=['POST'])
def deleteEventAndAllFollowingRoute(eventId):
    """
    Delete an event together with the events that follow it, then redirect to
    the events list for the current term.

    Any error raised while deleting is printed and answered with an empty
    500 response.
    """
    # NOTE(review): unlike the cancel route, no permission check is visible
    # here; confirm authorization is enforced elsewhere.
    try:
        deleteEventAndAllFollowing(eventId)
        flash("Events successfully deleted.", "success")
        return redirect(url_for("main.events", selectedTerm=g.current_term))

    except Exception as e:
        print('Error while deleting event and all following events:', e)
        return "", 500

@admin_bp.route('/event/<eventId>/deleteAllRecurring', methods=['POST'])
def deleteAllRecurringEventsRoute(eventId):
    """
    Delete all events in the given event's recurring series, then redirect to
    the events list for the current term.

    Any error raised while deleting is printed and answered with an empty
    500 response.
    """
    # NOTE(review): unlike the cancel route, no permission check is visible
    # here; confirm authorization is enforced elsewhere.
    try:
        deleteAllRecurringEvents(eventId)
        flash("Events successfully deleted.", "success")
        return redirect(url_for("main.events", selectedTerm=g.current_term))

    except Exception as e:
        print('Error while deleting all recurring events:', e)
        return "", 500

365 

@admin_bp.route('/makeRecurringEvents', methods=['POST'])
def addRecurringEvents():
    """
    Compute the recurring events implied by the submitted form and return
    them as a JSON string (values json cannot encode are passed through str).
    """
    submittedData = request.form.copy()
    processedData = preprocessEventData(submittedData)
    recurringEvents = calculateRecurringEventFrequency(processedData)
    return json.dumps(recurringEvents, default=str)

370 

371 

@admin_bp.route('/userProfile', methods=['POST'])
def userProfile():
    """
    Redirect to the profile of the student chosen in the search box.

    The username is taken to be the text after the last '(' of the
    'searchStudentsInput' value, once surrounding parentheses are stripped.
    An empty search flashes a prompt and returns to the search page.
    """
    searchInput = request.form.copy()['searchStudentsInput']
    if not searchInput:
        flash(f"Please enter the first name or the username of the student you would like to search for.", category='danger')
        return redirect(url_for('admin.studentSearchPage'))

    user = searchInput.strip("()").split('(')[-1]
    return redirect(url_for('main.viewUsersProfile', username=user))

382 

@admin_bp.route('/search_student', methods=['GET'])
def studentSearchPage():
    """
    Render the student search page for admins; everyone else receives a 403.
    """
    if not g.current_user.isAdmin:
        abort(403)
    return render_template("/admin/searchStudentPage.html")

388 

@admin_bp.route('/addParticipants', methods = ['GET'])
def addParticipants():
    '''Render the add-participants page; to be removed once merged with the full page.'''
    pageTitle = "Add Participants"
    return render_template('addParticipants.html', title=pageTitle)

395 

@admin_bp.route('/adminLogs', methods = ['GET', 'POST'])
def adminLogs():
    """
    Render every admin log entry, newest first, joined with the user it
    belongs to. Only CELTS admins may view the page (403 otherwise).
    """
    if not g.current_user.isCeltsAdmin:
        abort(403)

    logsWithUsers = AdminLog.select(AdminLog, User).join(User)
    newestFirst = logsWithUsers.order_by(AdminLog.createdOn.desc())
    return render_template("/admin/adminLogs.html", allLogs = newestFirst)

404 

@admin_bp.route("/deleteEventFile", methods=["POST"])
def deleteEventFile():
    """
    Delete an event attachment.

    Reads 'databaseId' (used as the event id for the file handler) and
    'fileId' (the file to delete) from the form and returns an empty body.
    """
    submitted = request.form
    handler = FileHandler(eventId=submitted["databaseId"])
    handler.deleteFile(submitted["fileId"])
    return ""

411 

@admin_bp.route("/uploadCourseParticipant", methods= ["POST"])
def addCourseFile():
    """
    Accept an uploaded course-participant file, parse it, and stash the
    parsed preview and errors in the session for the management page.

    The upload is written under the configured base path only for the
    duration of parsing and is removed afterwards, even when parsing fails.
    """
    fileData = request.files['addCourseParticipants']

    # The filename comes from the client: keep only its final path component
    # so it cannot point outside the upload directory.
    safeName = os.path.basename(fileData.filename or "")
    filePath = os.path.join(app.config["files"]["base_path"], safeName)

    fileData.save(filePath)
    try:
        (session['cpPreview'], session['cpErrors']) = parseUploadedFile(filePath)
    finally:
        # Never leave the temporary copy behind
        os.remove(filePath)

    return redirect(url_for("admin.manageServiceLearningCourses"))

420 

@admin_bp.route('/manageServiceLearning', methods = ['GET', 'POST'])
@admin_bp.route('/manageServiceLearning/<term>', methods = ['GET', 'POST'])
def manageServiceLearningCourses(term=None):
    """
    The SLC management page for admins.

    A POST containing "submitParticipant" saves the course participants
    previewed in the session and redirects back to this page. Otherwise the
    page is rendered for the requested term, falling back to the current
    term when no matching term is found.
    """
    if not g.current_user.isCeltsAdmin:
        abort(403)

    isParticipantSubmission = request.method == 'POST' and "submitParticipant" in request.form
    if isParticipantSubmission:
        previewedParticipants = session.pop('cpPreview', {})
        saveCourseParticipantsToDatabase(previewedParticipants)
        flash('Courses and participants saved successfully!', 'success')
        return redirect(url_for('admin.manageServiceLearningCourses'))

    manageTerm = Term.get_or_none(Term.id == term) or g.current_term

    setRedirectTarget(request.full_path)

    pageContext = {
        'courseInstructors': getInstructorCourses(),
        'unapprovedCourses': unapprovedCourses(manageTerm),
        'approvedCourses': approvedCourses(manageTerm),
        'terms': selectSurroundingTerms(g.current_term),
        'term': manageTerm,
        'cpPreview': session.get('cpPreview',{}),
        'cpPreviewErrors': session.get('cpErrors',[]),
    }
    return render_template('/admin/manageServiceLearningFaculty.html', **pageContext)

448 

@admin_bp.route('/admin/getSidebarInformation', methods=['GET'])
def getSidebarInformation() -> dict:
    """
    Get the count of unapproved courses for the current term and the count of
    students interested in the minor, to display in the admin sidebar.

    Returns a dict with the keys "unapprovedCoursesCount" and
    "interestedStudentsCount"; Flask serializes a dict returned from a view
    as a JSON response for the ajax request.
    """
    unapprovedCoursesCount: int = len(unapprovedCourses(g.current_term))
    interestedStudentsCount: int = len(getMinorInterest())
    return {"unapprovedCoursesCount": unapprovedCoursesCount,
            "interestedStudentsCount": interestedStudentsCount}

460 

@admin_bp.route("/deleteUploadedFile", methods= ["POST"])
def removeFromSession():
    """
    Discard the uploaded course-participant preview from the session, if one
    is present, and return an empty body.
    """
    # A missing preview is not an error, so supply a default
    session.pop('cpPreview', None)
    return ""

469 

@admin_bp.route("/manageBonner")
def manageBonner():
    """
    Render the Bonner management page with the cohorts, the Bonner events for
    the current term, and the Bonner certification requirements.

    Only CELTS admins may view the page (403 otherwise).
    """
    if not g.current_user.isCeltsAdmin:
        abort(403)

    pageContext = {
        'cohorts': getBonnerCohorts(),
        'events': getBonnerEvents(g.current_term),
        'requirements': getCertRequirements(certification=Certification.BONNER),
    }
    return render_template("/admin/bonnerManagement.html", **pageContext)

479 

@admin_bp.route("/bonner/<year>/<method>/<username>", methods=["POST"])
def updatecohort(year, method, username):
    """
    Add a user to, or remove a user from, a Bonner cohort year.

    `method` must be "add" or "remove"; anything else flashes an error and
    aborts with a 500. Only CELTS admins may call this route (403 otherwise),
    and an unknown username results in a 404. Adding a user who is already in
    the cohort flashes an error instead of failing. Returns an empty body.
    """
    if not g.current_user.isCeltsAdmin:
        abort(403)

    # Catch only the "no such user" case; other failures should surface normally
    try:
        user = User.get_by_id(username)
    except DoesNotExist:
        abort(404)

    if method == "add":
        try:
            BonnerCohort.create(year=year, user=user)
            flash(f"Successfully added {user.fullName} to {year} Bonner Cohort.", "success")
        except IntegrityError:
            # The create was rejected by the database; report it as a duplicate
            flash(f'Error: {user.fullName} already added.', "danger")

    elif method == "remove":
        BonnerCohort.delete().where(BonnerCohort.user == user, BonnerCohort.year == year).execute()
        flash(f"Successfully removed {user.fullName} from {year} Bonner Cohort.", "success")
    else:
        flash(f"Error: {user.fullName} can't be added.", "danger")
        abort(500)

    return ""

507 

@admin_bp.route("/bonnerxls")
def bonnerxls():
    """
    Build the Bonner spreadsheet and send it as a download named
    'BonnerStudents.xlsx'. Only CELTS admins may download it (403 otherwise).
    """
    if not g.current_user.isCeltsAdmin:
        abort(403)

    spreadsheetPath = makeBonnerXls()
    spreadsheet = open(spreadsheetPath, 'rb')
    return send_file(spreadsheet, download_name='BonnerStudents.xlsx', as_attachment=True)

515 

@admin_bp.route("/saveRequirements/<certid>", methods=["POST"])
def saveRequirements(certid):
    """
    Replace a certification's requirements with the JSON payload of the
    request and respond with the ids of the resulting requirements as JSON.

    Only CELTS admins may call this route (403 otherwise).
    """
    if not g.current_user.isCeltsAdmin:
        abort(403)

    submittedRequirements = request.get_json()
    newRequirements = updateCertRequirements(certid, submittedRequirements)

    requirementIds = [requirement.id for requirement in newRequirements]
    return jsonify(requirementIds)

524 

525 

@admin_bp.route("/displayEventFile", methods=["POST"])
def displayEventFile():
    """
    Change the display setting of an event attachment and return an empty body.

    The form's 'id' value is passed both as the file handler's event id and
    as the argument to changeDisplay.
    """
    # NOTE(review): the same 'id' is used for both purposes, unlike
    # deleteEventFile, which reads separate ids; confirm this is intended.
    submitted = request.form
    handler = FileHandler(eventId=submitted["id"])
    handler.changeDisplay(submitted['id'])
    return ""