1
|
|
|
#!/usr/bin/python |
2
|
|
|
# -*- coding: utf-8 -*- |
3
|
|
|
# get_image func courtesy of Slko |
4
|
|
|
from telegram.ext import BaseFilter |
5
|
|
|
from wand.image import Image |
6
|
|
|
import subprocess |
7
|
|
|
import requests |
8
|
|
|
import os.path |
9
|
|
|
import io |
10
|
|
|
|
11
|
|
|
|
12
|
|
|
def extract_url(entity, text): |
13
|
|
|
if entity["type"] == "text_link": |
14
|
|
|
return entity["url"] |
15
|
|
|
elif entity["type"] == "url": |
16
|
|
|
offset = entity["offset"] |
17
|
|
|
length = entity["length"] |
18
|
|
|
return text[offset:offset+length] |
19
|
|
|
else: |
20
|
|
|
return None |
21
|
|
|
|
22
|
|
|
|
23
|
|
|
def is_image(path): |
24
|
|
|
if path is None: |
25
|
|
|
return False |
26
|
|
|
ext = None |
27
|
|
|
image_extensions = (".jpg", ".jpeg", ".png", ".gif", ".svg", ".tif", ".bmp", ".mp4") |
28
|
|
|
if path is None: |
29
|
|
|
return False |
30
|
|
|
for i in image_extensions: |
31
|
|
|
if path.casefold().endswith(i): |
32
|
|
|
ext = i |
33
|
|
|
return ext |
34
|
|
|
if ext == None: |
35
|
|
|
ext = ".mp4" |
36
|
|
|
return ext |
37
|
|
|
return False |
38
|
|
|
|
39
|
|
|
def is_image_by_mime_type(mime_type): |
40
|
|
|
if mime_type is None: |
41
|
|
|
return False |
42
|
|
|
mime_types = { |
43
|
|
|
"image/png": ".png", |
44
|
|
|
"image/gif": ".gif", |
45
|
|
|
"image/jpeg": ".jpg", |
46
|
|
|
"video/mp4": ".mp4", |
47
|
|
|
} |
48
|
|
|
return mime_types.get(mime_type.casefold(), False) |
49
|
|
|
|
50
|
|
|
|
51
|
|
|
def get_image(update, context, dl_path, filename): |
52
|
|
|
output = os.path.join(dl_path, filename) |
53
|
|
|
reply = update.message.reply_to_message |
54
|
|
|
if reply is None: |
55
|
|
|
extension = ".jpg" |
56
|
|
|
context.bot.getFile(update.message.photo[-1].file_id).download(output + extension) |
57
|
|
|
return extension |
58
|
|
|
# Entities; url, text_link |
59
|
|
|
if reply.entities is not None: |
60
|
|
|
urls = (extract_url(x, reply.text) for x in reply.entities) |
|
|
|
|
61
|
|
|
images = [x for x in urls if is_image(x)] |
62
|
|
|
if len(images) > 0: |
63
|
|
|
extension = is_image(images[0]) |
64
|
|
|
r = requests.get(images[0]) # use only first image url |
65
|
|
|
with open(output+extension, "wb") as f: |
66
|
|
|
f.write(r.content) |
67
|
|
|
return extension |
68
|
|
|
# Document with file name |
69
|
|
|
if reply.document is not None and is_image(reply.document.file_name): |
70
|
|
|
extension = is_image(reply.document.file_name) |
71
|
|
|
context.bot.getFile(reply.document.file_id).download(output + extension) |
72
|
|
|
return extension |
73
|
|
|
# Document without file name |
74
|
|
|
if reply.document is not None and is_image_by_mime_type(reply.document.mime_type): |
75
|
|
|
extension = is_image_by_mime_type(reply.document.mime_type) |
76
|
|
|
context.bot.getFile(reply.document.file_id).download(output + extension) |
77
|
|
|
return extension |
78
|
|
|
# Sticker |
79
|
|
|
if reply.sticker is not None: |
80
|
|
|
extension = ".webp" |
81
|
|
|
context.bot.getFile(reply.sticker.file_id).download(output+extension) |
82
|
|
|
return extension |
83
|
|
|
# Photo in reply |
84
|
|
|
if reply.photo is not None: |
85
|
|
|
extension = ".jpg" |
86
|
|
|
context.bot.getFile(reply.photo[-1].file_id).download(output + extension) |
87
|
|
|
return extension |
88
|
|
|
return False |
89
|
|
|
|
90
|
|
|
|
91
|
|
|
def send_image(update, filepath, name, extension): |
92
|
|
|
photo_extensions = (".jpg", ".jpeg") |
93
|
|
|
doc_extensions = (".png", ".svg", ".tif", ".bmp", ".gif", ".mp4") |
94
|
|
|
sticker_extensions = (".webp") |
95
|
|
|
if extension in photo_extensions: |
96
|
|
|
with open(filepath + name + extension, "rb") as f: |
97
|
|
|
update.message.reply_photo(f) |
98
|
|
|
return True |
99
|
|
|
if extension in doc_extensions: |
100
|
|
|
with open(filepath + name + extension, "rb") as f: |
101
|
|
|
update.message.reply_document(f, timeout=90) |
102
|
|
|
return True |
103
|
|
|
if extension in sticker_extensions: |
104
|
|
|
with open(filepath + name + extension, "rb") as f: |
105
|
|
|
update.message.reply_sticker(f) |
106
|
|
|
return True |
107
|
|
|
|
108
|
|
|
|
109
|
|
|
def get_param(update, defaultvalue, min_value, max_value): |
110
|
|
|
if update.message.reply_to_message is not None: |
111
|
|
|
parts = update.message.text.split(" ", 1) |
112
|
|
|
elif update.message.caption is not None: |
113
|
|
|
parts = update.message.caption.split(" ", 1) |
114
|
|
|
elif update.message.text is not None: |
115
|
|
|
parts = update.message.text.split(" ", 2) |
116
|
|
|
else: |
117
|
|
|
return defaultvalue |
118
|
|
|
if len(parts) == 1: |
119
|
|
|
parameter = defaultvalue |
120
|
|
|
else: |
121
|
|
|
try: |
122
|
|
|
parameter = int(parts[1]) |
123
|
|
|
except: |
124
|
|
|
#update.message.reply_text("Paremeter needs to be a number!") |
125
|
|
|
return defaultvalue |
126
|
|
|
if parameter < min_value or parameter > max_value: |
127
|
|
|
errtext = "Baka, make it from " + str(min_value) + " to " + str(max_value) + "!" |
128
|
|
|
update.message.reply_text(errtext) |
129
|
|
|
return None |
130
|
|
|
return parameter |
131
|
|
|
|
132
|
|
|
def mp4_fix(path, filename): |
133
|
|
|
args = "ffmpeg -loglevel panic -i " + path + filename + ".mp4" + \ |
134
|
|
|
" -an -vf scale=trunc(iw/2)*2:trunc(ih/2)*2 \ |
135
|
|
|
-pix_fmt yuv420p -c:v libx264 -profile:v high -level:v 2.0 " \ |
136
|
|
|
+ path + filename + "fixed.mp4 -y" |
137
|
|
|
subprocess.run(args, shell=True) |
138
|
|
|
os.remove(path+filename+".mp4") |
139
|
|
|
fixed_file_name = filename + "fixed" |
140
|
|
|
return fixed_file_name |
141
|
|
|
|
142
|
|
|
|
143
|
|
|
# custom filters for message handler |
144
|
|
|
# photo with caption |
145
|
|
|
class Caption_Filter(BaseFilter): |
146
|
|
|
def __init__(self, command): |
147
|
|
|
self.data=command |
148
|
|
|
def filter(self, message): |
149
|
|
|
if message.photo and message.caption and message.caption.startswith(self.data): |
150
|
|
|
return True |
151
|
|
|
|