Coverage for app/logic/spreadsheet.py: 100%
113 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-07-24 12:19 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2024-07-24 12:19 +0000
1from importlib.abc import ResourceReader
2from os import major
3import xlsxwriter
4from peewee import fn, Case, JOIN
5from collections import defaultdict
7from app import app
8from app.models.eventParticipant import EventParticipant
9from app.models.user import User
10from app.models.program import Program
11from app.models.event import Event
12from app.models.term import Term
def getUniqueVolunteers(academicYear):
    """Build the query listing the distinct volunteers of one academic year.

    Only participants of events whose term has ``Term.academicYear`` equal to
    ``academicYear`` are considered.

    Returns:
        A peewee query in tuple mode. Each row is
        (user id, "First Last", B-number), ordered by user id.
    """
    fullName = fn.CONCAT(User.firstName, ' ', User.lastName)
    selection = EventParticipant.select(fn.DISTINCT(EventParticipant.user_id), fullName, User.bnumber)

    # Join the user first, then return to EventParticipant so the event/term
    # joins hang off the participation row rather than off User.
    withUser = selection.join(User).switch(EventParticipant)
    inAcademicYear = withUser.join(Event).join(Term).where(Term.academicYear == academicYear)

    return inAcademicYear.order_by(EventParticipant.user_id).tuples()
def getVolunteerProgramEventByTerm(term):
    """Build the query of every volunteer/program/event pairing in one term.

    Args:
        term: value compared against ``Event.term_id`` (a Term row or its id).

    Returns:
        A peewee query in tuple mode. Each row is
        ("First Last", user id, program name, event name), ordered by user id.
    """
    volunteerName = fn.CONCAT(User.firstName, ' ', User.lastName)
    fields = (volunteerName, EventParticipant.user_id, Program.programName, Event.name)

    participation = EventParticipant.select(*fields).join(User).switch(EventParticipant)
    participation = participation.join(Event).join(Program)
    participation = participation.where(Event.term_id == term)

    return participation.order_by(EventParticipant.user_id).tuples()
def totalVolunteerHours():
    """Build the query summing ``hoursEarned`` over every EventParticipant row.

    Returns:
        A peewee query in tuple mode yielding a single one-column row.
    """
    hoursSum = fn.SUM(EventParticipant.hoursEarned)
    return EventParticipant.select(hoursSum).tuples()
def volunteerProgramHours():
    """Build the query of hours earned by each volunteer within each program.

    Returns:
        A peewee query in tuple mode. Each row is
        (program name, user id, summed hours), one row per program/user pair.
    """
    hoursEarned = fn.SUM(EventParticipant.hoursEarned)

    perVolunteer = EventParticipant.select(Program.programName, EventParticipant.user_id, hoursEarned)
    perVolunteer = perVolunteer.join(Event, on=(EventParticipant.event_id == Event.id))
    perVolunteer = perVolunteer.join(Program, on=(Event.program_id == Program.id))

    return perVolunteer.group_by(Program.programName, EventParticipant.user_id).tuples()
def onlyCompletedAllVolunteer(academicYear):
    """Build the query of volunteers whose only event in a year was the training.

    A volunteer qualifies when they took part in an event named
    "All Volunteer Training" during ``academicYear`` and in no event with any
    other name during that same academic year.

    Returns:
        A peewee query in tuple mode. Each row is (user id, "First Last").
    """
    trainingName = "All Volunteer Training"

    # Everyone who attended at least one non-training event this year.
    attendedOtherEvents = EventParticipant.select(EventParticipant.user_id).join(Event).join(Term)
    attendedOtherEvents = attendedOtherEvents.where(Event.name != trainingName,
                                                    Term.academicYear == academicYear)

    fullName = fn.CONCAT(User.firstName, " ", User.lastName)
    trainingOnly = EventParticipant.select(EventParticipant.user_id, fullName)
    trainingOnly = trainingOnly.join(User).switch(EventParticipant).join(Event).join(Term)
    trainingOnly = trainingOnly.where(Event.name == trainingName,
                                      Term.academicYear == academicYear,
                                      EventParticipant.user_id.not_in(attendedOtherEvents))

    return trainingOnly.tuples()
def volunteerHoursByProgram():
    """Build the query of total volunteer hours earned per program.

    Returns:
        A peewee query in tuple mode. Each row is (program name, summed hours),
        grouped and ordered by program name.
    """
    totalHours = fn.SUM(EventParticipant.hoursEarned).alias('sum')

    hoursQuery = Program.select(Program.programName, totalHours).join(Event)
    hoursQuery = hoursQuery.join(EventParticipant, on=(Event.id == EventParticipant.event_id))
    hoursQuery = hoursQuery.group_by(Program.programName).order_by(Program.programName)

    return hoursQuery.tuples()
