Passed
Push — master ( 00153b...43cb7b )
by Dave
01:11
created

backend.fcmapp.Cache.purge()   A

Complexity

Conditions 2

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 5
nop 1
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
'''Application Service layer for Full Cycle Mining
2
Gateway into most of application functionality'''
3
import sys
4
import os
5
import datetime
6
import logging
7
import json
8
import base64
9
from collections import defaultdict
10
from colorama import init, Fore
11
import pika
12
from sqlalchemy.orm import sessionmaker
13
from sqlalchemy import create_engine
14
from domain.mining import Miner, Pool, MinerPool, AvailablePool, MinerCurrentPool, MinerStatistics, MinerStatus
15
from domain.rep import MinerRepository, PoolRepository, LoginRepository, RuleParametersRepository, BaseRepository
16
#from domain.miningrules import RuleParameters
17
from domain.sensors import Sensor, SensorValue
18
from messaging.messages import Message, MessageSchema, MinerMessageSchema, ConfigurationMessageSchema
19
from messaging.sensormessages import SensorValueSchema
20
from messaging.schema import MinerSchema, MinerStatsSchema, MinerCurrentPoolSchema, AvailablePoolSchema, PoolSchema
21
from helpers.queuehelper import QueueName, Queue, QueueEntry, QueueType
22
from helpers.camerahelper import take_picture
23
from helpers.antminerhelper import setminertoprivileged, privileged, setprivileged, setrestricted, waitforonline, restartmining, stopmining, restart, set_frequency
24
from helpers.temperaturehelper import readtemperature
25
from helpers.telegramhelper import sendalert, sendphoto
26
from backend.fcmcache import Cache, CacheKeys
27
28
class ComponentName:
    '''Identifiers of the FCM components; each one corresponds to a queue login name.'''
    fullcycle, rules = 'fullcycle', 'rules'
32
33
class Component:
    '''One unit of execution of FCM, built around its own ApplicationService.'''

    def __init__(self, componentname, option=''):
        '''Create the application service for componentname; nothing is listened to yet.'''
        self.app = ApplicationService(component=componentname, option=option)
        # holds a channel these days (it used to hold a queue)
        self.listeningqueue = None

    def listen(self):
        '''Listen on the assigned channel; does nothing when none has been assigned.'''
        if not self.listeningqueue:
            return
        self.app.bus.listen(self.listeningqueue)
43
44
class ServiceName:
    '''Identifiers of the infrastructure services the application looks up by name.'''
    database = 'mysql'
    cache = 'redis'
    messagebus = 'rabbit'
    email = 'gmail'
50
51
class InfrastructureService:
    '''Connection settings for one external dependency.'''
    # class-level defaults; __init__ replaces every one of them per instance
    name = connection = host = port = user = password = ''

    def __init__(self, name, connection, host, port, user, password):
        '''Store the settings exactly as given.'''
        self.name, self.connection = name, connection
        self.host, self.port = host, port
        self.user, self.password = user, password
66
67
68
class Antminer():
    '''Binds the antminer helper functions to one configuration and one login,
    so callers only have to pass the miner.'''

    def __init__(self, config, login):
        self.__config = config
        self.__login = login

    def set_privileged(self, miner):
        '''Apply the privileged api-allow setting to the miner.'''
        allowed = self.__config['provision.apiallow.privileged']
        setprivileged(miner, self.__login, allowed)

    def setminertoprivileged(self, miner):
        '''Same setting as set_privileged, but the helper's result is returned.'''
        allowed = self.__config['provision.apiallow.privileged']
        return setminertoprivileged(miner, self.__login, allowed)

    def set_restricted(self, miner):
        '''Apply the restricted api-allow setting to the miner.'''
        allowed = self.__config['provision.apiallow.restricted']
        setrestricted(miner, self.__login, allowed)

    def waitforonline(self, miner):
        '''Delegate to the waitforonline helper.'''
        outcome = waitforonline(miner)
        return outcome

    def getaccesslevel(self, miner):
        '''Delegate to the privileged helper and return its answer.'''
        level = privileged(miner)
        return level

    def restart(self, miner):
        '''Delegate to the restart helper.'''
        outcome = restart(miner)
        return outcome

    def stopmining(self, miner):
        '''stop miner through ssh.'''
        login = self.__login
        return stopmining(miner, login)

    def restartssh(self, miner):
        '''restart miner through ssh. start mining again'''
        login = self.__login
        return restartmining(miner, login)

    def set_frequency(self, miner, frequency):
        '''Delegate to the set_frequency helper using the stored login.'''
        login = self.__login
        return set_frequency(miner, login, frequency)
