Coverage for app/controllers/admin/routes.py: 24%
382 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-06-18 19:54 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2024-06-18 19:54 +0000
1from flask import request, render_template, url_for, g, redirect
2from flask import flash, abort, jsonify, session, send_file
3from peewee import DoesNotExist, fn, IntegrityError
4from playhouse.shortcuts import model_to_dict
5import json
6from datetime import datetime
7import os
9from app import app
10from app.models.program import Program
11from app.models.event import Event
12from app.models.eventRsvp import EventRsvp
13from app.models.eventParticipant import EventParticipant
14from app.models.user import User
15from app.models.course import Course
16from app.models.courseInstructor import CourseInstructor
17from app.models.courseParticipant import CourseParticipant
18from app.models.eventTemplate import EventTemplate
19from app.models.activityLog import ActivityLog
20from app.models.eventRsvpLog import EventRsvpLog
21from app.models.attachmentUpload import AttachmentUpload
22from app.models.bonnerCohort import BonnerCohort
23from app.models.certification import Certification
24from app.models.user import User
25from app.models.term import Term
26from app.models.eventViews import EventView
27from app.models.courseStatus import CourseStatus
29from app.logic.userManagement import getAllowedPrograms, getAllowedTemplates
30from app.logic.createLogs import createActivityLog
31from app.logic.certification import getCertRequirements, updateCertRequirements
32from app.logic.utils import selectSurroundingTerms, getFilesFromRequest, getRedirectTarget, setRedirectTarget
33from app.logic.events import cancelEvent, deleteEvent, attemptSaveEvent, preprocessEventData, calculateRecurringEventFrequency, deleteEventAndAllFollowing, deleteAllRecurringEvents, getBonnerEvents,addEventView, getEventRsvpCount, copyRsvpToNewEvent, getCountdownToEvent
34from app.logic.participants import getParticipationStatusForTrainings, checkUserRsvp
35from app.logic.minor import getMinorInterest
36from app.logic.fileHandler import FileHandler
37from app.logic.bonner import getBonnerCohorts, makeBonnerXls, rsvpForBonnerCohort
38from app.logic.serviceLearningCourses import parseUploadedFile, saveCourseParticipantsToDatabase, unapprovedCourses, approvedCourses, getImportedCourses, getInstructorCourses, editImportedCourses
40from app.controllers.admin import admin_bp
@admin_bp.route('/switch_user', methods=['POST'])
def switchUser():
    """
    Replace the session's current user with the user named in the 'newuser' form field.

    The switch is refused with a 403 when the app environment is "production".
    Otherwise the browser is redirected back to the referring page.
    """
    inProduction = (app.env == "production")
    if inProduction:
        print(f"An attempt was made to switch to another user by {g.current_user.username}!")
        abort(403)

    targetUsername = request.form['newuser']
    print(f"Switching user from {g.current_user} to", targetUsername)

    # The session stores the user as a plain dict rather than a model instance
    targetUser = User.get_by_id(targetUsername)
    session['current_user'] = model_to_dict(targetUser)

    return redirect(request.referrer)
@admin_bp.route('/eventTemplates')
def templateSelect():
    """
    Show the event template picker.

    Only CELTS admins and CELTS student staff may see it; anyone else gets a 403.
    """
    viewer = g.current_user
    if not (viewer.isCeltsAdmin or viewer.isCeltsStudentStaff):
        abort(403)

    allowedPrograms = getAllowedPrograms(viewer)
    allowedTemplates = getAllowedTemplates(viewer)
    sponsoredProgram = Program.get(Program.isOtherCeltsSponsored)

    return render_template("/events/template_selector.html",
                           programs=allowedPrograms,
                           celtsSponsoredProgram=sponsoredProgram,
                           templates=allowedTemplates)
