Coverage for app/logic/minor.py: 65%

159 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-12-18 19:28 +0000

1from collections import defaultdict 

2from typing import List, Dict 

3from flask import flash, g 

4from playhouse.shortcuts import model_to_dict 

5from peewee import JOIN, fn, Case, DoesNotExist, SQL 

6import xlsxwriter 

7 

8from app import app 

9from app.models.user import User 

10from app.models.term import Term 

11from app.models.event import Event 

12from app.models.course import Course 

13from app.models.program import Program 

14from app.models.certification import Certification 

15from app.models.courseInstructor import CourseInstructor 

16from app.models.eventParticipant import EventParticipant 

17from app.models.courseParticipant import CourseParticipant 

18from app.models.individualRequirement import IndividualRequirement 

19from app.models.certificationRequirement import CertificationRequirement 

20from app.models.cceMinorProposal import CCEMinorProposal 

21from app.logic.createLogs import createActivityLog 

22from app.logic.fileHandler import FileHandler 

23from app.logic.serviceLearningCourses import deleteCourseObject 

24from app.models.attachmentUpload import AttachmentUpload 

25 

26 

def createSummerExperience(username, formData):
    """
    Record a pending 'Summer Experience' CCEMinorProposal for a student.

    :param username: username of the student the proposal belongs to
    :param formData: submitted form; it must support ``getlist('contentArea')``
        and ``**`` unpacking, because its entries are passed straight through
        as additional CCEMinorProposal fields
    :raises Exception: any error from the lookup or the insert is printed and
        then re-raised unchanged
    """
    try:
        student = User.get(User.username == username)
        # Several content areas may be selected; they are stored as a single
        # comma-separated string.
        selectedAreas = formData.getlist('contentArea')
        proposalFields = dict(
            student=student,
            proposalType='Summer Experience',
            contentAreas=', '.join(selectedAreas),
            status="Pending",
            createdBy=g.current_user,
        )
        CCEMinorProposal.create(**proposalFields, **formData)
    except Exception as e:
        print(f"Error saving summer experience: {e}")
        raise e

46 

def getCCEMinorProposals(username):
    """
    Summarise every CCEMinorProposal whose student matches ``username``.

    :param username: value compared against ``CCEMinorProposal.student``
    :return: list of dicts with the keys ``id``, ``type``, ``createdBy``,
        ``supervisor``, ``term`` and ``status`` (one dict per proposal)
    """
    studentProposals = CCEMinorProposal.select().where(CCEMinorProposal.student == username)

    return [
        {
            "id": proposal.id,
            "type": proposal.proposalType,
            "createdBy": proposal.createdBy,
            "supervisor": proposal.supervisorName,
            "term": proposal.term,
            "status": proposal.status,
        }
        for proposal in studentProposals
    ]

63 

def getEngagementTotal(engagementData):
    """
    Count the engagements (from all terms) that have matched with a requirement.

    :param engagementData: mapping whose values are iterables of engagement
        dicts; each engagement dict must have a numeric ``'matched'`` entry
    :return: the sum of every ``'matched'`` value, or 0 for an empty mapping
    """
    # Walk each term's engagements directly. Flattening with sum(..., [])
    # copies the accumulated list once per term and only works when every
    # value is a list.
    return sum(
        engagement['matched']
        for termEngagements in engagementData.values()
        for engagement in termEngagements
    )

71 

72 

def getMinorInterest() -> List[Dict]:
    """
    List the students who flagged interest in the CCE minor, have not declared
    it, and have no IndividualRequirement rows.

    :return: one ``model_to_dict`` dictionary per matching User
    """
    # The LEFT OUTER JOIN leaves IndividualRequirement columns NULL for users
    # that have no requirement rows; those are the ones we keep.
    hasNoRequirements = IndividualRequirement.username.is_null(True)
    isUndeclaredInterestedStudent = User.isStudent & User.minorInterest & ~User.declaredMinor

    interestQuery = (User.select(User)
                         .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(User.username == IndividualRequirement.username))
                         .where(isUndeclaredInterestedStudent & hasNoRequirements))

    return [model_to_dict(student) for student in interestQuery]

84 

