Coverage for app/logic/fileHandler.py: 76%

85 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2025-01-29 15:39 +0000

1import os 

2from flask import redirect, url_for 

3from app import app 

4from app.models.attachmentUpload import AttachmentUpload 

5from app.models.program import Program 

6import glob 

7 

class FileHandler:
    """Save, locate and delete uploaded attachments for a course, event or program.

    The handler keeps the uploaded file objects in ``self.files`` and the
    directory they belong to in ``self.path``.  Database bookkeeping is done
    through the ``AttachmentUpload`` model.
    """

    def __init__(self, files=None, courseId=None, eventId=None, programId=None):
        """Remember the uploads and work out the directory they live in.

        Args:
            files: A single upload object or a list of them.  A non-list value
                (including ``None``) is wrapped in a one-element list, so
                ``files=None`` yields ``[None]``.
            courseId: When truthy, files live under
                ``<base_path>/<course_attachment_path>/<courseId>``.
            eventId: When truthy (and no courseId), files live under
                ``<base_path>/<event_attachment_path>``.
            programId: When truthy (and no courseId/eventId), files live under
                ``<base_path>/<program_attachment_path>``.

        All path pieces are read from ``app.config['files']``.  With no id at
        all, ``self.path`` is just the configured base path.
        """
        self.files = files
        if not isinstance(self.files, list):
            self.files = [self.files]
        self.path = app.config['files']['base_path']
        self.courseId = courseId
        self.eventId = eventId
        self.programId = programId
        if courseId:
            self.path = os.path.join(self.path, app.config['files']['course_attachment_path'], str(courseId))
        elif eventId:
            self.path = os.path.join(self.path, app.config['files']['event_attachment_path'])
        elif programId:
            self.path = os.path.join(self.path, app.config['files']['program_attachment_path'])

    def makeDirectory(self):
        """Create the storage directory, tolerating one that already exists.

        For events an extra ``<eventId>`` sub-directory is created below
        ``self.path``; otherwise ``self.path`` itself is created.

        Raises:
            OSError: For any failure other than "already exists".  The error
                is printed before being re-raised.
        """
        extraDir = str(self.eventId) if self.eventId else ""
        try:
            os.makedirs(os.path.join(self.path, extraDir))
        except FileExistsError:
            # Already there: nothing to do.
            pass
        except OSError as e:
            print(f'Fail to create directory: {e}')
            raise

    def getFileFullPath(self, newfilename=''):
        """Return ``newfilename`` joined onto this handler's directory.

        The path is returned whether or not a course, event or program id is
        set; without an id it is relative to the base path.
        """
        return os.path.join(self.path, newfilename)

    def saveFiles(self, saveOriginalFile=None):
        """Record each upload in the database and write it to disk.

        Args:
            saveOriginalFile: Only used for events.  Its ``id`` prefixes the
                stored attachment name, and the file is written to disk only
                when that id equals this handler's ``eventId``.  If it is
                ``None`` for an event handler, the upload is skipped.

        Behaviour per owner type:
            * event:   creates an ``AttachmentUpload`` row named
              ``"<saveOriginalFile.id>/<filename>"`` unless one already exists
              for this event.
            * course:  creates a row for the filename unless one already
              exists for this course; the file is always written.
            * program: deletes the existing attachment (if any), then stores
              the upload as ``"<programId>.<extension>"``.
            * no id:   writes the file under the base path without touching
              the database.

        ``None`` entries in ``self.files`` (what ``__init__`` produces when no
        files were supplied) are skipped.
        """
        for file in self.files:
            if file is None:
                # No upload was supplied for this slot.
                continue

            saveFileToFilesystem = None

            if self.eventId:
                if saveOriginalFile is None:
                    # Without the original event there is no name to store
                    # the attachment under, so nothing is recorded or saved.
                    continue
                attachmentName = str(saveOriginalFile.id) + "/" + file.filename
                isFileInEvent = AttachmentUpload.select().where(
                    AttachmentUpload.event_id == self.eventId,
                    AttachmentUpload.fileName == attachmentName).exists()
                if not isFileInEvent:
                    AttachmentUpload.create(event=self.eventId, fileName=attachmentName)
                    # Only the handler for the original event writes the file.
                    if saveOriginalFile and saveOriginalFile.id == self.eventId:
                        saveFileToFilesystem = attachmentName
            elif self.courseId:
                isFileInCourse = AttachmentUpload.select().where(
                    AttachmentUpload.course == self.courseId,
                    AttachmentUpload.fileName == file.filename).exists()
                if not isFileInCourse:
                    AttachmentUpload.create(course=self.courseId, fileName=file.filename)
                saveFileToFilesystem = file.filename
            elif self.programId:
                # remove the existing file
                deleteFileObject = AttachmentUpload.get_or_none(program=self.programId)
                if deleteFileObject:
                    self.deleteFile(deleteFileObject.id)

                # add the new file, named after the program with the
                # upload's extension
                fileType = file.filename.split('.')[-1]
                fileName = f"{self.programId}.{fileType}"
                AttachmentUpload.create(program=self.programId, fileName=fileName)
                saveFileToFilesystem = fileName
            else:
                saveFileToFilesystem = file.filename

            if saveFileToFilesystem:
                self.makeDirectory()
                file.save(self.getFileFullPath(newfilename=saveFileToFilesystem))

    def retrievePath(self, files):
        """Map each attachment's ``fileName`` to a ``(path, attachment)`` tuple.

        The path is ``self.path + "/" + fileName`` with its first three
        characters removed.
        NOTE(review): the slice presumably strips a leading "app" from the
        configured base path -- confirm against the files configuration.
        """
        return {
            file.fileName: ((self.path + "/" + file.fileName)[3:], file)
            for file in files
        }

    def deleteFile(self, fileId):
        """Delete an attachment row and, if now unreferenced, its file on disk.

        The ``AttachmentUpload`` row with id ``fileId`` is deleted first.  The
        file at ``self.path/<fileName>`` is removed only when no remaining row
        uses the same ``fileName``.
        """
        file = AttachmentUpload.get_by_id(fileId)
        file.delete_instance()
        stillReferenced = AttachmentUpload.select().where(AttachmentUpload.fileName == file.fileName).exists()
        if not stillReferenced:
            os.remove(os.path.join(self.path, file.fileName))

    def changeDisplay(self, fileId, isDisplayed):
        """Set the ``isDisplayed`` flag of one attachment, clearing its siblings.

        Every attachment of the same event is first set to not displayed,
        then the attachment with id ``fileId`` is set to ``isDisplayed``.

        Returns:
            An empty string.
        """
        file = AttachmentUpload.get_by_id(fileId)

        # Uncheck all other checkboxes for the same event
        AttachmentUpload.update(isDisplayed=False).where(AttachmentUpload.event == file.event).execute()

        # Check the selected checkbox
        AttachmentUpload.update(isDisplayed=isDisplayed).where(AttachmentUpload.id == fileId).execute()
        return ""