Coverage for app/logic/spreadsheet.py: 100%

113 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-06-19 13:45 +0000

1from importlib.abc import ResourceReader 

2from os import major 

3import xlsxwriter 

4from peewee import fn, Case, JOIN 

5from collections import defaultdict 

6 

7from app import app 

8from app.models.eventParticipant import EventParticipant 

9from app.models.user import User 

10from app.models.program import Program 

11from app.models.event import Event 

12from app.models.term import Term 

13 

14 

def getUniqueVolunteers(academicYear):
    """
    Build a query of the distinct volunteers who participated in any event
    whose term belongs to the given academic year.

    Rows are returned as tuples of (user id, "First Last", B-number),
    ordered by user id.
    """
    fullName = fn.CONCAT(User.firstName, ' ', User.lastName)
    selection = EventParticipant.select(fn.DISTINCT(EventParticipant.user_id), fullName, User.bnumber)

    # User is joined from EventParticipant; Term is reached through Event,
    # so the join context is switched back before joining Event.
    withUsers = selection.join(User).switch(EventParticipant)
    withTerms = withUsers.join(Event).join(Term)

    volunteersInYear = withTerms.where(Term.academicYear == academicYear)
    return volunteersInYear.order_by(EventParticipant.user_id).tuples()

25 

def getVolunteerProgramEventByTerm(term):
    """
    Build a query listing, for a single term, each volunteer alongside the
    program and event they participated in.

    `term` is compared against Event.term_id. Rows are tuples of
    ("First Last", user id, program name, event name), ordered by user id.
    """
    columns = (
        fn.CONCAT(User.firstName, ' ', User.lastName),
        EventParticipant.user_id,
        Program.programName,
        Event.name,
    )
    joined = (EventParticipant.select(*columns)
                              .join(User)
                              .switch(EventParticipant)
                              .join(Event)
                              .join(Program))
    forTerm = joined.where(Event.term_id == term).order_by(EventParticipant.user_id)

    return forTerm.tuples()

37 

def totalVolunteerHours():
    """
    Build a query that sums hoursEarned across every EventParticipant row.

    The result is returned in tuple form, each row holding that single sum.
    """
    hoursSum = fn.SUM(EventParticipant.hoursEarned)
    return EventParticipant.select(hoursSum).tuples()

43 

def volunteerProgramHours():
    """
    Build a query totalling the hours each volunteer earned within each program.

    Rows are tuples of (program name, user id, summed hoursEarned), grouped by
    program name and user id.
    """
    summedHours = fn.SUM(EventParticipant.hoursEarned)
    participation = (EventParticipant
                     .select(Program.programName, EventParticipant.user_id, summedHours)
                     .join(Event, on=(EventParticipant.event_id == Event.id))
                     .join(Program, on=(Event.program_id == Program.id)))
    grouped = participation.group_by(Program.programName, EventParticipant.user_id)

    return grouped.tuples()

52 

def onlyCompletedAllVolunteer(academicYear):
    """
    Build a query of volunteers who, within the given academic year, attended
    the "All Volunteer Training" event and no event with any other name.

    Rows are tuples of (user id, "First Last").
    """
    trainingName = "All Volunteer Training"

    # Everyone who attended at least one differently named event that year.
    attendedSomethingElse = (EventParticipant.select(EventParticipant.user_id)
                                             .join(Event)
                                             .join(Term)
                                             .where(Event.name != trainingName,
                                                    Term.academicYear == academicYear))

    # Training attendees, minus anyone found by the subquery above.
    trainingOnly = (EventParticipant.select(EventParticipant.user_id,
                                            fn.CONCAT(User.firstName, " ", User.lastName))
                                    .join(User)
                                    .switch(EventParticipant)
                                    .join(Event)
                                    .join(Term)
                                    .where(Event.name == trainingName,
                                           Term.academicYear == academicYear,
                                           EventParticipant.user_id.not_in(attendedSomethingElse)))

    return trainingOnly.tuples()

68 

def volunteerHoursByProgram():
    """
    Build a query of total volunteer hours per program.

    Rows are tuples of (program name, summed hoursEarned), grouped and ordered
    by program name. The summed column is aliased as 'sum'.
    """
    programHours = fn.SUM(EventParticipant.hoursEarned).alias('sum')
    joined = (Program.select(Program.programName, programHours)
                     .join(Event)
                     .join(EventParticipant, on=(Event.id == EventParticipant.event_id)))
    perProgram = joined.group_by(Program.programName).order_by(Program.programName)

    return perProgram.tuples()

