Coverage for app/logic/minor.py: 66%

164 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2026-03-11 19:39 +0000

1from collections import defaultdict 

2from typing import List, Dict 

3from flask import flash, g 

4from playhouse.shortcuts import model_to_dict 

5from peewee import JOIN, fn, Case, DoesNotExist, SQL 

6import xlsxwriter 

7 

8from app import app 

9from app.models.user import User 

10from app.models.term import Term 

11from app.models.event import Event 

12from app.models.course import Course 

13from app.models.program import Program 

14from app.models.certification import Certification 

15from app.models.courseInstructor import CourseInstructor 

16from app.models.eventParticipant import EventParticipant 

17from app.models.courseParticipant import CourseParticipant 

18from app.models.individualRequirement import IndividualRequirement 

19from app.models.certificationRequirement import CertificationRequirement 

20from app.models.cceMinorProposal import CCEMinorProposal 

21from app.logic.createLogs import createActivityLog 

22from app.logic.fileHandler import FileHandler 

23from app.logic.serviceLearningCourses import deleteCourseObject 

24from app.models.attachmentUpload import AttachmentUpload 

25 

26 

def createSummerExperience(username, formData):
    """
    Record a new 'Summer Experience' CCEMinorProposal for a student.

    :param username: username of the student the proposal belongs to
    :param formData: submitted form data; must support getlist('contentArea')
                     and ** unpacking into CCEMinorProposal fields
    :raises Exception: any failure is printed and then re-raised to the caller
    """
    try:
        student = User.get(User.username == username)

        # A form may carry several content areas; they are stored as one comma-separated string
        selectedAreas = formData.getlist('contentArea')
        proposalFields = dict(
            student=student,
            proposalType='Summer Experience',
            contentAreas=', '.join(selectedAreas),
            status="Pending",
            createdBy=g.current_user,
        )
        CCEMinorProposal.create(**proposalFields, **formData)
    except Exception as e:
        print(f"Error saving summer experience: {e}")
        raise e

46 

def getCCEMinorProposals(username):
    """
    Collect every CCE minor proposal filed for a student.

    :param username: value compared against CCEMinorProposal.student
    :returns: list of dicts with the keys id, type, createdBy, supervisor, term and status
    """
    studentProposals = CCEMinorProposal.select().where(CCEMinorProposal.student == username)

    return [
        {
            "id": proposal.id,
            "type": proposal.proposalType,
            "createdBy": proposal.createdBy,
            "supervisor": proposal.supervisorName,
            "term": proposal.term,
            "status": proposal.status,
        }
        for proposal in studentProposals
    ]

63 

def getEngagementTotal(engagementData):
    """
    Count the number of engagements (from all terms) that have matched with a requirement.

    :param engagementData: dict mapping a term key to a list of engagement dicts,
                           each carrying a 'matched' value
    :returns: the sum of the 'matched' values over every term (0 when there are none)
    """
    # Walk the per-term lists directly. Flattening them with sum(lists, [])
    # re-copies the accumulated list once per term.
    return sum(engagement['matched']
               for termEngagements in engagementData.values()
               for engagement in termEngagements)

71 

72 

def getMinorInterest() -> List[Dict]:
    """
    List the students who are interested in the CCE minor but have not started it.

    A student qualifies when they are flagged as a student with minor interest,
    have not declared the minor, and have no IndividualRequirement rows.

    :returns: list of model_to_dict() dicts, one per interested student
    """
    # The LEFT OUTER join leaves IndividualRequirement.username NULL only for
    # students without any requirement rows; the filter keeps just those.
    hasNoRequirements = IndividualRequirement.username.is_null(True)
    interestFilter = User.isStudent & User.minorInterest & ~User.declaredMinor & hasNoRequirements

    studentQuery = (User.select(User)
                        .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(User.username == IndividualRequirement.username))
                        .where(interestFilter))

    return [model_to_dict(student) for student in studentQuery]

84 

