Coverage for app/logic/minor.py: 66%
164 statements
« prev ^ index » next coverage.py v7.10.2, created at 2026-03-11 19:39 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2026-03-11 19:39 +0000
1from collections import defaultdict
2from typing import List, Dict
3from flask import flash, g
4from playhouse.shortcuts import model_to_dict
5from peewee import JOIN, fn, Case, DoesNotExist, SQL
6import xlsxwriter
8from app import app
9from app.models.user import User
10from app.models.term import Term
11from app.models.event import Event
12from app.models.course import Course
13from app.models.program import Program
14from app.models.certification import Certification
15from app.models.courseInstructor import CourseInstructor
16from app.models.eventParticipant import EventParticipant
17from app.models.courseParticipant import CourseParticipant
18from app.models.individualRequirement import IndividualRequirement
19from app.models.certificationRequirement import CertificationRequirement
20from app.models.cceMinorProposal import CCEMinorProposal
21from app.logic.createLogs import createActivityLog
22from app.logic.fileHandler import FileHandler
23from app.logic.serviceLearningCourses import deleteCourseObject
24from app.models.attachmentUpload import AttachmentUpload
def createSummerExperience(username, formData):
    """
    Record a new 'Summer Experience' CCEMinorProposal for a student.

    :param username: username of the student the proposal belongs to
    :param formData: the submitted form; its 'contentArea' values are read with
        getlist() and its items are also forwarded to CCEMinorProposal.create()
        as keyword arguments
    :raises Exception: any failure is printed and then re-raised
    """
    try:
        student = User.get(User.username == username)
        # Several content areas may be submitted; they are stored as one comma-separated string.
        selectedAreas = formData.getlist('contentArea')
        CCEMinorProposal.create(
            student=student,
            proposalType='Summer Experience',
            contentAreas=', '.join(selectedAreas),
            status="Pending",
            createdBy=g.current_user,
            **formData,
        )
    except Exception as e:
        print(f"Error saving summer experience: {e}")
        raise e
def getCCEMinorProposals(username):
    """
    Return a summary of every CCEMinorProposal whose student matches `username`.

    Each entry is a dict with the keys "id", "type", "createdBy", "supervisor",
    "term" and "status".
    """
    studentProposals = CCEMinorProposal.select().where(CCEMinorProposal.student == username)

    return [
        {
            "id": proposal.id,
            "type": proposal.proposalType,
            "createdBy": proposal.createdBy,
            "supervisor": proposal.supervisorName,
            "term": proposal.term,
            "status": proposal.status,
        }
        for proposal in studentProposals
    ]
def getEngagementTotal(engagementData):
    """
    Count the number of engagements (from all terms) that have matched with a requirement.

    :param engagementData: mapping whose values are lists of engagement dicts,
        each carrying a 'matched' value
    :returns: the sum of every engagement's 'matched' value (0 when there are none)
    """
    # Walk the per-term lists directly instead of flattening them with sum(..., []),
    # which re-copies the accumulated list for every term.
    return sum(
        engagement['matched']
        for termEngagements in engagementData.values()
        for engagement in termEngagements
    )
def getMinorInterest() -> List[Dict]:
    """
    List the students who are interested in the CCE minor but have not declared it.

    Only students with no IndividualRequirement rows are included (the left outer
    join must come back empty). Each student is returned as a model_to_dict() dict.
    """
    hasNoRequirements = IndividualRequirement.username.is_null(True)

    interestedQuery = (User.select(User)
        .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(User.username == IndividualRequirement.username))
        .where(User.isStudent & User.minorInterest & ~User.declaredMinor & hasNoRequirements))

    studentDicts = []
    for interestedStudent in interestedQuery:
        studentDicts.append(model_to_dict(interestedStudent))

    return studentDicts
