Completed
Push — master ( 43cb7b...7de6a0 )
by Dave
02:10
created

backend.fcmapp.ApplicationService.stamp()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
'''Application Service layer for Full Cycle Mining
2
Gateway into most of application functionality'''
3
import sys
4
import os
5
import datetime
6
import logging
7
import json
8
import base64
9
import redis
10
from collections import defaultdict
11
from colorama import init, Fore
12
from sqlalchemy.orm import sessionmaker
13
from sqlalchemy import create_engine
14
from domain.mining import Miner, Pool, MinerPool, AvailablePool, MinerCurrentPool, MinerStatistics, MinerStatus
15
from domain.rep import MinerRepository, PoolRepository, LoginRepository, RuleParametersRepository, BaseRepository
16
#from domain.miningrules import RuleParameters
17
from domain.sensors import Sensor, SensorValue
18
from messaging.messages import Message, MessageSchema, MinerMessageSchema, ConfigurationMessageSchema
19
from messaging.sensormessages import SensorValueSchema
20
from messaging.schema import MinerSchema, MinerStatsSchema, MinerCurrentPoolSchema, AvailablePoolSchema, PoolSchema
21
from helpers.queuehelper import QueueName, Queue, QueueEntry, QueueType
22
from helpers.camerahelper import take_picture
23
from helpers.antminerhelper import setminertoprivileged, privileged, setprivileged, setrestricted, waitforonline, restartmining, stopmining, restart, set_frequency
24
from helpers.temperaturehelper import readtemperature
25
from helpers.telegramhelper import sendalert, sendphoto
26
from backend.fcmcache import Cache, CacheKeys
27
from backend.fcmbus import Bus
28
29
class ComponentName:
30
    '''names of components, corresponds to queue login names'''
31
    fullcycle = 'fullcycle'
32
    rules = 'rules'
33
34
class Component(object):
35
    '''A component is a unit of execution of FCM'''
36
    def __init__(self, componentname, option=''):
37
        self.app = ApplicationService(component=componentname, option=option)
38
        #was a queue, now its a channel
39
        self.listeningqueue = None
40
41
    def listen(self):
42
        if self.listeningqueue:
43
            self.app.bus.listen(self.listeningqueue)
44
45
class ServiceName:
46
    '''names of infrastructure services'''
47
    messagebus = 'rabbit'
48
    cache = 'redis'
49
    database = 'mysql'
50
    email = 'gmail'
51
52
class InfrastructureService:
53
    '''configuration for a dependency'''
54
    name = ''
55
    connection = ''
56
    host = ''
57
    port = ''
58
    user = ''
59
    password = ''
60
    def __init__(self, name, connection, host, port, user, password):
61
        self.name = name
62
        self.connection = connection
63
        self.host = host
64
        self.port = port
65
        self.user = user
66
        self.password = password
67
68
69
class Antminer():
70
    def __init__(self, config, login):
71
        self.__config = config
72
        self.__login = login
73
74
    def set_privileged(self, miner):
75
        setprivileged(miner, self.__login, self.__config['provision.apiallow.privileged'])
76
77
    def setminertoprivileged(self, miner):
78
        return setminertoprivileged(miner, self.__login, self.__config['provision.apiallow.privileged'])
79
80
    def set_restricted(self, miner):
81
        setrestricted(miner, self.__login, self.__config['provision.apiallow.restricted'])
82
83
    def waitforonline(self, miner):
84
        return waitforonline(miner)
85
86
    def getaccesslevel(self, miner):
87
        return privileged(miner)
88
89
    def restart(self, miner):
90
        return restart(miner)
91
92
    def stopmining(self, miner):
93
        '''stop miner through ssh.'''
94
        return stopmining(miner, self.__login)
95
96
    def restartssh(self, miner):
97
        '''restart miner through ssh. start mining again'''
98
        return restartmining(miner, self.__login)
99
100
    def set_frequency(self, miner, frequency):
101
        return set_frequency(miner, self.__login, frequency)
102
103
class ApplicationService:
104
    '''Application Services'''
105
    programnamefull = ''
106
    programname = ''
107
    component = ComponentName.fullcycle
108
    loglevel = 0
109
    #true if user passed in -now command line argument
110
    isrunnow = False
111
    #dictionary of queues managed by this app
112
    _queues = {}
113
    _channels = []
114
    #the startup directory
115
    homedirectory = None
116
    __cache = None
117
    __bus = None
118
    __config = {}
119
    __logger = None
120
    __logger_debug = None
121
    __logger_error = None
