Coverage for app/logic/minor.py: 66%

185 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2026-03-26 20:07 +0000

1from collections import defaultdict 

2from typing import List, Dict 

3from flask import flash, g 

4from playhouse.shortcuts import model_to_dict 

5from peewee import JOIN, fn, Case, DoesNotExist, SQL 

6import xlsxwriter 

7 

8from app.models import app 

9from app.models.user import User 

10from app.models.term import Term 

11from app.models.event import Event 

12from app.models.course import Course 

13from app.models.program import Program 

14from app.models.certification import Certification 

15from app.models.courseInstructor import CourseInstructor 

16from app.models.eventParticipant import EventParticipant 

17from app.models.courseParticipant import CourseParticipant 

18from app.models.individualRequirement import IndividualRequirement 

19from app.models.certificationRequirement import CertificationRequirement 

20from app.models.cceMinorProposal import CCEMinorProposal 

21from app.logic.fileHandler import FileHandler 

22from app.logic.utils import getFilesFromRequest 

23from app.models.attachmentUpload import AttachmentUpload 

24 

25 

def createSummerExperience(username, formData):
    """
    Create a 'Summer Experience' CCEMinorProposal for a student.

    :param username: username of the student the proposal belongs to
    :param formData: submitted form data. Must provide ``getlist`` (used for the
        possibly multi-valued 'contentArea' key) and be convertible with ``dict()``.
        Every key other than 'contentArea' is passed to CCEMinorProposal.create
        as a field value.
    :returns: the newly created CCEMinorProposal
    :raises DoesNotExist: if no User matches ``username``
    """
    user = User.get(User.username == username)

    # Combine multiple content areas into one comma-separated string
    contentAreas = ', '.join(formData.getlist('contentArea'))

    proposalFields = dict(formData)
    # getlist() above tolerates a missing 'contentArea'; the pop must too,
    # otherwise a submission without any content area raises KeyError.
    proposalFields.pop("contentArea", None)

    return CCEMinorProposal.create(
        student=user,
        proposalType='Summer Experience',
        contentAreas=contentAreas,
        createdBy=g.current_user,
        **proposalFields,
    )

42 

def updateSummerExperience(proposalID, formData):
    """
    Update an existing CCEMinorProposal with resubmitted summer experience form data.

    :param proposalID: id of the CCEMinorProposal row to update
    :param formData: submitted form data. Must provide ``getlist`` (used for the
        possibly multi-valued 'contentArea' key) and be convertible with ``dict()``.
        Every remaining key, except 'experienceHoursOver300', is written to the
        proposal as a field value.
    """
    # Combine multiple content areas into one comma-separated string
    contentAreas = ', '.join(formData.getlist('contentArea'))

    proposalFields = dict(formData)
    # Both keys may be absent from a submission, so pop with a default
    # instead of raising KeyError.
    proposalFields.pop("contentArea", None)
    # 'experienceHoursOver300' is never written by this update.
    proposalFields.pop("experienceHoursOver300", None)

    (CCEMinorProposal.update(contentAreas=contentAreas, **proposalFields)
                     .where(CCEMinorProposal.id == proposalID)
                     .execute())

53 

def getCCEMinorProposals(username):
    """
    Return a list of every CCEMinorProposal whose student matches the given username.
    """
    proposalsForStudent = CCEMinorProposal.select().where(CCEMinorProposal.student == username)
    return list(proposalsForStudent)

56 

def getEngagementTotal(engagementData):
    """
    Count the number of engagements (from all terms) that have matched with a requirement.

    :param engagementData: mapping whose values are lists of engagement dicts,
        each carrying a numeric/boolean 'matched' entry
    :returns: the sum of every 'matched' value; 0 when there are no engagements
    """
    # Walk the per-term lists directly instead of concatenating them with
    # sum(..., []), which copies the growing list once per term.
    return sum(engagement['matched']
               for termEngagements in engagementData.values()
               for engagement in termEngagements)

64 

65 

def getMinorInterest() -> List[Dict]:
    """
    Return one dict per student who has indicated interest in the CCE minor,
    has not declared it, and has no IndividualRequirement rows.
    """
    # The LEFT OUTER join leaves the requirement side NULL for users without any rows.
    hasNoRequirements = IndividualRequirement.username.is_null(True)
    isInterestedNotDeclared = User.isStudent & User.minorInterest & ~User.declaredMinor

    studentQuery = (User.select(User)
                        .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(User.username == IndividualRequirement.username))
                        .where(isInterestedNotDeclared & hasNoRequirements))

    return [model_to_dict(student) for student in studentQuery]