def getMinorProgress():
    """
    Summarize the progress of every user who has declared the minor and has an
    IndividualRequirement record under the CCE certification.

    Returns a list of dicts, ordered by the query's engagementCount (descending),
    with the keys 'firstName', 'lastName', 'username', 'B-Number', 'hasGraduated',
    'engagementCount', 'hasCCEMinorProposal' and 'hasSummer' ("Completed" or "Incomplete").
    """
    # 1 for each joined row that is a Summer Experience proposal, 0 otherwise
    isSummerProposal = Case(None, [(CCEMinorProposal.proposalType == "Summer Experience", 1)], 0)

    engagementTotal = fn.COUNT(IndividualRequirement.id).alias('engagementCount')
    summerTotal = fn.SUM(isSummerProposal).alias('hasSummer')
    anyProposal = fn.IF(fn.COUNT(CCEMinorProposal.id) > 0, True, False).alias('hasCCEMinorProposal')

    declaredWithCCERequirement = (
        (CertificationRequirement.certification_id == Certification.CCE) &
        (User.declaredMinor == True)
    )

    # NOTE(review): requirements and proposals are both joined to User, so each count
    # is taken over the combined rows — confirm the totals are right for students
    # with more than one proposal.
    progressQuery = (
        User.select(User, engagementTotal, summerTotal, anyProposal)
            .join(IndividualRequirement, on=(User.username == IndividualRequirement.username))
            .join(CertificationRequirement, on=(IndividualRequirement.requirement_id == CertificationRequirement.id))
            .switch(User).join(CCEMinorProposal, JOIN.LEFT_OUTER, on=(User.username == CCEMinorProposal.student))
            .where(declaredWithCCERequirement)
            .group_by(User.firstName, User.lastName, User.username)
            .order_by(SQL("engagementCount").desc())
    )

    progressList = []
    for row in progressQuery:
        progressList.append({
            'firstName': row.firstName,
            'lastName': row.lastName,
            'username': row.username,
            'B-Number': row.bnumber,
            'hasGraduated': row.hasGraduated,
            # the summer tally is subtracted from the requirement count
            'engagementCount': row.engagementCount - row.hasSummer,
            'hasCCEMinorProposal': row.hasCCEMinorProposal,
            'hasSummer': "Completed" if row.hasSummer else "Incomplete",
        })
    return progressList
def getMinorSpreadsheet():
    """
    Write the minor progress of every non-graduated student to an .xlsx workbook.

    The rows come from getMinorProgress(). Headers go on row index 1 and student
    rows follow directly below, with no gaps for graduated students, who are
    left out. Columns start at column index 1.

    :returns: the path of the written workbook
    """
    # If we're in 2025, can we get the minor information for 2023?
    studentProgress = getMinorProgress()

    # (header text, key in the getMinorProgress() dict that fills the column)
    columns = [
        ("First Name", "firstName"),
        ("Last Name", "lastName"),
        ("Username", "username"),
        ("B-Number", "B-Number"),
        ("Number of Engagements", "engagementCount"),
        ("Completed Summer Experience", "hasSummer"),
    ]
    firstColumn = 1
    headerRow = 1

    filepath = f"{app.config['files']['base_path']}/minor_data.xlsx"
    workbook = xlsxwriter.Workbook(filepath, {'in_memory': True})

    worksheet = workbook.add_worksheet('minor_information')
    format_row = workbook.add_format({'align': 'left'})

    worksheet.set_column(firstColumn, len(columns), 30, workbook.add_format({'bold': True}))
    for columnNumber, (header, _) in enumerate(columns, firstColumn):
        worksheet.write(headerRow, columnNumber, header)

    # Filter before numbering the rows so skipped students do not leave blank rows.
    currentStudents = [student for student in studentProgress if not student['hasGraduated']]
    for rowNumber, student in enumerate(currentStudents, headerRow + 1):
        worksheet.set_row(rowNumber, None, format_row)
        for columnNumber, (_, key) in enumerate(columns, firstColumn):
            value = student[key]
            if key == 'hasSummer':
                # getMinorProgress() reports "Completed" / "Incomplete"
                value = "Yes" if value == "Completed" else "No"
            elif key == 'B-Number' and value is None:
                value = "No B-Number Found"
            worksheet.write(rowNumber, columnNumber, value)

    workbook.close()

    return filepath
def toggleMinorInterest(username, isAdding):
    """
    Given a username, update their minor interest and minor status.

    Sets minorInterest to `isAdding` and always resets declaredMinor to False.

    :param username: username of the user to update
    :param isAdding: the new value for the user's minorInterest
    :returns: None on success; ({"error": "User not found"}, 404) when no user has
        that username; ({"error": <message>}, 500) for any other failure
    """
    try:
        user = User.get(username=username)

        user.minorInterest = isAdding
        user.declaredMinor = False
        user.save()

    except DoesNotExist:
        # A missing row is reported by User.get() as DoesNotExist, not as a falsy
        # result, so the "not found" answer has to be produced here.
        return {"error": "User not found"}, 404

    except Exception as e:
        print(f"Error updating minor interest: {e}")
        return {"error": str(e)}, 500