122
    antminer = None
123
124
    def __init__(self, component=ComponentName.fullcycle, option=None, announceyourself=False):
125
        if self.component == ComponentName.fullcycle:
126
            self.print('Starting FCM Init')
127
        self.component = component
128
        self.initargs(option)
129
        self.startupstuff()
130
        if self.component == ComponentName.fullcycle:
131
            self.print('Starting FCM Configuration')
132
        self.setup_configuration()
133
        self.initlogger()
134
        self.initmessaging()
135
        #this is slow. should be option to opt out of cache?
136
        if self.component == ComponentName.fullcycle:
137
            self.loginfo('Starting FCM Cache')
138
        self.initcache()
139
        self.initbus()
140
        self.init_application()
141
        self.init_sensors()
142
143
        if announceyourself:
144
            self.sendqueueitem(QueueEntry(QueueName.Q_LOG, self.stamp('Started {0}'.format(self.component)), QueueType.broadcast))
145
146
    def initargs(self, option):
147
        '''process command line arguments'''
148
        if sys.argv:
149
            self.programnamefull = sys.argv[0]
150
            self.programname = os.path.basename(self.programnamefull)
151
        firstarg = option
152
        if len(sys.argv) > 1:
153
            firstarg = sys.argv[1]
154
        if firstarg is not None:
155
            if firstarg == '-now':
156
                self.isrunnow = True
157
158
    def startupstuff(self):
159
        '''start up the application'''
160
        self.homedirectory = os.path.dirname(__file__)
161
        #used with colorama on windows
162
        init(autoreset=True)
163
164
    def initmessaging(self):
165
        '''start up messaging'''
166
        self._schemamsg = MessageSchema()
167
168
    def initcache(self):
169
        '''start up cache'''
170
        try:
171
            cachelogin = self.getservice(ServiceName.cache)
172
            self.__cache = Cache(cachelogin)
173
        except Exception as ex:
174
            #cache is offline. try to run in degraded mode
175
            self.logexception(ex)
176
177
    def startup(self):
178
        self.initminercache()
179
        self.initpoolcache()
180
181
    def initbus(self):
182
        '''start up message bus'''
183
        login = self.getservice(ServiceName.messagebus)
184
        self.__bus = Bus(login)
185
186
    @property
187
    def bus(self):
188
        return self.__bus
189
190
    def init_sensors(self):
191
        self.sensor = Sensor('controller', 'DHT22', 'controller')
192
193
    def init_application(self):
194
        self.antminer = Antminer(self.__config, self.sshlogin())
195
196
    @property
197
    def isdebug(self):
198
        return sys.flags.debug
199
200
    def getconfigfilename(self, configfilename):
201
        '''get the contents of a config file'''
202
        return os.path.join(self.homedirectory, configfilename)
203
204
    def setup_configuration(self):
205
        '''configuration is loaded once at startup'''
206
        raw = BaseRepository().readrawfile(self.getconfigfilename('config/fullcycle.conf'))
207
        self.__config = json.loads(raw)
208
        self.applicationid = self.configuration('applicationid')
209
        self.loglevel = self.configuration('loglevel')
210
211
    def configuration(self, key):
212
        if not key in self.__config: return None
213
        return self.__config[key]
214
215
    def is_enabled_configuration(self, key):
216
        lookupkey = '{0}.enabled'.format(key)
217
        if not lookupkey in self.__config:
218
            return False
219
        value = self.__config[lookupkey]
220
        if isinstance(value, str):
221
            return value == 'true' or value == 'True'
222
        return value
223
224
    def initpoolcache(self):
225
        if self.__cache.get(CacheKeys.pools) is None:
226
            spools = PoolRepository().readrawfile(self.getconfigfilename('config/pools.conf'))
227
            self.tryputcache(CacheKeys.pools, spools)
228
        for pool in self.pools():
229
            #pool isinstance of Pool
230
            availablepool = AvailablePool(pool.pool_type, pool, pool.url, pool.user, pool.password, pool.priority)
231
            minerpool = MinerPool(miner=None, priority=0, pool=availablepool)
232
            self.putpool(pool)
233
            self.add_pool(minerpool)
234
235
    def initminercache(self):
236
        '''put known miners into cache'''
237
        if self.__cache.get(CacheKeys.miners) is None:
238
            sminers = MinerRepository().readrawfile(self.getconfigfilename('config/miners.conf'))
239
            self.tryputcache(CacheKeys.miners, sminers)
240
241
        for miner in self.miners():
242
            #status is not persisted yet so init from name