@admin_bp.route('/eventTemplates/<templateid>/<programid>/create', methods=['GET','POST'])
def createEvent(templateid, programid):
    """
    Render the event creation form for a template/program pair and, on POST, try to save it.

    A successful save RSVPs the selected Bonner cohorts for the first saved event, writes
    an activity log entry, and redirects to that event's page. A failed save flashes the
    validation message and re-renders the form with the submitted data.
    Users who are neither admins nor program managers for the program get a 403.
    """
    currentUser = g.current_user
    if not (currentUser.isAdmin or currentUser.isProgramManagerFor(programid)):
        abort(403)

    # Validate given URL
    program = None
    try:
        template = EventTemplate.get_by_id(templateid)
        if programid:
            program = Program.get_by_id(programid)
    except DoesNotExist as lookupError:
        print("Invalid template or program id:", lookupError)
        flash("There was an error with your selection. Please try again or contact Systems Support.", "danger")
        return redirect(url_for("admin.program_picker"))

    # Start from the template's data; a POST layers the submitted form on top of it
    eventData = template.templateData
    eventData['program'] = program

    if request.method == "GET":
        # Default to the CELTS admin contact unless the program supplies its own
        contactName = "CELTS Admin"
        contactEmail = app.config['celts_admin_contact']
        if program:
            eventData['location'] = program.defaultLocation
            contactName = program.contactName or contactName
            contactEmail = program.contactEmail or contactEmail
        eventData['contactName'] = contactName
        eventData['contactEmail'] = contactEmail

    # Try to save the form
    if request.method == "POST":
        eventData.update(request.form.copy())
        try:
            savedEvents, validationErrorMessage = attemptSaveEvent(eventData, getFilesFromRequest(request))
        except Exception as saveError:
            print("Error saving event:", saveError)
            savedEvents = False
            validationErrorMessage = "Unknown Error Saving Event. Please try again"

        if not savedEvents:
            flash(validationErrorMessage, 'warning')
        else:
            firstEvent = savedEvents[0]
            for cohortYear in request.form.getlist("cohorts[]"):
                rsvpForBonnerCohort(int(cohortYear), firstEvent.id)

            noun = "Events" if eventData['isRecurring'] == 'on' else "Event"  # pluralize
            flash(f"{noun} successfully created!", 'success')

            eventUrl = url_for('admin.eventDisplay', eventId = firstEvent.id)
            startText = datetime.strftime(eventData['startDate'], '%m/%d/%Y')
            if not program:
                logMessage = f"Created a non-program event, <a href=\"{eventUrl}\">{firstEvent.name}</a>, with a start date of {startText}."
            elif len(savedEvents) > 1:
                lastText = datetime.strftime(savedEvents[-1].startDate, '%m/%d/%Y')
                logMessage = f"Created a recurring event, <a href=\"{eventUrl}\">{firstEvent.name}</a>, for {program.programName}, with a start date of {startText}. The last event in the series will be on {lastText}."
            else:
                logMessage = f"Created <a href=\"{eventUrl}\">{firstEvent.name}</a> for {program.programName}, with a start date of {startText}."
            createActivityLog(logMessage)

            return redirect(url_for("admin.eventDisplay", eventId = firstEvent.id))

    # make sure our data is the same regardless of GET or POST
    preprocessEventData(eventData)
    isProgramManager = g.current_user.isProgramManagerFor(programid)
    futureTerms = selectSurroundingTerms(g.current_term, prevTerms=0)

    # Certification requirements and cohorts are only loaded for the Bonner Scholars program
    requirements, bonnerCohorts = [], []
    eventProgram = eventData['program']
    if eventProgram is not None and eventProgram.isBonnerScholars:
        requirements = getCertRequirements(Certification.BONNER)
        bonnerCohorts = getBonnerCohorts(limit=5)

    return render_template(f"/admin/{template.templateFile}",
                           template = template,
                           eventData = eventData,
                           futureTerms = futureTerms,
                           requirements = requirements,
                           bonnerCohorts = bonnerCohorts,
                           isProgramManager = isProgramManager)
@admin_bp.route('/event/<eventId>/rsvp', methods=['GET'])
def rsvpLogDisplay(eventId):
    """
    Show the RSVP log for an event, newest entries first.

    Responds with 404 when the event does not exist. Responds with 403 unless the
    user is a CELTS admin, or is CELTS student staff managing the event's program.
    """
    # An unknown id is reported as a 404 instead of surfacing as an unhandled DoesNotExist
    try:
        event = Event.get_by_id(eventId)
    except DoesNotExist:
        print(f"Unknown event: {eventId}")
        abort(404)

    viewer = g.current_user
    canViewLog = viewer.isCeltsAdmin or (viewer.isCeltsStudentStaff and viewer.isProgramManagerFor(event.program))
    if not canViewLog:
        abort(403)

    allLogs = (EventRsvpLog.select(EventRsvpLog, User)
                           .join(User)
                           .where(EventRsvpLog.event_id == eventId)
                           .order_by(EventRsvpLog.createdOn.desc()))
    return render_template("/events/rsvpLog.html",
                           event = event,
                           allLogs = allLogs)