def declareMinorInterest(username):
    """
    Flip the declaredMinor flag of the user looked up by `username` and save it.

    :raises ValueError: if the lookup yields a falsy user
    :raises RuntimeError: if saving the user fails
    """
    student = User.get_by_id(username)

    # NOTE(review): peewee's get_by_id is documented to raise DoesNotExist for a
    # missing row, so this guard is probably never reached — confirm.
    if not student:
        raise ValueError(f"User with username '{username}' not found.")

    currentlyDeclared = student.declaredMinor
    student.declaredMinor = not currentlyDeclared

    try:
        student.save()
    except Exception as e:
        raise RuntimeError(f"Failed to declare interested student: {e}")
def getDeclaredMinorStudents():
    """
    List the students who have declared the CCE minor together with their engagement progress.

    Because requirements and proposals are left-joined, students who declared the
    minor but have no requirements still appear, with 0 engagements. Results are
    ordered by engagement count, descending.

    Each entry is a dict with the keys "firstName", "lastName", "username",
    "B-Number", "email", "hasGraduated", "engagementCount", "hasCCEMinorProposal"
    and "hasSummer" ("Completed" or "Incomplete").
    """
    def countDistinctWhen(condition, column, alias):
        # COUNT(DISTINCT CASE WHEN condition THEN column ELSE NULL END) AS alias
        return fn.COUNT(fn.DISTINCT(Case(None, [(condition, column)], None))).alias(alias)

    summerCount = countDistinctWhen(
        CCEMinorProposal.proposalType == "Summer Experience",
        CCEMinorProposal.id,
        "summerEngagementCount",
    )

    # Only individual requirements whose certification requirement joined (i.e. belongs
    # to CCE, per the join condition below) are counted; the rest are left NULL.
    cceCount = countDistinctWhen(
        CertificationRequirement.id.is_null(False),
        IndividualRequirement.id,
        "allEngagementCount",
    )

    proposalFlag = fn.IF(fn.COUNT(fn.DISTINCT(CCEMinorProposal.id)) > 0, True, False).alias("hasCCEMinorProposal")

    # only cce minor certs are populated with non-null
    cceRequirementJoin = (
        (IndividualRequirement.requirement_id == CertificationRequirement.id) &
        (CertificationRequirement.certification_id == Certification.CCE)
    )

    declaredStudents = (
        User
        .select(User, cceCount, summerCount, proposalFlag)
        .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(User.username == IndividualRequirement.username))
        .join(CertificationRequirement, JOIN.LEFT_OUTER, on=cceRequirementJoin)
        .switch(User)
        .join(CCEMinorProposal, JOIN.LEFT_OUTER, on=(User.username == CCEMinorProposal.student))
        .where(
            (User.declaredMinor == True) &
            (User.isStudent == True)
        )
        .group_by(User.username)
        .order_by(SQL("allEngagementCount").desc())
    )

    return [
        {
            "firstName": student.firstName,
            "lastName": student.lastName,
            "username": student.username,
            "B-Number": student.bnumber,
            "email": student.email,
            "hasGraduated": student.hasGraduated,
            "engagementCount": int(student.allEngagementCount or 0),
            "hasCCEMinorProposal": bool(student.hasCCEMinorProposal),
            "hasSummer": "Completed" if (student.summerEngagementCount and int(student.summerEngagementCount) > 0) else "Incomplete",
        }
        for student in declaredStudents
    ]
def getCourseInformation(id):
    """
    Look up a course and the full names of its instructors.

    :param id: the id of the course
    :returns: dict with "course" (the course as a model_to_dict() dict) and
        "instructors" (a list of "<firstName> <lastName>" strings)
    """
    courseDict = model_to_dict(Course.get_by_id(id))

    instructorRows = (CourseInstructor.select(CourseInstructor, User)
        .join(Course).switch()
        .join(User)
        .where(Course.id == id))

    instructorNames = []
    for row in instructorRows:
        instructorNames.append(row.user.firstName + " " + row.user.lastName)

    return {"instructors": instructorNames, "course": courseDict}
def getProgramEngagementHistory(program_id, username, term_id):
    """
    Collect the service events of one program and term in which the user participated.

    :returns: dict with "program" (the program's programName), "events" (one dict
        per matching row, built from the selected Event.id, Event.name and
        EventParticipant.hoursEarned columns) and "totalHours" (the sum of the
        non-empty hoursEarned values)
    """
    # events the user took part in, limited to the given program and term
    participatedQuery = (Event.select(Event.id, Event.name, EventParticipant.hoursEarned)
        .join(Program).switch()
        .join(EventParticipant)
        .where(EventParticipant.user == username,
               Event.term == term_id,
               Event.isService == True,
               Program.id == program_id))

    programName = Program.get_by_id(program_id).programName

    # total hours for the whole program that term; empty hour values are skipped
    totalHours = sum(
        event.eventparticipant.hoursEarned
        for event in participatedQuery
        if event.eventparticipant.hoursEarned
    )

    return {
        "program": programName,
        "events": list(participatedQuery.dicts()),
        "totalHours": totalHours,
    }
