Coverage for app/controllers/main/routes.py: 26%

318 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-08-25 18:59 +0000

1import json 

2import datetime 

3from peewee import JOIN 

4from http import cookies 

5from playhouse.shortcuts import model_to_dict 

6from flask import request, render_template, g, abort, flash, redirect, url_for, make_response, session 

7 

8from app.controllers.main import main_bp 

9from app import app 

10from app.models.term import Term 

11from app.models.user import User 

12from app.models.note import Note 

13from app.models.event import Event 

14from app.models.program import Program 

15from app.models.interest import Interest 

16from app.models.eventRsvp import EventRsvp 

17from app.models.celtsLabor import CeltsLabor 

18from app.models.programBan import ProgramBan 

19from app.models.profileNote import ProfileNote 

20from app.models.insuranceInfo import InsuranceInfo 

21from app.models.certification import Certification 

22from app.models.programManager import ProgramManager 

23from app.models.backgroundCheck import BackgroundCheck 

24from app.models.emergencyContact import EmergencyContact 

25from app.models.eventParticipant import EventParticipant 

26from app.models.courseInstructor import CourseInstructor 

27from app.models.backgroundCheckType import BackgroundCheckType 

28 

29from app.logic.events import getUpcomingEventsForUser, getParticipatedEventsForUser, getTrainingEvents, getEventRsvpCountsForTerm, getUpcomingStudentLedCount, getStudentLedEvents, getBonnerEvents, getOtherEvents 

30from app.logic.transcript import * 

31from app.logic.loginManager import logout 

32from app.logic.searchUsers import searchUsers 

33from app.logic.utils import selectSurroundingTerms 

34from app.logic.celtsLabor import getCeltsLaborHistory 

35from app.logic.createLogs import createRsvpLog, createActivityLog 

36from app.logic.certification import getCertRequirementsWithCompletion 

37from app.logic.landingPage import getManagerProgramDict, getActiveEventTab 

38from app.logic.minor import toggleMinorInterest, getCommunityEngagementByTerm, getEngagementTotal 

39from app.logic.participants import unattendedRequiredEvents, trainedParticipants, getParticipationStatusForTrainings, checkUserRsvp, addPersonToEvent 

40from app.logic.users import addUserInterest, removeUserInterest, banUser, unbanUser, isEligibleForProgram, getUserBGCheckHistory, addProfileNote, deleteProfileNote, updateDietInfo 

41 

@main_bp.route('/logout', methods=['GET'])
def redirectToLogout():
    """Redirect the browser to whatever target logout() returns."""
    logoutTarget = logout()
    return redirect(logoutTarget)

45 

@main_bp.route('/', methods=['GET'])
def landingPage():
    """
    Render the landing page.

    The template receives the manager/program mapping for the current user,
    the current term, and the Program rows (selected together with Event)
    for non-canceled events of the current term that are not past their end.
    """
    managerPrograms = getManagerProgramDict(g.current_user)

    # Distinct Program+Event rows for non-canceled events in the current term
    inTermAndActive = (Event.term == g.current_term) & (Event.isCanceled == False)
    programEventQuery = (Program.select(Program, Event)
                                .join(Event)
                                .where(inTermAndActive)
                                .distinct())
    programRows = list(programEventQuery.execute())

    # The isPastEnd check is applied here in Python, after the rows are fetched
    upcomingRows = []
    for programRow in programRows:
        if not programRow.event.isPastEnd:
            upcomingRows.append(programRow)

    return render_template("/main/landingPage.html",
                           managerProgramDict=managerPrograms,
                           term=g.current_term,
                           programsWithEventsList=upcomingRows)

64 

65 

66 

67 

@main_bp.route('/goToEventsList/<programID>', methods=['GET'])
def goToEventsList(programID):
    """Return a mapping whose "activeTab" entry is getActiveEventTab(programID)."""
    tabForProgram = getActiveEventTab(programID)
    return dict(activeTab=tabForProgram)

71 

