Coverage for app/logic/minor.py: 65%
167 statements
« prev ^ index » next coverage.py v7.10.2, created at 2026-02-17 20:15 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2026-02-17 20:15 +0000
1from collections import defaultdict
2from typing import List, Dict
3from flask import flash, g
4from playhouse.shortcuts import model_to_dict
5from peewee import JOIN, fn, Case, DoesNotExist, SQL
6import xlsxwriter
8from app import app
9from app.models.user import User
10from app.models.term import Term
11from app.models.event import Event
12from app.models.course import Course
13from app.models.program import Program
14from app.models.certification import Certification
15from app.models.courseInstructor import CourseInstructor
16from app.models.eventParticipant import EventParticipant
17from app.models.courseParticipant import CourseParticipant
18from app.models.individualRequirement import IndividualRequirement
19from app.models.certificationRequirement import CertificationRequirement
20from app.models.cceMinorProposal import CCEMinorProposal
21from app.logic.createLogs import createActivityLog
22from app.logic.fileHandler import FileHandler
23from app.logic.serviceLearningCourses import deleteCourseObject
24from app.models.attachmentUpload import AttachmentUpload
def createSummerExperience(username, formData):
    """
    Record a 'Summer Experience' CCEMinorProposal for a student.

    :param username: username of the student the proposal belongs to
    :param formData: submitted form; must support getlist('contentArea') and
                     keyword unpacking (its entries are passed on to the new record)

    Any failure is printed and then re-raised to the caller.
    """
    try:
        student = User.get(User.username == username)
        proposalFields = {
            "student": student,
            "proposalType": 'Summer Experience',
            # several content areas may be selected; store them as one comma-separated string
            "contentAreas": ', '.join(formData.getlist('contentArea')),
            "status": "Pending",
            "createdBy": g.current_user,
        }
        CCEMinorProposal.create(**proposalFields, **formData)
    except Exception as e:
        print(f"Error saving summer experience: {e}")
        raise e
def getCCEMinorProposals(username):
    """
    Summarize every CCEMinorProposal whose student matches the given username.

    Returns a list of dicts with the keys 'id', 'type', 'createdBy',
    'supervisor', 'term' and 'status' (one dict per proposal).
    """
    studentProposals = CCEMinorProposal.select().where(CCEMinorProposal.student == username)

    return [
        {
            "id": proposal.id,
            "type": proposal.proposalType,
            "createdBy": proposal.createdBy,
            "supervisor": proposal.supervisorName,
            "term": proposal.term,
            "status": proposal.status,
        }
        for proposal in studentProposals
    ]
def getEngagementTotal(engagementData):
    """
    Count the number of engagements (from all terms) that have matched with a requirement.

    :param engagementData: mapping of term -> sequence of engagement dicts, each
                           carrying a 'matched' entry (added up as a number)
    :returns: the sum of every engagement's 'matched' value; 0 when there are no engagements
    """
    # Walk the per-term sequences directly instead of flattening them with
    # sum(..., []), which re-copies the growing list for every term.
    return sum(
        engagement['matched']
        for termEngagements in engagementData.values()
        for engagement in termEngagements
    )
def getMinorInterest() -> List[Dict]:
    """
    Return, as a list of dicts, every student who has indicated interest in the CCE minor,
    has not declared it, and has no IndividualRequirement record.
    """
    interestedNotDeclared = User.isStudent & User.minorInterest & ~User.declaredMinor
    # the LEFT OUTER join leaves the requirement side NULL for users without any record
    withoutRequirements = IndividualRequirement.username.is_null(True)

    studentQuery = (User.select(User)
                        .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(User.username == IndividualRequirement.username))
                        .where(interestedNotDeclared & withoutRequirements))

    return [model_to_dict(student) for student in studentQuery]