77 

def getMinorProgress():
    """
    Summarize progress for users who have declared the minor and have at least one
    IndividualRequirement record under the CCE certification.

    Returns a list of dicts, ordered by engagement count (descending), containing the
    student's names, username, B-Number, graduation flag, engagement count, whether
    they have any CCEMinorProposal, and "Completed"/"Incomplete" for the summer experience.

    NOTE(review): IndividualRequirement and CCEMinorProposal are both joined to User in
    the same grouped query, so the raw COUNT/SUM values may be affected by row
    multiplication when a student has several of each -- confirm the counts are as intended.
    """
    # 1 for every joined row whose proposal is a Summer Experience, otherwise 0
    isSummerProposal = Case(None, [(CCEMinorProposal.proposalType == "Summer Experience", 1)], 0)

    selectedColumns = (
        User,
        fn.COUNT(IndividualRequirement.id).alias('engagementCount'),
        fn.SUM(isSummerProposal).alias('hasSummer'),
        fn.IF(fn.COUNT(CCEMinorProposal.id) > 0, True, False).alias('hasCCEMinorProposal'),
    )

    progressQuery = (
        User.select(*selectedColumns)
            .join(IndividualRequirement, on=(User.username == IndividualRequirement.username))
            .join(CertificationRequirement, on=(IndividualRequirement.requirement_id == CertificationRequirement.id))
            .switch(User).join(CCEMinorProposal, JOIN.LEFT_OUTER, on=(User.username == CCEMinorProposal.student))
            .where(
                (CertificationRequirement.certification_id == Certification.CCE) &
                (User.declaredMinor == True)
            )
            .group_by(User.firstName, User.lastName, User.username)
            .order_by(SQL("engagementCount").desc())
    )

    progressList = []
    for student in progressQuery:
        progressList.append({
            'firstName': student.firstName,
            'lastName': student.lastName,
            'username': student.username,
            'B-Number': student.bnumber,
            'hasGraduated': student.hasGraduated,
            # the summer tally is subtracted from the raw requirement count
            'engagementCount': student.engagementCount - student.hasSummer,
            'hasCCEMinorProposal': student.hasCCEMinorProposal,
            'hasSummer': "Completed" if student.hasSummer else "Incomplete",
        })
    return progressList

109 

def getMinorSpreadsheet():
    """
    Write an .xlsx workbook of minor progress for students who have not graduated.

    The data comes from getMinorProgress(). The header is written on row 1 and the
    students on consecutive rows from row 2, in columns 1-6 (column 0 and row 0
    are left empty).

    :returns: the path of the written file, '<files.base_path>/minor_data.xlsx'
    """
    # If we're in 2025, can we get the minor information for 2023?
    columnNames = ["First Name", "Last Name", "Username", "B-Number", "Number of Engagements", "Completed Summer Experience"]

    # Filter before numbering the rows so skipped (graduated) students do not
    # leave blank rows in the sheet. An empty result still yields a header-only file.
    currentStudents = [student for student in getMinorProgress() if not student['hasGraduated']]

    filepath = f"{app.config['files']['base_path']}/minor_data.xlsx"

    # The context manager closes (and thereby writes) the workbook even if a write fails.
    with xlsxwriter.Workbook(filepath, {'in_memory': True}) as workbook:
        worksheet = workbook.add_worksheet('minor_information')
        format_row = workbook.add_format({'align': 'left'})

        firstColumn = 1
        worksheet.set_column(firstColumn, len(columnNames), 30, workbook.add_format({'bold': True}))
        for columnNumber, columnName in enumerate(columnNames, firstColumn):
            worksheet.write(1, columnNumber, columnName)

        for rowNumber, student in enumerate(currentStudents, 2):
            bNumber = student['B-Number']
            # Values are listed explicitly so they always line up with columnNames.
            rowValues = [
                student['firstName'],
                student['lastName'],
                student['username'],
                "No B-Number Found" if bNumber is None else bNumber,
                student['engagementCount'],
                # getMinorProgress() reports "Completed" / "Incomplete"
                "Yes" if student['hasSummer'] == "Completed" else "No",
            ]
            worksheet.set_row(rowNumber, None, format_row)
            for columnNumber, value in enumerate(rowValues, firstColumn):
                worksheet.write(rowNumber, columnNumber, value)

    return filepath

