Coverage for app/controllers/admin/routes.py: 24%
382 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-06-21 18:28 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2024-06-21 18:28 +0000
1from flask import request, render_template, url_for, g, redirect
2from flask import flash, abort, jsonify, session, send_file
3from peewee import DoesNotExist, fn, IntegrityError
4from playhouse.shortcuts import model_to_dict
5import json
6from datetime import datetime
7import os
9from app import app
10from app.models.program import Program
11from app.models.event import Event
12from app.models.eventRsvp import EventRsvp
13from app.models.eventParticipant import EventParticipant
14from app.models.user import User
15from app.models.course import Course
16from app.models.courseInstructor import CourseInstructor
17from app.models.courseParticipant import CourseParticipant
18from app.models.eventTemplate import EventTemplate
19from app.models.activityLog import ActivityLog
20from app.models.eventRsvpLog import EventRsvpLog
21from app.models.attachmentUpload import AttachmentUpload
22from app.models.bonnerCohort import BonnerCohort
23from app.models.certification import Certification
24from app.models.user import User
25from app.models.term import Term
26from app.models.eventViews import EventView
27from app.models.courseStatus import CourseStatus
29from app.logic.userManagement import getAllowedPrograms, getAllowedTemplates
30from app.logic.createLogs import createActivityLog
31from app.logic.certification import getCertRequirements, updateCertRequirements
32from app.logic.utils import selectSurroundingTerms, getFilesFromRequest, getRedirectTarget, setRedirectTarget
33from app.logic.events import cancelEvent, deleteEvent, attemptSaveEvent, preprocessEventData, calculateRecurringEventFrequency, deleteEventAndAllFollowing, deleteAllRecurringEvents, getBonnerEvents,addEventView, getEventRsvpCount, copyRsvpToNewEvent, getCountdownToEvent
34from app.logic.participants import getParticipationStatusForTrainings, checkUserRsvp
35from app.logic.minor import getMinorInterest
36from app.logic.fileHandler import FileHandler
37from app.logic.bonner import getBonnerCohorts, makeBonnerXls, rsvpForBonnerCohort
38from app.logic.serviceLearningCourses import parseUploadedFile, saveCourseParticipantsToDatabase, unapprovedCourses, approvedCourses, getImportedCourses, getInstructorCourses, editImportedCourses
40from app.controllers.admin import admin_bp
@admin_bp.route('/switch_user', methods=['POST'])
def switchUser():
    """
    Store a different user in the session and return to the referring page.

    The target user is read from the 'newuser' form field. When the app
    environment is "production" the attempt is printed and answered with 403.
    """
    inProduction = app.env == "production"
    if inProduction:
        print(f"An attempt was made to switch to another user by {g.current_user.username}!")
        abort(403)

    requestedUser = request.form['newuser']
    print(f"Switching user from {g.current_user} to", requestedUser)

    # The session keeps a plain dict of the user row, not the model instance.
    session['current_user'] = model_to_dict(User.get_by_id(requestedUser))

    return redirect(request.referrer)
@admin_bp.route('/eventTemplates')
def templateSelect():
    """
    Render the event template selector for CELTS admins and CELTS student staff.

    Any other user receives a 403.
    """
    currentUser = g.current_user
    if not (currentUser.isCeltsAdmin or currentUser.isCeltsStudentStaff):
        abort(403)

    allowedPrograms = getAllowedPrograms(currentUser)
    allowedTemplates = getAllowedTemplates(currentUser)
    sponsoredProgram = Program.get(Program.isOtherCeltsSponsored)
    return render_template("/events/template_selector.html",
                           programs=allowedPrograms,
                           celtsSponsoredProgram=sponsoredProgram,
                           templates=allowedTemplates)