def getMinorProgress():
    """
    Summarize progress for every user who has declared the minor and has at least one
    IndividualRequirement record under the CCE certification.

    Returns a list of dicts, ordered by the raw engagement count (highest first), with the keys
    'firstName', 'lastName', 'username', 'B-Number', 'hasGraduated', 'engagementCount',
    'hasCCEMinorProposal' and 'hasSummer' ("Completed" or "Incomplete").
    """
    # 1 for each joined proposal row that is a Summer Experience, otherwise 0
    summerCase = Case(None, [(CCEMinorProposal.proposalType == "Summer Experience", 1)], 0)

    engagementCountColumn = fn.COUNT(IndividualRequirement.id).alias('engagementCount')
    summerColumn = fn.SUM(summerCase).alias('hasSummer')
    proposalColumn = fn.IF(fn.COUNT(CCEMinorProposal.id) > 0, True, False).alias('hasCCEMinorProposal')

    progressQuery = (
        User.select(User, engagementCountColumn, summerColumn, proposalColumn)
            .join(IndividualRequirement, on=(User.username == IndividualRequirement.username))
            .join(CertificationRequirement, on=(IndividualRequirement.requirement_id == CertificationRequirement.id))
            .switch(User)
            .join(CCEMinorProposal, JOIN.LEFT_OUTER, on=(User.username == CCEMinorProposal.student))
            .where(
                (CertificationRequirement.certification_id == Certification.CCE) &
                (User.declaredMinor == True)
            )
            .group_by(User.firstName, User.lastName, User.username)
            .order_by(SQL("engagementCount").desc())
    )

    engagedStudentsList = []
    for row in progressQuery:
        engagedStudentsList.append({
            'firstName': row.firstName,
            'lastName': row.lastName,
            'username': row.username,
            'B-Number': row.bnumber,
            'hasGraduated': row.hasGraduated,
            # the summer total is subtracted from the raw count before it is reported
            'engagementCount': row.engagementCount - row.hasSummer,
            'hasCCEMinorProposal': row.hasCCEMinorProposal,
            'hasSummer': "Completed" if row.hasSummer else "Incomplete",
        })
    return engagedStudentsList
def getMinorSpreadsheet():
    """
    Write the minor progress of every non-graduated student returned by getMinorProgress()
    to '<base_path>/minor_data.xlsx' and return the path of that file.

    The header is written on row index 1 starting at column index 1; student rows start
    at row index 2. A missing B-Number is written as "No B-Number Found" and the summer
    experience is written as "Yes"/"No".
    """
    # If we're in 2025, can we get the minor information for 2023?
    studentProgress = getMinorProgress()
    columnNames = ["First Name", "Last Name", "Username", "B-Number", "Number of Engagements", "Completed Summer Experience"]
    # keys of the getMinorProgress() dicts, listed in the same order as columnNames
    columnKeys = ["firstName", "lastName", "username", "B-Number", "engagementCount", "hasSummer"]

    filepath = f"{app.config['files']['base_path']}/minor_data.xlsx"
    workbook = xlsxwriter.Workbook(filepath, {'in_memory': True})

    worksheet = workbook.add_worksheet('minor_information')
    format_row = workbook.add_format({'align': 'left'})

    firstColumn = 1
    worksheet.set_column(firstColumn, len(columnNames), 30, workbook.add_format({'bold': True}))
    for columnNumber, columnName in enumerate(columnNames, firstColumn):
        worksheet.write(1, columnNumber, columnName)

    # NOTE(review): row numbers come from the unfiltered list, so every skipped graduate
    # leaves an empty row in the sheet — confirm whether that is intended.
    for rowNumber, student in enumerate(studentProgress, 2):
        if student['hasGraduated']:
            continue

        rowValues = {key: student[key] for key in columnKeys}
        # getMinorProgress() reports the summer experience as "Completed"/"Incomplete"
        rowValues['hasSummer'] = "Yes" if student['hasSummer'] == "Completed" else "No"
        if rowValues['B-Number'] is None:
            rowValues['B-Number'] = "No B-Number Found"

        worksheet.set_row(rowNumber, None, format_row)
        for columnNumber, key in enumerate(columnKeys, firstColumn):
            worksheet.write(rowNumber, columnNumber, rowValues[key])

    workbook.close()

    return filepath
def toggleMinorInterest(username, isAdding):
    """
    Given a username, update their minor interest and reset their declared-minor status.

    :param username: username of the user to update
    :param isAdding: value stored in the user's minorInterest field
    :returns: None on success; ({"error": "User not found"}, 404) when no user has that
              username; ({"error": <message>}, 500) when anything else fails
    """
    try:
        # User.get raises DoesNotExist for an unknown username; it never returns None
        user = User.get(username=username)

        user.minorInterest = isAdding
        user.declaredMinor = False
        user.save()

    except DoesNotExist:
        return {"error": "User not found"}, 404
    except Exception as e:
        print(f"Error updating minor interest: {e}")
        return {"error": str(e)}, 500