@admin_bp.route('/event/<eventId>/renew', methods=['POST'])
def renewEvent(eventId):
    """
    Create a new event from an existing one, using the times, location and dates in the form.

    RSVPs from the prior event are copied to the new one and an activity log entry is
    written. Every outcome ends in a redirect: to the new event on success, or back to
    the prior event with a flashed message on a validation or save failure.
    """
    try:
        formData = request.form

        # Validate with explicit checks; `assert` statements are removed when Python runs with -O
        if not formData['timeStart'] < formData['timeEnd']:
            flash("End time must be after start time", 'warning')
            return redirect(url_for('admin.eventDisplay', eventId = eventId))

        # NOTE(review): this check reads 'dateStart'/'dateEnd' while the rest of this function
        # reads 'startDate'/'endDate' -- confirm which keys the renew form actually submits.
        if formData.get('dateEnd') and not formData['dateStart'] < formData['dateEnd']:
            flash("End date must be after start date", 'warning')
            return redirect(url_for('admin.eventDisplay', eventId = eventId))

        priorEvent = model_to_dict(Event.get_by_id(eventId))
        newEventDict = priorEvent.copy()
        newEventDict.pop('id')
        newEventDict.update({
            'program': int(priorEvent['program']['id']),
            'term': int(priorEvent['term']['id']),
            'timeStart': formData['timeStart'],
            'timeEnd': formData['timeEnd'],
            'location': formData['location'],
            # Move the trailing four-character year to the front of the submitted date
            'startDate': f'{formData["startDate"][-4:]}-{formData["startDate"][0:-5]}',
            'endDate': f'{formData["endDate"][-4:]}-{formData["endDate"][0:-5]}',
            'isRecurring': bool(priorEvent['recurringId'])
        })
        newEvent, message = attemptSaveEvent(newEventDict, renewedEvent = True)
        if message:
            flash(message, "danger")
            return redirect(url_for('admin.eventDisplay', eventId = eventId))

        copyRsvpToNewEvent(priorEvent, newEvent[0])
        createActivityLog(f"Renewed {priorEvent['name']} as <a href='event/{newEvent[0].id}/view'>{newEvent[0].name}</a>.")
        flash("Event successfully renewed.", "success")
        return redirect(url_for('admin.eventDisplay', eventId = newEvent[0].id))

    except Exception as e:
        # Any failure (including a missing form field) falls back to the prior event's page
        print("Error while trying to renew event:", e)
        flash("There was an error renewing the event. Please try again or contact Systems Support.", 'danger')
        return redirect(url_for('admin.eventDisplay', eventId = eventId))