243
            if miner.is_manually_disabled():
244
                miner.status = MinerStatus.Disabled
245
            if self.getminer(miner) is None:
246
                self.putminer(miner)
247
248
    def cacheclear(self):
249
        '''clear the cache'''
250
        self.__cache.purge()
251
252
    def initlogger(self):
253
        '''set up logging application info'''
254
        self.__logger = self.setup_logger('fcmapp', 'fcm.log', logging.INFO)
255
256
        self.__logger_debug = self.setup_logger('fcmdebug', 'fcm.bug', logging.DEBUG)
257
258
        self.__logger_error = self.setup_logger('fcmerror', 'fcm.err', logging.ERROR)
259
260
    def setup_logger(self, logger_name, log_file, level=logging.INFO):
261
        '''start logger'''
262
        logr = logging.getLogger(logger_name)
263
        formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
264
        #by default will append. use mode='w' to overwrite
265
        file_handler = logging.FileHandler(log_file)
266
        file_handler.setFormatter(formatter)
267
        logr.addHandler(file_handler)
268
        # is setting stream necessary
269
        stream_handler = logging.StreamHandler()
270
        stream_handler.setFormatter(formatter)
271
        logr.setLevel(level)
272
        return logr
273
274
    def loginfo(self, message):
275
        '''log informational message'''
276
        logmsg = '{0}: {1}'.format(self.programname, message)
277
        self.__logger.info(logmsg)
278
        print(message)
279
280
    def logerror(self, message):
281
        '''log error'''
282
        logmsg = '{0}: {1}'.format(self.programname, message)
283
        self.__logger_error.error(logmsg)
284
        print(Fore.RED+logmsg)
285
286
    def logdebug(self, message):
287
        '''log debug message'''
288
        if not self.loglevel or self.loglevel == 0:
289
            return
290
        logmsg = '{0}: {1}'.format(self.programname, message)
291
        self.__logger_debug.debug(logmsg)
292
        print(Fore.GREEN+logmsg)
293
294
    def print(self, message):
295
        '''echo message to screen'''
296
        print('{0}: {1}'.format(self.now(), message))
297
298
    def now(self):
299
        '''current time formatted as friendly string'''
300
        return self.formattime(datetime.datetime.now())
301
302
    def formattime(self, time):
303
        '''standard format for time'''
304
        return time.strftime('%Y-%m-%d %H:%M:%S')
305
306
    #region lookups
307
    #todo: move to configurations section
308
    def miners(self):
309
        '''configured miners'''
310
        miners = MinerRepository().readminers(self.getconfigfilename('config/miners.conf'))
311
        return miners
312
313
    def knownminers(self):
314
        '''for now just return a list of miners
315
        later should consider returning a list that is easily searched and filtered
316
        '''
317
        dknownminers = self.__cache.gethashset(CacheKeys.knownminers)
318
        if dknownminers is not None and dknownminers:
319
            #get list of miners from cache
320
            return self.deserializelistofstrings(list(dknownminers.values()))
321
        knownminers = self.miners()
322
        return knownminers
323
324
    def allminers(self):
325
        '''combined list of discovered miners and configured miners'''
326
        allminers = self.knownminers()
327
        for miner in self.miners():
328
            foundminer = [x for x in allminers if x.key() == miner.key()]
329
            if not foundminer:
330
                allminers.append(miner)
331
        return allminers
332
333
    def knownsensors(self):
334
        dknownsensors = self.__cache.gethashset(CacheKeys.knownsensors)
335
        if dknownsensors is not None and dknownsensors:
336
            return self.deserializelist_withschema(SensorValueSchema(), list(dknownsensors.values()))
337
        return None
338
339
    def knownpools(self):
340
        dknownpools = self.__cache.gethashset(CacheKeys.knownpools)
341
        if dknownpools:
342
            return self.deserializelist_withschema(AvailablePoolSchema(), list(dknownpools.values()))
343
        return None
344
345
    def addknownsensor(self, sensorvalue):
346
        val = self.jsonserialize(SensorValueSchema(), sensorvalue)
347
        self.__cache.putinhashset(CacheKeys.knownsensors, sensorvalue.sensorid, val)
348
349
    def minersummary(self, max_number=10):
350
        '''show a summary of known miners
351
        '''
352
        mode = self.configuration('summary')
353
        if not mode: mode = 'auto'
354
        knownminers = self.knownminers()
355
        if len(knownminers) <= max_number:
356
            return '\n'.join([m.summary() for m in knownminers])
357
        groupbystatus = defaultdict(list)
