Coverage for app/logic/minor.py: 65%
167 statements
« prev ^ index » next coverage.py v7.10.2, created at 2026-01-22 18:12 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2026-01-22 18:12 +0000
1from collections import defaultdict
2from typing import List, Dict
3from flask import flash, g
4from playhouse.shortcuts import model_to_dict
5from peewee import JOIN, fn, Case, DoesNotExist, SQL
6import xlsxwriter
8from app import app
9from app.models.user import User
10from app.models.term import Term
11from app.models.event import Event
12from app.models.course import Course
13from app.models.program import Program
14from app.models.certification import Certification
15from app.models.courseInstructor import CourseInstructor
16from app.models.eventParticipant import EventParticipant
17from app.models.courseParticipant import CourseParticipant
18from app.models.individualRequirement import IndividualRequirement
19from app.models.certificationRequirement import CertificationRequirement
20from app.models.cceMinorProposal import CCEMinorProposal
21from app.logic.createLogs import createActivityLog
22from app.logic.fileHandler import FileHandler
23from app.logic.serviceLearningCourses import deleteCourseObject
24from app.models.attachmentUpload import AttachmentUpload
def createSummerExperience(username, formData):
    """
    Create a pending 'Summer Experience' CCEMinorProposal for a student.

    :param username: username of the student the proposal belongs to
    :param formData: submitted form data; all of its 'contentArea' values are
        combined into one comma-separated string, and its entries are also
        passed to CCEMinorProposal.create as keyword arguments

    Any exception raised while saving is printed and then re-raised.
    """
    try:
        student = User.get(User.username == username)
        # Several content areas may be selected; they are stored as a single string.
        selectedAreas = formData.getlist('contentArea')
        CCEMinorProposal.create(
            student=student,
            proposalType='Summer Experience',
            contentAreas=', '.join(selectedAreas),
            status="Pending",
            createdBy=g.current_user,
            **formData,
        )
    except Exception as error:
        print(f"Error saving summer experience: {error}")
        raise
def getCCEMinorProposals(username):
    """
    Return a list of summary dicts, one per CCEMinorProposal whose student matches `username`.

    Each dict holds the proposal's id, type, creator, supervisor name, term, and status.
    """
    studentProposals = CCEMinorProposal.select().where(CCEMinorProposal.student == username)

    return [
        {
            "id": proposal.id,
            "type": proposal.proposalType,
            "createdBy": proposal.createdBy,
            "supervisor": proposal.supervisorName,
            "term": proposal.term,
            "status": proposal.status,
        }
        for proposal in studentProposals
    ]
def getEngagementTotal(engagementData):
    """
    Count the number of engagements (from all terms) that have matched with a requirement.

    :param engagementData: dict whose values are lists of engagement dicts, each
        carrying a numeric 'matched' entry
    :returns: the sum of every engagement's 'matched' value; 0 when there are no engagements
    """
    # Walk the per-term lists directly instead of flattening them with
    # sum(..., []), which re-copies the accumulated list once per term.
    return sum(engagement['matched']
               for termEngagements in engagementData.values()
               for engagement in termEngagements)
def getMinorInterest() -> List[Dict]:
    """
    Return, as a list of dicts, every student who has indicated interest in the CCE minor,
    has not declared it, and has no IndividualRequirement record.
    """
    # With the left outer join, users lacking requirement records come back with a
    # NULL IndividualRequirement.username; the filter keeps only those users.
    hasNoRequirements = IndividualRequirement.username.is_null(True)
    interestQuery = (User.select(User)
                         .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(User.username == IndividualRequirement.username))
                         .where(User.isStudent & User.minorInterest & ~User.declaredMinor & hasNoRequirements))

    return [model_to_dict(student) for student in interestQuery]