def volunteerMajorAndClass(column, reorderClassLevel=False):
    """Build the query counting distinct volunteers per value of a User column.

    Args:
        column: the User field to group by (the spreadsheet passes
            ``User.major`` and ``User.classLevel``).
        reorderClassLevel: when true, rows are ordered Freshman, Sophomore,
            Junior, Senior, Graduating, Non-Degree, then NULL, then any other
            value; otherwise rows are ordered by ``column`` ascending with
            NULLs last.

    Returns:
        A peewee query in tuple mode. Each row is (column value, count), with
        "Unknown" shown in place of a NULL column value.
    """
    label = Case(None, ((column.is_null(), "Unknown"),), column)
    volunteerCount = fn.COUNT(fn.DISTINCT(EventParticipant.user_id)).alias('count')

    grouped = (User.select(label, volunteerCount)
                   .join(EventParticipant, on=(User.username == EventParticipant.user_id))
                   .group_by(column))

    if not reorderClassLevel:
        return grouped.order_by(column.asc(nulls = 'LAST')).tuples()

    # Rank the known class levels 1..6, NULL as 7, anything else as 8.
    classLevels = ("Freshman", "Sophomore", "Junior", "Senior", "Graduating", "Non-Degree")
    ranking = tuple((column == level, rank) for rank, level in enumerate(classLevels, start=1))
    ranking += ((column.is_null(), 7),)

    return grouped.order_by(Case(None, ranking, 8)).tuples()
def repeatVolunteersPerProgram():
    """Build the query of volunteers with more than one event in the same program.

    Returns:
        A peewee query in tuple mode. Each row is
        ("First Last", program name, event count), keeping only counts above
        one, ordered by program and then by last name.
    """
    fullName = fn.CONCAT(User.firstName, " ", User.lastName).alias('fullName')
    eventCount = fn.COUNT(EventParticipant.event_id)

    repeatPerProgramQuery = (EventParticipant.select(fullName,
                                                     Program.programName.alias("programName"),
                                                     eventCount.alias('event_count'))
                             .join(Event, on=(EventParticipant.event_id == Event.id))
                             .join(Program, on=(Event.program == Program.id))
                             .join(User, on=(User.username == EventParticipant.user_id))
                             # Group on the user id as well as the name: two different
                             # volunteers who share a first and last name must not be
                             # merged into one row with a combined event count.
                             .group_by(User.firstName, User.lastName, Event.program, EventParticipant.user_id)
                             .having(eventCount > 1)
                             .order_by(Event.program, User.lastName))

    return repeatPerProgramQuery.tuples()
def repeatVolunteers():
    """Build the query of volunteers with more than one participation overall.

    Returns:
        A peewee query in tuple mode. Each row is
        ("First Last", participation count), keeping only counts above one.
    """
    fullName = fn.CONCAT(User.firstName, " ", User.lastName)
    participationCount = fn.COUNT(EventParticipant.user_id)

    repeatAllProgramQuery = (EventParticipant.select(fullName, participationCount.alias('count'))
                             .join(User, on=(User.username == EventParticipant.user_id))
                             # Group on the user id as well as the name so that distinct
                             # volunteers who share a name are counted separately.
                             .group_by(User.firstName, User.lastName, EventParticipant.user_id)
                             .having(participationCount > 1))

    return repeatAllProgramQuery.tuples()
def getRetentionRate(academicYear):
    """Return fall-to-spring retention per program as display strings.

    Args:
        academicYear: string with exactly two parts separated by "-"; the
            first names the fall term ("Fall <first>") and the second the
            spring term ("Spring <second>").

    Returns:
        A list of (program name, percentage string) tuples, where the
        percentage is rounded to two decimals and suffixed with "%".
    """
    fallYear, springYear = academicYear.split("-")
    fallParticipation = termParticipation(f"Fall {fallYear}")
    springParticipation = termParticipation(f"Spring {springYear}")

    rates = calculateRetentionRate(fallParticipation, springParticipation)

    return [(programName, f"{round(rate * 100, 2)}%") for programName, rate in rates.items()]