def getMinorProgress():
    """
    Summarize CCE minor progress for every student who has declared the minor
    and has at least one IndividualRequirement under the CCE certification.

    :returns: list of dicts, ordered by engagement count (descending), with the keys
              firstName, lastName, username, B-Number, hasGraduated, engagementCount,
              hasCCEMinorProposal and hasSummer ("Completed" or "Incomplete")
    """
    # 1 for every joined proposal row that is a Summer Experience, otherwise 0
    isSummerProposal = Case(None, [(CCEMinorProposal.proposalType == "Summer Experience", 1)], 0)

    requirementCount = fn.COUNT(IndividualRequirement.id).alias('engagementCount')
    summerTotal = fn.SUM(isSummerProposal).alias('hasSummer')
    anyProposal = fn.IF(fn.COUNT(CCEMinorProposal.id) > 0, True, False).alias('hasCCEMinorProposal')

    # NOTE(review): requirements and proposals are both joined onto User, so a student
    # with several of each produces one row per combination before grouping -- confirm
    # the aggregate counts below are what is intended.
    progressQuery = (
        User.select(User, requirementCount, summerTotal, anyProposal)
            .join(IndividualRequirement, on=(User.username == IndividualRequirement.username))
            .join(CertificationRequirement, on=(IndividualRequirement.requirement_id == CertificationRequirement.id))
            .switch(User).join(CCEMinorProposal, JOIN.LEFT_OUTER, on=(User.username == CCEMinorProposal.student))
            .where(
                (CertificationRequirement.certification_id == Certification.CCE) &
                (User.declaredMinor == True)
            )
            .group_by(User.firstName, User.lastName, User.username)
            .order_by(SQL("engagementCount").desc())
    )

    progressList = []
    for student in progressQuery:
        progressList.append({
            'firstName': student.firstName,
            'lastName': student.lastName,
            'username': student.username,
            'B-Number': student.bnumber,
            'hasGraduated': student.hasGraduated,
            # the summer total is subtracted from the raw requirement count
            'engagementCount': student.engagementCount - student.hasSummer,
            'hasCCEMinorProposal': student.hasCCEMinorProposal,
            'hasSummer': "Completed" if student.hasSummer else "Incomplete",
        })
    return progressList

116 

def getMinorSpreadsheet():
    """
    Write the CCE minor progress of all non-graduated students to an xlsx file.

    The rows come from getMinorProgress(). The sheet is named 'minor_information'
    and holds a header row followed by one row per student with their first name,
    last name, username, B-Number, number of engagements and whether the summer
    experience is completed ("Yes"/"No").

    :returns: the path of the written workbook
    """
    # If we're in 2025, can we get the minor information for 2023?
    studentProgress = getMinorProgress()

    columnNames = ["First Name", "Last Name", "Username", "B-Number", "Number of Engagements", "Completed Summer Experience"]
    # Keys of the getMinorProgress() dicts, in the same order as columnNames
    columnKeys = ["firstName", "lastName", "username", "B-Number", "engagementCount", "hasSummer"]

    filepath = f"{app.config['files']['base_path']}/minor_data.xlsx"
    workbook = xlsxwriter.Workbook(filepath, {'in_memory': True})

    worksheet = workbook.add_worksheet('minor_information')
    format_row = workbook.add_format({'align': 'left'})

    # Headers are written on row 1 starting at column 1; row 0 and column 0 stay empty
    firstColumn = 1
    worksheet.set_column(firstColumn, len(columnNames), 30, workbook.add_format({'bold': True}))
    for columnIndex, columnName in enumerate(columnNames, firstColumn):
        worksheet.write(1, columnIndex, columnName)

    # rowNumber follows the student's position in studentProgress, so a skipped
    # (graduated) student leaves that row empty.
    for rowNumber, student in enumerate(studentProgress, 2):
        if student['hasGraduated']:
            continue

        rowValues = {key: student[key] for key in columnKeys}
        # getMinorProgress() reports the summer experience as "Completed"/"Incomplete"
        rowValues['hasSummer'] = "Yes" if student['hasSummer'] == "Completed" else "No"
        if rowValues['B-Number'] is None:
            rowValues['B-Number'] = "No B-Number Found"

        worksheet.set_row(rowNumber, None, format_row)
        for columnNumber, key in enumerate(columnKeys, firstColumn):
            worksheet.write(rowNumber, columnNumber, rowValues[key])

    workbook.close()

    return filepath

