Coverage for app/logic/minor.py: 66%

179 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-12-22 18:34 +0000

1from collections import defaultdict 

2from typing import List, Dict 

3from flask import flash, g 

4from playhouse.shortcuts import model_to_dict 

5from peewee import JOIN, fn, Case, DoesNotExist, SQL 

6import xlsxwriter 

7 

8from app.models import mainDB 

9from app.models.user import User 

10from app.models.term import Term 

11from app.models.event import Event 

12from app.models.course import Course 

13from app.models.program import Program 

14from app.models.certification import Certification 

15from app.models.courseInstructor import CourseInstructor 

16from app.models.eventParticipant import EventParticipant 

17from app.models.courseParticipant import CourseParticipant 

18from app.models.individualRequirement import IndividualRequirement 

19from app.models.certificationRequirement import CertificationRequirement 

20from app.models.cceMinorProposal import CCEMinorProposal 

21from app.logic.fileHandler import FileHandler 

22from app.logic.utils import getFilesFromRequest 

23from app.models.attachmentUpload import AttachmentUpload 

24 

25 

def createSummerExperience(username, formData):
    """
    Create a 'Summer Experience' CCEMinorProposal for the given student.

    :param username: username of the student the proposal is for
    :param formData: multi-valued form mapping (must support ``getlist``). All
        'contentArea' values are joined into one comma-separated string; every
        other field is passed straight through to ``CCEMinorProposal.create``.
    :returns: the newly created CCEMinorProposal
    :raises DoesNotExist: if no User has the given username
    """
    user = User.get(User.username == username)
    contentAreas = ', '.join(formData.getlist('contentArea'))  # Combine multiple content areas

    proposalFields = dict(formData)
    # 'contentArea' was folded into contentAreas above. It may be missing from
    # the form entirely, so pop with a default rather than raise KeyError.
    proposalFields.pop("contentArea", None)

    return CCEMinorProposal.create(
        student=user,
        proposalType='Summer Experience',
        contentAreas=contentAreas,
        createdBy=g.current_user,
        **proposalFields,
    )

42 

def updateSummerExperience(proposalID, formData):
    """
    Update an existing CCEMinorProposal with edited Summer Experience form data.

    :param proposalID: id of the CCEMinorProposal row to update
    :param formData: multi-valued form mapping (must support ``getlist``). All
        'contentArea' values are joined into one comma-separated string; the
        remaining fields (minus the keys stripped below) are written as-is.
    """
    contentAreas = ', '.join(formData.getlist('contentArea'))  # Combine multiple content areas

    proposalFields = dict(formData)
    # These keys are stripped before the update. Either may be missing from the
    # submitted form, so pop with a default instead of raising KeyError.
    # NOTE(review): presumably 'experienceHoursOver300' must not be overwritten
    # on edit -- confirm against the form/template.
    for strippedKey in ("contentArea", "experienceHoursOver300"):
        proposalFields.pop(strippedKey, None)

    (CCEMinorProposal
        .update(contentAreas=contentAreas, **proposalFields)
        .where(CCEMinorProposal.id == proposalID)
        .execute())

53 

def getCCEMinorProposals(username):
    """
    Return a list of every CCEMinorProposal whose student matches `username`.
    """
    proposalsForStudent = CCEMinorProposal.select().where(CCEMinorProposal.student == username)
    return list(proposalsForStudent)

56 

def getEngagementTotal(engagementData):
    """
    Count the number of engagements (from all terms) that have matched with a requirement.

    :param engagementData: mapping of term key -> iterable of engagement dicts,
        each carrying a numeric/boolean 'matched' entry
    :returns: the sum of every engagement's 'matched' value (0 when empty)
    """
    # Walk the per-term collections directly instead of concatenating them into
    # one throwaway list first (sum(lists, []) copies the list on every step).
    return sum(engagement['matched']
               for termEngagements in engagementData.values()
               for engagement in termEngagements)

64 

65 