@admin_bp.route('/eventTemplates/<templateid>/<programid>/create', methods=['GET','POST'])
def createEvent(templateid, programid):
    """
    Show (GET) or process (POST) the event creation form for a template.

    Only admins and managers of the given program may use this route. An
    unknown template or program id flashes an error and redirects. A successful
    POST saves the event(s), RSVPs the selected Bonner cohorts to the first
    saved event, writes an activity log entry and redirects to the event page.
    A failed POST flashes the validation message and re-renders the form.
    """
    currentUser = g.current_user
    if not (currentUser.isAdmin or currentUser.isProgramManagerFor(programid)):
        abort(403)

    # Validate given URL
    program = None
    try:
        template = EventTemplate.get_by_id(templateid)
        if programid:
            program = Program.get_by_id(programid)
    except DoesNotExist as lookupError:
        print("Invalid template or program id:", lookupError)
        flash("There was an error with your selection. Please try again or contact Systems Support.", "danger")
        return redirect(url_for("admin.program_picker"))

    # Start from the template's data; form values are layered on top for a POST.
    eventData = template.templateData
    eventData['program'] = program

    if request.method == "GET":
        # Default contact details, overridden by the program's own when it has them.
        eventData['contactName'] = "CELTS Admin"
        eventData['contactEmail'] = app.config['celts_admin_contact']
        if program:
            eventData['location'] = program.defaultLocation
            if program.contactName:
                eventData['contactName'] = program.contactName
            if program.contactEmail:
                eventData['contactEmail'] = program.contactEmail

    # Try to save the form
    if request.method == "POST":
        eventData.update(request.form.copy())
        try:
            savedEvents, validationErrorMessage = attemptSaveEvent(eventData, getFilesFromRequest(request))
        except Exception as saveError:
            print("Error saving event:", saveError)
            savedEvents = False
            validationErrorMessage = "Unknown Error Saving Event. Please try again"

        if not savedEvents:
            flash(validationErrorMessage, 'warning')
        else:
            firstEvent = savedEvents[0]
            for cohortYear in request.form.getlist("cohorts[]"):
                rsvpForBonnerCohort(int(cohortYear), firstEvent.id)

            noun = "Events" if eventData['isRecurring'] == 'on' else "Event"  # pluralize
            flash(f"{noun} successfully created!", 'success')

            # Pieces shared by all three activity log messages.
            eventUrl = url_for('admin.eventDisplay', eventId = firstEvent.id)
            eventLink = f'<a href="{eventUrl}">{firstEvent.name}</a>'
            startDateText = datetime.strftime(eventData['startDate'], '%m/%d/%Y')

            if not program:
                createActivityLog(f"Created a non-program event, {eventLink}, with a start date of {startDateText}.")
            elif len(savedEvents) > 1:
                lastDateText = datetime.strftime(savedEvents[-1].startDate, '%m/%d/%Y')
                createActivityLog(f"Created a recurring event, {eventLink}, for {program.programName}, with a start date of {startDateText}. The last event in the series will be on {lastDateText}.")
            else:
                createActivityLog(f"Created {eventLink} for {program.programName}, with a start date of {startDateText}.")

            return redirect(url_for("admin.eventDisplay", eventId = firstEvent.id))

    # make sure our data is the same regardless of GET or POST
    preprocessEventData(eventData)
    isProgramManager = currentUser.isProgramManagerFor(programid)

    futureTerms = selectSurroundingTerms(g.current_term, prevTerms=0)

    # Certification requirements and cohorts are only loaded for the Bonner Scholars program.
    requirements, bonnerCohorts = [], []
    eventProgram = eventData['program']
    if eventProgram is not None and eventProgram.isBonnerScholars:
        requirements = getCertRequirements(Certification.BONNER)
        bonnerCohorts = getBonnerCohorts(limit=5)

    return render_template(f"/admin/{template.templateFile}",
                           template = template,
                           eventData = eventData,
                           futureTerms = futureTerms,
                           requirements = requirements,
                           bonnerCohorts = bonnerCohorts,
                           isProgramManager = isProgramManager)
@admin_bp.route('/event/<eventId>/rsvp', methods=['GET'])
def rsvpLogDisplay(eventId):
    """
    Render the RSVP log of an event, newest entries first.

    Allowed for CELTS admins, and for CELTS student staff who manage the
    event's program; everyone else receives a 403. An unknown event id is
    answered with 404.
    """
    # Answer 404 for an unknown id instead of letting DoesNotExist escape.
    try:
        event = Event.get_by_id(eventId)
    except DoesNotExist:
        print(f"Unknown event: {eventId}")
        abort(404)

    if g.current_user.isCeltsAdmin or (g.current_user.isCeltsStudentStaff and g.current_user.isProgramManagerFor(event.program)):
        allLogs = (EventRsvpLog.select(EventRsvpLog, User)
                               .join(User)
                               .where(EventRsvpLog.event_id == eventId)
                               .order_by(EventRsvpLog.createdOn.desc()))
        return render_template("/events/rsvpLog.html",
                               event = event,
                               allLogs = allLogs)
    else:
        abort(403)
