Coverage for app/logic/minor.py: 65%

167 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2026-01-22 18:12 +0000

1from collections import defaultdict 

2from typing import List, Dict 

3from flask import flash, g 

4from playhouse.shortcuts import model_to_dict 

5from peewee import JOIN, fn, Case, DoesNotExist, SQL 

6import xlsxwriter 

7 

8from app import app 

9from app.models.user import User 

10from app.models.term import Term 

11from app.models.event import Event 

12from app.models.course import Course 

13from app.models.program import Program 

14from app.models.certification import Certification 

15from app.models.courseInstructor import CourseInstructor 

16from app.models.eventParticipant import EventParticipant 

17from app.models.courseParticipant import CourseParticipant 

18from app.models.individualRequirement import IndividualRequirement 

19from app.models.certificationRequirement import CertificationRequirement 

20from app.models.cceMinorProposal import CCEMinorProposal 

21from app.logic.createLogs import createActivityLog 

22from app.logic.fileHandler import FileHandler 

23from app.logic.serviceLearningCourses import deleteCourseObject 

24from app.models.attachmentUpload import AttachmentUpload 

25 

26 

27def createSummerExperience(username, formData): 

28 """ 

29 Given the username of the student and the formData which includes all of 

30 the SummerExperience information, create a new SummerExperience object. 

31 """ 

32 try: 

33 user = User.get(User.username == username) 

34 contentAreas = ', '.join(formData.getlist('contentArea')) # Combine multiple content areas 

35 CCEMinorProposal.create( 

36 student=user, 

37 proposalType = 'Summer Experience', 

38 contentAreas = contentAreas, 

39 status="Pending", 

40 createdBy = g.current_user, 

41 **formData, 

42 ) 

43 except Exception as e: 

44 print(f"Error saving summer experience: {e}") 

45 raise e 

46 

47def getCCEMinorProposals(username): 

48 proposalList = [] 

49 

50 cceMinorProposals = list(CCEMinorProposal.select().where(CCEMinorProposal.student==username)) 

51 

52 for experience in cceMinorProposals: 

53 proposalList.append({ 

54 "id": experience.id, 

55 "type": experience.proposalType, 

56 "createdBy": experience.createdBy, 

57 "supervisor": experience.supervisorName, 

58 "term": experience.term, 

59 "status": experience.status, 

60 }) 

61 

62 return proposalList 

63 

64def getEngagementTotal(engagementData): 

65 """  

66 Count the number of engagements (from all terms) that have matched with a requirement  

67 """ 

68 

69 # map the flattened list of engagements to their matched values, and sum them 

70 return sum(map(lambda e: e['matched'], sum(engagementData.values(),[]))) 

71 

72 

73def getMinorInterest() -> List[Dict]: 

74 """ 

75 Get all students that have indicated interest in the CCE minor and return a list of dicts of all interested students 

76 """ 

77 interestedStudents = (User.select(User) 

78 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(User.username == IndividualRequirement.username)) 

79 .where(User.isStudent & User.minorInterest & ~User.declaredMinor & IndividualRequirement.username.is_null(True))) 

80 

81 interestedStudentList = [model_to_dict(student) for student in interestedStudents] 

82 

83 return interestedStudentList 

84 

85def getMinorProgress(): 

86 """ 

87 Get all the users who have an IndividualRequirement record under the CCE certification which  

88 and returns a list of dicts containing the student, how many engagements they have completed,  

89 and if they have completed the summer experience.  

90 """ 

91 summerCase = Case(None, [(CCEMinorProposal.proposalType == "Summer Experience", 1)], 0) 

92 

