Coverage for app/logic/minor.py: 65%
159 statements
« prev ^ index » next coverage.py v7.2.7, created at 2025-07-22 20:03 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2025-07-22 20:03 +0000
1from collections import defaultdict
2from typing import List, Dict
3from flask import flash, g
4from playhouse.shortcuts import model_to_dict
5from peewee import JOIN, fn, Case, DoesNotExist, SQL
6import xlsxwriter
8from app import app
9from app.models.user import User
10from app.models.term import Term
11from app.models.event import Event
12from app.models.course import Course
13from app.models.program import Program
14from app.models.certification import Certification
15from app.models.courseInstructor import CourseInstructor
16from app.models.eventParticipant import EventParticipant
17from app.models.courseParticipant import CourseParticipant
18from app.models.individualRequirement import IndividualRequirement
19from app.models.certificationRequirement import CertificationRequirement
20from app.models.cceMinorProposal import CCEMinorProposal
21from app.logic.createLogs import createActivityLog
22from app.logic.fileHandler import FileHandler
23from app.logic.serviceLearningCourses import deleteCourseObject
24from app.models.attachmentUpload import AttachmentUpload
def createSummerExperience(username, formData):
    """
    Create a 'Summer Experience' CCEMinorProposal with status "Pending" for a student.

    :param username: username of the student the proposal is for
    :param formData: the submitted form. Must support ``getlist('contentArea')`` and
        ``**`` unpacking; its fields are passed straight to CCEMinorProposal.create()
    :raises Exception: any failure is printed and then re-raised
    """
    try:
        student = User.get(User.username == username)
        # Several content areas may be selected; they are stored as one comma-separated string
        selectedAreas = formData.getlist('contentArea')
        CCEMinorProposal.create(student=student,
                                proposalType='Summer Experience',
                                contentAreas=', '.join(selectedAreas),
                                status="Pending",
                                createdBy=g.current_user,
                                **formData)
    except Exception as error:
        print(f"Error saving summer experience: {error}")
        raise error
def getCCEMinorProposals(username):
    """
    Return a list of summary dicts, one for every CCEMinorProposal whose student
    matches the given username.

    Each dict has the keys 'id', 'type', 'createdBy', 'supervisor', 'term' and 'status'.
    """
    studentProposals = CCEMinorProposal.select().where(CCEMinorProposal.student == username)

    return [
        {
            "id": proposal.id,
            "type": proposal.proposalType,
            "createdBy": proposal.createdBy,
            "supervisor": proposal.supervisorName,
            "term": proposal.term,
            "status": proposal.status,
        }
        for proposal in studentProposals
    ]
def getEngagementTotal(engagementData):
    """
    Count the number of engagements (from all terms) that have matched with a requirement.

    :param engagementData: mapping whose values are iterables of engagement dicts,
        each carrying a numeric 'matched' value
    :returns: the sum of every 'matched' value; 0 when there are no engagements
    """
    # Walk each term's engagements in a single pass instead of concatenating the
    # per-term lists first, which copies the growing list once per term.
    return sum(engagement['matched']
               for termEngagements in engagementData.values()
               for engagement in termEngagements)
def getMinorInterest() -> List[Dict]:
    """
    Return, as a list of dicts, every student who has indicated interest in the CCE minor,
    has not declared it, and has no IndividualRequirement records.
    """
    interestedAndUndeclared = User.isStudent & User.minorInterest & ~User.declaredMinor
    # After the LEFT OUTER join, a null IndividualRequirement.username means no row was matched
    hasNoRequirements = IndividualRequirement.username.is_null(True)

    interestedStudents = (User.select(User)
                              .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(User.username == IndividualRequirement.username))
                              .where(interestedAndUndeclared & hasNoRequirements))

    return [model_to_dict(student) for student in interestedStudents]
