Coverage for app/logic/minor.py: 66%
179 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-12-22 18:34 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-12-22 18:34 +0000
1from collections import defaultdict
2from typing import List, Dict
3from flask import flash, g
4from playhouse.shortcuts import model_to_dict
5from peewee import JOIN, fn, Case, DoesNotExist, SQL
6import xlsxwriter
8from app.models import mainDB
9from app.models.user import User
10from app.models.term import Term
11from app.models.event import Event
12from app.models.course import Course
13from app.models.program import Program
14from app.models.certification import Certification
15from app.models.courseInstructor import CourseInstructor
16from app.models.eventParticipant import EventParticipant
17from app.models.courseParticipant import CourseParticipant
18from app.models.individualRequirement import IndividualRequirement
19from app.models.certificationRequirement import CertificationRequirement
20from app.models.cceMinorProposal import CCEMinorProposal
21from app.logic.fileHandler import FileHandler
22from app.logic.utils import getFilesFromRequest
23from app.models.attachmentUpload import AttachmentUpload
def createSummerExperience(username, formData):
    """
    Create a 'Summer Experience' CCEMinorProposal for a student.

    :param username: username of the student the proposal belongs to
    :param formData: submitted form data. It must support ``getlist`` (a
        multi-dict): every 'contentArea' value is combined into a single
        comma-separated string, and all remaining fields are passed to
        CCEMinorProposal.create as keyword arguments.
    :returns: the newly created CCEMinorProposal
    :raises DoesNotExist: if no User has the given username
    """
    user = User.get(User.username == username)
    contentAreas = ', '.join(formData.getlist('contentArea'))  # Combine multiple content areas
    formData = dict(formData)
    # getlist() tolerates a submission without any content area, so the pop
    # must as well; otherwise a missing field surfaces as a bare KeyError.
    formData.pop("contentArea", None)
    return CCEMinorProposal.create(
        student=user,
        proposalType='Summer Experience',
        contentAreas=contentAreas,
        createdBy=g.current_user,
        **formData,
    )
def updateSummerExperience(proposalID, formData):
    """
    Update an existing Summer Experience proposal from submitted form data.

    :param proposalID: id of the CCEMinorProposal row to update
    :param formData: submitted form data. It must support ``getlist`` (a
        multi-dict): every 'contentArea' value is combined into a single
        comma-separated string. 'experienceHoursOver300' is dropped and all
        remaining fields are written to the proposal as-is.
    """
    contentAreas = ', '.join(formData.getlist('contentArea'))  # Combine multiple content areas
    formData = dict(formData)
    # Neither field is guaranteed to be part of the submission, so discard
    # them without raising KeyError when they are absent.
    formData.pop("contentArea", None)
    formData.pop("experienceHoursOver300", None)
    CCEMinorProposal.update(contentAreas=contentAreas, **formData).where(CCEMinorProposal.id == proposalID).execute()
def getCCEMinorProposals(username):
    """
    Return, as a list, every CCEMinorProposal whose student matches the given username.
    """
    proposalsForStudent = CCEMinorProposal.select().where(CCEMinorProposal.student == username)
    return list(proposalsForStudent)
def getEngagementTotal(engagementData):
    """
    Count the number of engagements (from all terms) that have matched with a requirement.

    :param engagementData: mapping of term -> iterable of engagement dicts,
        each of which has a 'matched' value (1 when matched, 0 otherwise)
    :returns: the sum of every engagement's 'matched' value; 0 when there are none
    """
    # Walk the per-term collections directly instead of first concatenating
    # them into one list, which copies the accumulated list once per term.
    return sum(
        engagement['matched']
        for termEngagements in engagementData.values()
        for engagement in termEngagements
    )
def getMinorInterest() -> List[Dict]:
    """
    Return one dict per student who is interested in the CCE minor but has not declared it.

    Only students without any IndividualRequirement record are included (the
    left outer join finds no matching row for them).
    """
    interestedButUndeclared = User.isStudent & User.minorInterest & ~User.declaredMinor
    hasNoRequirementRecord = IndividualRequirement.username.is_null(True)

    studentQuery = (User.select(User)
                        .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(User.username == IndividualRequirement.username))
                        .where(interestedButUndeclared & hasNoRequirementRecord))

    return [model_to_dict(interested) for interested in studentQuery]