101
102
103
class Bus:
    '''Wrapper around a pika blocking connection to the message bus.

    The connection is opened lazily by connection(). publish, broadcast and
    both subscribe methods open a channel of their own on that connection.
    '''
    _connection = None
    _durable = False

    def __init__(self, servicelogin):
        '''servicelogin supplies host, port, user and password.

        When servicelogin.user is None the login name 'fullcycle' is used.
        '''
        self._servicelogin = servicelogin
        self._userlogin = self._servicelogin.user
        if self._userlogin is None:
            self._userlogin = 'fullcycle'

    def connection(self):
        '''Return the shared connection, opening it on first use.'''
        if not self._connection:
            credentials = pika.PlainCredentials(self._userlogin, self._servicelogin.password)
            parameters = pika.ConnectionParameters(self._servicelogin.host, self._servicelogin.port, '/', credentials)
            self._connection = pika.BlockingConnection(parameters)
        return self._connection

    def close(self):
        '''Close the connection if one was opened.'''
        if self._connection:
            self._connection.close()

    def sleep(self, seconds):
        '''Sleep through the connection, when one is open.'''
        if self._connection:
            self._connection.sleep(seconds)

    def get_queue_name(self, queue_name):
        '''Return the name to use on the bus: QueueName members are converted
        with QueueName.value, anything else is returned unchanged.'''
        name_of_q = queue_name
        if isinstance(queue_name, QueueName):
            name_of_q = QueueName.value(queue_name)
        return name_of_q

    def publish(self, queue_name, msg, exchange=''):
        """Publishes message on new channel"""
        localchannel = self.connection().channel()
        # resolve the name once; the routing key used to reference a variable
        # that was never defined in this method, so every publish raised NameError
        name_of_q = self.get_queue_name(queue_name)
        localchannel.queue_declare(queue=name_of_q, durable=self._durable)
        localchannel.basic_publish(exchange=exchange, routing_key=name_of_q, body=msg)
        localchannel.close()

    def broadcast(self, exchange_name, msg):
        '''broadcast a message to anyone that is listening'''
        localchannel = self.connection().channel()
        localchannel.exchange_declare(exchange=self.get_queue_name(exchange_name), exchange_type='fanout')
        localchannel.basic_publish(exchange=self.get_queue_name(exchange_name), routing_key='', body=msg)
        localchannel.close()

    def subscribe(self, name, callback, no_acknowledge=True, prefetch_count=1):
        """basic subscribe messages from one queue
        remember to listen to channel to get messages
        """
        localchannel = self.connection().channel()
        localchannel.queue_declare(queue=self.get_queue_name(name))
        localchannel.basic_qos(prefetch_count=prefetch_count)
        localchannel.basic_consume(callback, queue=self.get_queue_name(name), no_ack=no_acknowledge)
        return localchannel

    def subscribe_broadcast(self, name, callback, no_acknowledge=True, prefetch_count=1):
        """Consumes messages from a fanout exchange through an exclusive queue"""
        localchannel = self.connection().channel()
        localchannel.exchange_declare(exchange=self.get_queue_name(name), exchange_type='fanout')

        # the broker picks the queue name; it is read back from the declare result
        result = localchannel.queue_declare(exclusive=True)
        queue_name = result.method.queue
        localchannel.queue_bind(exchange=self.get_queue_name(name), queue=queue_name)

        localchannel.basic_qos(prefetch_count=prefetch_count)
        localchannel.basic_consume(callback, queue=queue_name, no_ack=no_acknowledge)
        return localchannel

    def listen(self, channel):
        '''listen to queue. this is a blocking call'''
        channel.start_consuming()

    def acknowledge(self, channel, delivery_tag):
        '''acknowledge that message was processed'''
        channel.basic_ack(delivery_tag)

    def reject(self, channel, delivery_tag):
        '''tell queue that message was not processed'''
        channel.basic_nack(delivery_tag)
182
183
184
185
class ApplicationService:
186
    '''Application Services'''
187
    programnamefull = ''
188
    programname = ''
189
    component = ComponentName.fullcycle
190
    loglevel = 0
191
    #true if user passed in -now command line argument
192
    isrunnow = False
193
    #dictionary of queues managed by this app
194
    _queues = {}
195
    _channels = []
196
    #the startup directory
197
    homedirectory = None
198
    __cache = None
199
    __bus = None
200
    __config = {}
201
    __logger = None
202
    __logger_debug = None
203
    __logger_error = None
204
    antminer = None
205
206
    def __init__(self, component=ComponentName.fullcycle, option=None, announceyourself=False):
        '''Bring up the application services for one component.

        component: name of the component being started.
        option: command line style option, handed to initargs.
        announceyourself: when true, broadcast a 'Started ...' entry on the log queue.
        '''
        # assign before the first check: it used to run against the class
        # default, so 'Starting FCM Init' was printed for every component
        self.component = component
        if self.component == ComponentName.fullcycle:
            self.print('Starting FCM Init')
        self.initargs(option)
        self.startupstuff()
        if self.component == ComponentName.fullcycle:
            self.print('Starting FCM Configuration')
        self.setup_configuration()
        self.initlogger()
        self.initmessaging()
        #this is slow. should be option to opt out of cache?
        if self.component == ComponentName.fullcycle:
            self.loginfo('Starting FCM Cache')
        self.initcache()
        self.initbus()
        self.init_application()
        self.init_sensors()

        if announceyourself:
            self.sendqueueitem(QueueEntry(QueueName.Q_LOG, self.stamp('Started {0}'.format(self.component)), QueueType.broadcast))
227
228
    def initargs(self, option):
        '''Record the program name and detect the run-now switch.

        The first command line argument, when there is one, takes the place of option.
        '''
        argv = sys.argv
        if argv:
            self.programnamefull = argv[0]
            self.programname = os.path.basename(self.programnamefull)
        switch = argv[1] if len(argv) > 1 else option
        # None never equals '-now', so no separate None check is needed
        if switch == '-now':
            self.isrunnow = True
239
240
    def startupstuff(self):
        '''Remember the directory this module lives in and initialise colorama.'''
        module_dir = os.path.dirname(__file__)
        self.homedirectory = module_dir
        # autoreset is used with colorama on windows
        init(autoreset=True)
245
246
    def initmessaging(self):
        '''Create the schema object used for message envelopes.'''
        envelope_schema = MessageSchema()
        self._schemamsg = envelope_schema
249
250
    def initcache(self):
        '''Connect to the cache service; a failure is logged and start-up continues.'''
        try:
            self.__cache = Cache(self.getservice(ServiceName.cache))
        except Exception as ex:
            #cache is offline. try to run in degraded mode
            self.logexception(ex)
258
259
    def startup(self):
        '''Load the miner cache first, then the pool cache.'''
        self.initminercache()
        # pools are loaded only after the miners
        self.initpoolcache()
262
263
    def initbus(self):
        '''Create the Bus from the configured message bus service.'''
        self.__bus = Bus(self.getservice(ServiceName.messagebus))
267
268
    @property
269
    def bus(self):
270
        return self.__bus
271
272
    def init_sensors(self):
        '''Create the Sensor entity for the controller.'''
        controller_sensor = Sensor('controller', 'DHT22', 'controller')
        self.sensor = controller_sensor
274
275
    def init_application(self):
        '''Create the Antminer facade from the loaded configuration and the ssh login.'''
        settings = self.__config
        self.antminer = Antminer(settings, self.sshlogin())
