Coverage for app/logic/minor.py: 65%

159 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2025-07-22 20:03 +0000

1from collections import defaultdict 

2from typing import List, Dict 

3from flask import flash, g 

4from playhouse.shortcuts import model_to_dict 

5from peewee import JOIN, fn, Case, DoesNotExist, SQL 

6import xlsxwriter 

7 

8from app import app 

9from app.models.user import User 

10from app.models.term import Term 

11from app.models.event import Event 

12from app.models.course import Course 

13from app.models.program import Program 

14from app.models.certification import Certification 

15from app.models.courseInstructor import CourseInstructor 

16from app.models.eventParticipant import EventParticipant 

17from app.models.courseParticipant import CourseParticipant 

18from app.models.individualRequirement import IndividualRequirement 

19from app.models.certificationRequirement import CertificationRequirement 

20from app.models.cceMinorProposal import CCEMinorProposal 

21from app.logic.createLogs import createActivityLog 

22from app.logic.fileHandler import FileHandler 

23from app.logic.serviceLearningCourses import deleteCourseObject 

24from app.models.attachmentUpload import AttachmentUpload 

25 

26 

def createSummerExperience(username, formData):
    """
    Create a 'Summer Experience' CCEMinorProposal for a student.

    :param username: username of the student the proposal is created for
    :param formData: the submitted form. It must support ``getlist('contentArea')``
                     and ``**`` unpacking into CCEMinorProposal fields
                     (presumably a Flask form MultiDict -- confirm against the caller).

    :raises Exception: any failure during the lookup or the insert is printed and re-raised
    """
    try:
        student = User.get(User.username == username)

        # Several content areas can be selected; they are stored as one comma-separated string
        selectedAreas = formData.getlist('contentArea')
        proposalFields = {
            "student": student,
            "proposalType": 'Summer Experience',
            "contentAreas": ', '.join(selectedAreas),
            "status": "Pending",
            "createdBy": g.current_user,
        }
        CCEMinorProposal.create(**proposalFields, **formData)
    except Exception as e:
        print(f"Error saving summer experience: {e}")
        raise e

46 

def getCCEMinorProposals(username):
    """
    Return a summary of every CCEMinorProposal whose student matches `username`.

    :param username: the student to look proposals up for
    :returns: a list of dicts with the keys id, type, createdBy, supervisor, term and status
    """
    studentProposals = CCEMinorProposal.select().where(CCEMinorProposal.student == username)

    return [
        {
            "id": proposal.id,
            "type": proposal.proposalType,
            "createdBy": proposal.createdBy,
            "supervisor": proposal.supervisorName,
            "term": proposal.term,
            "status": proposal.status,
        }
        for proposal in studentProposals
    ]

63 

def getEngagementTotal(engagementData):
    """
    Count the number of engagements (from all terms) that have matched with a requirement.

    :param engagementData: mapping whose values are iterables of engagement dicts,
                           each dict carrying a numeric 'matched' entry
    :returns: the sum of every 'matched' value; 0 when there are no engagements
    """
    # Walk every term's engagements in a single pass. Flattening with
    # sum(lists, []) would rebuild the accumulated list for each term.
    return sum(engagement['matched']
               for termEngagements in engagementData.values()
               for engagement in termEngagements)

71 

72 

def getMinorInterest() -> List[Dict]:
    """
    Return, as a list of dicts, the students who flagged interest in the CCE minor,
    have not declared it, and have no IndividualRequirement record.
    """
    # The LEFT OUTER join leaves IndividualRequirement.username NULL for users without any record
    hasNoRequirementRecord = IndividualRequirement.username.is_null(True)
    isInterestedOnly = User.isStudent & User.minorInterest & ~User.declaredMinor

    query = (User.select(User)
                 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(User.username == IndividualRequirement.username))
                 .where(isInterestedOnly & hasNoRequirementRecord))

    return [model_to_dict(student) for student in query]

84 