def getMinorProgress():
    """
    Get all the users who have an IndividualRequirement record under the CCE certification and
    return a list of dicts containing the student, how many engagements they have completed,
    whether they have a CCE minor proposal, and whether they have completed the summer experience.
    """
    # 1 for every joined row whose proposal is a Summer Experience, 0 otherwise
    summerProposalFlag = Case(None, [(CCEMinorProposal.proposalType == "Summer Experience", 1)], 0)

    # NOTE(review): the LEFT OUTER join on CCEMinorProposal presumably repeats each requirement row
    # once per proposal, which would affect both aggregates — confirm against real data.
    progressQuery = (
        User.select(User,
                    fn.COUNT(IndividualRequirement.id).alias('engagementCount'),
                    fn.SUM(summerProposalFlag).alias('hasSummer'),
                    fn.IF(fn.COUNT(CCEMinorProposal.id) > 0, True, False).alias('hasCCEMinorProposal'))
            .join(IndividualRequirement, on=(User.username == IndividualRequirement.username))
            .join(CertificationRequirement, on=(IndividualRequirement.requirement_id == CertificationRequirement.id))
            .switch(User).join(CCEMinorProposal, JOIN.LEFT_OUTER, on=(User.username == CCEMinorProposal.student))
            .where(CertificationRequirement.certification_id == Certification.CCE)
            .group_by(User.firstName, User.lastName, User.username)
            .order_by(SQL("engagementCount").desc())
    )

    studentProgress = []
    for student in progressQuery:
        summerStatus = "Completed" if student.hasSummer else "Incomplete"
        studentProgress.append({
            'firstName': student.firstName,
            'lastName': student.lastName,
            'username': student.username,
            'B-Number': student.bnumber,
            'hasGraduated': student.hasGraduated,
            # the summer tally is subtracted from the raw requirement count
            'engagementCount': student.engagementCount - student.hasSummer,
            'hasCCEMinorProposal': student.hasCCEMinorProposal,
            'hasSummer': summerStatus,
        })
    return studentProgress
def getMinorSpreadsheet():
    """
    Write an .xlsx workbook listing the minor progress of every student who has not
    graduated, and return the path of the file.

    The sheet is named 'minor_information'. The header is written on row 1 and the
    students on the rows below it, both starting at column 1 (row 0 and column 0 are
    left empty). An empty result from getMinorProgress() produces a header-only sheet.

    :returns: path of the generated workbook, "<files.base_path>/minor_data.xlsx"
    """
    # If we're in 2025, can we get the minor information for 2023?
    columnNames = ["First Name", "Last Name", "Username", "B-Number", "Number of Engagements", "Completed Summer Experience"]
    firstColumn = 1
    headerRow = 1

    # Filter graduated students out before numbering the rows so they do not leave gaps in the sheet
    activeStudents = [student for student in getMinorProgress() if not student['hasGraduated']]

    filepath = f"{app.config['files']['base_path']}/minor_data.xlsx"

    # The context manager closes (and thereby writes) the workbook even if a write fails
    with xlsxwriter.Workbook(filepath, {'in_memory': True}) as workbook:
        worksheet = workbook.add_worksheet('minor_information')
        format_row = workbook.add_format({'align': 'left'})

        worksheet.set_column(firstColumn, len(columnNames), 30, workbook.add_format({'bold': True}))
        for columnNumber, columnName in enumerate(columnNames, firstColumn):
            worksheet.write(headerRow, columnNumber, columnName)

        for rowNumber, student in enumerate(activeStudents, headerRow + 1):
            # Values are listed explicitly, in the same order as columnNames
            rowValues = [
                student['firstName'],
                student['lastName'],
                student['username'],
                "No B-Number Found" if student['B-Number'] is None else student['B-Number'],
                student['engagementCount'],
                # getMinorProgress() reports the summer status as "Completed" / "Incomplete"
                "Yes" if student['hasSummer'] == "Completed" else "No",
            ]
            worksheet.set_row(rowNumber, None, format_row)
            for columnNumber, value in enumerate(rowValues, firstColumn):
                worksheet.write(rowNumber, columnNumber, value)

    return filepath