76 

def volunteerMajorAndClass(column, reorderClassLevel=False):
    """
    Build a query counting distinct volunteers per value of a User column.

    column: the User field to group by (the caller passes User.major or
        User.classLevel). NULL values are reported as "Unknown".
    reorderClassLevel: when True, rows are ordered by class standing
        (Freshman .. Non-Degree, then NULL, then anything else); otherwise
        rows are ordered by the column ascending with NULLs last.

    Rows are tuples of (column value or "Unknown", distinct volunteer count).
    """
    label = Case(None, ((column.is_null(), "Unknown"),), column)
    volunteerCount = fn.COUNT(fn.DISTINCT(EventParticipant.user_id)).alias('count')

    counts = (User.select(label, volunteerCount)
                  .join(EventParticipant, on=(User.username == EventParticipant.user_id))
                  .group_by(column))

    if not reorderClassLevel:
        return counts.order_by(column.asc(nulls = 'LAST')).tuples()

    # Rank known class levels 1..6, NULL as 7, and any other value as 8.
    classLevels = ("Freshman", "Sophomore", "Junior", "Senior", "Graduating", "Non-Degree")
    rankings = tuple((column == level, rank) for rank, level in enumerate(classLevels, start=1))
    rankings += ((column.is_null(), len(classLevels) + 1),)
    classRank = Case(None, rankings, len(classLevels) + 2)

    return counts.order_by(classRank).tuples()

96 

def repeatVolunteersPerProgram():
    """
    Build a query of people who participated in more than one event of the
    same program.

    Rows are tuples of ("First Last", program name, event count), ordered by
    program and then last name.
    """
    eventCount = fn.COUNT(EventParticipant.event_id)

    repeatPerProgramQuery = (EventParticipant.select(fn.CONCAT(User.firstName, " ", User.lastName).alias('fullName'),
                                                     Program.programName.alias("programName"),
                                                     eventCount.alias('event_count'))
                                             .join(Event, on=(EventParticipant.event_id == Event.id))
                                             .join(Program, on=(Event.program == Program.id))
                                             .join(User, on=(User.username == EventParticipant.user_id))
                                             # Group on username as well as the name columns so two
                                             # different users who share a first and last name are
                                             # counted separately instead of being merged into one row.
                                             .group_by(User.username, User.firstName, User.lastName, Event.program)
                                             .having(eventCount > 1)
                                             .order_by(Event.program, User.lastName))

    return repeatPerProgramQuery.tuples()

110 

def repeatVolunteers():
    """
    Build a query of people who have more than one EventParticipant record
    across all programs.

    Rows are tuples of ("First Last", participation count).
    """
    participationCount = fn.COUNT(EventParticipant.user_id)

    repeatAllProgramQuery = (EventParticipant.select(fn.CONCAT(User.firstName, " ", User.lastName),
                                                     participationCount.alias('count'))
                                             .join(User, on=(User.username == EventParticipant.user_id))
                                             # Group on username as well as the name columns so two
                                             # different users who share a first and last name are not
                                             # summed together and misreported as one repeat volunteer.
                                             .group_by(User.username, User.firstName, User.lastName)
                                             .having(participationCount > 1))

    return repeatAllProgramQuery.tuples()

119 

def getRetentionRate(academicYear):
    """
    Return per-program retention between the fall and spring terms of an
    academic year.

    academicYear: string of the form "<fall year>-<spring year>"; it is split
        on "-" into exactly two parts used to look up the "Fall <year>" and
        "Spring <year>" terms.

    Returns a list of (program name, percent retained) tuples, where the
    percentage is rounded to two decimals and rendered with a trailing "%".
    """
    fallYear, springYear = academicYear.split("-")
    fallParticipation = termParticipation(f"Fall {fallYear}")
    springParticipation = termParticipation(f"Spring {springYear}")

    ratesByProgram = calculateRetentionRate(fallParticipation, springParticipation)

    return [(program, f"{round(rate * 100, 2)}%") for program, rate in ratesByProgram.items()]

133 

