Coverage for app/logic/fileHandler.py: 76%
85 statements
« prev ^ index » next coverage.py v7.2.7, created at 2025-01-29 15:39 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2025-01-29 15:39 +0000
1import os
2from flask import redirect, url_for
3from app import app
4from app.models.attachmentUpload import AttachmentUpload
5from app.models.program import Program
6import glob
class FileHandler:
    """Save, locate and delete uploaded attachments and their AttachmentUpload rows.

    A handler is scoped to at most one owner: a course, an event or a program
    (checked in that order). The owner decides which sub-directory of
    ``app.config['files']['base_path']`` is used as ``self.path``.
    """

    def __init__(self, files=None, courseId=None, eventId=None, programId=None):
        """Remember the uploads and work out the storage directory.

        Args:
            files: A single upload object or a list of them. A non-list value
                (including ``None``) is wrapped in a one-element list.
            courseId: Course owning the files; takes priority over the others.
            eventId: Event owning the files; used when no courseId is given.
            programId: Program owning the files; used when neither of the
                other ids is given.
        """
        self.files = files if isinstance(files, list) else [files]
        self.courseId = courseId
        self.eventId = eventId
        self.programId = programId

        self.path = app.config['files']['base_path']
        if courseId:
            # Course files live in a per-course directory.
            self.path = os.path.join(self.path, app.config['files']['course_attachment_path'], str(courseId))
        elif eventId:
            # The event id is NOT part of self.path; makeDirectory() and the
            # attachment name built in saveFiles() add that level.
            self.path = os.path.join(self.path, app.config['files']['event_attachment_path'])
        elif programId:
            self.path = os.path.join(self.path, app.config['files']['program_attachment_path'])

    def makeDirectory(self):
        """Create the storage directory (plus an event-id sub-directory for events).

        An already existing directory is not an error.

        Raises:
            OSError: If the directory cannot be created; the failure is printed
                before being re-raised.
        """
        extraDir = str(self.eventId) if self.eventId else ""
        try:
            os.makedirs(os.path.join(self.path, extraDir), exist_ok=True)
        except OSError as e:
            print(f'Fail to create directory: {e}')
            raise

    def getFileFullPath(self, newfilename=''):
        """Return ``newfilename`` joined onto this handler's storage directory.

        Works whether or not a course/event/program id was supplied; without
        an id the file is placed directly under the base path.
        """
        return os.path.join(self.path, newfilename)

    def saveFiles(self, saveOriginalFile=None):
        """Record every upload in the database and write new ones to disk.

        Args:
            saveOriginalFile: Only used for events. Its ``id`` prefixes the
                stored attachment name, and the file is written to disk only
                when that id equals this handler's ``eventId``.

        NOTE(review): any AttributeError raised while processing (for example
        when an entry of ``self.files`` is ``None``, or ``saveOriginalFile`` is
        ``None`` for an event) silently aborts the remaining work. This
        best-effort behaviour is kept because callers may rely on it.
        """
        try:
            for file in self.files:
                saveFileToFilesystem = None

                if self.eventId:
                    attachmentName = str(saveOriginalFile.id) + "/" + file.filename
                    isFileInEvent = AttachmentUpload.select().where(
                        AttachmentUpload.event_id == self.eventId,
                        AttachmentUpload.fileName == attachmentName,
                    ).exists()
                    if not isFileInEvent:
                        AttachmentUpload.create(event=self.eventId, fileName=attachmentName)
                        # Other events only get a database row pointing at the
                        # file stored under saveOriginalFile's id.
                        if saveOriginalFile and saveOriginalFile.id == self.eventId:
                            saveFileToFilesystem = attachmentName

                elif self.courseId:
                    isFileInCourse = AttachmentUpload.select().where(
                        AttachmentUpload.course == self.courseId,
                        AttachmentUpload.fileName == file.filename,
                    ).exists()
                    if not isFileInCourse:
                        AttachmentUpload.create(course=self.courseId, fileName=file.filename)
                        saveFileToFilesystem = file.filename

                elif self.programId:
                    # Remove the existing file.
                    # NOTE(review): this happens before file.filename is read,
                    # so a None upload deletes the old attachment and then the
                    # AttributeError below is swallowed - confirm this is intended.
                    deleteFileObject = AttachmentUpload.get_or_none(program=self.programId)
                    if deleteFileObject:
                        self.deleteFile(deleteFileObject.id)

                    # Add the new file, stored as "<programId>.<extension>".
                    fileType = file.filename.split('.')[-1]
                    fileName = f"{self.programId}.{fileType}"
                    AttachmentUpload.create(program=self.programId, fileName=fileName)
                    saveFileToFilesystem = fileName

                else:
                    # No owner: nothing is recorded, the file is only written to disk.
                    saveFileToFilesystem = file.filename

                if saveFileToFilesystem:
                    self.makeDirectory()
                    file.save(self.getFileFullPath(newfilename=saveFileToFilesystem))

        except AttributeError:
            pass

    def retrievePath(self, files):
        """Map each attachment's ``fileName`` to a ``(path, attachment)`` tuple.

        The path is ``self.path + "/" + fileName`` with its first three
        characters removed (presumably a leading "app" package prefix - TODO
        confirm against the configured base path).
        """
        return {
            file.fileName: ((self.path + "/" + file.fileName)[3:], file)
            for file in files
        }

    def deleteFile(self, fileId):
        """Delete an AttachmentUpload row and, if now unreferenced, its file.

        The file on disk is removed only when no remaining row has the same
        ``fileName``.
        """
        file = AttachmentUpload.get_by_id(fileId)
        file.delete_instance()
        stillReferenced = AttachmentUpload.select().where(AttachmentUpload.fileName == file.fileName).exists()
        if not stillReferenced:
            os.remove(os.path.join(self.path, file.fileName))

    def changeDisplay(self, fileId, isDisplayed):
        """Set the ``isDisplayed`` flag on one attachment, clearing it on its siblings.

        Returns:
            An empty string.
        """
        file = AttachmentUpload.get_by_id(fileId)

        # Uncheck all other checkboxes for the same event
        AttachmentUpload.update(isDisplayed=False).where(AttachmentUpload.event == file.event).execute()

        # Check the selected checkbox
        AttachmentUpload.update(isDisplayed=isDisplayed).where(AttachmentUpload.id == fileId).execute()
        return ""