Coverage for app/logic/minor.py: 66%
185 statements
« prev ^ index » next coverage.py v7.10.2, created at 2026-03-26 20:07 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2026-03-26 20:07 +0000
1from collections import defaultdict
2from typing import List, Dict
3from flask import flash, g
4from playhouse.shortcuts import model_to_dict
5from peewee import JOIN, fn, Case, DoesNotExist, SQL
6import xlsxwriter
8from app.models import app
9from app.models.user import User
10from app.models.term import Term
11from app.models.event import Event
12from app.models.course import Course
13from app.models.program import Program
14from app.models.certification import Certification
15from app.models.courseInstructor import CourseInstructor
16from app.models.eventParticipant import EventParticipant
17from app.models.courseParticipant import CourseParticipant
18from app.models.individualRequirement import IndividualRequirement
19from app.models.certificationRequirement import CertificationRequirement
20from app.models.cceMinorProposal import CCEMinorProposal
21from app.logic.fileHandler import FileHandler
22from app.logic.utils import getFilesFromRequest
23from app.models.attachmentUpload import AttachmentUpload
26def createSummerExperience(username, formData):
27 """
28 Given the username of the student and the formData which includes all of
29 the SummerExperience information, create a new SummerExperience object.
30 """
31 user = User.get(User.username == username)
32 contentAreas = ', '.join(formData.getlist('contentArea')) # Combine multiple content areas
33 formData = dict(formData)
34 formData.pop("contentArea")
35 return CCEMinorProposal.create(
36 student=user,
37 proposalType = 'Summer Experience',
38 contentAreas = contentAreas,
39 createdBy = g.current_user,
40 **formData,
41 )
43def updateSummerExperience(proposalID, formData):
44 """
45 Given the username of the student and the formData which includes all of
46 the SummerExperience information, create a new SummerExperience object.
47 """
48 contentAreas = ', '.join(formData.getlist('contentArea')) # Combine multiple content areas
49 formData = dict(formData)
50 formData.pop("contentArea")
51 formData.pop("experienceHoursOver300")
52 CCEMinorProposal.update(contentAreas=contentAreas, **formData).where(CCEMinorProposal.id == proposalID).execute()
54def getCCEMinorProposals(username):
55 return list(CCEMinorProposal.select().where(CCEMinorProposal.student==username))
57def getEngagementTotal(engagementData):
58 """
59 Count the number of engagements (from all terms) that have matched with a requirement
60 """
62 # map the flattened list of engagements to their matched values, and sum them
63 return sum(map(lambda e: e['matched'], sum(engagementData.values(),[])))
66def getMinorInterest() -> List[Dict]:
67 """
68 Get all students that have indicated interest in the CCE minor and return a list of dicts of all interested students
69 """
70 interestedStudents = (User.select(User)
71 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(User.username == IndividualRequirement.username))
72 .where(User.isStudent & User.minorInterest & ~User.declaredMinor & IndividualRequirement.username.is_null(True)))
74 interestedStudentList = [model_to_dict(student) for student in interestedStudents]
76 return interestedStudentList
78def getMinorProgress():
79 """
80 Get all the users who have an IndividualRequirement record under the CCE certification which
81 and returns a list of dicts containing the student, how many engagements they have completed,
82 and if they have completed the summer experience.
83 """
84 summerCase = Case(None, [(CCEMinorProposal.proposalType == "Summer Experience", 1)], 0)
86 engagedStudentsWithCount = (
87 User.select(User, fn.COUNT(IndividualRequirement.id).alias('engagementCount'),
88 fn.SUM(summerCase).alias('hasSummer'),
89 fn.IF(fn.COUNT(CCEMinorProposal.id) > 0, True, False).alias('hasCCEMinorProposal'))
90 .join(IndividualRequirement, on=(User.username == IndividualRequirement.username))
91 .join(CertificationRequirement, on=(IndividualRequirement.requirement_id == CertificationRequirement.id))
92 .switch(User).join(CCEMinorProposal, JOIN.LEFT_OUTER, on= (User.username == CCEMinorProposal.student))
93 .where(
94 (CertificationRequirement.certification_id == Certification.CCE) &
95 (User.declaredMinor == True)
96 )
97 .group_by(User.firstName, User.lastName, User.username)
98 .order_by(SQL("engagementCount").desc())
99 )
100 engagedStudentsList = [{'firstName': student.firstName,
101 'lastName': student.lastName,
102 'username': student.username,
103 'B-Number': student.bnumber,
104 'hasGraduated': student.hasGraduated,
105 'engagementCount': student.engagementCount - student.hasSummer,
106 'hasCCEMinorProposal': student.hasCCEMinorProposal,
107 'hasSummer': "Completed" if student.hasSummer else "Incomplete"} for student in engagedStudentsWithCount]
108 return engagedStudentsList
110def getMinorSpreadsheet():
111 """
112 Returns a spreadsheet containing users and related spreadsheet information.
113 """
114 # If we're in 2025, can we get the minor information for 2023?
115 studentProgress = getMinorProgress()
116 columnNames = studentProgress[0]
117 columnNames = ["First Name", "Last Name", "Username", "B-Number", "Number of Engagements", "Completed Summer Experience"]
119 filepath = f"{app.config['files']['base_path']}/minor_data.xlsx"
120 workbook = xlsxwriter.Workbook(filepath, {'in_memory': True})
122 worksheet = workbook.add_worksheet('minor_information')
123 format_row = workbook.add_format({'align': 'left'})
125 columnIndex = 1
126 worksheet.set_column(columnIndex, len(columnNames), 30, workbook.add_format({'bold': True}))
127 for columnName in columnNames:
128 worksheet.write(1, columnIndex, columnName)
129 columnIndex += 1
131 for rowNumber, student in enumerate(studentProgress, 2):
132 if student['hasGraduated']: continue
133 student.pop('hasCCEMinorProposal')
134 student.pop('hasGraduated')
135 student['hasSummer'] = "Yes" if student['hasSummer'] == "Complete" else "No"
136 worksheet.set_row(rowNumber, None, format_row)
137 if student['B-Number'] == None: student["B-Number"] = "No B-Number Found"
138 for columnNumber, key in enumerate(student, 1):
139 worksheet.write(rowNumber, columnNumber, student[key])
142 workbook.close()
144 return filepath
147def toggleMinorInterest(username, isAdding):
148 """
149 Given a username, update their minor interest and minor status.
150 """
152 try:
153 user = User.get(username=username)
154 if not user:
155 return {"error": "User not found"}, 404
157 user.minorInterest = isAdding
158 user.declaredMinor = False
159 user.save()
161 except Exception as e:
162 print(f"Error updating minor interest: {e}")
163 return {"error": str(e)}, 500
165def declareMinorInterest(username):
166 """
167 Given a username, update their minor declaration
168 """
169 user = User.get_by_id(username)
171 if not user:
172 raise ValueError(f"User with username '{username}' not found.")
174 user.declaredMinor = not user.declaredMinor
176 try:
177 user.save()
178 except Exception as e:
179 raise RuntimeError(f"Failed to declare interested student: {e}")
181def getDeclaredMinorStudents():
182 """
183 This function retrieves a list of students who have declared the CCE minor along with their engagement progress.
184 It returns a list of dictionaries containing student information and their engagement details and adds students who have no requirements but have declared the minor with 0 engagements.
185 """
186 summerEngagementCount = fn.COUNT(
187 fn.DISTINCT(
188 Case(
189 None,
190 [(CCEMinorProposal.proposalType == "Summer Experience", CCEMinorProposal.id)],
191 None
192 )
193 )
194 ).alias("summerEngagementCount")
196 # this returns the count of distinct engagements that have a certification requirement id.
197 # this is important because our join clause specifically joins individualrequirements with the certifications that match to CCE
198 # while leaving the rest as null
199 cceEngagementCount = fn.COUNT(
200 fn.DISTINCT(
201 Case(
202 None,
203 [(CertificationRequirement.id.is_null(False), IndividualRequirement.id)],
204 None
205 )
206 )
207 ).alias("allEngagementCount")
209 q = (
210 User
211 .select(
212 User,
213 cceEngagementCount,
214 summerEngagementCount,
215 fn.IF(fn.COUNT(fn.DISTINCT(CCEMinorProposal.id)) > 0, True, False).alias("hasCCEMinorProposal"),
216 )
217 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(User.username == IndividualRequirement.username))
218 .join(CertificationRequirement, JOIN.LEFT_OUTER, on=(
219 (IndividualRequirement.requirement_id == CertificationRequirement.id) &
220 (CertificationRequirement.certification_id == Certification.CCE) # only cce minor certs are populated with non-null
221 ))
222 .switch(User)
223 .join(CCEMinorProposal, JOIN.LEFT_OUTER, on=(User.username == CCEMinorProposal.student))
224 .where(
225 (User.declaredMinor == True) &
226 (User.isStudent == True)
227 )
228 .group_by(User.username)
229 .order_by(SQL("allEngagementCount").desc())
230 )
232 result = []
233 for s in q:
234 engagementCount = int(s.allEngagementCount or 0)
235 result.append({
236 "firstName": s.firstName,
237 "lastName": s.lastName,
238 "username": s.username,
239 "B-Number": s.bnumber,
240 "email": s.email,
241 "hasGraduated": s.hasGraduated,
242 "engagementCount": engagementCount,
243 "hasCCEMinorProposal": bool(s.hasCCEMinorProposal),
244 "hasSummer": "Completed" if (s.summerEngagementCount and int(s.summerEngagementCount) > 0) else "Incomplete",
245 })
247 return result
249def getCourseInformation(id):
250 """
251 Given a course ID, return an object containing the course information and
252 its instructors full names.
253 """
254 # retrieve the course and the course instructors
255 course = model_to_dict(Course.get_by_id(id))
257 courseInstructors = (CourseInstructor.select(CourseInstructor, User)
258 .join(Course).switch()
259 .join(User)
260 .where(Course.id == id))
262 courseInformation = {"instructors": [(instructor.user.firstName + " " + instructor.user.lastName) for instructor in courseInstructors], "course": course}
264 return courseInformation
266def getProgramEngagementHistory(program_id, username, term_id):
267 """
268 Given a program_id, username, and term_id, return an object containing all events in the provided program
269 and in the given term along with the program name.
270 """
271 # execute a query that will retrieve all events in which the user has participated
272 # that fall under the provided term and programs.
273 eventsInProgramAndTerm = (Event.select(Event.id, Event.name, EventParticipant.hoursEarned)
274 .join(Program).switch()
275 .join(EventParticipant)
276 .where(EventParticipant.user == username,
277 Event.term == term_id,
278 Event.isService == True,
279 Program.id == program_id)
280 )
282 program = Program.get_by_id(program_id)
284 # calculate total amount of hours for the whole program that term
285 totalHours = 0
286 for event in eventsInProgramAndTerm:
287 if event.eventparticipant.hoursEarned:
288 totalHours += event.eventparticipant.hoursEarned
290 participatedEvents = {"program":program.programName, "events": [event for event in eventsInProgramAndTerm.dicts()], "totalHours": totalHours}
292 return participatedEvents
294def setCommunityEngagementForUser(action, engagementData, currentUser):
295 """
296 Either add or remove an IndividualRequirement record for a student's Sustained Community Engagement
298 :param action: The behavior of the function. Can be 'add' or 'remove'
299 :param engagementData:
300 type: program or course
301 id: program or course id
302 username: the username of the student that is having a community engagement added or removed
303 term: The term the engagement is recorded in
304 :param currentuser: The user who is performing the add/remove action
306 :raises DoesNotExist: if there are no available CertificationRequirement slots remaining for the engagement
307 """
308 if engagementData['type'] not in ['program','course']:
309 raise Exception("Invalid engagement type!")
311 requirement = (CertificationRequirement.select()
312 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
313 (IndividualRequirement.requirement == CertificationRequirement.id) &
314 (IndividualRequirement.username == engagementData['username'])))
315 .where(IndividualRequirement.username.is_null(True),
316 CertificationRequirement.certification == Certification.CCE,
317 CertificationRequirement.name.not_in(['Summer Program'])))
318 if action == 'add':
319 try:
320 IndividualRequirement.create(**{engagementData['type']: engagementData['id'],
321 "username": engagementData['username'],
322 "term": engagementData['term'],
323 "requirement": requirement.get(),
324 "addedBy": currentUser,
325 })
327 # Thrown if there are no available engagement requirements left. Handled elsewhere.
328 except DoesNotExist as e:
329 raise e
331 elif action == 'remove':
332 IndividualRequirement.delete().where(
333 getattr(IndividualRequirement, engagementData['type']) == engagementData['id'],
334 IndividualRequirement.username == engagementData['username'],
335 IndividualRequirement.term == engagementData['term']
336 ).execute()
338 else:
339 raise Exception(f"Invalid action '{action}' sent to setCommunityEngagementForUser")
341def getCommunityEngagementByTerm(username):
342 """
343 Given a username, return all of their community engagements (service learning courses and event participations.)
344 """
345 courseMatchCase = Case(None, [(IndividualRequirement.course.is_null(True) , 0)], 1)
347 courses = (Course.select(Course, courseMatchCase.alias("matchedReq"))
348 .join(CourseParticipant, on=(Course.id == CourseParticipant.course))
349 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
350 (IndividualRequirement.course == Course.id) &
351 (IndividualRequirement.username == CourseParticipant.user) &
352 (IndividualRequirement.term == Course.term)))
353 .where(CourseParticipant.user == username)
354 .group_by(Course.courseName, Course.term))
356 # initialize default dict to store term descriptions as keys mapping to each
357 # engagement's respective type, name, id, and term.
358 communityEngagementByTermDict = defaultdict(list)
359 for course in courses:
360 communityEngagementByTermDict[(course.term.description, course.term.id)].append(
361 {"name":course.courseName,
362 "id":course.id,
363 "type":"course",
364 "matched": course.matchedReq,
365 "term":course.term.id})
367 programMatchCase = Case(None, [(IndividualRequirement.program.is_null(True) , 0)], 1)
369 events = (Event.select(Event, Program, programMatchCase.alias('matchedReq'))
370 .join(EventParticipant, on=(Event.id == EventParticipant.event)).switch()
371 .join(Program)
372 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=((IndividualRequirement.program == Program.id) &
373 (IndividualRequirement.username == EventParticipant.user) &
374 (IndividualRequirement.term == Event.term)))
375 .where(EventParticipant.user == username, Event.isService == True)
376 .group_by(Event.program, Event.term))
378 for event in events:
379 communityEngagementByTermDict[(event.term.description, event.term.id)].append({"name":event.program.programName,
380 "id":event.program.id,
381 "type":"program",
382 "matched": event.matchedReq,
383 "term":event.term.id
384 })
386 # sorting the communityEngagementByTermDict by the term id
387 return dict(sorted(communityEngagementByTermDict.items(), key=lambda engagement: engagement[0][1]))
389def createOtherEngagement(username, request):
390 """
391 Create a CCEMinorProposal entry based off of the form data
392 """
393 user = User.get(User.username == username)
395 createdProposal = CCEMinorProposal.create(proposalType = 'Other Engagement',
396 createdBy = g.current_user,
397 student = user,
398 **request.form
399 )
400 attachment = request.files.get("attachmentObject")
401 if attachment:
402 addFile = FileHandler(getFilesFromRequest(request), proposalId=createdProposal.id)
403 addFile.saveFiles()
405def updateOtherEngagementRequest(proposalID, request):
406 """
407 Update an existing CCEMinorProposal entry based off of the form data
408 """
409 newAttachment = request.files.get("attachmentObject")
410 previousAttachment = AttachmentUpload.get_or_none(proposal=proposalID)
411 proposalObject = CCEMinorProposal.get_by_id(proposalID)
412 deleteAttachment = request.form.get("deleteAttachment") == "true"
414 if deleteAttachment:
415 if not previousAttachment:
416 raise AssertionError("deleteAttachment flag is set but no attachment exists in the database.")
417 FileHandler(proposalId=proposalID).deleteFile(previousAttachment.id)
418 elif newAttachment and newAttachment.filename:
419 if previousAttachment:
420 FileHandler(proposalId=proposalID).deleteFile(previousAttachment.id)
421 addFile = FileHandler(getFilesFromRequest(request), proposalId=proposalID)
422 addFile.saveFiles(parentEvent=proposalObject)
424 formData = request.form.copy()
425 formData.pop("deleteAttachment", None)
426 CCEMinorProposal.update(**formData).where(CCEMinorProposal.id == proposalID).execute()
428def saveSummerExperience(username, summerExperience, currentUser):
429 """
430 :param username: username of the student that the summer experience is for
431 :param summerExperience: dict
432 summerExperience: string of what the summer experience was (will be written as the 'description' in the IndividualRequirement table)
433 selectedSummerTerm: the term description that the summer experience took place in
434 :param currentUser: the username of the user who added the summer experience record
436 Delete any existing IndividualRequirement entry for 'username' if it is for 'Summer Program' and create a new IndividualRequirement entry for
437 'Summer Program' with the contents of summerExperience.
438 """
439 requirementDeleteSubSelect = CertificationRequirement.select().where(CertificationRequirement.certification == Certification.CCE, CertificationRequirement.name << ['Summer Program'])
440 IndividualRequirement.delete().where(IndividualRequirement.username == username, IndividualRequirement.requirement == requirementDeleteSubSelect).execute()
442 requirement = (CertificationRequirement.select()
443 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=((IndividualRequirement.requirement == CertificationRequirement.id) &
444 (IndividualRequirement.username == username)))
445 .where(IndividualRequirement.username.is_null(True),
446 CertificationRequirement.certification == Certification.CCE,
447 CertificationRequirement.name << ['Summer Program']))
449 summerTerm = (Term.select().where(Term.description == summerExperience['selectedSummerTerm']))
451 IndividualRequirement.create(**{"description": summerExperience['summerExperience'],
452 "username": username,
453 "term": summerTerm.get(),
454 "requirement": requirement.get(),
455 "addedBy": currentUser,
456 })
458 return ""
460def getSummerExperience(username):
461 """
462 Get a students summer experience to populate text box if the student has one
463 """
464 summerExperience = (IndividualRequirement.select()
465 .join(CertificationRequirement, JOIN.LEFT_OUTER, on=(CertificationRequirement.id == IndividualRequirement.requirement)).switch()
466 .join(Term, on=(IndividualRequirement.term == Term.id))
467 .where(IndividualRequirement.username == username,
468 CertificationRequirement.certification == Certification.CCE,
469 CertificationRequirement.name << ['Summer Program']))
470 if len(list(summerExperience)) == 1:
471 return (summerExperience.get().term.description, summerExperience.get().description)
473 return (None, None)
475def removeSummerExperience(username):
476 """
477 Delete IndividualRequirement table entry for 'username'
478 """
479 term, summerExperienceToDelete = getSummerExperience(username)
480 IndividualRequirement.delete().where(IndividualRequirement.username == username, IndividualRequirement.description == summerExperienceToDelete).execute()
482def removeProposal(proposalID) -> None:
483 """
484 Delete summer experience or other engagement objects from the CCEMinorProposal table.
485 File objects attached to the CCEMinorProposal object are also deleted.
486 """
487 proposalID = int(proposalID)
489 proposalAttachment = AttachmentUpload.get_or_none(proposal=proposalID)
490 if proposalAttachment:
491 proposalFileHandler = FileHandler(proposalId=proposalID)
492 proposalFileHandler.deleteFile(proposalAttachment.id)
494 CCEMinorProposal.delete().where(CCEMinorProposal.id == proposalID).execute()
496def changeProposalStatus(proposalID, newStatus) -> None:
497 """
498 Changes the status of a proposal.
499 """
500 CCEMinorProposal.update(status=newStatus).where(CCEMinorProposal.id == int(proposalID)).execute()