Coverage for app/logic/minor.py: 66%
174 statements
« prev ^ index » next coverage.py v7.10.2, created at 2026-03-08 07:10 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2026-03-08 07:10 +0000
1from collections import defaultdict
2from typing import List, Dict
3from flask import flash, g
4from playhouse.shortcuts import model_to_dict
5from peewee import JOIN, fn, Case, DoesNotExist, SQL
6import xlsxwriter
8from app.models import app
9from app.models.user import User
10from app.models.term import Term
11from app.models.event import Event
12from app.models.course import Course
13from app.models.program import Program
14from app.models.certification import Certification
15from app.models.courseInstructor import CourseInstructor
16from app.models.eventParticipant import EventParticipant
17from app.models.courseParticipant import CourseParticipant
18from app.models.individualRequirement import IndividualRequirement
19from app.models.certificationRequirement import CertificationRequirement
20from app.models.cceMinorProposal import CCEMinorProposal
21from app.logic.fileHandler import FileHandler
22from app.logic.utils import getFilesFromRequest
23from app.models.attachmentUpload import AttachmentUpload
26def createSummerExperience(username, formData):
27 """
28 Given the username of the student and the formData which includes all of
29 the SummerExperience information, create a new SummerExperience object.
30 """
31 user = User.get(User.username == username)
32 contentAreas = ', '.join(formData.getlist('contentArea')) # Combine multiple content areas
33 formData = dict(formData)
34 formData.pop("contentArea")
35 return CCEMinorProposal.create(
36 student=user,
37 proposalType = 'Summer Experience',
38 contentAreas = contentAreas,
39 createdBy = g.current_user,
40 **formData,
41 )
43def updateSummerExperience(proposalID, formData):
44 """
45 Given the username of the student and the formData which includes all of
46 the SummerExperience information, create a new SummerExperience object.
47 """
48 contentAreas = ', '.join(formData.getlist('contentArea')) # Combine multiple content areas
49 formData = dict(formData)
50 formData.pop("contentArea")
51 formData.pop("experienceHoursOver300")
52 CCEMinorProposal.update(contentAreas=contentAreas, **formData).where(CCEMinorProposal.id == proposalID).execute()
54def getCCEMinorProposals(username):
55 return list(CCEMinorProposal.select().where(CCEMinorProposal.student==username))
57def getEngagementTotal(engagementData):
58 """
59 Count the number of engagements (from all terms) that have matched with a requirement
60 """
62 # map the flattened list of engagements to their matched values, and sum them
63 return sum(map(lambda e: e['matched'], sum(engagementData.values(),[])))
66def getMinorInterest() -> List[Dict]:
67 """
68 Get all students that have indicated interest in the CCE minor and return a list of dicts of all interested students
69 """
70 interestedStudents = (User.select(User)
71 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(User.username == IndividualRequirement.username))
72 .where(User.isStudent & User.minorInterest & ~User.declaredMinor & IndividualRequirement.username.is_null(True)))
74 interestedStudentList = [model_to_dict(student) for student in interestedStudents]
76 return interestedStudentList
78def getMinorProgress():
79 """
80 Get all the users who have an IndividualRequirement record under the CCE certification which
81 and returns a list of dicts containing the student, how many engagements they have completed,
82 and if they have completed the summer experience.
83 """
84 summerCase = Case(None, [(CCEMinorProposal.proposalType == "Summer Experience", 1)], 0)
86 engagedStudentsWithCount = (
87 User.select(User, fn.COUNT(IndividualRequirement.id).alias('engagementCount'),
88 fn.SUM(summerCase).alias('hasSummer'),
89 fn.IF(fn.COUNT(CCEMinorProposal.id) > 0, True, False).alias('hasCCEMinorProposal'))
90 .join(IndividualRequirement, on=(User.username == IndividualRequirement.username))
91 .join(CertificationRequirement, on=(IndividualRequirement.requirement_id == CertificationRequirement.id))
92 .switch(User).join(CCEMinorProposal, JOIN.LEFT_OUTER, on= (User.username == CCEMinorProposal.student))
93 .where(CertificationRequirement.certification_id == Certification.CCE)
94 .group_by(User.firstName, User.lastName, User.username)
95 .order_by(SQL("engagementCount").desc())
96 )
97 engagedStudentsList = [{'firstName': student.firstName,
98 'lastName': student.lastName,
99 'username': student.username,
100 'B-Number': student.bnumber,
101 'hasGraduated': student.hasGraduated,
102 'engagementCount': student.engagementCount - student.hasSummer,
103 'hasCCEMinorProposal': student.hasCCEMinorProposal,
104 'hasSummer': "Completed" if student.hasSummer else "Incomplete"} for student in engagedStudentsWithCount]
105 return engagedStudentsList
107def getMinorSpreadsheet():
108 """
109 Returns a spreadsheet containing users and related spreadsheet information.
110 """
111 # If we're in 2025, can we get the minor information for 2023?
112 studentProgress = getMinorProgress()
113 columnNames = studentProgress[0]
114 columnNames = ["First Name", "Last Name", "Username", "B-Number", "Number of Engagements", "Completed Summer Experience"]
116 filepath = f"{app.config['files']['base_path']}/minor_data.xlsx"
117 workbook = xlsxwriter.Workbook(filepath, {'in_memory': True})
119 worksheet = workbook.add_worksheet('minor_information')
120 format_row = workbook.add_format({'align': 'left'})
122 columnIndex = 1
123 worksheet.set_column(columnIndex, len(columnNames), 30, workbook.add_format({'bold': True}))
124 for columnName in columnNames:
125 worksheet.write(1, columnIndex, columnName)
126 columnIndex += 1
128 for rowNumber, student in enumerate(studentProgress, 2):
129 if student['hasGraduated']: continue
130 student.pop('hasCCEMinorProposal')
131 student.pop('hasGraduated')
132 student['hasSummer'] = "Yes" if student['hasSummer'] == "Complete" else "No"
133 worksheet.set_row(rowNumber, None, format_row)
134 if student['B-Number'] == None: student["B-Number"] = "No B-Number Found"
135 for columnNumber, key in enumerate(student, 1):
136 worksheet.write(rowNumber, columnNumber, student[key])
139 workbook.close()
141 return filepath
144def toggleMinorInterest(username, isAdding):
145 """
146 Given a username, update their minor interest and minor status.
147 """
149 try:
150 user = User.get(username=username)
151 if not user:
152 return {"error": "User not found"}, 404
154 user.minorInterest = isAdding
155 user.declaredMinor = False
156 user.save()
158 except Exception as e:
159 print(f"Error updating minor interest: {e}")
160 return {"error": str(e)}, 500
162def declareMinorInterest(username):
163 """
164 Given a username, update their minor declaration
165 """
166 user = User.get_by_id(username)
168 if not user:
169 raise ValueError(f"User with username '{username}' not found.")
171 user.declaredMinor = not user.declaredMinor
173 try:
174 user.save()
175 except Exception as e:
176 raise RuntimeError(f"Failed to declare interested student: {e}")
178def getDeclaredMinorStudents():
179 """
180 Get a list of the students who have declared minor
181 """
182 declaredStudents = User.select().where(User.isStudent & User.declaredMinor)
184 interestedStudentList = [model_to_dict(student) for student in declaredStudents]
186 return interestedStudentList
188def getCourseInformation(id):
189 """
190 Given a course ID, return an object containing the course information and
191 its instructors full names.
192 """
193 # retrieve the course and the course instructors
194 course = model_to_dict(Course.get_by_id(id))
196 courseInstructors = (CourseInstructor.select(CourseInstructor, User)
197 .join(Course).switch()
198 .join(User)
199 .where(Course.id == id))
201 courseInformation = {"instructors": [(instructor.user.firstName + " " + instructor.user.lastName) for instructor in courseInstructors], "course": course}
203 return courseInformation
205def getProgramEngagementHistory(program_id, username, term_id):
206 """
207 Given a program_id, username, and term_id, return an object containing all events in the provided program
208 and in the given term along with the program name.
209 """
210 # execute a query that will retrieve all events in which the user has participated
211 # that fall under the provided term and programs.
212 eventsInProgramAndTerm = (Event.select(Event.id, Event.name, EventParticipant.hoursEarned)
213 .join(Program).switch()
214 .join(EventParticipant)
215 .where(EventParticipant.user == username,
216 Event.term == term_id,
217 Event.isService == True,
218 Program.id == program_id)
219 )
221 program = Program.get_by_id(program_id)
223 # calculate total amount of hours for the whole program that term
224 totalHours = 0
225 for event in eventsInProgramAndTerm:
226 if event.eventparticipant.hoursEarned:
227 totalHours += event.eventparticipant.hoursEarned
229 participatedEvents = {"program":program.programName, "events": [event for event in eventsInProgramAndTerm.dicts()], "totalHours": totalHours}
231 return participatedEvents
233def setCommunityEngagementForUser(action, engagementData, currentUser):
234 """
235 Either add or remove an IndividualRequirement record for a student's Sustained Community Engagement
237 :param action: The behavior of the function. Can be 'add' or 'remove'
238 :param engagementData:
239 type: program or course
240 id: program or course id
241 username: the username of the student that is having a community engagement added or removed
242 term: The term the engagement is recorded in
243 :param currentuser: The user who is performing the add/remove action
245 :raises DoesNotExist: if there are no available CertificationRequirement slots remaining for the engagement
246 """
247 if engagementData['type'] not in ['program','course']:
248 raise Exception("Invalid engagement type!")
250 requirement = (CertificationRequirement.select()
251 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
252 (IndividualRequirement.requirement == CertificationRequirement.id) &
253 (IndividualRequirement.username == engagementData['username'])))
254 .where(IndividualRequirement.username.is_null(True),
255 CertificationRequirement.certification == Certification.CCE,
256 CertificationRequirement.name.not_in(['Summer Program'])))
257 if action == 'add':
258 try:
259 IndividualRequirement.create(**{engagementData['type']: engagementData['id'],
260 "username": engagementData['username'],
261 "term": engagementData['term'],
262 "requirement": requirement.get(),
263 "addedBy": currentUser,
264 })
265 # Thrown if there are no available engagement requirements left. Handled elsewhere.
266 except DoesNotExist as e:
267 raise e
269 elif action == 'remove':
270 IndividualRequirement.delete().where(
271 getattr(IndividualRequirement, engagementData['type']) == engagementData['id'],
272 IndividualRequirement.username == engagementData['username'],
273 IndividualRequirement.term == engagementData['term']
274 ).execute()
275 else:
276 raise Exception(f"Invalid action '{action}' sent to setCommunityEngagementForUser")
278def getCommunityEngagementByTerm(username):
279 """
280 Given a username, return all of their community engagements (service learning courses and event participations.)
281 """
282 courseMatchCase = Case(None, [(IndividualRequirement.course.is_null(True) , 0)], 1)
284 courses = (Course.select(Course, courseMatchCase.alias("matchedReq"))
285 .join(CourseParticipant, on=(Course.id == CourseParticipant.course))
286 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
287 (IndividualRequirement.course == Course.id) &
288 (IndividualRequirement.username == CourseParticipant.user) &
289 (IndividualRequirement.term == Course.term)))
290 .where(CourseParticipant.user == username)
291 .group_by(Course.courseName, Course.term))
293 # initialize default dict to store term descriptions as keys mapping to each
294 # engagement's respective type, name, id, and term.
295 communityEngagementByTermDict = defaultdict(list)
296 for course in courses:
297 communityEngagementByTermDict[(course.term.description, course.term.id)].append(
298 {"name":course.courseName,
299 "id":course.id,
300 "type":"course",
301 "matched": course.matchedReq,
302 "term":course.term.id})
304 programMatchCase = Case(None, [(IndividualRequirement.program.is_null(True) , 0)], 1)
306 events = (Event.select(Event, Program, programMatchCase.alias('matchedReq'))
307 .join(EventParticipant, on=(Event.id == EventParticipant.event)).switch()
308 .join(Program)
309 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=((IndividualRequirement.program == Program.id) &
310 (IndividualRequirement.username == EventParticipant.user) &
311 (IndividualRequirement.term == Event.term)))
312 .where(EventParticipant.user == username, Event.isService == True)
313 .group_by(Event.program, Event.term))
315 for event in events:
316 communityEngagementByTermDict[(event.term.description, event.term.id)].append({"name":event.program.programName,
317 "id":event.program.id,
318 "type":"program",
319 "matched": event.matchedReq,
320 "term":event.term.id
321 })
323 # sorting the communityEngagementByTermDict by the term id
324 return dict(sorted(communityEngagementByTermDict.items(), key=lambda engagement: engagement[0][1]))
326def createOtherEngagement(username, request):
327 """
328 Create a CCEMinorProposal entry based off of the form data
329 """
330 user = User.get(User.username == username)
332 createdProposal = CCEMinorProposal.create(proposalType = 'Other Engagement',
333 createdBy = g.current_user,
334 student = user,
335 **request.form
336 )
337 proposalObject = CCEMinorProposal.get_by_id(createdProposal)
338 attachment = request.files.get("attachmentObject")
339 if attachment:
340 addFile = FileHandler(getFilesFromRequest(request), proposalId=createdProposal.id)
341 addFile.saveFiles(parentEvent=proposalObject)
343def updateOtherEngagementRequest(proposalID, request):
344 """
345 Update an existing CCEMinorProposal entry based off of the form data
346 """
347 newAttachment = request.files.get("attachmentObject")
348 previousAttachment = AttachmentUpload.get_or_none(proposal=proposalID)
349 proposalObject = CCEMinorProposal.get_by_id(proposalID)
351 if newAttachment:
352 if previousAttachment:
353 FileHandler(proposalId=proposalID).deleteFile(previousAttachment.id)
354 addFile = FileHandler(getFilesFromRequest(request), proposalId=proposalID)
355 addFile.saveFiles(parentEvent=proposalObject)
357 CCEMinorProposal.update(**request.form).where(CCEMinorProposal.id == proposalID).execute()
359def saveSummerExperience(username, summerExperience, currentUser):
360 """
361 :param username: username of the student that the summer experience is for
362 :param summerExperience: dict
363 summerExperience: string of what the summer experience was (will be written as the 'description' in the IndividualRequirement table)
364 selectedSummerTerm: the term description that the summer experience took place in
365 :param currentUser: the username of the user who added the summer experience record
367 Delete any existing IndividualRequirement entry for 'username' if it is for 'Summer Program' and create a new IndividualRequirement entry for
368 'Summer Program' with the contents of summerExperience.
369 """
370 requirementDeleteSubSelect = CertificationRequirement.select().where(CertificationRequirement.certification == Certification.CCE, CertificationRequirement.name << ['Summer Program'])
371 IndividualRequirement.delete().where(IndividualRequirement.username == username, IndividualRequirement.requirement == requirementDeleteSubSelect).execute()
373 requirement = (CertificationRequirement.select()
374 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=((IndividualRequirement.requirement == CertificationRequirement.id) &
375 (IndividualRequirement.username == username)))
376 .where(IndividualRequirement.username.is_null(True),
377 CertificationRequirement.certification == Certification.CCE,
378 CertificationRequirement.name << ['Summer Program']))
380 summerTerm = (Term.select().where(Term.description == summerExperience['selectedSummerTerm']))
382 IndividualRequirement.create(**{"description": summerExperience['summerExperience'],
383 "username": username,
384 "term": summerTerm.get(),
385 "requirement": requirement.get(),
386 "addedBy": currentUser,
387 })
388 return ""
390def getSummerExperience(username):
391 """
392 Get a students summer experience to populate text box if the student has one
393 """
394 summerExperience = (IndividualRequirement.select()
395 .join(CertificationRequirement, JOIN.LEFT_OUTER, on=(CertificationRequirement.id == IndividualRequirement.requirement)).switch()
396 .join(Term, on=(IndividualRequirement.term == Term.id))
397 .where(IndividualRequirement.username == username,
398 CertificationRequirement.certification == Certification.CCE,
399 CertificationRequirement.name << ['Summer Program']))
400 if len(list(summerExperience)) == 1:
401 return (summerExperience.get().term.description, summerExperience.get().description)
403 return (None, None)
405def removeSummerExperience(username):
406 """
407 Delete IndividualRequirement table entry for 'username'
408 """
409 term, summerExperienceToDelete = getSummerExperience(username)
410 IndividualRequirement.delete().where(IndividualRequirement.username == username, IndividualRequirement.description == summerExperienceToDelete).execute()
412def removeProposal(proposalID) -> None:
413 """
414 Delete summer experience or other engagement objects from the CCEMinorProposal table.
415 File objects attached to the CCEMinorProposal object are also deleted.
416 """
417 proposalID = int(proposalID)
419 proposalAttachment = AttachmentUpload.get_or_none(proposal=proposalID)
420 if proposalAttachment:
421 proposalFileHandler = FileHandler(proposalId=proposalID)
422 proposalFileHandler.deleteFile(proposalAttachment.id)
424 CCEMinorProposal.delete().where(CCEMinorProposal.id == proposalID).execute()
426def changeProposalStatus(proposalID, newStatus) -> None:
427 """
428 Changes the status of a proposal.
429 """
430 CCEMinorProposal.update(status=newStatus).where(CCEMinorProposal.id == int(proposalID)).execute()