Coverage for app/logic/minor.py: 65%

167 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2026-02-17 20:15 +0000

1from collections import defaultdict 

2from typing import List, Dict 

3from flask import flash, g 

4from playhouse.shortcuts import model_to_dict 

5from peewee import JOIN, fn, Case, DoesNotExist, SQL 

6import xlsxwriter 

7 

8from app import app 

9from app.models.user import User 

10from app.models.term import Term 

11from app.models.event import Event 

12from app.models.course import Course 

13from app.models.program import Program 

14from app.models.certification import Certification 

15from app.models.courseInstructor import CourseInstructor 

16from app.models.eventParticipant import EventParticipant 

17from app.models.courseParticipant import CourseParticipant 

18from app.models.individualRequirement import IndividualRequirement 

19from app.models.certificationRequirement import CertificationRequirement 

20from app.models.cceMinorProposal import CCEMinorProposal 

21from app.logic.createLogs import createActivityLog 

22from app.logic.fileHandler import FileHandler 

23from app.logic.serviceLearningCourses import deleteCourseObject 

24from app.models.attachmentUpload import AttachmentUpload 

25 

26 

def createSummerExperience(username, formData):
    """
    Create a 'Summer Experience' CCEMinorProposal for the given student.

    :param username: username of the student the proposal belongs to
    :param formData: the submitted form; must support ``getlist`` and ``**``
        unpacking (presumably a werkzeug MultiDict -- confirm against the caller)

    Any exception is printed and then re-raised to the caller.
    """
    try:
        student = User.get(User.username == username)
        # Several content areas may be selected; store them as one comma-separated string
        selectedAreas = formData.getlist('contentArea')
        joinedAreas = ', '.join(selectedAreas)
        CCEMinorProposal.create(
            student=student,
            proposalType='Summer Experience',
            contentAreas=joinedAreas,
            status="Pending",
            createdBy=g.current_user,
            **formData,
        )
    except Exception as error:
        print(f"Error saving summer experience: {error}")
        raise error

46 

def getCCEMinorProposals(username):
    """
    Return a list of summary dicts (id, type, createdBy, supervisor, term, status),
    one for every CCEMinorProposal whose student matches the given username.
    """
    matchingProposals = CCEMinorProposal.select().where(CCEMinorProposal.student == username)

    return [
        {
            "id": proposal.id,
            "type": proposal.proposalType,
            "createdBy": proposal.createdBy,
            "supervisor": proposal.supervisorName,
            "term": proposal.term,
            "status": proposal.status,
        }
        for proposal in matchingProposals
    ]

63 

def getEngagementTotal(engagementData):
    """
    Count the number of engagements (from all terms) that have matched with a requirement.

    :param engagementData: mapping of term key -> iterable of engagement dicts,
        each of which has a 'matched' entry (1 when matched, 0 otherwise)
    :returns: the sum of every engagement's 'matched' value; 0 when there are none
    """
    # Walk every term's engagements in a single pass. This avoids building a
    # flattened copy with sum(lists, []), which re-copies the accumulated list
    # for every term and only works when each value is a list.
    return sum(
        engagement['matched']
        for termEngagements in engagementData.values()
        for engagement in termEngagements
    )

71 

72 

def getMinorInterest() -> List[Dict]:
    """
    Return, as a list of dicts, every student who has indicated interest in the
    CCE minor, has not declared it, and has no IndividualRequirement record.
    """
    interestedNotDeclared = User.isStudent & User.minorInterest & ~User.declaredMinor
    # The LEFT OUTER join leaves the requirement columns NULL for users with no records
    hasNoRequirement = IndividualRequirement.username.is_null(True)

    query = (User.select(User)
                 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(User.username == IndividualRequirement.username))
                 .where(interestedNotDeclared & hasNoRequirement))

    return [model_to_dict(student) for student in query]

84 