152 

153 

def toggleMinorInterest(username, isAdding):
    """
    Given a username, update their minor interest and clear their declared-minor flag.

    :param username: username of the user to update
    :param isAdding: value stored in the user's minorInterest field
    :returns: None on success; ({"error": "User not found"}, 404) when the user
              does not exist; ({"error": <message>}, 500) on any other failure
    """
    try:
        user = User.get(username=username)

        user.minorInterest = isAdding
        user.declaredMinor = False
        user.save()

    except DoesNotExist:
        # User.get() raises for an unknown user instead of returning None, so the
        # "not found" response has to be produced here.
        return {"error": "User not found"}, 404

    except Exception as e:
        print(f"Error updating minor interest: {e}")
        return {"error": str(e)}, 500

171 

def declareMinorInterest(username):
    """
    Flip a student's declaredMinor flag and save the change.

    :param username: key handed to User.get_by_id
    :raises ValueError: if the looked-up user is falsy
    :raises RuntimeError: if saving the user fails
    """
    student = User.get_by_id(username)

    # NOTE(review): get_by_id presumably raises DoesNotExist for an unknown key,
    # which would make this check unreachable -- confirm before relying on ValueError.
    if not student:
        raise ValueError(f"User with username '{username}' not found.")

    wasDeclared = student.declaredMinor
    student.declaredMinor = not wasDeclared

    try:
        student.save()
    except Exception as e:
        raise RuntimeError(f"Failed to declare interested student: {e}")

187 

def getDeclaredMinorStudents():
    """
    Retrieve every student who has declared the CCE minor together with their progress.

    Students without any CCE requirement rows are still included (both joins are
    LEFT OUTER) and simply report 0 engagements.

    :returns: list of dicts, ordered by engagement count (descending), with the keys
              firstName, lastName, username, B-Number, email, hasGraduated,
              engagementCount, hasCCEMinorProposal and hasSummer
              ("Completed" or "Incomplete")
    """
    # Distinct proposals of type "Summer Experience"; other proposals map to NULL and are not counted
    summerProposalId = Case(
        None,
        [(CCEMinorProposal.proposalType == "Summer Experience", CCEMinorProposal.id)],
        None
    )
    summerCount = fn.COUNT(fn.DISTINCT(summerProposalId)).alias("summerEngagementCount")

    # Distinct IndividualRequirement rows that joined to a certification requirement.
    # The join condition below only matches CCE requirements, so rows belonging to
    # any other certification have a NULL CertificationRequirement.id and are skipped.
    cceRequirementId = Case(
        None,
        [(CertificationRequirement.id.is_null(False), IndividualRequirement.id)],
        None
    )
    cceCount = fn.COUNT(fn.DISTINCT(cceRequirementId)).alias("allEngagementCount")

    anyProposal = fn.IF(fn.COUNT(fn.DISTINCT(CCEMinorProposal.id)) > 0, True, False).alias("hasCCEMinorProposal")

    declaredStudents = (
        User
        .select(User, cceCount, summerCount, anyProposal)
        .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(User.username == IndividualRequirement.username))
        .join(CertificationRequirement, JOIN.LEFT_OUTER, on=(
            (IndividualRequirement.requirement_id == CertificationRequirement.id) &
            (CertificationRequirement.certification_id == Certification.CCE)  # only cce minor certs are populated with non-null
        ))
        .switch(User)
        .join(CCEMinorProposal, JOIN.LEFT_OUTER, on=(User.username == CCEMinorProposal.student))
        .where(
            (User.declaredMinor == True) &
            (User.isStudent == True)
        )
        .group_by(User.username)
        .order_by(SQL("allEngagementCount").desc())
    )

    return [
        {
            "firstName": student.firstName,
            "lastName": student.lastName,
            "username": student.username,
            "B-Number": student.bnumber,
            "email": student.email,
            "hasGraduated": student.hasGraduated,
            "engagementCount": int(student.allEngagementCount or 0),
            "hasCCEMinorProposal": bool(student.hasCCEMinorProposal),
            "hasSummer": "Completed" if (student.summerEngagementCount and int(student.summerEngagementCount) > 0) else "Incomplete",
        }
        for student in declaredStudents
    ]