def getMinorProgress():
    """
    Collect every user who has an IndividualRequirement record under the CCE certification.

    Returns a list of dicts (ordered by the query's 'engagementCount' aggregate, descending)
    containing the student's name, username, B-Number, graduation flag, number of engagements,
    whether they have a CCEMinorProposal, and whether the summer experience is completed.
    """
    # Evaluates to 1 on rows joined to a 'Summer Experience' proposal and to 0 otherwise.
    isSummerProposal = Case(None, [(CCEMinorProposal.proposalType == "Summer Experience", 1)], 0)

    progressQuery = (
        User.select(User,
                    fn.COUNT(IndividualRequirement.id).alias('engagementCount'),
                    fn.SUM(isSummerProposal).alias('hasSummer'),
                    fn.IF(fn.COUNT(CCEMinorProposal.id) > 0, True, False).alias('hasCCEMinorProposal'))
            .join(IndividualRequirement, on=(User.username == IndividualRequirement.username))
            .join(CertificationRequirement, on=(IndividualRequirement.requirement_id == CertificationRequirement.id))
            .switch(User)
            .join(CCEMinorProposal, JOIN.LEFT_OUTER, on=(User.username == CCEMinorProposal.student))
            .where(CertificationRequirement.certification_id == Certification.CCE)
            .group_by(User.firstName, User.lastName, User.username)
            .order_by(SQL("engagementCount").desc())
    )

    progressList = []
    for student in progressQuery:
        progressList.append({
            'firstName': student.firstName,
            'lastName': student.lastName,
            'username': student.username,
            'B-Number': student.bnumber,
            'hasGraduated': student.hasGraduated,
            # The summer total is subtracted from the counted requirement rows.
            'engagementCount': student.engagementCount - student.hasSummer,
            'hasCCEMinorProposal': student.hasCCEMinorProposal,
            'hasSummer': "Completed" if student.hasSummer else "Incomplete",
        })
    return progressList
def getMinorSpreadsheet():
    """
    Write an .xlsx workbook of CCE minor progress and return its path.

    One row is written for each student returned by getMinorProgress() who has not
    graduated: first name, last name, username, B-Number, number of engagements,
    and whether the summer experience is completed ("Yes"/"No"). When there are no
    students, only the header row is written.

    :returns: path of the written file (minor_data.xlsx under the configured base path)
    """
    # If we're in 2025, can we get the minor information for 2023?
    studentProgress = getMinorProgress()

    # Column header paired with the getMinorProgress() key that fills it.
    columns = [("First Name", "firstName"),
               ("Last Name", "lastName"),
               ("Username", "username"),
               ("B-Number", "B-Number"),
               ("Number of Engagements", "engagementCount"),
               ("Completed Summer Experience", "hasSummer")]
    headerRow = 1
    firstColumn = 1

    filepath = f"{app.config['files']['base_path']}/minor_data.xlsx"
    workbook = xlsxwriter.Workbook(filepath, {'in_memory': True})

    worksheet = workbook.add_worksheet('minor_information')
    format_row = workbook.add_format({'align': 'left'})

    worksheet.set_column(firstColumn, len(columns), 30, workbook.add_format({'bold': True}))
    for columnNumber, (header, _) in enumerate(columns, firstColumn):
        worksheet.write(headerRow, columnNumber, header)

    # Filter before numbering the rows so graduated students leave no blank rows.
    currentStudents = [student for student in studentProgress if not student['hasGraduated']]
    for rowNumber, student in enumerate(currentStudents, headerRow + 1):
        rowValues = dict(student)
        # getMinorProgress() reports the summer experience as "Completed" or "Incomplete".
        rowValues['hasSummer'] = "Yes" if student['hasSummer'] == "Completed" else "No"
        if student['B-Number'] is None:
            rowValues['B-Number'] = "No B-Number Found"

        worksheet.set_row(rowNumber, None, format_row)
        for columnNumber, (_, key) in enumerate(columns, firstColumn):
            worksheet.write(rowNumber, columnNumber, rowValues[key])

    workbook.close()

    return filepath
def toggleMinorInterest(username, isAdding):
    """
    Given a username, update their minor interest and reset their declared-minor status.

    :param username: username of the user to update
    :param isAdding: value stored as the user's minorInterest flag
    :returns: None on success; ({"error": "User not found"}, 404) when no user has
        that username; ({"error": <message>}, 500) when the lookup or save raises
    """
    try:
        # get_or_none yields None for a missing user; User.get would raise
        # DoesNotExist instead, which the handler below reports as a 500.
        user = User.get_or_none(username=username)
        if user is None:
            return {"error": "User not found"}, 404

        user.minorInterest = isAdding
        user.declaredMinor = False
        user.save()

    except Exception as e:
        print(f"Error updating minor interest: {e}")
        return {"error": str(e)}, 500
def declareMinorInterest(username):
    """
    Flip the declared-minor flag of the user identified by `username` and save it.

    :raises ValueError: if the looked-up user is falsy
    :raises RuntimeError: if saving the updated user fails
    """
    student = User.get_by_id(username)

    # NOTE(review): peewee documents get_by_id as raising DoesNotExist for a missing
    # row, so this guard may never fire - confirm which error callers expect.
    if not student:
        raise ValueError(f"User with username '{username}' not found.")

    student.declaredMinor = not student.declaredMinor

    try:
        student.save()
    except Exception as saveError:
        raise RuntimeError(f"Failed to declare interested student: {saveError}")