def getMinorProgress():
    """
    Summarise CCE minor progress for every user that has an
    IndividualRequirement record under the CCE certification.

    The query is grouped by user and term, so a student appears once per term
    in which they have a matching requirement record.

    :return: list of dicts with the keys ``firstName``, ``lastName``,
        ``username``, ``B-Number``, ``hasGraduated``, ``engagementCount``,
        ``hasCCEMinorProposal``, ``hasSummer`` ("Completed"/"Incomplete") and
        ``engagementTerm`` (the term description)
    """
    # 1 for each joined proposal row that is a Summer Experience, otherwise 0.
    summerCase = Case(None, [(CCEMinorProposal.proposalType == "Summer Experience", 1)], 0)

    engagementCountColumn = fn.COUNT(IndividualRequirement.id).alias('engagementCount')
    hasSummerColumn = fn.SUM(summerCase).alias('hasSummer')
    hasProposalColumn = fn.IF(fn.COUNT(CCEMinorProposal.id) > 0, True, False).alias('hasCCEMinorProposal')

    progressQuery = (
        User.select(
                User,
                IndividualRequirement,
                Term,
                IndividualRequirement.term_id,
                engagementCountColumn,
                hasSummerColumn,
                hasProposalColumn)
            .join(IndividualRequirement, on=(User.username == IndividualRequirement.username_id))
            .join(CertificationRequirement, on=(IndividualRequirement.requirement_id == CertificationRequirement.id))
            .join(Term, on=(IndividualRequirement.term_id == Term.id))
            .switch(User).join(CCEMinorProposal, JOIN.LEFT_OUTER, on=(User.username == CCEMinorProposal.student_id))
            .where(CertificationRequirement.certification_id == Certification.CCE)
            .group_by(User.username, IndividualRequirement.term_id, Term.id)
            .order_by(SQL("engagementCount").desc())
    )

    progressRows = []
    for row in progressQuery:
        progressRows.append({
            'firstName': row.firstName,
            'lastName': row.lastName,
            'username': row.username,
            'B-Number': row.bnumber,
            'hasGraduated': row.hasGraduated,
            # The summer tally is subtracted from the raw requirement count.
            'engagementCount': row.engagementCount - row.hasSummer,
            'hasCCEMinorProposal': row.hasCCEMinorProposal,
            'hasSummer': "Completed" if row.hasSummer else "Incomplete",
            'engagementTerm': row.individualrequirement.term.description,
        })
    return progressRows

120 

def getMinorSpreadsheet():
    """
    Write the minor progress of every non-graduated student to an .xlsx file.

    The rows come from getMinorProgress(). Graduated students are left out,
    and the sheet is laid out with one empty leading row and one empty leading
    column (xlsxwriter rows/columns are zero-indexed; writing starts at 1).

    :return: path of the workbook that was written
    """
    # If we're in 2025, can we get the minor information for 2023?
    studentProgress = getMinorProgress()

    # (header text, getMinorProgress() key) for each column, in sheet order.
    # Looking values up by key keeps the cells aligned with their headers
    # regardless of the dict's key order.
    columns = [
        ("First Name", "firstName"),
        ("Last Name", "lastName"),
        ("Username", "username"),
        ("B-Number", "B-Number"),
        ("Number of Engagements", "engagementCount"),
        ("Completed Summer Experience", "hasSummer"),
        ("Engagement Term", "engagementTerm"),
    ]

    filepath = f"{app.config['files']['base_path']}/minor_data.xlsx"
    workbook = xlsxwriter.Workbook(filepath, {'in_memory': True})

    worksheet = workbook.add_worksheet('minor_information')
    format_row = workbook.add_format({'align': 'left'})

    firstColumn = 1
    headerRow = 1
    worksheet.set_column(firstColumn, len(columns), 30, workbook.add_format({'bold': True}))
    for columnNumber, (header, _) in enumerate(columns, firstColumn):
        worksheet.write(headerRow, columnNumber, header)

    # Filter before numbering the rows so skipped students leave no blank rows.
    currentStudents = [student for student in studentProgress if not student['hasGraduated']]
    for rowNumber, student in enumerate(currentStudents, headerRow + 1):
        worksheet.set_row(rowNumber, None, format_row)
        cells = dict(student)
        # getMinorProgress() reports the summer experience as "Completed" / "Incomplete".
        cells['hasSummer'] = "Yes" if student['hasSummer'] == "Completed" else "No"
        if student['B-Number'] is None:
            cells['B-Number'] = "No B-Number Found"
        for columnNumber, (_, key) in enumerate(columns, firstColumn):
            worksheet.write(rowNumber, columnNumber, cells[key])

    workbook.close()

    return filepath