def getMinorProgress():
    """
    Get all users who have declared the minor and have an IndividualRequirement
    record under the CCE certification. Returns a list of dicts containing the
    student, how many engagements they have completed, whether they have a
    CCEMinorProposal, and whether they have completed the summer experience.

    NOTE(review): the aggregates below are not DISTINCT (unlike
    getDeclaredMinorStudentsWithProgress), so the LEFT OUTER join on
    CCEMinorProposal may inflate the counts for students with several
    proposals -- confirm.
    """
    # 1 for each joined row whose proposal is a Summer Experience, otherwise 0
    isSummerProposal = Case(None, [(CCEMinorProposal.proposalType == "Summer Experience", 1)], 0)

    selectedColumns = (
        User,
        fn.COUNT(IndividualRequirement.id).alias('engagementCount'),
        fn.SUM(isSummerProposal).alias('hasSummer'),
        fn.IF(fn.COUNT(CCEMinorProposal.id) > 0, True, False).alias('hasCCEMinorProposal'),
    )
    declaredWithCCERequirement = (
        (CertificationRequirement.certification_id == Certification.CCE) &
        (User.declaredMinor == True)
    )

    query = (
        User.select(*selectedColumns)
            .join(IndividualRequirement, on=(User.username == IndividualRequirement.username))
            .join(CertificationRequirement, on=(IndividualRequirement.requirement_id == CertificationRequirement.id))
            .switch(User).join(CCEMinorProposal, JOIN.LEFT_OUTER, on=(User.username == CCEMinorProposal.student))
            .where(declaredWithCCERequirement)
            .group_by(User.firstName, User.lastName, User.username)
            .order_by(SQL("engagementCount").desc())
    )

    progressList = []
    for student in query:
        progressList.append({
            'firstName': student.firstName,
            'lastName': student.lastName,
            'username': student.username,
            'B-Number': student.bnumber,
            'hasGraduated': student.hasGraduated,
            # the summer total is subtracted from the raw requirement count
            'engagementCount': student.engagementCount - student.hasSummer,
            'hasCCEMinorProposal': student.hasCCEMinorProposal,
            'hasSummer': "Completed" if student.hasSummer else "Incomplete",
        })
    return progressList

116 

def getMinorSpreadsheet():
    """
    Write an .xlsx spreadsheet of CCE minor progress and return its file path.

    The data comes from getMinorProgress(). Students flagged as graduated are
    left out. The header is written on row index 1 starting at column index 1,
    and one row per remaining student follows directly below it.

    :returns: the path of the written file, '<base_path>/minor_data.xlsx'
    """
    # If we're in 2025, can we get the minor information for 2023?

    # (column header, key in the getMinorProgress() dict), in spreadsheet order.
    # Listing the keys explicitly keeps the columns independent of dict ordering.
    columns = [
        ("First Name", "firstName"),
        ("Last Name", "lastName"),
        ("Username", "username"),
        ("B-Number", "B-Number"),
        ("Number of Engagements", "engagementCount"),
        ("Completed Summer Experience", "hasSummer"),
    ]
    firstColumn = 1
    headerRow = 1

    # May be empty; the header is still written in that case
    studentProgress = getMinorProgress()

    filepath = f"{app.config['files']['base_path']}/minor_data.xlsx"
    workbook = xlsxwriter.Workbook(filepath, {'in_memory': True})

    worksheet = workbook.add_worksheet('minor_information')
    format_row = workbook.add_format({'align': 'left'})

    worksheet.set_column(firstColumn, len(columns), 30, workbook.add_format({'bold': True}))
    for columnNumber, (header, _) in enumerate(columns, firstColumn):
        worksheet.write(headerRow, columnNumber, header)

    # Advance the row only for students that are written, so skipped
    # (graduated) students do not leave blank rows in the sheet.
    rowNumber = headerRow + 1
    for student in studentProgress:
        if student['hasGraduated']:
            continue

        rowValues = dict(student)
        # getMinorProgress() reports "Completed" / "Incomplete"
        rowValues['hasSummer'] = "Yes" if student['hasSummer'] == "Completed" else "No"
        if rowValues['B-Number'] is None:
            rowValues['B-Number'] = "No B-Number Found"

        worksheet.set_row(rowNumber, None, format_row)
        for columnNumber, (_, key) in enumerate(columns, firstColumn):
            worksheet.write(rowNumber, columnNumber, rowValues[key])
        rowNumber += 1

    workbook.close()

    return filepath