def termParticipation(termDescription):
    """
    Collect the participants of every program that held events in the term
    whose description matches `termDescription`.

    Returns a plain dict mapping program name -> list of participant user ids,
    one entry per (event, participant) row. Because EventParticipant is
    LEFT OUTER joined, an event without participants contributes a None entry.
    """
    eventsInTerm = (Event.select(Event.program,
                                 EventParticipant.user_id.alias('participant'),
                                 Program.programName.alias("programName"))
                         .join(EventParticipant, JOIN.LEFT_OUTER, on=(Event.id == EventParticipant.event))
                         .join(Program, on=(Program.id == Event.program))
                         .join(Term, on=(Event.term_id == Term.id))
                         .where(Term.description == termDescription))

    participantsByProgram = {}
    for row in eventsInTerm.dicts():
        participantsByProgram.setdefault(row['programName'], []).append(row['participant'])

    return participantsByProgram

148 

def removeNullParticipants(participantList):
    """
    Return a new list containing only the truthy entries of `participantList`,
    in their original order (None and other falsy placeholders are dropped).
    """
    return [participant for participant in participantList if participant]

152 

153# function to calculate the retention rate for each program 

def calculateRetentionRate(fallDict, springDict):
    """
    Compute, for every program present in `fallDict`, the fraction of its fall
    participants who also appear among that program's spring participants.

    fallDict / springDict: mappings of program name -> list of participants;
        falsy entries (e.g. None) are ignored.

    Returns a dict of program name -> rate between 0.0 and 1.0. A program with
    no fall participants gets 0.0; programs that appear only in `springDict`
    are not included.
    """
    retentionDict = {}
    for program, fallEntries in fallDict.items():
        fallVolunteers = {person for person in fallEntries if person}
        springVolunteers = {person for person in springDict.get(program, []) if person}

        if fallVolunteers:
            returning = fallVolunteers & springVolunteers
            retentionDict[program] = len(returning) / len(fallVolunteers)
        else:
            # Nobody to retain: report zero rather than dividing by zero.
            retentionDict[program] = 0.0

    return retentionDict

168 

169# def halfRetentionRateRecurringEvents(): 

170 

171# programs = ProgramEvent.select(ProgramEvent.program_id).distinct() 

172 

173# retention_rates = {} 

174 

175# # Loop over the programs and get the corresponding event IDs 

176# for program in programs: 

177# # Define the query for each program 

178# query = (EventParticipant.select(EventParticipant.event_id.alias("event_id"), Event.name.alias("name")) 

179# .join(Event, on=(EventParticipant.event_id == Event.id)) 

180# .join(ProgramEvent, on=(EventParticipant.event_id == ProgramEvent.event_id)) 

181# .join(Program, on=(Program.id == ProgramEvent.program_id)) 

182# .where((ProgramEvent.program_id == program.program_id) & (Event.recurringId != None)) 

183# .distinct() 

184# .dicts()) 

185 

186# event_count = 0 

187# name_counts = defaultdict(int) 

188 

189# for result in query: 

190# event_count += 1 

191# participants = EventParticipant.select(EventParticipant.user_id).where(EventParticipant.event_id == result["event_id"]) 

192# for participant in participants: 

193# name = participant.user_id 

194# name_counts[name] += 1 

195 

196# half_count = event_count // 2 

197# qualified_names = [name for name, count in name_counts.items() if count >= half_count] 

198 

199# if len(name_counts) > 0: 

200# percentage = len(qualified_names) / len(name_counts) * 100 

201# else: 

202# percentage = 0 

203 

204# retention_rates[program.program.programName] = percentage 

205 

206# return retention_rates 

207 

208 

209# def fullRetentionRateRecurringEvents(): 

210 

211# programs = ProgramEvent.select(ProgramEvent.program_id).distinct() 

212 

213# full_retention = {} 

214 

215# # Loop over the programs and get the corresponding event IDs 

216# for program in programs: 

217# # Define the query for each program 

218# query = (EventParticipant.select(EventParticipant.event_id.alias("event_id"), Event.name.alias("name")) 

219# .join(Event, on=(EventParticipant.event_id == Event.id)) 

220# .join(ProgramEvent, on=(EventParticipant.event_id == ProgramEvent.event_id)) 

221# .join(Program, on=(Program.id == ProgramEvent.program_id)) 

222# .where((ProgramEvent.program_id == program.program_id) & (Event.recurringId != None)) 

223# .distinct() 

224# .dicts()) 

225 

226# event_count = 0 

227# name_counts = defaultdict(int) 

228 

229# for result in query: 

230# event_count += 1 

231# participants = EventParticipant.select(EventParticipant.user_id).where(EventParticipant.event_id == result["event_id"]) 

232# for participant in participants: 

233# name = participant.user_id 

234# name_counts[name] += 1 

235 

236# qualified_names = [name for name, count in name_counts.items() if count >= event_count] 

237 

238# if len(name_counts) > 0: 