156 

157 

def toggleMinorInterest(username, isAdding):
    """
    Given a username, update their minor interest and clear their minor declaration.

    :param username: username of the user to update
    :param isAdding: value stored in the user's ``minorInterest`` field
    :return: None on success; ``({"error": "User not found"}, 404)`` when no
        user has that username; ``({"error": <message>}, 500)`` when the
        update raises
    """
    try:
        # User.get() raises for a missing row, which the handler below would
        # report as a 500. get_or_none() returns None instead, so the 404
        # branch can actually run.
        user = User.get_or_none(username=username)
        if user is None:
            return {"error": "User not found"}, 404

        user.minorInterest = isAdding
        user.declaredMinor = False
        user.save()

    except Exception as e:
        print(f"Error updating minor interest: {e}")
        return {"error": str(e)}, 500

175 

def declareMinorInterest(username):
    """
    Flip the ``declaredMinor`` flag of the user identified by ``username``.

    :param username: identifier handed to ``User.get_by_id``
    :raises ValueError: if the lookup yields a falsy user
    :raises RuntimeError: if saving the updated user fails
    """
    student = User.get_by_id(username)

    # NOTE(review): peewee's get_by_id normally raises DoesNotExist rather
    # than returning None, so this guard may never fire; confirm before
    # relying on the ValueError.
    if not student:
        raise ValueError(f"User with username '{username}' not found.")

    student.declaredMinor = not student.declaredMinor

    try:
        student.save()
    except Exception as e:
        raise RuntimeError(f"Failed to declare interested student: {e}")

191 

def getDeclaredMinorStudents():
    """
    List the students who have declared the minor.

    :return: one ``model_to_dict`` dictionary per User that is a student and
        has ``declaredMinor`` set
    """
    declaredStudentQuery = User.select().where(User.isStudent & User.declaredMinor)

    return list(map(model_to_dict, declaredStudentQuery))

201 

def getCourseInformation(id):
    """
    Look up a course and the full names of its instructors.

    :param id: id of the Course to fetch
    :return: dict with ``"course"`` (the course as a ``model_to_dict``
        dictionary) and ``"instructors"`` (a list of "first last" strings)
    """
    courseDict = model_to_dict(Course.get_by_id(id))

    # Select the User columns along with CourseInstructor so each row exposes
    # its instructor as ``row.user``.
    instructorRows = (CourseInstructor.select(CourseInstructor, User)
                                      .join(Course).switch()
                                      .join(User)
                                      .where(Course.id == id))

    instructorNames = []
    for row in instructorRows:
        instructorNames.append(row.user.firstName + " " + row.user.lastName)

    return {"instructors": instructorNames, "course": courseDict}

218 

def getProgramEngagementHistory(program_id, username, term_id):
    """
    Collect the service events of one program and term that a user participated in.

    :param program_id: id of the Program the events must belong to
    :param username: user whose EventParticipant rows are matched
    :param term_id: term the events must fall in
    :return: dict with ``"program"`` (the program name), ``"events"`` (one dict
        per event, as produced by the query's ``.dicts()``) and
        ``"totalHours"`` (sum of the non-empty ``hoursEarned`` values)
    """
    # Events the user took part in, restricted to service events of the given
    # program and term.
    programEvents = (Event.select(Event.id, Event.name, EventParticipant.hoursEarned)
                          .join(Program).switch()
                          .join(EventParticipant)
                          .where(EventParticipant.user == username,
                                 Event.term == term_id,
                                 Event.isService == True,
                                 Program.id == program_id))

    program = Program.get_by_id(program_id)

    # Total hours for the whole program that term; empty/zero values add nothing.
    earnedHours = (event.eventparticipant.hoursEarned for event in programEvents)
    totalHours = sum(hours for hours in earnedHours if hours)

    return {
        "program": program.programName,
        "events": list(programEvents.dicts()),
        "totalHours": totalHours,
    }

246 