@admin_bp.route('/event/<eventId>/view', methods=['GET'])
@admin_bp.route('/event/<eventId>/edit', methods=['GET','POST'])
def eventDisplay(eventId):
    """
    Show an event (the /view route) or its edit form (the /edit route); save edits on POST.

    Responds with 404 when the event does not exist, and with 403 when the edit route is
    requested by a user who is neither a CELTS admin nor a program manager for the event.
    """
    # Counted before this request's own view is recorded below
    pageViewsCount = EventView.select().where(EventView.event == eventId).count()

    # Validate given URL first, so an unknown id is a 404 on the view route as well
    try:
        event = Event.get_by_id(eventId)
    except DoesNotExist:
        print(f"Unknown event: {eventId}")
        abort(404)

    if request.method == 'GET' and request.path == f'/event/{eventId}/view':
        addEventView(g.current_user, event)

    notPermitted = not (g.current_user.isCeltsAdmin or g.current_user.isProgramManagerForEvent(event))
    if 'edit' in request.url_rule.rule and notPermitted:
        abort(403)

    eventData = model_to_dict(event, recurse=False)
    associatedAttachments = AttachmentUpload.select().where(AttachmentUpload.event == event)
    filepaths = FileHandler(eventId=event.id).retrievePath(associatedAttachments)

    # Use the first attachment that is flagged for display and has an image extension
    image = None
    imageExtensions = (".jpeg", ".png", ".gif", ".jpg", ".svg", ".webp")
    for attachment in associatedAttachments:
        if attachment.isDisplayed and attachment.fileName.endswith(imageExtensions):
            image = filepaths[attachment.fileName][0]
            if image:
                break

    if request.method == "POST": # Attempt to save form
        eventData = request.form.copy()
        try:
            savedEvents, validationErrorMessage = attemptSaveEvent(eventData, getFilesFromRequest(request))
        except Exception as e:
            print("Error saving event:", e)
            savedEvents = False
            validationErrorMessage = "Unknown Error Saving Event. Please try again"

        if savedEvents:
            rsvpcohorts = request.form.getlist("cohorts[]")
            for year in rsvpcohorts:
                rsvpForBonnerCohort(int(year), event.id)

            flash("Event successfully updated!", "success")
            return redirect(url_for("admin.eventDisplay", eventId = event.id))
        else:
            flash(validationErrorMessage, 'warning')

    # make sure our data is the same regardless of GET and POST
    preprocessEventData(eventData)
    eventData['program'] = event.program
    futureTerms = selectSurroundingTerms(g.current_term)
    userHasRSVPed = checkUserRsvp(g.current_user, event)
    filepaths = FileHandler(eventId=event.id).retrievePath(associatedAttachments)
    isProgramManager = g.current_user.isProgramManagerFor(eventData['program'])

    # Certification requirements and cohorts are only loaded for the Bonner Scholars program
    requirements, bonnerCohorts = [], []
    if eventData['program'] and eventData['program'].isBonnerScholars:
        requirements = getCertRequirements(Certification.BONNER)
        bonnerCohorts = getBonnerCohorts(limit=5)

    rule = request.url_rule

    # Event Edit
    if 'edit' in rule.rule:
        return render_template("admin/createEvent.html",
                               eventData = eventData,
                               futureTerms=futureTerms,
                               event = event,
                               requirements = requirements,
                               bonnerCohorts = bonnerCohorts,
                               userHasRSVPed = userHasRSVPed,
                               isProgramManager = isProgramManager,
                               filepaths = filepaths)

    # Event View
    # get text representations of dates for html
    eventData['timeStart'] = event.timeStart.strftime("%-I:%M %p")
    eventData['timeEnd'] = event.timeEnd.strftime("%-I:%M %p")
    eventData['startDate'] = event.startDate.strftime("%m/%d/%Y")
    eventCountdown = getCountdownToEvent(event)

    # Identify the next event in a recurring series
    if event.recurringId:
        # Canceled events are skipped, except this event itself so that index() can find it
        eventSeriesList = list(Event.select().where(Event.recurringId == event.recurringId)
                                    .where((Event.isCanceled == False) | (Event.id == event.id))
                                    .order_by(Event.startDate))
        eventIndex = eventSeriesList.index(event)
        if len(eventSeriesList) != (eventIndex + 1):
            eventData["nextRecurringEvent"] = eventSeriesList[eventIndex + 1]

    currentEventRsvpAmount = getEventRsvpCount(event.id)

    userParticipatedTrainingEvents = getParticipationStatusForTrainings(eventData['program'], [g.current_user], g.current_term)

    return render_template("eventView.html",
                           eventData=eventData,
                           event=event,
                           userHasRSVPed=userHasRSVPed,
                           programTrainings=userParticipatedTrainingEvents,
                           currentEventRsvpAmount=currentEventRsvpAmount,
                           isProgramManager=isProgramManager,
                           filepaths=filepaths,
                           image=image,
                           pageViewsCount=pageViewsCount,
                           eventCountdown=eventCountdown)
@admin_bp.route('/event/<eventId>/cancel', methods=['POST'])
def cancelRoute(eventId):
    """
    Cancel an event and send the admin back to the referring page.

    Non-admins get a 403. A failure while canceling returns an empty 500 response.
    """
    if not g.current_user.isAdmin:
        abort(403)

    try:
        cancelEvent(eventId)
        return redirect(request.referrer)
    except Exception as cancelError:
        print('Error while canceling event:', cancelError)
        return "", 500
@admin_bp.route('/event/<eventId>/delete', methods=['POST'])
def deleteRoute(eventId):
    """
    Delete a single event, then redirect to the events list for the current term.

    A failure while deleting returns an empty 500 response.
    """
    # NOTE(review): unlike cancelRoute, no role check is visible in this function --
    # confirm that access is enforced elsewhere.
    try:
        deleteEvent(eventId)
        flash("Event successfully deleted.", "success")
        return redirect(url_for("main.events", selectedTerm=g.current_term))

    except Exception as e:
        print('Error while deleting event:', e)
        return "", 500