255 

def getCourseInformation(id):
    """
    Look up a course and the full names of its instructors.

    :param id: id of the course
    :returns: dict with "course" (the course as a model_to_dict() dict) and
              "instructors" (list of "<firstName> <lastName>" strings)
    """
    courseDict = model_to_dict(Course.get_by_id(id))

    # Select the User alongside each CourseInstructor so the names come back with the rows
    instructorQuery = (CourseInstructor.select(CourseInstructor, User)
                                       .join(Course).switch()
                                       .join(User)
                                       .where(Course.id == id))

    instructorNames = []
    for courseInstructor in instructorQuery:
        instructorNames.append(courseInstructor.user.firstName + " " + courseInstructor.user.lastName)

    return {"instructors": instructorNames, "course": courseDict}

272 

def getProgramEngagementHistory(program_id, username, term_id):
    """
    Report a student's service events for one program in one term.

    :param program_id: id of the program
    :param username: username of the participating student
    :param term_id: id of the term
    :returns: dict with "program" (the program's name), "events" (one dict per event
              holding the event id, name and hoursEarned) and "totalHours"
    """
    # Service events of this program and term in which the user participated
    participationQuery = (Event.select(Event.id, Event.name, EventParticipant.hoursEarned)
                               .join(Program).switch()
                               .join(EventParticipant)
                               .where(EventParticipant.user == username,
                                      Event.term == term_id,
                                      Event.isService == True,
                                      Program.id == program_id)
                          )

    program = Program.get_by_id(program_id)

    # Add up the hours for the whole program that term, skipping empty/zero values
    totalHours = sum(event.eventparticipant.hoursEarned
                     for event in participationQuery
                     if event.eventparticipant.hoursEarned)

    eventDicts = list(participationQuery.dicts())

    return {"program": program.programName, "events": eventDicts, "totalHours": totalHours}

300 

def setCommunityEngagementForUser(action, engagementData, currentUser):
    """
    Add or remove the IndividualRequirement record for one of a student's
    Sustained Community Engagements.

    :param action: 'add' or 'remove'
    :param engagementData: dict with
        type: 'program' or 'course'
        id: program or course id
        username: the student whose engagement is being added or removed
        term: the term the engagement is recorded in
    :param currentUser: the user performing the add/remove action

    :raises DoesNotExist: on 'add', if the student has no open CCE requirement slot left
    :raises Exception: if the engagement type or the action is not recognized
    """
    if engagementData['type'] not in ['program', 'course']:
        raise Exception("Invalid engagement type!")

    # CCE requirements (other than 'Summer Program') that this student has not filled yet:
    # the LEFT OUTER join finds no IndividualRequirement row for them.
    openRequirements = (CertificationRequirement.select()
                            .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                                (IndividualRequirement.requirement == CertificationRequirement.id) &
                                (IndividualRequirement.username == engagementData['username'])))
                            .where(IndividualRequirement.username.is_null(True),
                                   CertificationRequirement.certification == Certification.CCE,
                                   CertificationRequirement.name.not_in(['Summer Program'])))

    if action == 'add':
        newRecord = {
            engagementData['type']: engagementData['id'],
            "username": engagementData['username'],
            "term": engagementData['term'],
            # .get() raises DoesNotExist when no open requirement is left;
            # that is deliberately left to propagate to the caller.
            "requirement": openRequirements.get(),
            "addedBy": currentUser,
        }
        IndividualRequirement.create(**newRecord)
        return

    if action == 'remove':
        engagementField = getattr(IndividualRequirement, engagementData['type'])
        IndividualRequirement.delete().where(
            engagementField == engagementData['id'],
            IndividualRequirement.username == engagementData['username'],
            IndividualRequirement.term == engagementData['term']
        ).execute()
        return

    raise Exception(f"Invalid action '{action}' sent to setCommunityEngagementForUser")

