Passed
Push — master ( 1eef4e...280449 )
by Anas
02:26
created

modules.utils.get_image_new()   C

Complexity

Conditions 9

Size

Total Lines 36
Code Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 9
eloc 32
nop 2
dl 0
loc 36
rs 6.6666
c 0
b 0
f 0
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3
# get_image func courtesy of Slko
4
from wand.image import Image
5
import requests
6
import os.path
7
import io
8
9
10
def extract_url(entity, text):
    """Return the URL carried by a message entity, or None.

    ``entity`` is read by key: "type", and depending on the type "url" or
    "offset"/"length".

    * "text_link" entities yield their "url" value.
    * "url" entities yield the part of ``text`` the entity covers.
    * Any other type yields None, as does a "url" entity when ``text`` is
      None.

    Telegram expresses entity offsets and lengths in UTF-16 code units, not
    in Python code points, so the slice is taken on the UTF-16 encoding of
    ``text``. For text without characters outside the Basic Multilingual
    Plane this is identical to ``text[offset:offset + length]``.
    """
    kind = entity["type"]
    if kind == "text_link":
        return entity["url"]
    if kind != "url" or text is None:
        return None

    start = entity["offset"]
    stop = start + entity["length"]
    # Every UTF-16 code unit is exactly two bytes in the "utf-16-le" codec;
    # "surrogatepass" keeps odd input from raising instead of being sliced.
    encoded = text.encode("utf-16-le", "surrogatepass")
    return encoded[2 * start:2 * stop].decode("utf-16-le", "surrogatepass")
19
20
21
def is_image(path):
    """Return the file extension to use for ``path``.

    Despite the name, the result is not a plain boolean:

    * ``False`` when ``path`` is None;
    * the first entry of the known-extension list that ``path`` ends with
      (compared case-insensitively), e.g. ".png";
    * ".mp4" for any other path, so every non-None path gives a truthy
      result.
    """
    known_extensions = (".jpg", ".jpeg", ".png", ".gif", ".svg", ".tif", ".bmp", ".mp4")
    if path is None:
        return False

    lowered = path.casefold()
    for candidate in known_extensions:
        if lowered.endswith(candidate):
            return candidate
    # Unrecognised names are deliberately not rejected; they are treated as
    # ".mp4" (presumably for extension-less video/GIF files -- confirm with
    # callers before tightening this).
    return ".mp4"
34
35
36
def get_image(bot, update, dl_path, filename):
    """Save the image a message refers to and return its file extension.

    The file is written to ``os.path.join(dl_path, filename) + extension``.
    Sources are tried in this order:

    1. no reply: the photo attached to the message itself (".jpg");
    2. reply entities: the first URL accepted by ``is_image``, fetched
       with ``requests``;
    3. reply document whose file name is accepted by ``is_image``;
    4. reply sticker (".webp");
    5. reply photo (".jpg").

    Returns the extension string that was appended to the saved file, or
    ``False`` when none of the sources applies.
    """
    output = os.path.join(dl_path, filename)
    message = update.message
    reply = message.reply_to_message

    if reply is None:
        # Nothing to index when the photo list is missing or empty.
        if not message.photo:
            return False
        extension = ".jpg"
        bot.getFile(message.photo[-1].file_id).download(output + extension)
        return extension

    # Entities; url, text_link
    if reply.entities:
        urls = (extract_url(entity, reply.text) for entity in reply.entities)
        images = [url for url in urls if is_image(url)]
        if images:
            first = images[0]  # use only first image url
            extension = is_image(first)
            # A timeout keeps an unresponsive host from blocking forever.
            response = requests.get(first, timeout=60)
            with open(output + extension, "wb") as f:
                f.write(response.content)
            return extension

    # Document
    if reply.document is not None:
        extension = is_image(reply.document.file_name)
        if extension:
            bot.getFile(reply.document.file_id).download(output + extension)
            return extension

    # Sticker
    if reply.sticker is not None:
        extension = ".webp"
        bot.getFile(reply.sticker.file_id).download(output + extension)
        return extension

    # Photo in reply; truthiness covers both None and an empty list.
    if reply.photo:
        extension = ".jpg"
        bot.getFile(reply.photo[-1].file_id).download(output + extension)
        return extension

    return False
69
70
71
def send_image(update, filepath, name, extension):
    """Reply to the message with the file ``filepath + name + extension``.

    The reply method is picked from ``extension``: JPEG files are sent as
    photos, the other listed image/video types as documents and ".webp"
    as a sticker.

    Returns True after sending; returns None (and opens nothing) when the
    extension matches none of the known groups.
    """
    as_photo = (".jpg", ".jpeg")
    as_document = (".png", ".svg", ".tif", ".bmp", ".gif", ".mp4")
    as_sticker = ".webp"

    if extension.endswith(as_photo):
        method_name = "reply_photo"
    elif extension.endswith(as_document):
        method_name = "reply_document"
    elif extension.endswith(as_sticker):
        method_name = "reply_sticker"
    else:
        return None

    with open(filepath + name + extension, "rb") as handle:
        getattr(update.message, method_name)(handle)
    return True
