Completed
Push — master ( 3cd1a6...75a58a )
by Anas
15s queued 12s
created

modules.utils.mp4_fix()   A

Complexity

Conditions 1

Size

Total Lines 9
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 8
nop 2
dl 0
loc 9
rs 10
c 0
b 0
f 0
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3
# get_image func courtesy of Slko
4
from telegram.ext import BaseFilter
5
from wand.image import Image
6
import subprocess
7
import requests
8
import os.path
9
import io
10
11
12
def extract_url(entity, text):
13
    if entity["type"] == "text_link":
14
        return entity["url"]
15
    elif entity["type"] == "url":
16
        offset = entity["offset"]
17
        length = entity["length"]
18
        return text[offset:offset+length]
19
    else:
20
        return None
21
22
23
def is_image(path):
24
    if path is None:
25
        return False
26
    ext = None
27
    image_extensions = (".jpg", ".jpeg", ".png", ".gif", ".svg", ".tif", ".bmp", ".mp4")
28
    if path is None:
29
        return False
30
    for i in image_extensions:
31
        if path.casefold().endswith(i):
32
            ext = i
33
            return ext
34
    if ext == None:
35
        ext = ".mp4"
36
        return ext
37
    return False
38
39
def is_image_by_mime_type(mime_type):
40
    if mime_type is None:
41
        return False
42
    mime_types = {
43
        "image/png": ".png",
44
        "image/gif": ".gif",
45
        "image/jpeg": ".jpg",
46
        "video/mp4": ".mp4",
47
    }
48
    return mime_types.get(mime_type.casefold(), False)
49
50
51
def get_image(update, context, dl_path, filename):
52
    output = os.path.join(dl_path, filename)
53
    reply = update.message.reply_to_message
54
    if reply is None:
55
        extension = ".jpg"
56
        context.bot.getFile(update.message.photo[-1].file_id).download(output + extension)
57
        return extension
58
    # Entities; url, text_link
59
    if reply.entities is not None:
60
        urls = (extract_url(x, reply.text) for x in reply.entities)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable x does not seem to be defined.
Loading history...
61
        images = [x for x in urls if is_image(x)]
62
        if len(images) > 0:
63
            extension = is_image(images[0])
64
            r = requests.get(images[0])  # use only first image url
65
            with open(output+extension, "wb") as f:
66
                f.write(r.content)
67
            return extension
68
    # Document with file name
69
    if reply.document is not None and is_image(reply.document.file_name):
70
        extension = is_image(reply.document.file_name)
71
        context.bot.getFile(reply.document.file_id).download(output + extension)
72
        return extension
73
    # Document without file name
74
    if reply.document is not None and is_image_by_mime_type(reply.document.mime_type):
75
        extension = is_image_by_mime_type(reply.document.mime_type)
76
        context.bot.getFile(reply.document.file_id).download(output + extension)
77
        return extension
78
    # Sticker
79
    if reply.sticker is not None:
80
        extension = ".webp"
81
        context.bot.getFile(reply.sticker.file_id).download(output+extension)
82
        return extension
83
    # Photo in reply
84
    if reply.photo is not None:
85
        extension = ".jpg"
86
        context.bot.getFile(reply.photo[-1].file_id).download(output + extension)
87
        return extension
88
    return False
89
90
91
def send_image(update, filepath, name, extension):
92
    photo_extensions = (".jpg", ".jpeg")
93
    doc_extensions = (".png", ".svg", ".tif", ".bmp", ".gif", ".mp4")
94
    sticker_extensions = (".webp")
95
    if extension in photo_extensions:
96
        with open(filepath + name + extension, "rb") as f:
97
            update.message.reply_photo(f)
98
        return True  
99
    if extension in doc_extensions:
100
        with open(filepath + name + extension, "rb") as f:
101
            update.message.reply_document(f, timeout=90)
102
        return True
103
    if extension in sticker_extensions:
104
        with open(filepath + name + extension, "rb") as f:
105
            update.message.reply_sticker(f)
106
        return True
107
108
109
def get_param(update, defaultvalue, min_value, max_value):
110
    if update.message.reply_to_message is not None:
111
        parts = update.message.text.split(" ", 1)
112
    elif update.message.caption is not None:
113
        parts = update.message.caption.split(" ", 1)
114
    elif update.message.text is not None:
115
        parts = update.message.text.split(" ", 2)
116
    else:
117
        return defaultvalue
118
    if len(parts) == 1:
119
        parameter = defaultvalue
120
    else:
121
        try:
122
            parameter = int(parts[1])
123
        except:
124
            #update.message.reply_text("Paremeter needs to be a number!")
125
            return defaultvalue
126
        if  parameter < min_value or parameter > max_value:
127
            errtext = "Baka, make it from " + str(min_value) + " to " + str(max_value) + "!"
128
            update.message.reply_text(errtext)
129
            return None
130
    return parameter
131
    
132
def mp4_fix(path, filename):
133
    args = "ffmpeg -loglevel panic -i " + path + filename + ".mp4" + \
134
            " -an -vf scale=trunc(iw/2)*2:trunc(ih/2)*2 \
135
            -pix_fmt yuv420p -c:v libx264 -profile:v high -level:v 2.0 " \
136
            + path + filename + "fixed.mp4 -y"
137
    subprocess.run(args, shell=True)
138
    os.remove(path+filename+".mp4")
139
    fixed_file_name = filename + "fixed"
140
    return fixed_file_name
141
142
143
# custom filters for message handler
144
# photo with caption
145
class Caption_Filter(BaseFilter):
146
    def __init__(self, command):
147
        self.data=command
148
    def filter(self, message):
149
        if message.photo and message.caption and message.caption.startswith(self.data):
150
            return True
151