152 

153 

def toggleMinorInterest(username, isAdding):
    """
    Given a username, update their minor interest and minor status.

    Sets the user's minorInterest to `isAdding` and clears declaredMinor.

    :param username: username of the user to update
    :param isAdding: the new value for the user's minorInterest flag
    :returns: None on success; ({"error": "User not found"}, 404) when no user
        has that username; ({"error": <message>}, 500) for any other failure
    """
    try:
        user = User.get(username=username)

        user.minorInterest = isAdding
        user.declaredMinor = False
        user.save()

    except DoesNotExist:
        # A failed lookup raises instead of returning a falsy value,
        # so the "not found" case has to be handled here.
        return {"error": "User not found"}, 404

    except Exception as e:
        print(f"Error updating minor interest: {e}")
        return {"error": str(e)}, 500

171 

def declareMinorInterest(username):
    """
    Flip the declaredMinor flag of the user with the given username and save it.

    :raises ValueError: if the lookup yields a falsy user
    :raises RuntimeError: if saving the user fails

    NOTE(review): peewee's get_by_id appears to raise DoesNotExist for a missing
    row rather than return None, so the ValueError branch may never run -- confirm.
    """
    student = User.get_by_id(username)

    if not student:
        raise ValueError(f"User with username '{username}' not found.")

    wasDeclared = student.declaredMinor
    student.declaredMinor = not wasDeclared

    try:
        student.save()
    except Exception as saveError:
        raise RuntimeError(f"Failed to declare interested student: {saveError}")

187 

def getDeclaredMinorStudentsWithProgress():
    """
    Retrieve the students who have declared the CCE minor along with their engagement progress.

    Returns a list of dicts with the student's details, engagement count, proposal
    flag and summer status. Because the requirement joins are LEFT OUTER, students
    who declared the minor but have no requirement rows are included with 0 engagements.
    """
    # 1 for each joined row whose proposal is a Summer Experience, otherwise 0
    isSummerProposal = Case(None, [(CCEMinorProposal.proposalType == "Summer Experience", 1)], 0)

    selectedColumns = (
        User,
        fn.COUNT(fn.DISTINCT(IndividualRequirement.id)).alias("rawEngagementCount"),
        fn.COALESCE(fn.SUM(fn.DISTINCT(isSummerProposal)), 0).alias("summerCount"),
        fn.IF(fn.COUNT(fn.DISTINCT(CCEMinorProposal.id)) > 0, True, False).alias("hasCCEMinorProposal"),
    )
    # Keep CCE requirements, plus rows where the outer join found no requirement at all
    cceOrNoRequirement = (
        (CertificationRequirement.certification_id == Certification.CCE) |
        (CertificationRequirement.id.is_null(True))
    )

    query = (
        User
        .select(*selectedColumns)
        .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(User.username == IndividualRequirement.username))
        .join(CertificationRequirement, JOIN.LEFT_OUTER, on=(IndividualRequirement.requirement_id == CertificationRequirement.id))
        .switch(User)
        .join(CCEMinorProposal, JOIN.LEFT_OUTER, on=(User.username == CCEMinorProposal.student))
        .where((User.declaredMinor == True) & cceOrNoRequirement)
        .group_by(User.firstName, User.lastName, User.username)
        .order_by(SQL("rawEngagementCount").desc())
    )

    studentProgress = []
    for student in query:
        rawCount = int(student.rawEngagementCount)
        summerTotal = int(student.summerCount or 0)

        if summerTotal > 0:
            summerStatus = "Completed"
        else:
            summerStatus = "Incomplete"

        studentProgress.append({
            "firstName": student.firstName,
            "lastName": student.lastName,
            "username": student.username,
            "B-Number": student.bnumber,
            "email": student.email,
            "hasGraduated": student.hasGraduated,
            "engagementCount": rawCount - summerTotal,
            "hasCCEMinorProposal": bool(student.hasCCEMinorProposal),
            "hasSummer": summerStatus,
        })

    return studentProgress

