|
1
|
|
|
from datetime import date, datetime |
|
2
|
|
|
import json |
|
3
|
|
|
import os |
|
4
|
|
|
import shutil |
|
5
|
|
|
from typing import BinaryIO |
|
6
|
|
|
|
|
7
|
|
|
from pypdf import PdfReader |
|
8
|
|
|
import requests |
|
9
|
|
|
from requests import HTTPError |
|
10
|
|
|
|
|
11
|
|
|
from src.kalauz.new_data_processors.common import DataProcessor |
|
12
|
|
|
|
|
13
|
|
|
|
|
14
|
|
|
def get_pdf_date(pdf_file: BinaryIO) -> date:
    """Return the date found in the text of a PDF's first page.

    The text of the first page is split on whitespace and the
    second-to-last token is parsed with the format ``%Y.%m.%d.``
    (note the trailing dot).

    Args:
        pdf_file: Binary file object of the PDF, handed to ``PdfReader``.

    Returns:
        The parsed date (without any time component).

    Raises:
        IndexError: If the extracted text has fewer than two tokens.
        ValueError: If the token does not match ``%Y.%m.%d.``.
    """
    page_text = PdfReader(pdf_file).pages[0].extract_text()
    tokens = page_text.split()
    # The date is expected right before the very last token of the page text.
    parsed = datetime.strptime(tokens[-2], "%Y.%m.%d.")
    return parsed.date()
|
20
|
|
|
|
|
21
|
|
|
|
|
22
|
|
|
class NewFilesRegistrar(DataProcessor):
    """Register newly received PDF files.

    Every ``.pdf`` file found in ``data/01_received/`` is renamed to
    ``<company>_<date>_ASR<extension>`` (the date is read from the PDF
    itself), converted to ``.xlsx`` through the ConvertAPI web service, and
    finally moved into ``data/02_converted/`` next to the converted workbook.
    """

    def run(self) -> None:
        """Process all received files and log once everything is registered."""
        self.process_received_files()
        self.logger.info("All new files registered!")

    def process_received_files(self) -> None:
        """Process every ``.pdf`` entry of the ``data/01_received/`` folder."""
        folder_received = os.path.abspath("data/01_received/")
        folder_converted = os.path.abspath("data/02_converted/")
        # Take a snapshot of the entries before touching anything: processing
        # renames and moves files inside this very folder, and what a live
        # directory iterator yields after such changes is unspecified.
        with os.scandir(folder_received) as folder:
            pdf_entries = [entry for entry in folder if entry.name.endswith(".pdf")]
        for entry in pdf_entries:
            self.process_received_file(entry, folder_converted, folder_received)

    def process_received_file(
        self, file: os.DirEntry, folder_converted: str, folder_received: str
    ) -> None:
        """Rename one received PDF, convert it to ``.xlsx`` and archive it.

        Args:
            file: Directory entry of the received PDF. The part of its name
                before the first underscore is used as the company name.
            folder_converted: Folder receiving the ``.xlsx`` and, afterwards,
                the renamed PDF.
            folder_received: Folder in which the PDF is renamed.

        Raises:
            requests.RequestException: If the conversion request or the
                download of its result fails.
            OSError: If renaming, writing or moving a file fails.
        """
        company_name = file.name.split("_")[0]
        with open(file, "rb") as pdf_file:
            file_date = get_pdf_date(pdf_file)
        extension = os.path.splitext(file)[1]

        base_name = f"{company_name}_{file_date}_ASR"
        new_file_name_pdf = f"{base_name}{extension}"
        new_file_name_xlsx = f"{base_name}.xlsx"
        new_file_path_pdf = os.path.join(folder_received, new_file_name_pdf)
        new_file_path_xlsx = os.path.join(folder_converted, new_file_name_xlsx)

        os.rename(src=file.path, dst=new_file_path_pdf)

        # Convert BEFORE opening the output file, so that a failed conversion
        # does not leave an empty .xlsx behind in the converted folder.
        xlsx_data = self.convert_pdf_to_xlsx(new_file_name_pdf)
        with open(new_file_path_xlsx, "wb") as xlsx_file:
            xlsx_file.write(xlsx_data)

        shutil.move(src=new_file_path_pdf, dst=folder_converted)

    def convert_pdf_to_xlsx(self, file_name: str) -> bytes:
        """Convert a received PDF to ``.xlsx`` with ConvertAPI.

        The file is uploaded from ``data/01_received/``; the JSON answer
        carries the URL of the stored result, which is then downloaded.

        Args:
            file_name: Name of the PDF inside ``data/01_received/``.

        Returns:
            The raw bytes of the converted ``.xlsx`` file.

        Raises:
            ValueError: If no ConvertAPI secret is configured.
            requests.RequestException: If the upload or the download fails
                (``HTTPError`` for error status codes). The failure is logged
                as critical and re-raised unchanged.
        """
        api_url = "https://eu-v2.convertapi.com/convert/pdf/to/xlsx"
        parameters = {
            "Secret": self.get_convertapi_secret(),
            "EnableOcr": "false",
            "StoreFile": "true",
            "Timeout": "90",
        }
        # Client-side limit in seconds. Kept above the "Timeout" value sent to
        # the service (presumably its conversion time limit - TODO confirm),
        # so a slow but successful conversion is not cut off.
        request_timeout = 120

        try:
            self.logger.info(f"Converting {file_name} to .xlsx started...")
            # The context manager closes the uploaded file even if the
            # request raises.
            with open(f"data/01_received/{file_name}", "rb") as pdf_file:
                response = requests.post(
                    url=api_url,
                    params=parameters,
                    files={"File": pdf_file},
                    timeout=request_timeout,
                )
            response.raise_for_status()
            self.logger.info("...finished!")

            json_response = json.loads(response.content)
            file_url = json_response["Files"][0]["Url"]

            download = requests.get(file_url, timeout=request_timeout)
            # Without this check an error page would be returned (and later
            # written to disk) as if it were the workbook.
            download.raise_for_status()
            return download.content
        except requests.RequestException:
            self.logger.critical("Failed to convert .pdf to .xlsx!")
            raise

    def get_convertapi_secret(self) -> str:
        """Return the ConvertAPI secret from the ``CONVERTAPI_SECRET`` variable.

        Raises:
            ValueError: If the environment variable is unset or empty. The
                error is logged as critical before being raised.
        """
        env_var_name = "CONVERTAPI_SECRET"
        secret = os.getenv(env_var_name)
        if not secret:
            error = ValueError(
                f"No password found in the `.env` file for {env_var_name}!"
            )
            self.logger.critical(error)
            raise error
        return secret
|
99
|
|
|
|