347 

def getCommunityEngagementByTerm(username):
    """
    Gather a student's community engagements (service learning courses and
    service event participations), grouped by term.

    :param username: username of the student
    :returns: dict keyed by (term description, term id) and ordered by term id; each
              value is a list of dicts with the keys name, id, type ('course' or
              'program'), matched (1 when an IndividualRequirement row was joined,
              otherwise 0) and term
    """
    engagementsByTerm = defaultdict(list)

    # Courses the student participated in; matchedReq is 0 when no requirement row joined
    courseIsMatched = Case(None, [(IndividualRequirement.course.is_null(True), 0)], 1)
    courseQuery = (Course.select(Course, courseIsMatched.alias("matchedReq"))
                         .join(CourseParticipant, on=(Course.id == CourseParticipant.course))
                         .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                             (IndividualRequirement.course == Course.id) &
                             (IndividualRequirement.username == CourseParticipant.user) &
                             (IndividualRequirement.term == Course.term)))
                         .where(CourseParticipant.user == username)
                         .group_by(Course.courseName, Course.term))

    for course in courseQuery:
        termKey = (course.term.description, course.term.id)
        engagementsByTerm[termKey].append({
            "name": course.courseName,
            "id": course.id,
            "type": "course",
            "matched": course.matchedReq,
            "term": course.term.id,
        })

    # Service events the student participated in, collapsed to one entry per program and term
    programIsMatched = Case(None, [(IndividualRequirement.program.is_null(True), 0)], 1)
    eventQuery = (Event.select(Event, Program, programIsMatched.alias('matchedReq'))
                       .join(EventParticipant, on=(Event.id == EventParticipant.event)).switch()
                       .join(Program)
                       .join(IndividualRequirement, JOIN.LEFT_OUTER, on=((IndividualRequirement.program == Program.id) &
                                                                         (IndividualRequirement.username == EventParticipant.user) &
                                                                         (IndividualRequirement.term == Event.term)))
                       .where(EventParticipant.user == username, Event.isService == True)
                       .group_by(Event.program, Event.term))

    for event in eventQuery:
        termKey = (event.term.description, event.term.id)
        engagementsByTerm[termKey].append({
            "name": event.program.programName,
            "id": event.program.id,
            "type": "program",
            "matched": event.matchedReq,
            "term": event.term.id,
        })

    # Order the terms by term id (the second element of each key)
    orderedTerms = sorted(engagementsByTerm.items(), key=lambda termEntry: termEntry[0][1])
    return dict(orderedTerms)

395 

def createOtherEngagementRequest(username, formData):
    """
    Create a pending 'Other Engagement' CCEMinorProposal from the submitted form data.

    :param username: username of the student the proposal is for
    :param formData: mapping unpacked into additional CCEMinorProposal fields
    :returns: the newly created CCEMinorProposal
    """
    student = User.get(User.username == username)

    return CCEMinorProposal.create(
        proposalType='Other Engagement',
        createdBy=g.current_user,
        status='Pending',
        student=student,
        **formData
    )

410 