236 

def getDeclaredMinorStudents():
    """
    Return, as a list of dicts, the students who have declared the minor.
    """
    declaredQuery = User.select().where(User.isStudent & User.declaredMinor)

    return [model_to_dict(declaredStudent) for declaredStudent in declaredQuery]

246 

def getCourseInformation(id):
    """
    Given a course ID, return a dict with the course (as a dict) under "course"
    and the full names of its instructors under "instructors".
    """
    # look the course up first so a missing course fails before the instructor query
    courseDict = model_to_dict(Course.get_by_id(id))

    instructorQuery = (CourseInstructor.select(CourseInstructor, User)
                                       .join(Course).switch()
                                       .join(User)
                                       .where(Course.id == id))

    instructorNames = []
    for courseInstructor in instructorQuery:
        instructor = courseInstructor.user
        instructorNames.append(instructor.firstName + " " + instructor.lastName)

    return {"instructors": instructorNames, "course": courseDict}

263 

def getProgramEngagementHistory(program_id, username, term_id):
    """
    Given a program_id, username, and term_id, return the service events of that
    program and term in which the user participated, along with the program name.

    :returns: dict with
        "program": the program's name,
        "events": list of dicts, one per event, holding the selected columns
            (event id, event name, hoursEarned),
        "totalHours": sum of the non-empty hoursEarned values (0 when there are none)
    """
    # query for all service events in which the user has participated
    # that fall under the provided term and program.
    eventsInProgramAndTerm = (Event.select(Event.id, Event.name, EventParticipant.hoursEarned)
                                   .join(Program).switch()
                                   .join(EventParticipant)
                                   .where(EventParticipant.user == username,
                                          Event.term == term_id,
                                          Event.isService == True,
                                          Program.id == program_id)
                              )

    program = Program.get_by_id(program_id)

    # Run the query once, as dict rows, and reuse those rows for both the
    # event list and the total instead of executing the query a second time.
    eventRows = list(eventsInProgramAndTerm.dicts())

    # total hours for the whole program that term; empty/None hours count as 0
    totalHours = sum(row['hoursEarned'] or 0 for row in eventRows)

    return {"program": program.programName, "events": eventRows, "totalHours": totalHours}

291 

def setCommunityEngagementForUser(action, engagementData, currentUser):
    """
    Either add or remove an IndividualRequirement record for a student's Sustained Community Engagement

    :param action: The behavior of the function. Can be 'add' or 'remove'
    :param engagementData:
        type: program or course
        id: program or course id
        username: the username of the student that is having a community engagement added or removed
        term: The term the engagement is recorded in
    :param currentUser: The user who is performing the add/remove action

    :raises DoesNotExist: if there are no available CertificationRequirement slots remaining for the engagement
    :raises Exception: if the engagement type or the action is not recognized
    """
    engagementType = engagementData['type']
    if engagementType not in ['program', 'course']:
        raise Exception("Invalid engagement type!")

    # CCE requirements (other than 'Summer Program') that this student has not filled yet
    openRequirements = (CertificationRequirement.select()
                            .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                                (IndividualRequirement.requirement == CertificationRequirement.id) &
                                (IndividualRequirement.username == engagementData['username'])))
                            .where(IndividualRequirement.username.is_null(True),
                                   CertificationRequirement.certification == Certification.CCE,
                                   CertificationRequirement.name.not_in(['Summer Program'])))

    if action == 'add':
        # openRequirements.get() raises DoesNotExist when no requirement is left;
        # that is deliberately left to propagate and is handled elsewhere.
        newRecord = {
            engagementType: engagementData['id'],
            "username": engagementData['username'],
            "term": engagementData['term'],
            "requirement": openRequirements.get(),
            "addedBy": currentUser,
        }
        IndividualRequirement.create(**newRecord)
        return

    if action == 'remove':
        engagementField = getattr(IndividualRequirement, engagementType)
        IndividualRequirement.delete().where(
            engagementField == engagementData['id'],
            IndividualRequirement.username == engagementData['username'],
            IndividualRequirement.term == engagementData['term']
        ).execute()
        return

    raise Exception(f"Invalid action '{action}' sent to setCommunityEngagementForUser")