def declareMinorInterest(username):
    """
    Given a username, flip their minor declaration (declared <-> not declared).

    :param username: primary key of the user whose declaredMinor flag is toggled
    :raises DoesNotExist: if no user has that primary key (raised by User.get_by_id)
    :raises RuntimeError: if saving the updated user fails
    """
    # get_by_id raises DoesNotExist for an unknown user, so no None check is needed here
    user = User.get_by_id(username)

    user.declaredMinor = not user.declaredMinor

    try:
        user.save()
    except Exception as e:
        # keep the original failure attached as the cause
        raise RuntimeError(f"Failed to declare interested student: {e}") from e
def getDeclaredMinorStudentsWithProgress():
    """
    Return every user who has declared the minor together with their engagement progress.

    Requirement rows are LEFT OUTER joined, so users that have declared the minor but have no
    requirement records are included with an engagement count of 0. Each entry is a dict with
    the keys 'firstName', 'lastName', 'username', 'B-Number', 'email', 'hasGraduated',
    'engagementCount', 'hasCCEMinorProposal' and 'hasSummer' ("Completed" or "Incomplete").
    The list is ordered by the raw engagement count, highest first.
    """
    # 1 for each joined proposal row that is a Summer Experience, otherwise 0
    summerCase = Case(None, [(CCEMinorProposal.proposalType == "Summer Experience", 1)], 0)

    # DISTINCT keeps the two independent joins from inflating each other's aggregates
    rawCountColumn = fn.COUNT(fn.DISTINCT(IndividualRequirement.id)).alias("rawEngagementCount")
    summerCountColumn = fn.COALESCE(fn.SUM(fn.DISTINCT(summerCase)), 0).alias("summerCount")
    proposalColumn = fn.IF(fn.COUNT(fn.DISTINCT(CCEMinorProposal.id)) > 0, True, False).alias("hasCCEMinorProposal")

    isCCEOrHasNoRequirement = (
        (CertificationRequirement.certification_id == Certification.CCE) |
        (CertificationRequirement.id.is_null(True))
    )

    declaredQuery = (
        User
        .select(User, rawCountColumn, summerCountColumn, proposalColumn)
        .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(User.username == IndividualRequirement.username))
        .join(CertificationRequirement, JOIN.LEFT_OUTER, on=(IndividualRequirement.requirement_id == CertificationRequirement.id))
        .switch(User)
        .join(CCEMinorProposal, JOIN.LEFT_OUTER, on=(User.username == CCEMinorProposal.student))
        .where((User.declaredMinor == True) & isCCEOrHasNoRequirement)
        .group_by(User.firstName, User.lastName, User.username)
        .order_by(SQL("rawEngagementCount").desc())
    )

    studentsWithProgress = []
    for student in declaredQuery:
        summerTotal = int(student.summerCount or 0)
        summerStatus = "Completed" if summerTotal > 0 else "Incomplete"

        studentsWithProgress.append({
            "firstName": student.firstName,
            "lastName": student.lastName,
            "username": student.username,
            "B-Number": student.bnumber,
            "email": student.email,
            "hasGraduated": student.hasGraduated,
            "engagementCount": int(student.rawEngagementCount) - summerTotal,
            "hasCCEMinorProposal": bool(student.hasCCEMinorProposal),
            "hasSummer": summerStatus,
        })

    return studentsWithProgress
def getDeclaredMinorStudents():
    """
    Return every student user who has declared the minor, each converted to a dict.
    """
    declaredQuery = User.select().where(User.isStudent & User.declaredMinor)

    return list(map(model_to_dict, declaredQuery))
def getCourseInformation(id):
    """
    Look up a course by ID and return a dict with two entries: 'instructors', the list of
    its instructors' full names ("first last"), and 'course', the course converted to a dict.
    """
    course = model_to_dict(Course.get_by_id(id))

    # instructor rows for this course, with the related user selected alongside
    instructorRows = (CourseInstructor.select(CourseInstructor, User)
                                      .join(Course).switch()
                                      .join(User)
                                      .where(Course.id == id))

    instructorNames = []
    for row in instructorRows:
        instructorNames.append(" ".join((row.user.firstName, row.user.lastName)))

    return {"instructors": instructorNames, "course": course}
def getProgramEngagementHistory(program_id, username, term_id):
    """
    Collect the service events of one program in one term that the given user participated in.

    Returns a dict with 'program' (the program's name), 'events' (one dict per event holding
    the selected id, name and hoursEarned columns) and 'totalHours' (the sum of the hours
    earned across those events).
    """
    # service events of this program and term in which the user took part
    participation = (Event.select(Event.id, Event.name, EventParticipant.hoursEarned)
                          .join(Program).switch()
                          .join(EventParticipant)
                          .where(EventParticipant.user == username,
                                 Event.term == term_id,
                                 Event.isService == True,
                                 Program.id == program_id)
                     )

    program = Program.get_by_id(program_id)

    # events without recorded (or with zero) hours contribute nothing to the total
    earnedHours = (event.eventparticipant.hoursEarned for event in participation)
    totalHours = sum(hours for hours in earnedHours if hours)

    return {
        "program": program.programName,
        "events": list(participation.dicts()),
        "totalHours": totalHours,
    }
