Coverage for app/logic/minor.py: 62%

148 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2025-04-10 19:40 +0000

1from collections import defaultdict 

2from typing import List, Dict 

3from flask import flash, g 

4from playhouse.shortcuts import model_to_dict 

5from peewee import JOIN, fn, Case, DoesNotExist, SQL 

6import xlsxwriter 

7 

8from app import app 

9from app.models.user import User 

10from app.models.term import Term 

11from app.models.event import Event 

12from app.models.course import Course 

13from app.models.program import Program 

14from app.models.certification import Certification 

15from app.models.courseInstructor import CourseInstructor 

16from app.models.eventParticipant import EventParticipant 

17from app.models.courseParticipant import CourseParticipant 

18from app.models.individualRequirement import IndividualRequirement 

19from app.models.certificationRequirement import CertificationRequirement 

20from app.models.cceMinorProposal import CCEMinorProposal 

21 

22 

23def createSummerExperience(username, formData): 

24 """ 

25 Given the username of the student and the formData which includes all of 

26 the SummerExperience information, create a new SummerExperience object. 

27 """ 

28 try: 

29 user = User.get(User.username == username) 

30 contentAreas = ', '.join(formData.getlist('contentArea')) # Combine multiple content areas 

31 CCEMinorProposal.create( 

32 student=user, 

33 proposalType = 'Summer Experience', 

34 contentAreas = contentAreas, 

35 status="Pending", 

36 createdBy = g.current_user, 

37 **formData, 

38 ) 

39 except Exception as e: 

40 print(f"Error saving summer experience: {e}") 

41 raise e 

42 

43def getCCEMinorProposals(username): 

44 proposalList = [] 

45 

46 cceMinorProposals = list(CCEMinorProposal.select().where(CCEMinorProposal.student==username)) 

47 

48 for experience in cceMinorProposals: 

49 proposalList.append({ 

50 "id": experience.id, 

51 "type": experience.proposalType, 

52 "createdBy": experience.createdBy, 

53 "supervisor": experience.supervisorName, 

54 "term": experience.term, 

55 "status": experience.status, 

56 }) 

57 

58 return proposalList 

59 

60def getEngagementTotal(engagementData): 

61 """  

62 Count the number of engagements (from all terms) that have matched with a requirement  

63 """ 

64 

65 # map the flattened list of engagements to their matched values, and sum them 

66 return sum(map(lambda e: e['matched'], sum(engagementData.values(),[]))) 

67 

68 

69def getMinorInterest() -> List[Dict]: 

70 """ 

71 Get all students that have indicated interest in the CCE minor and return a list of dicts of all interested students 

72 """ 

73 interestedStudents = (User.select(User) 

74 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(User.username == IndividualRequirement.username)) 

75 .where(User.isStudent & User.minorInterest & ~User.declaredMinor & IndividualRequirement.username.is_null(True))) 

76 

77 interestedStudentList = [model_to_dict(student) for student in interestedStudents] 

78 

79 return interestedStudentList 

80 

81def getMinorProgress(): 

82 """ 

83 Get all the users who have an IndividualRequirement record under the CCE certification which  

84 and returns a list of dicts containing the student, how many engagements they have completed,  

85 and if they have completed the summer experience.  

86 """ 

87 summerCase = Case(None, [(CCEMinorProposal.proposalType == "Summer Experience", 1)], 0) 

88 

89 engagedStudentsWithCount = ( 

90 User.select(User, fn.COUNT(IndividualRequirement.id).alias('engagementCount'), 

91 fn.SUM(summerCase).alias('hasSummer'), 

92 fn.IF(fn.COUNT(CCEMinorProposal.id) > 0, True, False).alias('hasCCEMinorProposal')) 

93 .join(IndividualRequirement, on=(User.username == IndividualRequirement.username)) 

94 .join(CertificationRequirement, on=(IndividualRequirement.requirement_id == CertificationRequirement.id)) 

95 .switch(User).join(CCEMinorProposal, JOIN.LEFT_OUTER, on= (User.username == CCEMinorProposal.student)) 

96 .where(CertificationRequirement.certification_id == Certification.CCE) 

97 .group_by(User.firstName, User.lastName, User.username) 

98 .order_by(SQL("engagementCount").desc()) 

99 ) 

