Coverage for app/controllers/main/routes.py: 27%

308 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-06-28 16:11 +0000

1import json 

2import datetime 

3from peewee import JOIN 

4from http import cookies 

5from playhouse.shortcuts import model_to_dict 

6from flask import request, render_template, g, abort, flash, redirect, url_for 

7 

8from app.controllers.main import main_bp 

9from app import app 

10from app.models.term import Term 

11from app.models.user import User 

12from app.models.note import Note 

13from app.models.event import Event 

14from app.models.program import Program 

15from app.models.interest import Interest 

16from app.models.eventRsvp import EventRsvp 

17from app.models.celtsLabor import CeltsLabor 

18from app.models.programBan import ProgramBan 

19from app.models.profileNote import ProfileNote 

20from app.models.insuranceInfo import InsuranceInfo 

21from app.models.certification import Certification 

22from app.models.programManager import ProgramManager 

23from app.models.backgroundCheck import BackgroundCheck 

24from app.models.emergencyContact import EmergencyContact 

25from app.models.eventParticipant import EventParticipant 

26from app.models.courseInstructor import CourseInstructor 

27from app.models.backgroundCheckType import BackgroundCheckType 

28 

29from app.logic.events import getUpcomingEventsForUser, getParticipatedEventsForUser, getTrainingEvents, getEventRsvpCountsForTerm, getUpcomingStudentLedCount, getStudentLedEvents, getBonnerEvents, getOtherEvents 

30from app.logic.transcript import * 

31from app.logic.loginManager import logout 

32from app.logic.searchUsers import searchUsers 

33from app.logic.utils import selectSurroundingTerms 

34from app.logic.celtsLabor import getCeltsLaborHistory 

35from app.logic.createLogs import createRsvpLog, createActivityLog 

36from app.logic.certification import getCertRequirementsWithCompletion 

37from app.logic.landingPage import getManagerProgramDict, getActiveEventTab 

38from app.logic.minor import toggleMinorInterest, getCommunityEngagementByTerm, getEngagementTotal 

39from app.logic.participants import unattendedRequiredEvents, trainedParticipants, getParticipationStatusForTrainings, checkUserRsvp, addPersonToEvent 

40from app.logic.users import addUserInterest, removeUserInterest, banUser, unbanUser, isEligibleForProgram, getUserBGCheckHistory, addProfileNote, deleteProfileNote, updateDietInfo 

41 

42@main_bp.route('/logout', methods=['GET']) 

43def redirectToLogout(): 

44 return redirect(logout()) 

45 

46@main_bp.route('/', methods=['GET']) 

47def landingPage(): 

48 

49 managerProgramDict = getManagerProgramDict(g.current_user) 

50 # Optimize the query to fetch programs with non-canceled, non-past events in the current term 

51 

52 programsWithEventsList = list(Program.select(Program, Event) 

53 .join(Event) 

54 .where((Event.term == g.current_term) & (Event.isCanceled == False)) 

55 .distinct() 

56 .execute()) # Ensure only unique programs are included 

57 # Limit returned list to events in the future 

58 futureEvents = [p for p in programsWithEventsList if not p.event.isPastEnd] 

59 

60 return render_template("/main/landingPage.html", 

61 managerProgramDict=managerProgramDict, 

62 term=g.current_term, 

63 programsWithEventsList = futureEvents) 

64 

65 

66 

67 

68@main_bp.route('/goToEventsList/<programID>', methods=['GET']) 

69def goToEventsList(programID): 

70 return {"activeTab": getActiveEventTab(programID)} 

71 

72@main_bp.route('/eventsList/<selectedTerm>', methods=['GET'], defaults={'activeTab': "studentLedEvents", 'programID': 0}) 

73@main_bp.route('/eventsList/<selectedTerm>/<activeTab>', methods=['GET'], defaults={'programID': 0}) 

74@main_bp.route('/eventsList/<selectedTerm>/<activeTab>/<programID>', methods=['GET']) 

75def events(selectedTerm, activeTab, programID): 

76 currentTerm = g.current_term 

77 if selectedTerm: 

78 currentTerm = selectedTerm 

79 currentTime = datetime.datetime.now() 

80 

81 listOfTerms = Term.select() 

82 participantRSVP = EventRsvp.select(EventRsvp, Event).join(Event).where(EventRsvp.user == g.current_user) 

83 rsvpedEventsID = [event.event.id for event in participantRSVP] 

84 

85 term = Term.get_by_id(currentTerm) 

86 

87 currentEventRsvpAmount = getEventRsvpCountsForTerm(term) 