def setCommunityEngagementForUser(action, engagementData, currentUser):
    """
    Either add or remove an IndividualRequirement record for a student's Sustained Community Engagement

    :param action: The behavior of the function. Can be 'add' or 'remove'
    :param engagementData:
        type: program or course
        id: program or course id
        username: the username of the student that is having a community engagement added or removed
        term: The term the engagement is recorded in
    :param currentUser: The user who is performing the add/remove action

    :raises DoesNotExist: if there are no available CertificationRequirement slots remaining for the engagement
    :raises Exception: if the engagement type or the action is not one of the supported values
    """
    engagementType = engagementData['type']
    if engagementType not in ['program','course']:
        raise Exception("Invalid engagement type!")

    studentUsername = engagementData['username']

    # CCE requirements (other than 'Summer Program') the student has no record for yet
    openRequirements = (CertificationRequirement.select()
                            .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                                (IndividualRequirement.requirement == CertificationRequirement.id) &
                                (IndividualRequirement.username == studentUsername)))
                            .where(IndividualRequirement.username.is_null(True),
                                   CertificationRequirement.certification == Certification.CCE,
                                   CertificationRequirement.name.not_in(['Summer Program'])))

    if action not in ('add', 'remove'):
        raise Exception(f"Invalid action '{action}' sent to setCommunityEngagementForUser")

    if action == 'add':
        # .get() raises DoesNotExist when no requirement is left; that is handled elsewhere
        newRecord = {
            engagementType: engagementData['id'],
            "username": studentUsername,
            "term": engagementData['term'],
            "requirement": openRequirements.get(),
            "addedBy": currentUser,
        }
        IndividualRequirement.create(**newRecord)
        return

    engagementField = getattr(IndividualRequirement, engagementType)
    IndividualRequirement.delete().where(
        engagementField == engagementData['id'],
        IndividualRequirement.username == studentUsername,
        IndividualRequirement.term == engagementData['term']
    ).execute()
def getCommunityEngagementByTerm(username):
    """
    Given a username, return all of their community engagements (courses they participated in
    and programs whose service events they participated in), grouped by term.

    The result is a dict keyed by (term description, term id) and sorted by term id. Each value
    is a list of dicts with 'name', 'id', 'type' ('course' or 'program'), 'matched' (1 when an
    IndividualRequirement record exists for that engagement and term, else 0) and 'term'.
    """
    engagementsByTerm = defaultdict(list)

    # 0 when the LEFT OUTER join found no requirement record for the course, otherwise 1
    courseMatched = Case(None, [(IndividualRequirement.course.is_null(True) , 0)], 1)

    courseQuery = (Course.select(Course, courseMatched.alias("matchedReq"))
                         .join(CourseParticipant, on=(Course.id == CourseParticipant.course))
                         .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                             (IndividualRequirement.course == Course.id) &
                             (IndividualRequirement.username == CourseParticipant.user) &
                             (IndividualRequirement.term == Course.term)))
                         .where(CourseParticipant.user == username)
                         .group_by(Course.courseName, Course.term))

    for course in courseQuery:
        termKey = (course.term.description, course.term.id)
        engagementsByTerm[termKey].append({
            "name": course.courseName,
            "id": course.id,
            "type": "course",
            "matched": course.matchedReq,
            "term": course.term.id,
        })

    # same idea for programs: 0 when no requirement record was joined, otherwise 1
    programMatched = Case(None, [(IndividualRequirement.program.is_null(True) , 0)], 1)

    eventQuery = (Event.select(Event, Program, programMatched.alias('matchedReq'))
                       .join(EventParticipant, on=(Event.id == EventParticipant.event)).switch()
                       .join(Program)
                       .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                           (IndividualRequirement.program == Program.id) &
                           (IndividualRequirement.username == EventParticipant.user) &
                           (IndividualRequirement.term == Event.term)))
                       .where(EventParticipant.user == username, Event.isService == True)
                       .group_by(Event.program, Event.term))

    for event in eventQuery:
        termKey = (event.term.description, event.term.id)
        engagementsByTerm[termKey].append({
            "name": event.program.programName,
            "id": event.program.id,
            "type": "program",
            "matched": event.matchedReq,
            "term": event.term.id,
        })

    # order the terms by term id (the second element of each key)
    orderedTerms = sorted(engagementsByTerm.items(), key=lambda item: item[0][1])
    return dict(orderedTerms)