358
        for miner in knownminers:
359
            groupbystatus[miner.status].append(miner)
360
        return '\n'.join(['{0}: {1}'.format(s, self.summary_by_status(s, groupbystatus[s])) for s in groupbystatus])
361
362
    def summary_by_status(self, key, minerlist):
363
        if key == 'online':
364
            return '{0} miners hash {1}'.format(self.summarize_count(minerlist), self.summarize_hash(minerlist))
365
        return self.summarize_count(minerlist)
366
367
    def summarize_count(self, minerlist):
368
        return len(minerlist)
369
370
    def summarize_hash(self, minerlist):
371
        return sum(miner.minerstats.currenthash for miner in minerlist)
372
373
    def addknownminer(self, miner):
374
        '''add miner to known miners list'''
375
        val = self.serialize(miner)
376
        self.__cache.putinhashset(CacheKeys.knownminers, miner.key(), val)
377
378
    def updateknownminer(self, miner):
379
        '''update known miner in cache'''
380
        sminer = self.__cache.getfromhashset(CacheKeys.knownminers, miner.key())
381
        memminer = self.deserialize(MinerSchema(), self.safestring(sminer))
382
        if memminer is None:
383
            memminer = miner
384
        else:
385
            #merge new values
386
            memminer.updatefrom(miner)
387
        val = self.serialize(memminer)
388
        self.__cache.putinhashset(CacheKeys.knownminers, miner.key(), val)
389
390
    #region Pools
391
    def pools(self):
392
        '''configured pools'''
393
        pools = PoolRepository().readpools(self.getconfigfilename('config/pools.conf'))
394
        return pools
395
396
    def findpool(self, minerpool):
397
        '''find a pool in list'''
398
        if minerpool is None: return None
399
        for pool in self.pools():
400
            if minerpool.currentpool == pool.url and minerpool.currentworker.startswith(pool.user):
401
                return pool
402
        return None
403
404
    def getpool(self, miner: Miner):
405
        '''get pool from cache'''
406
        valu = self.trygetvaluefromcache(miner.name + '.pool')
407
        if valu is None: return None
408
        entity = MinerCurrentPool(miner, **self.deserialize(MinerCurrentPoolSchema(), valu))
409
        return entity
410
411
    def add_pool(self, minerpool: MinerPool):
412
        '''see if pool is known or not, then add it'''
413
        knownpool = self.__cache.getfromhashset(CacheKeys.knownpools, minerpool.pool.key)
414
        if not knownpool:
415
            val = self.jsonserialize(AvailablePoolSchema(), minerpool.pool)
416
            self.__cache.putinhashset(CacheKeys.knownpools, minerpool.pool.key, val)
417
418
    def update_pool(self, key, pool: AvailablePool):
419
        self.__cache.hdel(CacheKeys.knownpools, key)
420
        knownpool = self.__cache.getfromhashset(CacheKeys.knownpools, pool.key)
421
        if not knownpool:
422
            val = self.jsonserialize(AvailablePoolSchema(), pool)
423
            self.__cache.putinhashset(CacheKeys.knownpools, pool.key, val)
424
    #endregion
425
426
    def sshlogin(self):
427
        '''return contents of login file'''
428
        return self.readlogin('ftp.conf')
429
430
    def readlogin(self, filename):
431
        '''read login file configuration'''
432
        login = LoginRepository().readlogins(self.getconfigfilename('config/'+filename))
433
        return login
434
435
    def ruleparameters(self):
436
        '''rules parameters'''
437
        return RuleParametersRepository().readrules(self.getconfigfilename('config/'+'rules.conf'))
438
439
    def getservice(self, servicename):
440
        '''get a service by name. should be repository'''
441
        file_name = self.getconfigfilename('config/services.conf')
442
        with open(file_name, encoding='utf-8-sig') as config_file:
443
            content = json.loads(config_file.read())
444
        services = [InfrastructureService(**s) for s in content]
445
        return next((s for s in services if s.name == servicename), None)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable s does not seem to be defined.
Loading history...
446
447
    def getservice_useroverride(self, servicename):
448
        service = self.getservice(servicename)
449
        service.user = self.component
450
        return service
451
    #endregion lookups
452
453
    def listen(self, qlisten):
454
        """Goes into listening mode on a queue"""
455
        try:
456
            self.bus.listen(qlisten)
457
        except KeyboardInterrupt:
458
            self.shutdown()
459
        except BaseException as unhandled:
460
            self.unhandledexception(unhandled)
461
462
    def registerqueue(self, qregister: Queue):