def getMinorInterest() -> List[Dict]:
    """
    Return, as a list of dicts, every student who has flagged interest in the
    CCE minor, has not declared it, and has no IndividualRequirement record.
    """
    interestedButUndeclared = User.isStudent & User.minorInterest & ~User.declaredMinor
    # The LEFT OUTER join leaves IndividualRequirement columns NULL for users
    # without any requirement rows; keep only those users.
    lacksRequirementRecord = IndividualRequirement.username.is_null(True)

    studentQuery = (
        User.select(User)
            .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(User.username == IndividualRequirement.username))
            .where(interestedButUndeclared & lacksRequirementRecord)
    )

    return [model_to_dict(student) for student in studentQuery]

77 

def getMinorProgress():
    """
    Get all the users who have an IndividualRequirement record under the CCE
    certification and return a list of dicts, one per student, ordered by the
    raw engagement count (descending).

    Each dict contains: 'firstName', 'lastName', 'username', 'B-Number',
    'hasGraduated', 'engagementCount' (counted requirement rows minus the
    summed summer case), 'hasCCEMinorProposal', and 'hasSummer'
    ("Completed" / "Incomplete").

    NOTE(review): the counts are aggregated over the rows produced by joining
    both IndividualRequirement and CCEMinorProposal in one grouped query --
    confirm the totals for students with more than one proposal.
    """
    # 1 for each joined proposal row of type "Summer Experience", otherwise 0
    isSummerProposal = Case(None, [(CCEMinorProposal.proposalType == "Summer Experience", 1)], 0)

    selectedColumns = [
        User,
        fn.COUNT(IndividualRequirement.id).alias('engagementCount'),
        fn.SUM(isSummerProposal).alias('hasSummer'),
        fn.IF(fn.COUNT(CCEMinorProposal.id) > 0, True, False).alias('hasCCEMinorProposal'),
    ]

    progressQuery = (
        User.select(*selectedColumns)
            .join(IndividualRequirement, on=(User.username == IndividualRequirement.username))
            .join(CertificationRequirement, on=(IndividualRequirement.requirement_id == CertificationRequirement.id))
            .switch(User).join(CCEMinorProposal, JOIN.LEFT_OUTER, on=(User.username == CCEMinorProposal.student))
            .where(CertificationRequirement.certification_id == Certification.CCE)
            .group_by(User.firstName, User.lastName, User.username)
            .order_by(SQL("engagementCount").desc())
    )

    progressRows = []
    for student in progressQuery:
        progressRows.append({
            'firstName': student.firstName,
            'lastName': student.lastName,
            'username': student.username,
            'B-Number': student.bnumber,
            'hasGraduated': student.hasGraduated,
            'engagementCount': student.engagementCount - student.hasSummer,
            'hasCCEMinorProposal': student.hasCCEMinorProposal,
            'hasSummer': "Completed" if student.hasSummer else "Incomplete",
        })
    return progressRows

106 