88 studentLedEvents = getStudentLedEvents(term) 

89 countUpcomingStudentLedEvents = getUpcomingStudentLedCount(term, currentTime) 

90 trainingEvents = getTrainingEvents(term, g.current_user) 

91 bonnerEvents = getBonnerEvents(term) 

92 otherEvents = getOtherEvents(term) 

93 

94 managersProgramDict = getManagerProgramDict(g.current_user) 

95 

96 return render_template("/events/event_list.html", 

97 selectedTerm = term, 

98 studentLedEvents = studentLedEvents, 

99 trainingEvents = trainingEvents, 

100 bonnerEvents = bonnerEvents, 

101 otherEvents = otherEvents, 

102 listOfTerms = listOfTerms, 

103 rsvpedEventsID = rsvpedEventsID, 

104 currentEventRsvpAmount = currentEventRsvpAmount, 

105 currentTime = currentTime, 

106 user = g.current_user, 

107 activeTab = activeTab, 

108 programID = int(programID), 

109 managersProgramDict = managersProgramDict, 

110 countUpcomingStudentLedEvents = countUpcomingStudentLedEvents 

111 ) 

112 

113@main_bp.route('/profile/<username>', methods=['GET']) 

114def viewUsersProfile(username): 

115 """ 

116 This function displays the information of a volunteer to the user 

117 """ 

118 try: 

119 volunteer = User.get(User.username == username) 

120 except Exception as e: 

121 if g.current_user.isAdmin: 

122 flash(f"{username} does not exist! ", category='danger') 

123 return redirect(url_for('admin.studentSearchPage')) 

124 else: 

125 abort(403) # Error 403 if non admin/student-staff user trys to access via url 

126 

127 if (g.current_user == volunteer) or g.current_user.isAdmin: 

128 upcomingEvents = getUpcomingEventsForUser(volunteer) 

129 participatedEvents = getParticipatedEventsForUser(volunteer) 

130 programs = Program.select() 

131 if not g.current_user.isBonnerScholar and not g.current_user.isAdmin: 

132 programs = programs.where(Program.isBonnerScholars == False) 

133 interests = Interest.select(Interest, Program).join(Program).where(Interest.user == volunteer) 

134 programsInterested = [interest.program for interest in interests] 

135 

136 rsvpedEventsList = EventRsvp.select(EventRsvp, Event).join(Event).where(EventRsvp.user == volunteer) 

137 rsvpedEvents = [event.event.id for event in rsvpedEventsList] 

138 

139 programManagerPrograms = ProgramManager.select(ProgramManager, Program).join(Program).where(ProgramManager.user == volunteer) 

140 permissionPrograms = [entry.program.id for entry in programManagerPrograms] 

141 

142 allBackgroundHistory = getUserBGCheckHistory(volunteer) 

143 backgroundTypes = list(BackgroundCheckType.select()) 

144 

145 eligibilityTable = [] 

146 

147 for program in programs: 

148 banNotes = list(ProgramBan.select(ProgramBan, Note) 

149 .join(Note, on=(ProgramBan.banNote == Note.id)) 

150 .where(ProgramBan.user == volunteer, 

151 ProgramBan.program == program, 

152 ProgramBan.endDate > datetime.datetime.now()).execute()) 

153 userParticipatedTrainingEvents = getParticipationStatusForTrainings(program, [volunteer], g.current_term) 

154 try: 

155 allTrainingsComplete = False not in [attended for event, attended in userParticipatedTrainingEvents[username]] # Did volunteer attend all events 

156 except KeyError: 

157 allTrainingsComplete = False 

158 noteForDict = banNotes[-1].banNote.noteContent if banNotes else "" 

159 eligibilityTable.append({"program": program, 

160 "completedTraining": allTrainingsComplete, 

161 "trainingList": userParticipatedTrainingEvents, 

162 "isNotBanned": (not banNotes), 

163 "banNote": noteForDict}) 

164 profileNotes = ProfileNote.select().where(ProfileNote.user == volunteer) 

165 

166 bonnerRequirements = getCertRequirementsWithCompletion(certification=Certification.BONNER, username=volunteer) 

167 

168 managersProgramDict = getManagerProgramDict(g.current_user) 

169 managersList = [id[1] for id in managersProgramDict.items()] 

170 totalSustainedEngagements = getEngagementTotal(getCommunityEngagementByTerm(volunteer)) 

171 

