Coverage for app/logic/fileHandler.py: 81%

96 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2025-07-22 20:03 +0000

1import os 

2from flask import redirect, url_for 

3from app import app 

4from app.models.attachmentUpload import AttachmentUpload 

5from app.models.cceMinorProposal import CCEMinorProposal 

6from app.models.program import Program 

7import glob 

8 

9class FileHandler: 

10 def __init__(self, files=None, courseId=None, eventId=None, programId=None, proposalId=None): 

11 self.files = files 

12 if not isinstance(self.files, list): 

13 self.files = [self.files] 

14 self.path = app.config['files']['base_path'] 

15 self.courseId = courseId 

16 self.eventId = eventId 

17 self.programId = programId 

18 self.proposalId = proposalId 

19 

20 if courseId: 

21 self.path = os.path.join(self.path, app.config['files']['course_attachment_path'], str(courseId)) 

22 elif eventId: 

23 self.path = os.path.join(self.path, app.config['files']['event_attachment_path']) 

24 elif programId: 

25 self.path = os.path.join(self.path, app.config['files']['program_attachment_path']) 

26 elif proposalId: 

27 self.path = os.path.join(self.path, app.config['files']['proposal_attachment_path']) 

28 

29 def makeDirectory(self): 

30 try: 

31 extraDir = str(self.eventId) if self.eventId else "" 

32 os.makedirs(os.path.join(self.path, extraDir)) 

33 except OSError as e: 

34 if e.errno != 17: 

35 print(f'Fail to create directory: {e}') 

36 raise e 

37 

38 def getFileFullPath(self, newfilename=''): 

39 try: 

40 if self.eventId or self.courseId or self.programId or self.proposalId: 

41 filePath = (os.path.join(self.path, newfilename)) 

42 except AttributeError: 

43 pass 

44 except FileExistsError: 

45 pass 

46 

47 return filePath 

48 

49 def saveFiles(self, saveOriginalFile=None): 

50 try: 

51 for file in self.files: 

52 saveFileToFilesystem = None 

53 

54 if self.eventId: 

55 attachmentName = str(saveOriginalFile.id) + "/" + file.filename 

56 isFileInEvent = AttachmentUpload.select().where(AttachmentUpload.event_id == self.eventId, 

57 AttachmentUpload.fileName == attachmentName).exists() 

58 if not isFileInEvent: 

59 AttachmentUpload.create(event=self.eventId, fileName=attachmentName) 

60 if saveOriginalFile and saveOriginalFile.id == self.eventId: 

61 saveFileToFilesystem = attachmentName 

62 elif self.courseId: 

63 isFileInCourse = AttachmentUpload.select().where(AttachmentUpload.course == self.courseId, AttachmentUpload.fileName == file.filename).exists() 

64 if not isFileInCourse: 

65 AttachmentUpload.create(course=self.courseId, fileName=file.filename) 

66 saveFileToFilesystem = file.filename 

67 elif self.programId: 

68 

69 # remove the existing file 

70 deleteFileObject = AttachmentUpload.get_or_none(program=self.programId) 

71 if deleteFileObject: 

72 self.deleteFile(deleteFileObject.id) 

73 

74 # add the new file 

75 fileType = file.filename.split('.')[-1] 

76 fileName = f"{self.programId}.{fileType}" 

77 AttachmentUpload.create(program=self.programId, fileName=fileName) 

78 currentProgramID = fileName 

79 saveFileToFilesystem = currentProgramID 

80 

81 elif self.proposalId: 

82 fileType = file.filename.split('.')[-1] 

83 fileName = f"{self.proposalId}.{fileType}" 

84 isFileInProposal = AttachmentUpload.select().where(AttachmentUpload.proposal == self.proposalId, 

85 AttachmentUpload.fileName == fileName).exists() 

86 if not isFileInProposal: 

87 # add the new file 

88 AttachmentUpload.create(proposal=self.proposalId, fileName=fileName) 

89 saveFileToFilesystem = fileName 

90 

91 else: 

92 saveFileToFilesystem = file.filename 

93 

94 if saveFileToFilesystem: 

95 self.makeDirectory() 

96 file.save(self.getFileFullPath(newfilename=saveFileToFilesystem)) 

97 

98 except AttributeError as e: 

99 print(e) 

100 

101 def retrievePath(self, files): 

102 pathDict = {} 

103 for file in files: 

104 pathDict[file.fileName] = ((self.path + "/" + file.fileName)[3:], file) 

105 return pathDict 

106 

107 def deleteFile(self, fileId): 

108 file = AttachmentUpload.get_by_id(fileId) 

109 file.delete_instance() 

110 if not AttachmentUpload.select().where(AttachmentUpload.fileName == file.fileName).exists(): 

111 path = os.path.join(self.path, file.fileName) 

112 os.remove(path) 

113 

114 def changeDisplay(self, fileId, isDisplayed): 

115 file = AttachmentUpload.get_by_id(fileId) 

116 

117 # Uncheck all other checkboxes for the same event 

118 AttachmentUpload.update(isDisplayed=False).where(AttachmentUpload.event == file.event).execute() 

119 

120 # Check the selected checkbox 

121 AttachmentUpload.update(isDisplayed=isDisplayed).where(AttachmentUpload.id == fileId).execute() 

122 return ""