Passed
Push — master (c8d2fb...72a28d)
by Dave
01:11
created

backend.fcmapp.ApplicationService.initpoolcache()   A

Complexity

Conditions 3

Size

Total Lines 10
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 9
nop 1
dl 0
loc 10
rs 9.95
c 0
b 0
f 0
'''Application Service layer for Full Cycle Mining
Gateway into most of application functionality'''
import sys
import os
import datetime
import logging
import json
import base64
from collections import defaultdict
import redis
import pika
from colorama import init, Fore
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
import domain.minerstatistics
import domain.minerpool
from domain.mining import Miner, Pool, AvailablePool, MinerCurrentPool, MinerStatus
from domain.rep import MinerRepository, PoolRepository, LoginRepository, RuleParametersRepository, BaseRepository
#from domain.miningrules import RuleParameters
from domain.sensors import Sensor, SensorValue
from messaging.messages import Message, MessageSchema, MinerMessageSchema, ConfigurationMessageSchema
from messaging.sensormessages import SensorValueSchema
from messaging.schema import MinerSchema, MinerStatsSchema, MinerCurrentPoolSchema, AvailablePoolSchema, PoolSchema
from helpers.queuehelper import QueueName, Queue, QueueEntry, QueueType
from helpers.camerahelper import take_picture
from helpers.temperaturehelper import readtemperature
import backend.fcmutils as utils
from backend.fcmcache import Cache, CacheKeys
from backend.fcmbus import Bus
from backend.fcmcomponent import ComponentName
from backend.fcmservice import BaseService, ServiceName, InfrastructureService, Configuration, Telegram
from backend.fcmminer import Antminer

class Component(object):
    '''A component is a unit of execution of FCM'''
    def __init__(self, componentname, option=''):
        self.app = ApplicationService(component=componentname, option=option)
        #was a queue, now it's a channel
        self.listeningqueue = None

    def listen(self):
        if self.listeningqueue:
            self.app.bus.listen(self.listeningqueue)


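# Illustrative sketch (not part of the original file): a typical component wires
# itself to a queue through its ApplicationService and then blocks in listen().
# The callback name on_message is an assumption for the example only.
#
#   component = Component(ComponentName.fullcycle)
#   component.listeningqueue = component.app.subscribe(QueueName.Q_LOG, on_message)
#   component.listen()
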
class PoolService(BaseService):
    def __init__(self, cache):
        super(PoolService, self).__init__()
        self.__cache = cache

    def get_all_pools(self):
        '''configured pools'''
        pools = PoolRepository().readpools(self.getconfigfilename('config/pools.conf'))
        return pools

    def findpool(self, minerpool):
        '''find a pool in list'''
        if minerpool is None:
            return None
        for pool in self.get_all_pools():
            if minerpool.currentpool == pool.url and minerpool.currentworker.startswith(pool.user):
                return pool
        return None

    def knownpools(self):
        dknownpools = self.__cache.gethashset(CacheKeys.knownpools)
        if dknownpools:
            return utils.deserializelist_withschema(AvailablePoolSchema(), list(dknownpools.values()))
        return None

    def getpool(self, miner: Miner):
        '''get pool from cache'''
        valu = self.__cache.trygetvaluefromcache(miner.name + '.pool')
        if valu is None:
            return None
        entity = MinerCurrentPool(miner, **utils.deserialize(MinerCurrentPoolSchema(), valu))
        return entity

    def add_pool(self, minerpool: domain.minerpool.MinerPool):
        '''see if pool is known or not, then add it'''
        knownpool = self.__cache.getfromhashset(CacheKeys.knownpools, minerpool.pool.key)
        if not knownpool:
            val = utils.jsonserialize(AvailablePoolSchema(), minerpool.pool)
            self.__cache.putinhashset(CacheKeys.knownpools, minerpool.pool.key, val)

    def putpool(self, pool: Pool):
        '''put pool in cache'''
        if pool and pool.name:
            valu = self.serialize(pool)
            self.__cache.tryputcache('pool.{0}'.format(pool.name), valu)

    def update_pool(self, key, pool: AvailablePool):
        self.__cache.hdel(CacheKeys.knownpools, key)
        knownpool = self.__cache.getfromhashset(CacheKeys.knownpools, pool.key)
        if not knownpool:
            val = utils.jsonserialize(AvailablePoolSchema(), pool)
            self.__cache.putinhashset(CacheKeys.knownpools, pool.key, val)

    def save_pool(self, pool: Pool):
        sch = PoolSchema()
        pools = PoolRepository()
        pools.add(pool, self.getconfigfilename('config/pools.conf'), sch)

        #update the known pools
        for known in self.knownpools():
            if pool.is_same_as(known):
                oldkey = known.key
                known.named_pool = pool
                #this changes the pool key!
                known.user = pool.user
                #update the known pool (with new key)
                self.update_pool(oldkey, known)

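# Illustrative sketch (assumption, not in the original source): PoolService is
# normally reached through ApplicationService.pools. Resolving the named pool a
# miner currently points at might look like this, where `app` and `miner` are
# pre-existing objects:
#
#   minerpool = app.pools.getpool(miner)        # MinerCurrentPool from cache, or None
#   named = app.pools.findpool(minerpool)       # matching configured Pool, or None
#   if named:
#       app.pools.putpool(named)                # refresh the cached copy
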
class ApplicationService(BaseService):
    '''Application Services'''
    programnamefull = ''
    programname = ''
    component = ComponentName.fullcycle
    loglevel = 0
    #true if user passed in -now command line argument
    isrunnow = False
    #dictionary of queues managed by this app
    _queues = {}
    #channels keyed by name (close_channel/close_channels index this by chan.name)
    _channels = {}
    __logger = None
    __logger_debug = None
    __logger_error = None

    def __init__(self, component=ComponentName.fullcycle, option=None, announceyourself=False):
        super(ApplicationService, self).__init__()
        self.component = component
        if self.component == ComponentName.fullcycle:
            self.print('Starting FCM Init')
        self.initargs(option)
        self.startupstuff()
        if self.component == ComponentName.fullcycle:
            self.print('Starting FCM Configuration')
        self.setup_configuration()
        self.initlogger()
        self.initmessaging()
        #this is slow. should be option to opt out of cache?
        if self.component == ComponentName.fullcycle:
            self.loginfo('Starting FCM Cache')
        self.initcache()
        self.initbus()
        self.init_application()
        self.init_sensors()

        if announceyourself:
            self.sendqueueitem(QueueEntry(QueueName.Q_LOG, self.stamp('Started {0}'.format(self.component)), QueueType.broadcast))

    def initargs(self, option):
        '''process command line arguments'''
        if sys.argv:
            self.programnamefull = sys.argv[0]
            self.programname = os.path.basename(self.programnamefull)
        firstarg = option
        if len(sys.argv) > 1:
            firstarg = sys.argv[1]
        if firstarg is not None:
            if firstarg == '-now':
                self.isrunnow = True

    def startupstuff(self):
        #used with colorama on windows
        init(autoreset=True)

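    # Usage note (sketch, not in the original file): initargs() lets either the
    # caller or the command line request an immediate run. Both of these set
    # isrunnow, assuming a component script named fullcycle.py:
    #
    #   python fullcycle.py -now
    #   app = ApplicationService(option='-now')
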
    def initmessaging(self):
        '''start up messaging'''
        self._schemamsg = MessageSchema()

    def initcache(self):
        '''start up cache'''
        try:
            cachelogin = self.getservice(ServiceName.cache)
            self.__cache = Cache(cachelogin)
        except Exception as ex:
            #cache is offline. try to run in degraded mode
            self.logexception(ex)

    def startup(self):
        self.initminercache()
        self.initpoolcache()

    def initbus(self):
        '''start up message bus'''
        login = self.getservice(ServiceName.messagebus)
        self.__bus = Bus(login)

    @property
    def bus(self):
        return self.__bus

    @property
    def cache(self):
        return self.__cache

    def init_sensors(self):
        self.sensor = Sensor('controller', 'DHT22', 'controller')

    def init_application(self):
        self.antminer = Antminer(self.configuration, self.sshlogin())
        self.telegram = Telegram(self.configuration, self.getservice(ServiceName.telegram))
        self.pools = PoolService(self.cache)

    @property
    def isdebug(self):
        return sys.flags.debug

    def setup_configuration(self):
        '''configuration is loaded once at startup'''
        raw = BaseRepository().readrawfile(self.getconfigfilename('config/fullcycle.conf'))
        config = json.loads(raw)

        self.configuration = Configuration(config)
        self.applicationid = self.configuration.get('applicationid')
        self.loglevel = self.configuration.get('loglevel')

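    # Illustrative sketch of config/fullcycle.conf (an assumption inferred from the
    # keys this class reads; the real file may contain more settings and the values
    # below are made up). Only 'applicationid', 'loglevel', 'summary' and the
    # 'camera.*' settings are referenced in this file:
    #
    #   {
    #       "applicationid": "fcm-controller-01",
    #       "loglevel": 1,
    #       "summary": "auto",
    #       "camera.size": [640, 480],
    #       "camera.quality": 85,
    #       "camera.brightness": 50,
    #       "camera.contrast": 60
    #   }
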
    def initpoolcache(self):
        if self.__cache.get(CacheKeys.pools) is None:
            spools = PoolRepository().readrawfile(self.getconfigfilename('config/pools.conf'))
            self.tryputcache(CacheKeys.pools, spools)
        for pool in self.pools.get_all_pools():
            #pool is an instance of Pool
            availablepool = AvailablePool(pool.pool_type, pool, pool.url, pool.user, pool.password, pool.priority)
            minerpool = domain.minerpool.MinerPool(miner=None, priority=0, pool=availablepool)
            self.pools.putpool(pool)
            self.pools.add_pool(minerpool)

    def initminercache(self):
        '''put known miners into cache'''
        if self.__cache.get(CacheKeys.miners) is None:
            sminers = MinerRepository().readrawfile(self.getconfigfilename('config/miners.conf'))
            self.tryputcache(CacheKeys.miners, sminers)

        for miner in self.miners():
            #status is not persisted yet so init from name
            if miner.is_manually_disabled():
                miner.status = MinerStatus.Disabled
            if self.getminer(miner) is None:
                self.putminer(miner)

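    # Usage sketch (not part of the original file): a component that owns the cache
    # warm-up would typically seed both the miner and pool caches right after
    # construction; startup() above simply wraps the two init calls:
    #
    #   app = ApplicationService(component=ComponentName.fullcycle)
    #   app.startup()   # initminercache() + initpoolcache()
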
    def initlogger(self):
        '''set up application logging'''
        self.__logger = self.setup_logger('fcmapp', 'fcm.log', logging.INFO)

        self.__logger_debug = self.setup_logger('fcmdebug', 'fcm.bug', logging.DEBUG)

        self.__logger_error = self.setup_logger('fcmerror', 'fcm.err', logging.ERROR)

    def setup_logger(self, logger_name, log_file, level=logging.INFO):
        '''start logger'''
        logr = logging.getLogger(logger_name)
        formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
        #by default will append. use mode='w' to overwrite
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        logr.addHandler(file_handler)
        #is setting stream necessary? the handler below is created but never attached
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(formatter)
        logr.setLevel(level)
        return logr

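    # Example (sketch, not in the original file): setup_logger() can also build
    # ad-hoc loggers outside initlogger(); the logger name and file below are made up:
    #
    #   audit = self.setup_logger('fcmaudit', 'fcm.audit', logging.WARNING)
    #   audit.warning('miner restarted by rule engine')
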
    def loginfo(self, message):
        '''log informational message'''
        logmsg = '{0}: {1}'.format(self.programname, message)
        self.__logger.info(logmsg)
        print(message)

    def logerror(self, message):
        '''log error'''
        logmsg = '{0}: {1}'.format(self.programname, message)
        self.__logger_error.error(logmsg)
        print(Fore.RED+logmsg)

    def logdebug(self, message):
        '''log debug message'''
        if not self.loglevel or self.loglevel == 0:
            return
        logmsg = '{0}: {1}'.format(self.programname, message)
        self.__logger_debug.debug(logmsg)
        print(Fore.GREEN+logmsg)

    def print(self, message):
        '''echo message to screen'''
        print('{0}: {1}'.format(self.now(), message))

    def now(self):
        '''current time formatted as friendly string'''
        return utils.formattime(datetime.datetime.now())

    #region lookups
    #todo: move to configurations section
    def miners(self):
        '''configured miners'''
        miners = MinerRepository().readminers(self.getconfigfilename('config/miners.conf'))
        return miners

    def knownminers(self):
        '''for now just return a list of miners
        later should consider returning a list that is easily searched and filtered
        '''
        dknownminers = self.__cache.gethashset(CacheKeys.knownminers)
        if dknownminers is not None and dknownminers:
            #get list of miners from cache
            return self.deserializelistofstrings(list(dknownminers.values()))
        knownminers = self.miners()
        return knownminers

    def allminers(self):
        '''combined list of discovered miners and configured miners'''
        allminers = self.knownminers()
        for miner in self.miners():
            foundminer = [x for x in allminers if x.key() == miner.key()]
            if not foundminer:
                allminers.append(miner)
        return allminers

    def knownsensors(self):
        dknownsensors = self.__cache.gethashset(CacheKeys.knownsensors)
        if dknownsensors is not None and dknownsensors:
            return utils.deserializelist_withschema(SensorValueSchema(), list(dknownsensors.values()))
        return None

    def addknownsensor(self, sensorvalue):
        val = utils.jsonserialize(SensorValueSchema(), sensorvalue)
        self.__cache.putinhashset(CacheKeys.knownsensors, sensorvalue.sensorid, val)

    def minersummary(self, max_number=10):
        '''show a summary of known miners
        '''
        mode = self.configuration.get('summary')
        if not mode:
            mode = 'auto'
        knownminers = self.knownminers()
        if len(knownminers) <= max_number:
            return '\n'.join([m.summary() for m in knownminers])
        groupbystatus = defaultdict(list)
        for miner in knownminers:
            groupbystatus[miner.status].append(miner)
        return '\n'.join(['{0}: {1}'.format(s, self.summary_by_status(s, groupbystatus[s])) for s in groupbystatus])

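    # Example output (illustrative only; the counts and hash figure are made up).
    # With more than max_number known miners, minersummary() collapses the list by
    # status using the format strings in summary_by_status() below:
    #
    #   online: 12 miners hash 163500
    #   Disabled: 2
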
    def summary_by_status(self, key, minerlist):
        if key == 'online':
            return '{0} miners hash {1}'.format(self.summarize_count(minerlist), self.summarize_hash(minerlist))
        return self.summarize_count(minerlist)

    def summarize_count(self, minerlist):
        return len(minerlist)

    def summarize_hash(self, minerlist):
        return sum(miner.minerstats.currenthash for miner in minerlist)

    def addknownminer(self, miner):
        '''add miner to known miners list'''
        val = self.serialize(miner)
        self.__cache.putinhashset(CacheKeys.knownminers, miner.key(), val)

    def updateknownminer(self, miner):
        '''update known miner in cache'''
        sminer = self.__cache.getfromhashset(CacheKeys.knownminers, miner.key())
        memminer = utils.deserialize(MinerSchema(), utils.safestring(sminer))
        if memminer is None:
            memminer = miner
        else:
            #merge new values
            memminer.updatefrom(miner)
        val = self.serialize(memminer)
        self.__cache.putinhashset(CacheKeys.knownminers, miner.key(), val)

    #region Pools


    #endregion

    def sshlogin(self):
        '''return contents of login file'''
        return self.readlogin('ftp.conf')

    def readlogin(self, filename):
        '''read login file configuration'''
        login = LoginRepository().readlogins(self.getconfigfilename('config/'+filename))
        return login

    def ruleparameters(self):
        '''rules parameters'''
        return RuleParametersRepository().readrules(self.getconfigfilename('config/'+'rules.conf'))

    def getservice(self, servicename):
        '''get a service by name. should be a repository'''
        file_name = self.getconfigfilename('config/services.conf')
        with open(file_name, encoding='utf-8-sig') as config_file:
            content = json.loads(config_file.read())
        svc = None #dummy initializer to keep Scrutinizer happy
        services = [InfrastructureService(**s) for s in content]
        return next((svc for svc in services if svc.name == servicename), None)

    def getservice_useroverride(self, servicename):
        service = self.getservice(servicename)
        service.user = self.component
        return service
    #endregion lookups

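    # Illustrative sketch of config/services.conf (an assumption; field names beyond
    # 'name', 'user' and 'connection' are guesses). getservice() expands each JSON
    # object into InfrastructureService(**s), so the keys must match that
    # constructor's parameters:
    #
    #   [
    #       {"name": "cache", "host": "127.0.0.1", "port": 6379},
    #       {"name": "messagebus", "host": "127.0.0.1", "user": "fcm", "password": "guest"},
    #       {"name": "database", "connection": "sqlite:///fullcycle.db"}
    #   ]
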
    def listen(self, qlisten):
        """Goes into listening mode on a queue"""
        try:
            self.bus.listen(qlisten)
        except KeyboardInterrupt:
            self.shutdown()
        except BaseException as unhandled:
            self.unhandledexception(unhandled)

    def registerqueue(self, qregister: Queue):
        '''register a queue'''
        self.logdebug(self.stamp('Registered queue {0}'.format(qregister.queue_name)))
        if qregister.queue_name not in self._queues.keys():
            self._queues[qregister.queue_name] = qregister

    def shutdown(self, exitstatus=0):
        '''shut down app services'''
        self.loginfo('Shutting down fcm app...')
        self.close_channels()
        self.closequeues()
        if self.__bus:
            self.bus.close()
        if self.__cache is not None:
            self.__cache.close()
        sys.exit(exitstatus)

    def closequeue(self, thequeue):
        '''close the queue'''
        if not thequeue:
            return
        try:
            self.logdebug(self.stamp('closing queue {0}'.format(thequeue.queue_name)))
            thequeue.close()
            del self._queues[thequeue.queue_name]
        except Exception as ex:
            self.logexception(ex)

    def closequeues(self):
        '''close a bunch of queues'''
        for k in list(self._queues):
            self.closequeue(self._queues[k])

    def close_channel(self, chan):
        if not chan:
            return
        try:
            if chan.name in self._channels:
                self.logdebug(self.stamp('closing channel {0}'.format(chan.name)))
                chan.close()
                del self._channels[chan.name]
        except Exception as ex:
            self.logexception(ex)

    def close_channels(self):
        '''close all channels'''
        for chan in list(self._channels):
            self.close_channel(self._channels[chan])

    def unhandledexception(self, unhandled):
        '''what to do when there is an exception that app cannot handle'''
        self.logexception(unhandled)

    def exceptionmessage(self, ex):
        '''gets exception message even when it doesn't have one'''
        exc_type, _, exc_tb = sys.exc_info()
        exmsg = getattr(ex, 'message', repr(ex))
        return '{0}:{1}:{2}'.format(exc_type, exc_tb.tb_lineno, exmsg)

    def logexception(self, ex):
        '''log an exception'''
        self.logerror(self.exceptionmessage(ex))

    def sendlog(self, logmessage):
        '''send message to log queue'''
        item = QueueEntry(QueueName.Q_LOG, logmessage, 'broadcast')
        self.sendqueueitem(item)
        print(logmessage)

    def subscribe(self, name, callback, no_acknowledge=True, prefetch=1):
        '''subscribe to a queue'''
        chan = self.bus.subscribe(name, callback, no_acknowledge=no_acknowledge, prefetch_count=prefetch)
        print('Waiting for messages on {0}. To exit press CTRL+C'.format(name))
        return chan

    def listen_to_broadcast(self, broadcast_name, callback, no_acknowledge=True):
        thebroadcast = self.bus.subscribe_broadcast(broadcast_name, callback, no_acknowledge)
        print('Waiting for messages on {0}. To exit press CTRL+C'.format(broadcast_name))
        self.bus.listen(thebroadcast)
        #never returns because listen is blocking
        return thebroadcast

    def trypublish(self, queue_name, msg: str):
        '''publish a message to the queue'''
        try:
            self.bus.publish(queue_name, msg)
            return True
        except pika.exceptions.ConnectionClosed as ex:
            logmessage = 'Error publishing to {0} {1}'.format(queue_name, self.exceptionmessage(ex))
            self.logerror(logmessage)
            return False

    def trybroadcast(self, exchange_name, msg):
        '''broadcast a message to all queue listeners'''
        try:
            self.bus.broadcast(exchange_name, msg)
            return True
        except pika.exceptions.ConnectionClosed as conxex:
            self.logerror('Error broadcasting to {0} {1}'.format(exchange_name, self.exceptionmessage(conxex)))
            return False

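    # Usage sketch (not part of the original file): the higher-level helpers
    # send(), alert() and sendqueueitem() further down all funnel into trypublish()
    # or trybroadcast(); a consumer pairs subscribe() with listen(). The callback
    # name on_log_message is assumed for the example:
    #
    #   chan = app.subscribe(QueueName.Q_LOG, on_log_message)
    #   app.listen(chan)
    #   app.trybroadcast(QueueName.Q_ALERT, app.stamp('temperature high'))
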
    def queuestatus(self):
        """TODO: print queue status"""
        pass
        #for q in self._queues.values():
        #    print(q.queue_name,str(q._connection))

    def putminer(self, miner: Miner):
        '''put miner in cache'''
        if miner and miner.key():
            valu = self.serialize(miner)
            self.tryputcache('miner.{0}'.format(miner.key()), valu)

    def getminer(self, miner: Miner) -> Miner:
        '''strategies for getting miner from cache
        originally was key=miner.name but that was not good
        changed to key='miner.'+minerid
        '''
        valu = self.cache.trygetvaluefromcache('miner.{0}'.format(miner.key()))
        if valu is None:
            return None
        minerfromstore = utils.deserialize(MinerSchema(), utils.safestring(valu))
        if not minerfromstore.key():
            #do not allow entry with no key
            return None
        minerfromstore.store = 'mem'
        return minerfromstore

    def getknownminer(self, miner: Miner) -> Miner:
        '''get a known miner'''
        return self.getknownminerbykey(miner.key())

    def getminerbyname(self, minername):
        filtered = [x for x in self.miners() if x.name == minername]
        if filtered:
            return filtered[0]
        return None

    def getknownminerbykey(self, minername):
        str_miner = self.__cache.getfromhashset(CacheKeys.knownminers, minername)
        if str_miner is None:
            return None
        return utils.deserialize(MinerSchema(), utils.safestring(str_miner))

    def getknownminerbyname(self, minername):
        '''this could be slow if there are lots of miners'''
        known = self.knownminers()
        for miner in known:
            if miner.name == minername:
                return miner
        return None

    def tryputcache(self, key, value):
        '''put value in cache key'''
        if value is None:
            return
        try:
            if self.__cache is not None:
                self.__cache.set(key, value)
        except redis.exceptions.ConnectionError as ex:
            self.logexception(ex)

    def putminerandstats(self, miner: Miner, minerstats, minerpool):
        '''put miner and stats in cache'''
        self.putminer(miner)
        schema = MinerStatsSchema()
        valstats = schema.dumps(minerstats).data
        self.tryputcache(miner.key() + '.stats', valstats)
        schema = MinerCurrentPoolSchema()
        valpool = schema.dumps(minerpool).data
        self.tryputcache(miner.key() + '.pool', valpool)

    def getstats(self, miner: Miner):
        '''get stats entity'''
        #note: putminerandstats writes under miner.key(); this reads miner.name
        valu = self.cache.trygetvaluefromcache(miner.name + '.stats')
        if valu is None:
            return None
        entity = domain.minerstatistics.MinerStatistics(miner, **utils.deserialize(MinerStatsSchema(), valu))
        return entity

    def getminerstatscached(self):
        '''iterator for cached stats'''
        for miner in self.miners():
            yield (self.getminer(miner), self.getstats(miner), self.pools.getpool(miner))

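    # Usage sketch (not in the original file): a monitoring pass over everything in
    # the cache; each tuple element may be None if the corresponding entry has not
    # been stored yet:
    #
    #   for cached_miner, stats, currentpool in app.getminerstatscached():
    #       if stats is not None:
    #           print(cached_miner.name, stats.currenthash)
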
    def messagedecodeminer(self, body) -> Miner:
        '''deserialize a miner message'''
        message_envelope = self.deserializemessageenvelope(utils.safestring(body))
        schema = MinerMessageSchema()
        minermessage_dict = schema.load(message_envelope.bodyjson()).data
        minermessage_entity = schema.make_minermessage(minermessage_dict)
        miner = minermessage_entity.miner
        return miner

    def messagedecodeminerstats(self, body):
        '''deserialize miner stats'''
        message_envelope = self.deserializemessageenvelope(utils.safestring(body))
        schema = MinerMessageSchema()
        minermessage_dict = schema.load(message_envelope.bodyjson()).data
        minermessage_entity = schema.make_minermessage(minermessage_dict)
        return minermessage_entity

    def messagedecodeminercommand(self, body):
        '''deserialize miner command'''
        message_envelope = self.deserializemessageenvelope(utils.safestring(body))
        schema = MinerMessageSchema()
        minermessage_dict = schema.load(message_envelope.bodyjson()).data
        minermessage_entity = schema.make_minermessage(minermessage_dict)
        return minermessage_entity

    def messagedecodesensor(self, body):
        '''deserialize sensor value'''
        message_envelope = self.deserializemessageenvelope(utils.safestring(body))
        schema = SensorValueSchema()
        #minermessage_dict = schema.load(message_envelope.bodyjson()).data
        entity = schema.load(message_envelope.bodyjson()).data
        return message_envelope, entity

    def messagedecode_configuration(self, body):
        '''deserialize configuration command'''
        message_envelope = self.deserializemessageenvelope(utils.safestring(body))
        schema = ConfigurationMessageSchema()
        configurationmessage_dict = schema.load(message_envelope.bodyjson()).data
        configurationmessage_entity = schema.make_configurationmessage(configurationmessage_dict)
        return configurationmessage_entity

    def createmessageenvelope(self):
        '''create message envelope'''
        return Message(timestamp=datetime.datetime.utcnow(), sender=self.component)

    def serializemessageenvelope(self, msg):
        '''serialize message envelope'''
        return self._schemamsg.dumps(msg).data

    def serializelist(self, listofentities):
        '''serialize a list of entities'''
        json_list = json.dumps([e.__dict__ for e in listofentities])
        return json_list

    def deserializelistofstrings(self, the_list):
        '''deserialize list of strings into list of miners'''
        results = []
        for item in the_list:
            miner = utils.deserialize(MinerSchema(), utils.safestring(item))
            results.append(miner)
        return results

    def deserializemessageenvelope(self, body):
        '''deserialize message envelope'''
        return self._schemamsg.load(json.loads(utils.safestring(body))).data

    def createmessagestats(self, miner, minerstats, minerpool):
        #always save the miner so the next guy can get latest changes
        #only put in cache if it came from cache
        if miner.store == 'mem':
            self.putminer(miner)
        message = self.createmessageenvelope()
        message = message.make_minerstats(miner, minerstats, minerpool)
        return self.serializemessageenvelope(message)

    def createmessagecommand(self, miner, command):
        '''create message command'''
        if miner.store == 'mem':
            self.putminer(miner)
        message = self.createmessageenvelope()
        message = message.make_minercommand(miner, command)
        return self.serializemessageenvelope(message)

    def messageencode(self, miner: Miner):
        '''command is optional, however should convert this call into minercommand'''
        #always save the miner so the next guy can get latest changes
        if miner.store == 'mem':
            self.putminer(miner)
        message = self.createmessageenvelope()
        message = message.make_minerstats(miner, minerstats=None, minerpool=None)
        return self._schemamsg.dumps(message).data

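    # Round-trip sketch (not in the original file): the createmessage* helpers
    # produce the serialized envelope that the messagedecode* helpers expect, so a
    # command can be published on one side and decoded on the other. The command
    # string 'restart' and the queue name Q_COMMAND are assumptions:
    #
    #   payload = app.createmessagecommand(miner, 'restart')
    #   app.send(QueueName.Q_COMMAND, payload)
    #   decoded = app.messagedecodeminercommand(payload)
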
    def stamp(self, message):
        return '{0}:{1}: {2}'.format(self.now(), self.applicationid, message)

    def alert(self, message):
        '''send alert message'''
        return self.sendqueueitem(QueueEntry(QueueName.Q_ALERT, self.stamp(message), QueueType.broadcast))

    def send(self, q_name, message):
        '''send message to queue'''
        success = self.trypublish(q_name, message)
        return success

    def enqueue(self, queuelist):
        '''send a list of queue messages'''
        if queuelist is None:
            return
        if not queuelist.hasentries():
            return
        #todo: group by queuename
        for entry in queuelist.entries:
            self.sendqueueitem(entry)

    def sendqueueitem(self, entry):
        '''send one queue item'''
        if entry.eventtype == 'broadcast':
            send_result = self.trybroadcast(entry.queuename, entry.message)
            return send_result
        return self.send(entry.queuename, entry.message)

    def take_picture(self, file_name='fullcycle_camera.png'):
        pic = take_picture(file_name,
                           self.configuration.get('camera.size'),
                           self.configuration.get('camera.quality'),
                           self.configuration.get('camera.brightness'),
                           self.configuration.get('camera.contrast'))
        return pic

    def store_picture_cache(self, file_name):
        if os.path.isfile(file_name):
            with open(file_name, 'rb') as photofile:
                picdata = photofile.read()
            reading = SensorValue('fullcyclecamera', base64.b64encode(picdata), 'camera')
            #reading.sensor = self.sensor
            #self.sendsensor(reading)
            message = self.createmessageenvelope()
            sensorjson = message.jsonserialize(SensorValueSchema(), reading)
            self.tryputcache(CacheKeys.camera, sensorjson)

    def readtemperature(self):
        try:
            sensor_humid, sensor_temp = readtemperature()
            if sensor_temp is not None:
                reading = SensorValue('fullcycletemp', sensor_temp, 'temperature')
                reading.sensor = self.sensor
                self.sendsensor(reading)
            if sensor_humid is not None:
                reading = SensorValue('fullcyclehumid', sensor_humid, 'humidity')
                reading.sensor = self.sensor
                self.sendsensor(reading)
            return sensor_humid, sensor_temp
        except BaseException as ex:
            self.logexception(ex)
        return None, None

    def sendsensor(self, reading):
        message = self.createmessageenvelope()
        sensorjson = message.jsonserialize(SensorValueSchema(), reading)
        self.sendqueueitem(QueueEntry(QueueName.Q_SENSOR, self.serializemessageenvelope(message.make_any('sensorvalue', sensorjson)), QueueType.broadcast))

    def getsession(self):
        service = self.getservice(ServiceName.database)
        engine = create_engine(service.connection, echo=False)
        Session = sessionmaker(bind=engine)
        return Session()

    def log_mineractivity(self, minerlog):
        try:
            session = self.getsession()
            session.add(minerlog)
            session.commit()
            return True
        except BaseException as ex:
            self.logexception(ex)
        return False

    def save_miner(self, miner: Miner):
        found = self.getknownminer(miner)
        if found is None:
            self.addknownminer(miner)
            #miners = MinerRepository()
            #todo:add the miner to the json config
        else:
            found.updatefrom(miner)
            self.putminer(found)

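# Usage sketch (not in the original file): log_mineractivity() expects a mapped
# SQLAlchemy entity; MinerActivity here is a hypothetical model from the domain
# layer, used only to illustrate the call:
#
#   entry = MinerActivity(minerid=miner.key(), action='restart', when=datetime.datetime.utcnow())
#   app.log_mineractivity(entry)
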
def main():
    full_cycle = ApplicationService()
    full_cycle.loginfo('Full Cycle was run in a script')
    full_cycle.shutdown()

if __name__ == "__main__":
    main()