463
        '''register a queue'''
464
        self.logdebug(self.stamp('Registered queue {0}'.format(qregister.queue_name)))
465
        if qregister.queue_name not in self._queues.keys():
466
            self._queues[qregister.queue_name] = qregister
467
468
    def shutdown(self, exitstatus=0):
469
        '''shut down app services'''
470
        self.loginfo('Shutting down fcm app...')
471
        self.close_channels()
472
        self.closequeues()
473
        if self.__bus:
474
            self.bus.close()
475
        if self.__cache is not None:
476
            self.__cache.close()
477
        sys.exit(exitstatus)
478
479
    def closequeue(self, thequeue):
480
        '''close the queue'''
481
        if not thequeue: return
482
        try:
483
            if thequeue is not None:
484
                self.logdebug(self.stamp('closing queue {0}'.format(thequeue.queue_name)))
485
                thequeue.close()
486
            del self._queues[thequeue.queue_name]
487
        except Exception as ex:
488
            self.logexception(ex)
489
490
    def closequeues(self):
491
        '''close a bunch of queues'''
492
        for k in list(self._queues):
493
            self.closequeue(self._queues[k])
494
495
    def close_channel(self, chan):
496
        if not chan: return
497
        try:
498
            if chan.name in self._channels:
499
                self.logdebug(self.stamp('closing channel {0}'.format(chan.name)))
500
                chan.close()
501
                del self._channels[chan.name]
502
        except Exception as ex:
503
            self.logexception(ex)
504
505
    def close_channels(self):
506
        '''close all channels'''
507
        for chan in list(self._channels):
508
            self.close_channel(self._channels[chan])
509
510
    def unhandledexception(self, unhandled):
511
        '''what to do when there is an exception that app cannot handle'''
512
        self.logexception(unhandled)
513
514
    def exceptionmessage(self, ex):
515
        '''gets exception message even when it doesnt have one'''
516
        exc_type, _, exc_tb = sys.exc_info()
517
        exmsg = getattr(ex, 'message', repr(ex))
518
        return '{0}:{1}:{2}'.format(exc_type, exc_tb.tb_lineno, exmsg)
519
520
    def logexception(self, ex):
521
        '''log an exception'''
522
        self.logerror(self.exceptionmessage(ex))
523
524
    def sendlog(self, logmessage):
525
        '''send message to log queue'''
526
        item = QueueEntry(QueueName.Q_LOG, logmessage, 'broadcast')
527
        self.sendqueueitem(item)
528
        print(logmessage)
529
530
    def subscribe(self, name, callback, no_acknowledge=True, prefetch=1):
531
        '''subscribe to a queue'''
532
        chan = self.bus.subscribe(name, callback, no_acknowledge=no_acknowledge, prefetch_count=prefetch)
533
        print('Waiting for messages on {0}. To exit press CTRL+C'.format(name))
534
        return chan
535
536
    def listen_to_broadcast(self, broadcast_name, callback, no_acknowledge=True):
537
        thebroadcast = self.bus.subscribe_broadcast(broadcast_name, callback, no_acknowledge)
538
        print('Waiting for messages on {0}. To exit press CTRL+C'.format(broadcast_name))
539
        self.bus.listen(thebroadcast)
540
        #never returns becuase listen is blocking
541
        return thebroadcast
542
543
    def trypublish(self, queue_name, msg: str):
544
        '''publish a message to the queue'''
545
        try:
546
            self.bus.publish(queue_name, msg)
547
            return True
548
        except pika.exceptions.ConnectionClosed as ex:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable pika does not seem to be defined.
Loading history...
549
            logmessage = 'Error publishing to {0} {1}'.format(queue_name, self.exceptionmessage(ex))
550
            self.logerror(logmessage)
551
            return False
552
553
    def trybroadcast(self, exchange_name, msg):
554
        '''broadcast a message to all queue listeners'''
555
        try:
556
            self.bus.broadcast(exchange_name, msg)
557
            return True
558
        except pika.exceptions.ConnectionClosed as conxex:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable pika does not seem to be defined.
Loading history...
559
            self.logerror('Error broadcasting to {0} {1}'.format(exchange_name, self.exceptionmessage(conxex)))
560
            return False
561
562
    def queuestatus(self):
563
        """TODO:print queue status"""
564
        pass
565
        #for q in self._queues.values():
566
        #    print(q.queue_name,str(q._connection))
567
568
    def putpool(self, pool: Pool):
569
        '''put pool in cache'''
570
        if pool and pool.name:
571
            valu = self.serialize(pool)
572
            self.tryputcache('pool.{0}'.format(pool.name), valu)
573
574
    def putminer(self, miner: Miner):
575
        '''put miner in cache'''
576
        if miner and miner.key():
577
            valu = self.serialize(miner)
578
            self.tryputcache('miner.{0}'.format(miner.key()), valu)
579
580
    def getminer(self, miner: Miner) -> Miner:
581
        '''strategies for getting miner from cache
582
        originally was key=miner.name but that was not good
583
        changed to key='miner.'+minerid
584
        '''
585
        valu = self.trygetvaluefromcache('miner.{0}'.format(miner.key()))
586
        if valu is None:
587
            return None
588
        minerfromstore = self.deserialize(MinerSchema(), self.safestring(valu))
589
        if not minerfromstore.key():
590
            #do not allow entry with no key
591
            return None
592
        minerfromstore.store = 'mem'
593
        return minerfromstore
594
595
    def getknownminer(self, miner: Miner) -> Miner:
596
        '''get a known miner'''
597
        return self.getknownminerbykey(miner.key())
598
599
    def getminerbyname(self, minername):
600
        filtered = [x for x in self.miners() if x.name == minername]
601
        if filtered: return filtered[0]
602
        return None
603
604
    def getknownminerbykey(self, minername):
605
        str_miner = self.__cache.getfromhashset(CacheKeys.knownminers, minername)
606
        if str_miner is None:
607
            return None
608
        return self.deserialize(MinerSchema(), self.safestring(str_miner))
609
610
    def getknownminerbyname(self, minername):
611
        '''this could be slow if there are lots of miners'''
612
        known = self.knownminers()
613
        for miner in known:
614
            if miner.name == minername:
615
                return miner
616
        return None
617
618
    def tryputcache(self, key, value):
619
        '''put value in cache key'''
620
        if value is None: return
621
        try:
622
            if self.__cache is not None:
623
                self.__cache.set(key, value)
624
        except redis.exceptions.ConnectionError as ex:
625
            self.logexception(ex)
626
627
    def putminerandstats(self, miner: Miner, minerstats, minerpool):
628
        '''put miner and status in cache'''
629
        self.putminer(miner)
630
        schema = MinerStatsSchema()
631
        valstats = schema.dumps(minerstats).data
632
        self.tryputcache(miner.key() + '.stats', valstats)
633
        schema = MinerCurrentPoolSchema()
634
        valpool = schema.dumps(minerpool).data
635
        self.tryputcache(miner.key() + '.pool', valpool)
636
637
    def trygetvaluefromcache(self, key):
638
        '''get value from cache'''
639
        if self.__cache is not None:
640
            try:
641
                return self.__cache.get(key)
642
            except Exception as ex:
643
                self.logexception(ex)
644
        return None
645
646
    def getstats(self, miner: Miner):
647
        '''get stats entity'''
648
        valu = self.trygetvaluefromcache(miner.name + '.stats')
649
        if valu is None: return None
650
        entity = MinerStatistics(miner, **self.deserialize(MinerStatsSchema(), valu))
651
        return entity
652
653
    def safestring(self, thestring):
654
        '''safely convert anything into string'''
655
        if thestring is None: return None
656
        if isinstance(thestring, str): return thestring
657
        return str(thestring, "utf-8")
658
659
    def getminerstatscached(self):
660
        '''iterator for cached stats'''
661
        for miner in self.miners():
662
            yield (self.getminer(miner), self.getstats(miner), self.getpool(miner))
663
664
    def messagedecodeminer(self, body) -> Miner:
665
        '''deserialize a miner message'''
666
        message_envelope = self.deserializemessageenvelope(self.safestring(body))
667
        schema = MinerMessageSchema()
668
        minermessage_dict = schema.load(message_envelope.bodyjson()).data
669
        minermessage_entity = schema.make_minermessage(minermessage_dict)
670
        miner = minermessage_entity.miner
671
        return miner
672
673
    def messagedecodeminerstats(self, body):
674
        '''deserialize miner stats'''
675
        message_envelope = self.deserializemessageenvelope(self.safestring(body))
676
        schema = MinerMessageSchema()
677
        minermessage_dict = schema.load(message_envelope.bodyjson()).data
678
        minermessage_entity = schema.make_minermessage(minermessage_dict)
679
        return minermessage_entity
680
681
    def messagedecodeminercommand(self, body):
682
        '''deserialize  miner command'''
683
        message_envelope = self.deserializemessageenvelope(self.safestring(body))