def getMinorProgress():
    """
    Get all the users who have an IndividualRequirement record under the CCE certification.

    :returns: a list of dicts, one per student, holding the student's name, username,
              B-Number, graduation flag, engagement count (with the summer count subtracted),
              whether they have any CCEMinorProposal, and 'Completed'/'Incomplete' for the
              summer experience. Ordered by the raw engagement count, highest first.
    """
    # NOTE(review): IndividualRequirement and CCEMinorProposal are both joined to User, so the
    # aggregates below run over the combined rows -- confirm the counts are what is intended
    # when a student has more than one proposal.
    summerCase = Case(None, [(CCEMinorProposal.proposalType == "Summer Experience", 1)], 0)

    engagedStudentsWithCount = (
        User.select(User, fn.COUNT(IndividualRequirement.id).alias('engagementCount'),
                    fn.SUM(summerCase).alias('hasSummer'),
                    fn.IF(fn.COUNT(CCEMinorProposal.id) > 0, True, False).alias('hasCCEMinorProposal'))
            .join(IndividualRequirement, on=(User.username == IndividualRequirement.username))
            .join(CertificationRequirement, on=(IndividualRequirement.requirement_id == CertificationRequirement.id))
            .switch(User).join(CCEMinorProposal, JOIN.LEFT_OUTER, on= (User.username == CCEMinorProposal.student))
            .where(CertificationRequirement.certification_id == Certification.CCE)
            .group_by(User.firstName, User.lastName, User.username)
            .order_by(SQL("engagementCount").desc())
    )

    progressRows = []
    for student in engagedStudentsWithCount:
        summerStatus = "Completed" if student.hasSummer else "Incomplete"
        progressRows.append({
            'firstName': student.firstName,
            'lastName': student.lastName,
            'username': student.username,
            'B-Number': student.bnumber,
            'hasGraduated': student.hasGraduated,
            'engagementCount': student.engagementCount - student.hasSummer,
            'hasCCEMinorProposal': student.hasCCEMinorProposal,
            'hasSummer': summerStatus,
        })
    return progressRows

113 

def getMinorSpreadsheet():
    """
    Write the minor progress of every non-graduated student to an xlsx file.

    The header is written on worksheet row 1 and the data from row 2 onward, both
    starting at column 1 (xlsxwriter indexes are zero-based, so row 0 and column 0
    are left empty).

    :returns: the path of the written file, ``<files.base_path>/minor_data.xlsx``
    """
    # If we're in 2025, can we get the minor information for 2023?
    columnNames = ["First Name", "Last Name", "Username", "B-Number", "Number of Engagements", "Completed Summer Experience"]
    # Keys of the getMinorProgress() dicts, listed in the same order as columnNames so the
    # output does not depend on dict ordering.
    columnKeys = ['firstName', 'lastName', 'username', 'B-Number', 'engagementCount', 'hasSummer']

    # Filter before numbering the rows; skipping inside the loop would leave blank rows.
    activeStudents = [student for student in getMinorProgress() if not student['hasGraduated']]

    filepath = f"{app.config['files']['base_path']}/minor_data.xlsx"
    workbook = xlsxwriter.Workbook(filepath, {'in_memory': True})

    worksheet = workbook.add_worksheet('minor_information')
    format_row = workbook.add_format({'align': 'left'})

    firstColumn = 1
    worksheet.set_column(firstColumn, len(columnNames), 30, workbook.add_format({'bold': True}))
    for columnIndex, columnName in enumerate(columnNames, firstColumn):
        worksheet.write(1, columnIndex, columnName)

    for rowNumber, student in enumerate(activeStudents, 2):
        bNumber = student['B-Number']
        rowValues = {
            **student,
            'B-Number': "No B-Number Found" if bNumber is None else bNumber,
            # getMinorProgress() reports the summer status as "Completed" / "Incomplete"
            'hasSummer': "Yes" if student['hasSummer'] == "Completed" else "No",
        }
        worksheet.set_row(rowNumber, None, format_row)
        for columnNumber, key in enumerate(columnKeys, firstColumn):
            worksheet.write(rowNumber, columnNumber, rowValues[key])

    workbook.close()

    return filepath

149 

150 