145 

146 

def toggleMinorInterest(username, isAdding):
    """
    Given a username, update their minor interest and reset their declared-minor status.

    :param username: username of the user to update
    :param isAdding: value stored in the user's minorInterest field
    :returns: None on success; on failure a ``({"error": message}, status)`` tuple,
        with status 404 when no user has that username and 500 for any other error
    """
    try:
        user = User.get(username=username)

        user.minorInterest = isAdding
        user.declaredMinor = False
        user.save()

    # User.get raises DoesNotExist rather than returning None, so the
    # "not found" case has to be handled here to produce the 404.
    except DoesNotExist:
        return {"error": "User not found"}, 404

    except Exception as e:
        print(f"Error updating minor interest: {e}")
        return {"error": str(e)}, 500

164 

def declareMinorInterest(username):
    """
    Given a username, flip their minor declaration (declared <-> not declared) and save it.

    :param username: looked up with User.get_by_id
    :raises DoesNotExist: if no user matches (raised by User.get_by_id)
    :raises RuntimeError: if saving the user fails; the original error is chained as the cause
    """
    # get_by_id raises DoesNotExist instead of returning None, so no None check is needed.
    user = User.get_by_id(username)

    user.declaredMinor = not user.declaredMinor

    try:
        user.save()
    except Exception as e:
        raise RuntimeError(f"Failed to declare interested student: {e}") from e

180 

def getDeclaredMinorStudents():
    """
    Retrieve the students who have declared the CCE minor together with their engagement progress.

    Returns a list of dicts (most engagements first) with the student's names, username,
    B-Number, email, graduation flag, CCE engagement count, whether any CCEMinorProposal
    exists, and "Completed"/"Incomplete" for the summer experience. Because every join is
    a LEFT OUTER join, declared students without any requirements are included with 0 engagements.
    """
    def countDistinctWhen(condition, countedField):
        # COUNT(DISTINCT CASE WHEN condition THEN countedField ELSE NULL END)
        return fn.COUNT(fn.DISTINCT(Case(None, [(condition, countedField)], None)))

    summerProposalCount = countDistinctWhen(
        CCEMinorProposal.proposalType == "Summer Experience",
        CCEMinorProposal.id,
    ).alias("summerEngagementCount")

    # Counts the distinct engagements that have a certification requirement id.
    # The join below only matches IndividualRequirements to CCE certification
    # requirements and leaves the rest NULL, so only CCE engagements are counted.
    cceRequirementCount = countDistinctWhen(
        CertificationRequirement.id.is_null(False),
        IndividualRequirement.id,
    ).alias("allEngagementCount")

    hasAnyProposal = fn.IF(fn.COUNT(fn.DISTINCT(CCEMinorProposal.id)) > 0, True, False).alias("hasCCEMinorProposal")

    declaredStudents = (
        User
        .select(
            User,
            cceRequirementCount,
            summerProposalCount,
            hasAnyProposal,
        )
        .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(User.username == IndividualRequirement.username))
        .join(CertificationRequirement, JOIN.LEFT_OUTER, on=(
            (IndividualRequirement.requirement_id == CertificationRequirement.id) &
            (CertificationRequirement.certification_id == Certification.CCE)  # only cce minor certs are populated with non-null
        ))
        .switch(User)
        .join(CCEMinorProposal, JOIN.LEFT_OUTER, on=(User.username == CCEMinorProposal.student))
        .where(
            (User.declaredMinor == True) &
            (User.isStudent == True)
        )
        .group_by(User.username)
        .order_by(SQL("allEngagementCount").desc())
    )

    return [
        {
            "firstName": student.firstName,
            "lastName": student.lastName,
            "username": student.username,
            "B-Number": student.bnumber,
            "email": student.email,
            "hasGraduated": student.hasGraduated,
            "engagementCount": int(student.allEngagementCount or 0),
            "hasCCEMinorProposal": bool(student.hasCCEMinorProposal),
            "hasSummer": "Completed" if (student.summerEngagementCount and int(student.summerEngagementCount) > 0) else "Incomplete",
        }
        for student in declaredStudents
    ]