def getDeclaredMinorStudentsWithProgress():
    """
    Retrieve the students who have declared the CCE minor together with their engagement progress.

    Returns a list of dicts holding each student's name, username, B-Number, email,
    graduation flag, engagement count, whether they have a CCEMinorProposal, and
    whether the summer experience is completed.
    """
    # Evaluates to 1 on rows joined to a 'Summer Experience' proposal and to 0 otherwise.
    isSummerProposal = Case(None, [(CCEMinorProposal.proposalType == "Summer Experience", 1)], 0)

    # Left outer joins keep declared students that have no requirement or proposal rows.
    declaredQuery = (
        User
        .select(
            User,
            fn.COUNT(fn.DISTINCT(IndividualRequirement.id)).alias("rawEngagementCount"),
            fn.COALESCE(fn.SUM(fn.DISTINCT(isSummerProposal)), 0).alias("summerCount"),
            fn.IF(fn.COUNT(fn.DISTINCT(CCEMinorProposal.id)) > 0, True, False).alias("hasCCEMinorProposal"),
        )
        .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(User.username == IndividualRequirement.username))
        .join(CertificationRequirement, JOIN.LEFT_OUTER, on=(IndividualRequirement.requirement_id == CertificationRequirement.id))
        .switch(User)
        .join(CCEMinorProposal, JOIN.LEFT_OUTER, on=(User.username == CCEMinorProposal.student))
        .where(
            (User.isStudent == True) &
            (User.declaredMinor == True) &
            (
                (CertificationRequirement.certification_id == Certification.CCE) |
                (CertificationRequirement.id.is_null(True))
            )
        )
        .group_by(User.firstName, User.lastName, User.username)
        .order_by(SQL("rawEngagementCount").desc())
    )

    declaredStudents = []
    for student in declaredQuery:
        rawCount = int(student.rawEngagementCount)
        # A missing summer count is treated as zero.
        summerTotal = int(student.summerCount or 0)

        declaredStudents.append({
            "firstName": student.firstName,
            "lastName": student.lastName,
            "username": student.username,
            "B-Number": student.bnumber,
            "email": student.email,
            "hasGraduated": student.hasGraduated,
            "engagementCount": rawCount - summerTotal,
            "hasCCEMinorProposal": bool(student.hasCCEMinorProposal),
            "hasSummer": "Completed" if summerTotal > 0 else "Incomplete",
        })

    return declaredStudents
def getDeclaredMinorStudents():
    """
    Return every student who has declared the minor, each converted to a dict.
    """
    declaredQuery = User.select().where(User.isStudent & User.declaredMinor)

    return list(map(model_to_dict, declaredQuery))
def getCourseInformation(id):
    """
    Given a course ID, return a dict with the course (converted to a dict) under
    "course" and the full names of its instructors under "instructors".
    """
    courseDict = model_to_dict(Course.get_by_id(id))

    # Instructor rows for this course, selected together with their User records.
    instructorQuery = (CourseInstructor.select(CourseInstructor, User)
                                       .join(Course).switch()
                                       .join(User)
                                       .where(Course.id == id))

    instructorNames = []
    for courseInstructor in instructorQuery:
        instructor = courseInstructor.user
        instructorNames.append(instructor.firstName + " " + instructor.lastName)

    return {"instructors": instructorNames, "course": courseDict}
def getProgramEngagementHistory(program_id, username, term_id):
    """
    Given a program_id, username, and term_id, return a dict with the program's name,
    the service events of that program and term in which the user participated, and
    the total hours the user earned across those events.
    """
    # Service events of this program and term that the user has a participation row for.
    participationQuery = (Event.select(Event.id, Event.name, EventParticipant.hoursEarned)
                               .join(Program).switch()
                               .join(EventParticipant)
                               .where(EventParticipant.user == username,
                                      Event.term == term_id,
                                      Event.isService == True,
                                      Program.id == program_id)
                          )

    program = Program.get_by_id(program_id)

    # Add up the hours for the whole program that term; empty or zero hours are skipped.
    totalHours = 0
    for participation in participationQuery:
        hoursEarned = participation.eventparticipant.hoursEarned
        if not hoursEarned:
            continue
        totalHours += hoursEarned

    return {"program": program.programName,
            "events": list(participationQuery.dicts()),
            "totalHours": totalHours}