277
278
    @property
279
    def isdebug(self):
280
        return sys.flags.debug
281
282
    def getconfigfilename(self, configfilename):
        '''Return the path of a config file, joined onto the home directory.'''
        base = self.homedirectory
        return os.path.join(base, configfilename)
285
286
    def setup_configuration(self):
        '''Load config/fullcycle.conf (JSON) and pick up applicationid and loglevel from it.'''
        repository = BaseRepository()
        text = repository.readrawfile(self.getconfigfilename('config/fullcycle.conf'))
        self.__config = json.loads(text)
        self.applicationid = self.configuration('applicationid')
        self.loglevel = self.configuration('loglevel')
292
293
    def configuration(self, key):
        '''Return the configured value for key, or None when the key is absent.'''
        settings = self.__config
        return settings[key] if key in settings else None
296
297
    def is_enabled_configuration(self, key):
        '''Read the '<key>.enabled' setting.

        A missing setting counts as False. String values are enabled only when
        they are 'true' or 'True'; any other value is returned unchanged.
        '''
        flag_key = '{0}.enabled'.format(key)
        settings = self.__config
        if flag_key in settings:
            flag = settings[flag_key]
            if isinstance(flag, str):
                return flag in ('true', 'True')
            return flag
        return False
305
306
    def initpoolcache(self):
        '''Put the configured pools into the cache.

        The raw pools file is stored when the cache holds no pools entry, then
        every configured pool is cached and registered as a known pool.
        '''
        if self.__cache.get(CacheKeys.pools) is None:
            repository = PoolRepository()
            self.tryputcache(CacheKeys.pools, repository.readrawfile(self.getconfigfilename('config/pools.conf')))
        for configured in self.pools():
            #each configured entry is a Pool
            available = AvailablePool(configured.pool_type, configured, configured.url, configured.user, configured.password, configured.priority)
            wrapper = MinerPool(miner=None, priority=0, pool=available)
            self.putpool(configured)
            self.add_pool(wrapper)
316
317
    def initminercache(self):
        '''Put the configured miners into the cache; miners already cached are left alone.'''
        if self.__cache.get(CacheKeys.miners) is None:
            repository = MinerRepository()
            self.tryputcache(CacheKeys.miners, repository.readrawfile(self.getconfigfilename('config/miners.conf')))

        for configured in self.miners():
            #status is not persisted yet so init from name
            if configured.is_manually_disabled():
                configured.status = MinerStatus.Disabled
            already_cached = self.getminer(configured) is not None
            if not already_cached:
                self.putminer(configured)
329
330
    def cacheclear(self):
        '''Purge the cache.'''
        cache = self.__cache
        cache.purge()
333
334
    def initlogger(self):
        '''Create the info, debug and error loggers, each with its own file.'''
        make = self.setup_logger
        self.__logger = make('fcmapp', 'fcm.log', logging.INFO)
        self.__logger_debug = make('fcmdebug', 'fcm.bug', logging.DEBUG)
        self.__logger_error = make('fcmerror', 'fcm.err', logging.ERROR)
341
342
    def setup_logger(self, logger_name, log_file, level=logging.INFO):
        '''Return the logger called logger_name, writing to log_file.

        logger_name: name passed to logging.getLogger.
        log_file: file the records are appended to.
        level: level set on the logger.

        A file handler is attached only when the logger does not already have
        one for log_file. logging.getLogger returns the same logger for the
        same name, so attaching a handler on every call would write each
        record once per call made so far.
        '''
        logr = logging.getLogger(logger_name)
        target = os.path.abspath(log_file)
        already_attached = any(
            isinstance(handler, logging.FileHandler) and getattr(handler, 'baseFilename', None) == target
            for handler in logr.handlers)
        if not already_attached:
            #by default will append. use mode='w' to overwrite
            file_handler = logging.FileHandler(log_file)
            file_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
            logr.addHandler(file_handler)
        # No stream handler: the one built here before was never attached to the logger.
        logr.setLevel(level)
        return logr
355
356
    def loginfo(self, message):
        '''Write message to the info log, prefixed with the program name, and echo it.'''
        self.__logger.info('{0}: {1}'.format(self.programname, message))
        # the console gets the bare message, without the program name prefix
        print(message)
361
362
    def logerror(self, message):
        '''Write message to the error log and echo it to the console in red.'''
        prefixed = '{0}: {1}'.format(self.programname, message)
        self.__logger_error.error(prefixed)
        print(Fore.RED + prefixed)
367
368
    def logdebug(self, message):
        '''Write message to the debug log and echo it in green; does nothing
        while loglevel is unset or zero.'''
        if self.loglevel and self.loglevel != 0:
            prefixed = '{0}: {1}'.format(self.programname, message)
            self.__logger_debug.debug(prefixed)
            print(Fore.GREEN + prefixed)
375
376
    def print(self, message):
377
        '''echo message to screen'''
378
        print('{0}: {1}'.format(self.now(), message))
379
380
    def now(self):
        '''The current local time, rendered by formattime.'''
        moment = datetime.datetime.now()
        return self.formattime(moment)
383
384
    def formattime(self, time):
        '''Render time as 'YYYY-MM-DD HH:MM:SS'.'''
        layout = '%Y-%m-%d %H:%M:%S'
        return time.strftime(layout)
387
388
    #region lookups
389
    #todo: move to configurations section
390
    def miners(self):
        '''Miners read from config/miners.conf.'''
        repository = MinerRepository()
        return repository.readminers(self.getconfigfilename('config/miners.conf'))
394
395
    def knownminers(self):
        '''List of known miners: the cached ones when the cache has any,
        otherwise the configured miners.

        later should consider returning a list that is easily searched and filtered
        '''
        cached = self.__cache.gethashset(CacheKeys.knownminers)
        if cached:
            #get list of miners from cache
            return self.deserializelistofstrings(list(cached.values()))
        return self.miners()