338 

def getCommunityEngagementByTerm(username):
    """
    Given a username, return all of their community engagements (service learning
    courses and service event participations, the latter grouped by program).

    The result is a dict keyed by (term description, term id), ordered by term id.
    Each value is a list of dicts with the engagement's name, id, type, term id and
    whether it is matched to an IndividualRequirement (1) or not (0).
    """
    # maps (term description, term id) -> list of engagement dicts
    engagementsByTerm = defaultdict(list)

    # 0 when the outer join found no requirement for the course, otherwise 1
    courseIsMatched = Case(None, [(IndividualRequirement.course.is_null(True), 0)], 1)

    courseQuery = (Course.select(Course, courseIsMatched.alias("matchedReq"))
                         .join(CourseParticipant, on=(Course.id == CourseParticipant.course))
                         .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                             (IndividualRequirement.course == Course.id) &
                             (IndividualRequirement.username == CourseParticipant.user) &
                             (IndividualRequirement.term == Course.term)))
                         .where(CourseParticipant.user == username)
                         .group_by(Course.courseName, Course.term))

    for course in courseQuery:
        termKey = (course.term.description, course.term.id)
        engagementsByTerm[termKey].append({
            "name": course.courseName,
            "id": course.id,
            "type": "course",
            "matched": course.matchedReq,
            "term": course.term.id,
        })

    # 0 when the outer join found no requirement for the program, otherwise 1
    programIsMatched = Case(None, [(IndividualRequirement.program.is_null(True), 0)], 1)

    eventQuery = (Event.select(Event, Program, programIsMatched.alias('matchedReq'))
                       .join(EventParticipant, on=(Event.id == EventParticipant.event)).switch()
                       .join(Program)
                       .join(IndividualRequirement, JOIN.LEFT_OUTER, on=((IndividualRequirement.program == Program.id) &
                                                                         (IndividualRequirement.username == EventParticipant.user) &
                                                                         (IndividualRequirement.term == Event.term)))
                       .where(EventParticipant.user == username, Event.isService == True)
                       .group_by(Event.program, Event.term))

    for event in eventQuery:
        termKey = (event.term.description, event.term.id)
        engagementsByTerm[termKey].append({
            "name": event.program.programName,
            "id": event.program.id,
            "type": "program",
            "matched": event.matchedReq,
            "term": event.term.id,
        })

    # order the terms by term id, the second element of each key
    orderedTerms = sorted(engagementsByTerm.items(), key=lambda termAndEngagements: termAndEngagements[0][1])
    return dict(orderedTerms)

386 

def createOtherEngagementRequest(username, formData):
    """
    Create and return a pending 'Other Engagement' CCEMinorProposal for the given
    student, filling the remaining fields from the form data.
    """
    student = User.get(User.username == username)

    return CCEMinorProposal.create(
        proposalType='Other Engagement',
        createdBy=g.current_user,
        status='Pending',
        student=student,
        **formData
    )

401 

