Coverage for app/logic/fileHandler.py: 69%
95 statements
« prev ^ index » next coverage.py v7.2.7, created at 2025-04-10 19:40 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2025-04-10 19:40 +0000
1import os
2from flask import redirect, url_for
3from app import app
4from app.models.attachmentUpload import AttachmentUpload
5from app.models.program import Program
6import glob
8class FileHandler:
9 def __init__(self, files=None, courseId=None, eventId=None, programId=None, proposalId=None):
10 self.files = files
11 if not isinstance(self.files, list):
12 self.files = [self.files]
13 self.path = app.config['files']['base_path']
14 self.courseId = courseId
15 self.eventId = eventId
16 self.programId = programId
17 self.proposalId = proposalId
18 if courseId:
19 self.path = os.path.join(self.path, app.config['files']['course_attachment_path'], str(courseId))
20 elif eventId:
21 self.path = os.path.join(self.path, app.config['files']['event_attachment_path'])
22 elif programId:
23 self.path = os.path.join(self.path, app.config['files']['program_attachment_path'])
24 elif proposalId:
25 self.path = os.path.join(self.path, app.config['files']['proposal_attachment_path'])
27 def makeDirectory(self):
28 try:
29 extraDir = str(self.eventId) if self.eventId else ""
30 os.makedirs(os.path.join(self.path, extraDir))
31 except OSError as e:
32 if e.errno != 17:
33 print(f'Fail to create directory: {e}')
34 raise e
36 def getFileFullPath(self, newfilename=''):
37 try:
38 if self.eventId or self.courseId or self.programId or self.proposalId:
39 filePath = (os.path.join(self.path, newfilename))
40 except AttributeError:
41 pass
42 except FileExistsError:
43 pass
44 return filePath
46 def saveFiles(self, saveOriginalFile=None):
47 try:
48 for file in self.files:
49 saveFileToFilesystem = None
51 if self.eventId:
52 attachmentName = str(saveOriginalFile.id) + "/" + file.filename
53 isFileInEvent = AttachmentUpload.select().where(AttachmentUpload.event_id == self.eventId,
54 AttachmentUpload.fileName == attachmentName).exists()
55 if not isFileInEvent:
56 AttachmentUpload.create(event=self.eventId, fileName=attachmentName)
57 if saveOriginalFile and saveOriginalFile.id == self.eventId:
58 saveFileToFilesystem = attachmentName
59 elif self.courseId:
60 isFileInCourse = AttachmentUpload.select().where(AttachmentUpload.course == self.courseId, AttachmentUpload.fileName == file.filename).exists()
61 if not isFileInCourse:
62 AttachmentUpload.create(course=self.courseId, fileName=file.filename)
63 saveFileToFilesystem = file.filename
64 elif self.programId:
66 # remove the existing file
67 deleteFileObject = AttachmentUpload.get_or_none(program=self.programId)
68 if deleteFileObject:
69 self.deleteFile(deleteFileObject.id)
71 # add the new file
72 fileType = file.filename.split('.')[-1]
73 fileName = f"{self.programId}.{fileType}"
74 AttachmentUpload.create(program=self.programId, fileName=fileName)
75 currentProgramID = fileName
76 saveFileToFilesystem = currentProgramID
79 elif self.proposalId:
80 fileType = file.filename.split('.')[-1]
81 fileName = f"{self.proposalId}.{fileType}"
82 isFileInProposal = AttachmentUpload.select().where(AttachmentUpload.proposal == self.proposalId,
83 AttachmentUpload.fileName == fileName).exists()
84 if not isFileInProposal:
85 # add the new file
86 AttachmentUpload.create(proposal=self.proposalId, fileName=fileName)
87 saveFileToFilesystem = fileName
89 else:
90 saveFileToFilesystem = file.filename
92 if saveFileToFilesystem:
93 self.makeDirectory()
94 file.save(self.getFileFullPath(newfilename=saveFileToFilesystem))
96 except AttributeError as e:
97 print(e)
99 def retrievePath(self, files):
100 pathDict = {}
101 for file in files:
102 pathDict[file.fileName] = ((self.path + "/" + file.fileName)[3:], file)
103 return pathDict
105 def deleteFile(self, fileId):
106 file = AttachmentUpload.get_by_id(fileId)
107 file.delete_instance()
108 if not AttachmentUpload.select().where(AttachmentUpload.fileName == file.fileName).exists():
109 path = os.path.join(self.path, file.fileName)
110 os.remove(path)
112 def changeDisplay(self, fileId, isDisplayed):
113 file = AttachmentUpload.get_by_id(fileId)
115 # Uncheck all other checkboxes for the same event
116 AttachmentUpload.update(isDisplayed=False).where(AttachmentUpload.event == file.event).execute()
118 # Check the selected checkbox
119 AttachmentUpload.update(isDisplayed=isDisplayed).where(AttachmentUpload.id == fileId).execute()
120 return ""