def setCommunityEngagementForUser(action, engagementData, currentUser):
    """
    Either add or remove an IndividualRequirement record for a student's Sustained Community Engagement

    :param action: The behavior of the function. Can be 'add' or 'remove'
    :param engagementData:
        type: program or course
        id: program or course id
        username: the username of the student that is having a community engagement added or removed
        term: The term the engagement is recorded in
    :param currentUser: The user who is performing the add/remove action

    :raises DoesNotExist: if there are no available CertificationRequirement slots remaining for the engagement
    :raises Exception: if the engagement type or the action is not recognized
    """
    engagementType = engagementData['type']
    if engagementType not in ['program','course']:
        raise Exception("Invalid engagement type!")

    studentUsername = engagementData['username']

    # CCE requirements (other than 'Summer Program') that this student has not filled yet
    openRequirements = (CertificationRequirement.select()
        .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
            (IndividualRequirement.requirement == CertificationRequirement.id) &
            (IndividualRequirement.username == studentUsername)))
        .where(IndividualRequirement.username.is_null(True),
               CertificationRequirement.certification == Certification.CCE,
               CertificationRequirement.name.not_in(['Summer Program'])))

    if action == 'add':
        # openRequirements.get() raises DoesNotExist when no requirement is left;
        # that exception is allowed to propagate and is handled elsewhere.
        newRecord = {
            engagementType: engagementData['id'],
            "username": studentUsername,
            "term": engagementData['term'],
            "requirement": openRequirements.get(),
            "addedBy": currentUser,
        }
        IndividualRequirement.create(**newRecord)
        return

    if action == 'remove':
        engagementField = getattr(IndividualRequirement, engagementType)
        IndividualRequirement.delete().where(
            engagementField == engagementData['id'],
            IndividualRequirement.username == studentUsername,
            IndividualRequirement.term == engagementData['term']
        ).execute()
        return

    raise Exception(f"Invalid action '{action}' sent to setCommunityEngagementForUser")
def getCommunityEngagementByTerm(username):
    """
    Gather a user's community engagements (service learning courses and service
    event participation, grouped by program), organized by term.

    :returns: dict keyed by (term description, term id), sorted by term id, whose
        values are lists of dicts with the keys "name", "id", "type" ("course" or
        "program"), "matched" (1 when an IndividualRequirement row matches, else 0)
        and "term" (the term id)
    """
    # 0 when no IndividualRequirement row joined for the course, 1 otherwise
    courseIsMatched = Case(None, [(IndividualRequirement.course.is_null(True) , 0)], 1)

    courseQuery = (Course.select(Course, courseIsMatched.alias("matchedReq"))
        .join(CourseParticipant, on=(Course.id == CourseParticipant.course))
        .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
            (IndividualRequirement.course == Course.id) &
            (IndividualRequirement.username == CourseParticipant.user) &
            (IndividualRequirement.term == Course.term)))
        .where(CourseParticipant.user == username)
        .group_by(Course.courseName, Course.term))

    engagementsByTerm = defaultdict(list)

    for course in courseQuery:
        termKey = (course.term.description, course.term.id)
        engagementsByTerm[termKey].append({
            "name": course.courseName,
            "id": course.id,
            "type": "course",
            "matched": course.matchedReq,
            "term": course.term.id,
        })

    # 0 when no IndividualRequirement row joined for the program, 1 otherwise
    programIsMatched = Case(None, [(IndividualRequirement.program.is_null(True) , 0)], 1)

    eventQuery = (Event.select(Event, Program, programIsMatched.alias('matchedReq'))
        .join(EventParticipant, on=(Event.id == EventParticipant.event)).switch()
        .join(Program)
        .join(IndividualRequirement, JOIN.LEFT_OUTER, on=((IndividualRequirement.program == Program.id) &
                                                          (IndividualRequirement.username == EventParticipant.user) &
                                                          (IndividualRequirement.term == Event.term)))
        .where(EventParticipant.user == username, Event.isService == True)
        .group_by(Event.program, Event.term))

    for event in eventQuery:
        termKey = (event.term.description, event.term.id)
        engagementsByTerm[termKey].append({
            "name": event.program.programName,
            "id": event.program.id,
            "type": "program",
            "matched": event.matchedReq,
            "term": event.term.id,
        })

    # order the terms by term id (the second element of each key)
    orderedTerms = sorted(engagementsByTerm.items(), key=lambda termEntry: termEntry[0][1])
    return dict(orderedTerms)