def toggleMinorInterest(username, isAdding):
    """
    Given a username, update their minor interest and reset their minor declaration.

    :param username: username of the user to update
    :param isAdding: value stored in the user's minorInterest field
    :returns: None on success; ({"error": "User not found"}, 404) when no user has that
        username; ({"error": <message>}, 500) for any other failure
    """
    try:
        user = User.get(username=username)

        user.minorInterest = isAdding
        user.declaredMinor = False
        user.save()

    except User.DoesNotExist:
        # User.get() raises instead of returning None, so the missing-user case is handled here
        return {"error": "User not found"}, 404
    except Exception as e:
        print(f"Error updating minor interest: {e}")
        return {"error": str(e)}, 500
def declareMinorInterest(username):
    """
    Flip the declaredMinor flag of the user with the given username and save the user.

    :param username: primary key passed to User.get_by_id()
    :raises ValueError: if the looked-up user is falsy
    :raises RuntimeError: if saving the user fails
    """
    student = User.get_by_id(username)

    # NOTE(review): peewee's get_by_id() presumably raises DoesNotExist for an unknown key
    # rather than returning None, in which case this guard never fires — confirm.
    if not student:
        raise ValueError(f"User with username '{username}' not found.")

    student.declaredMinor = not student.declaredMinor

    try:
        student.save()
    except Exception as saveError:
        raise RuntimeError(f"Failed to declare interested student: {saveError}")
def getDeclaredMinorStudents():
    """
    Return every student who has declared the minor, as a list of dicts.
    """
    declaredStudents = User.select().where(User.isStudent & User.declaredMinor)

    return list(map(model_to_dict, declaredStudents))
def getCourseInformation(id):
    """
    Given a course ID, return a dict with the course (as a dict) under 'course' and the
    full names of its instructors under 'instructors'.
    """
    courseDict = model_to_dict(Course.get_by_id(id))

    # Select the User alongside each CourseInstructor so the names are available on each row
    instructorQuery = (CourseInstructor.select(CourseInstructor, User)
                                       .join(Course).switch()
                                       .join(User)
                                       .where(Course.id == id))

    instructorNames = []
    for courseInstructor in instructorQuery:
        instructorNames.append(courseInstructor.user.firstName + " " + courseInstructor.user.lastName)

    return {"instructors": instructorNames, "course": courseDict}
def getProgramEngagementHistory(program_id, username, term_id):
    """
    Given a program_id, username, and term_id, return the service events of that program and
    term in which the user participated, together with the program name and the hours earned.

    :returns: dict with 'program' (the program name), 'events' (list of dicts with the event id,
        event name and hours earned) and 'totalHours' (sum of the non-empty hours earned)
    """
    # service events in the given program and term that the user participated in
    participatedQuery = (Event.select(Event.id, Event.name, EventParticipant.hoursEarned)
                              .join(Program).switch()
                              .join(EventParticipant)
                              .where(EventParticipant.user == username,
                                     Event.term == term_id,
                                     Event.isService == True,
                                     Program.id == program_id))

    program = Program.get_by_id(program_id)

    # total the hours for the whole program that term, skipping events without any hours recorded
    earnedHours = (event.eventparticipant.hoursEarned for event in participatedQuery)
    totalHours = sum(hours for hours in earnedHours if hours)

    return {"program": program.programName,
            "events": list(participatedQuery.dicts()),
            "totalHours": totalHours}
