Coverage for app/controllers/admin/routes.py: 24%
385 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-07-17 14:25 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2024-07-17 14:25 +0000
1from flask import request, render_template, url_for, g, redirect
2from flask import flash, abort, jsonify, session, send_file
3from peewee import DoesNotExist, fn, IntegrityError
4from playhouse.shortcuts import model_to_dict
5import json
6from datetime import datetime
7import os
9from app import app
10from app.models.program import Program
11from app.models.event import Event
12from app.models.eventRsvp import EventRsvp
13from app.models.eventParticipant import EventParticipant
14from app.models.user import User
15from app.models.course import Course
16from app.models.courseInstructor import CourseInstructor
17from app.models.courseParticipant import CourseParticipant
18from app.models.eventTemplate import EventTemplate
19from app.models.activityLog import ActivityLog
20from app.models.eventRsvpLog import EventRsvpLog
21from app.models.attachmentUpload import AttachmentUpload
22from app.models.bonnerCohort import BonnerCohort
23from app.models.certification import Certification
24from app.models.user import User
25from app.models.term import Term
26from app.models.eventViews import EventView
27from app.models.courseStatus import CourseStatus
29from app.logic.userManagement import getAllowedPrograms, getAllowedTemplates
30from app.logic.createLogs import createActivityLog
31from app.logic.certification import getCertRequirements, updateCertRequirements
32from app.logic.utils import selectSurroundingTerms, getFilesFromRequest, getRedirectTarget, setRedirectTarget
33from app.logic.events import cancelEvent, deleteEvent, attemptSaveEvent, preprocessEventData, calculateRecurringEventFrequency, deleteEventAndAllFollowing, deleteAllRecurringEvents, getBonnerEvents,addEventView, getEventRsvpCount, copyRsvpToNewEvent, getCountdownToEvent
34from app.logic.participants import getParticipationStatusForTrainings, checkUserRsvp
35from app.logic.minor import getMinorInterest
36from app.logic.fileHandler import FileHandler
37from app.logic.bonner import getBonnerCohorts, makeBonnerXls, rsvpForBonnerCohort, addBonnerCohortToRsvpLog
38from app.logic.serviceLearningCourses import parseUploadedFile, saveCourseParticipantsToDatabase, unapprovedCourses, approvedCourses, getImportedCourses, getInstructorCourses, editImportedCourses
40from app.controllers.admin import admin_bp
42@admin_bp.route('/switch_user', methods=['POST'])
43def switchUser():
44 if app.env == "production":
45 print(f"An attempt was made to switch to another user by {g.current_user.username}!")
46 abort(403)
48 print(f"Switching user from {g.current_user} to",request.form['newuser'])
49 session['current_user'] = model_to_dict(User.get_by_id(request.form['newuser']))
51 return redirect(request.referrer)
54@admin_bp.route('/eventTemplates')
55def templateSelect():
56 if g.current_user.isCeltsAdmin or g.current_user.isCeltsStudentStaff:
57 allprograms = getAllowedPrograms(g.current_user)
58 visibleTemplates = getAllowedTemplates(g.current_user)
59 return render_template("/events/template_selector.html",
60 programs=allprograms,
61 celtsSponsoredProgram = Program.get(Program.isOtherCeltsSponsored),
62 templates=visibleTemplates)
63 else:
64 abort(403)
67@admin_bp.route('/eventTemplates/<templateid>/<programid>/create', methods=['GET','POST'])
68def createEvent(templateid, programid):
69 if not (g.current_user.isAdmin or g.current_user.isProgramManagerFor(programid)):
70 abort(403)
72 # Validate given URL
73 program = None
74 try:
75 template = EventTemplate.get_by_id(templateid)
76 if programid:
77 program = Program.get_by_id(programid)
78 except DoesNotExist as e:
79 print("Invalid template or program id:", e)
80 flash("There was an error with your selection. Please try again or contact Systems Support.", "danger")
81 return redirect(url_for("admin.program_picker"))
83 # Get the data from the form or from the template
84 eventData = template.templateData
86 eventData['program'] = program
88 if request.method == "GET":
89 eventData['contactName'] = "CELTS Admin"
90 eventData['contactEmail'] = app.config['celts_admin_contact']
91 if program:
92 eventData['location'] = program.defaultLocation
93 if program.contactName:
94 eventData['contactName'] = program.contactName
95 if program.contactEmail:
96 eventData['contactEmail'] = program.contactEmail
98 # Try to save the form
99 if request.method == "POST":
100 eventData.update(request.form.copy())
101 try:
102 savedEvents, validationErrorMessage = attemptSaveEvent(eventData, getFilesFromRequest(request))
104 except Exception as e:
105 print("Error saving event:", e)
106 savedEvents = False
107 validationErrorMessage = "Unknown Error Saving Event. Please try again"
109 if savedEvents:
110 rsvpcohorts = request.form.getlist("cohorts[]")
111 for year in rsvpcohorts:
112 rsvpForBonnerCohort(int(year), savedEvents[0].id)
113 addBonnerCohortToRsvpLog(int(year), savedEvents[0].id)
116 noun = (eventData['isRecurring'] == 'on' and "Events" or "Event") # pluralize
117 flash(f"{noun} successfully created!", 'success')
119 if program:
120 if len(savedEvents) > 1:
121 createActivityLog(f"Created a recurring event, <a href=\"{url_for('admin.eventDisplay', eventId = savedEvents[0].id)}\">{savedEvents[0].name}</a>, for {program.programName}, with a start date of {datetime.strftime(eventData['startDate'], '%m/%d/%Y')}. The last event in the series will be on {datetime.strftime(savedEvents[-1].startDate, '%m/%d/%Y')}.")
122 else:
123 createActivityLog(f"Created <a href=\"{url_for('admin.eventDisplay', eventId = savedEvents[0].id)}\">{savedEvents[0].name}</a> for {program.programName}, with a start date of {datetime.strftime(eventData['startDate'], '%m/%d/%Y')}.")
124 else:
125 createActivityLog(f"Created a non-program event, <a href=\"{url_for('admin.eventDisplay', eventId = savedEvents[0].id)}\">{savedEvents[0].name}</a>, with a start date of {datetime.strftime(eventData['startDate'], '%m/%d/%Y')}.")
127 return redirect(url_for("admin.eventDisplay", eventId = savedEvents[0].id))
128 else:
129 flash(validationErrorMessage, 'warning')
131 # make sure our data is the same regardless of GET or POST
132 preprocessEventData(eventData)
133 isProgramManager = g.current_user.isProgramManagerFor(programid)
135 futureTerms = selectSurroundingTerms(g.current_term, prevTerms=0)
137 requirements, bonnerCohorts = [], []
138 if eventData['program'] is not None and eventData['program'].isBonnerScholars:
139 requirements = getCertRequirements(Certification.BONNER)
140 bonnerCohorts = getBonnerCohorts(limit=5)
141 return render_template(f"/admin/{template.templateFile}",
142 template = template,
143 eventData = eventData,
144 futureTerms = futureTerms,
145 requirements = requirements,
146 bonnerCohorts = bonnerCohorts,
147 isProgramManager = isProgramManager)
150@admin_bp.route('/event/<eventId>/rsvp', methods=['GET'])
151def rsvpLogDisplay(eventId):
152 event = Event.get_by_id(eventId)
153 if g.current_user.isCeltsAdmin or (g.current_user.isCeltsStudentStaff and g.current_user.isProgramManagerFor(event.program)):
154 allLogs = EventRsvpLog.select(EventRsvpLog, User).join(User, on=(EventRsvpLog.createdBy == User.username)).where(EventRsvpLog.event_id == eventId).order_by(EventRsvpLog.createdOn.desc())
155 return render_template("/events/rsvpLog.html",
156 event = event,
157 allLogs = allLogs)
158 else:
159 abort(403)
161@admin_bp.route('/event/<eventId>/renew', methods=['POST'])
162def renewEvent(eventId):
163 try:
164 formData = request.form
165 try:
166 assert formData['timeStart'] < formData['timeEnd']
167 except AssertionError:
168 flash("End time must be after start time", 'warning')
169 return redirect(url_for('admin.eventDisplay', eventId = eventId))
171 try:
172 if formData.get('dateEnd'):
173 assert formData['dateStart'] < formData['dateEnd']
174 except AssertionError:
175 flash("End date must be after start date", 'warning')
176 return redirect(url_for('admin.eventDisplay', eventId = eventId))
179 priorEvent = model_to_dict(Event.get_by_id(eventId))
180 newEventDict = priorEvent.copy()
181 newEventDict.pop('id')
182 newEventDict.update({
183 'program': int(priorEvent['program']['id']),
184 'term': int(priorEvent['term']['id']),
185 'timeStart': formData['timeStart'],
186 'timeEnd': formData['timeEnd'],
187 'location': formData['location'],
188 'startDate': f'{formData["startDate"][-4:]}-{formData["startDate"][0:-5]}',
189 'endDate': f'{formData["endDate"][-4:]}-{formData["endDate"][0:-5]}',
190 'isRecurring': bool(priorEvent['recurringId'])
191 })
192 newEvent, message = attemptSaveEvent(newEventDict, renewedEvent = True)
193 if message:
194 flash(message, "danger")
195 return redirect(url_for('admin.eventDisplay', eventId = eventId))
197 copyRsvpToNewEvent(priorEvent, newEvent[0])
198 createActivityLog(f"Renewed {priorEvent['name']} as <a href='event/{newEvent[0].id}/view'>{newEvent[0].name}</a>.")
199 flash("Event successfully renewed.", "success")
200 return redirect(url_for('admin.eventDisplay', eventId = newEvent[0].id))
203 except Exception as e:
204 print("Error while trying to renew event:", e)
205 flash("There was an error renewing the event. Please try again or contact Systems Support.", 'danger')
206 return redirect(url_for('admin.eventDisplay', eventId = eventId))
210@admin_bp.route('/event/<eventId>/view', methods=['GET'])
211@admin_bp.route('/event/<eventId>/edit', methods=['GET','POST'])
212def eventDisplay(eventId):
213 pageViewsCount = EventView.select().where(EventView.event == eventId).count()
214 if request.method == 'GET' and request.path == f'/event/{eventId}/view':
215 viewer = g.current_user
216 event = Event.get_by_id(eventId)
217 addEventView(viewer,event)
218 # Validate given URL
219 try:
220 event = Event.get_by_id(eventId)
221 except DoesNotExist as e:
222 print(f"Unknown event: {eventId}")
223 abort(404)
225 notPermitted = not (g.current_user.isCeltsAdmin or g.current_user.isProgramManagerForEvent(event))
226 if 'edit' in request.url_rule.rule and notPermitted:
227 abort(403)
229 eventData = model_to_dict(event, recurse=False)
230 associatedAttachments = AttachmentUpload.select().where(AttachmentUpload.event == event)
231 filepaths = FileHandler(eventId=event.id).retrievePath(associatedAttachments)
233 image = None
234 picurestype = [".jpeg", ".png", ".gif", ".jpg", ".svg", ".webp"]
235 for attachment in associatedAttachments:
236 for extension in picurestype:
237 if (attachment.fileName.endswith(extension) and attachment.isDisplayed == True):
238 image = filepaths[attachment.fileName][0]
239 if image:
240 break
243 if request.method == "POST": # Attempt to save form
244 eventData = request.form.copy()
245 try:
246 savedEvents, validationErrorMessage = attemptSaveEvent(eventData, getFilesFromRequest(request))
248 except Exception as e:
249 print("Error saving event:", e)
250 savedEvents = False
251 validationErrorMessage = "Unknown Error Saving Event. Please try again"
254 if savedEvents:
255 rsvpcohorts = request.form.getlist("cohorts[]")
256 for year in rsvpcohorts:
257 rsvpForBonnerCohort(int(year), event.id)
258 addBonnerCohortToRsvpLog(int(year), event.id)
260 flash("Event successfully updated!", "success")
261 return redirect(url_for("admin.eventDisplay", eventId = event.id))
262 else:
263 flash(validationErrorMessage, 'warning')
265 # make sure our data is the same regardless of GET and POST
266 preprocessEventData(eventData)
267 eventData['program'] = event.program
268 futureTerms = selectSurroundingTerms(g.current_term)
269 userHasRSVPed = checkUserRsvp(g.current_user, event)
270 filepaths = FileHandler(eventId=event.id).retrievePath(associatedAttachments)
271 isProgramManager = g.current_user.isProgramManagerFor(eventData['program'])
272 requirements, bonnerCohorts = [], []
274 if eventData['program'] and eventData['program'].isBonnerScholars:
275 requirements = getCertRequirements(Certification.BONNER)
276 bonnerCohorts = getBonnerCohorts(limit=5)
278 rule = request.url_rule
280 # Event Edit
281 if 'edit' in rule.rule:
282 return render_template("admin/createEvent.html",
283 eventData = eventData,
284 futureTerms=futureTerms,
285 event = event,
286 requirements = requirements,
287 bonnerCohorts = bonnerCohorts,
288 userHasRSVPed = userHasRSVPed,
289 isProgramManager = isProgramManager,
290 filepaths = filepaths)
291 # Event View
292 else:
293 # get text representations of dates for html
294 eventData['timeStart'] = event.timeStart.strftime("%-I:%M %p")
295 eventData['timeEnd'] = event.timeEnd.strftime("%-I:%M %p")
296 eventData['startDate'] = event.startDate.strftime("%m/%d/%Y")
297 eventCountdown = getCountdownToEvent(event)
300 # Identify the next event in a recurring series
301 if event.recurringId:
302 eventSeriesList = list(Event.select().where(Event.recurringId == event.recurringId)
303 .where((Event.isCanceled == False) | (Event.id == event.id))
304 .order_by(Event.startDate))
305 eventIndex = eventSeriesList.index(event)
306 if len(eventSeriesList) != (eventIndex + 1):
307 eventData["nextRecurringEvent"] = eventSeriesList[eventIndex + 1]
309 currentEventRsvpAmount = getEventRsvpCount(event.id)
311 userParticipatedTrainingEvents = getParticipationStatusForTrainings(eventData['program'], [g.current_user], g.current_term)
313 return render_template("eventView.html",
314 eventData=eventData,
315 event=event,
316 userHasRSVPed=userHasRSVPed,
317 programTrainings=userParticipatedTrainingEvents,
318 currentEventRsvpAmount=currentEventRsvpAmount,
319 isProgramManager=isProgramManager,
320 filepaths=filepaths,
321 image=image,
322 pageViewsCount=pageViewsCount,
323 eventCountdown=eventCountdown
324 )
328@admin_bp.route('/event/<eventId>/cancel', methods=['POST'])
329def cancelRoute(eventId):
330 if g.current_user.isAdmin:
331 try:
332 cancelEvent(eventId)
333 return redirect(request.referrer)
335 except Exception as e:
336 print('Error while canceling event:', e)
337 return "", 500
339 else:
340 abort(403)
342@admin_bp.route('/event/<eventId>/delete', methods=['POST'])
343def deleteRoute(eventId):
344 try:
345 deleteEvent(eventId)
346 flash("Event successfully deleted.", "success")
347 return redirect(url_for("main.events", selectedTerm=g.current_term))
349 except Exception as e:
350 print('Error while canceling event:', e)
351 return "", 500
353@admin_bp.route('/event/<eventId>/deleteEventAndAllFollowing', methods=['POST'])
354def deleteEventAndAllFollowingRoute(eventId):
355 try:
356 deleteEventAndAllFollowing(eventId)
357 flash("Events successfully deleted.", "success")
358 return redirect(url_for("main.events", selectedTerm=g.current_term))
360 except Exception as e:
361 print('Error while canceling event:', e)
362 return "", 500
364@admin_bp.route('/event/<eventId>/deleteAllRecurring', methods=['POST'])
365def deleteAllRecurringEventsRoute(eventId):
366 try:
367 deleteAllRecurringEvents(eventId)
368 flash("Events successfully deleted.", "success")
369 return redirect(url_for("main.events", selectedTerm=g.current_term))
371 except Exception as e:
372 print('Error while canceling event:', e)
373 return "", 500
375@admin_bp.route('/makeRecurringEvents', methods=['POST'])
376def addRecurringEvents():
377 recurringEvents = calculateRecurringEventFrequency(preprocessEventData(request.form.copy()))
378 return json.dumps(recurringEvents, default=str)
381@admin_bp.route('/userProfile', methods=['POST'])
382def userProfile():
383 volunteerName= request.form.copy()
384 if volunteerName['searchStudentsInput']:
385 username = volunteerName['searchStudentsInput'].strip("()")
386 user=username.split('(')[-1]
387 return redirect(url_for('main.viewUsersProfile', username=user))
388 else:
389 flash(f"Please enter the first name or the username of the student you would like to search for.", category='danger')
390 return redirect(url_for('admin.studentSearchPage'))
392@admin_bp.route('/search_student', methods=['GET'])
393def studentSearchPage():
394 if g.current_user.isAdmin:
395 return render_template("/admin/searchStudentPage.html")
396 abort(403)
398@admin_bp.route('/addParticipants', methods = ['GET'])
399def addParticipants():
400 '''Renders the page, will be removed once merged with full page'''
402 return render_template('addParticipants.html',
403 title="Add Participants")
405@admin_bp.route('/activityLogs', methods = ['GET', 'POST'])
406def activityLogs():
407 if g.current_user.isCeltsAdmin:
408 allLogs = ActivityLog.select(ActivityLog, User).join(User).order_by(ActivityLog.createdOn.desc())
409 return render_template("/admin/activityLogs.html",
410 allLogs = allLogs)
411 else:
412 abort(403)
414@admin_bp.route("/deleteEventFile", methods=["POST"])
415def deleteEventFile():
416 fileData= request.form
417 eventfile=FileHandler(eventId=fileData["databaseId"])
418 eventfile.deleteFile(fileData["fileId"])
419 return ""
421@admin_bp.route("/uploadCourseParticipant", methods= ["POST"])
422def addCourseFile():
423 fileData = request.files['addCourseParticipants']
424 filePath = os.path.join(app.config["files"]["base_path"], fileData.filename)
425 fileData.save(filePath)
426 (session['cpPreview'], session['cpErrors']) = parseUploadedFile(filePath)
427 os.remove(filePath)
428 return redirect(url_for("admin.manageServiceLearningCourses"))
430@admin_bp.route('/manageServiceLearning', methods = ['GET', 'POST'])
431@admin_bp.route('/manageServiceLearning/<term>', methods = ['GET', 'POST'])
432def manageServiceLearningCourses(term=None):
434 """
435 The SLC management page for admins
436 """
437 if not g.current_user.isCeltsAdmin:
438 abort(403)
440 if request.method == 'POST' and "submitParticipant" in request.form:
441 saveCourseParticipantsToDatabase(session.pop('cpPreview', {}))
442 flash('Courses and participants saved successfully!', 'success')
443 return redirect(url_for('admin.manageServiceLearningCourses'))
445 manageTerm = Term.get_or_none(Term.id == term) or g.current_term
447 setRedirectTarget(request.full_path)
448 # retrieve and store the courseID of the imported course from a session variable if it exists.
449 # This allows us to export the courseID in the html and use it.
450 courseID = session.get("alterCourseId")
452 if courseID:
453 # delete courseID from the session if it was retrieved, for storage purposes.
454 session.pop("alterCourseId")
455 return render_template('/admin/manageServiceLearningFaculty.html',
456 courseInstructors = getInstructorCourses(),
457 unapprovedCourses = unapprovedCourses(manageTerm),
458 approvedCourses = approvedCourses(manageTerm),
459 importedCourses = getImportedCourses(manageTerm),
460 terms = selectSurroundingTerms(g.current_term),
461 term = manageTerm,
462 cpPreview = session.get('cpPreview', {}),
463 cpPreviewErrors = session.get('cpErrors', []),
464 courseID = courseID
465 )
467 return render_template('/admin/manageServiceLearningFaculty.html',
468 courseInstructors = getInstructorCourses(),
469 unapprovedCourses = unapprovedCourses(manageTerm),
470 approvedCourses = approvedCourses(manageTerm),
471 importedCourses = getImportedCourses(manageTerm),
472 terms = selectSurroundingTerms(g.current_term),
473 term = manageTerm,
474 cpPreview= session.get('cpPreview',{}),
475 cpPreviewErrors = session.get('cpErrors',[])
476 )
478@admin_bp.route('/admin/getSidebarInformation', methods=['GET'])
479def getSidebarInformation() -> str:
480 """
481 Get the count of unapproved courses and students interested in the minor for the current term
482 to display in the admin sidebar. It must be returned as a string to be received by the
483 ajax request.
484 """
485 unapprovedCoursesCount: int = len(unapprovedCourses(g.current_term))
486 interestedStudentsCount: int = len(getMinorInterest())
487 return {"unapprovedCoursesCount": unapprovedCoursesCount,
488 "interestedStudentsCount": interestedStudentsCount}
490@admin_bp.route("/deleteUploadedFile", methods= ["POST"])
491def removeFromSession():
492 try:
493 session.pop('cpPreview')
494 except KeyError:
495 pass
497 return ""
499@admin_bp.route('/manageServiceLearning/imported/<courseID>', methods = ['POST', 'GET'])
500def alterImportedCourse(courseID):
501 """
502 This route handles a GET and a POST request for the purpose of imported courses.
503 The GET request provides preexisting information of an imported course in a modal.
504 The POST request updates a specific imported course (course name, course abbreviation,
505 hours earned on completion, list of instructors) in the database with new information
506 coming from the imported courses modal.
507 """
508 if request.method == 'GET':
509 try:
510 targetCourse = Course.get_by_id(courseID)
511 targetInstructors = CourseInstructor.select().where(CourseInstructor.course == targetCourse)
513 try:
514 serviceHours = list(CourseParticipant.select().where(CourseParticipant.course_id == targetCourse.id))[0].hoursEarned
515 except IndexError: # If a course has no participant, IndexError will be raised
516 serviceHours = 20
518 courseData = model_to_dict(targetCourse, recurse=False)
519 courseData['instructors'] = [model_to_dict(instructor.user) for instructor in targetInstructors]
520 courseData['hoursEarned'] = serviceHours
522 return jsonify(courseData)
524 except DoesNotExist:
525 flash("Course not found")
526 return jsonify({"error": "Course not found"}), 404
528 if request.method == 'POST':
529 # Update course information in the database
530 courseData = request.form.copy()
531 editImportedCourses(courseData)
532 session['alterCourseId'] = courseID
534 return redirect(url_for("admin.manageServiceLearningCourses", term=courseData['termId']))
537@admin_bp.route("/manageBonner")
538def manageBonner():
539 if not g.current_user.isCeltsAdmin:
540 abort(403)
542 return render_template("/admin/bonnerManagement.html",
543 cohorts=getBonnerCohorts(),
544 events=getBonnerEvents(g.current_term),
545 requirements=getCertRequirements(certification=Certification.BONNER))
547@admin_bp.route("/bonner/<year>/<method>/<username>", methods=["POST"])
548def updatecohort(year, method, username):
549 if not g.current_user.isCeltsAdmin:
550 abort(403)
552 try:
553 user = User.get_by_id(username)
554 except:
555 abort(500)
557 if method == "add":
558 try:
559 BonnerCohort.create(year=year, user=user)
560 flash(f"Successfully added {user.fullName} to {year} Bonner Cohort.", "success")
561 except IntegrityError as e:
562 # if they already exist, ignore the error
563 flash(f'Error: {user.fullName} already added.', "danger")
564 pass
566 elif method == "remove":
567 BonnerCohort.delete().where(BonnerCohort.user == user, BonnerCohort.year == year).execute()
568 flash(f"Successfully removed {user.fullName} from {year} Bonner Cohort.", "success")
569 else:
570 flash(f"Error: {user.fullName} can't be added.", "danger")
571 abort(500)
573 return ""
575@admin_bp.route("/bonnerxls")
576def bonnerxls():
577 if not g.current_user.isCeltsAdmin:
578 abort(403)
580 newfile = makeBonnerXls()
581 return send_file(open(newfile, 'rb'), download_name='BonnerStudents.xlsx', as_attachment=True)
583@admin_bp.route("/saveRequirements/<certid>", methods=["POST"])
584def saveRequirements(certid):
585 if not g.current_user.isCeltsAdmin:
586 abort(403)
588 newRequirements = updateCertRequirements(certid, request.get_json())
590 return jsonify([requirement.id for requirement in newRequirements])
593@admin_bp.route("/displayEventFile", methods=["POST"])
594def displayEventFile():
595 fileData = request.form
596 eventfile = FileHandler(eventId=fileData["id"])
597 isChecked = fileData.get('checked') == 'true'
598 eventfile.changeDisplay(fileData['id'], isChecked)
599 return ""