405
406
    def allminers(self):
        '''combined list of discovered miners and configured miners

        Starts from knownminers() and appends every configured miner whose
        key() is not in the list yet. Keys are tracked in a set so the list is
        not rescanned for each configured miner.
        '''
        combined = self.knownminers()
        seen_keys = {known.key() for known in combined}
        for configured in self.miners():
            configured_key = configured.key()
            if configured_key not in seen_keys:
                combined.append(configured)
                # remember it so a repeated configured entry is not added twice
                seen_keys.add(configured_key)
        return combined
414
415
    def knownsensors(self):
        '''Sensor values from the known-sensors hash set, or None when it is empty or missing.'''
        cached = self.__cache.gethashset(CacheKeys.knownsensors)
        if not cached:
            return None
        return self.deserializelist_withschema(SensorValueSchema(), list(cached.values()))
420
421
    def knownpools(self):
        '''Pools from the known-pools hash set, or None when it is empty or missing.'''
        cached = self.__cache.gethashset(CacheKeys.knownpools)
        if not cached:
            return None
        return self.deserializelist_withschema(AvailablePoolSchema(), list(cached.values()))
426
427
    def addknownsensor(self, sensorvalue):
        '''Store the serialized sensor value in the known-sensors hash set under its sensorid.'''
        serialized = self.jsonserialize(SensorValueSchema(), sensorvalue)
        self.__cache.putinhashset(CacheKeys.knownsensors, sensorvalue.sensorid, serialized)
430
431
    def minersummary(self, max_number=10):
        '''show a summary of known miners

        With at most max_number known miners the result has one line per
        miner (its summary()). With more, miners are grouped by status and the
        result has one '<status>: <summary_by_status>' line per status.
        '''
        # the 'summary' configuration value used to be read here but was never used
        knownminers = self.knownminers()
        if len(knownminers) <= max_number:
            return '\n'.join(m.summary() for m in knownminers)
        groupbystatus = defaultdict(list)
        for miner in knownminers:
            groupbystatus[miner.status].append(miner)
        return '\n'.join(
            '{0}: {1}'.format(status, self.summary_by_status(status, members))
            for status, members in groupbystatus.items())
443
444
    def summary_by_status(self, key, minerlist):
        '''Summarize one status group: count plus total hash for 'online',
        the bare count for any other key.'''
        count = self.summarize_count(minerlist)
        if key != 'online':
            return count
        return '{0} miners hash {1}'.format(count, self.summarize_hash(minerlist))
448
449
    def summarize_count(self, minerlist):
        '''Number of miners in minerlist.'''
        how_many = len(minerlist)
        return how_many
451
452
    def summarize_hash(self, minerlist):
        '''Total of minerstats.currenthash over minerlist (0 for an empty list).'''
        total = 0
        for miner in minerlist:
            total = total + miner.minerstats.currenthash
        return total
454
455
    def addknownminer(self, miner):
        '''Store the serialized miner in the known-miners hash set under its key.'''
        serialized = self.serialize(miner)
        self.__cache.putinhashset(CacheKeys.knownminers, miner.key(), serialized)
459
460
    def updateknownminer(self, miner):
        '''Merge miner into its known-miners cache entry, or store it as is
        when no entry could be deserialized.'''
        cached = self.__cache.getfromhashset(CacheKeys.knownminers, miner.key())
        merged = self.deserialize(MinerSchema(), self.safestring(cached))
        if merged is not None:
            #merge new values
            merged.updatefrom(miner)
        else:
            merged = miner
        serialized = self.serialize(merged)
        self.__cache.putinhashset(CacheKeys.knownminers, miner.key(), serialized)
471
472
    #region Pools
473
    def pools(self):
        '''Pools read from config/pools.conf.'''
        repository = PoolRepository()
        return repository.readpools(self.getconfigfilename('config/pools.conf'))
477
478
    def findpool(self, minerpool):
        '''Return the first configured pool whose url equals minerpool.currentpool
        and whose user is a prefix of minerpool.currentworker; None otherwise.'''
        if minerpool is None:
            return None
        matching = (
            candidate for candidate in self.pools()
            if minerpool.currentpool == candidate.url and minerpool.currentworker.startswith(candidate.user))
        return next(matching, None)
485
486
    def getpool(self, miner: Miner):
        '''get the miner's current pool from cache

        putminerandstats stores the entry under miner.key() + '.pool', so that
        key is tried first. The miner.name based key this method used before
        is still tried afterwards. Returns None when neither is cached.
        '''
        valu = None
        tried = []
        for ident in (miner.key(), miner.name):
            if not ident or ident in tried:
                continue
            tried.append(ident)
            valu = self.trygetvaluefromcache(ident + '.pool')
            if valu is not None:
                break
        if valu is None:
            return None
        return MinerCurrentPool(miner, **self.deserialize(MinerCurrentPoolSchema(), valu))
492
493
    def add_pool(self, minerpool: MinerPool):
        '''Add minerpool.pool to the known-pools hash set unless its key is already there.'''
        pool = minerpool.pool
        if self.__cache.getfromhashset(CacheKeys.knownpools, pool.key):
            return
        serialized = self.jsonserialize(AvailablePoolSchema(), minerpool.pool)
        self.__cache.putinhashset(CacheKeys.knownpools, minerpool.pool.key, serialized)
499
500
    def update_pool(self, key, pool: AvailablePool):
        '''Drop the known-pools entry stored under key, then store pool under
        pool.key unless an entry with that key still exists.'''
        cache = self.__cache
        cache.hdel(CacheKeys.knownpools, key)
        if cache.getfromhashset(CacheKeys.knownpools, pool.key):
            return
        serialized = self.jsonserialize(AvailablePoolSchema(), pool)
        cache.putinhashset(CacheKeys.knownpools, pool.key, serialized)
506
    #endregion
507
508
    def sshlogin(self):
        '''Login read from the ftp.conf login file.'''
        login_file = 'ftp.conf'
        return self.readlogin(login_file)
511
512
    def readlogin(self, filename):
        '''Read the logins stored in the given file of the config directory.'''
        repository = LoginRepository()
        return repository.readlogins(self.getconfigfilename('config/'+filename))