@main_bp.route('/eventsList/<selectedTerm>', methods=['GET'], defaults={'activeTab': "studentLedEvents", 'programID': 0})
@main_bp.route('/eventsList/<selectedTerm>/<activeTab>', methods=['GET'], defaults={'programID': 0})
@main_bp.route('/eventsList/<selectedTerm>/<activeTab>/<programID>', methods=['GET'])
def events(selectedTerm, activeTab, programID):
    """
    Render the events list page for one term.

    selectedTerm: primary key of the Term to show; g.current_term is used when it is empty
    activeTab: tab name handed to the template ("studentLedEvents" by default)
    programID: program id taken from the URL (0 by default); must be an integer

    Responds with 404 when programID is not an integer or when no Term
    matches selectedTerm, rather than failing with an unhandled exception.
    """
    currentTerm = selectedTerm if selectedTerm else g.current_term
    currentTime = datetime.datetime.now()

    # Validate the URL pieces up front so a bad link is a 404, not a server error
    try:
        programID = int(programID)
        term = Term.get_by_id(currentTerm)
    except (ValueError, Term.DoesNotExist):
        abort(404)

    listOfTerms = Term.select()
    participantRSVP = EventRsvp.select(EventRsvp, Event).join(Event).where(EventRsvp.user == g.current_user)
    rsvpedEventsID = [rsvp.event.id for rsvp in participantRSVP]

    currentEventRsvpAmount = getEventRsvpCountsForTerm(term)
    studentLedEvents = getStudentLedEvents(term)
    countUpcomingStudentLedEvents = getUpcomingStudentLedCount(term, currentTime)
    trainingEvents = getTrainingEvents(term, g.current_user)
    bonnerEvents = getBonnerEvents(term)
    otherEvents = getOtherEvents(term)

    managersProgramDict = getManagerProgramDict(g.current_user)

    # Fetch toggle state from session ('unchecked' when nothing has been stored yet)
    toggle_state = session.get('toggleState', 'unchecked')

    return render_template("/events/event_list.html",
                           selectedTerm=term,
                           studentLedEvents=studentLedEvents,
                           trainingEvents=trainingEvents,
                           bonnerEvents=bonnerEvents,
                           otherEvents=otherEvents,
                           listOfTerms=listOfTerms,
                           rsvpedEventsID=rsvpedEventsID,
                           currentEventRsvpAmount=currentEventRsvpAmount,
                           currentTime=currentTime,
                           user=g.current_user,
                           activeTab=activeTab,
                           programID=programID,
                           managersProgramDict=managersProgramDict,
                           countUpcomingStudentLedEvents=countUpcomingStudentLedEvents,
                           toggle_state=toggle_state
                           )

116 

@main_bp.route('/updateToggleState', methods=['POST'])
def update_toggle_state():
    """Store the posted 'toggleState' form value in the session and reply with an empty 200."""
    # request.form.get yields None when the field is absent; that value is stored as-is
    session['toggleState'] = request.form.get('toggleState')
    return "", 200

125 