93 engagedStudentsWithCount = ( 

94 User.select(User, fn.COUNT(IndividualRequirement.id).alias('engagementCount'), 

95 fn.SUM(summerCase).alias('hasSummer'), 

96 fn.IF(fn.COUNT(CCEMinorProposal.id) > 0, True, False).alias('hasCCEMinorProposal')) 

97 .join(IndividualRequirement, on=(User.username == IndividualRequirement.username)) 

98 .join(CertificationRequirement, on=(IndividualRequirement.requirement_id == CertificationRequirement.id)) 

99 .switch(User).join(CCEMinorProposal, JOIN.LEFT_OUTER, on= (User.username == CCEMinorProposal.student)) 

100 .where(CertificationRequirement.certification_id == Certification.CCE) 

101 .group_by(User.firstName, User.lastName, User.username) 

102 .order_by(SQL("engagementCount").desc()) 

103 ) 

104 engagedStudentsList = [{'firstName': student.firstName, 

105 'lastName': student.lastName, 

106 'username': student.username, 

107 'B-Number': student.bnumber, 

108 'hasGraduated': student.hasGraduated, 

109 'engagementCount': student.engagementCount - student.hasSummer, 

110 'hasCCEMinorProposal': student.hasCCEMinorProposal, 

111 'hasSummer': "Completed" if student.hasSummer else "Incomplete"} for student in engagedStudentsWithCount] 

112 return engagedStudentsList 

113 

114def getMinorSpreadsheet(): 

115 """ 

116 Returns a spreadsheet containing users and related spreadsheet information. 

117 """ 

118 # If we're in 2025, can we get the minor information for 2023? 

119 studentProgress = getMinorProgress() 

120 columnNames = studentProgress[0] 

121 columnNames = ["First Name", "Last Name", "Username", "B-Number", "Number of Engagements", "Completed Summer Experience"] 

122 

123 filepath = f"{app.config['files']['base_path']}/minor_data.xlsx" 

124 workbook = xlsxwriter.Workbook(filepath, {'in_memory': True}) 

125 

126 worksheet = workbook.add_worksheet('minor_information') 

127 format_row = workbook.add_format({'align': 'left'}) 

128 

129 columnIndex = 1 

130 worksheet.set_column(columnIndex, len(columnNames), 30, workbook.add_format({'bold': True})) 

131 for columnName in columnNames: 

132 worksheet.write(1, columnIndex, columnName) 

133 columnIndex += 1 

134 

135 for rowNumber, student in enumerate(studentProgress, 2): 

136 if student['hasGraduated']: continue 

137 student.pop('hasCCEMinorProposal') 

138 student.pop('hasGraduated') 

139 student['hasSummer'] = "Yes" if student['hasSummer'] == "Complete" else "No" 

140 worksheet.set_row(rowNumber, None, format_row) 

141 if student['B-Number'] == None: student["B-Number"] = "No B-Number Found" 

142 for columnNumber, key in enumerate(student, 1): 

143 worksheet.write(rowNumber, columnNumber, student[key]) 

144 

145 

146 workbook.close() 

147 

148 return filepath 

149 

150 

151def toggleMinorInterest(username, isAdding): 

152 """ 

153 Given a username, update their minor interest and minor status. 

154 """ 

155 

156 try: 

157 user = User.get(username=username) 

158 if not user: 

159 return {"error": "User not found"}, 404 

160 

161 user.minorInterest = isAdding 

162 user.declaredMinor = False 

163 user.save() 

164 

165 except Exception as e: 

166 print(f"Error updating minor interest: {e}") 

167 return {"error": str(e)}, 500 

168 

169def declareMinorInterest(username): 

170 """ 

171 Given a username, update their minor declaration 

172 """ 

173 user = User.get_by_id(username) 

174 

175 if not user: 

176 raise ValueError(f"User with username '{username}' not found.") 

177 

178 user.declaredMinor = not user.declaredMinor 

179 

180 try: 

181 user.save() 

182 except Exception as e: 

183 raise RuntimeError(f"Failed to declare interested student: {e}") 

184 

185def getDeclaredMinorStudentsWithProgress(): 

186 """ 

187 This function retrieves a list of students who have declared the CCE minor along with their engagement progress. 

188 It returns a list of dictionaries containing student information and their engagement details. 

189 """ 

190 summerCase = Case(None, [(CCEMinorProposal.proposalType == "Summer Experience", 1)], 0) 

191 