def createOtherEngagementRequest(username, formData):
    """
    Create and return a pending 'Other Engagement' CCEMinorProposal for the given student,
    filling the remaining fields from the form data.
    """
    student = User.get(User.username == username)

    return CCEMinorProposal.create(
        proposalType='Other Engagement',
        createdBy=g.current_user,
        status='Pending',
        student=student,
        **formData
    )
def saveSummerExperience(username, summerExperience, currentUser):
    """
    :param username: username of the student that the summer experience is for
    :param summerExperience: dict
        summerExperience: string of what the summer experience was (will be written as the 'description' in the IndividualRequirement table)
        selectedSummerTerm: the term description that the summer experience took place in
    :param currentUser: the username of the user who added the summer experience record

    Delete any existing IndividualRequirement entry for 'username' if it is for 'Summer Program' and create a new IndividualRequirement entry for
    'Summer Program' with the contents of summerExperience. Returns an empty string.
    """
    # clear out whatever 'Summer Program' record the student already has
    summerRequirementSelect = CertificationRequirement.select().where(
        CertificationRequirement.certification == Certification.CCE,
        CertificationRequirement.name << ['Summer Program'])
    (IndividualRequirement.delete()
        .where(IndividualRequirement.username == username,
               IndividualRequirement.requirement == summerRequirementSelect)
        .execute())

    # the CCE 'Summer Program' requirement that the student has no record for
    openSummerRequirement = (CertificationRequirement.select()
                                .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                                    (IndividualRequirement.requirement == CertificationRequirement.id) &
                                    (IndividualRequirement.username == username)))
                                .where(IndividualRequirement.username.is_null(True),
                                       CertificationRequirement.certification == Certification.CCE,
                                       CertificationRequirement.name << ['Summer Program']))

    selectedTerm = Term.select().where(Term.description == summerExperience['selectedSummerTerm'])

    IndividualRequirement.create(
        description=summerExperience['summerExperience'],
        username=username,
        term=selectedTerm.get(),
        requirement=openSummerRequirement.get(),
        addedBy=currentUser,
    )

    return ""
def getSummerExperience(username):
    """
    Get a student's summer experience to populate the text box if the student has one.

    :param username: username of the student to look up
    :returns: (term description, summer experience description) when the student has exactly
              one CCE 'Summer Program' IndividualRequirement record, otherwise (None, None)
    """
    summerExperience = (IndividualRequirement.select()
                            .join(CertificationRequirement, JOIN.LEFT_OUTER, on=(CertificationRequirement.id == IndividualRequirement.requirement)).switch()
                            .join(Term, on=(IndividualRequirement.term == Term.id))
                            .where(IndividualRequirement.username == username,
                                   CertificationRequirement.certification == Certification.CCE,
                                   CertificationRequirement.name << ['Summer Program']))

    # run the query once and reuse the row instead of re-querying with .get()
    matches = list(summerExperience)
    if len(matches) == 1:
        record = matches[0]
        return (record.term.description, record.description)

    return (None, None)
def removeSummerExperience(username):
    """
    Delete the IndividualRequirement entry that holds the summer experience of 'username'.

    Does nothing when getSummerExperience() finds no summer experience for the user.
    """
    term, summerExperienceToDelete = getSummerExperience(username)

    # (None, None) means no summer experience was found. Comparing the description with None
    # would become an IS NULL filter and delete every one of the user's records that has
    # no description, so stop here instead.
    if term is None and summerExperienceToDelete is None:
        return

    IndividualRequirement.delete().where(
        IndividualRequirement.username == username,
        IndividualRequirement.description == summerExperienceToDelete
    ).execute()
def removeProposal(proposalID) -> None:
    """
    Delete a summer experience or other engagement object from the CCEMinorProposal table.
    If an AttachmentUpload is linked to the proposal, its file is deleted first.
    """
    proposalID = int(proposalID)

    attachment = AttachmentUpload.get_or_none(proposal=proposalID)
    if attachment:
        FileHandler(proposalId=proposalID).deleteFile(attachment.id)

    deleteQuery = CCEMinorProposal.delete().where(CCEMinorProposal.id == proposalID)
    deleteQuery.execute()