Coverage for app/logic/fileHandler.py: 81%
91 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-12-22 18:34 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-12-22 18:34 +0000
1import os
2from flask import redirect, url_for
3from app import app
4from app.models.attachmentUpload import AttachmentUpload
5from app.models.cceMinorProposal import CCEMinorProposal
6from app.models.program import Program
7import glob
class FileHandler:
    """Save, locate and delete uploaded attachments for one owning record.

    An instance is bound to at most one owner (course, event, program or
    proposal, checked in that order) and derives its storage directory from
    ``app.config['files']``. Database bookkeeping goes through the
    ``AttachmentUpload`` model.
    """

    def __init__(self, files=None, courseId=None, eventId=None, programId=None, proposalId=None):
        """Normalise ``files`` to a list and compute the storage directory.

        Args:
            files: A single uploaded file object, a list of them, or None.
            courseId: ID of the owning course, if any.
            eventId: ID of the owning event, if any.
            programId: ID of the owning program, if any.
            proposalId: ID of the owning proposal, if any.
        """
        # No upload means nothing to iterate; wrapping None in a list would
        # make saveFiles() fail on ``None.filename``.
        if files is None:
            files = []
        elif not isinstance(files, list):
            files = [files]
        self.files = files

        self.courseId = courseId
        self.eventId = eventId
        self.programId = programId
        self.proposalId = proposalId

        fileConfig = app.config['files']
        self.path = fileConfig['base_path']

        # Course and proposal directories include the record ID. Event and
        # program directories do not: the event ID is appended later by
        # makeDirectory() and is also embedded in the stored file name, and
        # program files are named after the program ID.
        if courseId:
            self.path = os.path.join(self.path, fileConfig['course_attachment_path'], str(courseId))
        elif eventId:
            self.path = os.path.join(self.path, fileConfig['event_attachment_path'])
        elif programId:
            self.path = os.path.join(self.path, fileConfig['program_attachment_path'])
        elif proposalId:
            self.path = os.path.join(self.path, fileConfig['proposal_attachment_path'], str(proposalId))

    def makeDirectory(self):
        """Create the storage directory (plus an event sub-directory when bound to an event).

        An already existing directory is not an error.

        Raises:
            OSError: If the directory cannot be created.
        """
        targetDir = self.path
        if self.eventId:
            targetDir = os.path.join(targetDir, str(self.eventId))

        try:
            # exist_ok replaces the former hard-coded ``errno != 17`` check.
            os.makedirs(targetDir, exist_ok=True)
        except OSError as e:
            print(f'Fail to create directory: {e}')
            raise

    def getFileFullPath(self, newfilename=''):
        """Return ``newfilename`` joined onto this handler's storage directory.

        Args:
            newfilename: File name (may contain a sub-directory) relative to
                the storage directory.

        Returns:
            The joined path. When no owner ID is set this is relative to the
            configured base path.
        """
        # Previously the path was only computed when an owner ID was set,
        # which left the return value unbound for owner-less handlers.
        return os.path.join(self.path, newfilename)

    def saveFiles(self, parentEvent=None):
        """
        Saves attachments for different types and creates DB record for stored attachment

        Args:
            parentEvent: For event attachments, the event whose directory
                holds the physical file. The file is written to disk only
                when this handler's event is that parent; other events just
                get a DB record pointing at the parent's copy. When omitted,
                the handler's own event is treated as the parent.
        """
        # NOTE(review): file.filename is used as provided by the upload
        # object; confirm callers sanitise it before it reaches this method.
        for file in self.files:
            saveFileToFilesystem = None

            if self.eventId:
                # Fall back to our own event so a missing parentEvent no
                # longer fails on ``None.id``.
                parentId = self.eventId if parentEvent is None else parentEvent.id
                attachmentName = str(parentId) + "/" + file.filename
                isFileInEvent = AttachmentUpload.select().where(
                    AttachmentUpload.event_id == self.eventId,
                    AttachmentUpload.fileName == attachmentName
                ).exists()
                if not isFileInEvent:
                    AttachmentUpload.create(event=self.eventId, fileName=attachmentName)
                if parentId == self.eventId:
                    saveFileToFilesystem = attachmentName

            elif self.courseId or self.proposalId:
                recordId = self.courseId if self.courseId else self.proposalId
                fieldName = 'course' if self.courseId else 'proposal'

                fileExists = AttachmentUpload.select().where(
                    getattr(AttachmentUpload, fieldName) == recordId,
                    AttachmentUpload.fileName == file.filename
                ).exists()

                # A file already recorded for this owner is neither
                # re-recorded nor re-written.
                if not fileExists:
                    AttachmentUpload.create(**{fieldName: recordId, 'fileName': file.filename})
                    saveFileToFilesystem = file.filename

            elif self.programId:
                # A program keeps a single attachment: remove the existing one first.
                deleteFileObject = AttachmentUpload.get_or_none(program=self.programId)
                if deleteFileObject:
                    self.deleteFile(deleteFileObject.id)

                # Store the new file under the program ID, keeping the
                # text after the last dot as its extension.
                fileType = file.filename.split('.')[-1]
                fileName = f"{self.programId}.{fileType}"
                AttachmentUpload.create(program=self.programId, fileName=fileName)
                saveFileToFilesystem = fileName

            else:
                # No owner: write to disk only, no DB record.
                saveFileToFilesystem = file.filename

            if saveFileToFilesystem:
                self.makeDirectory()
                file.save(self.getFileFullPath(newfilename=saveFileToFilesystem))

    def retrievePath(self, files):
        """Map each attachment's file name to ``(path, attachment)``.

        Args:
            files: Iterable of objects exposing a ``fileName`` attribute.

        Returns:
            Dict keyed by ``fileName``; each value is a tuple of the storage
            path with its first three characters removed, and the original
            object.
        """
        # NOTE(review): the [3:] slice presumably drops a leading "app" from
        # the configured base path so the result works as a URL path;
        # confirm against app.config['files']['base_path'].
        pathDict = {}
        for file in files:
            pathDict[file.fileName] = ((self.path + "/" + file.fileName)[3:], file)
        return pathDict

    def deleteFile(self, fileId):
        """Delete an attachment record, and its file once no record references it.

        Args:
            fileId: Primary key of the ``AttachmentUpload`` row to delete.
        """
        file = AttachmentUpload.get_by_id(fileId)
        file.delete_instance()
        # Several records can share one stored file name, so the file is
        # removed only after the last record naming it is gone.
        # NOTE(review): os.remove raises if the file is already missing, after
        # the record has been deleted; confirm callers expect that.
        if not AttachmentUpload.select().where(AttachmentUpload.fileName == file.fileName).exists():
            path = os.path.join(self.path, file.fileName)
            os.remove(path)

    def changeDisplay(self, fileId, isDisplayed):
        """Set the displayed flag on one attachment, clearing it on its event's other attachments.

        Args:
            fileId: Primary key of the ``AttachmentUpload`` row to update.
            isDisplayed: New value for that row's ``isDisplayed`` field.

        Returns:
            An empty string.
        """
        file = AttachmentUpload.get_by_id(fileId)

        # Uncheck all other checkboxes for the same event
        # NOTE(review): if the row has no event, this presumably matches every
        # row without an event; confirm it is only called for event files.
        AttachmentUpload.update(isDisplayed=False).where(AttachmentUpload.event == file.event).execute()

        # Check the selected checkbox
        AttachmentUpload.update(isDisplayed=isDisplayed).where(AttachmentUpload.id == fileId).execute()
        return ""