@main_bp.route('/profile/<username>', methods=['GET'])
def viewUsersProfile(username):
    """
    This function displays the information of a volunteer to the user.

    username: username of the volunteer whose profile is requested

    Only the volunteer themselves or an admin may see the page; anyone else
    gets a 403. When no user matches, admins are redirected to the student
    search page with a flash message and everyone else gets a 403.
    """
    try:
        volunteer = User.get(User.username == username)
    except User.DoesNotExist:
        # Only a missing user is handled here, so unrelated failures are not
        # reported to the admin as "does not exist".
        if g.current_user.isAdmin:
            flash(f"{username} does not exist! ", category='danger')
            return redirect(url_for('admin.studentSearchPage'))
        abort(403)  # Error 403 if non admin/student-staff user tries to access via url

    if not ((g.current_user == volunteer) or g.current_user.isAdmin):
        abort(403)

    upcomingEvents = getUpcomingEventsForUser(volunteer)
    participatedEvents = getParticipatedEventsForUser(volunteer)

    programs = Program.select()
    if not g.current_user.isBonnerScholar and not g.current_user.isAdmin:
        programs = programs.where(Program.isBonnerScholars == False)

    interests = Interest.select(Interest, Program).join(Program).where(Interest.user == volunteer)
    programsInterested = [interest.program for interest in interests]

    rsvpedEventsList = EventRsvp.select(EventRsvp, Event).join(Event).where(EventRsvp.user == volunteer)
    rsvpedEvents = [rsvp.event.id for rsvp in rsvpedEventsList]

    programManagerPrograms = ProgramManager.select(ProgramManager, Program).join(Program).where(ProgramManager.user == volunteer)
    permissionPrograms = [entry.program.id for entry in programManagerPrograms]

    allBackgroundHistory = getUserBGCheckHistory(volunteer)
    backgroundTypes = list(BackgroundCheckType.select())

    eligibilityTable = []
    now = datetime.datetime.now()  # one cutoff shared by every program's ban lookup

    for program in programs:
        # Bans for this volunteer and program whose end date is still in the future
        banNotes = list(ProgramBan.select(ProgramBan, Note)
                                  .join(Note, on=(ProgramBan.banNote == Note.id))
                                  .where(ProgramBan.user == volunteer,
                                         ProgramBan.program == program,
                                         ProgramBan.endDate > now).execute())
        userParticipatedTrainingEvents = getParticipationStatusForTrainings(program, [volunteer], g.current_term)
        try:
            # Did volunteer attend all events
            allTrainingsComplete = False not in [attended for event, attended in userParticipatedTrainingEvents[username]]
        except KeyError:
            # No entry for this username counts as training not completed
            allTrainingsComplete = False
        noteForDict = banNotes[-1].banNote.noteContent if banNotes else ""
        eligibilityTable.append({"program": program,
                                 "completedTraining": allTrainingsComplete,
                                 "trainingList": userParticipatedTrainingEvents,
                                 "isNotBanned": (not banNotes),
                                 "banNote": noteForDict})

    profileNotes = ProfileNote.select().where(ProfileNote.user == volunteer)

    bonnerRequirements = getCertRequirementsWithCompletion(certification=Certification.BONNER, username=volunteer)

    managersProgramDict = getManagerProgramDict(g.current_user)
    managersList = list(managersProgramDict.values())
    totalSustainedEngagements = getEngagementTotal(getCommunityEngagementByTerm(volunteer))

    return render_template("/main/userProfile.html",
                           programs=programs,
                           programsInterested=programsInterested,
                           upcomingEvents=upcomingEvents,
                           participatedEvents=participatedEvents,
                           rsvpedEvents=rsvpedEvents,
                           permissionPrograms=permissionPrograms,
                           eligibilityTable=eligibilityTable,
                           volunteer=volunteer,
                           backgroundTypes=backgroundTypes,
                           allBackgroundHistory=allBackgroundHistory,
                           currentDateTime=datetime.datetime.now(),
                           profileNotes=profileNotes,
                           bonnerRequirements=bonnerRequirements,
                           managersList=managersList,
                           participatedInLabor=getCeltsLaborHistory(volunteer),
                           totalSustainedEngagements=totalSustainedEngagements,
                           )

204 

@main_bp.route('/profile/<username>/emergencyContact', methods=['GET', 'POST'])
def emergencyContactInfo(username):
    """
    This loads the Emergency Contact Page and, on POST, saves the submitted form.

    username: username of the user whose emergency contact is shown or saved

    Viewing is allowed for the user themselves or a CELTS admin (read-only
    when the viewer is not that user); saving is allowed only for the user
    themselves. Responds with 403 otherwise and 404 when no such user exists.
    After saving, redirects to the profile when the query string has
    action=exit, otherwise to the insurance information page.
    """
    isOwnProfile = g.current_user.username == username
    if not (isOwnProfile or g.current_user.isCeltsAdmin):
        abort(403)

    user = User.get_or_none(User.username == username)
    if user is None:
        abort(404)

    # Anything that is not a POST (GET, and the HEAD Flask adds for GET routes) just renders the page
    if request.method != 'POST':
        contactInfo = EmergencyContact.get_or_none(EmergencyContact.user_id == username)
        return render_template("/main/emergencyContactInfo.html",
                               username=username,
                               contactInfo=contactInfo,
                               readOnly=not isOwnProfile
                               )

    if not isOwnProfile:
        abort(403)

    # NOTE(review): the form is passed through as column values unfiltered -- confirm
    # only the expected fields can be submitted.
    # Choose update vs. create by looking for an existing row. The row count
    # reported by an UPDATE may be 0 when the submitted values equal the stored
    # ones (database-dependent), which would otherwise lead to a second create().
    ownRecord = EmergencyContact.user == username
    if EmergencyContact.select().where(ownRecord).exists():
        EmergencyContact.update(**request.form).where(ownRecord).execute()
    else:
        EmergencyContact.create(user=username, **request.form)

    createActivityLog(f"{g.current_user.fullName} updated {user.fullName}'s emergency contact information.")
    flash('Emergency contact information saved successfully!', 'success')

    if request.args.get('action') == 'exit':
        return redirect(f"/profile/{username}")
    return redirect(f"/profile/{username}/insuranceInfo")

238 