192 q = ( 

193 User 

194 .select( 

195 User, 

196 fn.COUNT(fn.DISTINCT(IndividualRequirement.id)).alias("rawEngagementCount"), 

197 fn.COALESCE(fn.SUM(fn.DISTINCT(summerCase)), 0).alias("summerCount"), 

198 fn.IF(fn.COUNT(fn.DISTINCT(CCEMinorProposal.id)) > 0, True, False).alias("hasCCEMinorProposal"), 

199 ) 

200 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(User.username == IndividualRequirement.username)) 

201 .join(CertificationRequirement, JOIN.LEFT_OUTER, on=(IndividualRequirement.requirement_id == CertificationRequirement.id)) 

202 .switch(User) 

203 .join(CCEMinorProposal, JOIN.LEFT_OUTER, on=(User.username == CCEMinorProposal.student)) 

204 .where( 

205 (User.isStudent == True) & 

206 (User.declaredMinor == True) & 

207 ( 

208 (CertificationRequirement.certification_id == Certification.CCE) | 

209 (CertificationRequirement.id.is_null(True)) 

210 ) 

211 ) 

212 .group_by(User.firstName, User.lastName, User.username) 

213 .order_by(SQL("rawEngagementCount").desc()) 

214 ) 

215 

216 result = [] 

217 for s in q: 

218 

219 engagementCount = int(s.rawEngagementCount) - int(s.summerCount or 0) 

220 

221 result.append({ 

222 "firstName": s.firstName, 

223 "lastName": s.lastName, 

224 "username": s.username, 

225 "B-Number": s.bnumber, 

226 "email": s.email, 

227 "hasGraduated": s.hasGraduated, 

228 "engagementCount": engagementCount, 

229 "hasCCEMinorProposal": bool(s.hasCCEMinorProposal), 

230 "hasSummer": "Completed" if (s.summerCount and int(s.summerCount) > 0) else "Incomplete", 

231 }) 

232 

233 return result 

234 

235def getDeclaredMinorStudents(): 

236 """ 

237 Get a list of the students who have declared minor 

238 """ 

239 declaredStudents = User.select().where(User.isStudent & User.declaredMinor) 

240 

241 interestedStudentList = [model_to_dict(student) for student in declaredStudents] 

242 

243 return interestedStudentList 

244 

245def getCourseInformation(id): 

246 """ 

247 Given a course ID, return an object containing the course information and  

248 its instructors full names. 

249 """ 

250 # retrieve the course and the course instructors 

251 course = model_to_dict(Course.get_by_id(id)) 

252 

253 courseInstructors = (CourseInstructor.select(CourseInstructor, User) 

254 .join(Course).switch() 

255 .join(User) 

256 .where(Course.id == id)) 

257 

258 courseInformation = {"instructors": [(instructor.user.firstName + " " + instructor.user.lastName) for instructor in courseInstructors], "course": course} 

259 

260 return courseInformation 

261 

262def getProgramEngagementHistory(program_id, username, term_id): 

263 """ 

264 Given a program_id, username, and term_id, return an object containing all events in the provided program  

265 and in the given term along with the program name. 

266 """ 

267 # execute a query that will retrieve all events in which the user has participated 

268 # that fall under the provided term and programs. 

269 eventsInProgramAndTerm = (Event.select(Event.id, Event.name, EventParticipant.hoursEarned) 

270 .join(Program).switch() 

271 .join(EventParticipant) 

272 .where(EventParticipant.user == username, 

273 Event.term == term_id, 

274 Event.isService == True, 

275 Program.id == program_id) 

276 ) 

277 

278 program = Program.get_by_id(program_id) 

279 

280 # calculate total amount of hours for the whole program that term 

281 totalHours = 0 

282 for event in eventsInProgramAndTerm: 

283 if event.eventparticipant.hoursEarned: 

284 totalHours += event.eventparticipant.hoursEarned 

285 

286 participatedEvents = {"program":program.programName, "events": [event for event in eventsInProgramAndTerm.dicts()], "totalHours": totalHours} 

287 

288 return participatedEvents 

289 