def setCommunityEngagementForUser(action, engagementData, currentUser):
    """
    Either add or remove an IndividualRequirement record for a student's Sustained Community Engagement

    :param action: The behavior of the function. Can be 'add' or 'remove'
    :param engagementData:
        type: program or course
        id: program or course id
        username: the username of the student that is having a community engagement added or removed
        term: The term the engagement is recorded in
    :param currentUser: The user who is performing the add/remove action

    :raises DoesNotExist: if there are no available CertificationRequirement slots remaining for the engagement
    :raises Exception: if the engagement type or the action is not one of the accepted values
    """
    engagementType = engagementData['type']
    if engagementType not in ('program', 'course'):
        raise Exception("Invalid engagement type!")

    # CCE requirements, other than 'Summer Program', that this student has not filled yet
    openRequirements = (CertificationRequirement.select()
                            .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                                (IndividualRequirement.requirement == CertificationRequirement.id) &
                                (IndividualRequirement.username == engagementData['username'])))
                            .where(IndividualRequirement.username.is_null(True),
                                   CertificationRequirement.certification == Certification.CCE,
                                   CertificationRequirement.name.not_in(['Summer Program'])))

    if action == 'add':
        # openRequirements.get() raises DoesNotExist when no requirement is left;
        # it is allowed to propagate because it is handled elsewhere.
        newRecord = {engagementType: engagementData['id'],
                     "username": engagementData['username'],
                     "term": engagementData['term'],
                     "requirement": openRequirements.get(),
                     "addedBy": currentUser,
                     }
        IndividualRequirement.create(**newRecord)
        return

    if action == 'remove':
        (IndividualRequirement.delete()
            .where(getattr(IndividualRequirement, engagementType) == engagementData['id'],
                   IndividualRequirement.username == engagementData['username'],
                   IndividualRequirement.term == engagementData['term'])
            .execute())
        return

    raise Exception(f"Invalid action '{action}' sent to setCommunityEngagementForUser")
def getCommunityEngagementByTerm(username):
    """
    Given a username, return all of their community engagements (service learning courses and
    service event participations), grouped by term.

    :returns: dict keyed by (term description, term id) and ordered by term id. Each value is a
        list of dicts with the keys 'name', 'id', 'type' ('course' or 'program'), 'matched' and 'term'.
    """
    # maps (term description, term id) to that term's engagements
    engagementsByTerm = defaultdict(list)

    # 0 when no IndividualRequirement row was joined for the course, 1 otherwise
    courseIsMatched = Case(None, [(IndividualRequirement.course.is_null(True), 0)], 1)

    participatedCourses = (Course.select(Course, courseIsMatched.alias("matchedReq"))
                                 .join(CourseParticipant, on=(Course.id == CourseParticipant.course))
                                 .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                                     (IndividualRequirement.course == Course.id) &
                                     (IndividualRequirement.username == CourseParticipant.user) &
                                     (IndividualRequirement.term == Course.term)))
                                 .where(CourseParticipant.user == username)
                                 .group_by(Course.courseName, Course.term))

    for course in participatedCourses:
        termKey = (course.term.description, course.term.id)
        engagementsByTerm[termKey].append({
            "name": course.courseName,
            "id": course.id,
            "type": "course",
            "matched": course.matchedReq,
            "term": course.term.id,
        })

    # 0 when no IndividualRequirement row was joined for the program, 1 otherwise
    programIsMatched = Case(None, [(IndividualRequirement.program.is_null(True), 0)], 1)

    # grouped by program and term, so each program is listed once per term
    participatedEvents = (Event.select(Event, Program, programIsMatched.alias('matchedReq'))
                               .join(EventParticipant, on=(Event.id == EventParticipant.event)).switch()
                               .join(Program)
                               .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                                   (IndividualRequirement.program == Program.id) &
                                   (IndividualRequirement.username == EventParticipant.user) &
                                   (IndividualRequirement.term == Event.term)))
                               .where(EventParticipant.user == username, Event.isService == True)
                               .group_by(Event.program, Event.term))

    for event in participatedEvents:
        termKey = (event.term.description, event.term.id)
        engagementsByTerm[termKey].append({
            "name": event.program.programName,
            "id": event.program.id,
            "type": "program",
            "matched": event.matchedReq,
            "term": event.term.id,
        })

    # order the terms by term id (the second element of each key)
    orderedTermKeys = sorted(engagementsByTerm, key=lambda termKey: termKey[1])
    return {termKey: engagementsByTerm[termKey] for termKey in orderedTermKeys}
def createOtherEngagementRequest(username, formData):
    """
    Create and return a pending 'Other Engagement' CCEMinorProposal for the given student,
    filled in from the form data.
    """
    student = User.get(User.username == username)

    return CCEMinorProposal.create(proposalType='Other Engagement',
                                   createdBy=g.current_user,
                                   status='Pending',
                                   student=student,
                                   **formData)