def setCommunityEngagementForUser(action, engagementData, currentUser):
    """
    Add or remove an IndividualRequirement record for a student's Sustained Community Engagement.

    :param action: The behavior of the function. Can be 'add' or 'remove'
    :param engagementData:
        type: program or course
        id: program or course id
        username: the username of the student that is having a community engagement added or removed
        term: The term the engagement is recorded in
    :param currentUser: The user who is performing the add/remove action

    :raises Exception: if the engagement type or the action is not one of the accepted values
    :raises DoesNotExist: if there are no available CertificationRequirement slots remaining for the engagement
    """
    engagementType = engagementData['type']
    if engagementType not in ['program', 'course']:
        raise Exception("Invalid engagement type!")

    # CCE requirements (other than 'Summer Program') that this student has not filled yet.
    openRequirements = (CertificationRequirement.select()
                            .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                                (IndividualRequirement.requirement == CertificationRequirement.id) &
                                (IndividualRequirement.username == engagementData['username'])))
                            .where(IndividualRequirement.username.is_null(True),
                                   CertificationRequirement.certification == Certification.CCE,
                                   CertificationRequirement.name.not_in(['Summer Program'])))

    if action == 'remove':
        IndividualRequirement.delete().where(
            getattr(IndividualRequirement, engagementType) == engagementData['id'],
            IndividualRequirement.username == engagementData['username'],
            IndividualRequirement.term == engagementData['term']
        ).execute()
        return

    if action != 'add':
        raise Exception(f"Invalid action '{action}' sent to setCommunityEngagementForUser")

    # openRequirements.get() raises DoesNotExist when no engagement requirement is
    # left to fill; that is deliberately left for the caller to handle.
    newRecord = {engagementType: engagementData['id'],
                 "username": engagementData['username'],
                 "term": engagementData['term'],
                 "requirement": openRequirements.get(),
                 "addedBy": currentUser,
                 }
    IndividualRequirement.create(**newRecord)
def getCommunityEngagementByTerm(username):
    """
    Given a username, return all of their community engagements (service learning
    courses and service event participations) grouped by term.

    The result is a dict keyed by (term description, term id), ordered by term id.
    Each value is a list of dicts with the engagement's name, id, type ("course" or
    "program"), matched flag, and term id.
    """
    engagementsByTerm = defaultdict(list)

    # 1 when an IndividualRequirement row was joined for the course, 0 otherwise.
    courseMatched = Case(None, [(IndividualRequirement.course.is_null(True) , 0)], 1)
    courseQuery = (Course.select(Course, courseMatched.alias("matchedReq"))
                         .join(CourseParticipant, on=(Course.id == CourseParticipant.course))
                         .join(IndividualRequirement, JOIN.LEFT_OUTER, on=(
                             (IndividualRequirement.course == Course.id) &
                             (IndividualRequirement.username == CourseParticipant.user) &
                             (IndividualRequirement.term == Course.term)))
                         .where(CourseParticipant.user == username)
                         .group_by(Course.courseName, Course.term))

    for course in courseQuery:
        term = course.term
        engagementsByTerm[(term.description, term.id)].append({
            "name": course.courseName,
            "id": course.id,
            "type": "course",
            "matched": course.matchedReq,
            "term": term.id,
        })

    # 1 when an IndividualRequirement row was joined for the program, 0 otherwise.
    programMatched = Case(None, [(IndividualRequirement.program.is_null(True) , 0)], 1)
    eventQuery = (Event.select(Event, Program, programMatched.alias('matchedReq'))
                       .join(EventParticipant, on=(Event.id == EventParticipant.event)).switch()
                       .join(Program)
                       .join(IndividualRequirement, JOIN.LEFT_OUTER, on=((IndividualRequirement.program == Program.id) &
                                                                         (IndividualRequirement.username == EventParticipant.user) &
                                                                         (IndividualRequirement.term == Event.term)))
                       .where(EventParticipant.user == username, Event.isService == True)
                       .group_by(Event.program, Event.term))

    # Events are grouped by program and term, so each row stands for one program engagement.
    for event in eventQuery:
        term = event.term
        program = event.program
        engagementsByTerm[(term.description, term.id)].append({
            "name": program.programName,
            "id": program.id,
            "type": "program",
            "matched": event.matchedReq,
            "term": term.id,
        })

    # Keys are (description, id) tuples; order the result by the term id.
    def termId(termAndEngagements):
        return termAndEngagements[0][1]

    return dict(sorted(engagementsByTerm.items(), key=termId))