89
90
91
def get_image_new(bot, update):
    """Fetch the image a message refers to, entirely in memory.

    Sources are tried in this order:

    1. no reply: the photo attached to the message itself;
    2. reply entities: the first URL accepted by ``is_image``, fetched
       with ``requests``;
    3. reply document whose file name is accepted by ``is_image``;
    4. reply sticker;
    5. reply photo.

    Returns a ``(file, mimetype)`` tuple, where ``file`` is the raw bytes
    and ``mimetype`` is what wand's ``Image`` reports for them, or
    ``False`` when none of the sources applies.
    """
    message = update.message
    reply = message.reply_to_message

    if reply is None:
        # Nothing to index when the photo list is missing or empty.
        if not message.photo:
            return False
        return _download_with_mimetype(bot, message.photo[-1].file_id)

    # Entities; url, text_link
    if reply.entities:
        urls = (extract_url(entity, reply.text) for entity in reply.entities)
        images = [url for url in urls if is_image(url)]
        if images:
            # use only first image url; the timeout keeps an unresponsive
            # host from blocking forever.
            response = requests.get(images[0], timeout=60)
            return _with_mimetype(response.content)

    # Document
    if reply.document is not None and is_image(reply.document.file_name):
        return _download_with_mimetype(bot, reply.document.file_id)

    # Sticker
    if reply.sticker is not None:
        return _download_with_mimetype(bot, reply.sticker.file_id)

    # Photo in reply; truthiness covers both None and an empty list.
    if reply.photo:
        return _download_with_mimetype(bot, reply.photo[-1].file_id)

    return False


def _with_mimetype(data):
    """Return ``(data, mimetype)`` with the mimetype wand reports for data."""
    # The context manager releases the wand image once it has been read.
    with Image(blob=data) as img:
        return data, img.mimetype


def _download_with_mimetype(bot, file_id):
    """Download ``file_id`` through ``bot`` and pair the bytes with a mimetype."""
    data = bytes(bot.getFile(file_id).download_as_bytearray())
    return _with_mimetype(data)
127
128
129
def send_image_new(update, file, mimetype):
    """Reply to the message with ``file``, choosing the method by mimetype.

    JPEG data is sent as a photo, the other listed image/video types as a
    document and WebP as a sticker, each with ``timeout=60``. Mimetypes are
    matched by prefix.

    Returns True after sending; returns None (and sends nothing) when the
    mimetype matches none of the known groups.
    """
    photo_mimetypes = ("image/jpeg",)
    # "image/gif" is listed so GIFs are handled like in send_image, which
    # sends ".gif" files as documents.
    doc_mimetypes = ("image/png", "image/gif", "image/svg", "image/tif", "image/bmp", "video/mp4")
    sticker_mimetype = "image/x-webp"

    if mimetype.startswith(photo_mimetypes):
        update.message.reply_photo(file, timeout=60)
    elif mimetype.startswith(doc_mimetypes):
        update.message.reply_document(file, timeout=60)
    elif mimetype.startswith(sticker_mimetype):
        update.message.reply_sticker(file, timeout=60)
    else:
        return None
    return True
144
145
146
def get_param(update, defaultvalue, min_value, max_value):
    """Parse the integer argument that follows the command word.

    The text to parse is chosen as follows: the message text when the
    message is a reply, otherwise the caption when there is one, otherwise
    the message text. The second space-separated token is taken as the
    parameter.

    Returns:
        * ``defaultvalue`` when there is no text to parse, no second token,
          or the token is not an integer (``defaultvalue`` itself is not
          range-checked);
        * the parsed integer when ``min_value <= value <= max_value``;
        * ``None`` when the integer is out of range, after replying to the
          message with an error text.
    """
    message = update.message
    if message.reply_to_message is not None:
        source, maxsplit = message.text, 1
    elif message.caption is not None:
        source, maxsplit = message.caption, 1
    elif message.text is not None:
        source, maxsplit = message.text, 2
    else:
        return defaultvalue

    # A reply can itself carry no text; there is nothing to parse then.
    if source is None:
        return defaultvalue

    parts = source.split(" ", maxsplit)
    if len(parts) == 1:
        return defaultvalue

    try:
        parameter = int(parts[1])
    except ValueError:
        # Non-numeric arguments silently fall back to the default.
        return defaultvalue

    if parameter < min_value or parameter > max_value:
        errtext = "Baka, make it from " + str(min_value) + " to " + str(max_value) + "!"
        message.reply_text(errtext)
        return None
    return parameter
168
169
170
# custom filters for message handler
171
# photo with caption
172
def caption_filter(text):
    """Build a predicate for photo messages whose caption starts with text.

    The returned callable takes a message and returns False when the
    message has no photo or no caption; otherwise it returns whether the
    caption starts with ``text``.
    """
    def matches(msg):
        if not msg.photo:
            return False
        if not msg.caption:
            return False
        return msg.caption.startswith(text)

    return matches
174
175
176
# text of choice
177
def text_filter(text):
    """Build a predicate for messages whose text contains ``text``.

    The returned callable takes a message and returns True when the
    message has text and ``text`` occurs in it. Messages without text
    (``msg.text`` is None) are rejected with False instead of raising.
    """
    def matches(msg):
        return msg.text is not None and text in msg.text

    return matches
179