def termParticipation(termDescription):
    """Map each program with events in a term to the participants of those events.

    Events are left-outer-joined to their participants, so an event nobody
    attended still contributes its program, with a ``None`` participant entry.

    Args:
        termDescription: value matched against ``Term.description``.

    Returns:
        A plain dict of program name -> list of participant ids (one entry
        per joined row; may contain duplicates and ``None``).
    """
    eventRows = Event.select(Event.program,
                             EventParticipant.user_id.alias('participant'),
                             Program.programName.alias("programName"))
    eventRows = eventRows.join(EventParticipant, JOIN.LEFT_OUTER, on=(Event.id == EventParticipant.event))
    eventRows = eventRows.join(Program, on=(Program.id == Event.program))
    eventRows = eventRows.join(Term, on=(Event.term_id == Term.id))
    eventRows = eventRows.where(Term.description == termDescription)

    participantsByProgram = {}
    for row in eventRows.dicts():
        participantsByProgram.setdefault(row['programName'], []).append(row['participant'])

    return participantsByProgram
def removeNullParticipants(participantList):
    """Return a new list holding only the truthy entries of ``participantList``.

    Every falsy entry is dropped (``None``, but also ``""`` and ``0``); the
    order of the remaining entries is preserved and the input is not modified.
    """
    return [participant for participant in participantList if participant]
153# function to calculate the retention rate for each program
def calculateRetentionRate(fallDict, springDict):
    """Compute, for each fall program, the share of fall participants seen again in spring.

    Args:
        fallDict: mapping of program name -> list of fall participants.
        springDict: mapping of program name -> list of spring participants;
            a program missing here is treated as having no spring participants.

    Falsy entries (such as ``None`` for an event with no participant) are
    ignored and duplicates are counted once. Programs that appear only in
    ``springDict`` are not reported.

    Returns:
        A dict of program name -> retention rate as a float between 0 and 1.
        A program with no fall participants gets 0.0.
    """
    retentionDict = {}
    for program, fallEntries in fallDict.items():
        fallParticipants = {participant for participant in fallEntries if participant}
        springParticipants = {participant for participant in springDict.get(program, []) if participant}

        # Guard the empty case explicitly instead of catching ZeroDivisionError.
        if fallParticipants:
            retentionDict[program] = len(fallParticipants & springParticipants) / len(fallParticipants)
        else:
            retentionDict[program] = 0.0

    return retentionDict
169# def halfRetentionRateRecurringEvents():
171# programs = ProgramEvent.select(ProgramEvent.program_id).distinct()
173# retention_rates = {}
175# # Loop over the programs and get the corresponding event IDs
176# for program in programs:
177# # Define the query for each program
178# query = (EventParticipant.select(EventParticipant.event_id.alias("event_id"), Event.name.alias("name"))
179# .join(Event, on=(EventParticipant.event_id == Event.id))
180# .join(ProgramEvent, on=(EventParticipant.event_id == ProgramEvent.event_id))
181# .join(Program, on=(Program.id == ProgramEvent.program_id))
182# .where((ProgramEvent.program_id == program.program_id) & (Event.recurringId != None))
183# .distinct()
184# .dicts())
186# event_count = 0
187# name_counts = defaultdict(int)
189# for result in query:
190# event_count += 1
191# participants = EventParticipant.select(EventParticipant.user_id).where(EventParticipant.event_id == result["event_id"])
192# for participant in participants:
193# name = participant.user_id
194# name_counts[name] += 1
196# half_count = event_count // 2
197# qualified_names = [name for name, count in name_counts.items() if count >= half_count]
199# if len(name_counts) > 0:
200# percentage = len(qualified_names) / len(name_counts) * 100
201# else:
202# percentage = 0
204# retention_rates[program.program.programName] = percentage
206# return retention_rates
209# def fullRetentionRateRecurringEvents():
211# programs = ProgramEvent.select(ProgramEvent.program_id).distinct()
213# full_retention = {}
215# # Loop over the programs and get the corresponding event IDs
216# for program in programs:
217# # Define the query for each program
218# query = (EventParticipant.select(EventParticipant.event_id.alias("event_id"), Event.name.alias("name"))
219# .join(Event, on=(EventParticipant.event_id == Event.id))
220# .join(ProgramEvent, on=(EventParticipant.event_id == ProgramEvent.event_id))
221# .join(Program, on=(Program.id == ProgramEvent.program_id))
222# .where((ProgramEvent.program_id == program.program_id) & (Event.recurringId != None))
223# .distinct()
224# .dicts())
226# event_count = 0
227# name_counts = defaultdict(int)
229# for result in query:
230# event_count += 1
231# participants = EventParticipant.select(EventParticipant.user_id).where(EventParticipant.event_id == result["event_id"])
232# for participant in participants:
233# name = participant.user_id
234# name_counts[name] += 1
236# qualified_names = [name for name, count in name_counts.items() if count >= event_count]
238# if len(name_counts) > 0:
239# percentage = len(qualified_names) / len(name_counts) * 100
240# else:
241# percentage = 0
243# full_retention[program.program.programName] = percentage
245# return full_retention
247# create a new Excel file
249# define function to save data to a sheet in the Excel file
def makeDataXls(getData, columnTitles, sheetName, workbook):
    """Add one worksheet to ``workbook`` and fill it with a titled table.

    Layout: the sheet name in cell (0, 0), the bold column titles on row 1,
    and the data starting on row 2. Each titled column is then sized to its
    longest rendered value (title included) plus 3.

    Args:
        getData: iterable of row sequences. It is consumed exactly once, so a
            generator or other one-shot iterable works.
        columnTitles: header text for each column.
        sheetName: worksheet name, also written into the first cell.
        workbook: object providing ``add_worksheet`` and ``add_format``
            (an xlsxwriter Workbook in this module).
    """
    worksheet = workbook.add_worksheet(sheetName)
    bold = workbook.add_format({'bold': True})

    worksheet.write_string(0, 0, sheetName)

    for columnIndex, title in enumerate(columnTitles):
        worksheet.write(1, columnIndex, title, bold)

    # Materialize once: the rows are needed both for writing and for sizing.
    rows = list(getData)
    for rowIndex, rowData in enumerate(rows, start=2):
        for columnIndex, value in enumerate(rowData):
            worksheet.write(rowIndex, columnIndex, value)

    for columnIndex, title in enumerate(columnTitles):
        # Rows shorter than the header simply do not contribute to this column.
        cellLengths = [len(str(rowData[columnIndex])) for rowData in rows if columnIndex < len(rowData)]
        widest = max([len(str(title))] + cellLengths)
        worksheet.set_column(columnIndex, columnIndex, widest + 3)