100 engagedStudentsList = [{'firstName': student.firstName, 

101 'lastName': student.lastName, 

102 'username': student.username, 

103 'B-Number': student.bnumber, 

104 'hasGraduated': student.hasGraduated, 

105 'engagementCount': student.engagementCount - student.hasSummer, 

106 'hasCCEMinorProposal': student.hasCCEMinorProposal, 

107 'hasSummer': "Completed" if student.hasSummer else "Incomplete"} for student in engagedStudentsWithCount] 

108 return engagedStudentsList 

109 

110def getMinorSpreadsheet(): 

111 """ 

112 Returns a spreadsheet containing users and related spreadsheet information. 

113 """ 

114 # If we're in 2025, can we get the minor information for 2023? 

115 studentProgress = getMinorProgress() 

116 columnNames = studentProgress[0] 

117 columnNames = ["First Name", "Last Name", "Username", "B-Number", "Number of Engagements", "Completed Summer Experience"] 

118 

119 filepath = f"{app.config['files']['base_path']}/minor_data.xlsx" 

120 workbook = xlsxwriter.Workbook(filepath, {'in_memory': True}) 

121 

122 worksheet = workbook.add_worksheet('minor_information') 

123 format_row = workbook.add_format({'align': 'left'}) 

124 

125 columnIndex = 1 

126 worksheet.set_column(columnIndex, len(columnNames), 30, workbook.add_format({'bold': True})) 

127 for columnName in columnNames: 

128 worksheet.write(1, columnIndex, columnName) 

129 columnIndex += 1 

130 

131 for rowNumber, student in enumerate(studentProgress, 2): 

132 if student['hasGraduated']: continue 

133 student.pop('hasCCEMinorProposal') 

134 student.pop('hasGraduated') 

135 student['hasSummer'] = "Yes" if student['hasSummer'] == "Complete" else "No" 

136 worksheet.set_row(rowNumber, None, format_row) 

137 if student['B-Number'] == None: student["B-Number"] = "No B-Number Found" 

138 for columnNumber, key in enumerate(student, 1): 

139 worksheet.write(rowNumber, columnNumber, student[key]) 

140 

141 

142 workbook.close() 

143 

144 return filepath 

145 

146 

147def toggleMinorInterest(username, isAdding): 

148 """ 

149 Given a username, update their minor interest and minor status. 

150 """ 

151 

152 try: 

153 user = User.get(username=username) 

154 if not user: 

155 return {"error": "User not found"}, 404 

156 

157 user.minorInterest = isAdding 

158 user.declaredMinor = False 

159 user.save() 

160 

161 except Exception as e: 

162 print(f"Error updating minor interest: {e}") 

163 return {"error": str(e)}, 500 

164 

165def declareMinorInterest(username): 

166 """ 

167 Given a username, update their minor declaration 

168 """ 

169 user = User.get_by_id(username) 

170 

171 if not user: 

172 raise ValueError(f"User with username '{username}' not found.") 

173 

174 user.declaredMinor = not user.declaredMinor 

175 

176 try: 

177 user.save() 

178 except Exception as e: 

179 raise RuntimeError(f"Failed to declare interested student: {e}") 

180 

181def getDeclaredMinorStudents(): 

182 """ 

183 Get a list of the students who have declared minor 

184 """ 

185 declaredStudents = User.select().where(User.isStudent & User.minorInterest & User.declaredMinor) 

186 

187 interestedStudentList = [model_to_dict(student) for student in declaredStudents] 

188 

189 return interestedStudentList 

190 

191def getCourseInformation(id): 

192 """ 

193 Given a course ID, return an object containing the course information and  

194 its instructors full names. 

195 """ 

196 # retrieve the course and the course instructors 

197 course = model_to_dict(Course.get_by_id(id)) 

198 

199 courseInstructors = (CourseInstructor.select(CourseInstructor, User) 

200 .join(Course).switch() 

201 .join(User) 

202 .where(Course.id == id)) 

203 

204 courseInformation = {"instructors": [(instructor.user.firstName + " " + instructor.user.lastName) for instructor in courseInstructors], "course": course} 

205 

206 return courseInformation 

207 

208def getProgramEngagementHistory(program_id, username, term_id): 