684
        schema = MinerMessageSchema()
685
        minermessage_dict = schema.load(message_envelope.bodyjson()).data
686
        minermessage_entity = schema.make_minermessage(minermessage_dict)
687
        return minermessage_entity
688
689
    def messagedecodesensor(self, body):
690
        '''deserialize sensor value '''
691
        message_envelope = self.deserializemessageenvelope(self.safestring(body))
692
        schema = SensorValueSchema()
693
        #minermessage_dict = schema.load(message_envelope.bodyjson()).data
694
        entity = schema.load(message_envelope.bodyjson()).data
695
        return message_envelope, entity
696
697
    def messagedecode_configuration(self, body):
698
        '''deserialize  configuration command'''
699
        message_envelope = self.deserializemessageenvelope(self.safestring(body))
700
        schema = ConfigurationMessageSchema()
701
        configurationmessage_dict = schema.load(message_envelope.bodyjson()).data
702
        configurationmessage_entity = schema.make_configurationmessage(configurationmessage_dict)
703
        return configurationmessage_entity
704
705
    def createmessageenvelope(self):
706
        '''create message envelope'''
707
        return Message(timestamp=datetime.datetime.utcnow(), sender=self.component)
708
709
    def serializemessageenvelope(self, msg):
710
        '''serialize message envelope'''
711
        return self._schemamsg.dumps(msg).data
712
713
    def jsonserialize(self, sch, msg):
714
        '''serialize a message with schema. returns string'''
715
        smessage = sch.dumps(msg)
716
        #json.dumps(jmessage)
717
        return smessage.data
718
719
    def serialize(self, entity):
720
        '''serialize any entity
721
        only need schema, message class not needed
722
        '''
723
        if isinstance(entity, Miner):
724
            schema = MinerSchema()
725
            return schema.dumps(entity).data
726
727
        if isinstance(entity, Pool):
728
            schema = PoolSchema()
729
            return schema.dumps(entity).data
730
        return None
731
732
    def serializelist(self, listofentities):
733
        '''serialize a list of entities'''
734
        json_list = json.dumps([e.__dict__ for e in listofentities])
735
        return json_list
736
737
    def deserializelistofstrings(self, the_list):
738
        '''deserialize list of strings into list of miners'''
739
        results = []
740
        for item in the_list:
741
            miner = self.deserialize(MinerSchema(), self.safestring(item))
742
            results.append(miner)
743
        return results
744
745
    def deserializelist_withschema(self, schema, the_list):
746
        '''deserialize list of strings into entities'''
747
        results = []
748
        for item in the_list:
749
            entity = self.deserialize(schema, self.safestring(item))
750
            #todo:for pools the entry is a list
751
            results.append(entity)
752
        return results
753
754
    def deserialize(self, sch, msg):
755
        '''Output should be entity, not python json object
756
        msg parameter should be string
757
        '''
758
        if msg is None: return None
759
        return sch.loads(msg).data
760
761
    def deserializemessageenvelope(self, body):
762
        '''serialize message envelope'''
763
        return self._schemamsg.load(json.loads(self.safestring(body))).data
764
765
    def createmessagestats(self, miner, minerstats, minerpool):
766
        #always save the miner so the next guy can get latest changes
767
        #only put in cache if it came from cache
768
        if miner.store == 'mem':
769
            self.putminer(miner)
770
        message = self.createmessageenvelope()
771
        message = message.make_minerstats(miner, minerstats, minerpool)
772
        return self.serializemessageenvelope(message)
773
774
    def createmessagecommand(self, miner, command):
775
        '''create message command'''
776
        if miner.store == 'mem':
777
            self.putminer(miner)
778
        message = self.createmessageenvelope()
779
        message = message.make_minercommand(miner, command)
780
        return self.serializemessageenvelope(message)
781
782
    def messageencode(self, miner: Miner):
783
        '''command is optional, however should convert this call into minercommand'''
784
        #always save the miner so the next guy can get latest changes
785
        if miner.store == 'mem':
786
            self.putminer(miner)
787
        message = self.createmessageenvelope()
788
        message = message.make_minerstats(miner, minerstats=None, minerpool=None)
789
        return self._schemamsg.dumps(message).data
790
791
    def stamp(self, message):
792
        return '{0}:{1}: {2}'.format(self.now(), self.applicationid, message)
793
794
    def alert(self, message):
795
        '''send alert message'''
796
        return self.sendqueueitem(QueueEntry(QueueName.Q_ALERT, self.stamp(message), QueueType.broadcast))
797
798
    def send(self, q_name, message):