def getMinorSpreadsheet():
    """
    Write an .xlsx workbook of minor progress for students who have not
    graduated and return the path of the written file.

    The sheet 'minor_information' gets a header in row 1 (columns 1-6) followed
    by one contiguous row per non-graduated student from getMinorProgress().

    :returns: filesystem path of the generated 'minor_data.xlsx'
    """
    # This module never binds the name `app`, so read the configuration from
    # Flask's application proxy instead.
    # NOTE(review): requires an active application context (e.g. a request) -- confirm callers.
    from flask import current_app

    columnNames = ["First Name", "Last Name", "Username", "B-Number", "Number of Engagements", "Completed Summer Experience"]
    # Keys of the getMinorProgress() dicts, in the same order as columnNames.
    columnKeys = ["firstName", "lastName", "username", "B-Number", "engagementCount", "hasSummer"]

    # Filter before numbering rows so skipped graduates do not leave blank rows.
    activeStudents = [student for student in getMinorProgress() if not student['hasGraduated']]

    filepath = f"{current_app.config['files']['base_path']}/minor_data.xlsx"
    workbook = xlsxwriter.Workbook(filepath, {'in_memory': True})

    worksheet = workbook.add_worksheet('minor_information')
    format_row = workbook.add_format({'align': 'left'})

    worksheet.set_column(1, len(columnNames), 30, workbook.add_format({'bold': True}))
    for columnIndex, columnName in enumerate(columnNames, 1):
        worksheet.write(1, columnIndex, columnName)

    for rowNumber, student in enumerate(activeStudents, 2):
        worksheet.set_row(rowNumber, None, format_row)
        rowValues = dict(student)
        # getMinorProgress() reports "Completed" / "Incomplete".
        rowValues['hasSummer'] = "Yes" if student['hasSummer'] == "Completed" else "No"
        if student['B-Number'] is None:
            rowValues['B-Number'] = "No B-Number Found"
        for columnNumber, key in enumerate(columnKeys, 1):
            worksheet.write(rowNumber, columnNumber, rowValues[key])

    workbook.close()

    return filepath

142 

143 

def toggleMinorInterest(username, isAdding):
    """
    Given a username, update their minor interest and reset their declared-minor status.

    :param username: username of the user to update
    :param isAdding: value stored in the user's ``minorInterest`` field
    :returns: None on success; ``({"error": ...}, 404)`` when no such user
        exists; ``({"error": ...}, 500)`` on any other failure
    """
    try:
        user = User.get(username=username)

        user.minorInterest = isAdding
        user.declaredMinor = False
        user.save()

    except DoesNotExist:
        # User.get raises rather than returning None, so "not found" has to be
        # handled here for the 404 response to ever be produced.
        return {"error": "User not found"}, 404

    except Exception as e:
        print(f"Error updating minor interest: {e}")
        return {"error": str(e)}, 500

161 

def declareMinorInterest(username):
    """
    Given a username, flip that user's ``declaredMinor`` flag and save it.

    :param username: primary-key value used to look the user up
    :raises ValueError: if the lookup yields a falsy user
    :raises RuntimeError: if saving the user fails (original error chained as the cause)
    """
    user = User.get_by_id(username)

    # NOTE(review): peewee's get_by_id raises DoesNotExist for a missing row, so
    # this guard is probably unreachable -- confirm before relying on ValueError.
    if not user:
        raise ValueError(f"User with username '{username}' not found.")

    user.declaredMinor = not user.declaredMinor

    try:
        user.save()
    except Exception as e:
        # Chain explicitly so the underlying database error stays attached.
        raise RuntimeError(f"Failed to declare interested student: {e}") from e

177 

def getDeclaredMinorStudents():
    """
    Return a list of dicts, one for every student who has declared the minor.
    """
    declaredQuery = User.select().where(User.isStudent & User.declaredMinor)

    declaredStudentList = []
    for student in declaredQuery:
        declaredStudentList.append(model_to_dict(student))

    return declaredStudentList

187 

def getCourseInformation(id):
    """
    Given a course ID, return a dict with the course (as a dict, under
    "course") and the full names of its instructors (under "instructors").
    """
    # fetch the course first, then everyone recorded as teaching it
    courseAsDict = model_to_dict(Course.get_by_id(id))

    instructorQuery = (
        CourseInstructor.select(CourseInstructor, User)
                        .join(Course).switch()
                        .join(User)
                        .where(Course.id == id)
    )

    instructorNames = []
    for courseInstructor in instructorQuery:
        instructorNames.append(courseInstructor.user.firstName + " " + courseInstructor.user.lastName)

    return {"instructors": instructorNames, "course": courseAsDict}

204 