@main_bp.route('/profile/<username>/insuranceInfo', methods=['GET', 'POST'])
def insuranceInfo(username):
    """
    This loads the Insurance Information Page and, on POST, saves the submitted form.

    username: username of the user whose insurance information is shown or saved

    Viewing is allowed for the user themselves or a CELTS admin (read-only
    when the viewer is not that user); saving is allowed only for the user
    themselves. Responds with 403 otherwise and 404 when no such user exists.
    After saving, redirects to the profile when the query string has
    action=exit, otherwise to the emergency contact page.
    """
    isOwnProfile = g.current_user.username == username
    if not (isOwnProfile or g.current_user.isCeltsAdmin):
        abort(403)

    user = User.get_or_none(User.username == username)
    if user is None:
        abort(404)

    # Anything that is not a POST (GET, and the HEAD Flask adds for GET routes) just renders the page
    if request.method != 'POST':
        userInsuranceInfo = InsuranceInfo.get_or_none(InsuranceInfo.user == username)
        return render_template("/main/insuranceInfo.html",
                               username=username,
                               userInsuranceInfo=userInsuranceInfo,
                               readOnly=not isOwnProfile
                               )

    # Save the form data
    if not isOwnProfile:
        abort(403)

    # NOTE(review): the form is passed through as column values unfiltered -- confirm
    # only the expected fields can be submitted.
    # Choose update vs. create by looking for an existing row. The row count
    # reported by an UPDATE may be 0 when the submitted values equal the stored
    # ones (database-dependent), which would otherwise lead to a second create().
    ownRecord = InsuranceInfo.user == username
    if InsuranceInfo.select().where(ownRecord).exists():
        InsuranceInfo.update(**request.form).where(ownRecord).execute()
    else:
        InsuranceInfo.create(user=username, **request.form)

    createActivityLog(f"{g.current_user.fullName} updated {user.fullName}'s insurance information.")
    flash('Insurance information saved successfully!', 'success')

    if request.args.get('action') == 'exit':
        return redirect(f"/profile/{username}")
    return redirect(f"/profile/{username}/emergencyContact")

273 

@main_bp.route('/profile/<username>/travelForm', methods=['GET', 'POST'])
def travelForm(username):
    """
    Render the travel form for a user.

    username: username of the user whose data fills the form

    Allowed for the user themselves or a CELTS admin (403 otherwise); responds
    with 404 when no user matches. The template receives one dict combining
    the selected User, EmergencyContact and InsuranceInfo columns, with every
    falsy value replaced by an empty string.
    """
    if not (g.current_user.username == username or g.current_user.isCeltsAdmin):
        abort(403)

    userQuery = (User.select(User, EmergencyContact, InsuranceInfo)
                     .join(EmergencyContact, JOIN.LEFT_OUTER).switch()
                     .join(InsuranceInfo, JOIN.LEFT_OUTER)
                     .where(User.username == username).limit(1))

    # Fetch once as dicts; the same result answers both "does the user exist" and "what is the data"
    rows = list(userQuery.dicts())
    if not rows:
        abort(404)
    userData = {key: (value if value else '') for key, value in rows[0].items()}

    return render_template('/main/travelForm.html',
                           userData=userData
                           )

291 

292 

@main_bp.route('/profile/addNote', methods=['POST'])
def addNote():
    """
    This function adds a note to the user's profile.

    Reads "visibility", "bonner", "noteTextbox" and "username" from the posted
    form. On success it flashes a message and redirects to that user's
    profile; on any error it flashes a failure message and returns a 500.
    """
    form = request.form
    try:
        isBonnerNote = form["bonner"] == "yes"
        addProfileNote(form["visibility"], isBonnerNote, form["noteTextbox"], form["username"])
        flash("Successfully added profile note", "success")
        profileUrl = url_for("main.viewUsersProfile", username=form["username"])
        return redirect(profileUrl)
    except Exception as error:
        print("Error adding note", error)
        flash("Failed to add profile note", "danger")
        return "Failed to add profile note", 500

307 

@main_bp.route('/<username>/deleteNote', methods=['POST'])
def deleteNote(username):
    """
    This function deletes a note from the user's profile.

    The note is identified by the posted "id" form field. The outcome is
    reported through a flash message; the response body is "success" in
    both the success and the failure case.
    """
    try:
        noteId = request.form["id"]
        deleteProfileNote(noteId)
        flash("Successfully deleted profile note", "success")
    except Exception as error:
        print("Error deleting note", error)
        flash("Failed to delete profile note", "danger")
    return "success"