172 return render_template ("/main/userProfile.html", 

173 programs = programs, 

174 programsInterested = programsInterested, 

175 upcomingEvents = upcomingEvents, 

176 participatedEvents = participatedEvents, 

177 rsvpedEvents = rsvpedEvents, 

178 permissionPrograms = permissionPrograms, 

179 eligibilityTable = eligibilityTable, 

180 volunteer = volunteer, 

181 backgroundTypes = backgroundTypes, 

182 allBackgroundHistory = allBackgroundHistory, 

183 currentDateTime = datetime.datetime.now(), 

184 profileNotes = profileNotes, 

185 bonnerRequirements = bonnerRequirements, 

186 managersList = managersList, 

187 participatedInLabor = getCeltsLaborHistory(volunteer), 

188 totalSustainedEngagements = totalSustainedEngagements, 

189 ) 

190 abort(403) 

191 

192@main_bp.route('/profile/<username>/emergencyContact', methods=['GET', 'POST']) 

193def emergencyContactInfo(username): 

194 """ 

195 This loads the Emergency Contact Page 

196 """ 

197 if not (g.current_user.username == username or g.current_user.isCeltsAdmin): 

198 abort(403) 

199 

200 

201 if request.method == 'GET': 

202 readOnly = g.current_user.username != username 

203 contactInfo = EmergencyContact.get_or_none(EmergencyContact.user_id == username) 

204 return render_template ("/main/emergencyContactInfo.html", 

205 username=username, 

206 contactInfo=contactInfo, 

207 readOnly=readOnly 

208 ) 

209 

210 elif request.method == 'POST': 

211 if g.current_user.username != username: 

212 abort(403) 

213 

214 rowsUpdated = EmergencyContact.update(**request.form).where(EmergencyContact.user == username).execute() 

215 if not rowsUpdated: 

216 EmergencyContact.create(user = username, **request.form) 

217 createActivityLog(f"{g.current_user} updated {username}'s emergency contact information.") 

218 flash('Emergency contact information saved successfully!', 'success') 

219 

220 if request.args.get('action') == 'exit': 

221 return redirect (f"/profile/{username}") 

222 else: 

223 return redirect (f"/profile/{username}/insuranceInfo") 

224 

225@main_bp.route('/profile/<username>/insuranceInfo', methods=['GET', 'POST']) 

226def insuranceInfo(username): 

227 """ 

228 This loads the Insurance Information Page 

229 """ 

230 if not (g.current_user.username == username or g.current_user.isCeltsAdmin): 

231 abort(403) 

232 

233 if request.method == 'GET': 

234 readOnly = g.current_user.username != username 

235 userInsuranceInfo = InsuranceInfo.get_or_none(InsuranceInfo.user == username) 

236 return render_template ("/main/insuranceInfo.html", 

237 username=username, 

238 userInsuranceInfo=userInsuranceInfo, 

239 readOnly=readOnly 

240 ) 

241 

242 # Save the form data 

243 elif request.method == 'POST': 

244 if g.current_user.username != username: 

245 abort(403) 

246 

247 rowsUpdated = InsuranceInfo.update(**request.form).where(InsuranceInfo.user == username).execute() 

248 if not rowsUpdated: 

249 InsuranceInfo.create(user = username, **request.form) 

250 createActivityLog(f"{g.current_user} updated {username}'s emergency contact information.") 

251 flash('Insurance information saved successfully!', 'success') 

252 

253 if request.args.get('action') == 'exit': 

254 return redirect (f"/profile/{username}") 

255 else: 

256 return redirect (f"/profile/{username}/emergencyContact") 

257 

258@main_bp.route('/profile/<username>/travelForm', methods=['GET', 'POST']) 

259def travelForm(username): 

260 if not (g.current_user.username == username or g.current_user.isCeltsAdmin): 

261 abort(403) 

262 

263 user = (User.select(User, EmergencyContact, InsuranceInfo) 

264 .join(EmergencyContact, JOIN.LEFT_OUTER).switch() 

265 .join(InsuranceInfo, JOIN.LEFT_OUTER) 

266 .where(User.username == username).limit(1)) 

267 if not list(user): 

268 abort(404) 

269 userData = list(user.dicts())[0] 

270 userData = {key: value if value else '' for (key, value) in userData.items()} 

271 

272 return render_template ('/main/travelForm.html', 

273 userData = userData 

274 ) 

275 

276 

277@main_bp.route('/profile/addNote', methods=['POST']) 

278def addNote(): 

279 """ 

280 This function adds a note to the user's profile. 

281 """ 

282 postData = request.form 

283 try: 