def toggleMinorInterest(username, isAdding):
    """
    Given a username, update their minor interest and reset their declared-minor status.

    :param username: username of the user to update
    :param isAdding: the new value stored in the user's minorInterest field

    :returns: None on success; ``({"error": ...}, 404)`` when no user has that username;
              ``({"error": ...}, 500)`` for any other failure
    """
    try:
        # peewee's get() raises DoesNotExist instead of returning None for a missing row
        user = User.get(username=username)

        user.minorInterest = isAdding
        user.declaredMinor = False
        user.save()

    except DoesNotExist:
        return {"error": "User not found"}, 404
    except Exception as e:
        print(f"Error updating minor interest: {e}")
        return {"error": str(e)}, 500

168 

def declareMinorInterest(username):
    """
    Flip the declaredMinor flag of the user identified by `username` and save it.

    :raises ValueError: if the looked-up user is falsy
    :raises RuntimeError: if saving the user fails
    """
    student = User.get_by_id(username)

    # NOTE(review): peewee's get_by_id raises DoesNotExist for a missing row rather than
    # returning None, so the else branch is probably never reached -- confirm.
    if student:
        student.declaredMinor = not student.declaredMinor
    else:
        raise ValueError(f"User with username '{username}' not found.")

    try:
        student.save()
    except Exception as e:
        raise RuntimeError(f"Failed to declare interested student: {e}")

184 

def getDeclaredMinorStudents():
    """
    Return every student who has declared the minor, each converted to a dict.
    """
    declaredQuery = User.select().where(User.isStudent & User.declaredMinor)
    return [model_to_dict(student) for student in declaredQuery]

194 

def getCourseInformation(id):
    """
    Look up a course and the full names of its instructors.

    :param id: the id of the course
    :returns: ``{"instructors": [<"first last">, ...], "course": <course as dict>}``
    """
    # Fetch the course first, then the instructor rows joined with their User records
    courseDict = model_to_dict(Course.get_by_id(id))

    instructorRows = (CourseInstructor.select(CourseInstructor, User)
                                      .join(Course).switch()
                                      .join(User)
                                      .where(Course.id == id))

    instructorNames = []
    for row in instructorRows:
        instructorNames.append(row.user.firstName + " " + row.user.lastName)

    return {"instructors": instructorNames, "course": courseDict}

211 

def getProgramEngagementHistory(program_id, username, term_id):
    """
    Collect the service events of one program and term that the user participated in.

    :param program_id: id of the program the events belong to
    :param username: the participant
    :param term_id: id of the term the events took place in
    :returns: ``{"program": <program name>, "events": [<event dict>, ...], "totalHours": <sum>}``
    """
    # Service events of the program/term in which the user is an EventParticipant
    serviceEvents = (Event.select(Event.id, Event.name, EventParticipant.hoursEarned)
                          .join(Program).switch()
                          .join(EventParticipant)
                          .where(EventParticipant.user == username,
                                 Event.term == term_id,
                                 Event.isService == True,
                                 Program.id == program_id)
                     )

    program = Program.get_by_id(program_id)

    # Add up the hours for the whole program that term; empty/zero hour values are skipped
    totalHours = sum(event.eventparticipant.hoursEarned
                     for event in serviceEvents
                     if event.eventparticipant.hoursEarned)

    return {"program": program.programName,
            "events": list(serviceEvents.dicts()),
            "totalHours": totalHours}

239 