def getMinorProgress():
    """
    Summarize CCE minor progress for every user who has at least one
    IndividualRequirement record under a CCE certification requirement.

    Returns a list of dicts, ordered by the aggregated engagement count
    (highest first), with the keys: firstName, lastName, username, B-Number,
    hasGraduated, engagementCount, hasCCEMinorProposal and hasSummer
    ("Completed" or "Incomplete").
    """
    # 1 for each joined proposal row that is a Summer Experience, otherwise 0
    isSummerProposal = Case(None, [(CCEMinorProposal.proposalType == "Summer Experience", 1)], 0)

    # NOTE(review): both joins below are one-to-many from User, so each
    # requirement row appears to be repeated once per proposal before the
    # aggregates run -- confirm the counts are correct for students who have
    # proposals.
    progressQuery = (
        User.select(User,
                    fn.COUNT(IndividualRequirement.id).alias('engagementCount'),
                    fn.SUM(isSummerProposal).alias('hasSummer'),
                    fn.IF(fn.COUNT(CCEMinorProposal.id) > 0, True, False).alias('hasCCEMinorProposal'))
            .join(IndividualRequirement, on=(User.username == IndividualRequirement.username))
            .join(CertificationRequirement, on=(IndividualRequirement.requirement_id == CertificationRequirement.id))
            .switch(User).join(CCEMinorProposal, JOIN.LEFT_OUTER, on=(User.username == CCEMinorProposal.student))
            .where(CertificationRequirement.certification_id == Certification.CCE)
            .group_by(User.firstName, User.lastName, User.username)
            .order_by(SQL("engagementCount").desc())
    )

    progressRows = []
    for row in progressQuery:
        # Key order matters to consumers that iterate the dict positionally.
        progressRows.append({
            'firstName': row.firstName,
            'lastName': row.lastName,
            'username': row.username,
            'B-Number': row.bnumber,
            'hasGraduated': row.hasGraduated,
            # the summer sum is subtracted from the aggregated requirement count
            'engagementCount': row.engagementCount - row.hasSummer,
            'hasCCEMinorProposal': row.hasCCEMinorProposal,
            'hasSummer': "Completed" if row.hasSummer else "Incomplete",
        })
    return progressRows
def getMinorSpreadsheet():
    """
    Write an .xlsx report of minor progress for students who have not graduated.

    One row is written per non-graduated student returned by
    getMinorProgress(), under a header row. Row 0 and column 0 of the sheet
    are left empty.

    :returns: the path of the generated file, ``<base_path>/minor_data.xlsx``
    """
    # `app` is not imported by this module; read the configuration through
    # Flask's application proxy instead (requires an active app context).
    from flask import current_app

    # Open question from the original author: if we're in 2025, can we get
    # the minor information for 2023?
    columnNames = ["First Name", "Last Name", "Username", "B-Number", "Number of Engagements", "Completed Summer Experience"]
    # getMinorProgress() keys, in the same order as the column headers above
    columnKeys = ['firstName', 'lastName', 'username', 'B-Number', 'engagementCount', 'hasSummer']

    # Filter before numbering the rows so skipped graduates leave no blank rows.
    currentStudents = [student for student in getMinorProgress() if not student['hasGraduated']]

    filepath = f"{current_app.config['files']['base_path']}/minor_data.xlsx"
    workbook = xlsxwriter.Workbook(filepath, {'in_memory': True})

    worksheet = workbook.add_worksheet('minor_information')
    format_row = workbook.add_format({'align': 'left'})

    firstColumn = 1
    worksheet.set_column(firstColumn, len(columnNames), 30, workbook.add_format({'bold': True}))
    for columnNumber, columnName in enumerate(columnNames, firstColumn):
        worksheet.write(1, columnNumber, columnName)

    for rowNumber, student in enumerate(currentStudents, 2):
        rowValues = dict(student)  # work on a copy; leave the caller's data untouched
        # getMinorProgress() reports "Completed" / "Incomplete"
        rowValues['hasSummer'] = "Yes" if student['hasSummer'] == "Completed" else "No"
        if rowValues['B-Number'] is None:
            rowValues['B-Number'] = "No B-Number Found"

        worksheet.set_row(rowNumber, None, format_row)
        for columnNumber, key in enumerate(columnKeys, firstColumn):
            worksheet.write(rowNumber, columnNumber, rowValues[key])

    workbook.close()

    return filepath
def toggleMinorInterest(username, isAdding):
    """
    Given a username, update their minor interest and reset their declared-minor status.

    :param username: username of the user to update
    :param isAdding: value stored in the user's minorInterest flag
    :returns: None on success. On failure, a ``(dict, status)`` tuple:
        ``({"error": "User not found"}, 404)`` when no such user exists, or
        ``({"error": <message>}, 500)`` for any other error.
    """
    try:
        # User.get raises DoesNotExist for an unknown username; it never
        # returns a falsy value, so the lookup failure is handled below.
        user = User.get(username=username)

        user.minorInterest = isAdding
        user.declaredMinor = False
        user.save()

    except DoesNotExist:
        return {"error": "User not found"}, 404
    except Exception as e:
        print(f"Error updating minor interest: {e}")
        return {"error": str(e)}, 500