def setCommunityEngagementForUser(action, engagementData, currentUser):
    """
    Either add or remove an IndividualRequirement record for a student's Sustained Community Engagement

    :param action: The behavior of the function. Can be 'add' or 'remove'
    :param engagementData:
        type: program or course
        id: program or course id
        username: the username of the student that is having a community engagement added or removed
        term: The term the engagement is recorded in
    :param currentUser: The user who is performing the add/remove action

    :raises DoesNotExist: if there are no available CertificationRequirement slots remaining for the engagement
    :raises Exception: if the engagement type or the action is not one of the accepted values
    """
    engagementType = engagementData['type']
    if engagementType not in ('program', 'course'):
        raise Exception("Invalid engagement type!")

    # CCE requirements (other than 'Summer Program') that this student has not
    # filled yet: the LEFT OUTER JOIN leaves IndividualRequirement NULL for them.
    openRequirements = (CertificationRequirement.select()
                            .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                                (IndividualRequirement.requirement == CertificationRequirement.id) &
                                (IndividualRequirement.username == engagementData['username'])))
                            .where(IndividualRequirement.username.is_null(True),
                                   CertificationRequirement.certification == Certification.CCE,
                                   CertificationRequirement.name.not_in(['Summer Program'])))

    if action == 'remove':
        IndividualRequirement.delete().where(
            getattr(IndividualRequirement, engagementType) == engagementData['id'],
            IndividualRequirement.username == engagementData['username'],
            IndividualRequirement.term == engagementData['term']
        ).execute()
        return

    if action != 'add':
        raise Exception(f"Invalid action '{action}' sent to setCommunityEngagementForUser")

    # openRequirements.get() raises DoesNotExist when no requirement slot is
    # left; it is deliberately allowed to propagate and is handled elsewhere.
    newRecord = {
        engagementType: engagementData['id'],
        "username": engagementData['username'],
        "term": engagementData['term'],
        "requirement": openRequirements.get(),
        "addedBy": currentUser,
    }
    IndividualRequirement.create(**newRecord)

291 

def getCommunityEngagementByTerm(username):
    """
    Gather a user's community engagements (service learning courses and
    service event participations by program), grouped by term.

    :param username: the user whose participations are collected
    :return: dict keyed by ``(term description, term id)`` and ordered by term
        id; each value is a list of dicts with the keys ``name``, ``id``,
        ``type`` ("course" or "program"), ``matched`` (0 or 1) and ``term``
    """
    engagementsByTerm = defaultdict(list)

    # matched is 0 when no IndividualRequirement row joined to the course, else 1.
    courseMatchCase = Case(None, [(IndividualRequirement.course.is_null(True), 0)], 1)
    courseQuery = (Course.select(Course, courseMatchCase.alias("matchedReq"))
                         .join(CourseParticipant, on=(Course.id == CourseParticipant.course))
                         .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                             (IndividualRequirement.course == Course.id) &
                             (IndividualRequirement.username == CourseParticipant.user) &
                             (IndividualRequirement.term == Course.term)))
                         .where(CourseParticipant.user == username)
                         .group_by(Course.courseName, Course.term))

    for course in courseQuery:
        termKey = (course.term.description, course.term.id)
        engagementsByTerm[termKey].append({
            "name": course.courseName,
            "id": course.id,
            "type": "course",
            "matched": course.matchedReq,
            "term": course.term.id,
        })

    # Same idea for programs: one entry per (program, term) the user attended
    # a service event in.
    programMatchCase = Case(None, [(IndividualRequirement.program.is_null(True), 0)], 1)
    eventQuery = (Event.select(Event, Program, programMatchCase.alias('matchedReq'))
                       .join(EventParticipant, on=(Event.id == EventParticipant.event)).switch()
                       .join(Program)
                       .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                           (IndividualRequirement.program == Program.id) &
                           (IndividualRequirement.username == EventParticipant.user) &
                           (IndividualRequirement.term == Event.term)))
                       .where(EventParticipant.user == username, Event.isService == True)
                       .group_by(Event.program, Event.term))

    for event in eventQuery:
        termKey = (event.term.description, event.term.id)
        engagementsByTerm[termKey].append({
            "name": event.program.programName,
            "id": event.program.id,
            "type": "program",
            "matched": event.matchedReq,
            "term": event.term.id,
        })

    # Order the groups by term id (the second element of each key).
    orderedGroups = sorted(engagementsByTerm.items(), key=lambda group: group[0][1])
    return dict(orderedGroups)

339 

def createOtherEngagementRequest(username, formData):
    """
    Create a pending 'Other Engagement' CCEMinorProposal from the form data.

    :param username: username of the student the proposal is for
    :param formData: mapping whose entries are passed through as additional
        CCEMinorProposal fields
    :return: the newly created CCEMinorProposal
    """
    student = User.get(User.username == username)

    return CCEMinorProposal.create(
        student=student,
        proposalType='Other Engagement',
        status='Pending',
        createdBy=g.current_user,
        **formData,
    )