@admin_bp.route('/event/<eventId>/renew', methods=['POST'])
def renewEvent(eventId):
    """
    Create a new event from an existing one using the times, location and
    dates submitted in the form, copy the prior event's RSVPs to it and log
    the renewal.

    Every outcome redirects to an event page: the new event on success, the
    original event after a validation failure or any unexpected error.
    """
    try:
        formData = request.form

        # Explicit comparisons instead of `assert`: assertions are removed when
        # Python runs with -O, which would silently disable this validation.
        if not (formData['timeStart'] < formData['timeEnd']):
            flash("End time must be after start time", 'warning')
            return redirect(url_for('admin.eventDisplay', eventId = eventId))

        # NOTE(review): this check reads 'dateStart'/'dateEnd' while the rest of
        # the function reads 'startDate'/'endDate' -- confirm which keys the
        # form actually submits; if it never sends 'dateEnd' this check is dead.
        if formData.get('dateEnd') and not (formData['dateStart'] < formData['dateEnd']):
            flash("End date must be after start date", 'warning')
            return redirect(url_for('admin.eventDisplay', eventId = eventId))

        priorEvent = model_to_dict(Event.get_by_id(eventId))
        newEventDict = priorEvent.copy()
        newEventDict.pop('id')
        newEventDict.update({
            # model_to_dict expanded these foreign keys into dicts; keep only the ids.
            'program': int(priorEvent['program']['id']),
            'term': int(priorEvent['term']['id']),
            'timeStart': formData['timeStart'],
            'timeEnd': formData['timeEnd'],
            'location': formData['location'],
            # Move the last four characters (presumably the year -- confirm the
            # form's date format) to the front, followed by '-' and everything
            # before the final five characters.
            'startDate': f'{formData["startDate"][-4:]}-{formData["startDate"][0:-5]}',
            'endDate': f'{formData["endDate"][-4:]}-{formData["endDate"][0:-5]}',
            'isRecurring': bool(priorEvent['recurringId'])
        })
        newEvent, message = attemptSaveEvent(newEventDict, renewedEvent = True)
        if message:
            flash(message, "danger")
            return redirect(url_for('admin.eventDisplay', eventId = eventId))

        copyRsvpToNewEvent(priorEvent, newEvent[0])
        createActivityLog(f"Renewed {priorEvent['name']} as <a href='event/{newEvent[0].id}/view'>{newEvent[0].name}</a>.")
        flash("Event successfully renewed.", "success")
        return redirect(url_for('admin.eventDisplay', eventId = newEvent[0].id))

    except Exception as e:
        # Top-level boundary: missing form keys and unknown event ids also end up here.
        print("Error while trying to renew event:", e)
        flash("There was an error renewing the event. Please try again or contact Systems Support.", 'danger')
        return redirect(url_for('admin.eventDisplay', eventId = eventId))