def declareMinorInterest(username):
    """
    Given a username, toggle whether the user has declared the minor.

    :param username: primary key of the User to update
    :raises ValueError: if no user with that username exists
    :raises RuntimeError: if saving the updated user fails
    """
    try:
        # get_by_id raises DoesNotExist instead of returning None, so the
        # "not found" case has to be detected through the exception.
        user = User.get_by_id(username)
    except DoesNotExist as e:
        raise ValueError(f"User with username '{username}' not found.") from e

    user.declaredMinor = not user.declaredMinor

    try:
        user.save()
    except Exception as e:
        # keep the original error attached as the cause
        raise RuntimeError(f"Failed to declare interested student: {e}") from e
def getDeclaredMinorStudents():
    """
    Return one dict per student who has declared the minor.
    """
    declaredQuery = User.select().where(User.isStudent & User.declaredMinor)
    return [model_to_dict(declared) for declared in declaredQuery]
def getCourseInformation(id):
    """
    Look up a course by ID and return a dict with two entries:
    "instructors", a list of the instructors' full names ("first last"), and
    "course", the course converted to a dict.
    """
    courseDetails = model_to_dict(Course.get_by_id(id))

    # every CourseInstructor row for this course, with its User loaded
    instructorQuery = (CourseInstructor.select(CourseInstructor, User)
                                       .join(Course).switch()
                                       .join(User)
                                       .where(Course.id == id))

    fullNames = []
    for courseInstructor in instructorQuery:
        fullNames.append(courseInstructor.user.firstName + " " + courseInstructor.user.lastName)

    return {"instructors": fullNames, "course": courseDetails}
def getProgramEngagementHistory(program_id, username, term_id):
    """
    Collect a user's service-event participation for one program in one term.

    Returns a dict with "program" (the program's name), "events" (a dict per
    participated event carrying the selected event id, event name and hours
    earned) and "totalHours" (the sum of the hours earned across those events).
    """
    # service events of this program and term in which the user participated
    participationQuery = (Event.select(Event.id, Event.name, EventParticipant.hoursEarned)
                               .join(Program).switch()
                               .join(EventParticipant)
                               .where(EventParticipant.user == username,
                                      Event.term == term_id,
                                      Event.isService == True,
                                      Program.id == program_id))

    program = Program.get_by_id(program_id)

    # add up the hours for the whole program that term, skipping empty values
    totalHours = sum(event.eventparticipant.hoursEarned
                     for event in participationQuery
                     if event.eventparticipant.hoursEarned)

    return {"program": program.programName,
            "events": list(participationQuery.dicts()),
            "totalHours": totalHours}
def setCommunityEngagementForUser(action, engagementData, currentUser):
    """
    Either add or remove an IndividualRequirement record for a student's Sustained Community Engagement

    :param action: The behavior of the function. Can be 'add' or 'remove'
    :param engagementData:
        type: program or course
        id: program or course id
        username: the username of the student that is having a community engagement added or removed
        term: The term the engagement is recorded in
    :param currentUser: The user who is performing the add/remove action

    :raises ValueError: if the engagement type or the action is not one of the accepted values
    :raises DoesNotExist: if there are no available CertificationRequirement slots remaining for the engagement
    """
    if engagementData['type'] not in ['program', 'course']:
        raise ValueError("Invalid engagement type!")
    if action not in ('add', 'remove'):
        raise ValueError(f"Invalid action '{action}' sent to setCommunityEngagementForUser")

    if action == 'remove':
        IndividualRequirement.delete().where(
            getattr(IndividualRequirement, engagementData['type']) == engagementData['id'],
            IndividualRequirement.username == engagementData['username'],
            IndividualRequirement.term == engagementData['term']
        ).execute()
        return

    # action == 'add': find a CCE requirement (other than 'Summer Program')
    # that this student has no IndividualRequirement for yet.
    requirement = (CertificationRequirement.select()
                        .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                            (IndividualRequirement.requirement == CertificationRequirement.id) &
                            (IndividualRequirement.username == engagementData['username'])))
                        .where(IndividualRequirement.username.is_null(True),
                               CertificationRequirement.certification == Certification.CCE,
                               CertificationRequirement.name.not_in(['Summer Program'])))

    # requirement.get() raises DoesNotExist if there are no available
    # engagement requirements left. Handled elsewhere, so it propagates.
    IndividualRequirement.create(**{engagementData['type']: engagementData['id'],
                                    "username": engagementData['username'],
                                    "term": engagementData['term'],
                                    "requirement": requirement.get(),
                                    "addedBy": currentUser,
                                    })