def saveSummerExperience(username, summerExperience, currentUser):
    """
    Replace the student's 'Summer Program' IndividualRequirement entry.

    Any existing 'Summer Program' entry for 'username' is deleted, then a new one is created
    from the contents of summerExperience.

    :param username: username of the student that the summer experience is for
    :param summerExperience: dict
        summerExperience: string of what the summer experience was (written as the 'description' of the new entry)
        selectedSummerTerm: the term description that the summer experience took place in
    :param currentUser: the user recorded as 'addedBy' on the new entry
    :returns: an empty string
    """
    # remove whatever summer entry the student already has
    summerRequirementToClear = CertificationRequirement.select().where(
        CertificationRequirement.certification == Certification.CCE,
        CertificationRequirement.name << ['Summer Program'])
    (IndividualRequirement.delete()
        .where(IndividualRequirement.username == username,
               IndividualRequirement.requirement == summerRequirementToClear)
        .execute())

    # the 'Summer Program' requirement that the student has not filled
    openSummerRequirement = (CertificationRequirement.select()
                                .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                                    (IndividualRequirement.requirement == CertificationRequirement.id) &
                                    (IndividualRequirement.username == username)))
                                .where(IndividualRequirement.username.is_null(True),
                                       CertificationRequirement.certification == Certification.CCE,
                                       CertificationRequirement.name << ['Summer Program']))

    selectedTerm = Term.select().where(Term.description == summerExperience['selectedSummerTerm'])

    newRecord = {"description": summerExperience['summerExperience'],
                 "username": username,
                 "term": selectedTerm.get(),
                 "requirement": openSummerRequirement.get(),
                 "addedBy": currentUser,
                 }
    IndividualRequirement.create(**newRecord)
    return ""
def getSummerExperience(username):
    """
    Get a student's summer experience to populate the text box if the student has one.

    :param username: username of the student
    :returns: (term description, summer experience description) when the student has exactly
        one 'Summer Program' entry under the CCE certification, otherwise (None, None)
    """
    summerExperience = (IndividualRequirement.select()
                            .join(CertificationRequirement, JOIN.LEFT_OUTER, on=(CertificationRequirement.id == IndividualRequirement.requirement)).switch()
                            .join(Term, on=(IndividualRequirement.term == Term.id))
                            .where(IndividualRequirement.username == username,
                                   CertificationRequirement.certification == Certification.CCE,
                                   CertificationRequirement.name << ['Summer Program']))

    # Run the query once and reuse the row, instead of issuing a separate .get() per field
    matchingEntries = list(summerExperience)
    if len(matchingEntries) != 1:
        return (None, None)

    entry = matchingEntries[0]
    return (entry.term.description, entry.description)
def removeSummerExperience(username):
    """
    Delete the 'Summer Program' IndividualRequirement entry for 'username', if one is found.

    Nothing is deleted when getSummerExperience() reports no summer experience.

    :param username: username of the student whose summer experience is removed
    """
    term, summerExperienceToDelete = getSummerExperience(username)

    if term is None and summerExperienceToDelete is None:
        # Nothing to remove. Matching on a None description would turn into "description IS NULL"
        # and delete the student's other requirement entries that have no description.
        return

    # Restrict the delete to the CCE 'Summer Program' requirement so no other entry can match
    summerRequirements = CertificationRequirement.select().where(
        CertificationRequirement.certification == Certification.CCE,
        CertificationRequirement.name << ['Summer Program'])

    (IndividualRequirement.delete()
        .where(IndividualRequirement.username == username,
               IndividualRequirement.requirement.in_(summerRequirements),
               IndividualRequirement.description == summerExperienceToDelete)
        .execute())
def removeProposal(proposalID) -> None:
    """
    Delete a CCEMinorProposal (summer experience or other engagement) by id.
    If an AttachmentUpload is found for the proposal, its file is deleted first.

    :param proposalID: id of the proposal; converted with int() before use
    """
    proposalID = int(proposalID)

    attachment = AttachmentUpload.get_or_none(proposal=proposalID)
    if attachment:
        FileHandler(proposalId=proposalID).deleteFile(attachment.id)

    deleteProposal = CCEMinorProposal.delete().where(CCEMinorProposal.id == proposalID)
    deleteProposal.execute()