@admin_bp.route('/event/<eventId>/view', methods=['GET'])
@admin_bp.route('/event/<eventId>/edit', methods=['GET','POST'])
def eventDisplay(eventId):
    """
    Show an event (view route) or its edit form (edit route), and save the
    form when the edit route is POSTed.

    An unknown event id is answered with 404. The edit route is restricted to
    CELTS admins and managers of the event's program (403 otherwise). A GET on
    the view route records a page view for the current user.
    """
    # Validate given URL before anything else touches the event, so an unknown
    # id yields 404 on the view route as well as on the edit route.
    try:
        event = Event.get_by_id(eventId)
    except DoesNotExist:
        print(f"Unknown event: {eventId}")
        abort(404)

    # Counted before the current view is recorded.
    pageViewsCount = EventView.select().where(EventView.event == eventId).count()
    if request.method == 'GET' and request.path == f'/event/{eventId}/view':
        addEventView(g.current_user, event)

    notPermitted = not (g.current_user.isCeltsAdmin or g.current_user.isProgramManagerForEvent(event))
    if 'edit' in request.url_rule.rule and notPermitted:
        abort(403)

    eventData = model_to_dict(event, recurse=False)
    associatedAttachments = AttachmentUpload.select().where(AttachmentUpload.event == event)
    filepaths = FileHandler(eventId=event.id).retrievePath(associatedAttachments)

    # Pick the first attachment that is flagged as displayed and has an image extension.
    image = None
    imageExtensions = (".jpeg", ".png", ".gif", ".jpg", ".svg", ".webp")
    for attachment in associatedAttachments:
        if attachment.fileName.endswith(imageExtensions) and attachment.isDisplayed == True:
            image = filepaths[attachment.fileName][0]
        if image:
            break

    if request.method == "POST": # Attempt to save form
        eventData = request.form.copy()
        try:
            savedEvents, validationErrorMessage = attemptSaveEvent(eventData, getFilesFromRequest(request))
        except Exception as e:
            print("Error saving event:", e)
            savedEvents = False
            validationErrorMessage = "Unknown Error Saving Event. Please try again"

        if savedEvents:
            rsvpcohorts = request.form.getlist("cohorts[]")
            for year in rsvpcohorts:
                rsvpForBonnerCohort(int(year), event.id)

            flash("Event successfully updated!", "success")
            return redirect(url_for("admin.eventDisplay", eventId = event.id))
        else:
            flash(validationErrorMessage, 'warning')

    # make sure our data is the same regardless of GET and POST
    preprocessEventData(eventData)
    eventData['program'] = event.program
    futureTerms = selectSurroundingTerms(g.current_term)
    userHasRSVPed = checkUserRsvp(g.current_user, event)
    # NOTE(review): same call as above; looks redundant -- confirm before removing.
    filepaths = FileHandler(eventId=event.id).retrievePath(associatedAttachments)
    isProgramManager = g.current_user.isProgramManagerFor(eventData['program'])
    requirements, bonnerCohorts = [], []

    if eventData['program'] and eventData['program'].isBonnerScholars:
        requirements = getCertRequirements(Certification.BONNER)
        bonnerCohorts = getBonnerCohorts(limit=5)

    rule = request.url_rule

    # Event Edit
    if 'edit' in rule.rule:
        return render_template("admin/createEvent.html",
                               eventData = eventData,
                               futureTerms=futureTerms,
                               event = event,
                               requirements = requirements,
                               bonnerCohorts = bonnerCohorts,
                               userHasRSVPed = userHasRSVPed,
                               isProgramManager = isProgramManager,
                               filepaths = filepaths)
    # Event View
    else:
        # get text representations of dates for html
        eventData['timeStart'] = event.timeStart.strftime("%-I:%M %p")
        eventData['timeEnd'] = event.timeEnd.strftime("%-I:%M %p")
        eventData['startDate'] = event.startDate.strftime("%m/%d/%Y")
        eventCountdown = getCountdownToEvent(event)

        # Identify the next event in a recurring series
        if event.recurringId:
            # Canceled events are skipped, except this event itself so that
            # index() below can always find it.
            eventSeriesList = list(Event.select().where(Event.recurringId == event.recurringId)
                                        .where((Event.isCanceled == False) | (Event.id == event.id))
                                        .order_by(Event.startDate))
            eventIndex = eventSeriesList.index(event)
            if len(eventSeriesList) != (eventIndex + 1):
                eventData["nextRecurringEvent"] = eventSeriesList[eventIndex + 1]

        currentEventRsvpAmount = getEventRsvpCount(event.id)

        userParticipatedTrainingEvents = getParticipationStatusForTrainings(eventData['program'], [g.current_user], g.current_term)

        return render_template("eventView.html",
                               eventData=eventData,
                               event=event,
                               userHasRSVPed=userHasRSVPed,
                               programTrainings=userParticipatedTrainingEvents,
                               currentEventRsvpAmount=currentEventRsvpAmount,
                               isProgramManager=isProgramManager,
                               filepaths=filepaths,
                               image=image,
                               pageViewsCount=pageViewsCount,
                               eventCountdown=eventCountdown)
@admin_bp.route('/event/<eventId>/cancel', methods=['POST'])
def cancelRoute(eventId):
    """
    Cancel an event and return to the referring page.

    Non-admins receive a 403; any error while canceling is printed and
    answered with an empty 500 response.
    """
    if not g.current_user.isAdmin:
        abort(403)

    try:
        cancelEvent(eventId)
        return redirect(request.referrer)
    except Exception as cancelError:
        print('Error while canceling event:', cancelError)
        return "", 500
