Passed
Push — master ( 39bb6e...cb377a )
by Anas
02:06
created

modules.utils   A

Complexity

Total Complexity 38

Size/Duplication

Total Lines 133
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 106
dl 0
loc 133
rs 9.36
c 0
b 0
f 0
wmc 38

6 Functions

Rating   Name   Duplication   Size   Complexity  
B get_param() 0 22 8
A is_image() 0 13 5
C get_image() 0 33 9
A extract_url() 0 9 3
B send_image() 0 16 7
A mp4_fix() 0 9 1

2 Methods

Rating   Name   Duplication   Size   Complexity  
A Caption_Filter.filter() 0 3 4
A Caption_Filter.__init__() 0 2 1
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3
# get_image func courtesy of Slko
4
from telegram.ext import BaseFilter
5
from wand.image import Image
6
import subprocess
7
import requests
8
import os.path
9
import io
10
11
12
def extract_url(entity, text):
13
    if entity["type"] == "text_link":
14
        return entity["url"]
15
    elif entity["type"] == "url":
16
        offset = entity["offset"]
17
        length = entity["length"]
18
        return text[offset:offset+length]
19
    else:
20
        return None
21
22
23
def is_image(path):
24
    ext = None
25
    image_extensions = (".jpg", ".jpeg", ".png", ".gif", ".svg", ".tif", ".bmp", ".mp4")
26
    if path is None:
27
        return False
28
    for i in image_extensions:
29
        if path.casefold().endswith(i):
30
            ext = i
31
            return ext
32
    if ext == None:
33
        ext = ".mp4"
34
        return ext
35
    return False
36
37
38
def get_image(update, context, dl_path, filename):
39
    output = os.path.join(dl_path, filename)
40
    reply = update.message.reply_to_message
41
    if reply is None:
42
        extension = ".jpg"
43
        context.bot.getFile(update.message.photo[-1].file_id).download(output + extension)
44
        return extension
45
    # Entities; url, text_link
46
    if reply.entities is not None:
47
        urls = (extract_url(x, reply.text) for x in reply.entities)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable x does not seem to be defined.
Loading history...
48
        images = [x for x in urls if is_image(x)]
49
        if len(images) > 0:
50
            extension = is_image(images[0])
51
            r = requests.get(images[0])  # use only first image url
52
            with open(output+extension, "wb") as f:
53
                f.write(r.content)
54
            return extension
55
    # Document
56
    if reply.document is not None and is_image(reply.document.file_name):
57
        extension = is_image(reply.document.file_name)
58
        context.bot.getFile(reply.document.file_id).download(output + extension)
59
        return extension
60
    # Sticker
61
    if reply.sticker is not None:
62
        extension = ".webp"
63
        context.bot.getFile(reply.sticker.file_id).download(output+extension)
64
        return extension
65
    # Photo in reply
66
    if reply.photo is not None:
67
        extension = ".jpg"
68
        context.bot.getFile(reply.photo[-1].file_id).download(output + extension)
69
        return extension
70
    return False
71
72
73
def send_image(update, filepath, name, extension):
74
    photo_extensions = (".jpg", ".jpeg")
75
    doc_extensions = (".png", ".svg", ".tif", ".bmp", ".gif", ".mp4")
76
    sticker_extensions = (".webp")
77
    if extension in photo_extensions:
78
        with open(filepath + name + extension, "rb") as f:
79
            update.message.reply_photo(f)
80
        return True  
81
    if extension in doc_extensions:
82
        with open(filepath + name + extension, "rb") as f:
83
            update.message.reply_document(f, timeout=90)
84
        return True
85
    if extension in sticker_extensions:
86
        with open(filepath + name + extension, "rb") as f:
87
            update.message.reply_sticker(f)
88
        return True
89
90
91
def get_param(update, defaultvalue, min_value, max_value):
92
    if update.message.reply_to_message is not None:
93
        parts = update.message.text.split(" ", 1)
94
    elif update.message.caption is not None:
95
        parts = update.message.caption.split(" ", 1)
96
    elif update.message.text is not None:
97
        parts = update.message.text.split(" ", 2)
98
    else:
99
        return defaultvalue
100
    if len(parts) == 1:
101
        parameter = defaultvalue
102
    else:
103
        try:
104
            parameter = int(parts[1])
105
        except:
106
            #update.message.reply_text("Paremeter needs to be a number!")
107
            return defaultvalue
108
        if  parameter < min_value or parameter > max_value:
109
            errtext = "Baka, make it from " + str(min_value) + " to " + str(max_value) + "!"
110
            update.message.reply_text(errtext)
111
            return None
112
    return parameter
113
    
114
def mp4_fix(path, filename):
115
    args = "ffmpeg -loglevel panic -i " + path + filename + ".mp4" + \
116
            " -an -vf scale=trunc(iw/2)*2:trunc(ih/2)*2 \
117
            -pix_fmt yuv420p -c:v libx264 -profile:v high -level:v 2.0 " \
118
            + path + filename + "fixed.mp4 -y"
119
    subprocess.run(args, shell=True)
120
    os.remove(path+filename+".mp4")
121
    fixed_file_name = filename + "fixed"
122
    return fixed_file_name
123
124
125
# custom filters for message handler
126
# photo with caption
127
class Caption_Filter(BaseFilter):
128
    def __init__(self, command):
129
        self.data=command
130
    def filter(self, message):
131
        if message.photo and message.caption and message.caption.startswith(self.data):
132
            return True
133