320 

321# ===========================Ban=============================================== 

@main_bp.route('/<username>/ban/<program_id>', methods=['POST'])
def ban(program_id, username):
    """
    This function bans a user from a program.

    program_id: the primary id of the program the student is being banned from
    username: unique value of a user to correctly identify them

    The posted form supplies "note" (the note left about the change) and
    "endDate" (the date the ban will no longer be effective). Any error during
    the ban is flashed and answered with a 500.
    """
    form = request.form
    note = form["note"]
    endDate = form["endDate"]
    try:
        banUser(program_id, username, note, endDate, g.current_user)
        program = Program.get(int(program_id))
        flash("Successfully banned the volunteer", "success")
        createActivityLog(f'Banned {username} from {program.programName} until {endDate}.')
        return "Successfully banned the volunteer."
    except Exception as error:
        print("Error while updating ban", error)
        flash("Failed to ban the volunteer", "danger")
        return "Failed to ban the volunteer", 500

342 

343# ===========================Unban=============================================== 

@main_bp.route('/<username>/unban/<program_id>', methods=['POST'])
def unban(program_id, username):
    """
    This function lifts a user's ban from a program.

    program_id: the primary id of the program the student is being unbanned from
    username: unique value of a user to correctly identify them

    The posted form supplies "note" (the note left about the change). Any
    error during the unban is flashed and answered with a 500.
    """
    note = request.form["note"]
    try:
        unbanUser(program_id, username, note, g.current_user)
        program = Program.get(int(program_id))
        createActivityLog(f'Unbanned {username} from {program.programName}.')
        flash("Successfully unbanned the volunteer", "success")
        return "Successfully unbanned the volunteer"
    except Exception as error:
        print("Error while updating Unban", error)
        flash("Failed to unban the volunteer", "danger")
        return "Failed to unban the volunteer", 500

364 

365 

@main_bp.route('/<username>/addInterest/<program_id>', methods=['POST'])
def addInterest(program_id, username):
    """
    This function adds a program to the list of programs a user is interested in.

    program_id: the primary id of the program the student is adding interest of
    username: unique value of a user to correctly identify them

    Returns an empty body on success. When the interest could not be added,
    or any error occurs, returns "Error Updating Interest" with status 500.
    """
    try:
        success = addUserInterest(program_id, username)
        programName = Program.get_by_id(program_id).programName
        if success:
            flash(f"Successfully added {programName} as an interest", "success")
            return ""
        flash(f"Was unable to add {programName} as an interest.", "danger")
    except Exception as e:
        print(e)
    # Reached on both the "not added" and the exception path, so the view always returns a response
    return "Error Updating Interest", 500

384 

@main_bp.route('/<username>/removeInterest/<program_id>', methods=['POST'])
def removeInterest(program_id, username):
    """
    This function removes a program from the list of programs a user is interested in.

    program_id: the primary id of the program the student is removing interest of
    username: unique value of a user to correctly identify them

    Returns an empty body on success. When the interest could not be removed,
    or any error occurs, returns "Error Updating Interest" with status 500.
    """
    try:
        removed = removeUserInterest(program_id, username)
        programName = Program.get_by_id(program_id).programName
        if removed:
            flash(f"Successfully removed {programName} as an interest.", "success")
            return ""
        flash(f"Was unable to remove {programName} as an interest.", "danger")
    except Exception as e:
        print(e)
    # Reached on both the "not removed" and the exception path, so the view always returns a response
    return "Error Updating Interest", 500

402 

@main_bp.route('/rsvpForEvent', methods = ['POST'])
def volunteerRegister():
    """
    Register the current user for the event whose id is posted in the
    "id" form field.

    The outcome is reported through a flash message. Returns an empty body
    when the form carries from=ajax, otherwise redirects to the event display
    page.
    """
    event = Event.get_by_id(request.form['id'])
    program = event.program
    user = g.current_user

    # The result is not used below; the call is kept in case it has side effects
    checkUserRsvp(user, event)
    isEligible = isEligibleForProgram(program, user)
    listOfRequirements = unattendedRequiredEvents(program, user)

    if not isEligible:
        flash(f"Cannot RSVP. Contact CELTS administrators: {app.config['celts_admin_contact']}.", "danger")
    else:
        personAdded = addPersonToEvent(user, event)
        if not personAdded:
            flash("RSVP Failed due to an unknown error.", "danger")
        elif listOfRequirements:
            reqListToString = ', '.join(listOfRequirements)
            flash(f"{user.firstName} {user.lastName} successfully registered. However, the following training may be required: {reqListToString}.", "success")
        else:
            flash("Successfully registered for event!","success")

    if request.form.get('from') == 'ajax':
        return ''
    return redirect(url_for("admin.eventDisplay", eventId=event.id))