516
517
    def ruleparameters(self):
        '''Rule parameters read from config/rules.conf.'''
        repository = RuleParametersRepository()
        rules_file = self.getconfigfilename('config/'+'rules.conf')
        return repository.readrules(rules_file)
520
521
    def getservice(self, servicename):
        '''Return the InfrastructureService named servicename from
        config/services.conf, or None when no entry has that name.

        Every entry of the file is converted before the search starts.
        '''
        config_path = self.getconfigfilename('config/services.conf')
        with open(config_path, encoding='utf-8-sig') as handle:
            entries = json.load(handle)
        services = [InfrastructureService(**entry) for entry in entries]
        for service in services:
            if service.name == servicename:
                return service
        return None
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable s does not seem to be defined.
Loading history...
528
529
    def getservice_useroverride(self, servicename):
        '''Look up a service and replace its user with this component's name.'''
        found = self.getservice(servicename)
        found.user = self.component
        return found
533
    #endregion lookups
534
535
    def listen(self, qlisten):
        """Listen on qlisten through the bus.

        A keyboard interrupt shuts the application down; any other exception
        is handed to unhandledexception.
        """
        try:
            self.bus.listen(qlisten)
        except KeyboardInterrupt:
            self.shutdown()
        except BaseException as failure:
            self.unhandledexception(failure)
543
544
    def registerqueue(self, qregister: Queue):
        '''Remember qregister under its queue name; an existing entry with that name is kept.'''
        self.logdebug(self.stamp('Registered queue {0}'.format(qregister.queue_name)))
        registered = self._queues
        if qregister.queue_name in registered:
            return
        registered[qregister.queue_name] = qregister
549
550
    def shutdown(self, exitstatus=0):
        '''Close channels, queues, bus and cache, then exit with exitstatus.'''
        self.loginfo('Shutting down fcm app...')
        self.close_channels()
        self.closequeues()
        if self.__bus:
            self.bus.close()
        cache = self.__cache
        if cache is not None:
            cache.close()
        sys.exit(exitstatus)
560
561
    def closequeue(self, thequeue):
        '''Close thequeue and forget it; failures are logged, not raised.'''
        if not thequeue:
            return
        try:
            # thequeue is truthy at this point, so it is never None here
            self.logdebug(self.stamp('closing queue {0}'.format(thequeue.queue_name)))
            thequeue.close()
            del self._queues[thequeue.queue_name]
        except Exception as failure:
            self.logexception(failure)
571
572
    def closequeues(self):
        '''Close every registered queue.'''
        # snapshot the names first: closequeue removes entries while we loop
        names = list(self._queues)
        for name in names:
            self.closequeue(self._queues[name])
576
577
    def close_channel(self, chan):
        '''Close chan and drop it from the registered channels.

        Only channels whose name is registered are closed. Failures are
        logged, not raised.
        '''
        if not chan:
            return
        try:
            registry = self._channels
            if chan.name in registry:
                self.logdebug(self.stamp('closing channel {0}'.format(chan.name)))
                chan.close()
                # the class default for _channels is a list, and deleting by
                # name raised TypeError there, so the entry was never removed
                if isinstance(registry, dict):
                    del registry[chan.name]
                else:
                    registry.remove(chan.name)
        except Exception as ex:
            self.logexception(ex)
586
587
    def close_channels(self):
        '''close all channels

        _channels may be a dict (its values are closed) or, as the class
        default is, a list (its items are closed). Indexing the list by its
        own items raised TypeError.
        '''
        registry = self._channels
        if isinstance(registry, dict):
            channels = list(registry.values())
        else:
            channels = list(registry)
        # iterate over a copy: close_channel may remove entries from the registry
        for chan in channels:
            self.close_channel(chan)
591
592
    def unhandledexception(self, unhandled):
        '''Last resort for exceptions the app cannot handle: log them.'''
        report = self.logexception
        report(unhandled)
595
596
    def exceptionmessage(self, ex):
        '''gets exception message even when it doesnt have one

        Returns '<type>:<line number>:<message>'. Type and line come from the
        exception currently being handled. Outside an except block they are
        taken from ex itself, and the line is '?' when ex carries no traceback.
        '''
        exc_type, _, exc_tb = sys.exc_info()
        if exc_type is None:
            # nothing is being handled right now; sys.exc_info() is all None
            exc_type = type(ex)
            exc_tb = getattr(ex, '__traceback__', None)
        lineno = exc_tb.tb_lineno if exc_tb is not None else '?'
        exmsg = getattr(ex, 'message', repr(ex))
        return '{0}:{1}:{2}'.format(exc_type, lineno, exmsg)
601
602
    def logexception(self, ex):
        '''Log the exceptionmessage description of ex as an error.'''
        description = self.exceptionmessage(ex)
        self.logerror(description)
605
606
    def sendlog(self, logmessage):
        '''Send logmessage to the log queue as a broadcast entry and echo it.'''
        self.sendqueueitem(QueueEntry(QueueName.Q_LOG, logmessage, 'broadcast'))
        print(logmessage)
611
612
    def subscribe(self, name, callback, no_acknowledge=True, prefetch=1):
        '''Subscribe callback to the named queue and return the channel from the bus.'''
        channel = self.bus.subscribe(name, callback, no_acknowledge=no_acknowledge, prefetch_count=prefetch)
        notice = 'Waiting for messages on {0}. To exit press CTRL+C'.format(name)
        print(notice)
        return channel
617
618
    def listen_to_broadcast(self, broadcast_name, callback, no_acknowledge=True):
        '''Subscribe callback to a broadcast and start listening on it.'''
        bus = self.bus
        channel = bus.subscribe_broadcast(broadcast_name, callback, no_acknowledge)
        print('Waiting for messages on {0}. To exit press CTRL+C'.format(broadcast_name))
        self.bus.listen(channel)
        #the bus documents listen as a blocking call
        return channel