@admin_bp.route('/event/<eventId>/deleteEventAndAllFollowing', methods=['POST'])
def deleteEventAndAllFollowingRoute(eventId):
    """
    Delete an event and the events following it, then redirect to the events list
    for the current term.

    A failure while deleting returns an empty 500 response.
    """
    # NOTE(review): unlike cancelRoute, no role check is visible in this function --
    # confirm that access is enforced elsewhere.
    try:
        deleteEventAndAllFollowing(eventId)
        flash("Events successfully deleted.", "success")
        return redirect(url_for("main.events", selectedTerm=g.current_term))

    except Exception as e:
        print('Error while deleting event and all following events:', e)
        return "", 500
@admin_bp.route('/event/<eventId>/deleteAllRecurring', methods=['POST'])
def deleteAllRecurringEventsRoute(eventId):
    """
    Delete all recurring events associated with the given event, then redirect to the
    events list for the current term.

    A failure while deleting returns an empty 500 response.
    """
    # NOTE(review): unlike cancelRoute, no role check is visible in this function --
    # confirm that access is enforced elsewhere.
    try:
        deleteAllRecurringEvents(eventId)
        flash("Events successfully deleted.", "success")
        return redirect(url_for("main.events", selectedTerm=g.current_term))

    except Exception as e:
        print('Error while deleting all recurring events:', e)
        return "", 500
@admin_bp.route('/makeRecurringEvents', methods=['POST'])
def addRecurringEvents():
    """
    Return, as a JSON string, the recurring events calculated from the posted form.
    """
    submittedData = request.form.copy()
    processedData = preprocessEventData(submittedData)
    recurringEvents = calculateRecurringEventFrequency(processedData)

    # default=str stringifies any value json cannot serialize on its own
    return json.dumps(recurringEvents, default=str)
@admin_bp.route('/userProfile', methods=['POST'])
def userProfile():
    """
    Redirect to the profile of the student entered in the 'searchStudentsInput' form field.

    When the field is missing or blank, flash a prompt and return to the search page.
    """
    # .get() keeps a missing field on the friendly-message path instead of a 400;
    # trimming whitespace first lets the parenthesis strip below see the real ends.
    searchInput = request.form.get('searchStudentsInput', '').strip()
    if not searchInput:
        flash("Please enter the first name or the username of the student you would like to search for.", category='danger')
        return redirect(url_for('admin.studentSearchPage'))

    # Drop surrounding parentheses and keep whatever follows the last "("
    # (presumably input shaped like "Name (username)" -- confirm against the search box)
    username = searchInput.strip("()").split('(')[-1]
    return redirect(url_for('main.viewUsersProfile', username=username))
@admin_bp.route('/search_student', methods=['GET'])
def studentSearchPage():
    """
    Render the student search page for admins; everyone else gets a 403.
    """
    if not g.current_user.isAdmin:
        abort(403)

    return render_template("/admin/searchStudentPage.html")
@admin_bp.route('/addParticipants', methods = ['GET'])
def addParticipants():
    '''Temporary standalone page; to be removed once it is merged into the full page.'''
    pageTitle = "Add Participants"
    return render_template('addParticipants.html', title=pageTitle)
@admin_bp.route('/activityLogs', methods = ['GET', 'POST'])
def activityLogs():
    """
    Show every activity log entry, newest first, to CELTS admins; others get a 403.
    """
    if not g.current_user.isCeltsAdmin:
        abort(403)

    # Select User alongside each log through the join
    logsWithUsers = ActivityLog.select(ActivityLog, User).join(User)
    newestFirst = logsWithUsers.order_by(ActivityLog.createdOn.desc())
    return render_template("/admin/activityLogs.html", allLogs = newestFirst)
@admin_bp.route("/deleteEventFile", methods=["POST"])
def deleteEventFile():
    """
    Delete the file identified by the posted 'fileId', using a file handler built for
    the event given in 'databaseId'. Returns an empty response body.
    """
    postedData = request.form
    eventId = postedData["databaseId"]
    handler = FileHandler(eventId=eventId)
    handler.deleteFile(postedData["fileId"])
    return ""
