Completed
Branch dadyarri/6.1.0/divide-apis (ba35d6)
by Daniil
02:59
created

vk.Vk.send_message()   A

Complexity

Conditions 3

Size

Total Lines 30
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 21
dl 0
loc 30
rs 9.376
c 0
b 0
f 0
cc 3
nop 7
1
import os
2
import random
3
from typing import List
4
from typing import NoReturn
5
6
from requests import exceptions
7
import vk_api
8
from vk_api.bot_longpoll import VkBotEventType
9
10
from logger import Logger
11
from vkbotlongpoll import RalphVkBotLongPoll
12
13
14
class Vk:
15
    def __init__(self):
16
        self.log = Logger()
17
18
        self.token = os.environ["VK_TOKEN"]
19
        self.user_token = os.environ["VK_USER_TOKEN"]
20
        self.gid = os.environ["GID_ID"]
21
        self.cid = os.environ["CID_ID"]
22
23
        self.bot_session = None
24
        self.user_session = None
25
26
        self.bot_vk = None
27
        self.user_vk = None
28
        self.longpoll = None
29
30
        # Переименование обрабатываемых типов событий
31
        self.NEW_MESSAGE = VkBotEventType.MESSAGE_NEW
32
33
        self.log.log.info(
34
            f"Беседа... {'Тестовая' if self.cid.endswith('1') else 'Основная'}"
35
        )
36
37
    def auth(self):
38
        # Авторизация в API ВКонтакте
39
        self.log.log.info("Авторизация ВКонтакте...")
40
        try:
41
            self.bot_session = vk_api.VkApi(token=self.token, api_version="5.103")
42
            self.user_session = vk_api.VkApi(token=self.user_token, api_version="5.103")
43
        except vk_api.exceptions.AuthError:
44
            self.log.log.error("Неудача. Ошибка авторизации.")
45
        else:
46
            try:
47
                self.bot_vk = self.bot_session.get_api()
48
                self.user_vk = self.user_session.get_api()
49
                self.longpoll = RalphVkBotLongPoll(
50
                    vk=self.bot_session, group_id=self.gid
51
                )
52
            except exceptions.ConnectionError:
53
                self.log.log.error("Неудача. Превышен лимит попыток подключения.")
54
            except vk_api.exceptions.ApiError:
55
                self.log.log.error("Неудача. Ошибка доступа.")
56
            else:
57
                self.log.log.info("Успех.")
58
59
    def update_status(self):
60
        self.log.log.info("Обновление версии в статусе группы...")
61
        try:
62
            with open("VERSION.txt", "r") as f:
63
                v = f"Версия: {f.read()}"
64
            self.user_vk.status.set(text=v, group_id=self.gid)
65
        except vk_api.exceptions.ApiError as e:
66
            self.log.log.error(f"Ошибка {e.__str__()}")
67
        else:
68
            self.log.log.info(f"Успех.")
69
70
    def send_message(
71
        self,
72
        msg: str,
73
        pid: int = None,
74
        keyboard: str = "",
75
        attachments: str = None,
76
        user_ids: str = None,
77
        forward: str = "",
78
    ) -> NoReturn:
79
80
        """
81
        Отправка сообщения msg пользователю/в беседу pid
82
        с клавиатурой keyboard (не отправляется, если не указан json файл)
83
        """
84
85
        try:
86
            self.bot_vk.messages.send(
87
                peer_id=pid,
88
                random_id=random.getrandbits(64),
89
                message=msg,
90
                keyboard=keyboard,
91
                attachments=attachments,
92
                user_ids=user_ids,
93
                forward_messages=forward,
94
            )
95
96
        except vk_api.exceptions.ApiError as e:
97
            self.log.log.error(msg=e.__str__())
98
        except FileNotFoundError as e:
99
            self.log.log.error(msg=e)
100
101
    def send_mailing(self, ids: str, msg: str = "") -> NoReturn:
102
        """
103
        Отправка рассылки
104
        """
105
        self.send_message(msg=msg, user_ids=ids)
106
107
    def _get_conversations_ids(self) -> list:
108
        """
109
        Получает идентификаторы пользователей последних 200 диалогов
110
        """
111
        q = self.bot_vk.messages.getConversations(
112
            offset=1, count=200, group_id=self.gid
113
        )
114
        _l = []
115
        for i in range(len(q["items"])):
116
            if q["items"][i]["conversation"]["can_write"]["allowed"]:
117
                _l.append(str(q["items"][i]["conversation"]["peer"]["id"]))
118
        return _l
119
120
    def get_users_info(self, ids: list) -> List[dict]:
121
        """
122
        Получает информацию о пользователях с указанными id
123
        """
124
        return self.bot_vk.users.get(user_ids=",".join(map(str, ids)))
125