def createOtherEngagementRequest(username, formData):
    """
    Create a pending 'Other Engagement' CCEMinorProposal for the given student.

    :param username: username of the student the proposal belongs to
    :param formData: form fields, forwarded to CCEMinorProposal.create() as keyword arguments
    :returns: the created CCEMinorProposal
    """
    student = User.get(User.username == username)

    return CCEMinorProposal.create(
        proposalType='Other Engagement',
        createdBy=g.current_user,
        status='Pending',
        student=student,
        **formData
    )
def saveSummerExperience(username, summerExperience, currentUser):
    """
    Replace the student's 'Summer Program' IndividualRequirement entry.

    Any existing entry for 'username' under the CCE 'Summer Program' requirement is
    deleted, then a new one is created from the contents of summerExperience.

    :param username: username of the student that the summer experience is for
    :param summerExperience: dict
        summerExperience: string of what the summer experience was (will be written as the 'description' in the IndividualRequirement table)
        selectedSummerTerm: the term description that the summer experience took place in
    :param currentUser: the username of the user who added the summer experience record
    :returns: an empty string
    """
    isSummerProgram = CertificationRequirement.name << ['Summer Program']
    isCCE = CertificationRequirement.certification == Certification.CCE

    # remove whatever summer entry the student already has
    existingRequirement = CertificationRequirement.select().where(isCCE, isSummerProgram)
    (IndividualRequirement.delete()
        .where(IndividualRequirement.username == username,
               IndividualRequirement.requirement == existingRequirement)
        .execute())

    # the CCE 'Summer Program' requirement that the student has not filled
    openRequirement = (CertificationRequirement.select()
        .join(IndividualRequirement, JOIN.LEFT_OUTER, on=((IndividualRequirement.requirement == CertificationRequirement.id) &
                                                          (IndividualRequirement.username == username)))
        .where(IndividualRequirement.username.is_null(True), isCCE, isSummerProgram))

    selectedTerm = Term.select().where(Term.description == summerExperience['selectedSummerTerm'])

    IndividualRequirement.create(
        description=summerExperience['summerExperience'],
        username=username,
        term=selectedTerm.get(),
        requirement=openRequirement.get(),
        addedBy=currentUser,
    )

    return ""
def getSummerExperience(username):
    """
    Get a student's summer experience to populate the text box if the student has one.

    :param username: username of the student
    :returns: (term description, summer experience description) when exactly one
        CCE 'Summer Program' entry exists for the student, otherwise (None, None)
    """
    summerExperience = (IndividualRequirement.select()
        .join(CertificationRequirement, JOIN.LEFT_OUTER, on=(CertificationRequirement.id == IndividualRequirement.requirement)).switch()
        .join(Term, on=(IndividualRequirement.term == Term.id))
        .where(IndividualRequirement.username == username,
               CertificationRequirement.certification == Certification.CCE,
               CertificationRequirement.name << ['Summer Program']))

    # Run the query once and reuse the row, instead of counting the rows and then
    # fetching the same row again for each returned field.
    matches = list(summerExperience)
    if len(matches) != 1:
        return (None, None)

    entry = matches[0]
    return (entry.term.description, entry.description)
def removeSummerExperience(username):
    """
    Delete the CCE 'Summer Program' IndividualRequirement entry for 'username', if any.

    Rows are chosen by their requirement rather than by their description text.
    Matching on the description compares against None when the student has no
    summer experience; peewee turns that into IS NULL, which would delete the
    student's other requirement rows that have no description.

    :param username: username of the student whose summer experience is removed
    """
    summerRequirements = (CertificationRequirement.select(CertificationRequirement.id)
        .where(CertificationRequirement.certification == Certification.CCE,
               CertificationRequirement.name << ['Summer Program']))

    IndividualRequirement.delete().where(
        IndividualRequirement.username == username,
        IndividualRequirement.requirement.in_(summerRequirements)
    ).execute()
def removeProposal(proposalID) -> None:
    """
    Delete a CCEMinorProposal (summer experience or other engagement) by id.

    If an AttachmentUpload is linked to the proposal, its file is deleted through
    FileHandler before the proposal row is removed.

    :param proposalID: id of the proposal; converted with int() before use
    """
    proposalID = int(proposalID)

    attachment = AttachmentUpload.get_or_none(proposal=proposalID)
    if attachment:
        FileHandler(proposalId=proposalID).deleteFile(attachment.id)

    removeQuery = CCEMinorProposal.delete().where(CCEMinorProposal.id == proposalID)
    removeQuery.execute()