def saveSummerExperience(username, summerExperience, currentUser):
    """
    Replace a student's 'Summer Program' IndividualRequirement with a new one.

    Any existing 'Summer Program' entry for 'username' is deleted first, then a new
    entry is created from the contents of summerExperience.

    :param username: username of the student that the summer experience is for
    :param summerExperience: dict
        summerExperience: string of what the summer experience was (written as the 'description' in the IndividualRequirement table)
        selectedSummerTerm: the term description that the summer experience took place in
    :param currentUser: the username of the user who added the summer experience record
    :returns: an empty string
    """
    # Step 1: drop whatever summer entry the student already has
    summerRequirementSubSelect = CertificationRequirement.select().where(
        CertificationRequirement.certification == Certification.CCE,
        CertificationRequirement.name << ['Summer Program'])
    deleteExisting = IndividualRequirement.delete().where(
        IndividualRequirement.username == username,
        IndividualRequirement.requirement == summerRequirementSubSelect)
    deleteExisting.execute()

    # Step 2: find the 'Summer Program' requirement the student has not filled
    openSummerRequirement = (CertificationRequirement.select()
                                .join(IndividualRequirement, JOIN.LEFT_OUTER, on=((IndividualRequirement.requirement == CertificationRequirement.id) &
                                                                                  (IndividualRequirement.username == username)))
                                .where(IndividualRequirement.username.is_null(True),
                                       CertificationRequirement.certification == Certification.CCE,
                                       CertificationRequirement.name << ['Summer Program']))

    matchingTerm = Term.select().where(Term.description == summerExperience['selectedSummerTerm'])

    # Step 3: record the new summer experience
    newRecord = {
        "description": summerExperience['summerExperience'],
        "username": username,
        "term": matchingTerm.get(),
        "requirement": openSummerRequirement.get(),
        "addedBy": currentUser,
    }
    IndividualRequirement.create(**newRecord)

    return ""

442 

def getSummerExperience(username):
    """
    Get a student's summer experience to populate the text box, if the student has one.

    :param username: username of the student
    :returns: (term description, summer experience description) when the student has
              exactly one 'Summer Program' entry, otherwise (None, None)
    """
    summerExperienceQuery = (IndividualRequirement.select()
                                .join(CertificationRequirement, JOIN.LEFT_OUTER, on=(CertificationRequirement.id == IndividualRequirement.requirement)).switch()
                                .join(Term, on=(IndividualRequirement.term == Term.id))
                                .where(IndividualRequirement.username == username,
                                       CertificationRequirement.certification == Certification.CCE,
                                       CertificationRequirement.name << ['Summer Program']))

    # Run the query once and reuse the fetched row; calling .get() on the query
    # for each returned value would send it to the database again.
    summerExperiences = list(summerExperienceQuery)
    if len(summerExperiences) == 1:
        experience = summerExperiences[0]
        return (experience.term.description, experience.description)

    return (None, None)

457 

def removeSummerExperience(username):
    """
    Delete the 'Summer Program' IndividualRequirement entries for 'username'.

    Rows are selected by their CCE 'Summer Program' requirement rather than by
    description text. Matching on a looked-up description is unsafe: when the
    student has no summer experience the description is None, and a comparison
    with None is compiled to IS NULL, which may match the student's other
    requirement rows that carry no description.

    :param username: username of the student whose summer experience is removed
    """
    summerRequirements = (CertificationRequirement.select(CertificationRequirement.id)
                              .where(CertificationRequirement.certification == Certification.CCE,
                                     CertificationRequirement.name << ['Summer Program']))

    IndividualRequirement.delete().where(
        IndividualRequirement.username == username,
        IndividualRequirement.requirement.in_(summerRequirements)
    ).execute()

464 

def removeProposal(proposalID) -> None:
    """
    Delete a summer experience or other engagement proposal from the CCEMinorProposal table.
    A file attached to the proposal is deleted as well.

    :param proposalID: id of the proposal; converted with int() before use
    """
    proposalID = int(proposalID)

    # Remove the uploaded file first, while the proposal row still exists
    attachment = AttachmentUpload.get_or_none(proposal=proposalID)
    if attachment:
        FileHandler(proposalId=proposalID).deleteFile(attachment.id)

    deleteProposal = CCEMinorProposal.delete().where(CCEMinorProposal.id == proposalID)
    deleteProposal.execute()