624
625
    def trypublish(self, queue_name, msg: str):
        '''Publish msg to the queue. True on success, False (after logging)
        when the bus connection turns out to be closed.'''
        try:
            self.bus.publish(queue_name, msg)
        except pika.exceptions.ConnectionClosed as closed:
            self.logerror('Error publishing to {0} {1}'.format(queue_name, self.exceptionmessage(closed)))
            return False
        return True
634
635
    def trybroadcast(self, exchange_name, msg):
        '''Broadcast msg on the exchange. True on success, False (after
        logging) when the bus connection turns out to be closed.'''
        try:
            self.bus.broadcast(exchange_name, msg)
        except pika.exceptions.ConnectionClosed as closed:
            self.logerror('Error broadcasting to {0} {1}'.format(exchange_name, self.exceptionmessage(closed)))
            return False
        return True
643
644
    def queuestatus(self):
        """TODO: print queue status. Not implemented; does nothing for now."""
        #idea kept from the original sketch:
        #for q in self._queues.values():
        #    print(q.queue_name,str(q._connection))
        return None
649
650
    def putpool(self, pool: Pool):
        '''Cache the serialized pool under 'pool.<name>'; pools without a name are skipped.'''
        if not (pool and pool.name):
            return
        serialized = self.serialize(pool)
        self.tryputcache('pool.{0}'.format(pool.name), serialized)
655
656
    def putminer(self, miner: Miner):
        '''Cache the serialized miner under 'miner.<key>'; miners without a key are skipped.'''
        if not (miner and miner.key()):
            return
        serialized = self.serialize(miner)
        self.tryputcache('miner.{0}'.format(miner.key()), serialized)
661
662
    def getminer(self, miner: Miner) -> Miner:
        '''Fetch the cached copy of miner, stored under 'miner.<key>'.

        originally was key=miner.name but that was not good
        changed to key='miner.'+minerid

        Returns None when nothing is cached or the cached entry has no key.
        '''
        cached = self.trygetvaluefromcache('miner.{0}'.format(miner.key()))
        if cached is None:
            return None
        stored = self.deserialize(MinerSchema(), self.safestring(cached))
        if stored.key():
            # mark where this copy came from
            stored.store = 'mem'
            return stored
        #do not allow entry with no key
        return None
676
677
    def getknownminer(self, miner: Miner) -> Miner:
        '''Look the miner up among the known miners by its key.'''
        lookup_key = miner.key()
        return self.getknownminerbykey(lookup_key)
680
681
    def getminerbyname(self, minername):
        '''First configured miner whose name equals minername, or None.'''
        matches = [candidate for candidate in self.miners() if candidate.name == minername]
        return matches[0] if matches else None
685
686
    def getknownminerbykey(self, minername):
        '''Deserialize the known-miners entry stored under minername; None when there is none.'''
        cached = self.__cache.getfromhashset(CacheKeys.knownminers, minername)
        if cached is not None:
            return self.deserialize(MinerSchema(), self.safestring(cached))
        return None
691
692
    def getknownminerbyname(self, minername):
        '''First known miner with the given name, or None.

        this could be slow if there are lots of miners
        '''
        candidates = self.knownminers()
        return next((candidate for candidate in candidates if candidate.name == minername), None)
699
700
    def tryputcache(self, key, value):
        '''put value in cache key

        None values are not stored, and nothing happens when there is no
        cache. A failing cache is logged instead of raised, the same way
        trygetvaluefromcache treats it.
        '''
        if value is None:
            return
        try:
            if self.__cache is not None:
                self.__cache.set(key, value)
        except Exception as ex:
            # the handler used to name redis.exceptions.ConnectionError, but
            # redis is not imported here, so any cache error became a NameError
            self.logexception(ex)
708
709
    def putminerandstats(self, miner: Miner, minerstats, minerpool):
        '''Cache the miner, then its stats under '<key>.stats' and its
        current pool under '<key>.pool'.'''
        self.putminer(miner)
        stats_text = MinerStatsSchema().dumps(minerstats).data
        self.tryputcache(miner.key() + '.stats', stats_text)
        pool_text = MinerCurrentPoolSchema().dumps(minerpool).data
        self.tryputcache(miner.key() + '.pool', pool_text)
718
719
    def trygetvaluefromcache(self, key):
        '''Read key from the cache. None when there is no cache or the read
        fails (the failure is logged).'''
        if self.__cache is None:
            return None
        try:
            return self.__cache.get(key)
        except Exception as failure:
            self.logexception(failure)
        return None
727
728
    def getstats(self, miner: Miner):
        '''get stats entity

        putminerandstats stores the entry under miner.key() + '.stats', so
        that key is tried first. The miner.name based key this method used
        before is still tried afterwards. Returns None when neither is cached.
        '''
        valu = None
        tried = []
        for ident in (miner.key(), miner.name):
            if not ident or ident in tried:
                continue
            tried.append(ident)
            valu = self.trygetvaluefromcache(ident + '.stats')
            if valu is not None:
                break
        if valu is None:
            return None
        return MinerStatistics(miner, **self.deserialize(MinerStatsSchema(), valu))
734
735
    def safestring(self, thestring):
        '''Return thestring as str: None and str pass through unchanged,
        anything else is decoded as utf-8.'''
        if thestring is None or isinstance(thestring, str):
            return thestring
        return str(thestring, "utf-8")
740
741
    def getminerstatscached(self):
        '''Yield a (miner, stats, pool) tuple of cached values for every configured miner.'''
        for configured in self.miners():
            cached_miner = self.getminer(configured)
            cached_stats = self.getstats(configured)
            cached_pool = self.getpool(configured)
            yield (cached_miner, cached_stats, cached_pool)
745
746
    def messagedecodeminer(self, body) -> Miner:
        '''Decode a miner message body and return the miner it carries.'''
        envelope = self.deserializemessageenvelope(self.safestring(body))
        schema = MinerMessageSchema()
        loaded = schema.load(envelope.bodyjson())
        decoded = schema.make_minermessage(loaded.data)
        return decoded.miner