248 

def getCourseInformation(id):
    """
    Given a course ID, return a dict holding the course (as a dict) under "course"
    and the full names of its instructors under "instructors".
    """
    courseAsDict = model_to_dict(Course.get_by_id(id))

    # instructor rows for this course, with their User selected alongside
    instructorRows = (CourseInstructor.select(CourseInstructor, User)
                                      .join(Course).switch()
                                      .join(User)
                                      .where(Course.id == id))

    instructorNames = []
    for row in instructorRows:
        instructorNames.append(" ".join((row.user.firstName, row.user.lastName)))

    return {"instructors": instructorNames, "course": courseAsDict}

265 

def getProgramEngagementHistory(program_id, username, term_id):
    """
    Given a program_id, username, and term_id, return a dict with the program name,
    the service events of that program and term in which the user participated
    (as dicts), and the total hours the user earned across those events.
    """
    # events the user participated in, limited to the given term and program
    participatedServiceEvents = (Event.select(Event.id, Event.name, EventParticipant.hoursEarned)
                                      .join(Program).switch()
                                      .join(EventParticipant)
                                      .where(EventParticipant.user == username,
                                             Event.term == term_id,
                                             Event.isService == True,
                                             Program.id == program_id))

    program = Program.get_by_id(program_id)

    # total hours for the whole program that term; empty/zero hours are skipped
    earnedHours = [event.eventparticipant.hoursEarned for event in participatedServiceEvents]
    totalHours = sum(hours for hours in earnedHours if hours)

    return {
        "program": program.programName,
        "events": list(participatedServiceEvents.dicts()),
        "totalHours": totalHours,
    }

293 

def setCommunityEngagementForUser(action, engagementData, currentUser):
    """
    Either add or remove an IndividualRequirement record for a student's Sustained Community Engagement

    :param action: The behavior of the function. Can be 'add' or 'remove'
    :param engagementData:
        type: program or course
        id: program or course id
        username: the username of the student that is having a community engagement added or removed
        term: The term the engagement is recorded in
    :param currentUser: The user who is performing the add/remove action

    :raises DoesNotExist: if there are no available CertificationRequirement slots remaining for the engagement
    :raises Exception: if the engagement type or the action is not recognized
    """
    engagementType = engagementData['type']
    if engagementType not in ['program','course']:
        raise Exception("Invalid engagement type!")

    studentUsername = engagementData['username']

    # CCE requirements (other than 'Summer Program') the student has not filled yet
    unfilledRequirements = (CertificationRequirement.select()
                                .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                                    (IndividualRequirement.requirement == CertificationRequirement.id) &
                                    (IndividualRequirement.username == studentUsername)))
                                .where(IndividualRequirement.username.is_null(True),
                                       CertificationRequirement.certification == Certification.CCE,
                                       CertificationRequirement.name.not_in(['Summer Program'])))

    if action == 'add':
        # .get() raises DoesNotExist when no requirement slot is left; it is
        # deliberately not caught here and is handled by the caller.
        newRecord = {
            engagementType: engagementData['id'],
            "username": studentUsername,
            "term": engagementData['term'],
            "requirement": unfilledRequirements.get(),
            "addedBy": currentUser,
        }
        IndividualRequirement.create(**newRecord)
        return

    if action == 'remove':
        engagementField = getattr(IndividualRequirement, engagementType)
        IndividualRequirement.delete().where(
            engagementField == engagementData['id'],
            IndividualRequirement.username == studentUsername,
            IndividualRequirement.term == engagementData['term']
        ).execute()
        return

    raise Exception(f"Invalid action '{action}' sent to setCommunityEngagementForUser")

340 

