kuon.watcher.watcher.Watcher.run()   B
last analyzed

Complexity

Conditions 8

Size

Total Lines 44
Code Lines 30

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 64.8889

Importance

Changes 0
Metric Value
eloc 30
dl 0
loc 44
ccs 1
cts 26
cp 0.0385
rs 7.2933
c 0
b 0
f 0
cc 8
nop 1
crap 64.8889
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3 1
import logging
4 1
import threading
5 1
from json.decoder import JSONDecodeError
6 1
from time import time, sleep
7 1
from typing import Type
8
9 1
from requests.exceptions import ConnectionError as RequestsConnectionError
10 1
from selenium.common.exceptions import TimeoutException
11
12 1
from kuon.api_response import LockedDict
13 1
from kuon.exceptions import InvalidApiResponseType
14 1
from kuon.watcher import Settings
15 1
from kuon.watcher.adapters import SalesAdapterBase
16 1
from kuon.watcher.condition_checker import ConditionChecker
17 1
from kuon.watcher.currency import Currency
18 1
from kuon.watcher.notifications import Telegram
19 1
from kuon.watcher.notifications.mail import Mail
20 1
from kuon.watcher.settings import NotificationType
21 1
from kuon.watcher.tracker import Tracker
22
23
24 1
class Watcher(threading.Thread):
    """
    Watcher class to watch tracked items and notify the user on reached conditions

    The thread repeatedly searches every tracked item through the given sales
    adapter, checks the configured conditions against each search result and
    notifies the user about every result that matches.
    """

    # notification back ends, created on first use by the properties below
    _mail = None
    _telegram = None

    def __init__(self, adapter: Type[SalesAdapterBase], log_level: int = logging.ERROR, *args, **kwargs) -> None:
        """Initializing function

        :param adapter: sales adapter class; it is instantiated with ``*args``/``**kwargs``
        :type adapter: Type[SalesAdapterBase]
        :param log_level: level passed to ``logging.basicConfig``
        :type log_level: int
        :param args: positional arguments forwarded to the adapter and the condition checker
        :type args: list
        :param kwargs: keyword arguments forwarded to the adapter and the condition checker
        :type kwargs: dict
        """
        # initialize logger
        logging.basicConfig(level=log_level,
                            format='[%(asctime)s.%(msecs)03d %(levelname)s %(name)s] %(message)s',
                            datefmt="%H:%M:%S")
        self.logger = logging.getLogger(adapter.__name__)

        self.sales_interface = adapter(*args, **kwargs)
        self._item_tracker = Tracker()
        self.condition_checker = ConditionChecker(adapter, *args, **kwargs)
        # (track_index, item_id) pairs which were already evaluated by run()
        self.checked_ids = []

        self.validate_settings()
        super().__init__(target=self.run)

    def run(self) -> None:
        """The main function, checking the tracked items in a while True loop

        Every pass searches each tracked item, evaluates the conditions for all
        results that were not evaluated before and then sleeps for the rest of
        ``Settings.check_frequency`` seconds.

        :return:
        """
        while True:
            start_time = time()

            # enumerate instead of list.index(): no O(n) lookup per item and the
            # index stays correct even if two tracked items compare equal
            for track_index, tracked_item in enumerate(self._item_tracker.tracked_items):
                self.logger.debug("searching for item: \"{item}\"".format(item=tracked_item.search_item))

                try:
                    results = self.sales_interface.search(market_name=tracked_item.search_item, no_delay=True)
                except (InvalidApiResponseType, JSONDecodeError, RequestsConnectionError, TimeoutException):
                    self.logger.info("could not reach the API though the adapter implementation")
                    continue

                for search_item in results.data.market_items:
                    checked_key = (track_index, search_item.item_id)

                    if checked_key in self.checked_ids:
                        # continue on items we checked already
                        self.logger.debug("already checked item: {item}".format(item=search_item.item_id))
                        continue

                    if self.condition_checker.check_condition(item=search_item, settings=tracked_item):
                        # condition matches and user didn't get notified yet
                        self.logger.info("conditions ({cond}) met for item: {item}(${price:.2f})({id})".format(
                            cond=tracked_item.conditions, item=search_item.market_name, price=search_item.price / 100,
                            id=search_item.item_id))
                        self.notify_user(item=search_item)
                    else:
                        self.logger.debug("conditions ({cond}) not met for item: {item}(${price:.2f})({id})".format(
                            cond=tracked_item.conditions, item=search_item.market_name, price=search_item.price / 100,
                            id=search_item.item_id))

                    # remember the result either way so the item is evaluated only once per tracked item
                    self.checked_ids.append(checked_key)

            duration = time() - start_time
            self.condition_checker.clear_sold_history_cache()
            if duration < Settings.check_frequency:
                remaining = Settings.check_frequency - duration
                self.logger.info("sleeping '{:.2f}' seconds".format(remaining))
                sleep(remaining)

    def notify_user(self, item: LockedDict) -> bool:
        """Notifying the user with the selected option

        :param item: search result which met the conditions
        :type item: LockedDict
        :return: False if the item was skipped, True after the notification was handled
        :raises NotImplementedError: after sending, when the notification type is Mail
        """
        # NOTE(review): run() stores (track_index, item_id) tuples in checked_ids, so this
        # guard only triggers when a caller stored a bare item id; kept for compatibility
        if item.item_id in self.checked_ids:
            return False

        item_link = self.sales_interface.get_item_link(item.item_id)

        if Settings.notification_type == NotificationType.Nothing:
            # no notification channel selected, only write the match to the log
            self.logger.info('Search conditions met on item:\n<a href="{item_link}">{name}({price})</a>'.format(
                item_link=item_link, name=item.market_name, price=Currency(item.price)))

        if Settings.notification_type == NotificationType.Telegram:
            self.telegram.send_message(
                'Search conditions met on item:\n<a href="{item_link}">{name}({price})</a>'.format(
                    item_link=item_link, name=item.market_name, price=Currency(item.price)),
                Settings.Notification.Telegram.chat_id, parse_mode="HTML")

        if Settings.notification_type == NotificationType.Mail:
            self.mail.send_message("Search conditions met on item", '<a href="{item_link}">{name}({price})</a>'.format(
                item_link=item_link, name=item.market_name, price=Currency(item.price)))

            # NOTE(review): raised even though send_message was called above - confirm intent
            raise NotImplementedError("Mail notification is not implemented yet")

        return True

    def validate_settings(self) -> None:
        """Validate the settings and overwrite the invalid settings

        ``Settings.check_frequency`` is raised to ``60 / Settings.LIMIT_60_SECONDS``
        seconds when it is configured lower than that.

        :return:
        """
        min_frequency = 60 / Settings.LIMIT_60_SECONDS
        if Settings.check_frequency < min_frequency:
            # true division always yields a float, which the "d" format code rejects
            self.logger.info("check frequency was too high, set to {0:.2f}s".format(min_frequency))
            Settings.check_frequency = min_frequency

    @property
    def mail(self) -> Mail:
        """Property for Mail class to only initialize it on usage"""
        if self._mail is None:
            self._mail = Mail()
        return self._mail

    @property
    def telegram(self) -> Telegram:
        """Property for Telegram class to only initialize it on usage"""
        if self._telegram is None:
            self._telegram = Telegram()
        return self._telegram
148