239# percentage = len(qualified_names) / len(name_counts) * 100 

240# else: 

241# percentage = 0 

242 

243# full_retention[program.program.programName] = percentage 

244 

245# return full_retention 

246 

247# create a new Excel file 

248 

249# define function to save data to a sheet in the Excel file 

def makeDataXls(getData, columnTitles, sheetName, workbook):
    """
    Add a worksheet named `sheetName` to `workbook` and fill it with a table.

    Layout: cell (0, 0) holds the sheet name, row 1 holds the column titles in
    bold, and the data rows start at row 2. Each titled column is then sized to
    its longest rendered value (title included) plus 3.

    getData: iterable of row sequences (e.g. a query's tuples or a list of
        tuples). It is consumed exactly once, so one-shot iterables such as
        generators work too.
    columnTitles: sequence of header strings, one per column.
    sheetName: name of the worksheet to create.
    workbook: workbook object providing add_worksheet() and add_format().
    """
    worksheet = workbook.add_worksheet(sheetName)
    bold = workbook.add_format({'bold': True})

    worksheet.write_string(0, 0, sheetName)

    for columnIndex, title in enumerate(columnTitles):
        worksheet.write(1, columnIndex, title, bold)

    # Materialize once: the rows are needed both for writing the cells and for
    # measuring column widths, and a generator would be empty on a second pass.
    rows = list(getData)

    for rowIndex, rowData in enumerate(rows, start=2):
        for columnIndex, value in enumerate(rowData):
            worksheet.write(rowIndex, columnIndex, value)

    for columnIndex, title in enumerate(columnTitles):
        # Rows shorter than the header simply contribute nothing to this column.
        cellLengths = [len(str(rowData[columnIndex])) for rowData in rows if columnIndex < len(rowData)]
        widest = max([len(str(title))] + cellLengths)
        worksheet.set_column(columnIndex, columnIndex, widest + 3)

268 

def createSpreadsheet(academicYear):
    """
    Write the volunteer report workbook and return the path it was saved to.

    The file is created as volunteer_data.xlsx under
    app.config['files']['base_path'], with one worksheet per report. The last
    worksheet lists participation for the "Fall <first year>" term of
    `academicYear`, which is expected in "<fall year>-<spring year>" form.
    """
    filepath = app.config['files']['base_path'] + '/volunteer_data.xlsx'
    workbook = xlsxwriter.Workbook(filepath, {'in_memory': True})

    # Reports that cover all recorded participation.
    makeDataXls(volunteerHoursByProgram(), ["Program", "Hours"], "Total Hours By Program", workbook)
    makeDataXls(volunteerMajorAndClass(User.major), ["Major", "Count"], "Volunteers By Major", workbook)
    makeDataXls(volunteerMajorAndClass(User.classLevel, reorderClassLevel=True),
                ["Class Level", "Count"],
                "Volunteers By Class Level",
                workbook)
    makeDataXls(repeatVolunteersPerProgram(),
                ["Volunteer", "Program Name", "Event Count"],
                "Repeat Volunteers Per Program",
                workbook)
    makeDataXls(repeatVolunteers(),
                ["Volunteer", "Number of Events"],
                "Repeat Volunteers All Programs",
                workbook)

    # Reports scoped to the requested academic year, interleaved with the
    # remaining global totals in the established sheet order.
    makeDataXls(getRetentionRate(academicYear),
                ["Program", "Retention Rate"],
                "Retention Rate By Semester",
                workbook)
    makeDataXls(getUniqueVolunteers(academicYear),
                ["Username", "Full Name", "B-Number"],
                "Unique Volunteers",
                workbook)
    makeDataXls(totalVolunteerHours(), ["Total Volunteer Hours"], "Total Hours", workbook)
    makeDataXls(volunteerProgramHours(),
                ["Program Name", "Volunteer Username", "Volunteer Hours"],
                "Volunteer Hours By Program",
                workbook)
    makeDataXls(onlyCompletedAllVolunteer(academicYear),
                ["Username", "Full Name"],
                "Only All Volunteer Training",
                workbook)

    # Per-event participation for the fall term; the sheet is named after the term.
    fallTermName = f"Fall {academicYear.split('-')[0]}"
    fallTerm = Term.get_or_none(Term.description == fallTermName)
    makeDataXls(getVolunteerProgramEventByTerm(fallTerm),
                ["Full Name", "Username", "Program Name", "Event Name"],
                fallTermName,
                workbook)

    workbook.close()

    return filepath