Passed
Push — master ( 525f7c...1e26bf ) by Dave
01:15  created

ApplicationService.summarize_count()    A

Complexity
    Conditions      1

Size
    Total Lines     2
    Code Lines      2

Duplication
    Lines           0
    Ratio           0 %

Importance
    Changes         0

Metric    Value
cc        1
eloc      2
nop       2
dl        0
loc       2
rs        10
c         0
b         0
f         0
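
For reference, the metrics above describe this two-line method from the listing below:

    def summarize_count(self, minerlist):
        return len(minerlist)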
'''Application Service layer for Full Cycle Mining
Gateway into most of application functionality'''
import sys
import os
import datetime
import logging
import json
import base64
import redis
import pika
import domain.minerstatistics
import domain.minerpool
import backend.fcmutils as utils
from collections import defaultdict
from colorama import init, Fore
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from domain.mining import Miner, Pool, AvailablePool, MinerCurrentPool, MinerStatus
from domain.rep import MinerRepository, PoolRepository, LoginRepository, RuleParametersRepository, BaseRepository
#from domain.miningrules import RuleParameters
from domain.sensors import Sensor, SensorValue
from messaging.messages import Message, MessageSchema, MinerMessageSchema, ConfigurationMessageSchema
from messaging.sensormessages import SensorValueSchema
from messaging.schema import MinerSchema, MinerStatsSchema, MinerCurrentPoolSchema, AvailablePoolSchema, PoolSchema
from helpers.queuehelper import QueueName, Queue, QueueEntry, QueueType
from helpers.camerahelper import take_picture
from helpers.temperaturehelper import readtemperature
from helpers.telegramhelper import sendalert, sendphoto
from backend.fcmcache import Cache, CacheKeys
from backend.fcmbus import Bus
from backend.fcmcomponent import ComponentName
from backend.fcmservice import ServiceName, InfrastructureService, Configuration, Telegram
from backend.fcmminer import Antminer

class Component(object):
    '''A component is a unit of execution of FCM'''
    def __init__(self, componentname, option=''):
        self.app = ApplicationService(component=componentname, option=option)
        #was a queue, now it's a channel
        self.listeningqueue = None

    def listen(self):
        if self.listeningqueue:
            self.app.bus.listen(self.listeningqueue)


class ApplicationService:
    '''Application Services'''
    programnamefull = ''
    programname = ''
    component = ComponentName.fullcycle
    loglevel = 0
    #true if the user passed the -now command line argument
    isrunnow = False
    #dictionary of queues managed by this app
    _queues = {}
    #dictionary of channels managed by this app, keyed by channel name (see close_channel)
    _channels = {}
    #the startup directory
    homedirectory = None
    __logger = None
    __logger_debug = None
    __logger_error = None

    def __init__(self, component=ComponentName.fullcycle, option=None, announceyourself=False):
        self.component = component
        if self.component == ComponentName.fullcycle:
            self.print('Starting FCM Init')
        self.initargs(option)
        self.startupstuff()
        if self.component == ComponentName.fullcycle:
            self.print('Starting FCM Configuration')
        self.setup_configuration()
        self.initlogger()
        self.initmessaging()
        #this is slow; should there be an option to opt out of the cache?
        if self.component == ComponentName.fullcycle:
            self.loginfo('Starting FCM Cache')
        self.initcache()
        self.initbus()
        self.init_application()
        self.init_sensors()

        if announceyourself:
            self.sendqueueitem(QueueEntry(QueueName.Q_LOG, self.stamp('Started {0}'.format(self.component)), QueueType.broadcast))

    def initargs(self, option):
        '''process command line arguments'''
        if sys.argv:
            self.programnamefull = sys.argv[0]
            self.programname = os.path.basename(self.programnamefull)
        firstarg = option
        if len(sys.argv) > 1:
            firstarg = sys.argv[1]
        if firstarg is not None:
            if firstarg == '-now':
                self.isrunnow = True

    def startupstuff(self):
        '''start up the application'''
        self.homedirectory = os.path.dirname(__file__)
        #used with colorama on windows
        init(autoreset=True)

    def initmessaging(self):
        '''start up messaging'''
        self._schemamsg = MessageSchema()

    def initcache(self):
        '''start up cache'''
        try:
            cachelogin = self.getservice(ServiceName.cache)
            self.__cache = Cache(cachelogin)
        except Exception as ex:
            #cache is offline. try to run in degraded mode
            self.logexception(ex)

    def startup(self):
        self.initminercache()
        self.initpoolcache()

    def initbus(self):
        '''start up message bus'''
        login = self.getservice(ServiceName.messagebus)
        self.__bus = Bus(login)

    @property
    def bus(self):
        return self.__bus

    def init_sensors(self):
        self.sensor = Sensor('controller', 'DHT22', 'controller')

    def init_application(self):
        self.antminer = Antminer(self.configuration, self.sshlogin())
        self.telegram = Telegram(self.configuration, self.getservice(ServiceName.telegram))

    @property
    def isdebug(self):
        return sys.flags.debug

    def getconfigfilename(self, configfilename):
        '''get the contents of a config file'''
        return os.path.join(self.homedirectory, configfilename)

    def setup_configuration(self):
        '''configuration is loaded once at startup'''
        raw = BaseRepository().readrawfile(self.getconfigfilename('config/fullcycle.conf'))
        config = json.loads(raw)

        self.configuration = Configuration(config)
        self.applicationid = self.configuration.get('applicationid')
        self.loglevel = self.configuration.get('loglevel')

    def initpoolcache(self):
        if self.__cache.get(CacheKeys.pools) is None:
            spools = PoolRepository().readrawfile(self.getconfigfilename('config/pools.conf'))
            self.tryputcache(CacheKeys.pools, spools)
        for pool in self.pools():
            #pool is an instance of Pool
            availablepool = AvailablePool(pool.pool_type, pool, pool.url, pool.user, pool.password, pool.priority)
            minerpool = domain.minerpool.MinerPool(miner=None, priority=0, pool=availablepool)
            self.putpool(pool)
            self.add_pool(minerpool)

    def initminercache(self):
        '''put known miners into cache'''
        if self.__cache.get(CacheKeys.miners) is None:
            sminers = MinerRepository().readrawfile(self.getconfigfilename('config/miners.conf'))
            self.tryputcache(CacheKeys.miners, sminers)

        for miner in self.miners():
            #status is not persisted yet so init from name
            if miner.is_manually_disabled():
                miner.status = MinerStatus.Disabled
            if self.getminer(miner) is None:
                self.putminer(miner)

    def cacheclear(self):
        '''clear the cache'''
        self.__cache.purge()

    def initlogger(self):
        '''set up logging application info'''
        self.__logger = self.setup_logger('fcmapp', 'fcm.log', logging.INFO)

        self.__logger_debug = self.setup_logger('fcmdebug', 'fcm.bug', logging.DEBUG)

        self.__logger_error = self.setup_logger('fcmerror', 'fcm.err', logging.ERROR)

    def setup_logger(self, logger_name, log_file, level=logging.INFO):
        '''start logger'''
        logr = logging.getLogger(logger_name)
        formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
        #by default will append. use mode='w' to overwrite
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        logr.addHandler(file_handler)
        #is setting the stream handler necessary?
        #note: it is created here but never attached to the logger; console output
        #is currently done with print() in loginfo/logerror/logdebug
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(formatter)
        logr.setLevel(level)
        return logr

    def loginfo(self, message):
        '''log informational message'''
        logmsg = '{0}: {1}'.format(self.programname, message)
        self.__logger.info(logmsg)
        print(message)

    def logerror(self, message):
        '''log error'''
        logmsg = '{0}: {1}'.format(self.programname, message)
        self.__logger_error.error(logmsg)
        print(Fore.RED+logmsg)

    def logdebug(self, message):
        '''log debug message'''
        if not self.loglevel or self.loglevel == 0:
            return
        logmsg = '{0}: {1}'.format(self.programname, message)
        self.__logger_debug.debug(logmsg)
        print(Fore.GREEN+logmsg)

    def print(self, message):
        '''echo message to screen'''
        print('{0}: {1}'.format(self.now(), message))

    def now(self):
        '''current time formatted as friendly string'''
        return utils.formattime(datetime.datetime.now())

    #region lookups
    #todo: move to configurations section
    def miners(self):
        '''configured miners'''
        miners = MinerRepository().readminers(self.getconfigfilename('config/miners.conf'))
        return miners

    def knownminers(self):
        '''for now just return a list of miners
        later should consider returning a list that is easily searched and filtered
        '''
        dknownminers = self.__cache.gethashset(CacheKeys.knownminers)
        if dknownminers is not None and dknownminers:
            #get list of miners from cache
            return self.deserializelistofstrings(list(dknownminers.values()))
        knownminers = self.miners()
        return knownminers

    def allminers(self):
        '''combined list of discovered miners and configured miners'''
        allminers = self.knownminers()
        for miner in self.miners():
            foundminer = [x for x in allminers if x.key() == miner.key()]
            if not foundminer:
                allminers.append(miner)
        return allminers

    def knownsensors(self):
        dknownsensors = self.__cache.gethashset(CacheKeys.knownsensors)
        if dknownsensors is not None and dknownsensors:
            return self.deserializelist_withschema(SensorValueSchema(), list(dknownsensors.values()))
        return None

    def knownpools(self):
        dknownpools = self.__cache.gethashset(CacheKeys.knownpools)
        if dknownpools:
            return self.deserializelist_withschema(AvailablePoolSchema(), list(dknownpools.values()))
        return None

    def addknownsensor(self, sensorvalue):
        val = self.jsonserialize(SensorValueSchema(), sensorvalue)
        self.__cache.putinhashset(CacheKeys.knownsensors, sensorvalue.sensorid, val)

    def minersummary(self, max_number=10):
        '''show a summary of known miners'''
        mode = self.configuration.get('summary')
        if not mode:
            mode = 'auto'
        #note: mode is read from configuration but is not used below yet
        knownminers = self.knownminers()
        if len(knownminers) <= max_number:
            return '\n'.join([m.summary() for m in knownminers])
        groupbystatus = defaultdict(list)
        for miner in knownminers:
            groupbystatus[miner.status].append(miner)
        return '\n'.join(['{0}: {1}'.format(s, self.summary_by_status(s, groupbystatus[s])) for s in groupbystatus])

    def summary_by_status(self, key, minerlist):
        if key == 'online':
            return '{0} miners hash {1}'.format(self.summarize_count(minerlist), self.summarize_hash(minerlist))
        return self.summarize_count(minerlist)

    def summarize_count(self, minerlist):
        return len(minerlist)

    def summarize_hash(self, minerlist):
        return sum(miner.minerstats.currenthash for miner in minerlist)

    def addknownminer(self, miner):
        '''add miner to known miners list'''
        val = self.serialize(miner)
        self.__cache.putinhashset(CacheKeys.knownminers, miner.key(), val)

    def updateknownminer(self, miner):
        '''update known miner in cache'''
        sminer = self.__cache.getfromhashset(CacheKeys.knownminers, miner.key())
        memminer = self.deserialize(MinerSchema(), utils.safestring(sminer))
        if memminer is None:
            memminer = miner
        else:
            #merge new values
            memminer.updatefrom(miner)
        val = self.serialize(memminer)
        self.__cache.putinhashset(CacheKeys.knownminers, miner.key(), val)

    #region Pools
    def pools(self):
        '''configured pools'''
        pools = PoolRepository().readpools(self.getconfigfilename('config/pools.conf'))
        return pools

    def findpool(self, minerpool):
        '''find a pool in list'''
        if minerpool is None:
            return None
        for pool in self.pools():
            if minerpool.currentpool == pool.url and minerpool.currentworker.startswith(pool.user):
                return pool
        return None

    def getpool(self, miner: Miner):
        '''get pool from cache'''
        valu = self.trygetvaluefromcache(miner.name + '.pool')
        if valu is None:
            return None
        entity = MinerCurrentPool(miner, **self.deserialize(MinerCurrentPoolSchema(), valu))
        return entity

    def add_pool(self, minerpool: domain.minerpool.MinerPool):
        '''see if pool is known or not, then add it'''
        knownpool = self.__cache.getfromhashset(CacheKeys.knownpools, minerpool.pool.key)
        if not knownpool:
            val = self.jsonserialize(AvailablePoolSchema(), minerpool.pool)
            self.__cache.putinhashset(CacheKeys.knownpools, minerpool.pool.key, val)

    def update_pool(self, key, pool: AvailablePool):
        self.__cache.hdel(CacheKeys.knownpools, key)
        knownpool = self.__cache.getfromhashset(CacheKeys.knownpools, pool.key)
        if not knownpool:
            val = self.jsonserialize(AvailablePoolSchema(), pool)
            self.__cache.putinhashset(CacheKeys.knownpools, pool.key, val)
    #endregion

    def sshlogin(self):
        '''return contents of login file'''
        return self.readlogin('ftp.conf')

    def readlogin(self, filename):
        '''read login file configuration'''
        login = LoginRepository().readlogins(self.getconfigfilename('config/'+filename))
        return login

    def ruleparameters(self):
        '''rules parameters'''
        return RuleParametersRepository().readrules(self.getconfigfilename('config/'+'rules.conf'))

    def getservice(self, servicename):
        '''get a service by name. should be repository'''
        file_name = self.getconfigfilename('config/services.conf')
        with open(file_name, encoding='utf-8-sig') as config_file:
            content = json.loads(config_file.read())
        services = [InfrastructureService(**s) for s in content]
        return next((s for s in services if s.name == servicename), None)
        #static analysis flagged the return above: "The variable s does not seem to be defined"
        #(false positive: s is bound inside the generator expression)
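        #a hypothetical illustration of what getservice expects: services.conf is a JSON
        #array of service definitions, each expanded into InfrastructureService(**s) and
        #looked up by its "name" field; the exact field names are defined by
        #InfrastructureService, and only name, user and connection are referenced in this
        #file, e.g. [{"name": "messagebus", "user": "fcm", "connection": "..."}]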

    def getservice_useroverride(self, servicename):
        service = self.getservice(servicename)
        service.user = self.component
        return service
    #endregion lookups

    def listen(self, qlisten):
        """Goes into listening mode on a queue"""
        try:
            self.bus.listen(qlisten)
        except KeyboardInterrupt:
            self.shutdown()
        except BaseException as unhandled:
            self.unhandledexception(unhandled)

    def registerqueue(self, qregister: Queue):
        '''register a queue'''
        self.logdebug(self.stamp('Registered queue {0}'.format(qregister.queue_name)))
        if qregister.queue_name not in self._queues.keys():
            self._queues[qregister.queue_name] = qregister

    def shutdown(self, exitstatus=0):
        '''shut down app services'''
        self.loginfo('Shutting down fcm app...')
        self.close_channels()
        self.closequeues()
        if self.__bus:
            self.bus.close()
        if self.__cache is not None:
            self.__cache.close()
        sys.exit(exitstatus)

    def closequeue(self, thequeue):
        '''close the queue'''
        if not thequeue:
            return
        try:
            if thequeue is not None:
                self.logdebug(self.stamp('closing queue {0}'.format(thequeue.queue_name)))
                thequeue.close()
            del self._queues[thequeue.queue_name]
        except Exception as ex:
            self.logexception(ex)

    def closequeues(self):
        '''close a bunch of queues'''
        for k in list(self._queues):
            self.closequeue(self._queues[k])

    def close_channel(self, chan):
        if not chan:
            return
        try:
            if chan.name in self._channels:
                self.logdebug(self.stamp('closing channel {0}'.format(chan.name)))
                chan.close()
                del self._channels[chan.name]
        except Exception as ex:
            self.logexception(ex)

    def close_channels(self):
        '''close all channels'''
        for chan in list(self._channels):
            self.close_channel(self._channels[chan])

    def unhandledexception(self, unhandled):
        '''what to do when there is an exception that app cannot handle'''
        self.logexception(unhandled)

    def exceptionmessage(self, ex):
        '''gets exception message even when it doesn't have one'''
        exc_type, _, exc_tb = sys.exc_info()
        exmsg = getattr(ex, 'message', repr(ex))
        return '{0}:{1}:{2}'.format(exc_type, exc_tb.tb_lineno, exmsg)

    def logexception(self, ex):
        '''log an exception'''
        self.logerror(self.exceptionmessage(ex))

    def sendlog(self, logmessage):
        '''send message to log queue'''
        item = QueueEntry(QueueName.Q_LOG, logmessage, 'broadcast')
        self.sendqueueitem(item)
        print(logmessage)

    def subscribe(self, name, callback, no_acknowledge=True, prefetch=1):
        '''subscribe to a queue'''
        chan = self.bus.subscribe(name, callback, no_acknowledge=no_acknowledge, prefetch_count=prefetch)
        print('Waiting for messages on {0}. To exit press CTRL+C'.format(name))
        return chan

    def listen_to_broadcast(self, broadcast_name, callback, no_acknowledge=True):
        thebroadcast = self.bus.subscribe_broadcast(broadcast_name, callback, no_acknowledge)
        print('Waiting for messages on {0}. To exit press CTRL+C'.format(broadcast_name))
        self.bus.listen(thebroadcast)
        #never returns because listen is blocking
        return thebroadcast

    def trypublish(self, queue_name, msg: str):
        '''publish a message to the queue'''
        try:
            self.bus.publish(queue_name, msg)
            return True
        except pika.exceptions.ConnectionClosed as ex:
            logmessage = 'Error publishing to {0} {1}'.format(queue_name, self.exceptionmessage(ex))
            self.logerror(logmessage)
            return False

    def trybroadcast(self, exchange_name, msg):
        '''broadcast a message to all queue listeners'''
        try:
            self.bus.broadcast(exchange_name, msg)
            return True
        except pika.exceptions.ConnectionClosed as conxex:
            self.logerror('Error broadcasting to {0} {1}'.format(exchange_name, self.exceptionmessage(conxex)))
            return False

    def queuestatus(self):
        """TODO: print queue status"""
        pass
        #for q in self._queues.values():
        #    print(q.queue_name,str(q._connection))

    def putpool(self, pool: Pool):
        '''put pool in cache'''
        if pool and pool.name:
            valu = self.serialize(pool)
            self.tryputcache('pool.{0}'.format(pool.name), valu)

    def putminer(self, miner: Miner):
        '''put miner in cache'''
        if miner and miner.key():
            valu = self.serialize(miner)
            self.tryputcache('miner.{0}'.format(miner.key()), valu)

    def getminer(self, miner: Miner) -> Miner:
        '''strategies for getting miner from cache
        originally was key=miner.name but that was not good
        changed to key='miner.'+minerid
        '''
        valu = self.trygetvaluefromcache('miner.{0}'.format(miner.key()))
        if valu is None:
            return None
        minerfromstore = self.deserialize(MinerSchema(), utils.safestring(valu))
        if not minerfromstore.key():
            #do not allow entry with no key
            return None
        minerfromstore.store = 'mem'
        return minerfromstore

    def getknownminer(self, miner: Miner) -> Miner:
        '''get a known miner'''
        return self.getknownminerbykey(miner.key())

    def getminerbyname(self, minername):
        filtered = [x for x in self.miners() if x.name == minername]
        if filtered:
            return filtered[0]
        return None

    def getknownminerbykey(self, minername):
        str_miner = self.__cache.getfromhashset(CacheKeys.knownminers, minername)
        if str_miner is None:
            return None
        return self.deserialize(MinerSchema(), utils.safestring(str_miner))

    def getknownminerbyname(self, minername):
        '''this could be slow if there are lots of miners'''
        known = self.knownminers()
        for miner in known:
            if miner.name == minername:
                return miner
        return None

    def tryputcache(self, key, value):
        '''put value in cache key'''
        if value is None:
            return
        try:
            if self.__cache is not None:
                self.__cache.set(key, value)
        except redis.exceptions.ConnectionError as ex:
            self.logexception(ex)

    def putminerandstats(self, miner: Miner, minerstats, minerpool):
        '''put miner and status in cache'''
        self.putminer(miner)
        schema = MinerStatsSchema()
        valstats = schema.dumps(minerstats).data
        self.tryputcache(miner.key() + '.stats', valstats)
        schema = MinerCurrentPoolSchema()
        valpool = schema.dumps(minerpool).data
        self.tryputcache(miner.key() + '.pool', valpool)

    def trygetvaluefromcache(self, key):
        '''get value from cache'''
        if self.__cache is not None:
            try:
                return self.__cache.get(key)
            except Exception as ex:
                self.logexception(ex)
        return None

    def getstats(self, miner: Miner):
        '''get stats entity'''
        #note: stats are written under miner.key() in putminerandstats but read here
        #under miner.name; the two only match when key() returns the name
        valu = self.trygetvaluefromcache(miner.name + '.stats')
        if valu is None:
            return None
        entity = domain.minerstatistics.MinerStatistics(miner, **self.deserialize(MinerStatsSchema(), valu))
        return entity

    def getminerstatscached(self):
        '''iterator for cached stats'''
        for miner in self.miners():
            yield (self.getminer(miner), self.getstats(miner), self.getpool(miner))

    def messagedecodeminer(self, body) -> Miner:
        '''deserialize a miner message'''
        message_envelope = self.deserializemessageenvelope(utils.safestring(body))
        schema = MinerMessageSchema()
        minermessage_dict = schema.load(message_envelope.bodyjson()).data
        minermessage_entity = schema.make_minermessage(minermessage_dict)
        miner = minermessage_entity.miner
        return miner

    def messagedecodeminerstats(self, body):
        '''deserialize miner stats'''
        message_envelope = self.deserializemessageenvelope(utils.safestring(body))
        schema = MinerMessageSchema()
        minermessage_dict = schema.load(message_envelope.bodyjson()).data
        minermessage_entity = schema.make_minermessage(minermessage_dict)
        return minermessage_entity

    def messagedecodeminercommand(self, body):
        '''deserialize miner command'''
        message_envelope = self.deserializemessageenvelope(utils.safestring(body))
        schema = MinerMessageSchema()
        minermessage_dict = schema.load(message_envelope.bodyjson()).data
        minermessage_entity = schema.make_minermessage(minermessage_dict)
        return minermessage_entity

    def messagedecodesensor(self, body):
        '''deserialize sensor value'''
        message_envelope = self.deserializemessageenvelope(utils.safestring(body))
        schema = SensorValueSchema()
        #minermessage_dict = schema.load(message_envelope.bodyjson()).data
        entity = schema.load(message_envelope.bodyjson()).data
        return message_envelope, entity

    def messagedecode_configuration(self, body):
        '''deserialize configuration command'''
        message_envelope = self.deserializemessageenvelope(utils.safestring(body))
        schema = ConfigurationMessageSchema()
        configurationmessage_dict = schema.load(message_envelope.bodyjson()).data
        configurationmessage_entity = schema.make_configurationmessage(configurationmessage_dict)
        return configurationmessage_entity

    def createmessageenvelope(self):
        '''create message envelope'''
        return Message(timestamp=datetime.datetime.utcnow(), sender=self.component)

    def serializemessageenvelope(self, msg):
        '''serialize message envelope'''
        return self._schemamsg.dumps(msg).data

    def jsonserialize(self, sch, msg):
        '''serialize a message with schema. returns string'''
        smessage = sch.dumps(msg)
        #json.dumps(jmessage)
        return smessage.data

    def serialize(self, entity):
        '''serialize any entity
        only need schema, message class not needed
        '''
        if isinstance(entity, Miner):
            schema = MinerSchema()
            return schema.dumps(entity).data

        if isinstance(entity, Pool):
            schema = PoolSchema()
            return schema.dumps(entity).data
        return None

    def serializelist(self, listofentities):
        '''serialize a list of entities'''
        json_list = json.dumps([e.__dict__ for e in listofentities])
        return json_list

    def deserializelistofstrings(self, the_list):
        '''deserialize list of strings into list of miners'''
        results = []
        for item in the_list:
            miner = self.deserialize(MinerSchema(), utils.safestring(item))
            results.append(miner)
        return results

    def deserializelist_withschema(self, schema, the_list):
        '''deserialize list of strings into entities'''
        results = []
        for item in the_list:
            entity = self.deserialize(schema, utils.safestring(item))
            #todo: for pools the entry is a list
            results.append(entity)
        return results

    def deserialize(self, sch, msg):
        '''Output should be entity, not python json object
        msg parameter should be string
        '''
        if msg is None:
            return None
        return sch.loads(msg).data

    def deserializemessageenvelope(self, body):
        '''deserialize message envelope'''
        return self._schemamsg.load(json.loads(utils.safestring(body))).data

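    #illustrative sketch, not part of the original file: the serialize/deserialize
    #helpers above are meant to round-trip an entity through its marshmallow-style
    #schema; this method only re-uses calls already made elsewhere in this file
    def _example_miner_roundtrip(self, miner: Miner) -> Miner:
        '''round-trip a Miner through MinerSchema (illustration only)'''
        as_json = self.serialize(miner)
        return self.deserialize(MinerSchema(), as_json)
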
    def createmessagestats(self, miner, minerstats, minerpool):
        #always save the miner so the next guy can get latest changes
        #only put in cache if it came from cache
        if miner.store == 'mem':
            self.putminer(miner)
        message = self.createmessageenvelope()
        message = message.make_minerstats(miner, minerstats, minerpool)
        return self.serializemessageenvelope(message)

    def createmessagecommand(self, miner, command):
        '''create message command'''
        if miner.store == 'mem':
            self.putminer(miner)
        message = self.createmessageenvelope()
        message = message.make_minercommand(miner, command)
        return self.serializemessageenvelope(message)

    def messageencode(self, miner: Miner):
        '''command is optional, however should convert this call into minercommand'''
        #always save the miner so the next guy can get latest changes
        if miner.store == 'mem':
            self.putminer(miner)
        message = self.createmessageenvelope()
        message = message.make_minerstats(miner, minerstats=None, minerpool=None)
        return self._schemamsg.dumps(message).data

    def stamp(self, message):
        return '{0}:{1}: {2}'.format(self.now(), self.applicationid, message)

    def alert(self, message):
        '''send alert message'''
        return self.sendqueueitem(QueueEntry(QueueName.Q_ALERT, self.stamp(message), QueueType.broadcast))

    def send(self, q_name, message):
        '''send message to queue'''
        success = self.trypublish(q_name, message)
        return success

    def enqueue(self, queuelist):
        '''send a list of queue messages'''
        if queuelist is None:
            return
        if not queuelist.hasentries():
            return
        #todo: group by queuename
        for entry in queuelist.entries:
            self.sendqueueitem(entry)

    def sendqueueitem(self, entry):
        '''send one queue item'''
        if entry.eventtype == 'broadcast':
            send_result = self.trybroadcast(entry.queuename, entry.message)
            return send_result
        return self.send(entry.queuename, entry.message)

    def take_picture(self, file_name='fullcycle_camera.png'):
        pic = take_picture(file_name,
                           self.configuration.get('camera.size'),
                           self.configuration.get('camera.quality'),
                           self.configuration.get('camera.brightness'),
                           self.configuration.get('camera.contrast'))
        return pic

    def store_picture_cache(self, file_name):
        if os.path.isfile(file_name):
            with open(file_name, 'rb') as photofile:
                picdata = photofile.read()
            reading = SensorValue('fullcyclecamera', base64.b64encode(picdata), 'camera')
            #reading.sensor = self.sensor
            #self.sendsensor(reading)
            message = self.createmessageenvelope()
            sensorjson = message.jsonserialize(SensorValueSchema(), reading)
            self.tryputcache(CacheKeys.camera, sensorjson)

    def readtemperature(self):
        try:
            #calls the module-level readtemperature imported from helpers.temperaturehelper;
            #the method of the same name does not shadow it here
            sensor_humid, sensor_temp = readtemperature()
            if sensor_temp is not None:
                reading = SensorValue('fullcycletemp', sensor_temp, 'temperature')
                reading.sensor = self.sensor
                self.sendsensor(reading)
            if sensor_humid is not None:
                reading = SensorValue('fullcyclehumid', sensor_humid, 'humidity')
                reading.sensor = self.sensor
                self.sendsensor(reading)
            return sensor_humid, sensor_temp
        except BaseException as ex:
            self.logexception(ex)
        return None, None

    def sendsensor(self, reading):
        message = self.createmessageenvelope()
        sensorjson = message.jsonserialize(SensorValueSchema(), reading)
        self.sendqueueitem(QueueEntry(QueueName.Q_SENSOR, self.serializemessageenvelope(message.make_any('sensorvalue', sensorjson)), QueueType.broadcast))

    def sendtelegrammessage(self, message):
        if self.configuration.is_enabled('telegram'):
            sendalert(message, self.getservice('telegram'))

    def sendtelegramphoto(self, file_name):
        self.telegram.sendtelegramphoto(file_name)
        #if os.path.isfile(file_name) and os.path.getsize(file_name) > 0:
        #    if self.is_enabled_configuration('telegram'):
        #        sendphoto(file_name, self.getservice('telegram'))

    def getsession(self):
        service = self.getservice(ServiceName.database)
        engine = create_engine(service.connection, echo=False)
        Session = sessionmaker(bind=engine)
        return Session()

    def log_mineractivity(self, minerlog):
        try:
            session = self.getsession()
            session.add(minerlog)
            session.commit()
            return True
        except BaseException as ex:
            self.logexception(ex)
        return False

    def save_miner(self, miner: Miner):
        found = self.getknownminer(miner)
        if found is None:
            self.addknownminer(miner)
            #miners = MinerRepository()
            #todo: add the miner to the json config
        else:
            found.updatefrom(miner)
            self.putminer(found)

    def save_pool(self, pool: Pool):
        sch = PoolSchema()
        pools = PoolRepository()
        pools.add(pool, self.getconfigfilename('config/pools.conf'), sch)

        #update the known pools
        for known in self.knownpools():
            if pool.is_same_as(known):
                oldkey = known.key
                known.named_pool = pool
                #this changes the pool key!
                known.user = pool.user
                #update the known pool (with new key)
                self.update_pool(oldkey, known)

def main():
    full_cycle = ApplicationService()
    full_cycle.loginfo('Full Cycle was run in a script')
    full_cycle.shutdown()

if __name__ == "__main__":
    main()
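
#illustrative sketch, not part of the original file: how a hypothetical worker built
#on Component/ApplicationService might subscribe to a queue and block on it; the
#pika-style callback signature and the choice of QueueName.Q_LOG are assumptions,
#only subscribe()/listen() and loginfo() are taken from the classes above
def _example_worker():
    comp = Component(ComponentName.fullcycle)

    def on_message(channel, method, properties, body):
        comp.app.loginfo('received: {0}'.format(utils.safestring(body)))

    comp.listeningqueue = comp.app.subscribe(QueueName.Q_LOG, on_message)
    comp.listen()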