290def setCommunityEngagementForUser(action, engagementData, currentUser): 

291 """ 

292 Either add or remove an IndividualRequirement record for a student's Sustained Community Engagement 

293 

294 :param action: The behavior of the function. Can be 'add' or 'remove' 

295 :param engagementData: 

296 type: program or course 

297 id: program or course id 

298 username: the username of the student that is having a community engagement added or removed 

299 term: The term the engagement is recorded in 

300 :param currentuser: The user who is performing the add/remove action  

301 

302 :raises DoesNotExist: if there are no available CertificationRequirement slots remaining for the engagement 

303 """ 

304 if engagementData['type'] not in ['program','course']: 

305 raise Exception("Invalid engagement type!") 

306 

307 requirement = (CertificationRequirement.select() 

308 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=( 

309 (IndividualRequirement.requirement == CertificationRequirement.id) & 

310 (IndividualRequirement.username == engagementData['username']))) 

311 .where(IndividualRequirement.username.is_null(True), 

312 CertificationRequirement.certification == Certification.CCE, 

313 CertificationRequirement.name.not_in(['Summer Program']))) 

314 if action == 'add': 

315 try: 

316 IndividualRequirement.create(**{engagementData['type']: engagementData['id'], 

317 "username": engagementData['username'], 

318 "term": engagementData['term'], 

319 "requirement": requirement.get(), 

320 "addedBy": currentUser, 

321 }) 

322 # Thrown if there are no available engagement requirements left. Handled elsewhere. 

323 except DoesNotExist as e: 

324 raise e 

325 

326 elif action == 'remove': 

327 IndividualRequirement.delete().where( 

328 getattr(IndividualRequirement, engagementData['type']) == engagementData['id'], 

329 IndividualRequirement.username == engagementData['username'], 

330 IndividualRequirement.term == engagementData['term'] 

331 ).execute() 

332 else: 

333 raise Exception(f"Invalid action '{action}' sent to setCommunityEngagementForUser") 

334 

335def getCommunityEngagementByTerm(username): 

336 """ 

337 Given a username, return all of their community engagements (service learning courses and event participations.) 

338 """ 

339 courseMatchCase = Case(None, [(IndividualRequirement.course.is_null(True) , 0)], 1) 

340 

341 courses = (Course.select(Course, courseMatchCase.alias("matchedReq")) 

342 .join(CourseParticipant, on=(Course.id == CourseParticipant.course)) 

343 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=( 

344 (IndividualRequirement.course == Course.id) & 

345 (IndividualRequirement.username == CourseParticipant.user) & 

346 (IndividualRequirement.term == Course.term))) 

347 .where(CourseParticipant.user == username) 

348 .group_by(Course.courseName, Course.term)) 

349 

350 # initialize default dict to store term descriptions as keys mapping to each 

351 # engagement's respective type, name, id, and term. 

352 communityEngagementByTermDict = defaultdict(list) 

353 for course in courses: 

354 communityEngagementByTermDict[(course.term.description, course.term.id)].append( 

355 {"name":course.courseName, 

356 "id":course.id, 

357 "type":"course", 

358 "matched": course.matchedReq, 

359 "term":course.term.id}) 

360 

361 programMatchCase = Case(None, [(IndividualRequirement.program.is_null(True) , 0)], 1) 

362 

363 events = (Event.select(Event, Program, programMatchCase.alias('matchedReq')) 

364 .join(EventParticipant, on=(Event.id == EventParticipant.event)).switch() 

365 .join(Program) 

366 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=((IndividualRequirement.program == Program.id) & 

367 (IndividualRequirement.username == EventParticipant.user) & 

368 (IndividualRequirement.term == Event.term))) 

369 .where(EventParticipant.user == username, Event.isService == True) 

370 .group_by(Event.program, Event.term)) 

371 

372 for event in events: 

373 communityEngagementByTermDict[(event.term.description, event.term.id)].append({"name":event.program.programName, 

374 "id":event.program.id, 

375 "type":"program", 

376 "matched": event.matchedReq, 

377 "term":event.term.id 

378 }) 