754
755
    def messagedecodeminerstats(self, body):
        '''Decode a miner stats message body into a miner message entity.'''
        envelope = self.deserializemessageenvelope(self.safestring(body))
        schema = MinerMessageSchema()
        loaded = schema.load(envelope.bodyjson())
        return schema.make_minermessage(loaded.data)
762
763
    def messagedecodeminercommand(self, body):
        '''Decode a raw miner command message body into a miner message entity.'''
        text = self.safestring(body)
        envelope = self.deserializemessageenvelope(text)
        commandschema = MinerMessageSchema()
        loaded = commandschema.load(envelope.bodyjson())
        return commandschema.make_minermessage(loaded.data)
770
771
    def messagedecodesensor(self, body):
        '''Decode a sensor value message.

        Returns a tuple of (message envelope, data loaded with
        SensorValueSchema from the envelope body).
        '''
        envelope = self.deserializemessageenvelope(self.safestring(body))
        sensorschema = SensorValueSchema()
        loaded = sensorschema.load(envelope.bodyjson())
        return envelope, loaded.data
778
779
    def messagedecode_configuration(self, body):
        '''Decode a raw configuration command body into a configuration message entity.'''
        envelope = self.deserializemessageenvelope(self.safestring(body))
        configschema = ConfigurationMessageSchema()
        loaded = configschema.load(envelope.bodyjson())
        return configschema.make_configurationmessage(loaded.data)
786
787
    def createmessageenvelope(self):
        '''Build a new Message with a utcnow timestamp and this
        application's component as the sender.'''
        created = datetime.datetime.utcnow()
        return Message(timestamp=created, sender=self.component)
790
791
    def serializemessageenvelope(self, msg):
        '''Dump a message envelope with the message schema and return the
        dumped data.'''
        dumped = self._schemamsg.dumps(msg)
        return dumped.data
794
795
    def jsonserialize(self, sch, msg):
        '''Dump msg with the given schema and return the dumped data
        (a string, per the original note).'''
        return sch.dumps(msg).data
800
801
    def serialize(self, entity):
        '''Serialize a Miner or a Pool with its own schema.

        Only the schema is needed, no message class.
        Returns the dumped data, or None for any other kind of entity.
        '''
        #checked in order: Miner first, then Pool
        schemas = ((Miner, MinerSchema), (Pool, PoolSchema))
        for entitytype, schematype in schemas:
            if isinstance(entity, entitytype):
                return schematype().dumps(entity).data
        return None
813
814
    def serializelist(self, listofentities):
        '''Serialize a list of entities to one json array built from each
        entity's __dict__.'''
        dictionaries = []
        for item in listofentities:
            dictionaries.append(item.__dict__)
        return json.dumps(dictionaries)
818
819
    def deserializelistofstrings(self, the_list):
        '''Deserialize a list of strings with MinerSchema.

        Returns one deserialized entry per item, in the same order.
        '''
        return [self.deserialize(MinerSchema(), self.safestring(entry))
                for entry in the_list]
826
827
    def deserializelist_withschema(self, schema, the_list):
        '''Deserialize a list of strings with the given schema.

        Returns one deserialized entry per item, in the same order.
        '''
        #todo:for pools the entry is a list
        return [self.deserialize(schema, self.safestring(entry))
                for entry in the_list]
835
836
    def deserialize(self, sch, msg):
        '''Load msg with the given schema and return the loaded data.

        Output should be an entity, not a python json object.
        The msg parameter should be a string; None gives None.
        '''
        return None if msg is None else sch.loads(msg).data
842
843
    def deserializemessageenvelope(self, body):
        '''Deserialize a message envelope: parse the json text of body and
        load it with the message schema.'''
        parsed = json.loads(self.safestring(body))
        return self._schemamsg.load(parsed).data
846
847
    def createmessagestats(self, miner, minerstats, minerpool):
        '''Create a serialized miner stats message.

        A miner whose store is 'mem' is saved with putminer first, so the next
        guy can get the latest changes.
        '''
        #only put in cache if it came from cache
        frommemory = miner.store == 'mem'
        if frommemory:
            self.putminer(miner)
        envelope = self.createmessageenvelope()
        statsmessage = envelope.make_minerstats(miner, minerstats, minerpool)
        return self.serializemessageenvelope(statsmessage)
855
856
    def createmessagecommand(self, miner, command):
        '''Create a serialized miner command message.

        A miner whose store is 'mem' is saved with putminer first.
        '''
        frommemory = miner.store == 'mem'
        if frommemory:
            self.putminer(miner)
        envelope = self.createmessageenvelope()
        commandmessage = envelope.make_minercommand(miner, command)
        return self.serializemessageenvelope(commandmessage)
863
864
    def messageencode(self, miner: Miner):
        '''Encode a miner as a serialized miner stats message without stats or pool.

        Command is optional, however this call should be converted into minercommand.
        A miner whose store is 'mem' is saved with putminer first, so the next
        guy can get the latest changes.
        '''
        frommemory = miner.store == 'mem'
        if frommemory:
            self.putminer(miner)
        envelope = self.createmessageenvelope()
        mineronly = envelope.make_minerstats(miner, minerstats=None, minerpool=None)
        dumped = self._schemamsg.dumps(mineronly)
        return dumped.data
872
873
    def stamp(self, message):
        '''Prefix a message with self.now() and the application id.'''
        template = '{0}:{1}: {2}'
        return template.format(self.now(), self.applicationid, message)
875
876
    def alert(self, message):
        '''Send a stamped alert message as a broadcast entry on the alert
        queue; returns the result of sendqueueitem.'''
        stamped = self.stamp(message)
        entry = QueueEntry(QueueName.Q_ALERT, stamped, QueueType.broadcast)
        return self.sendqueueitem(entry)
879
880
    def send(self, q_name, message):
        '''Send a message to a queue; returns the result of trypublish.'''
        return self.trypublish(q_name, message)
884
885
    def enqueue(self, queuelist):
        '''Send every entry of a list of queue messages.

        Does nothing when queuelist is None or reports no entries.
        '''
        if queuelist is None or not queuelist.hasentries():
            return
        #todo: group by queuename
        for queued in queuelist.entries:
            self.sendqueueitem(queued)