def getCommunityEngagementByTerm(username):
    """
    Given a username, return all of their community engagements (service learning
    courses and service event participations) grouped by term.

    The result is a dict keyed by (term description, term id), sorted by term id.
    Each value is a list of dicts with the engagement's name, id, type
    ("course" or "program"), matched flag (1 when an IndividualRequirement exists
    for it, else 0) and term id.
    """
    engagementsByTerm = defaultdict(list)

    def recordEngagement(term, name, engagementId, engagementType, matched):
        # file one engagement under its term's (description, id) key
        engagementsByTerm[(term.description, term.id)].append({
            "name": name,
            "id": engagementId,
            "type": engagementType,
            "matched": matched,
            "term": term.id,
        })

    # 0 when the left-joined requirement is missing, otherwise 1
    courseIsMatched = Case(None, [(IndividualRequirement.course.is_null(True) , 0)], 1)

    participatedCourses = (Course.select(Course, courseIsMatched.alias("matchedReq"))
                                 .join(CourseParticipant, on=(Course.id == CourseParticipant.course))
                                 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                                     (IndividualRequirement.course == Course.id) &
                                     (IndividualRequirement.username == CourseParticipant.user) &
                                     (IndividualRequirement.term == Course.term)))
                                 .where(CourseParticipant.user == username)
                                 .group_by(Course.courseName, Course.term))

    for course in participatedCourses:
        recordEngagement(course.term, course.courseName, course.id, "course", course.matchedReq)

    programIsMatched = Case(None, [(IndividualRequirement.program.is_null(True) , 0)], 1)

    # one row per (program, term) among the service events the user attended
    participatedPrograms = (Event.select(Event, Program, programIsMatched.alias('matchedReq'))
                                 .join(EventParticipant, on=(Event.id == EventParticipant.event)).switch()
                                 .join(Program)
                                 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=((IndividualRequirement.program == Program.id) &
                                                                                   (IndividualRequirement.username == EventParticipant.user) &
                                                                                   (IndividualRequirement.term == Event.term)))
                                 .where(EventParticipant.user == username, Event.isService == True)
                                 .group_by(Event.program, Event.term))

    for event in participatedPrograms:
        recordEngagement(event.term, event.program.programName, event.program.id, "program", event.matchedReq)

    # order the terms by term id (second element of each key)
    termsInOrder = sorted(engagementsByTerm.items(), key=lambda termAndEngagements: termAndEngagements[0][1])
    return dict(termsInOrder)

388 

def createOtherEngagement(username, request):
    """
    Create an 'Other Engagement' CCEMinorProposal for the given student from the
    request's form data, and save the uploaded "attachmentObject" file if one was sent.
    """
    student = User.get(User.username == username)

    # every submitted form field is passed through as a proposal field
    newProposal = CCEMinorProposal.create(
        proposalType='Other Engagement',
        createdBy=g.current_user,
        student=student,
        **request.form
    )

    if request.files.get("attachmentObject"):
        fileHandler = FileHandler(getFilesFromRequest(request), proposalId=newProposal.id)
        fileHandler.saveFiles()

404 

def updateOtherEngagementRequest(proposalID, request):
    """
    Update an existing CCEMinorProposal entry from the request's form data.

    If the form's "deleteAttachment" value is "true" the stored attachment is deleted.
    Otherwise, if a new "attachmentObject" file with a filename was uploaded, it
    replaces any stored attachment. All form fields except "deleteAttachment" are
    then written to the proposal.

    :raises AssertionError: if deletion is requested but no attachment is stored
    """
    uploadedFile = request.files.get("attachmentObject")
    storedAttachment = AttachmentUpload.get_or_none(proposal=proposalID)
    proposal = CCEMinorProposal.get_by_id(proposalID)
    deletionRequested = request.form.get("deleteAttachment") == "true"

    if deletionRequested and not storedAttachment:
        raise AssertionError("deleteAttachment flag is set but no attachment exists in the database.")

    if deletionRequested:
        FileHandler(proposalId=proposalID).deleteFile(storedAttachment.id)
    elif uploadedFile and uploadedFile.filename:
        # remove the stored file first so the new upload replaces it
        if storedAttachment:
            FileHandler(proposalId=proposalID).deleteFile(storedAttachment.id)
        uploadHandler = FileHandler(getFilesFromRequest(request), proposalId=proposalID)
        uploadHandler.saveFiles(parentEvent=proposal)

    # "deleteAttachment" is a control flag, not a proposal field
    proposalFields = request.form.copy()
    proposalFields.pop("deleteAttachment", None)
    CCEMinorProposal.update(**proposalFields).where(CCEMinorProposal.id == proposalID).execute()

427 