def setCommunityEngagementForUser(action, engagementData, currentUser):
    """
    Either add or remove an IndividualRequirement record for a student's Sustained Community Engagement

    :param action: The behavior of the function. Can be 'add' or 'remove'
    :param engagementData:
        type: program or course
        id: program or course id
        username: the username of the student that is having a community engagement added or removed
        term: The term the engagement is recorded in
    :param currentUser: The user who is performing the add/remove action

    :raises DoesNotExist: if there are no available CertificationRequirement slots remaining for the engagement
    :raises Exception: if the engagement type or the action is not one of the accepted values
    """
    engagementType = engagementData['type']
    if engagementType not in ['program', 'course']:
        raise Exception("Invalid engagement type!")

    # CCE requirements (other than 'Summer Program') that this student has no record for yet
    openRequirement = (CertificationRequirement.select()
                          .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                              (IndividualRequirement.requirement == CertificationRequirement.id) &
                              (IndividualRequirement.username == engagementData['username'])))
                          .where(IndividualRequirement.username.is_null(True),
                                 CertificationRequirement.certification == Certification.CCE,
                                 CertificationRequirement.name.not_in(['Summer Program'])))

    if action == 'remove':
        matchingRecord = (
            getattr(IndividualRequirement, engagementType) == engagementData['id'],
            IndividualRequirement.username == engagementData['username'],
            IndividualRequirement.term == engagementData['term'],
        )
        IndividualRequirement.delete().where(*matchingRecord).execute()
        return

    if action != 'add':
        raise Exception(f"Invalid action '{action}' sent to setCommunityEngagementForUser")

    # openRequirement.get() raises DoesNotExist when no engagement requirement is left;
    # it is allowed to propagate and is handled elsewhere.
    newRecord = {
        engagementType: engagementData['id'],
        "username": engagementData['username'],
        "term": engagementData['term'],
        "requirement": openRequirement.get(),
        "addedBy": currentUser,
    }
    IndividualRequirement.create(**newRecord)

284 

def getCommunityEngagementByTerm(username):
    """
    Given a username, return all of their community engagements (service learning courses
    and service event participations), grouped by term.

    :returns: a dict keyed by ``(term description, term id)``, ordered by term id. Each value
              is a list of dicts with the keys name, id, type ('course' or 'program'),
              matched (1 when an IndividualRequirement row was joined, else 0) and term.
    """
    # 0 when the LEFT OUTER join found no IndividualRequirement for the course, 1 otherwise
    courseIsMatched = Case(None, [(IndividualRequirement.course.is_null(True) , 0)], 1)

    participatedCourses = (Course.select(Course, courseIsMatched.alias("matchedReq"))
                                 .join(CourseParticipant, on=(Course.id == CourseParticipant.course))
                                 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                                     (IndividualRequirement.course == Course.id) &
                                     (IndividualRequirement.username == CourseParticipant.user) &
                                     (IndividualRequirement.term == Course.term)))
                                 .where(CourseParticipant.user == username)
                                 .group_by(Course.courseName, Course.term))

    # Maps (term description, term id) to the engagements recorded in that term
    engagementsByTerm = {}
    for course in participatedCourses:
        termKey = (course.term.description, course.term.id)
        engagementsByTerm.setdefault(termKey, []).append({
            "name": course.courseName,
            "id": course.id,
            "type": "course",
            "matched": course.matchedReq,
            "term": course.term.id,
        })

    # Same idea for programs: 0 when no IndividualRequirement was joined, 1 otherwise
    programIsMatched = Case(None, [(IndividualRequirement.program.is_null(True) , 0)], 1)

    participatedEvents = (Event.select(Event, Program, programIsMatched.alias('matchedReq'))
                               .join(EventParticipant, on=(Event.id == EventParticipant.event)).switch()
                               .join(Program)
                               .join(IndividualRequirement, JOIN.LEFT_OUTER, on=((IndividualRequirement.program == Program.id) &
                                                                                 (IndividualRequirement.username == EventParticipant.user) &
                                                                                 (IndividualRequirement.term == Event.term)))
                               .where(EventParticipant.user == username, Event.isService == True)
                               .group_by(Event.program, Event.term))

    for event in participatedEvents:
        termKey = (event.term.description, event.term.id)
        engagementsByTerm.setdefault(termKey, []).append({
            "name": event.program.programName,
            "id": event.program.id,
            "type": "program",
            "matched": event.matchedReq,
            "term": event.term.id,
        })

    # Order the result by term id (the second element of each key)
    orderedTerms = sorted(engagementsByTerm.items(), key=lambda item: item[0][1])
    return dict(orderedTerms)

332 

def createOtherEngagementRequest(username, formData):
    """
    Create a pending 'Other Engagement' CCEMinorProposal from the form data.

    :param username: username of the student the proposal is for
    :param formData: mapping unpacked into the remaining CCEMinorProposal fields
    :returns: the created CCEMinorProposal
    """
    student = User.get(User.username == username)

    fixedFields = {
        "proposalType": 'Other Engagement',
        "createdBy": g.current_user,
        "status": 'Pending',
        "student": student,
    }
    return CCEMinorProposal.create(**fixedFields, **formData)