209 """ 

210 Given a program_id, username, and term_id, return an object containing all events in the provided program  

211 and in the given term along with the program name. 

212 """ 

213 # execute a query that will retrieve all events in which the user has participated 

214 # that fall under the provided term and programs. 

215 eventsInProgramAndTerm = (Event.select(Event.id, Event.name, EventParticipant.hoursEarned) 

216 .join(Program).switch() 

217 .join(EventParticipant) 

218 .where(EventParticipant.user == username, 

219 Event.term == term_id, 

220 Event.isService == True, 

221 Program.id == program_id) 

222 ) 

223 

224 program = Program.get_by_id(program_id) 

225 

226 # calculate total amount of hours for the whole program that term 

227 totalHours = 0 

228 for event in eventsInProgramAndTerm: 

229 if event.eventparticipant.hoursEarned: 

230 totalHours += event.eventparticipant.hoursEarned 

231 

232 participatedEvents = {"program":program.programName, "events": [event for event in eventsInProgramAndTerm.dicts()], "totalHours": totalHours} 

233 

234 return participatedEvents 

235 

236def setCommunityEngagementForUser(action, engagementData, currentUser): 

237 """ 

238 Either add or remove an IndividualRequirement record for a student's Sustained Community Engagement 

239 

240 :param action: The behavior of the function. Can be 'add' or 'remove' 

241 :param engagementData: 

242 type: program or course 

243 id: program or course id 

244 username: the username of the student that is having a community engagement added or removed 

245 term: The term the engagement is recorded in 

246 :param currentuser: The user who is performing the add/remove action  

247 

248 :raises DoesNotExist: if there are no available CertificationRequirement slots remaining for the engagement 

249 """ 

250 if engagementData['type'] not in ['program','course']: 

251 raise Exception("Invalid engagement type!") 

252 

253 requirement = (CertificationRequirement.select() 

254 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=( 

255 (IndividualRequirement.requirement == CertificationRequirement.id) & 

256 (IndividualRequirement.username == engagementData['username']))) 

257 .where(IndividualRequirement.username.is_null(True), 

258 CertificationRequirement.certification == Certification.CCE, 

259 CertificationRequirement.name.not_in(['Summer Program']))) 

260 if action == 'add': 

261 try: 

262 IndividualRequirement.create(**{engagementData['type']: engagementData['id'], 

263 "username": engagementData['username'], 

264 "term": engagementData['term'], 

265 "requirement": requirement.get(), 

266 "addedBy": currentUser, 

267 }) 

268 # Thrown if there are no available engagement requirements left. Handled elsewhere. 

269 except DoesNotExist as e: 

270 raise e 

271 

272 elif action == 'remove': 

273 IndividualRequirement.delete().where( 

274 getattr(IndividualRequirement, engagementData['type']) == engagementData['id'], 

275 IndividualRequirement.username == engagementData['username'], 

276 IndividualRequirement.term == engagementData['term'] 

277 ).execute() 

278 else: 

279 raise Exception(f"Invalid action '{action}' sent to setCommunityEngagementForUser") 

280 

281def getCommunityEngagementByTerm(username): 

282 """ 

283 Given a username, return all of their community engagements (service learning courses and event participations.) 

284 """ 

285 courseMatchCase = Case(None, [(IndividualRequirement.course.is_null(True) , 0)], 1) 

286 

287 courses = (Course.select(Course, courseMatchCase.alias("matchedReq")) 

288 .join(CourseParticipant, on=(Course.id == CourseParticipant.course)) 

289 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=( 

290 (IndividualRequirement.course == Course.id) & 

291 (IndividualRequirement.username == CourseParticipant.user) & 

292 (IndividualRequirement.term == Course.term))) 

293 .where(CourseParticipant.user == username) 

294 .group_by(Course.courseName, Course.term)) 

295 

296 # initialize default dict to store term descriptions as keys mapping to each 

297 # engagement's respective type, name, id, and term. 

298 communityEngagementByTermDict = defaultdict(list) 

299 for course in courses: 

300 communityEngagementByTermDict[(course.term.description, course.term.id)].append( 

301 {"name":course.courseName, 

302 "id":course.id, 

303 "type":"course", 

304 "matched": course.matchedReq, 

305 "term":course.term.id}) 

306 