def saveSummerExperience(username, summerExperience, currentUser):
    """
    :param username: username of the student that the summer experience is for
    :param summerExperience: dict
        summerExperience: string of what the summer experience was (will be written as the 'description' in the IndividualRequirement table)
        selectedSummerTerm: the term description that the summer experience took place in
    :param currentUser: the username of the user who added the summer experience record

    Delete any existing IndividualRequirement entry for 'username' if it is for 'Summer Program' and create a new IndividualRequirement entry for
    'Summer Program' with the contents of summerExperience.

    :returns: an empty string
    :raises KeyError: if summerExperience lacks one of the two keys (nothing is deleted)
    :raises DoesNotExist: if no Term has the given description (nothing is deleted), or if
        no unfilled 'Summer Program' requirement is found after the delete
    """
    # Read and resolve all inputs before deleting anything, so that a bad
    # payload or unknown term cannot wipe the student's existing record.
    experienceDescription = summerExperience['summerExperience']
    summerTerm = Term.select().where(Term.description == summerExperience['selectedSummerTerm']).get()

    summerProgramRequirements = CertificationRequirement.select().where(
        CertificationRequirement.certification == Certification.CCE,
        CertificationRequirement.name << ['Summer Program'])
    IndividualRequirement.delete().where(
        IndividualRequirement.username == username,
        IndividualRequirement.requirement == summerProgramRequirements).execute()

    # the 'Summer Program' requirement this student has not filled (freed by the delete above)
    unfilledSummerRequirement = (CertificationRequirement.select()
                                    .join(IndividualRequirement, JOIN.LEFT_OUTER, on=((IndividualRequirement.requirement == CertificationRequirement.id) &
                                                                                      (IndividualRequirement.username == username)))
                                    .where(IndividualRequirement.username.is_null(True),
                                           CertificationRequirement.certification == Certification.CCE,
                                           CertificationRequirement.name << ['Summer Program']))

    IndividualRequirement.create(**{"description": experienceDescription,
                                    "username": username,
                                    "term": summerTerm,
                                    "requirement": unfilledSummerRequirement.get(),
                                    "addedBy": currentUser,
                                    })

    return ""

459 

def getSummerExperience(username):
    """
    Get a student's summer experience to populate the text box if the student has one.

    :param username: username of the student
    :returns: (term description, experience description) when the student has exactly
        one CCE 'Summer Program' IndividualRequirement; otherwise (None, None)
    """
    summerExperienceQuery = (IndividualRequirement.select()
                                .join(CertificationRequirement, JOIN.LEFT_OUTER, on=(CertificationRequirement.id == IndividualRequirement.requirement)).switch()
                                .join(Term, on=(IndividualRequirement.term == Term.id))
                                .where(IndividualRequirement.username == username,
                                       CertificationRequirement.certification == Certification.CCE,
                                       CertificationRequirement.name << ['Summer Program']))

    # Run the query once and reuse the row, rather than re-querying with
    # .get() for each returned value.
    summerExperiences = list(summerExperienceQuery)
    if len(summerExperiences) == 1:
        summerExperience = summerExperiences[0]
        return (summerExperience.term.description, summerExperience.description)

    return (None, None)

474 

def removeSummerExperience(username):
    """
    Delete the IndividualRequirement entry holding the summer experience of 'username'.

    Does nothing when getSummerExperience() finds no summer experience for the user.
    """
    term, summerExperienceToDelete = getSummerExperience(username)

    # (None, None) means "not found". Filtering on description == None would then
    # become "description IS NULL" and delete the user's other records that have
    # no description, so stop here instead.
    if term is None and summerExperienceToDelete is None:
        return

    IndividualRequirement.delete().where(
        IndividualRequirement.username == username,
        IndividualRequirement.description == summerExperienceToDelete).execute()

481 

def removeProposal(proposalID) -> None:
    """
    Delete a summer experience or other engagement row from the CCEMinorProposal table.
    If an AttachmentUpload is linked to the proposal, its file is deleted first.
    """
    proposalKey = int(proposalID)

    attachment = AttachmentUpload.get_or_none(proposal=proposalKey)
    if attachment:
        FileHandler(proposalId=proposalKey).deleteFile(attachment.id)

    deleteQuery = CCEMinorProposal.delete().where(CCEMinorProposal.id == proposalKey)
    deleteQuery.execute()

495 

def changeProposalStatus(proposalID, newStatus) -> None:
    """
    Set the status of the proposal with the given id (converted to int) to newStatus.
    """
    proposalKey = int(proposalID)
    statusUpdate = CCEMinorProposal.update(status=newStatus).where(CCEMinorProposal.id == proposalKey)
    statusUpdate.execute()