@admin_bp.route('/event/<eventId>/delete', methods=['POST'])
def deleteRoute(eventId):
    """
    Delete an event, then redirect to the events list for the current term.

    Any error is printed and answered with an empty 500 response.
    """
    # NOTE(review): unlike cancelRoute there is no permission check here --
    # confirm access is enforced elsewhere.
    try:
        deleteEvent(eventId)
        flash("Event successfully deleted.", "success")
        return redirect(url_for("main.events", selectedTerm=g.current_term))

    except Exception as e:
        print('Error while deleting event:', e)
        return "", 500
@admin_bp.route('/event/<eventId>/deleteEventAndAllFollowing', methods=['POST'])
def deleteEventAndAllFollowingRoute(eventId):
    """
    Delete an event together with the events following it (via
    deleteEventAndAllFollowing), then redirect to the events list for the
    current term.

    Any error is printed and answered with an empty 500 response.
    """
    # NOTE(review): unlike cancelRoute there is no permission check here --
    # confirm access is enforced elsewhere.
    try:
        deleteEventAndAllFollowing(eventId)
        flash("Events successfully deleted.", "success")
        return redirect(url_for("main.events", selectedTerm=g.current_term))

    except Exception as e:
        print('Error while deleting event and all following events:', e)
        return "", 500
@admin_bp.route('/event/<eventId>/deleteAllRecurring', methods=['POST'])
def deleteAllRecurringEventsRoute(eventId):
    """
    Delete all recurring events related to the given event (via
    deleteAllRecurringEvents), then redirect to the events list for the
    current term.

    Any error is printed and answered with an empty 500 response.
    """
    # NOTE(review): unlike cancelRoute there is no permission check here --
    # confirm access is enforced elsewhere.
    try:
        deleteAllRecurringEvents(eventId)
        flash("Events successfully deleted.", "success")
        return redirect(url_for("main.events", selectedTerm=g.current_term))

    except Exception as e:
        print('Error while deleting all recurring events:', e)
        return "", 500
@admin_bp.route('/makeRecurringEvents', methods=['POST'])
def addRecurringEvents():
    """
    Compute the recurring events described by the submitted form and return
    them as a JSON string.
    """
    submittedData = request.form.copy()
    processedData = preprocessEventData(submittedData)
    recurringEvents = calculateRecurringEventFrequency(processedData)
    # default=str stringifies any value json cannot serialize natively.
    return json.dumps(recurringEvents, default=str)
@admin_bp.route('/userProfile', methods=['POST'])
def userProfile():
    """
    Redirect to the profile page of the user named in the search box.

    An empty search value flashes a prompt and returns to the student search page.
    """
    searchForm = request.form.copy()
    searchText = searchForm['searchStudentsInput']
    if not searchText:
        flash("Please enter the first name or the username of the student you would like to search for.", category='danger')
        return redirect(url_for('admin.studentSearchPage'))

    # Drop surrounding parentheses, then keep whatever follows the last '(':
    # e.g. "Jane Doe (doej)" -> "doej".
    trimmed = searchText.strip("()")
    user = trimmed.split('(')[-1]
    return redirect(url_for('main.viewUsersProfile', username=user))
@admin_bp.route('/search_student', methods=['GET'])
def studentSearchPage():
    """Render the student search page for admins; everyone else receives a 403."""
    if not g.current_user.isAdmin:
        abort(403)
    return render_template("/admin/searchStudentPage.html")
@admin_bp.route('/addParticipants', methods = ['GET'])
def addParticipants():
    """
    Render the add-participants page.

    Slated for removal once it is merged with the full page.
    """
    pageTitle = "Add Participants"
    return render_template('addParticipants.html', title=pageTitle)
@admin_bp.route('/activityLogs', methods = ['GET', 'POST'])
def activityLogs():
    """
    Render every activity log entry, newest first, for CELTS admins.

    Any other user receives a 403.
    """
    if not g.current_user.isCeltsAdmin:
        abort(403)

    newestFirst = ActivityLog.createdOn.desc()
    # Select the joined User alongside each log entry.
    logsWithUsers = ActivityLog.select(ActivityLog, User).join(User).order_by(newestFirst)
    return render_template("/admin/activityLogs.html", allLogs = logsWithUsers)
@admin_bp.route("/deleteEventFile", methods=["POST"])
def deleteEventFile():
    """
    Delete an event attachment.

    Reads 'databaseId' (passed to FileHandler as the event id) and 'fileId'
    from the form, and returns an empty body.
    """
    submitted = request.form
    handler = FileHandler(eventId=submitted["databaseId"])
    handler.deleteFile(submitted["fileId"])
    return ""