def getProgramEngagementHistory(program_id, username, term_id):
    """
    Given a program_id, username, and term_id, return a dict holding the
    program name ("program"), the service events of that program and term the
    user participated in ("events"), and the hours earned across them ("totalHours").
    """
    # Service events of this program and term that the user took part in.
    participationFilters = (
        EventParticipant.user == username,
        Event.term == term_id,
        Event.isService == True,
        Program.id == program_id,
    )
    eventQuery = (
        Event.select(Event.id, Event.name, EventParticipant.hoursEarned)
             .join(Program).switch()
             .join(EventParticipant)
             .where(*participationFilters)
    )

    program = Program.get_by_id(program_id)

    # Add up the hours for the whole program that term, skipping empty values.
    earnedHours = (event.eventparticipant.hoursEarned for event in eventQuery)
    totalHours = sum(hours for hours in earnedHours if hours)

    return {
        "program": program.programName,
        "events": list(eventQuery.dicts()),
        "totalHours": totalHours,
    }

232 

def setCommunityEngagementForUser(action, engagementData, currentUser):
    """
    Add or remove the IndividualRequirement record for one of a student's
    Sustained Community Engagements.

    :param action: 'add' or 'remove'
    :param engagementData:
        type: program or course
        id: program or course id
        username: the username of the student that is having a community engagement added or removed
        term: The term the engagement is recorded in
    :param currentUser: The user who is performing the add/remove action

    :raises DoesNotExist: if there are no available CertificationRequirement slots remaining for the engagement
    :raises Exception: if the engagement type or the action is not recognised
    """
    engagementType = engagementData['type']
    if engagementType not in ['program', 'course']:
        raise Exception("Invalid engagement type!")

    # CCE requirements (other than 'Summer Program') that this student has not
    # filled yet: the LEFT OUTER join yields NULL where no record exists.
    openRequirements = (
        CertificationRequirement.select()
            .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                (IndividualRequirement.requirement == CertificationRequirement.id) &
                (IndividualRequirement.username == engagementData['username'])))
            .where(IndividualRequirement.username.is_null(True),
                   CertificationRequirement.certification == Certification.CCE,
                   CertificationRequirement.name.not_in(['Summer Program']))
    )

    if action == 'remove':
        IndividualRequirement.delete().where(
            getattr(IndividualRequirement, engagementType) == engagementData['id'],
            IndividualRequirement.username == engagementData['username'],
            IndividualRequirement.term == engagementData['term']
        ).execute()
        return

    if action != 'add':
        raise Exception(f"Invalid action '{action}' sent to setCommunityEngagementForUser")

    newRecord = {}
    newRecord[engagementType] = engagementData['id']
    newRecord["username"] = engagementData['username']
    newRecord["term"] = engagementData['term']
    # .get() raises DoesNotExist when no engagement requirements are left;
    # that is allowed to propagate and is handled elsewhere.
    newRecord["requirement"] = openRequirements.get()
    newRecord["addedBy"] = currentUser
    IndividualRequirement.create(**newRecord)

277 

