Coverage for app/controllers/admin/routes.py: 24%
385 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-07-11 17:51 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2024-07-11 17:51 +0000
1from flask import request, render_template, url_for, g, redirect
2from flask import flash, abort, jsonify, session, send_file
3from peewee import DoesNotExist, fn, IntegrityError
4from playhouse.shortcuts import model_to_dict
5import json
6from datetime import datetime
7import os
9from app import app
10from app.models.program import Program
11from app.models.event import Event
12from app.models.eventRsvp import EventRsvp
13from app.models.eventParticipant import EventParticipant
14from app.models.user import User
15from app.models.course import Course
16from app.models.courseInstructor import CourseInstructor
17from app.models.courseParticipant import CourseParticipant
18from app.models.eventTemplate import EventTemplate
19from app.models.activityLog import ActivityLog
20from app.models.eventRsvpLog import EventRsvpLog
21from app.models.attachmentUpload import AttachmentUpload
22from app.models.bonnerCohort import BonnerCohort
23from app.models.certification import Certification
24from app.models.user import User
25from app.models.term import Term
26from app.models.eventViews import EventView
27from app.models.courseStatus import CourseStatus
29from app.logic.userManagement import getAllowedPrograms, getAllowedTemplates
30from app.logic.createLogs import createActivityLog
31from app.logic.certification import getCertRequirements, updateCertRequirements
32from app.logic.utils import selectSurroundingTerms, getFilesFromRequest, getRedirectTarget, setRedirectTarget
33from app.logic.events import cancelEvent, deleteEvent, attemptSaveEvent, preprocessEventData, calculateRecurringEventFrequency, deleteEventAndAllFollowing, deleteAllRecurringEvents, getBonnerEvents,addEventView, getEventRsvpCount, copyRsvpToNewEvent, getCountdownToEvent
34from app.logic.participants import getParticipationStatusForTrainings, checkUserRsvp
35from app.logic.minor import getMinorInterest
36from app.logic.fileHandler import FileHandler
37from app.logic.bonner import getBonnerCohorts, makeBonnerXls, rsvpForBonnerCohort, addBonnerCohortToRsvpLog
38from app.logic.serviceLearningCourses import parseUploadedFile, saveCourseParticipantsToDatabase, unapprovedCourses, approvedCourses, getImportedCourses, getInstructorCourses, editImportedCourses
40from app.controllers.admin import admin_bp
42@admin_bp.route('/switch_user', methods=['POST'])
43def switchUser():
44 if app.env == "production":
45 print(f"An attempt was made to switch to another user by {g.current_user.username}!")
46 abort(403)
48 print(f"Switching user from {g.current_user} to",request.form['newuser'])
49 session['current_user'] = model_to_dict(User.get_by_id(request.form['newuser']))
51 return redirect(request.referrer)
54@admin_bp.route('/eventTemplates')
55def templateSelect():
56 if g.current_user.isCeltsAdmin or g.current_user.isCeltsStudentStaff:
57 allprograms = getAllowedPrograms(g.current_user)
58 visibleTemplates = getAllowedTemplates(g.current_user)
59 return render_template("/events/template_selector.html",
60 programs=allprograms,
61 celtsSponsoredProgram = Program.get(Program.isOtherCeltsSponsored),
62 templates=visibleTemplates)
63 else:
64 abort(403)
67@admin_bp.route('/eventTemplates/<templateid>/<programid>/create', methods=['GET','POST'])
68def createEvent(templateid, programid):
69 if not (g.current_user.isAdmin or g.current_user.isProgramManagerFor(programid)):
70 abort(403)
72 # Validate given URL
73 program = None
74 try:
75 template = EventTemplate.get_by_id(templateid)
76 if programid:
77 program = Program.get_by_id(programid)
78 except DoesNotExist as e:
79 print("Invalid template or program id:", e)
80 flash("There was an error with your selection. Please try again or contact Systems Support.", "danger")
81 return redirect(url_for("admin.program_picker"))
83 # Get the data from the form or from the template
84 eventData = template.templateData
86 eventData['program'] = program
88 if request.method == "GET":
89 eventData['contactName'] = "CELTS Admin"
90 eventData['contactEmail'] = app.config['celts_admin_contact']
91 if program:
92 eventData['location'] = program.defaultLocation
93 if program.contactName:
94 eventData['contactName'] = program.contactName
95 if program.contactEmail:
96 eventData['contactEmail'] = program.contactEmail
98 # Try to save the form
99 if request.method == "POST":
100 eventData.update(request.form.copy())
101 try:
102 savedEvents, validationErrorMessage = attemptSaveEvent(eventData, getFilesFromRequest(request))
104 except Exception as e:
105 print("Error saving event:", e)
106 savedEvents = False
107 validationErrorMessage = "Unknown Error Saving Event. Please try again"
109 if savedEvents:
110 rsvpcohorts = request.form.getlist("cohorts[]")
111 for year in rsvpcohorts:
112 rsvpForBonnerCohort(int(year), savedEvents[0].id)
113 addBonnerCohortToRsvpLog(int(year), savedEvents[0].id)
116 noun = (eventData['isRecurring'] == 'on' and "Events" or "Event") # pluralize
117 flash(f"{noun} successfully created!", 'success')
119 if program:
120 if len(savedEvents) > 1:
121 createActivityLog(f"Created a recurring event, <a href=\"{url_for('admin.eventDisplay', eventId = savedEvents[0].id)}\">{savedEvents[0].name}</a>, for {program.programName}, with a start date of {datetime.strftime(eventData['startDate'], '%m/%d/%Y')}. The last event in the series will be on {datetime.strftime(savedEvents[-1].startDate, '%m/%d/%Y')}.")
122 else:
123 createActivityLog(f"Created <a href=\"{url_for('admin.eventDisplay', eventId = savedEvents[0].id)}\">{savedEvents[0].name}</a> for {program.programName}, with a start date of {datetime.strftime(eventData['startDate'], '%m/%d/%Y')}.")
124 else:
125 createActivityLog(f"Created a non-program event, <a href=\"{url_for('admin.eventDisplay', eventId = savedEvents[0].id)}\">{savedEvents[0].name}</a>, with a start date of {datetime.strftime(eventData['startDate'], '%m/%d/%Y')}.")
127 return redirect(url_for("admin.eventDisplay", eventId = savedEvents[0].id))
128 else:
129 flash(validationErrorMessage, 'warning')
131 # make sure our data is the same regardless of GET or POST
132 preprocessEventData(eventData)
133 isProgramManager = g.current_user.isProgramManagerFor(programid)
135 futureTerms = selectSurroundingTerms(g.current_term, prevTerms=0)
137 requirements, bonnerCohorts = [], []
138 if eventData['program'] is not None and eventData['program'].isBonnerScholars:
139 requirements = getCertRequirements(Certification.BONNER)
140 bonnerCohorts = getBonnerCohorts(limit=5)
141 return render_template(f"/admin/{template.templateFile}",
142 template = template,
143 eventData = eventData,
144 futureTerms = futureTerms,
145 requirements = requirements,
146 bonnerCohorts = bonnerCohorts,
147 isProgramManager = isProgramManager)
150@admin_bp.route('/event/<eventId>/rsvp', methods=['GET'])
151def rsvpLogDisplay(eventId):
152 event = Event.get_by_id(eventId)
153 if g.current_user.isCeltsAdmin or (g.current_user.isCeltsStudentStaff and g.current_user.isProgramManagerFor(event.program)):
154 allLogs = EventRsvpLog.select(EventRsvpLog, User).join(User, on=(EventRsvpLog.createdBy == User.username)).where(EventRsvpLog.event_id == eventId).order_by(EventRsvpLog.createdOn.desc())
155 return render_template("/events/rsvpLog.html",
156 event = event,
157 allLogs = allLogs)
158 else:
159 abort(403)
161@admin_bp.route('/event/<eventId>/renew', methods=['POST'])
162def renewEvent(eventId):
163 try:
164 formData = request.form
165 try:
166 assert formData['timeStart'] < formData['timeEnd']
167 except AssertionError:
168 flash("End time must be after start time", 'warning')
169 return redirect(url_for('admin.eventDisplay', eventId = eventId))
171 try:
172 if formData.get('dateEnd'):
173 assert formData['dateStart'] < formData['dateEnd']
174 except AssertionError:
175 flash("End date must be after start date", 'warning')
176 return redirect(url_for('admin.eventDisplay', eventId = eventId))
179 priorEvent = model_to_dict(Event.get_by_id(eventId))
180 newEventDict = priorEvent.copy()
181 newEventDict.pop('id')
182 newEventDict.update({
183 'program': int(priorEvent['program']['id']),
184 'term': int(priorEvent['term']['id']),
185 'timeStart': formData['timeStart'],
186 'timeEnd': formData['timeEnd'],
187 'location': formData['location'],
188 'startDate': f'{formData["startDate"][-4:]}-{formData["startDate"][0:-5]}',
189 'endDate': f'{formData["endDate"][-4:]}-{formData["endDate"][0:-5]}',
190 'isRecurring': bool(priorEvent['recurringId'])
191 })
192 newEvent, message = attemptSaveEvent(newEventDict, renewedEvent = True)
193 if message:
194 flash(message, "danger")
195 return redirect(url_for('admin.eventDisplay', eventId = eventId))
197 copyRsvpToNewEvent(priorEvent, newEvent[0])
198 createActivityLog(f"Renewed {priorEvent['name']} as <a href='event/{newEvent[0].id}/view'>{newEvent[0].name}</a>.")
199 flash("Event successfully renewed.", "success")
200 return redirect(url_for('admin.eventDisplay', eventId = newEvent[0].id))
203 except Exception as e:
204 print("Error while trying to renew event:", e)
205 flash("There was an error renewing the event. Please try again or contact Systems Support.", 'danger')
206 return redirect(url_for('admin.eventDisplay', eventId = eventId))
210@admin_bp.route('/event/<eventId>/view', methods=['GET'])
211@admin_bp.route('/event/<eventId>/edit', methods=['GET','POST'])
212def eventDisplay(eventId):
213 pageViewsCount = EventView.select().where(EventView.event == eventId).count()
214 if request.method == 'GET' and request.path == f'/event/{eventId}/view':
215 viewer = g.current_user
216 event = Event.get_by_id(eventId)
217 addEventView(viewer,event)
218 # Validate given URL
219 try:
220 event = Event.get_by_id(eventId)
221 except DoesNotExist as e:
222 print(f"Unknown event: {eventId}")
223 abort(404)
225 notPermitted = not (g.current_user.isCeltsAdmin or g.current_user.isProgramManagerForEvent(event))
226 if 'edit' in request.url_rule.rule and notPermitted:
227 abort(403)
229 eventData = model_to_dict(event, recurse=False)
230 associatedAttachments = AttachmentUpload.select().where(AttachmentUpload.event == event)
231 filepaths = FileHandler(eventId=event.id).retrievePath(associatedAttachments)
233 image = None
234 picurestype = [".jpeg", ".png", ".gif", ".jpg", ".svg", ".webp"]
235 for attachment in associatedAttachments:
236 for extension in picurestype:
237 if (attachment.fileName.endswith(extension) and attachment.isDisplayed == True):
238 image = filepaths[attachment.fileName][0]
239 if image:
240 break
243 if request.method == "POST": # Attempt to save form
244 eventData = request.form.copy()
245 try:
246 savedEvents, validationErrorMessage = attemptSaveEvent(eventData, getFilesFromRequest(request))
248 except Exception as e:
249 print("Error saving event:", e)
250 savedEvents = False
251 validationErrorMessage = "Unknown Error Saving Event. Please try again"
254 if savedEvents:
255 rsvpcohorts = request.form.getlist("cohorts[]")
256 for year in rsvpcohorts:
257 rsvpForBonnerCohort(int(year), event.id)
258 addBonnerCohortToRsvpLog(int(year), event.id)
260 flash("Event successfully updated!", "success")
261 return redirect(url_for("admin.eventDisplay", eventId = event.id))
262 else:
263 flash(validationErrorMessage, 'warning')
265 # make sure our data is the same regardless of GET and POST
266 preprocessEventData(eventData)
267 eventData['program'] = event.program
268 futureTerms = selectSurroundingTerms(g.current_term)
269 userHasRSVPed = checkUserRsvp(g.current_user, event)
270 filepaths = FileHandler(eventId=event.id).retrievePath(associatedAttachments)
271 isProgramManager = g.current_user.isProgramManagerFor(eventData['program'])
272 requirements, bonnerCohorts = [], []
274 if eventData['program'] and eventData['program'].isBonnerScholars:
275 requirements = getCertRequirements(Certification.BONNER)
276 bonnerCohorts = getBonnerCohorts(limit=5)
278 rule = request.url_rule
280 # Event Edit
281 if 'edit' in rule.rule:
282 return render_template("admin/createEvent.html",
283 eventData = eventData,
284 futureTerms=futureTerms,
285 event = event,
286 requirements = requirements,
287 bonnerCohorts = bonnerCohorts,
288 userHasRSVPed = userHasRSVPed,
289 isProgramManager = isProgramManager,
290 filepaths = filepaths)
291 # Event View
292 else:
293 # get text representations of dates for html
294 eventData['timeStart'] = event.timeStart.strftime("%-I:%M %p")
295 eventData['timeEnd'] = event.timeEnd.strftime("%-I:%M %p")
296 eventData['startDate'] = event.startDate.strftime("%m/%d/%Y")
297 eventCountdown = getCountdownToEvent(event)
300 # Identify the next event in a recurring series
301 if event.recurringId:
302 eventSeriesList = list(Event.select().where(Event.recurringId == event.recurringId)
303 .where((Event.isCanceled == False) | (Event.id == event.id))
304 .order_by(Event.startDate))
305 eventIndex = eventSeriesList.index(event)
306 if len(eventSeriesList) != (eventIndex + 1):
307 eventData["nextRecurringEvent"] = eventSeriesList[eventIndex + 1]
309 currentEventRsvpAmount = getEventRsvpCount(event.id)
311 userParticipatedTrainingEvents = getParticipationStatusForTrainings(eventData['program'], [g.current_user], g.current_term)
313 return render_template("eventView.html",
314 eventData=eventData,
315 event=event,
316 userHasRSVPed=userHasRSVPed,
317 programTrainings=userParticipatedTrainingEvents,
318 currentEventRsvpAmount=currentEventRsvpAmount,
319 isProgramManager=isProgramManager,
320 filepaths=filepaths,
321 image=image,
322 pageViewsCount=pageViewsCount,
323 eventCountdown=eventCountdown)
327@admin_bp.route('/event/<eventId>/cancel', methods=['POST'])
328def cancelRoute(eventId):
329 if g.current_user.isAdmin:
330 try:
331 cancelEvent(eventId)
332 return redirect(request.referrer)
334 except Exception as e:
335 print('Error while canceling event:', e)
336 return "", 500
338 else:
339 abort(403)
341@admin_bp.route('/event/<eventId>/delete', methods=['POST'])
342def deleteRoute(eventId):
343 try:
344 deleteEvent(eventId)
345 flash("Event successfully deleted.", "success")
346 return redirect(url_for("main.events", selectedTerm=g.current_term))
348 except Exception as e:
349 print('Error while canceling event:', e)
350 return "", 500
352@admin_bp.route('/event/<eventId>/deleteEventAndAllFollowing', methods=['POST'])
353def deleteEventAndAllFollowingRoute(eventId):
354 try:
355 deleteEventAndAllFollowing(eventId)
356 flash("Events successfully deleted.", "success")
357 return redirect(url_for("main.events", selectedTerm=g.current_term))
359 except Exception as e:
360 print('Error while canceling event:', e)
361 return "", 500
363@admin_bp.route('/event/<eventId>/deleteAllRecurring', methods=['POST'])
364def deleteAllRecurringEventsRoute(eventId):
365 try:
366 deleteAllRecurringEvents(eventId)
367 flash("Events successfully deleted.", "success")
368 return redirect(url_for("main.events", selectedTerm=g.current_term))
370 except Exception as e:
371 print('Error while canceling event:', e)
372 return "", 500
374@admin_bp.route('/makeRecurringEvents', methods=['POST'])
375def addRecurringEvents():
376 recurringEvents = calculateRecurringEventFrequency(preprocessEventData(request.form.copy()))
377 return json.dumps(recurringEvents, default=str)
380@admin_bp.route('/userProfile', methods=['POST'])
381def userProfile():
382 volunteerName= request.form.copy()
383 if volunteerName['searchStudentsInput']:
384 username = volunteerName['searchStudentsInput'].strip("()")
385 user=username.split('(')[-1]
386 return redirect(url_for('main.viewUsersProfile', username=user))
387 else:
388 flash(f"Please enter the first name or the username of the student you would like to search for.", category='danger')
389 return redirect(url_for('admin.studentSearchPage'))
391@admin_bp.route('/search_student', methods=['GET'])
392def studentSearchPage():
393 if g.current_user.isAdmin:
394 return render_template("/admin/searchStudentPage.html")
395 abort(403)
397@admin_bp.route('/addParticipants', methods = ['GET'])
398def addParticipants():
399 '''Renders the page, will be removed once merged with full page'''
401 return render_template('addParticipants.html',
402 title="Add Participants")
404@admin_bp.route('/activityLogs', methods = ['GET', 'POST'])
405def activityLogs():
406 if g.current_user.isCeltsAdmin:
407 allLogs = ActivityLog.select(ActivityLog, User).join(User).order_by(ActivityLog.createdOn.desc())
408 return render_template("/admin/activityLogs.html",
409 allLogs = allLogs)
410 else:
411 abort(403)
413@admin_bp.route("/deleteEventFile", methods=["POST"])
414def deleteEventFile():
415 fileData= request.form
416 eventfile=FileHandler(eventId=fileData["databaseId"])
417 eventfile.deleteFile(fileData["fileId"])
418 return ""
420@admin_bp.route("/uploadCourseParticipant", methods= ["POST"])
421def addCourseFile():
422 fileData = request.files['addCourseParticipants']
423 filePath = os.path.join(app.config["files"]["base_path"], fileData.filename)
424 fileData.save(filePath)
425 (session['cpPreview'], session['cpErrors']) = parseUploadedFile(filePath)
426 os.remove(filePath)
427 return redirect(url_for("admin.manageServiceLearningCourses"))
429@admin_bp.route('/manageServiceLearning', methods = ['GET', 'POST'])
430@admin_bp.route('/manageServiceLearning/<term>', methods = ['GET', 'POST'])
431def manageServiceLearningCourses(term=None):
433 """
434 The SLC management page for admins
435 """
436 if not g.current_user.isCeltsAdmin:
437 abort(403)
439 if request.method == 'POST' and "submitParticipant" in request.form:
440 saveCourseParticipantsToDatabase(session.pop('cpPreview', {}))
441 flash('Courses and participants saved successfully!', 'success')
442 return redirect(url_for('admin.manageServiceLearningCourses'))
444 manageTerm = Term.get_or_none(Term.id == term) or g.current_term
446 setRedirectTarget(request.full_path)
447 # retrieve and store the courseID of the imported course from a session variable if it exists.
448 # This allows us to export the courseID in the html and use it.
449 courseID = session.get("alterCourseId")
451 if courseID:
452 # delete courseID from the session if it was retrieved, for storage purposes.
453 session.pop("alterCourseId")
454 return render_template('/admin/manageServiceLearningFaculty.html',
455 courseInstructors = getInstructorCourses(),
456 unapprovedCourses = unapprovedCourses(manageTerm),
457 approvedCourses = approvedCourses(manageTerm),
458 importedCourses = getImportedCourses(manageTerm),
459 terms = selectSurroundingTerms(g.current_term),
460 term = manageTerm,
461 cpPreview = session.get('cpPreview', {}),
462 cpPreviewErrors = session.get('cpErrors', []),
463 courseID = courseID
464 )
466 return render_template('/admin/manageServiceLearningFaculty.html',
467 courseInstructors = getInstructorCourses(),
468 unapprovedCourses = unapprovedCourses(manageTerm),
469 approvedCourses = approvedCourses(manageTerm),
470 importedCourses = getImportedCourses(manageTerm),
471 terms = selectSurroundingTerms(g.current_term),
472 term = manageTerm,
473 cpPreview= session.get('cpPreview',{}),
474 cpPreviewErrors = session.get('cpErrors',[])
475 )
477@admin_bp.route('/admin/getSidebarInformation', methods=['GET'])
478def getSidebarInformation() -> str:
479 """
480 Get the count of unapproved courses and students interested in the minor for the current term
481 to display in the admin sidebar. It must be returned as a string to be received by the
482 ajax request.
483 """
484 unapprovedCoursesCount: int = len(unapprovedCourses(g.current_term))
485 interestedStudentsCount: int = len(getMinorInterest())
486 return {"unapprovedCoursesCount": unapprovedCoursesCount,
487 "interestedStudentsCount": interestedStudentsCount}
489@admin_bp.route("/deleteUploadedFile", methods= ["POST"])
490def removeFromSession():
491 try:
492 session.pop('cpPreview')
493 except KeyError:
494 pass
496 return ""
498@admin_bp.route('/manageServiceLearning/imported/<courseID>', methods = ['POST', 'GET'])
499def alterImportedCourse(courseID):
500 """
501 This route handles a GET and a POST request for the purpose of imported courses.
502 The GET request provides preexisting information of an imported course in a modal.
503 The POST request updates a specific imported course (course name, course abbreviation,
504 hours earned on completion, list of instructors) in the database with new information
505 coming from the imported courses modal.
506 """
507 if request.method == 'GET':
508 try:
509 targetCourse = Course.get_by_id(courseID)
510 targetInstructors = CourseInstructor.select().where(CourseInstructor.course == targetCourse)
512 try:
513 serviceHours = list(CourseParticipant.select().where(CourseParticipant.course_id == targetCourse.id))[0].hoursEarned
514 except IndexError: # If a course has no participant, IndexError will be raised
515 serviceHours = 20
517 courseData = model_to_dict(targetCourse, recurse=False)
518 courseData['instructors'] = [model_to_dict(instructor.user) for instructor in targetInstructors]
519 courseData['hoursEarned'] = serviceHours
521 return jsonify(courseData)
523 except DoesNotExist:
524 flash("Course not found")
525 return jsonify({"error": "Course not found"}), 404
527 if request.method == 'POST':
528 # Update course information in the database
529 courseData = request.form.copy()
530 editImportedCourses(courseData)
531 session['alterCourseId'] = courseID
533 return redirect(url_for("admin.manageServiceLearningCourses", term=courseData['termId']))
536@admin_bp.route("/manageBonner")
537def manageBonner():
538 if not g.current_user.isCeltsAdmin:
539 abort(403)
541 return render_template("/admin/bonnerManagement.html",
542 cohorts=getBonnerCohorts(),
543 events=getBonnerEvents(g.current_term),
544 requirements=getCertRequirements(certification=Certification.BONNER))
546@admin_bp.route("/bonner/<year>/<method>/<username>", methods=["POST"])
547def updatecohort(year, method, username):
548 if not g.current_user.isCeltsAdmin:
549 abort(403)
551 try:
552 user = User.get_by_id(username)
553 except:
554 abort(500)
556 if method == "add":
557 try:
558 BonnerCohort.create(year=year, user=user)
559 flash(f"Successfully added {user.fullName} to {year} Bonner Cohort.", "success")
560 except IntegrityError as e:
561 # if they already exist, ignore the error
562 flash(f'Error: {user.fullName} already added.', "danger")
563 pass
565 elif method == "remove":
566 BonnerCohort.delete().where(BonnerCohort.user == user, BonnerCohort.year == year).execute()
567 flash(f"Successfully removed {user.fullName} from {year} Bonner Cohort.", "success")
568 else:
569 flash(f"Error: {user.fullName} can't be added.", "danger")
570 abort(500)
572 return ""
574@admin_bp.route("/bonnerxls")
575def bonnerxls():
576 if not g.current_user.isCeltsAdmin:
577 abort(403)
579 newfile = makeBonnerXls()
580 return send_file(open(newfile, 'rb'), download_name='BonnerStudents.xlsx', as_attachment=True)
582@admin_bp.route("/saveRequirements/<certid>", methods=["POST"])
583def saveRequirements(certid):
584 if not g.current_user.isCeltsAdmin:
585 abort(403)
587 newRequirements = updateCertRequirements(certid, request.get_json())
589 return jsonify([requirement.id for requirement in newRequirements])
592@admin_bp.route("/displayEventFile", methods=["POST"])
593def displayEventFile():
594 fileData = request.form
595 eventfile = FileHandler(eventId=fileData["id"])
596 isChecked = fileData.get('checked') == 'true'
597 eventfile.changeDisplay(fileData['id'], isChecked)
598 return ""