435 

@main_bp.route('/rsvpRemove', methods = ['POST'])
def RemoveRSVP():
    """
    Delete the current user's RSVP for the event whose id is posted in the
    "id" form field, and record the change in the RSVP log.

    Returns an empty body when the form carries from=ajax, otherwise redirects
    to the event display page.
    """
    form = request.form
    event = Event.get_by_id(form['id'])

    rsvp = EventRsvp.get(EventRsvp.user == g.current_user, EventRsvp.event == event)
    # The log wording depends on whether the RSVP row was flagged as waitlisted
    if rsvp.rsvpWaitlist:
        action = "withdrew from the waitlist"
    else:
        action = "un-RSVP'd"
    rsvp.delete_instance()

    createRsvpLog(event.id, f"{g.current_user.fullName} {action}.")
    flash("Successfully unregistered for event!", "success")

    if form.get('from') == 'ajax':
        return ''
    return redirect(url_for("admin.eventDisplay", eventId=event.id))

453 

@main_bp.route('/profile/<username>/serviceTranscript', methods = ['GET'])
def serviceTranscript(username):
    """
    Render the service transcript page for a user.

    username: username of the user whose transcript is requested

    Responds with 404 when no user matches and 403 when the requester is
    neither that user nor an admin.
    """
    user = User.get_or_none(User.username == username)
    if user is None:
        abort(404)
    if user != g.current_user and not g.current_user.isAdmin:
        abort(403)

    courseQuery = getSlCourseTranscript(username)
    hours = getTotalHours(username)
    programTranscript = getProgramTranscript(username)
    firstYear = getStartYear(username)

    templateContext = {
        "allEventTranscript": programTranscript,
        "slCourses": courseQuery.objects(),
        "totalHours": hours,
        "startDate": firstYear,
        "userData": user,
    }
    return render_template('main/serviceTranscript.html', **templateContext)

472 

@main_bp.route('/searchUser/<query>', methods = ['GET'])
def searchUser(query):
    '''
    Accepts user input and queries the database returning results that matches user search

    query: search text from the URL; surrounding whitespace is stripped before searching

    The optional "category" query-string value is passed through to searchUsers.
    Any error is printed and answered with a 500.
    '''
    category = request.args.get("category")
    try:
        return searchUsers(query.strip(), category)
    except Exception as e:
        print(e)
        return "Error in searching for user", 500

488 

@main_bp.route('/contributors',methods = ['GET'])
def contributors():
    """Render the contributors template."""
    contributorsPage = render_template("/contributors.html")
    return contributorsPage

492 

@main_bp.route('/updateDietInformation', methods = ['GET', 'POST'])
def getDietInfo():
    """
    Update a user's dietary restrictions from the posted form.

    Reads "user" (a username) and "dietInfo" from the form. The update is
    applied only when the requester is that user or an admin; otherwise
    nothing is changed. An activity log entry is written when the text is
    non-blank (updated) or empty (deleted); whitespace-only text is saved
    without a log entry. Always returns a single-space body.
    """
    dietaryInfo = request.form
    user = dietaryInfo["user"]
    dietInfo = dietaryInfo["dietInfo"]

    if (g.current_user.username == user) or g.current_user.isAdmin:
        updateDietInfo(user, dietInfo)
        userInfo = User.get(User.username == user)
        if not dietInfo:
            createActivityLog(f"Deleted all {userInfo.fullName}'s dietary restrictions.")
        elif dietInfo.strip():
            createActivityLog(f"Updated {userInfo.fullName}'s dietary restrictions to {dietInfo}.")

    return " "

509 

@main_bp.route('/profile/<username>/indicateInterest', methods=['POST'])
def indicateMinorInterest(username):
    """
    Call toggleMinorInterest for a user.

    username: username passed to toggleMinorInterest

    Allowed for a CELTS admin or the user themselves; anyone else gets a 403.
    Returns an empty body.
    """
    isAllowed = g.current_user.isCeltsAdmin or g.current_user.username == username
    if not isAllowed:
        abort(403)

    toggleMinorInterest(username)
    return ""