@admin_bp.route("/uploadCourseParticipant", methods= ["POST"])
def addCourseFile():
    """
    Parse an uploaded course participant file and stash the preview and errors in the session.

    The upload is written under the configured files base path, parsed, and then removed
    again whether or not parsing succeeded. Responds with 400 when the upload has no
    usable filename.
    """
    fileData = request.files['addCourseParticipants']

    # The filename is client-supplied: keep only its final component so that it cannot
    # point outside the base path (backslashes are treated as separators too).
    uploadedName = (fileData.filename or "").replace("\\", "/")
    safeName = os.path.basename(uploadedName)
    if safeName in ("", ".", ".."):
        abort(400)

    filePath = os.path.join(app.config["files"]["base_path"], safeName)
    fileData.save(filePath)
    try:
        (session['cpPreview'], session['cpErrors']) = parseUploadedFile(filePath)
    finally:
        # Do not leave the temporary copy behind when parsing raises
        os.remove(filePath)
    return redirect(url_for("admin.manageServiceLearningCourses"))
@admin_bp.route('/manageServiceLearning', methods = ['GET', 'POST'])
@admin_bp.route('/manageServiceLearning/<term>', methods = ['GET', 'POST'])
def manageServiceLearningCourses(term=None):
    """
    The SLC management page for admins.

    A POST containing "submitParticipant" saves the previewed course participants held
    in the session and redirects back to this page. Otherwise the page is rendered for
    the requested term, falling back to the current term.
    """
    if not g.current_user.isCeltsAdmin:
        abort(403)

    isParticipantSubmission = request.method == 'POST' and "submitParticipant" in request.form
    if isParticipantSubmission:
        previewedParticipants = session.pop('cpPreview', {})
        saveCourseParticipantsToDatabase(previewedParticipants)
        flash('Courses and participants saved successfully!', 'success')
        return redirect(url_for('admin.manageServiceLearningCourses'))

    manageTerm = Term.get_or_none(Term.id == term) or g.current_term

    setRedirectTarget(request.full_path)

    # The id of a just-edited imported course may be waiting in the session; it is
    # handed to the template once and then removed from the session.
    courseID = session.get("alterCourseId")
    if courseID:
        session.pop("alterCourseId")

    pageContext = dict(
        courseInstructors = getInstructorCourses(),
        unapprovedCourses = unapprovedCourses(manageTerm),
        approvedCourses = approvedCourses(manageTerm),
        importedCourses = getImportedCourses(manageTerm),
        terms = selectSurroundingTerms(g.current_term),
        term = manageTerm,
        cpPreview = session.get('cpPreview', {}),
        cpPreviewErrors = session.get('cpErrors', []),
    )
    # courseID is only passed to the template when one was found
    if courseID:
        pageContext['courseID'] = courseID

    return render_template('/admin/manageServiceLearningFaculty.html', **pageContext)
@admin_bp.route('/admin/getSidebarInformation', methods=['GET'])
def getSidebarInformation() -> dict:
    """
    Get the count of unapproved courses for the current term and the count of students
    interested in the minor, to display in the admin sidebar.

    Returns a dict with the keys "unapprovedCoursesCount" and "interestedStudentsCount".
    """
    unapprovedCoursesCount: int = len(unapprovedCourses(g.current_term))
    interestedStudentsCount: int = len(getMinorInterest())
    return {"unapprovedCoursesCount": unapprovedCoursesCount,
            "interestedStudentsCount": interestedStudentsCount}
@admin_bp.route("/deleteUploadedFile", methods= ["POST"])
def removeFromSession():
    """
    Drop the course participant preview from the session, if there is one.
    Returns an empty response body.
    """
    # A default makes the pop a no-op when there is no preview stored
    session.pop('cpPreview', None)
    return ""