def getCommunityEngagementByTerm(username):
    """
    Given a username, return all of their community engagements (service
    learning courses and service event participations) grouped by term.

    The result is a dict keyed by (term description, term id) tuples, ordered
    by term id. Each value is a list of dicts with the keys "name", "id",
    "type" ("course" or "program"), "matched" and "term".
    """
    # term (description, id) -> list of that term's engagements
    engagementsByTerm = defaultdict(list)

    # ---- courses the user participated in ----
    # matchedReq is 1 when an IndividualRequirement row was joined, else 0
    courseIsMatched = Case(None, [(IndividualRequirement.course.is_null(True), 0)], 1)
    courseQuery = (
        Course.select(Course, courseIsMatched.alias("matchedReq"))
              .join(CourseParticipant, on=(Course.id == CourseParticipant.course))
              .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                  (IndividualRequirement.course == Course.id) &
                  (IndividualRequirement.username == CourseParticipant.user) &
                  (IndividualRequirement.term == Course.term)))
              .where(CourseParticipant.user == username)
              .group_by(Course.courseName, Course.term)
    )
    for course in courseQuery:
        termKey = (course.term.description, course.term.id)
        engagementsByTerm[termKey].append({
            "name": course.courseName,
            "id": course.id,
            "type": "course",
            "matched": course.matchedReq,
            "term": course.term.id,
        })

    # ---- programs whose service events the user participated in ----
    programIsMatched = Case(None, [(IndividualRequirement.program.is_null(True), 0)], 1)
    eventQuery = (
        Event.select(Event, Program, programIsMatched.alias('matchedReq'))
             .join(EventParticipant, on=(Event.id == EventParticipant.event)).switch()
             .join(Program)
             .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                 (IndividualRequirement.program == Program.id) &
                 (IndividualRequirement.username == EventParticipant.user) &
                 (IndividualRequirement.term == Event.term)))
             .where(EventParticipant.user == username, Event.isService == True)
             .group_by(Event.program, Event.term)
    )
    for event in eventQuery:
        termKey = (event.term.description, event.term.id)
        engagementsByTerm[termKey].append({
            "name": event.program.programName,
            "id": event.program.id,
            "type": "program",
            "matched": event.matchedReq,
            "term": event.term.id,
        })

    def termIdOf(termAndEngagements):
        # items are ((description, id), engagements); order by the term id
        return termAndEngagements[0][1]

    return dict(sorted(engagementsByTerm.items(), key=termIdOf))

325 

def createOtherEngagement(username, request):
    """
    Create an 'Other Engagement' CCEMinorProposal for the student from the
    request's form data, and save the uploaded attachment when one is present.
    """
    student = User.get(User.username == username)

    newProposal = CCEMinorProposal.create(
        proposalType='Other Engagement',
        createdBy=g.current_user,
        student=student,
        **request.form
    )
    proposalObject = CCEMinorProposal.get_by_id(newProposal)

    # nothing more to do unless a file came in under "attachmentObject"
    if not request.files.get("attachmentObject"):
        return

    fileHandler = FileHandler(getFilesFromRequest(request), proposalId=newProposal.id)
    fileHandler.saveFiles(parentEvent=proposalObject)

342 

def updateOtherEngagementRequest(proposalID, request):
    """
    Update an existing CCEMinorProposal from the request's form data and, when
    a new attachment is uploaded, replace the proposal's stored attachment.
    Everything runs inside a single database transaction.
    """
    uploadedFile = request.files.get("attachmentObject")

    with mainDB.atomic():
        # raises if the proposal does not exist
        proposalObject = CCEMinorProposal.get_by_id(proposalID)

        # ---- swap the attachment, if a new one was sent ----
        if uploadedFile:
            previousUpload = (AttachmentUpload.select()
                                              .where(AttachmentUpload.proposal == proposalID)
                                              .first())
            if previousUpload:
                FileHandler(proposalId=proposalID).deleteFile(previousUpload.id)

            uploader = FileHandler(getFilesFromRequest(request), proposalId=proposalID)
            uploader.saveFiles(parentEvent=proposalObject)

        # ---- keep only form fields that are DB columns ----
        update_data = dict(request.form)
        for nonColumnKey in ("contentArea", "attachmentObject"):
            update_data.pop(nonColumnKey, None)

        # ---- write the proposal ----
        updateQuery = CCEMinorProposal.update(**update_data).where(CCEMinorProposal.id == proposalID)
        updateQuery.execute()

383 