def getCommunityEngagementByTerm(username):
    """
    Gather a user's community engagements (the courses they participated in
    and the programs whose service events they participated in), by term.

    Returns a dict keyed by (term description, term id), sorted by term id.
    Each value is a list of dicts with "name", "id", "type" ("course" or
    "program"), "matched" (1 when an IndividualRequirement exists for that
    engagement, user and term, otherwise 0) and "term" (the term id).
    """
    engagementsByTerm = defaultdict(list)

    # ---- courses ----
    courseIsMatched = Case(None, [(IndividualRequirement.course.is_null(True), 0)], 1)

    courseQuery = (Course.select(Course, courseIsMatched.alias("matchedReq"))
                         .join(CourseParticipant, on=(Course.id == CourseParticipant.course))
                         .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                             (IndividualRequirement.course == Course.id) &
                             (IndividualRequirement.username == CourseParticipant.user) &
                             (IndividualRequirement.term == Course.term)))
                         .where(CourseParticipant.user == username)
                         .group_by(Course.courseName, Course.term))

    for course in courseQuery:
        termKey = (course.term.description, course.term.id)
        courseEntry = {"name": course.courseName,
                       "id": course.id,
                       "type": "course",
                       "matched": course.matchedReq,
                       "term": course.term.id}
        engagementsByTerm[termKey].append(courseEntry)

    # ---- programs (through service events) ----
    programIsMatched = Case(None, [(IndividualRequirement.program.is_null(True), 0)], 1)

    eventQuery = (Event.select(Event, Program, programIsMatched.alias('matchedReq'))
                       .join(EventParticipant, on=(Event.id == EventParticipant.event)).switch()
                       .join(Program)
                       .join(IndividualRequirement, JOIN.LEFT_OUTER, on=((IndividualRequirement.program == Program.id) &
                                                                         (IndividualRequirement.username == EventParticipant.user) &
                                                                         (IndividualRequirement.term == Event.term)))
                       .where(EventParticipant.user == username, Event.isService == True)
                       .group_by(Event.program, Event.term))

    for event in eventQuery:
        termKey = (event.term.description, event.term.id)
        programEntry = {"name": event.program.programName,
                        "id": event.program.id,
                        "type": "program",
                        "matched": event.matchedReq,
                        "term": event.term.id}
        engagementsByTerm[termKey].append(programEntry)

    def byTermId(item):
        # item is ((term description, term id), engagements)
        (_, termId), _ = item
        return termId

    return dict(sorted(engagementsByTerm.items(), key=byTermId))
def createOtherEngagement(username, request):
    """
    Create an 'Other Engagement' CCEMinorProposal for the given student from
    the request's form data, and save the uploaded "attachmentObject" file
    against the new proposal when one was submitted.
    """
    student = User.get(User.username == username)

    newProposal = CCEMinorProposal.create(
        proposalType='Other Engagement',
        createdBy=g.current_user,
        student=student,
        **request.form
    )
    # the stored proposal is read back and handed to the file handler
    storedProposal = CCEMinorProposal.get_by_id(newProposal)

    if not request.files.get("attachmentObject"):
        return

    uploader = FileHandler(getFilesFromRequest(request), proposalId=newProposal.id)
    uploader.saveFiles(parentEvent=storedProposal)
def updateOtherEngagementRequest(proposalID, request):
    """
    Update an existing proposal from the request's form data, replacing its
    attachment when a new "attachmentObject" file was uploaded.

    The work runs inside a mainDB.atomic() block.
    """
    uploadedFile = request.files.get("attachmentObject")

    with mainDB.atomic():
        # raises if the proposal does not exist
        proposal = CCEMinorProposal.get_by_id(proposalID)

        # Swap the attachment: drop the previous upload (if any), then save the new one
        if uploadedFile:
            previousUpload = (AttachmentUpload.select()
                                              .where(AttachmentUpload.proposal == proposalID)
                                              .first())
            if previousUpload:
                remover = FileHandler(proposalId=proposalID)
                remover.deleteFile(previousUpload.id)

            uploader = FileHandler(getFilesFromRequest(request), proposalId=proposalID)
            uploader.saveFiles(parentEvent=proposal)

        # Strip the form fields that are not DB columns
        fieldsToUpdate = dict(request.form)
        for nonColumnField in ("contentArea", "attachmentObject"):
            fieldsToUpdate.pop(nonColumnField, None)

        # Write the remaining fields to the proposal
        updateQuery = CCEMinorProposal.update(**fieldsToUpdate).where(CCEMinorProposal.id == proposalID)
        updateQuery.execute()