799
        '''send message to queue'''
800
        success = self.trypublish(q_name, message)
801
        return success
802
803
    def enqueue(self, queuelist):
804
        '''send a list of queue messages'''
805
        if queuelist is None:
806
            return
807
        if not queuelist.hasentries():
808
            return
809
        #todo: group by queuename
810
        for entry in queuelist.entries:
811
            self.sendqueueitem(entry)
812
813
    def sendqueueitem(self, entry):
814
        '''send one queue item'''
815
        if entry.eventtype == 'broadcast':
816
            send_result = self.trybroadcast(entry.queuename, entry.message)
817
            return send_result
818
        return self.send(entry.queuename, entry.message)
819
820
    def take_picture(self, file_name='fullcycle_camera.png'):
821
        pic = take_picture(file_name,
822
                           self.configuration('camera.size'),
823
                           self.configuration('camera.quality'),
824
                           self.configuration('camera.brightness'),
825
                           self.configuration('camera.contrast'))
826
        return pic
827
828
    def store_picture_cache(self, file_name):
829
        if os.path.isfile(file_name):
830
            with open(file_name, 'rb') as photofile:
831
                picdata = photofile.read()
832
            reading = SensorValue('fullcyclecamera', base64.b64encode(picdata), 'camera')
833
            #reading.sensor = self.sensor
834
            #self.sendsensor(reading)
835
            message = self.createmessageenvelope()
836
            sensorjson = message.jsonserialize(SensorValueSchema(), reading)
837
            self.tryputcache(CacheKeys.camera, sensorjson)
838
839
    def readtemperature(self):
840
        try:
841
            sensor_humid, sensor_temp = readtemperature()
842
            if sensor_temp is not None:
843
                reading = SensorValue('fullcycletemp', sensor_temp, 'temperature')
844
                reading.sensor = self.sensor
845
                self.sendsensor(reading)
846
            if sensor_humid is not None:
847
                reading = SensorValue('fullcyclehumid', sensor_humid, 'humidity')
848
                reading.sensor = self.sensor
849
                self.sendsensor(reading)
850
            return sensor_humid, sensor_temp
851
        except BaseException as ex:
852
            self.logexception(ex)
853
        return None, None
854
855
    def sendsensor(self, reading):
856
        message = self.createmessageenvelope()
857
        sensorjson = message.jsonserialize(SensorValueSchema(), reading)
858
        self.sendqueueitem(QueueEntry(QueueName.Q_SENSOR, self.serializemessageenvelope(message.make_any('sensorvalue', sensorjson)), QueueType.broadcast))
859
860
    def sendtelegrammessage(self, message):
861
        if self.is_enabled_configuration('telegram'):
862
            sendalert(message, self.getservice('telegram'))
863
864
    def sendtelegramphoto(self, file_name):
865
        if os.path.isfile(file_name) and os.path.getsize(file_name) > 0:
866
            if self.is_enabled_configuration('telegram'):
867
                sendphoto(file_name, self.getservice('telegram'))
868
869
    def getsession(self):
870
        service = self.getservice(ServiceName.database)
871
        engine = create_engine(service.connection, echo=False)
872
        Session = sessionmaker(bind=engine)
873
        return Session()
874
875
    def log_mineractivity(self, minerlog):
876
        try:
877
            session = self.getsession()
878
            session.add(minerlog)
879
            session.commit()
880
            return True
881
        except BaseException as ex:
882
            self.logexception(ex)
883
        return False
884
885
    def save_miner(self, miner: Miner):
886
        found = self.getknownminer(miner)
887
        if found is None:
888
            self.addknownminer(miner)
889
            #miners = MinerRepository()
890
            #todo:add the miner to the json config
891
        else:
892
            found.updatefrom(miner)
893
            self.putminer(found)
894
895
    def save_pool(self, pool: Pool):
896
        sch = PoolSchema()
897
        pools = PoolRepository()
898
        pools.add(pool, self.getconfigfilename('config/pools.conf'), sch)
899
900
        #update the known pools
901
        for known in self.knownpools():
902
            if pool.is_same_as(known):
903
                oldkey = known.key
904
                known.named_pool = pool
905
                #this changes the pool key!
906
                known.user = pool.user
907
                #update the known pool (with new key)
908
                self.update_pool(oldkey, known)
909
910
def main():
911
    full_cycle = ApplicationService()
912
    full_cycle.loginfo('Full Cycle was run in a script')
913
    full_cycle.shutdown()
914
915
if __name__ == "__main__":
916
    main()
917