347 

def saveSummerExperience(username, summerExperience, currentUser):
    """
    Replace the student's 'Summer Program' IndividualRequirement with a new one.

    Any existing IndividualRequirement entry of `username` for the CCE 'Summer Program'
    requirement is deleted, then a new entry is created from `summerExperience`.

    :param username: username of the student that the summer experience is for
    :param summerExperience: dict
        summerExperience: string of what the summer experience was (written as the 'description' of the new record)
        selectedSummerTerm: the term description that the summer experience took place in
    :param currentUser: the user who added the summer experience record
    :returns: an empty string
    """
    summerRequirements = CertificationRequirement.select().where(
        CertificationRequirement.certification == Certification.CCE,
        CertificationRequirement.name << ['Summer Program'])
    (IndividualRequirement.delete()
                          .where(IndividualRequirement.username == username,
                                 IndividualRequirement.requirement == summerRequirements)
                          .execute())

    # The 'Summer Program' requirement the student has no record for (true again after the delete)
    openRequirement = (CertificationRequirement.select()
                          .join(IndividualRequirement, JOIN.LEFT_OUTER, on=((IndividualRequirement.requirement == CertificationRequirement.id) &
                                                                            (IndividualRequirement.username == username)))
                          .where(IndividualRequirement.username.is_null(True),
                                 CertificationRequirement.certification == Certification.CCE,
                                 CertificationRequirement.name << ['Summer Program']))

    matchingTerm = Term.select().where(Term.description == summerExperience['selectedSummerTerm'])

    newRecord = {
        "description": summerExperience['summerExperience'],
        "username": username,
        "term": matchingTerm.get(),
        "requirement": openRequirement.get(),
        "addedBy": currentUser,
    }
    IndividualRequirement.create(**newRecord)
    return ""

378 

def getSummerExperience(username):
    """
    Get a student's summer experience to populate the text box, if the student has one.

    :param username: the student to look the summer experience up for
    :returns: ``(term description, description)`` when exactly one CCE 'Summer Program'
              record exists for the student, otherwise ``(None, None)``
    """
    summerExperience = (IndividualRequirement.select()
                            .join(CertificationRequirement, JOIN.LEFT_OUTER, on=(CertificationRequirement.id == IndividualRequirement.requirement)).switch()
                            .join(Term, on=(IndividualRequirement.term == Term.id))
                            .where(IndividualRequirement.username == username,
                                   CertificationRequirement.certification == Certification.CCE,
                                   CertificationRequirement.name << ['Summer Program']))

    # Run the query once and reuse the rows instead of issuing it again for each field
    records = list(summerExperience)
    if len(records) == 1:
        record = records[0]
        return (record.term.description, record.description)

    return (None, None)

393 

def removeSummerExperience(username):
    """
    Delete the IndividualRequirement entry holding the summer experience of `username`.

    Does nothing when getSummerExperience() reports no summer experience for the student.
    """
    _, summerExperienceToDelete = getSummerExperience(username)

    # Without this guard the filter below would compare description to None, which peewee
    # renders as IS NULL, matching the student's records that simply have no description.
    if summerExperienceToDelete is None:
        return

    IndividualRequirement.delete().where(IndividualRequirement.username == username,
                                         IndividualRequirement.description == summerExperienceToDelete).execute()

400 

def removeProposal(proposalID) -> None:
    """
    Delete a CCEMinorProposal row (summer experience or other engagement) by id.

    When an AttachmentUpload is linked to the proposal, it is removed through
    FileHandler.deleteFile before the proposal row itself is deleted.

    :param proposalID: id of the proposal; converted with ``int()``
    """
    proposalID = int(proposalID)

    attachment = AttachmentUpload.get_or_none(proposal=proposalID)
    if attachment:
        FileHandler(proposalId=proposalID).deleteFile(attachment.id)

    deleteQuery = CCEMinorProposal.delete().where(CCEMinorProposal.id == proposalID)
    deleteQuery.execute()