894
895
    def sendqueueitem(self, entry):
        '''Send one queue item.

        Entries with eventtype 'broadcast' go through trybroadcast, every
        other entry through send. Returns the result of that call.
        '''
        if entry.eventtype != 'broadcast':
            return self.send(entry.queuename, entry.message)
        return self.trybroadcast(entry.queuename, entry.message)
901
902
    def take_picture(self, file_name='fullcycle_camera.png'):
        '''Take a picture with the camera helper.

        The helper receives file_name followed by the configured camera size,
        quality, brightness and contrast. Returns what the helper returns.
        '''
        settingnames = ('camera.size',
                        'camera.quality',
                        'camera.brightness',
                        'camera.contrast')
        camerasettings = [self.configuration(name) for name in settingnames]
        return take_picture(file_name, *camerasettings)
909
910
    def store_picture_cache(self, file_name):
        '''Put a picture file into the cache under CacheKeys.camera.

        The file content is base64 encoded and wrapped in a SensorValue that
        is serialized with SensorValueSchema. Does nothing when file_name is
        not an existing file.
        '''
        if not os.path.isfile(file_name):
            return
        with open(file_name, 'rb') as picture:
            rawbytes = picture.read()
        camerareading = SensorValue('fullcyclecamera', base64.b64encode(rawbytes), 'camera')
        envelope = self.createmessageenvelope()
        readingjson = envelope.jsonserialize(SensorValueSchema(), camerareading)
        self.tryputcache(CacheKeys.camera, readingjson)
920
921
    def readtemperature(self):
        '''Read the temperature helper and send the readings to the sensor queue.

        The temperature is sent as 'fullcycletemp' and the humidity as
        'fullcyclehumid'; a reading that is None is not sent.

        Returns (humidity, temperature) as given by the helper, or
        (None, None) when reading or sending raises (the exception is logged).
        '''
        try:
            sensor_humid, sensor_temp = readtemperature()
            #temperature is sent first, then humidity
            readings = (('fullcycletemp', sensor_temp, 'temperature'),
                        ('fullcyclehumid', sensor_humid, 'humidity'))
            for valueid, value, valuetype in readings:
                if value is None:
                    continue
                reading = SensorValue(valueid, value, valuetype)
                reading.sensor = self.sensor
                self.sendsensor(reading)
            return sensor_humid, sensor_temp
        except Exception as ex:
            #Exception instead of BaseException so that KeyboardInterrupt
            #and SystemExit are not swallowed here
            self.logexception(ex)
        return None, None
936
937
    def sendsensor(self, reading):
        '''Broadcast a sensor reading on the sensor queue.

        The reading is serialized with SensorValueSchema and wrapped in a
        'sensorvalue' message envelope.
        '''
        envelope = self.createmessageenvelope()
        readingjson = envelope.jsonserialize(SensorValueSchema(), reading)
        wrapped = envelope.make_any('sensorvalue', readingjson)
        payload = self.serializemessageenvelope(wrapped)
        self.sendqueueitem(QueueEntry(QueueName.Q_SENSOR, payload, QueueType.broadcast))
941
942
    def sendtelegrammessage(self, message):
        '''Send a message through the telegram service when the 'telegram'
        configuration is enabled.'''
        if not self.is_enabled_configuration('telegram'):
            return
        sendalert(message, self.getservice('telegram'))
945
946
    def sendtelegramphoto(self, file_name):
        '''Send a photo through the telegram service.

        Only sends when file_name is an existing file that is not empty and
        the 'telegram' configuration is enabled.
        '''
        hascontent = os.path.isfile(file_name) and os.path.getsize(file_name) > 0
        if hascontent and self.is_enabled_configuration('telegram'):
            sendphoto(file_name, self.getservice('telegram'))
950
951
    def getsession(self):
        '''Create a new session bound to an engine made from the database
        service connection.'''
        dbservice = self.getservice(ServiceName.database)
        factory = sessionmaker(bind=create_engine(dbservice.connection, echo=False))
        return factory()
956
957
    def log_mineractivity(self, minerlog):
        '''Add a miner log entry to the database and commit it.

        Returns True when the entry was committed. When getting the session,
        adding or committing raises, the exception is logged and False is
        returned. The session is always closed afterwards so that it does not
        stay open after every call.
        '''
        session = None
        try:
            session = self.getsession()
            session.add(minerlog)
            session.commit()
            return True
        except Exception as ex:
            #Exception instead of BaseException so that KeyboardInterrupt
            #and SystemExit are not swallowed here
            self.logexception(ex)
            return False
        finally:
            #session is None when getsession itself failed
            if session is not None:
                session.close()
966
967
    def save_miner(self, miner: Miner):
        '''Save a miner.

        A miner that is already known is updated from the given miner and put
        back with putminer; otherwise the given miner is added to the known
        miners.
        '''
        existing = self.getknownminer(miner)
        if existing is not None:
            existing.updatefrom(miner)
            self.putminer(existing)
            return
        #todo:add the miner to the json config
        self.addknownminer(miner)
976
977
    def save_pool(self, pool: Pool):
        '''Save a pool to the pools configuration file and update every
        known pool that is the same as it.'''
        poolschema = PoolSchema()
        repository = PoolRepository()
        repository.add(pool, self.getconfigfilename('config/pools.conf'), poolschema)

        #update the known pools
        for knownpool in self.knownpools():
            if not pool.is_same_as(knownpool):
                continue
            #keep the key from before the changes below, they change the pool key!
            previouskey = knownpool.key
            knownpool.named_pool = pool
            knownpool.user = pool.user
            #update the known pool (with new key)
            self.update_pool(previouskey, knownpool)
991
992
def main():
    '''Run Full Cycle as a script: create the application service, log that
    it was run in a script and shut it down.'''
    app = ApplicationService()
    app.loginfo('Full Cycle was run in a script')
    app.shutdown()

#only run main when this file is executed as a script
if __name__ == "__main__":
    main()
999