Completed
Push — master ( 72b331...d4b7d2 )
by Steffen
02:14
created

kuon.watcher.watcher.Watcher.run()   B

Complexity

Conditions 8

Size

Total Lines 44
Code Lines 30

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 30
dl 0
loc 44
rs 7.2933
c 0
b 0
f 0
cc 8
nop 1
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3
4
import logging
5
import threading
6
from json.decoder import JSONDecodeError
7
from time import time, sleep
8
from typing import Type
9
10
from requests.exceptions import ConnectionError as RequestsConnectionError
11
from selenium.common.exceptions import TimeoutException
12
13
from kuon.api_response import LockedDict
14
from kuon.exceptions import InvalidApiResponseType
15
from kuon.watcher import Settings
16
from kuon.watcher.adapters import SalesAdapterBase
17
from kuon.watcher.condition_checker import ConditionChecker
18
from kuon.watcher.currency import Currency
19
from kuon.watcher.notifications import Telegram
20
from kuon.watcher.notifications.mail import Mail
21
from kuon.watcher.settings import NotificationType
22
from kuon.watcher.tracker import Tracker
23
24
25
class Watcher(threading.Thread):
26
    """
27
    Watcher class to watch tracked items and notify the user on reached conditions
28
    """
29
30
    _mail = None
31
    _telegram = None
32
33
    def __init__(self, adapter: Type[SalesAdapterBase], log_level=logging.ERROR, *args, **kwargs):
34
        """Initializing function
35
36
        :param log_level:
37
        """
38
        # initialize logger
39
        logging.basicConfig(level=log_level,
40
                            format='[%(asctime)s.%(msecs)03d %(levelname)s %(name)s] %(message)s',
41
                            datefmt="%H:%M:%S")
42
        self.logger = logging.getLogger(adapter.__name__)
43
44
        self.sales_interface = adapter(*args, **kwargs)
45
        self._item_tracker = Tracker()
46
        self.condition_checker = ConditionChecker(adapter, *args, **kwargs)
47
        self.checked_ids = []
48
49
        self.validate_settings()
50
        super().__init__(target=self.run)
51
52
    def run(self):
53
        """The main function, checking the tracked items in a while True loop
54
55
        :return:
56
        """
57
        while True:
58
            start_time = time()
59
60
            for tracked_item in self._item_tracker.tracked_items:
61
                self.logger.debug("searching for item: \"{item}\"".format(item=tracked_item.search_item))
62
63
                track_index = self._item_tracker.tracked_items.index(tracked_item)
64
65
                try:
66
                    results = self.sales_interface.search(market_name=tracked_item.search_item, no_delay=True)
67
                except (InvalidApiResponseType, JSONDecodeError, RequestsConnectionError, TimeoutException):
68
                    self.logger.info("could not reach the API though the adapter implementation")
69
                    continue
70
71
                for search_item in results.data.market_items:
72
73
                    if (track_index, search_item.item_id) in self.checked_ids:
74
                        # continue on items we checked already
75
                        self.logger.debug("already notified user about item: {item}".format(item=search_item.item_id))
76
                        continue
77
78
                    if self.condition_checker.check_condition(item=search_item, settings=tracked_item):
79
                        # condition matches and user didn't get notified yet
80
                        self.logger.info("conditions ({cond}) met for item: {item}(${price:.2f})({id})".format(
81
                            cond=tracked_item.conditions, item=search_item.market_name, price=search_item.price / 100,
82
                            id=search_item.item_id))
83
                        self.notify_user(item=search_item)
84
                        self.checked_ids.append((track_index, search_item.item_id))
85
                    else:
86
                        self.logger.debug("conditions ({cond}) not met for item: {item}(${price:.2f})({id})".format(
87
                            cond=tracked_item.conditions, item=search_item.market_name, price=search_item.price / 100,
88
                            id=search_item.item_id))
89
                        self.checked_ids.append((track_index, search_item.item_id))
90
91
            duration = time() - start_time
92
            self.condition_checker.clear_sold_history_cache()
93
            if duration < Settings.check_frequency:
94
                self.logger.info("sleeping '{:.2f}' seconds".format(Settings.check_frequency - duration))
95
                sleep(Settings.check_frequency - duration)
96
97
    def notify_user(self, item: LockedDict):
98
        """Notifying the user with the selected option
99
100
        :param item:
101
        :return:
102
        """
103
        if item.item_id in self.checked_ids:
104
            return False
105
106
        item_link = self.sales_interface.get_item_link(item.item_id)
107
108
        if Settings.notification_type == NotificationType.Nothing:
109
            self.logger.info('Search conditions met on item:\n<a href="{item_link}">{name}({price})</a>'.format(
110
                item_link=item_link, name=item.market_name, price=Currency(item.price)))
111
112
        if Settings.notification_type == NotificationType.Telegram:
113
            self.telegram.send_message(
114
                'Search conditions met on item:\n<a href="{item_link}">{name}({price})</a>'.format(
115
                    item_link=item_link, name=item.market_name, price=Currency(item.price)),
116
                Settings.Notification.Telegram.chat_id, parse_mode="HTML")
117
118
        if Settings.notification_type == NotificationType.Mail:
119
            self.mail.send_message("Search conditions met on item", '<a href="{item_link}">{name}({price})</a>'.format(
120
                item_link=item_link, name=item.market_name, price=Currency(item.price)))
121
122
            raise NotImplementedError("Mail notification is not implemented yet")
123
124
    def validate_settings(self):
125
        """Validate the settings and overwrite the invalid settings
126
127
        :return:
128
        """
129
        if Settings.check_frequency < (60 / Settings.LIMIT_60_SECONDS):
130
            self.logger.info("check frequency was too high, set to {0:d}s".format(60 / Settings.LIMIT_60_SECONDS))
131
            Settings.check_frequency = 60 / Settings.LIMIT_60_SECONDS
132
133
    @property
134
    def mail(self):
135
        """Property for Mail class to only initialize it on usage"""
136
        if not self._mail:
137
            self._mail = Mail()
138
        return self._mail
139
140
    @property
141
    def telegram(self):
142
        """Property for Telegram class to only initialize it on usage"""
143
        if not self._telegram:
144
            self._telegram = Telegram()
145
        return self._telegram
146