284 note = addProfileNote(postData["visibility"], postData["bonner"] == "yes", postData["noteTextbox"], postData["username"]) 

285 flash("Successfully added profile note", "success") 

286 return redirect(url_for("main.viewUsersProfile", username=postData["username"])) 

287 except Exception as e: 

288 print("Error adding note", e) 

289 flash("Failed to add profile note", "danger") 

290 return "Failed to add profile note", 500 

291 

292@main_bp.route('/<username>/deleteNote', methods=['POST']) 

293def deleteNote(username): 

294 """ 

295 This function deletes a note from the user's profile. 

296 """ 

297 try: 

298 deleteProfileNote(request.form["id"]) 

299 flash("Successfully deleted profile note", "success") 

300 except Exception as e: 

301 print("Error deleting note", e) 

302 flash("Failed to delete profile note", "danger") 

303 return "success" 

304 

305# ===========================Ban=============================================== 

306@main_bp.route('/<username>/ban/<program_id>', methods=['POST']) 

307def ban(program_id, username): 

308 """ 

309 This function updates the ban status of a username either when they are banned from a program. 

310 program_id: the primary id of the program the student is being banned from 

311 username: unique value of a user to correctly identify them 

312 """ 

313 postData = request.form 

314 banNote = postData["note"] # This contains the note left about the change 

315 banEndDate = postData["endDate"] # Contains the date the ban will no longer be effective 

316 try: 

317 banUser(program_id, username, banNote, banEndDate, g.current_user) 

318 programInfo = Program.get(int(program_id)) 

319 flash("Successfully banned the volunteer", "success") 

320 createActivityLog(f'Banned {username} from {programInfo.programName} until {banEndDate}.') 

321 return "Successfully banned the volunteer." 

322 except Exception as e: 

323 print("Error while updating ban", e) 

324 flash("Failed to ban the volunteer", "danger") 

325 return "Failed to ban the volunteer", 500 

326 

327# ===========================Unban=============================================== 

328@main_bp.route('/<username>/unban/<program_id>', methods=['POST']) 

329def unban(program_id, username): 

330 """ 

331 This function updates the ban status of a username either when they are unbanned from a program. 

332 program_id: the primary id of the program the student is being unbanned from 

333 username: unique value of a user to correctly identify them 

334 """ 

335 postData = request.form 

336 unbanNote = postData["note"] # This contains the note left about the change 

337 try: 

338 unbanUser(program_id, username, unbanNote, g.current_user) 

339 programInfo = Program.get(int(program_id)) 

340 createActivityLog(f'Unbanned {username} from {programInfo.programName}.') 

341 flash("Successfully unbanned the volunteer", "success") 

342 return "Successfully unbanned the volunteer" 

343 

344 except Exception as e: 

345 print("Error while updating Unban", e) 

346 flash("Failed to unban the volunteer", "danger") 

347 return "Failed to unban the volunteer", 500 

348 

349 

350@main_bp.route('/<username>/addInterest/<program_id>', methods=['POST']) 

351def addInterest(program_id, username): 

352 """ 

353 This function adds a program to the list of programs a user interested in 

354 program_id: the primary id of the program the student is adding interest of 

355 username: unique value of a user to correctly identify them 

356 """ 

357 try: 

358 success = addUserInterest(program_id, username) 

359 if success: 

360 flash("Successfully added " + Program.get_by_id(program_id).programName + " as an interest", "success") 

361 return "" 

362 else: 

363 flash("Was unable to remove " + Program.get_by_id(program_id).programName + " as an interest.", "danger") 

364 

365 except Exception as e: 

366 print(e) 

367 return "Error Updating Interest", 500 

368 

369@main_bp.route('/<username>/removeInterest/<program_id>', methods=['POST']) 

370def removeInterest(program_id, username): 

371 """ 

372 This function removes a program to the list of programs a user interested in 

373 program_id: the primary id of the program the student is adding interest of 

374 username: unique value of a user to correctly identify them 

375 """ 

376 try: 

377 removed = removeUserInterest(program_id, username) 

378 if removed: 

379 flash("Successfully removed " + Program.get_by_id(program_id).programName + " as an interest.", "success") 

380 return "" 

381 else: 

382 flash("Was unable to remove " + Program.get_by_id(program_id).programName + " as an interest.", "danger") 

383 except Exception as e: 

384 print(e) 

385 return "Error Updating Interest", 500 

386 

387@main_bp.route('/rsvpForEvent', methods = ['POST']) 

388def volunteerRegister(): 

389 """ 

390 This function selects the user ID and event ID and registers the user 

391 for the event they have clicked register for. 

392 """ 