307 programMatchCase = Case(None, [(IndividualRequirement.program.is_null(True) , 0)], 1) 

308 

309 events = (Event.select(Event, Program, programMatchCase.alias('matchedReq')) 

310 .join(EventParticipant, on=(Event.id == EventParticipant.event)).switch() 

311 .join(Program) 

312 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=((IndividualRequirement.program == Program.id) & 

313 (IndividualRequirement.username == EventParticipant.user) & 

314 (IndividualRequirement.term == Event.term))) 

315 .where(EventParticipant.user == username, Event.isService == True) 

316 .group_by(Event.program, Event.term)) 

317 

318 for event in events: 

319 communityEngagementByTermDict[(event.term.description, event.term.id)].append({"name":event.program.programName, 

320 "id":event.program.id, 

321 "type":"program", 

322 "matched": event.matchedReq, 

323 "term":event.term.id 

324 }) 

325 

326 # sorting the communityEngagementByTermDict by the term id 

327 return dict(sorted(communityEngagementByTermDict.items(), key=lambda engagement: engagement[0][1])) 

328 

329def createOtherEngagementRequest(username, formData): 

330 """ 

331 Create a CCEMinorProposal entry based off of the form data 

332 """ 

333 user = User.get(User.username == username) 

334 

335 cceObject = CCEMinorProposal.create(proposalType = 'Other Engagement', 

336 createdBy = g.current_user, 

337 status = 'Pending', 

338 student = user, 

339 **formData 

340 ) 

341 

342 return cceObject 

343 

344def saveSummerExperience(username, summerExperience, currentUser): 

345 """ 

346 :param username: username of the student that the summer experience is for 

347 :param summerExperience: dict  

348 summerExperience: string of what the summer experience was (will be written as the 'description' in the IndividualRequirement table) 

349 selectedSummerTerm: the term description that the summer experience took place in 

350 :param currentUser: the username of the user who added the summer experience record 

351 

352 Delete any existing IndividualRequirement entry for 'username' if it is for 'Summer Program' and create a new IndividualRequirement entry for  

353 'Summer Program' with the contents of summerExperience.  

354 """ 

355 requirementDeleteSubSelect = CertificationRequirement.select().where(CertificationRequirement.certification == Certification.CCE, CertificationRequirement.name << ['Summer Program']) 

356 IndividualRequirement.delete().where(IndividualRequirement.username == username, IndividualRequirement.requirement == requirementDeleteSubSelect).execute() 

357 

358 requirement = (CertificationRequirement.select() 

359 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=((IndividualRequirement.requirement == CertificationRequirement.id) & 

360 (IndividualRequirement.username == username))) 

361 .where(IndividualRequirement.username.is_null(True), 

362 CertificationRequirement.certification == Certification.CCE, 

363 CertificationRequirement.name << ['Summer Program'])) 

364 

365 summerTerm = (Term.select().where(Term.description == summerExperience['selectedSummerTerm'])) 

366 

367 IndividualRequirement.create(**{"description": summerExperience['summerExperience'], 

368 "username": username, 

369 "term": summerTerm.get(), 

370 "requirement": requirement.get(), 

371 "addedBy": currentUser, 

372 }) 

373 return "" 

374 

375def getSummerExperience(username): 

376 """ 

377 Get a students summer experience to populate text box if the student has one 

378 """ 

379 summerExperience = (IndividualRequirement.select() 

380 .join(CertificationRequirement, JOIN.LEFT_OUTER, on=(CertificationRequirement.id == IndividualRequirement.requirement)).switch() 

381 .join(Term, on=(IndividualRequirement.term == Term.id)) 

382 .where(IndividualRequirement.username == username, 

383 CertificationRequirement.certification == Certification.CCE, 

384 CertificationRequirement.name << ['Summer Program'])) 

385 if len(list(summerExperience)) == 1: 

386 return (summerExperience.get().term.description, summerExperience.get().description) 

387 

388 return (None, None) 

389 

390def removeSummerExperience(username): 

391 """ 

392 Delete IndividualRequirement table entry for 'username' 

393 """ 

394 term, summerExperienceToDelete = getSummerExperience(username) 

395 IndividualRequirement.delete().where(IndividualRequirement.username == username, IndividualRequirement.description == summerExperienceToDelete).execute()