def saveSummerExperience(username, summerExperience, currentUser):
    """
    Replace the student's 'Summer Program' IndividualRequirement entry.

    Any existing 'Summer Program' entry for 'username' is deleted, then a new one
    is created from the contents of summerExperience.

    :param username: username of the student that the summer experience is for
    :param summerExperience: dict
        summerExperience: string of what the summer experience was (written as the 'description' of the new entry)
        selectedSummerTerm: the term description that the summer experience took place in
    :param currentUser: the user who added the summer experience record
    :returns: an empty string
    """
    summerRequirementSubSelect = CertificationRequirement.select().where(
        CertificationRequirement.certification == Certification.CCE,
        CertificationRequirement.name << ['Summer Program'])
    deleteExisting = IndividualRequirement.delete().where(
        IndividualRequirement.username == username,
        IndividualRequirement.requirement == summerRequirementSubSelect)
    deleteExisting.execute()

    # the 'Summer Program' requirement, as long as this student has no entry for it
    openSummerRequirement = (CertificationRequirement.select()
                                .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                                    (IndividualRequirement.requirement == CertificationRequirement.id) &
                                    (IndividualRequirement.username == username)))
                                .where(IndividualRequirement.username.is_null(True),
                                       CertificationRequirement.certification == Certification.CCE,
                                       CertificationRequirement.name << ['Summer Program']))

    termQuery = Term.select().where(Term.description == summerExperience['selectedSummerTerm'])

    # keyword arguments are evaluated in order: the term is looked up before the requirement
    IndividualRequirement.create(
        description=summerExperience['summerExperience'],
        username=username,
        term=termQuery.get(),
        requirement=openSummerRequirement.get(),
        addedBy=currentUser,
    )

    return ""

433 

def getSummerExperience(username):
    """
    Get a student's summer experience to populate the text box if the student has one.

    :param username: username of the student
    :returns: (term description, summer experience description) when the student has
        exactly one CCE 'Summer Program' entry, otherwise (None, None)
    """
    summerExperience = (IndividualRequirement.select()
                            .join(CertificationRequirement, JOIN.LEFT_OUTER, on=(CertificationRequirement.id == IndividualRequirement.requirement)).switch()
                            .join(Term, on=(IndividualRequirement.term == Term.id))
                            .where(IndividualRequirement.username == username,
                                   CertificationRequirement.certification == Certification.CCE,
                                   CertificationRequirement.name << ['Summer Program']))

    # Fetch the rows once and reuse them, rather than re-running the query
    # with .get() for each value that is returned.
    matches = list(summerExperience)
    if len(matches) == 1:
        record = matches[0]
        return (record.term.description, record.description)

    return (None, None)

448 

def removeSummerExperience(username):
    """
    Delete the IndividualRequirement table entry that holds the summer experience of 'username'.

    Does nothing when getSummerExperience() reports no summer experience for the user.
    """
    term, summerExperienceToDelete = getSummerExperience(username)

    if term is None and summerExperienceToDelete is None:
        # Nothing to remove. Filtering on description == None would match the
        # user's rows that have no description (an IS NULL comparison) and
        # delete those instead of a summer experience.
        return

    IndividualRequirement.delete().where(IndividualRequirement.username == username, IndividualRequirement.description == summerExperienceToDelete).execute()

455 

def removeProposal(proposalID) -> None:
    """
    Delete a summer experience or other engagement object from the CCEMinorProposal table.
    A file attached to the CCEMinorProposal object is deleted as well.
    """
    proposalID = int(proposalID)

    # remove the attached file, if there is one, before the proposal itself
    attachment = AttachmentUpload.get_or_none(proposal=proposalID)
    if attachment:
        FileHandler(proposalId=proposalID).deleteFile(attachment.id)

    deleteProposal = CCEMinorProposal.delete().where(CCEMinorProposal.id == proposalID)
    deleteProposal.execute()