@admin_bp.route("/uploadCourseParticipant", methods= ["POST"])
def addCourseFile():
    """
    Parse an uploaded course-participant file and keep the result in the session.

    The upload is saved under the configured base path, parsed into
    session['cpPreview'] and session['cpErrors'], and removed again before
    redirecting to the service-learning management page.
    """
    fileData = request.files['addCourseParticipants']
    # The filename is supplied by the client; keep only its final path
    # component so it cannot point outside base_path.
    safeName = os.path.basename(fileData.filename)
    filePath = os.path.join(app.config["files"]["base_path"], safeName)
    fileData.save(filePath)
    try:
        (session['cpPreview'], session['cpErrors']) = parseUploadedFile(filePath)
    finally:
        # Remove the temporary copy even when parsing raises.
        os.remove(filePath)
    return redirect(url_for("admin.manageServiceLearningCourses"))
@admin_bp.route('/manageServiceLearning', methods = ['GET', 'POST'])
@admin_bp.route('/manageServiceLearning/<term>', methods = ['GET', 'POST'])
def manageServiceLearningCourses(term=None):
    """
    The SLC management page for admins.

    A POST containing "submitParticipant" saves the previewed course
    participants held in the session and redirects back here. Otherwise the
    page is rendered for the requested term, falling back to the current term.
    """
    if not g.current_user.isCeltsAdmin:
        abort(403)

    isParticipantSubmission = request.method == 'POST' and "submitParticipant" in request.form
    if isParticipantSubmission:
        saveCourseParticipantsToDatabase(session.pop('cpPreview', {}))
        flash('Courses and participants saved successfully!', 'success')
        return redirect(url_for('admin.manageServiceLearningCourses'))

    manageTerm = Term.get_or_none(Term.id == term) or g.current_term

    setRedirectTarget(request.full_path)

    # An imported course id may have been left in the session by the
    # imported-course route; it is handed to the template once and then dropped.
    courseID = session.get("alterCourseId")
    if courseID:
        session.pop("alterCourseId")

    pageContext = {
        'courseInstructors': getInstructorCourses(),
        'unapprovedCourses': unapprovedCourses(manageTerm),
        'approvedCourses': approvedCourses(manageTerm),
        'importedCourses': getImportedCourses(manageTerm),
        'terms': selectSurroundingTerms(g.current_term),
        'term': manageTerm,
        'cpPreview': session.get('cpPreview', {}),
        'cpPreviewErrors': session.get('cpErrors', []),
    }
    # courseID is only passed to the template when one was found.
    if courseID:
        pageContext['courseID'] = courseID

    return render_template('/admin/manageServiceLearningFaculty.html', **pageContext)
@admin_bp.route('/admin/getSidebarInformation', methods=['GET'])
def getSidebarInformation() -> dict:
    """
    Get the counts shown in the admin sidebar: the number of unapproved courses
    for the current term and the number of entries returned by getMinorInterest().

    The counts are returned as a dict with the keys "unapprovedCoursesCount"
    and "interestedStudentsCount", not as a string. Flask is expected to
    serialize a dict returned from a view to JSON for the ajax request.
    """
    unapprovedCoursesCount: int = len(unapprovedCourses(g.current_term))
    interestedStudentsCount: int = len(getMinorInterest())
    return {"unapprovedCoursesCount": unapprovedCoursesCount,
            "interestedStudentsCount": interestedStudentsCount}
@admin_bp.route("/deleteUploadedFile", methods= ["POST"])
def removeFromSession():
    """Drop the course-participant preview from the session, if there is one."""
    # A default makes pop() a no-op when the key is absent.
    session.pop('cpPreview', None)
    return ""