393 event = Event.get_by_id(request.form['id']) 

394 program = event.program 

395 user = g.current_user 

396 

397 isAdded = checkUserRsvp(user, event) 

398 isEligible = isEligibleForProgram(program, user) 

399 listOfRequirements = unattendedRequiredEvents(program, user) 

400 

401 personAdded = False 

402 if isEligible: 

403 personAdded = addPersonToEvent(user, event) 

404 if personAdded and listOfRequirements: 

405 reqListToString = ', '.join(listOfRequirements) 

406 flash(f"{user.firstName} {user.lastName} successfully registered. However, the following training may be required: {reqListToString}.", "success") 

407 elif personAdded: 

408 flash("Successfully registered for event!","success") 

409 else: 

410 flash(f"RSVP Failed due to an unknown error.", "danger") 

411 else: 

412 flash(f"Cannot RSVP. Contact CELTS administrators: {app.config['celts_admin_contact']}.", "danger") 

413 

414 

415 if 'from' in request.form: 

416 if request.form['from'] == 'ajax': 

417 return '' 

418 return redirect(url_for("admin.eventDisplay", eventId=event.id)) 

419 

420@main_bp.route('/rsvpRemove', methods = ['POST']) 

421def RemoveRSVP(): 

422 """ 

423 This function deletes the user ID and event ID from database when RemoveRSVP is clicked 

424 """ 

425 eventData = request.form 

426 event = Event.get_by_id(eventData['id']) 

427 

428 currentRsvpParticipant = EventRsvp.get(EventRsvp.user == g.current_user, EventRsvp.event == event) 

429 logBody = "withdrew from the waitlist" if currentRsvpParticipant.rsvpWaitlist else "un-RSVP'd" 

430 currentRsvpParticipant.delete_instance() 

431 createRsvpLog(event.id, f"{g.current_user.fullName} {logBody}.") 

432 flash("Successfully unregistered for event!", "success") 

433 if 'from' in eventData: 

434 if eventData['from'] == 'ajax': 

435 return '' 

436 return redirect(url_for("admin.eventDisplay", eventId=event.id)) 

437 

438@main_bp.route('/profile/<username>/serviceTranscript', methods = ['GET']) 

439def serviceTranscript(username): 

440 user = User.get_or_none(User.username == username) 

441 if user is None: 

442 abort(404) 

443 if user != g.current_user and not g.current_user.isAdmin: 

444 abort(403) 

445 

446 slCourses = getSlCourseTranscript(username) 

447 totalHours = getTotalHours(username) 

448 allEventTranscript = getProgramTranscript(username) 

449 startDate = getStartYear(username) 

450 return render_template('main/serviceTranscript.html', 

451 allEventTranscript = allEventTranscript, 

452 slCourses = slCourses.objects(), 

453 totalHours = totalHours, 

454 startDate = startDate, 

455 userData = user) 

456 

457@main_bp.route('/searchUser/<query>', methods = ['GET']) 

458def searchUser(query): 

459 

460 category= request.args.get("category") 

461 

462 '''Accepts user input and queries the database returning results that matches user search''' 

463 try: 

464 query = query.strip() 

465 search = query.upper() 

466 splitSearch = search.split() 

467 searchResults = searchUsers(query,category) 

468 return searchResults 

469 except Exception as e: 

470 print(e) 

471 return "Error in searching for user", 500 

472 

473@main_bp.route('/contributors',methods = ['GET']) 

474def contributors(): 

475 return render_template("/contributors.html") 

476 

477@main_bp.route('/updateDietInformation', methods = ['GET', 'POST']) 

478def getDietInfo(): 

479 dietaryInfo = request.form 

480 user = dietaryInfo["user"] 

481 dietInfo = dietaryInfo["dietInfo"] 

482 

483 if (g.current_user.username == user) or g.current_user.isAdmin: 

484 updateDietInfo(user, dietInfo) 

485 userInfo = User.get(User.username == user) 

486 if len(dietInfo) > 0: 

487 createActivityLog(f"Updated {userInfo.fullName}'s dietary restrictions to {dietInfo}.") if dietInfo.strip() else None 

488 else: 

489 createActivityLog(f"Deleted all {userInfo.fullName}'s dietary restrictions dietary restrictions.") 

490 

491 

492 return " " 

493 

494@main_bp.route('/profile/<username>/indicateInterest', methods=['POST']) 

495def indicateMinorInterest(username): 

496 toggleMinorInterest(username) 

497 

498 return ""