Coverage for app/logic/fileHandler.py: 81%

91 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-12-22 18:34 +0000

1import os 

2from flask import redirect, url_for 

3from app import app 

4from app.models.attachmentUpload import AttachmentUpload 

5from app.models.cceMinorProposal import CCEMinorProposal 

6from app.models.program import Program 

7import glob 

8 

class FileHandler:
    """Save, locate and delete uploaded attachments for one owning record.

    A handler is bound to at most one owner (course, event, program or
    proposal, checked in that order). The owner decides which configured
    sub-directory of ``app.config['files']['base_path']`` is used and which
    ``AttachmentUpload`` column the database rows are linked through.
    """

    def __init__(self, files=None, courseId=None, eventId=None, programId=None, proposalId=None):
        """Remember the uploads and resolve the storage directory.

        Args:
            files: A single upload object or a list of them. A non-list value
                (including ``None``) is wrapped in a one-element list.
            courseId: Owning course id; files go to ``<course path>/<courseId>``.
            eventId: Owning event id; files go under the shared event path.
            programId: Owning program id; files go to the shared program path.
            proposalId: Owning proposal id; files go to
                ``<proposal path>/<proposalId>``.
        """
        # Wrapping keeps saveFiles() able to iterate unconditionally.
        self.files = files if isinstance(files, list) else [files]
        self.path = app.config['files']['base_path']
        self.courseId = courseId
        self.eventId = eventId
        self.programId = programId
        self.proposalId = proposalId

        # First truthy id wins; with no id the base path is used unchanged.
        if courseId:
            self.path = os.path.join(self.path, app.config['files']['course_attachment_path'], str(courseId))
        elif eventId:
            self.path = os.path.join(self.path, app.config['files']['event_attachment_path'])
        elif programId:
            self.path = os.path.join(self.path, app.config['files']['program_attachment_path'])
        elif proposalId:
            self.path = os.path.join(self.path, app.config['files']['proposal_attachment_path'], str(proposalId))

    def makeDirectory(self):
        """Create the storage directory, including the per-event sub-directory.

        An already existing directory is not an error.

        Raises:
            OSError: If the directory cannot be created (the error is printed
                first). This includes the case where the target path exists
                but is not a directory.
        """
        # Event files live one level deeper, in a directory named after the event.
        extraDir = str(self.eventId) if self.eventId else ""
        try:
            os.makedirs(os.path.join(self.path, extraDir), exist_ok=True)
        except OSError as e:
            print(f'Fail to create directory: {e}')
            raise

    def getFileFullPath(self, newfilename=''):
        """Return ``newfilename`` joined onto this handler's storage directory.

        Works whether or not an owner id is set; without one the file is
        placed directly in the base path.
        """
        return os.path.join(self.path, newfilename)

    def saveFiles(self, parentEvent=None):
        """
        Saves attachments for different types and creates DB record for stored attachment

        Args:
            parentEvent: For event uploads, the event whose directory holds the
                physical file. The file is only written to disk when the parent
                is this handler's own event; other events just get a database
                row pointing at the parent's copy. When omitted, the handler's
                own event is treated as the parent.
        """
        for file in self.files:
            # __init__ stores [None] when no upload was supplied; nothing to save.
            if file is None:
                continue

            saveFileToFilesystem = None

            if self.eventId:
                parentId = parentEvent.id if parentEvent is not None else self.eventId
                attachmentName = f"{parentId}/{file.filename}"
                isFileInEvent = AttachmentUpload.select().where(
                    AttachmentUpload.event_id == self.eventId,
                    AttachmentUpload.fileName == attachmentName
                ).exists()
                if not isFileInEvent:
                    AttachmentUpload.create(event=self.eventId, fileName=attachmentName)
                    if parentId == self.eventId:
                        saveFileToFilesystem = attachmentName

            elif self.courseId or self.proposalId:
                recordId = self.courseId if self.courseId else self.proposalId
                fieldName = 'course' if self.courseId else 'proposal'

                fileExists = AttachmentUpload.select().where(
                    getattr(AttachmentUpload, fieldName) == recordId,
                    AttachmentUpload.fileName == file.filename
                ).exists()

                if not fileExists:
                    AttachmentUpload.create(**{fieldName: recordId, 'fileName': file.filename})
                    saveFileToFilesystem = file.filename

            elif self.programId:
                # A program keeps a single attachment: drop the old one first.
                deleteFileObject = AttachmentUpload.get_or_none(program=self.programId)
                if deleteFileObject:
                    self.deleteFile(deleteFileObject.id)

                # Store the new file under the program id, keeping the text
                # after the last '.' of the uploaded name as the extension.
                fileType = file.filename.split('.')[-1]
                fileName = f"{self.programId}.{fileType}"
                AttachmentUpload.create(program=self.programId, fileName=fileName)
                saveFileToFilesystem = fileName

            else:
                # No owner: store under the base path without a database row.
                saveFileToFilesystem = file.filename

            if saveFileToFilesystem:
                self.makeDirectory()
                file.save(self.getFileFullPath(newfilename=saveFileToFilesystem))

    def retrievePath(self, files):
        """Map each attachment's ``fileName`` to ``(path, attachment)``.

        Args:
            files: Iterable of objects exposing a ``fileName`` attribute.

        Returns:
            dict: ``{fileName: (pathString, fileObject)}``.
        """
        # NOTE(review): [3:] drops the first three characters of the joined
        # path -- presumably a leading "app" directory so the result can be
        # used as a URL; confirm against the configured base_path.
        return {
            file.fileName: ((self.path + "/" + file.fileName)[3:], file)
            for file in files
        }

    def deleteFile(self, fileId):
        """Delete an attachment row and, if no other row uses the name, its file.

        Args:
            fileId: Primary key of the ``AttachmentUpload`` row to delete.
        """
        file = AttachmentUpload.get_by_id(fileId)
        file.delete_instance()
        # Several rows can share one stored file; keep it while any remain.
        if not AttachmentUpload.select().where(AttachmentUpload.fileName == file.fileName).exists():
            path = os.path.join(self.path, file.fileName)
            os.remove(path)

    def changeDisplay(self, fileId, isDisplayed):
        """Set the display flag on one attachment and clear it on its siblings.

        Args:
            fileId: Primary key of the attachment being toggled.
            isDisplayed: New value of ``isDisplayed`` for that attachment.

        Returns:
            str: Always the empty string.
        """
        file = AttachmentUpload.get_by_id(fileId)

        # Uncheck all other checkboxes for the same event
        AttachmentUpload.update(isDisplayed=False).where(AttachmentUpload.event == file.event).execute()

        # Check the selected checkbox
        AttachmentUpload.update(isDisplayed=isDisplayed).where(AttachmentUpload.id == fileId).execute()
        return ""