@admin_bp.route('/manageServiceLearning/imported/<courseID>', methods = ['POST', 'GET'])
def alterImportedCourse(courseID):
    """
    Read or update an imported course.

    GET returns the course as JSON, including its instructors and the hours earned by
    its first participant (20 when it has no participants), or a 404 JSON error when the
    course does not exist.
    POST updates the imported course from the submitted form, remembers the course id in
    the session, and redirects to the SLC management page for the submitted term.
    """
    if request.method == 'GET':
        try:
            targetCourse = Course.get_by_id(courseID)
            targetInstructors = CourseInstructor.select().where(CourseInstructor.course == targetCourse)

            # Hours come from the first participant; a course with none falls back to 20
            participants = list(CourseParticipant.select().where(CourseParticipant.course_id == targetCourse.id))
            serviceHours = participants[0].hoursEarned if participants else 20

            courseData = model_to_dict(targetCourse, recurse=False)
            courseData['instructors'] = [model_to_dict(courseInstructor.user) for courseInstructor in targetInstructors]
            courseData['hoursEarned'] = serviceHours

            return jsonify(courseData)

        except DoesNotExist:
            flash("Course not found")
            notFound = {"error": "Course not found"}
            return jsonify(notFound), 404

    if request.method == 'POST':
        # Update course information in the database
        submittedCourse = request.form.copy()
        editImportedCourses(submittedCourse)
        session['alterCourseId'] = courseID

        return redirect(url_for("admin.manageServiceLearningCourses", term=submittedCourse['termId']))
@admin_bp.route("/manageBonner")
def manageBonner():
    """
    Render the Bonner management page for CELTS admins; everyone else gets a 403.
    """
    if not g.current_user.isCeltsAdmin:
        abort(403)

    cohorts = getBonnerCohorts()
    bonnerEvents = getBonnerEvents(g.current_term)
    bonnerRequirements = getCertRequirements(certification=Certification.BONNER)

    return render_template("/admin/bonnerManagement.html",
                           cohorts=cohorts,
                           events=bonnerEvents,
                           requirements=bonnerRequirements)
@admin_bp.route("/bonner/<year>/<method>/<username>", methods=["POST"])
def updatecohort(year, method, username):
    """
    Add a user to, or remove a user from, the Bonner cohort for the given year.

    `method` must be "add" or "remove"; any other value flashes an error and aborts
    with 500. Non-admins get a 403. An unknown username aborts with 500.
    Returns an empty response body on completion.
    """
    if not g.current_user.isCeltsAdmin:
        abort(403)

    # Catch only the lookup failure; the 500 status is kept for existing callers
    try:
        user = User.get_by_id(username)
    except DoesNotExist:
        abort(500)

    if method == "add":
        try:
            BonnerCohort.create(year=year, user=user)
            flash(f"Successfully added {user.fullName} to {year} Bonner Cohort.", "success")
        except IntegrityError:
            # The create was rejected, so tell the admin the user is already in the cohort
            flash(f'Error: {user.fullName} already added.', "danger")

    elif method == "remove":
        BonnerCohort.delete().where(BonnerCohort.user == user, BonnerCohort.year == year).execute()
        flash(f"Successfully removed {user.fullName} from {year} Bonner Cohort.", "success")
    else:
        flash(f"Error: {user.fullName} can't be added.", "danger")
        abort(500)

    return ""
@admin_bp.route("/bonnerxls")
def bonnerxls():
    """
    Build the Bonner spreadsheet and send it as a download named BonnerStudents.xlsx.
    Only CELTS admins may download it; everyone else gets a 403.
    """
    if not g.current_user.isCeltsAdmin:
        abort(403)

    spreadsheetPath = makeBonnerXls()
    spreadsheetFile = open(spreadsheetPath, 'rb')
    return send_file(spreadsheetFile, download_name='BonnerStudents.xlsx', as_attachment=True)
@admin_bp.route("/saveRequirements/<certid>", methods=["POST"])
def saveRequirements(certid):
    """
    Update a certification's requirements from the posted JSON and return the ids of
    the resulting requirements as a JSON list. Non-admins get a 403.
    """
    if not g.current_user.isCeltsAdmin:
        abort(403)

    submittedRequirements = request.get_json()
    newRequirements = updateCertRequirements(certid, submittedRequirements)

    requirementIds = [requirement.id for requirement in newRequirements]
    return jsonify(requirementIds)
@admin_bp.route("/displayEventFile", methods=["POST"])
def displayEventFile():
    """
    Change the display setting of the file identified by the posted 'id'.
    Returns an empty response body.
    """
    postedId = request.form["id"]

    # NOTE(review): the same posted 'id' is used both as the handler's eventId and as the
    # file to change, whereas deleteEventFile reads separate 'databaseId' and 'fileId'
    # fields -- confirm this is intended.
    handler = FileHandler(eventId=postedId)
    handler.changeDisplay(postedId)
    return ""