def saveSummerExperience(username, summerExperience, currentUser):
    """
    Replace the student's 'Summer Program' IndividualRequirement record.

    :param username: username of the student that the summer experience is for
    :param summerExperience: dict
        summerExperience: string of what the summer experience was (will be written as the 'description' in the IndividualRequirement table)
        selectedSummerTerm: the term description that the summer experience took place in
    :param currentUser: the username of the user who added the summer experience record
    :returns: an empty string
    :raises DoesNotExist: if the term or an open 'Summer Program' requirement cannot be found

    Delete any existing IndividualRequirement entry for 'username' if it is for 'Summer Program' and create a new IndividualRequirement entry for
    'Summer Program' with the contents of summerExperience.
    """
    # Delete and re-create in one transaction: if a lookup below fails, the
    # delete is rolled back instead of silently losing the existing record.
    with mainDB.atomic():
        requirementDeleteSubSelect = CertificationRequirement.select().where(CertificationRequirement.certification == Certification.CCE, CertificationRequirement.name << ['Summer Program'])
        IndividualRequirement.delete().where(IndividualRequirement.username == username, IndividualRequirement.requirement == requirementDeleteSubSelect).execute()

        # 'Summer Program' requirement under CCE that this student has no record for.
        requirement = (CertificationRequirement.select()
                        .join(IndividualRequirement, JOIN.LEFT_OUTER, on=((IndividualRequirement.requirement == CertificationRequirement.id) &
                                                                          (IndividualRequirement.username == username)))
                        .where(IndividualRequirement.username.is_null(True),
                               CertificationRequirement.certification == Certification.CCE,
                               CertificationRequirement.name << ['Summer Program']))

        summerTerm = (Term.select().where(Term.description == summerExperience['selectedSummerTerm']))

        IndividualRequirement.create(**{"description": summerExperience['summerExperience'],
                                        "username": username,
                                        "term": summerTerm.get(),
                                        "requirement": requirement.get(),
                                        "addedBy": currentUser,
                                        })
    return ""

414 

def getSummerExperience(username):
    """
    Get a student's summer experience to populate the text box, if the student has one.

    :param username: username of the student
    :returns: ``(term description, experience description)`` when exactly one
        'Summer Program' record exists for the student under the CCE
        certification, otherwise ``(None, None)``
    """
    summerExperience = (IndividualRequirement.select()
                            .join(CertificationRequirement, JOIN.LEFT_OUTER, on=(CertificationRequirement.id == IndividualRequirement.requirement)).switch()
                            .join(Term, on=(IndividualRequirement.term == Term.id))
                            .where(IndividualRequirement.username == username,
                                   CertificationRequirement.certification == Certification.CCE,
                                   CertificationRequirement.name << ['Summer Program']))

    # Run the query once and reuse the row, rather than counting the results
    # and then issuing two further .get() calls for the same record.
    matchingRecords = list(summerExperience)
    if len(matchingRecords) == 1:
        record = matchingRecords[0]
        return (record.term.description, record.description)

    return (None, None)

429 

def removeSummerExperience(username):
    """
    Delete the 'Summer Program' IndividualRequirement entry for 'username'.

    The delete is scoped to the CCE 'Summer Program' requirement rather than
    matched on the description text. Matching on description compared against
    None when the student had no summer experience, which would select the
    student's other requirement records that have no description.
    """
    summerRequirement = (CertificationRequirement.select(CertificationRequirement.id)
                            .where(CertificationRequirement.certification == Certification.CCE,
                                   CertificationRequirement.name << ['Summer Program']))

    (IndividualRequirement.delete()
        .where(IndividualRequirement.username == username,
               IndividualRequirement.requirement << summerRequirement)
        .execute())

436 

def removeProposal(proposalID) -> None:
    """
    Delete a summer experience or other engagement row from the
    CCEMinorProposal table, first removing the file attached to it (if any).
    """
    proposalID = int(proposalID)

    # remove the stored upload before the proposal row itself
    attachedUpload = AttachmentUpload.get_or_none(proposal=proposalID)
    if attachedUpload:
        FileHandler(proposalId=proposalID).deleteFile(attachedUpload.id)

    deleteQuery = CCEMinorProposal.delete().where(CCEMinorProposal.id == proposalID)
    deleteQuery.execute()

450 

def changeProposalStatus(proposalID, newStatus) -> None:
    """
    Set the status of the proposal with the given id to `newStatus`.
    """
    targetProposal = CCEMinorProposal.id == int(proposalID)
    statusUpdate = CCEMinorProposal.update(status=newStatus).where(targetProposal)
    statusUpdate.execute()