379 

380 # sorting the communityEngagementByTermDict by the term id 

381 return dict(sorted(communityEngagementByTermDict.items(), key=lambda engagement: engagement[0][1])) 

382 

383def createOtherEngagementRequest(username, formData): 

384 """ 

385 Create a CCEMinorProposal entry based off of the form data 

386 """ 

387 user = User.get(User.username == username) 

388 

389 cceObject = CCEMinorProposal.create(proposalType = 'Other Engagement', 

390 createdBy = g.current_user, 

391 status = 'Pending', 

392 student = user, 

393 **formData 

394 ) 

395 

396 return cceObject 

397 

398def saveSummerExperience(username, summerExperience, currentUser): 

399 """ 

400 :param username: username of the student that the summer experience is for 

401 :param summerExperience: dict  

402 summerExperience: string of what the summer experience was (will be written as the 'description' in the IndividualRequirement table) 

403 selectedSummerTerm: the term description that the summer experience took place in 

404 :param currentUser: the username of the user who added the summer experience record 

405 

406 Delete any existing IndividualRequirement entry for 'username' if it is for 'Summer Program' and create a new IndividualRequirement entry for  

407 'Summer Program' with the contents of summerExperience.  

408 """ 

409 requirementDeleteSubSelect = CertificationRequirement.select().where(CertificationRequirement.certification == Certification.CCE, CertificationRequirement.name << ['Summer Program']) 

410 IndividualRequirement.delete().where(IndividualRequirement.username == username, IndividualRequirement.requirement == requirementDeleteSubSelect).execute() 

411 

412 requirement = (CertificationRequirement.select() 

413 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=((IndividualRequirement.requirement == CertificationRequirement.id) & 

414 (IndividualRequirement.username == username))) 

415 .where(IndividualRequirement.username.is_null(True), 

416 CertificationRequirement.certification == Certification.CCE, 

417 CertificationRequirement.name << ['Summer Program'])) 

418 

419 summerTerm = (Term.select().where(Term.description == summerExperience['selectedSummerTerm'])) 

420 

421 IndividualRequirement.create(**{"description": summerExperience['summerExperience'], 

422 "username": username, 

423 "term": summerTerm.get(), 

424 "requirement": requirement.get(), 

425 "addedBy": currentUser, 

426 }) 

427 return "" 

428 

429def getSummerExperience(username): 

430 """ 

431 Get a students summer experience to populate text box if the student has one 

432 """ 

433 summerExperience = (IndividualRequirement.select() 

434 .join(CertificationRequirement, JOIN.LEFT_OUTER, on=(CertificationRequirement.id == IndividualRequirement.requirement)).switch() 

435 .join(Term, on=(IndividualRequirement.term == Term.id)) 

436 .where(IndividualRequirement.username == username, 

437 CertificationRequirement.certification == Certification.CCE, 

438 CertificationRequirement.name << ['Summer Program'])) 

439 if len(list(summerExperience)) == 1: 

440 return (summerExperience.get().term.description, summerExperience.get().description) 

441 

442 return (None, None) 

443 

444def removeSummerExperience(username): 

445 """ 

446 Delete IndividualRequirement table entry for 'username' 

447 """ 

448 term, summerExperienceToDelete = getSummerExperience(username) 

449 IndividualRequirement.delete().where(IndividualRequirement.username == username, IndividualRequirement.description == summerExperienceToDelete).execute() 

450 

451def removeProposal(proposalID) -> None: 

452 """ 

453 Delete summer experience or other engagement objects from the CCEMinorProposal table.  

454 File objects attached to the CCEMinorProposal object are also deleted.  

455 """ 

456 proposalID = int(proposalID) 

457 

458 proposalAttachment = AttachmentUpload.get_or_none(proposal=proposalID) 

459 if proposalAttachment: 

460 proposalFileHandler = FileHandler(proposalId=proposalID) 

461 proposalFileHandler.deleteFile(proposalAttachment.id) 

462 

463 CCEMinorProposal.delete().where(CCEMinorProposal.id == proposalID).execute()