def saveSummerExperience(username, summerExperience, currentUser):
    """
    :param username: username of the student that the summer experience is for
    :param summerExperience: dict
        summerExperience: string of what the summer experience was (will be written as the 'description' in the IndividualRequirement table)
        selectedSummerTerm: the term description that the summer experience took place in
    :param currentUser: the username of the user who added the summer experience record

    Delete any existing IndividualRequirement entry for 'username' if it is for 'Summer Program' and create a new IndividualRequirement entry for
    'Summer Program' with the contents of summerExperience.

    :returns: an empty string
    :raises DoesNotExist: if the selected term or an open 'Summer Program' requirement cannot be found
    """
    requirementDeleteSubSelect = CertificationRequirement.select().where(CertificationRequirement.certification == Certification.CCE, CertificationRequirement.name << ['Summer Program'])

    # Delete and re-create in one transaction: if a lookup below fails, the
    # student's existing summer experience is rolled back rather than lost.
    with mainDB.atomic():
        IndividualRequirement.delete().where(IndividualRequirement.username == username, IndividualRequirement.requirement == requirementDeleteSubSelect).execute()

        # the 'Summer Program' requirement this student has no record for (anymore)
        requirement = (CertificationRequirement.select()
                            .join(IndividualRequirement, JOIN.LEFT_OUTER, on=((IndividualRequirement.requirement == CertificationRequirement.id) &
                                                                              (IndividualRequirement.username == username)))
                            .where(IndividualRequirement.username.is_null(True),
                                   CertificationRequirement.certification == Certification.CCE,
                                   CertificationRequirement.name << ['Summer Program']))

        summerTerm = (Term.select().where(Term.description == summerExperience['selectedSummerTerm']))

        IndividualRequirement.create(**{"description": summerExperience['summerExperience'],
                                        "username": username,
                                        "term": summerTerm.get(),
                                        "requirement": requirement.get(),
                                        "addedBy": currentUser,
                                        })
    return ""
def getSummerExperience(username):
    """
    Get a student's summer experience to populate the text box if the student has one.

    :param username: username of the student
    :returns: ``(term description, experience description)`` when the student
        has exactly one CCE 'Summer Program' record, otherwise ``(None, None)``
    """
    summerExperience = (IndividualRequirement.select()
                            .join(CertificationRequirement, JOIN.LEFT_OUTER, on=(CertificationRequirement.id == IndividualRequirement.requirement)).switch()
                            .join(Term, on=(IndividualRequirement.term == Term.id))
                            .where(IndividualRequirement.username == username,
                                   CertificationRequirement.certification == Certification.CCE,
                                   CertificationRequirement.name << ['Summer Program']))

    # Run the query once and reuse the rows, instead of once for the length
    # check and again for each value that is read.
    records = list(summerExperience)
    if len(records) == 1:
        record = records[0]
        return (record.term.description, record.description)

    return (None, None)
def removeSummerExperience(username):
    """
    Delete the IndividualRequirement entries for 'username' that are recorded
    against the CCE 'Summer Program' requirement.

    The rows are selected by requirement rather than by description text: a
    student without a summer experience would otherwise be matched on a NULL
    description, which also matches unrelated requirement records.
    """
    summerRequirements = (CertificationRequirement.select(CertificationRequirement.id)
                              .where(CertificationRequirement.certification == Certification.CCE,
                                     CertificationRequirement.name << ['Summer Program']))

    (IndividualRequirement.delete()
        .where(IndividualRequirement.username == username,
               IndividualRequirement.requirement << summerRequirements)
        .execute())
def removeProposal(proposalID) -> None:
    """
    Delete a proposal (summer experience or other engagement) from the
    CCEMinorProposal table, first deleting the file of an AttachmentUpload
    linked to it, if one is found.
    """
    proposalID = int(proposalID)

    attachedUpload = AttachmentUpload.get_or_none(proposal=proposalID)
    if attachedUpload:
        FileHandler(proposalId=proposalID).deleteFile(attachedUpload.id)

    deleteQuery = CCEMinorProposal.delete().where(CCEMinorProposal.id == proposalID)
    deleteQuery.execute()
def changeProposalStatus(proposalID, newStatus) -> None:
    """
    Set the status of the proposal with the given id to newStatus.
    """
    statusUpdate = CCEMinorProposal.update(status=newStatus)
    statusUpdate = statusUpdate.where(CCEMinorProposal.id == int(proposalID))
    statusUpdate.execute()