Coverage for app/logic/celtsLabor.py: 74%

57 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2025-07-22 20:03 +0000

1import requests 

2import json 

3from peewee import DoesNotExist 

4from collections import defaultdict 

5from app import app 

6from app.models.user import User 

7from app.models.celtsLabor import CeltsLabor 

8from app.models.term import Term 

9 

def getCeltsLaborFromLsf():
    """
    Make a call to the LSF endpoint which returns all CELTS student labor records.

    The returned data is a dictionary with B# key and value that is a list of
    dicts that contain the labor information. An empty dictionary is returned
    when the response body cannot be parsed as JSON.

    Raises:
        KeyError: if "lsf_url" is missing from the application config.
        requests.exceptions.RequestException: if the HTTP request fails or
            times out.
    """
    # Only the config lookup is guarded here, so a KeyError raised anywhere
    # else is never misreported as a missing config entry.
    try:
        lsfUrl = f"{app.config['lsf_url'].strip('/')}/api/org/2084"
    except KeyError:
        print('Make sure you have "lsf_url" set in your local-override config file.')
        raise

    # Without a timeout, requests waits indefinitely on an unresponsive server.
    response = requests.get(lsfUrl, timeout=30)

    try:
        return response.json()
    except ValueError:
        # ValueError is the common base of the JSON decode errors that
        # response.json() can raise (json and simplejson variants alike).
        print(f'Response from {lsfUrl} was not JSON.\n' + response.text)
        return {}

27 

def updateCeltsLaborFromLsf():
    """
    Fetch the CELTS labor records from LSF, keep only the summer and academic
    year records, and hand the result to refreshCeltsLaborRecords.

    The LSF data is expected to look like:
        {'B#': [{'positionTitle': 'Position Title',
                 'termCode': 'Term Code',
                 'laborStart': Labor Start Date,
                 'laborEnd': Labor End Date,
                 'jobType': 'Job Type',
                 'wls': 'WLS Lvl',
                 'termName': 'Term Name'}],
         'B#': [...]}

    B#s that do not match a User are skipped.
    """
    # All term codes for summer end with 13 and all term codes for an academic
    # year end in 00, and those are the only terms we want to record.
    wantedTermSuffixes = ("00", "13")

    laborByUser = {}
    for bnumber, positions in getCeltsLaborFromLsf().items():
        try:
            user = User.get(bnumber=bnumber)
        except DoesNotExist:
            continue

        wantedPositions = [position for position in positions
                           if str(position["termCode"]).endswith(wantedTermSuffixes)]
        laborByUser[user] = collapsePositions(wantedPositions)

    refreshCeltsLaborRecords(laborByUser)

60 

def collapsePositions(positionList):
    '''
    Group the term names of a list of labor records by position title.

    Input: [{'positionTitle': 'Position Title',
             'termCode': 'Term Code',
             'laborStart': Labor Start Date,
             'laborEnd': Labor End Date,
             'jobType': 'Job Type',
             'wls': 'WLS Lvl',
             'termName': 'Term Name'}]

    Returned: {'Position Title': ['Term Name']}

    Every occurrence of "AY " is removed from the term names. The returned
    mapping is a defaultdict(list).
    '''
    termsByTitle = defaultdict(list)

    for record in positionList:
        title = record['positionTitle']
        termLabel = record['termName'].replace("AY ", "")
        termsByTitle[title].append(termLabel)

    return termsByTitle

82 

def refreshCeltsLaborRecords(laborDict):
    """
    Replace the stored CeltsLabor records of the given students.

    Input: Dictionary containing CELTS labor information
        {user1: {'positionTitle': ['termName'],
                 'positionTitle': ['termName']},
         user2: {'positionTitle': ['termName', 'termName']}
        }

    Each term name is resolved to a row of the Term table: names that start
    with a letter are matched against Term.description, every other name is
    matched against Term.academicYear restricted to descriptions matching
    "Fall%". Term names with no matching Term are skipped.

    Existing CeltsLabor rows are deleted only for the students that end up
    with at least one resolved record, and the resolved records are then
    inserted. When nothing resolves, the table is left untouched.
    """
    # Term names repeat across students and positions, so each distinct name
    # is looked up once. A value of None records that no Term matched.
    termCache = {}

    celtsLabor = []
    usersToRefresh = []
    for user, positions in laborDict.items():
        userRecords = []
        for positionTitle, termNames in positions.items():
            for termName in termNames:
                if termName not in termCache:
                    termTableMatch = Term.select()
                    # Slicing instead of indexing keeps an empty term name
                    # from raising IndexError; it then matches nothing.
                    if termName[:1].isalpha():
                        termTableMatch = termTableMatch.where(Term.description == termName)
                    else:
                        termTableMatch = termTableMatch.where(Term.academicYear == termName, Term.description % "Fall%")
                    try:
                        termCache[termName] = termTableMatch.get()
                    except DoesNotExist:
                        termCache[termName] = None

                laborTerm = termCache[termName]
                if laborTerm is None:
                    continue

                userRecords.append({"user": user,
                                    "positionTitle": positionTitle,
                                    "term": laborTerm,
                                    "isAcademicYear": not laborTerm.isSummer})

        if userRecords:
            # One entry per student keeps the IN (...) list free of duplicates.
            usersToRefresh.append(user)
            celtsLabor.extend(userRecords)

    # Nothing resolved: skip the DELETE with an empty IN () list and the
    # INSERT with no rows.
    if not celtsLabor:
        return

    CeltsLabor.delete().where(CeltsLabor.user << usersToRefresh).execute()
    CeltsLabor.insert_many(celtsLabor).on_conflict_replace().execute()

116 

def getCeltsLaborHistory(volunteer):
    """
    Return the CELTS labor positions stored for a volunteer.

    Returned: {'Position Title': term label}
    The term label is the term description for summer terms and the academic
    year for every other term.

    Each position title appears once in the result: when several stored rows
    share a position title, the last row returned by the query wins.
    """
    historyQuery = (CeltsLabor.select(CeltsLabor.positionTitle,
                                      Term.description,
                                      Term.academicYear,
                                      Term.isSummer)
                              .join(Term, on=(CeltsLabor.term == Term.id))
                              .where(CeltsLabor.user == volunteer))

    def termLabel(term):
        # Summer terms are shown by description, all others by academic year.
        if term.isSummer:
            return term.description
        return term.academicYear

    return {record.positionTitle: termLabel(record.term) for record in historyQuery}