@admin_bp.route('/manageServiceLearning/imported/<courseID>', methods = ['POST', 'GET'])
def alterImportedCourse(courseID):
    """
    Read (GET) or update (POST) an imported course.

    GET answers with the course as JSON, including its instructors and the
    hours earned, for display in the imported courses modal; an unknown course
    yields a JSON error with status 404.
    POST passes the submitted form to editImportedCourses, remembers the
    course id in the session and redirects to the management page for the
    submitted term.
    """
    if request.method == 'GET':
        try:
            course = Course.get_by_id(courseID)
            instructorRows = CourseInstructor.select().where(CourseInstructor.course == course)

            # Hours come from the first participant row; 20 when the course has no participants.
            participantRows = list(CourseParticipant.select().where(CourseParticipant.course_id == course.id))
            hoursEarned = participantRows[0].hoursEarned if participantRows else 20

            payload = model_to_dict(course, recurse=False)
            payload['instructors'] = [model_to_dict(row.user) for row in instructorRows]
            payload['hoursEarned'] = hoursEarned

            return jsonify(payload)

        except DoesNotExist:
            flash("Course not found")
            return jsonify({"error": "Course not found"}), 404

    if request.method == 'POST':
        # Update course information in the database
        submittedCourse = request.form.copy()
        editImportedCourses(submittedCourse)
        session['alterCourseId'] = courseID

        return redirect(url_for("admin.manageServiceLearningCourses", term=submittedCourse['termId']))
@admin_bp.route("/manageBonner")
def manageBonner():
    """
    Render the Bonner management page with the cohorts, the Bonner events of
    the current term and the Bonner certification requirements.

    Only CELTS admins may view it; others receive a 403.
    """
    if not g.current_user.isCeltsAdmin:
        abort(403)

    cohorts = getBonnerCohorts()
    bonnerEvents = getBonnerEvents(g.current_term)
    bonnerRequirements = getCertRequirements(certification=Certification.BONNER)
    return render_template("/admin/bonnerManagement.html",
                           cohorts=cohorts,
                           events=bonnerEvents,
                           requirements=bonnerRequirements)
@admin_bp.route("/bonner/<year>/<method>/<username>", methods=["POST"])
def updatecohort(year, method, username):
    """
    Add a user to, or remove a user from, the Bonner cohort of the given year.

    `method` must be "add" or "remove"; anything else flashes an error and
    aborts with 500, as does an unknown username. Only CELTS admins may call
    this route (403 otherwise). Returns an empty body on success.
    """
    if not g.current_user.isCeltsAdmin:
        abort(403)

    # Only a missing user is translated into the 500 response here; any other
    # exception propagates with its own traceback instead of being hidden.
    try:
        user = User.get_by_id(username)
    except DoesNotExist:
        abort(500)

    if method == "add":
        try:
            BonnerCohort.create(year=year, user=user)
            flash(f"Successfully added {user.fullName} to {year} Bonner Cohort.", "success")
        except IntegrityError:
            # The create was rejected by the database; reported to the admin as
            # the user already being in the cohort.
            flash(f'Error: {user.fullName} already added.', "danger")

    elif method == "remove":
        BonnerCohort.delete().where(BonnerCohort.user == user, BonnerCohort.year == year).execute()
        flash(f"Successfully removed {user.fullName} from {year} Bonner Cohort.", "success")
    else:
        flash(f"Error: {user.fullName} can't be added.", "danger")
        abort(500)

    return ""
@admin_bp.route("/bonnerxls")
def bonnerxls():
    """
    Build the Bonner spreadsheet and send it as a download named
    'BonnerStudents.xlsx'. Only CELTS admins may request it (403 otherwise).
    """
    if not g.current_user.isCeltsAdmin:
        abort(403)

    workbookPath = makeBonnerXls()
    workbookStream = open(workbookPath, 'rb')
    return send_file(workbookStream, download_name='BonnerStudents.xlsx', as_attachment=True)
@admin_bp.route("/saveRequirements/<certid>", methods=["POST"])
def saveRequirements(certid):
    """
    Update a certification's requirements from the JSON request body and
    answer with the ids of the resulting requirements as a JSON list.

    Only CELTS admins may call this route (403 otherwise).
    """
    if not g.current_user.isCeltsAdmin:
        abort(403)

    submittedRequirements = request.get_json()
    savedRequirements = updateCertRequirements(certid, submittedRequirements)
    requirementIds = [requirement.id for requirement in savedRequirements]
    return jsonify(requirementIds)
@admin_bp.route("/displayEventFile", methods=["POST"])
def displayEventFile():
    """
    Change the display state of an event attachment via FileHandler.changeDisplay.

    The form's 'id' value is used both as the FileHandler event id and as the
    argument to changeDisplay. Returns an empty body.
    """
    submittedId = request.form["id"]
    handler = FileHandler(eventId=submittedId)
    handler.changeDisplay(submittedId)
    return ""