1
|
|
|
#!/usr/bin/env python3 |
2
|
|
|
# -*- coding: utf-8 -*- |
3
|
|
|
|
4
|
|
|
""" |
5
|
|
|
Module to download and to detach/strip/remove attachments |
6
|
|
|
from e-mails on IMAP servers. |
7
|
|
|
""" |
8
|
|
|
|
9
|
|
|
from __future__ import print_function |
10
|
|
|
|
11
|
|
|
import email |
12
|
|
|
import email.mime.text |
13
|
|
|
import email.utils |
14
|
|
|
import hashlib |
15
|
|
|
import logging |
16
|
|
|
import os.path |
17
|
|
|
import re |
18
|
|
|
import shutil |
19
|
|
|
import tempfile |
20
|
|
|
import time |
21
|
|
|
import unicodedata |
22
|
|
|
import src.emlx2eml |
23
|
|
|
|
24
|
|
|
# pylint: disable=R0801 |
25
|
|
|
__author__ = "Alexander Willner" |
26
|
|
|
__copyright__ = "Copyright 2020, Alexander Willner" |
27
|
|
|
__credits__ = [ |
28
|
|
|
"github.com/guido4000", |
29
|
|
|
"github.com/halteproblem", |
30
|
|
|
"github.com/jamesridgway", |
31
|
|
|
] |
32
|
|
|
__license__ = "MIT" |
33
|
|
|
__version__ = "1.0.4" |
34
|
|
|
__maintainer__ = "Alexander Willner" |
35
|
|
|
__email__ = "[email protected]" |
36
|
|
|
__status__ = "Development" |
37
|
|
|
|
38
|
|
|
|
39
|
|
|
class MailboxCleanerMessage:
    """
    Class to represent an e-mail.

    Bundles the helpers that store attachments of a parsed e-mail in
    ``args.target`` and replace them inside the message by a short
    text placeholder.

    The ``args`` object is read for the attributes ``min_size``,
    ``skip_download``, ``detach``, ``target``, ``upload`` and
    ``folder``.
    """

    # Text that replaces the body of a detached attachment. It is filled
    # via %-formatting with the keys newfile, filename, size and type.
    _PLACEHOLDER = """
===========================================================
This message contained an attachment that was stripped out.
The attachment was stored using the file name: "%(newfile)s".
The original file name was: "%(filename)s".
The original size was: %(size)d KB.
The original type was: %(type)s.
Tool: https://mailboxcleanup.netcee.de
===========================================================
"""

    def __init__(self, args):
        """Store the settings object used by all other methods."""
        self.args = args

    def download_and_detach_attachments(self, msg):
        """Download attachments and remove them from the mail.

        Walks over every part of ``msg``, stores each detachable part
        on disk and, if ``args.detach`` is set, replaces it in place by
        a placeholder.

        Returns True if at least one part of ``msg`` was replaced.
        """

        modified = False

        # The date does not depend on the part, so determine it once.
        date = self._get_timestamp(msg)

        # Iterate over each part of the email
        for part in msg.walk():
            if self.is_non_detachable_part(part):
                continue

            # Only download in relevant mode
            target = self.download_attachment(part, date)
            if target is None:
                continue

            # Only detach in relevant mode
            if not self.args.detach:
                logging.debug(" Detaching\t: skipped (disabled)")
                continue

            self.detach_attachment(part, target)
            modified = True

        return modified

    @staticmethod
    def _get_timestamp(msg):
        """Return the Date header of ``msg`` as seconds since the epoch.

        Falls back to the current time when the header is missing or
        cannot be converted, so that one malformed message does not
        abort the whole run.
        """

        raw_date = msg.get("date")
        parsed = None
        if raw_date is not None:
            # str() also covers header objects that are not plain strings.
            parsed = email.utils.parsedate(str(raw_date))

        if parsed is not None:
            try:
                return time.mktime(parsed)
            except (OverflowError, ValueError) as error:
                logging.debug(" Date\t: not convertible (%s)", error)

        logging.debug(" Date\t: missing or invalid (using current time)")
        return time.time()

    def is_non_detachable_part(self, part):
        """Only process certain types and sizes of attachments.

        A part is non-detachable if it is a multipart container, has no
        Content-Disposition header, is not larger than ``args.min_size``
        (in KB) or cannot be serialized at all.
        """

        # Serializing a part may raise a LookupError, e.g.
        #   File ".../lib/python3.9/email/message.py", line 315,
        #     in set_payload
        #   LookupError: unknown encoding: windows-1251
        # Such a part is left untouched instead of aborting the run.
        try:
            msg_size = len(str(part)) / 1024
        except LookupError as error:
            logging.warning(" Part\t: skipped (%s)", error)
            return True

        maintype = part.get_content_maintype()
        logging.debug(
            " Part\t: %d KB / %d KB (type: %s)",
            msg_size,
            self.args.min_size,
            maintype,
        )

        non_detachable = (
            maintype == "multipart"
            or part.get("Content-Disposition") is None
            or msg_size <= self.args.min_size
        )
        logging.debug(" Non-Det.\t: %s", non_detachable)

        return non_detachable

    def download_attachment(self, part, date) -> str:
        """Download the attachment from a part of an email.

        ``date`` (seconds since the epoch) becomes the access and
        modification time of a newly written file.

        Returns the path of the stored file, an empty string when
        ``args.skip_download`` is set, or None when the attachment has
        no file name or no decodable payload.
        """

        if self.args.skip_download:
            logging.info(" Downl.\t: skipped (disabled)")
            return ""

        file_attached = self.convert_filename(part.get_filename())

        if file_attached == "unknown":
            logging.warning(
                "Warning\t: Unknown attachment (skipping this attachment)"
            )
            return None

        # Also creates missing parent directories and tolerates an
        # already existing target directory.
        os.makedirs(self.args.target, exist_ok=True)

        with tempfile.NamedTemporaryFile() as file_temp:
            logging.info(
                ' Downl.\t: "%s" (%s)',
                file_attached,
                part.get_content_maintype(),
            )
            logging.debug(' Downl.\t: To "%s"', file_temp.name)
            payload = part.get_payload(decode=True)
            if payload is None:
                return None
            file_temp.write(payload)
            # Make sure everything is on disk before the file is copied.
            file_temp.flush()
            return self._copy_file(file_temp.name, file_attached, date)

    def _copy_file(self, source, target_name, date, iterator=0) -> str:
        """Copy file, check for duplicates via hash value.

        If a file called ``target_name`` already exists in
        ``args.target`` with the same content, nothing is copied. If
        its content differs, the suffixes ``-1``, ``-2``, ... are tried
        (starting at ``iterator``) until a free name or an identical
        file is found.

        Returns the path of the file that holds the content of
        ``source``.
        """

        target_base, target_extension = os.path.splitext(target_name)
        first_attempt = iterator == 0
        source_hash = None  # computed lazily, at most once

        while True:
            suffix = "-" + str(iterator) if iterator > 0 else ""
            target = os.path.join(
                self.args.target, target_base + suffix + target_extension
            )
            if first_attempt and iterator == 0:
                logging.debug(
                    ' Moving\t: From "%s" to "%s".', source, target)

            if not os.path.isfile(target):
                shutil.copy2(source, target)
                os.utime(target, (date, date))
                return target

            if source_hash is None:
                source_hash = MailboxCleanerMessage.get_hash(source)
            if source_hash == MailboxCleanerMessage.get_hash(target):
                logging.debug(" Moving\t: Already exists (same hash)")
                return target

            if first_attempt and iterator == 0:
                logging.debug(
                    " Conflict\t: Resolving same file / other hash.."
                )
            iterator += 1

    def process_directory(self, handler, folder=None, cache=None):
        """Upload messages from a local directory.

        ``folder`` (default: ``args.upload``) may be a directory, which
        is processed recursively, or a single ``.eml`` / ``.emlx`` file.
        Messages whose UID is contained in ``cache`` are skipped. For
        every other message the attachments are processed and
        ``handler(msg, args.folder)`` is called afterwards.
        """

        source = self.args.upload if folder is None else folder
        if os.path.isfile(source):
            filenames = [source]
        else:
            filenames = [
                os.path.join(source, name) for name in os.listdir(source)
            ]

        for i, filename in enumerate(filenames, start=1):
            # Recursive walker
            if os.path.isdir(filename):
                self.process_directory(handler, filename, cache)
                continue

            # Only take eml files into account
            is_emlx = filename.lower().endswith(".emlx")
            if not is_emlx and not filename.lower().endswith(".eml"):
                continue

            logging.warning("Files\t\t: %d / %d", i, len(filenames))

            # Specific handling of emlx files
            if is_emlx:
                msg = src.emlx2eml.parse_emlx(filename)
            else:
                with open(
                    filename, encoding="utf8", errors="surrogateescape"
                ) as filepointer:
                    msg = email.message_from_file(filepointer)

            # Logging
            msg_subject = self.get_subject(msg)
            msg_uid = self.get_uid(msg)
            logging.warning(
                " File\t: %s (%s: %s)", filename, msg_uid, msg_subject
            )
            if cache is not None and msg_uid in cache:
                logging.warning(" Cache\t: OK")
                continue

            logging.warning(" Cache\t: MISS")

            try:
                # Remove attachments
                self.download_and_detach_attachments(msg)

                # Post process message (e.g. upload or save it)
                handler(msg, self.args.folder)
            except (KeyError, UnicodeEncodeError) as error:
                logging.debug(" Error\t: %s (in %s)", error, filename)

    @staticmethod
    def detach_attachment(msg, target):
        """Replace large attachment with dummy text.

        ``msg`` is the message part to be replaced (modified in place)
        and ``target`` the path under which its content was stored.
        """

        # Get message details
        msg_content = msg.get_content_type()
        msg_filename = MailboxCleanerMessage.convert_filename(
            msg.get_filename())
        msg_size = len(str(msg)) / 1024
        msg_type = msg.get_content_disposition()

        logging.debug(" Detaching\t: %s (saved as %s)",
                      msg_filename, target)

        # Remove some old headers
        del msg["Content-Transfer-Encoding"]
        del msg["Content-Disposition"]
        del msg["Content-Description"]
        # get_params() returns None if there is no Content-Type header;
        # the first entry is the content type itself and must stay.
        for key, _value in (msg.get_params() or [])[1:]:
            msg.del_param(key)

        # Make sure different clients visualize the removed content properly
        msg.set_type("text/plain")
        msg.set_payload("")
        msg.set_charset("utf-8")
        if msg_type == "attachment":
            msg.add_header("Content-Disposition", "inline")
        else:
            msg.add_header(
                "Content-Disposition",
                "attachment",
                filename="removed-%s.txt" % msg_filename,
            )
            msg.add_header("Content-Description",
                           "removed-%s.txt" % msg_filename)

        # Replace content
        msg_details = {
            "newfile": os.path.basename(target),
            "type": msg_content,
            "filename": msg_filename,
            "size": msg_size,
        }
        msg_placeholder = MailboxCleanerMessage._PLACEHOLDER % msg_details
        # A temporary MIMEText object is only used to obtain the
        # placeholder in its transfer-encoded form.
        msg_placeholder = email.mime.text.MIMEText(
            msg_placeholder, "text", "utf-8")
        msg.set_payload(msg_placeholder.get_payload())

    @staticmethod
    def get_uid(message) -> str:
        """Get UID of message.

        The UID is the address part of the Message-ID header (i.e.
        without the angle brackets) or an empty string if it is missing.
        """

        uid = MailboxCleanerMessage.get_header(message, "message-id")
        return email.utils.parseaddr(uid)[1]

    @staticmethod
    def get_header(message, header: str) -> str:
        """Get a header field.

        Returns the decoded value of ``header`` or an empty string if
        the header does not exist.
        """

        raw_value = message[header] if header in message else ""

        # NOTE: only the first decoded chunk of the header is used.
        item, encoding = email.header.decode_header(raw_value)[0]
        if not hasattr(item, "decode"):
            # Already a string, nothing to decode.
            return item

        encoding = "utf-8" if encoding is None else encoding
        try:
            return item.decode(encoding, errors="replace")
        except LookupError as error:
            logging.debug(
                " Error\t: decoding (%s) with (%s): %s",
                item, encoding, error
            )
            return item.decode("ascii", "replace")

    @staticmethod
    def get_subject(message) -> str:
        """Get shortened message subject for visualization.

        The subject is cut after 75 characters (marked by "..."), line
        breaks are removed and tabs are replaced by spaces.
        """

        subject = MailboxCleanerMessage.get_header(message, "subject")
        subject = subject[:75] + (subject[75:] and "...")
        subject = subject.replace("\r\n", "")
        subject = subject.replace("\t", " ")

        return subject

    @staticmethod
    def get_hash(filename: str) -> str:
        """Get hash from filename to detect duplicates.

        Returns the hexadecimal SHA-256 digest of the file content.
        """

        hash_value = hashlib.sha256()
        with open(filename, "rb") as file:
            # Read in small blocks to keep the memory usage constant.
            for byte_block in iter(lambda: file.read(4096), b""):
                hash_value.update(byte_block)
        return hash_value.hexdigest()

    @staticmethod
    def slugify_filename(value):
        """Make sure attachments contain only valid characters.

        Everything except dots, word characters, whitespace and hyphens
        is replaced by an underscore after NFKC normalization.
        """

        value = unicodedata.normalize("NFKC", str(value))
        return re.sub(r"[^.\w\s-]", "_", value)

    @staticmethod
    def convert_filename(file_struct) -> str:
        """Decode the name of some attachments.

        Returns "unknown" if ``file_struct`` is None, otherwise the
        decoded and slugified file name. Bytes that cannot be decoded
        are replaced instead of raising an exception.
        """

        if file_struct is None:
            return MailboxCleanerMessage.slugify_filename("unknown")

        chunks = []
        for content, charset in email.header.decode_header(file_struct):
            if isinstance(content, str):
                chunks.append(content)
                continue
            try:
                chunks.append(
                    content.decode(charset or "utf8", errors="replace"))
            except LookupError as error:
                # Unknown charset announced by the sender.
                logging.debug(
                    " Error\t: decoding (%s) with (%s): %s",
                    content, charset, error
                )
                chunks.append(content.decode("utf8", errors="replace"))

        filename = "".join(chunks).replace("\r", " ").replace("\n", " ")

        return MailboxCleanerMessage.slugify_filename(filename)
353
|
|
|
|