def createOtherEngagementRequest(username, formData):
    """
    Create and return a pending 'Other Engagement' CCEMinorProposal for the given
    student, with the form data passed through as additional fields.
    """
    student = User.get(User.username == username)

    return CCEMinorProposal.create(proposalType='Other Engagement',
                                   createdBy=g.current_user,
                                   status='Pending',
                                   student=student,
                                   **formData)
def saveSummerExperience(username, summerExperience, currentUser):
    """
    Replace the student's 'Summer Program' IndividualRequirement with a new entry.

    :param username: username of the student that the summer experience is for
    :param summerExperience: dict
        summerExperience: string of what the summer experience was (written as the 'description' of the IndividualRequirement)
        selectedSummerTerm: the term description that the summer experience took place in
    :param currentUser: the user who added the summer experience record

    Any existing 'Summer Program' IndividualRequirement entry for 'username' is deleted, then a
    new one is created with the contents of summerExperience. Returns an empty string.
    """
    # Remove whatever summer experience is currently recorded for this student.
    summerRequirement = CertificationRequirement.select().where(
        CertificationRequirement.certification == Certification.CCE,
        CertificationRequirement.name << ['Summer Program'])
    deleteExisting = IndividualRequirement.delete().where(
        IndividualRequirement.username == username,
        IndividualRequirement.requirement == summerRequirement)
    deleteExisting.execute()

    # The CCE 'Summer Program' requirement that this student has not filled.
    openRequirement = (CertificationRequirement.select()
                           .join(IndividualRequirement, JOIN.LEFT_OUTER, on=((IndividualRequirement.requirement == CertificationRequirement.id) &
                                                                             (IndividualRequirement.username == username)))
                           .where(IndividualRequirement.username.is_null(True),
                                  CertificationRequirement.certification == Certification.CCE,
                                  CertificationRequirement.name << ['Summer Program']))

    matchingTerm = Term.select().where(Term.description == summerExperience['selectedSummerTerm'])

    newRecord = {"description": summerExperience['summerExperience'],
                 "username": username,
                 "term": matchingTerm.get(),
                 "requirement": openRequirement.get(),
                 "addedBy": currentUser,
                 }
    IndividualRequirement.create(**newRecord)
    return ""
def getSummerExperience(username):
    """
    Get a student's summer experience to populate the text box, if the student has one.

    :param username: username of the student
    :returns: (term description, experience description) when exactly one CCE
        'Summer Program' IndividualRequirement exists for the student,
        otherwise (None, None)
    """
    # Run the query once and reuse the fetched rows, rather than executing it
    # again for the count and for each field that is read.
    summerExperiences = list(IndividualRequirement.select()
                                .join(CertificationRequirement, JOIN.LEFT_OUTER, on=(CertificationRequirement.id == IndividualRequirement.requirement)).switch()
                                .join(Term, on=(IndividualRequirement.term == Term.id))
                                .where(IndividualRequirement.username == username,
                                       CertificationRequirement.certification == Certification.CCE,
                                       CertificationRequirement.name << ['Summer Program']))
    if len(summerExperiences) == 1:
        summerExperience = summerExperiences[0]
        return (summerExperience.term.description, summerExperience.description)

    return (None, None)
def removeSummerExperience(username):
    """
    Delete the IndividualRequirement entry holding the summer experience of 'username'.

    Does nothing when getSummerExperience() reports no summer experience for the user.
    """
    term, summerExperienceToDelete = getSummerExperience(username)

    # getSummerExperience returns (None, None) when there is nothing to remove.
    # Filtering on description == None would turn into an IS NULL test and delete
    # the user's other requirement records that have no description.
    if term is None and summerExperienceToDelete is None:
        return

    IndividualRequirement.delete().where(IndividualRequirement.username == username,
                                         IndividualRequirement.description == summerExperienceToDelete).execute()
def removeProposal(proposalID) -> None:
    """
    Delete a summer experience or other engagement proposal from the CCEMinorProposal table.
    If an attachment is found for the proposal, its file is deleted first.
    """
    proposalID = int(proposalID)

    attachment = AttachmentUpload.get_or_none(proposal=proposalID)
    if attachment:
        FileHandler(proposalId=proposalID).deleteFile(attachment.id)

    deleteProposal = CCEMinorProposal.delete().where(CCEMinorProposal.id == proposalID)
    deleteProposal.execute()