354 

def saveSummerExperience(username, summerExperience, currentUser):
    """
    Replace a student's 'Summer Program' IndividualRequirement entry.

    Any existing IndividualRequirement entry for 'username' that is for
    'Summer Program' is deleted, and a new one is created with the contents of
    summerExperience.

    :param username: username of the student that the summer experience is for
    :param summerExperience: dict
        summerExperience: string of what the summer experience was (will be written as the 'description' in the IndividualRequirement table)
        selectedSummerTerm: the term description that the summer experience took place in
    :param currentUser: the username of the user who added the summer experience record
    :return: an empty string
    :raises KeyError: if summerExperience lacks one of the two keys above
    :raises DoesNotExist: presumably when no Term has the given description or
        no 'Summer Program' requirement is available (raised by ``.get()``)
    """
    # Resolve everything that depends on caller input BEFORE deleting, so a
    # bad term description cannot wipe the existing record and leave nothing
    # in its place.
    summerTerm = Term.select().where(Term.description == summerExperience['selectedSummerTerm']).get()
    description = summerExperience['summerExperience']

    summerRequirements = CertificationRequirement.select().where(
        CertificationRequirement.certification == Certification.CCE,
        CertificationRequirement.name << ['Summer Program'])
    IndividualRequirement.delete().where(
        IndividualRequirement.username == username,
        IndividualRequirement.requirement == summerRequirements).execute()

    # Must run after the delete above: it looks for a 'Summer Program'
    # requirement this student has no IndividualRequirement row for.
    requirement = (CertificationRequirement.select()
                       .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                           (IndividualRequirement.requirement == CertificationRequirement.id) &
                           (IndividualRequirement.username == username)))
                       .where(IndividualRequirement.username.is_null(True),
                              CertificationRequirement.certification == Certification.CCE,
                              CertificationRequirement.name << ['Summer Program'])).get()

    IndividualRequirement.create(description=description,
                                 username=username,
                                 term=summerTerm,
                                 requirement=requirement,
                                 addedBy=currentUser)
    return ""

385 

def getSummerExperience(username):
    """
    Get a student's summer experience to populate the text box, if the student has one.

    :param username: the student whose 'Summer Program' record is looked up
    :return: ``(term description, description)`` when exactly one matching
        IndividualRequirement exists, otherwise ``(None, None)``
    """
    summerExperience = (IndividualRequirement.select()
                            .join(CertificationRequirement, JOIN.LEFT_OUTER, on=(CertificationRequirement.id == IndividualRequirement.requirement)).switch()
                            .join(Term, on=(IndividualRequirement.term == Term.id))
                            .where(IndividualRequirement.username == username,
                                   CertificationRequirement.certification == Certification.CCE,
                                   CertificationRequirement.name << ['Summer Program']))

    # Run the query once and reuse the fetched row, instead of counting the
    # rows and then calling .get() (a fresh query) for each returned field.
    matches = list(summerExperience)
    if len(matches) == 1:
        record = matches[0]
        return (record.term.description, record.description)

    return (None, None)

400 

def removeSummerExperience(username):
    """
    Delete the IndividualRequirement entry holding 'username's summer experience.

    Does nothing when getSummerExperience() reports no summer experience.

    :param username: the student whose summer experience is removed
    """
    term, summerExperienceToDelete = getSummerExperience(username)

    # (None, None) is the "nothing found" result. Comparing the description
    # column to None turns into an IS NULL test, which would delete every one
    # of this user's records that has no description, so bail out instead.
    if term is None and summerExperienceToDelete is None:
        return

    IndividualRequirement.delete().where(
        IndividualRequirement.username == username,
        IndividualRequirement.description == summerExperienceToDelete).execute()

407 

def removeProposal(proposalID) -> None:
    """
    Delete a summer experience or other engagement from the CCEMinorProposal table.

    A file attached to the proposal is deleted first, then the proposal row.

    :param proposalID: id of the proposal; anything ``int()`` accepts
    :raises ValueError: if proposalID cannot be converted to an integer
    """
    proposalKey = int(proposalID)

    attachment = AttachmentUpload.get_or_none(proposal=proposalKey)
    if attachment:
        FileHandler(proposalId=proposalKey).deleteFile(attachment.id)

    CCEMinorProposal.delete().where(CCEMinorProposal.id == proposalKey).execute()