def createSpreadsheet(academicYear):
    """Write the volunteer statistics workbook and return the path it was saved to.

    The file is ``volunteer_data.xlsx`` under the configured
    ``app.config['files']['base_path']``. One worksheet is added per report,
    the last one covering the fall term of ``academicYear``.

    Args:
        academicYear: string whose part before the first "-" names the fall
            term ("Fall <part>"); it is also passed on to the per-year reports.

    Returns:
        The path of the written workbook.
    """
    filepath = app.config['files']['base_path'] + '/volunteer_data.xlsx'
    workbook = xlsxwriter.Workbook(filepath, {'in_memory': True})

    def addSheet(rows, titles, sheetName):
        # Every report goes into the same workbook.
        makeDataXls(rows, titles, sheetName, workbook)

    addSheet(volunteerHoursByProgram(), ["Program", "Hours"], "Total Hours By Program")
    addSheet(volunteerMajorAndClass(User.major), ["Major", "Count"], "Volunteers By Major")
    addSheet(volunteerMajorAndClass(User.classLevel, reorderClassLevel=True),
             ["Class Level", "Count"], "Volunteers By Class Level")
    addSheet(repeatVolunteersPerProgram(),
             ["Volunteer", "Program Name", "Event Count"], "Repeat Volunteers Per Program")
    addSheet(repeatVolunteers(), ["Volunteer", "Number of Events"], "Repeat Volunteers All Programs")
    addSheet(getRetentionRate(academicYear), ["Program", "Retention Rate"], "Retention Rate By Semester")
    addSheet(getUniqueVolunteers(academicYear), ["Username", "Full Name", "B-Number"], "Unique Volunteers")
    addSheet(totalVolunteerHours(), ["Total Volunteer Hours"], "Total Hours")
    addSheet(volunteerProgramHours(),
             ["Program Name", "Volunteer Username", "Volunteer Hours"], "Volunteer Hours By Program")
    addSheet(onlyCompletedAllVolunteer(academicYear), ["Username", "Full Name"], "Only All Volunteer Training")

    # The fall term's description doubles as the name of its worksheet.
    fallTermName = f"Fall {academicYear.split('-')[0]}"
    fallTerm = Term.get_or_none(Term.description == fallTermName)
    addSheet(getVolunteerProgramEventByTerm(fallTerm),
             ["Full Name", "Username", "Program Name", "Event Name"], fallTermName)

    workbook.close()

    return filepath