Passed
Push — master ( 7de6a0...d1a07e )
by Dave
58s
created

backend.fcmapp.ApplicationService.initargs()   A

Complexity: Conditions 5
Size: Total Lines 11, Code Lines 10
Duplication: Lines 0, Ratio 0 %
Importance: Changes 0
Metric Value
cc 5
eloc 10
nop 2
dl 0
loc 11
rs 9.3333
c 0
b 0
f 0
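
Note on the complexity figures: initargs() has four decision points (if sys.argv, if len(sys.argv) > 1, if firstarg is not None, and if firstarg == '-now'), so the standard cyclomatic complexity count of decision points + 1 gives the cc value of 5 reported here, matching Conditions 5.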
'''Application Service layer for Full Cycle Mining
Gateway into most of application functionality'''
import sys
import os
import datetime
import logging
import json
import base64
import redis
import pika
from collections import defaultdict
from colorama import init, Fore
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from domain.mining import Miner, Pool, MinerPool, AvailablePool, MinerCurrentPool, MinerStatistics, MinerStatus
from domain.rep import MinerRepository, PoolRepository, LoginRepository, RuleParametersRepository, BaseRepository
#from domain.miningrules import RuleParameters
from domain.sensors import Sensor, SensorValue
from messaging.messages import Message, MessageSchema, MinerMessageSchema, ConfigurationMessageSchema
from messaging.sensormessages import SensorValueSchema
from messaging.schema import MinerSchema, MinerStatsSchema, MinerCurrentPoolSchema, AvailablePoolSchema, PoolSchema
from helpers.queuehelper import QueueName, Queue, QueueEntry, QueueType
from helpers.camerahelper import take_picture
from helpers.antminerhelper import setminertoprivileged, privileged, setprivileged, setrestricted, waitforonline, restartmining, stopmining, restart, set_frequency
from helpers.temperaturehelper import readtemperature
from helpers.telegramhelper import sendalert, sendphoto
from backend.fcmcache import Cache, CacheKeys
from backend.fcmbus import Bus
from backend.fcmcomponent import ComponentName
from backend.fcmservice import ServiceName, InfrastructureService
from backend.fcmminer import Antminer

class Component(object):
    '''A component is a unit of execution of FCM'''
    def __init__(self, componentname, option=''):
        self.app = ApplicationService(component=componentname, option=option)
        #was a queue, now it's a channel
        self.listeningqueue = None

    def listen(self):
        if self.listeningqueue:
            self.app.bus.listen(self.listeningqueue)

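#Illustrative usage sketch (not part of the original file; the callback name and
#pika-style signature are assumptions): a concrete FCM process creates a Component,
#subscribes a callback on a queue, then blocks in listen().
#
#   def on_message(channel, method, properties, body):
#       ...
#
#   comp = Component(ComponentName.fullcycle)
#   comp.listeningqueue = comp.app.subscribe(QueueName.Q_LOG, on_message)
#   comp.listen()
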
class ApplicationService:
    '''Application Services'''
    programnamefull = ''
    programname = ''
    component = ComponentName.fullcycle
    loglevel = 0
    #true if user passed in -now command line argument
    isrunnow = False
    #dictionary of queues managed by this app
    _queues = {}
    #channels keyed by name (close_channel and close_channels index this by chan.name)
    _channels = {}
    #the startup directory
    homedirectory = None
    __cache = None
    __bus = None
    __config = {}
    __logger = None
    __logger_debug = None
    __logger_error = None
    antminer = None

    def __init__(self, component=ComponentName.fullcycle, option=None, announceyourself=False):
        self.component = component
        if self.component == ComponentName.fullcycle:
            self.print('Starting FCM Init')
        self.initargs(option)
        self.startupstuff()
        if self.component == ComponentName.fullcycle:
            self.print('Starting FCM Configuration')
        self.setup_configuration()
        self.initlogger()
        self.initmessaging()
        #this is slow. should be option to opt out of cache?
        if self.component == ComponentName.fullcycle:
            self.loginfo('Starting FCM Cache')
        self.initcache()
        self.initbus()
        self.init_application()
        self.init_sensors()

        if announceyourself:
            self.sendqueueitem(QueueEntry(QueueName.Q_LOG, self.stamp('Started {0}'.format(self.component)), QueueType.broadcast))

    def initargs(self, option):
        '''process command line arguments'''
        if sys.argv:
            self.programnamefull = sys.argv[0]
            self.programname = os.path.basename(self.programnamefull)
        firstarg = option
        if len(sys.argv) > 1:
            firstarg = sys.argv[1]
        if firstarg is not None:
            if firstarg == '-now':
                self.isrunnow = True

    def startupstuff(self):
        '''start up the application'''
        self.homedirectory = os.path.dirname(__file__)
        #used with colorama on windows
        init(autoreset=True)

    def initmessaging(self):
        '''start up messaging'''
        self._schemamsg = MessageSchema()

    def initcache(self):
        '''start up cache'''
        try:
            cachelogin = self.getservice(ServiceName.cache)
            self.__cache = Cache(cachelogin)
        except Exception as ex:
            #cache is offline. try to run in degraded mode
            self.logexception(ex)

    def startup(self):
        self.initminercache()
        self.initpoolcache()

    def initbus(self):
        '''start up message bus'''
        login = self.getservice(ServiceName.messagebus)
        self.__bus = Bus(login)

    @property
    def bus(self):
        return self.__bus

    def init_sensors(self):
        self.sensor = Sensor('controller', 'DHT22', 'controller')

    def init_application(self):
        self.antminer = Antminer(self.__config, self.sshlogin())

    @property
    def isdebug(self):
        return sys.flags.debug

    def getconfigfilename(self, configfilename):
        '''get the full path of a config file'''
        return os.path.join(self.homedirectory, configfilename)

    def setup_configuration(self):
        '''configuration is loaded once at startup'''
        raw = BaseRepository().readrawfile(self.getconfigfilename('config/fullcycle.conf'))
        self.__config = json.loads(raw)
        self.applicationid = self.configuration('applicationid')
        self.loglevel = self.configuration('loglevel')

    def configuration(self, key):
        if key not in self.__config: return None
        return self.__config[key]

    def is_enabled_configuration(self, key):
        lookupkey = '{0}.enabled'.format(key)
        if lookupkey not in self.__config:
            return False
        value = self.__config[lookupkey]
        if isinstance(value, str):
            return value == 'true' or value == 'True'
        return value

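    #Illustrative fullcycle.conf fragment (assumed values, not taken from the repository),
    #showing keys this class actually reads: applicationid and loglevel at startup,
    #'summary' in minersummary(), '<feature>.enabled' in is_enabled_configuration(),
    #and the camera.* settings in take_picture().
    #
    #   {
    #       "applicationid": "fullcycle-controller",
    #       "loglevel": 1,
    #       "summary": "auto",
    #       "telegram.enabled": "true",
    #       "camera.size": [640, 480]
    #   }
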
    def initpoolcache(self):
        if self.__cache.get(CacheKeys.pools) is None:
            spools = PoolRepository().readrawfile(self.getconfigfilename('config/pools.conf'))
            self.tryputcache(CacheKeys.pools, spools)
        for pool in self.pools():
            #pool isinstance of Pool
            availablepool = AvailablePool(pool.pool_type, pool, pool.url, pool.user, pool.password, pool.priority)
            minerpool = MinerPool(miner=None, priority=0, pool=availablepool)
            self.putpool(pool)
            self.add_pool(minerpool)

    def initminercache(self):
        '''put known miners into cache'''
        if self.__cache.get(CacheKeys.miners) is None:
            sminers = MinerRepository().readrawfile(self.getconfigfilename('config/miners.conf'))
            self.tryputcache(CacheKeys.miners, sminers)

        for miner in self.miners():
            #status is not persisted yet so init from name
            if miner.is_manually_disabled():
                miner.status = MinerStatus.Disabled
            if self.getminer(miner) is None:
                self.putminer(miner)

    def cacheclear(self):
        '''clear the cache'''
        self.__cache.purge()

    def initlogger(self):
        '''set up logging application info'''
        self.__logger = self.setup_logger('fcmapp', 'fcm.log', logging.INFO)

        self.__logger_debug = self.setup_logger('fcmdebug', 'fcm.bug', logging.DEBUG)

        self.__logger_error = self.setup_logger('fcmerror', 'fcm.err', logging.ERROR)

    def setup_logger(self, logger_name, log_file, level=logging.INFO):
        '''start logger'''
        logr = logging.getLogger(logger_name)
        formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
        #by default will append. use mode='w' to overwrite
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        logr.addHandler(file_handler)
        #note: this stream handler is created but never attached to the logger;
        #console output is handled by print() in the log methods below
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(formatter)
        logr.setLevel(level)
        return logr

    def loginfo(self, message):
        '''log informational message'''
        logmsg = '{0}: {1}'.format(self.programname, message)
        self.__logger.info(logmsg)
        print(message)

    def logerror(self, message):
        '''log error'''
        logmsg = '{0}: {1}'.format(self.programname, message)
        self.__logger_error.error(logmsg)
        print(Fore.RED+logmsg)

    def logdebug(self, message):
        '''log debug message'''
        if not self.loglevel or self.loglevel == 0:
            return
        logmsg = '{0}: {1}'.format(self.programname, message)
        self.__logger_debug.debug(logmsg)
        print(Fore.GREEN+logmsg)

    def print(self, message):
        '''echo message to screen'''
        print('{0}: {1}'.format(self.now(), message))

    def now(self):
        '''current time formatted as friendly string'''
        return self.formattime(datetime.datetime.now())

    def formattime(self, time):
        '''standard format for time'''
        return time.strftime('%Y-%m-%d %H:%M:%S')

    #region lookups
    #todo: move to configurations section
    def miners(self):
        '''configured miners'''
        miners = MinerRepository().readminers(self.getconfigfilename('config/miners.conf'))
        return miners

    def knownminers(self):
        '''for now just return a list of miners
        later should consider returning a list that is easily searched and filtered
        '''
        dknownminers = self.__cache.gethashset(CacheKeys.knownminers)
        if dknownminers is not None and dknownminers:
            #get list of miners from cache
            return self.deserializelistofstrings(list(dknownminers.values()))
        knownminers = self.miners()
        return knownminers

    def allminers(self):
        '''combined list of discovered miners and configured miners'''
        allminers = self.knownminers()
        for miner in self.miners():
            foundminer = [x for x in allminers if x.key() == miner.key()]
            if not foundminer:
                allminers.append(miner)
        return allminers

    def knownsensors(self):
        dknownsensors = self.__cache.gethashset(CacheKeys.knownsensors)
        if dknownsensors is not None and dknownsensors:
            return self.deserializelist_withschema(SensorValueSchema(), list(dknownsensors.values()))
        return None

    def knownpools(self):
        dknownpools = self.__cache.gethashset(CacheKeys.knownpools)
        if dknownpools:
            return self.deserializelist_withschema(AvailablePoolSchema(), list(dknownpools.values()))
        return None

    def addknownsensor(self, sensorvalue):
        val = self.jsonserialize(SensorValueSchema(), sensorvalue)
        self.__cache.putinhashset(CacheKeys.knownsensors, sensorvalue.sensorid, val)

    def minersummary(self, max_number=10):
        '''show a summary of known miners
        '''
        mode = self.configuration('summary')
        if not mode: mode = 'auto'
        knownminers = self.knownminers()
        if len(knownminers) <= max_number:
            return '\n'.join([m.summary() for m in knownminers])
        groupbystatus = defaultdict(list)
        for miner in knownminers:
            groupbystatus[miner.status].append(miner)
        return '\n'.join(['{0}: {1}'.format(s, self.summary_by_status(s, groupbystatus[s])) for s in groupbystatus])

    def summary_by_status(self, key, minerlist):
        if key == 'online':
            return '{0} miners hash {1}'.format(self.summarize_count(minerlist), self.summarize_hash(minerlist))
        return self.summarize_count(minerlist)

    def summarize_count(self, minerlist):
        return len(minerlist)

    def summarize_hash(self, minerlist):
        return sum(miner.minerstats.currenthash for miner in minerlist)

    def addknownminer(self, miner):
        '''add miner to known miners list'''
        val = self.serialize(miner)
        self.__cache.putinhashset(CacheKeys.knownminers, miner.key(), val)

    def updateknownminer(self, miner):
        '''update known miner in cache'''
        sminer = self.__cache.getfromhashset(CacheKeys.knownminers, miner.key())
        memminer = self.deserialize(MinerSchema(), self.safestring(sminer))
        if memminer is None:
            memminer = miner
        else:
            #merge new values
            memminer.updatefrom(miner)
        val = self.serialize(memminer)
        self.__cache.putinhashset(CacheKeys.knownminers, miner.key(), val)

    #region Pools
    def pools(self):
        '''configured pools'''
        pools = PoolRepository().readpools(self.getconfigfilename('config/pools.conf'))
        return pools

    def findpool(self, minerpool):
        '''find a pool in list'''
        if minerpool is None: return None
        for pool in self.pools():
            if minerpool.currentpool == pool.url and minerpool.currentworker.startswith(pool.user):
                return pool
        return None

    def getpool(self, miner: Miner):
        '''get pool from cache'''
        valu = self.trygetvaluefromcache(miner.name + '.pool')
        if valu is None: return None
        entity = MinerCurrentPool(miner, **self.deserialize(MinerCurrentPoolSchema(), valu))
        return entity

    def add_pool(self, minerpool: MinerPool):
        '''see if pool is known or not, then add it'''
        knownpool = self.__cache.getfromhashset(CacheKeys.knownpools, minerpool.pool.key)
        if not knownpool:
            val = self.jsonserialize(AvailablePoolSchema(), minerpool.pool)
            self.__cache.putinhashset(CacheKeys.knownpools, minerpool.pool.key, val)

    def update_pool(self, key, pool: AvailablePool):
        self.__cache.hdel(CacheKeys.knownpools, key)
        knownpool = self.__cache.getfromhashset(CacheKeys.knownpools, pool.key)
        if not knownpool:
            val = self.jsonserialize(AvailablePoolSchema(), pool)
            self.__cache.putinhashset(CacheKeys.knownpools, pool.key, val)
    #endregion

    def sshlogin(self):
        '''return contents of login file'''
        return self.readlogin('ftp.conf')

    def readlogin(self, filename):
        '''read login file configuration'''
        login = LoginRepository().readlogins(self.getconfigfilename('config/'+filename))
        return login

    def ruleparameters(self):
        '''rules parameters'''
        return RuleParametersRepository().readrules(self.getconfigfilename('config/'+'rules.conf'))

    def getservice(self, servicename):
        '''get a service by name. should be repository'''
        file_name = self.getconfigfilename('config/services.conf')
        with open(file_name, encoding='utf-8-sig') as config_file:
            content = json.loads(config_file.read())
        services = [InfrastructureService(**s) for s in content]
        return next((s for s in services if s.name == servicename), None)
0 ignored issues. Comprehensibility Best Practice: The variable s does not seem to be defined.
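    #Illustrative services.conf entry (assumed field names beyond 'name', 'user' and
    #'connection', which are the attributes used elsewhere in this file): the file is a
    #JSON list and each object is unpacked into InfrastructureService(**s), so its keys
    #need to line up with that constructor.
    #
    #   [
    #       {"name": "messagebus", "user": "fullcycle", "password": "...", "host": "localhost", "connection": null}
    #   ]
    #
    #getservice(ServiceName.messagebus) returns the matching entry, or None if absent.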

    def getservice_useroverride(self, servicename):
        service = self.getservice(servicename)
        service.user = self.component
        return service
    #endregion lookups

    def listen(self, qlisten):
        """Goes into listening mode on a queue"""
        try:
            self.bus.listen(qlisten)
        except KeyboardInterrupt:
            self.shutdown()
        except BaseException as unhandled:
            self.unhandledexception(unhandled)

    def registerqueue(self, qregister: Queue):
        '''register a queue'''
        self.logdebug(self.stamp('Registered queue {0}'.format(qregister.queue_name)))
        if qregister.queue_name not in self._queues.keys():
            self._queues[qregister.queue_name] = qregister

    def shutdown(self, exitstatus=0):
        '''shut down app services'''
        self.loginfo('Shutting down fcm app...')
        self.close_channels()
        self.closequeues()
        if self.__bus:
            self.bus.close()
        if self.__cache is not None:
            self.__cache.close()
        sys.exit(exitstatus)

    def closequeue(self, thequeue):
        '''close the queue'''
        if not thequeue: return
        try:
            if thequeue is not None:
                self.logdebug(self.stamp('closing queue {0}'.format(thequeue.queue_name)))
                thequeue.close()
            del self._queues[thequeue.queue_name]
        except Exception as ex:
            self.logexception(ex)

    def closequeues(self):
        '''close a bunch of queues'''
        for k in list(self._queues):
            self.closequeue(self._queues[k])

    def close_channel(self, chan):
        if not chan: return
        try:
            if chan.name in self._channels:
                self.logdebug(self.stamp('closing channel {0}'.format(chan.name)))
                chan.close()
                del self._channels[chan.name]
        except Exception as ex:
            self.logexception(ex)

    def close_channels(self):
        '''close all channels'''
        for chan in list(self._channels):
            self.close_channel(self._channels[chan])

    def unhandledexception(self, unhandled):
        '''what to do when there is an exception that app cannot handle'''
        self.logexception(unhandled)

    def exceptionmessage(self, ex):
        '''gets exception message even when it doesn't have one'''
        exc_type, _, exc_tb = sys.exc_info()
        exmsg = getattr(ex, 'message', repr(ex))
        return '{0}:{1}:{2}'.format(exc_type, exc_tb.tb_lineno, exmsg)

    def logexception(self, ex):
        '''log an exception'''
        self.logerror(self.exceptionmessage(ex))

    def sendlog(self, logmessage):
        '''send message to log queue'''
        item = QueueEntry(QueueName.Q_LOG, logmessage, 'broadcast')
        self.sendqueueitem(item)
        print(logmessage)

    def subscribe(self, name, callback, no_acknowledge=True, prefetch=1):
        '''subscribe to a queue'''
        chan = self.bus.subscribe(name, callback, no_acknowledge=no_acknowledge, prefetch_count=prefetch)
        print('Waiting for messages on {0}. To exit press CTRL+C'.format(name))
        return chan

    def listen_to_broadcast(self, broadcast_name, callback, no_acknowledge=True):
        thebroadcast = self.bus.subscribe_broadcast(broadcast_name, callback, no_acknowledge)
        print('Waiting for messages on {0}. To exit press CTRL+C'.format(broadcast_name))
        self.bus.listen(thebroadcast)
        #never returns because listen is blocking
        return thebroadcast

    def trypublish(self, queue_name, msg: str):
        '''publish a message to the queue'''
        try:
            self.bus.publish(queue_name, msg)
            return True
        except pika.exceptions.ConnectionClosed as ex:
            logmessage = 'Error publishing to {0} {1}'.format(queue_name, self.exceptionmessage(ex))
            self.logerror(logmessage)
            return False

    def trybroadcast(self, exchange_name, msg):
        '''broadcast a message to all queue listeners'''
        try:
            self.bus.broadcast(exchange_name, msg)
            return True
        except pika.exceptions.ConnectionClosed as conxex:
            self.logerror('Error broadcasting to {0} {1}'.format(exchange_name, self.exceptionmessage(conxex)))
            return False

    def queuestatus(self):
        """TODO: print queue status"""
        pass
        #for q in self._queues.values():
        #    print(q.queue_name,str(q._connection))

    def putpool(self, pool: Pool):
        '''put pool in cache'''
        if pool and pool.name:
            valu = self.serialize(pool)
            self.tryputcache('pool.{0}'.format(pool.name), valu)

    def putminer(self, miner: Miner):
        '''put miner in cache'''
        if miner and miner.key():
            valu = self.serialize(miner)
            self.tryputcache('miner.{0}'.format(miner.key()), valu)

    def getminer(self, miner: Miner) -> Miner:
        '''strategies for getting miner from cache
        originally was key=miner.name but that was not good
        changed to key='miner.'+minerid
        '''
        valu = self.trygetvaluefromcache('miner.{0}'.format(miner.key()))
        if valu is None:
            return None
        minerfromstore = self.deserialize(MinerSchema(), self.safestring(valu))
        if not minerfromstore.key():
            #do not allow entry with no key
            return None
        minerfromstore.store = 'mem'
        return minerfromstore

    def getknownminer(self, miner: Miner) -> Miner:
        '''get a known miner'''
        return self.getknownminerbykey(miner.key())

    def getminerbyname(self, minername):
        filtered = [x for x in self.miners() if x.name == minername]
        if filtered: return filtered[0]
        return None

    def getknownminerbykey(self, minername):
        str_miner = self.__cache.getfromhashset(CacheKeys.knownminers, minername)
        if str_miner is None:
            return None
        return self.deserialize(MinerSchema(), self.safestring(str_miner))

    def getknownminerbyname(self, minername):
        '''this could be slow if there are lots of miners'''
        known = self.knownminers()
        for miner in known:
            if miner.name == minername:
                return miner
        return None

    def tryputcache(self, key, value):
        '''put value in cache key'''
        if value is None: return
        try:
            if self.__cache is not None:
                self.__cache.set(key, value)
        except redis.exceptions.ConnectionError as ex:
            self.logexception(ex)

    def putminerandstats(self, miner: Miner, minerstats, minerpool):
        '''put miner and stats in cache'''
        self.putminer(miner)
        schema = MinerStatsSchema()
        valstats = schema.dumps(minerstats).data
        self.tryputcache(miner.key() + '.stats', valstats)
        schema = MinerCurrentPoolSchema()
        valpool = schema.dumps(minerpool).data
        self.tryputcache(miner.key() + '.pool', valpool)

    def trygetvaluefromcache(self, key):
        '''get value from cache'''
        if self.__cache is not None:
            try:
                return self.__cache.get(key)
            except Exception as ex:
                self.logexception(ex)
        return None

    def getstats(self, miner: Miner):
        '''get stats entity'''
        valu = self.trygetvaluefromcache(miner.name + '.stats')
        if valu is None: return None
        entity = MinerStatistics(miner, **self.deserialize(MinerStatsSchema(), valu))
        return entity

    def safestring(self, thestring):
        '''safely convert anything into string'''
        if thestring is None: return None
        if isinstance(thestring, str): return thestring
        return str(thestring, "utf-8")

    def getminerstatscached(self):
        '''iterator for cached stats'''
        for miner in self.miners():
            yield (self.getminer(miner), self.getstats(miner), self.getpool(miner))

    def messagedecodeminer(self, body) -> Miner:
        '''deserialize a miner message'''
        message_envelope = self.deserializemessageenvelope(self.safestring(body))
        schema = MinerMessageSchema()
        minermessage_dict = schema.load(message_envelope.bodyjson()).data
        minermessage_entity = schema.make_minermessage(minermessage_dict)
        miner = minermessage_entity.miner
        return miner

    def messagedecodeminerstats(self, body):
        '''deserialize miner stats'''
        message_envelope = self.deserializemessageenvelope(self.safestring(body))
        schema = MinerMessageSchema()
        minermessage_dict = schema.load(message_envelope.bodyjson()).data
        minermessage_entity = schema.make_minermessage(minermessage_dict)
        return minermessage_entity

    def messagedecodeminercommand(self, body):
        '''deserialize miner command'''
        message_envelope = self.deserializemessageenvelope(self.safestring(body))
        schema = MinerMessageSchema()
        minermessage_dict = schema.load(message_envelope.bodyjson()).data
        minermessage_entity = schema.make_minermessage(minermessage_dict)
        return minermessage_entity

    def messagedecodesensor(self, body):
        '''deserialize sensor value'''
        message_envelope = self.deserializemessageenvelope(self.safestring(body))
        schema = SensorValueSchema()
        #minermessage_dict = schema.load(message_envelope.bodyjson()).data
        entity = schema.load(message_envelope.bodyjson()).data
        return message_envelope, entity

    def messagedecode_configuration(self, body):
        '''deserialize configuration command'''
        message_envelope = self.deserializemessageenvelope(self.safestring(body))
        schema = ConfigurationMessageSchema()
        configurationmessage_dict = schema.load(message_envelope.bodyjson()).data
        configurationmessage_entity = schema.make_configurationmessage(configurationmessage_dict)
        return configurationmessage_entity

    def createmessageenvelope(self):
        '''create message envelope'''
        return Message(timestamp=datetime.datetime.utcnow(), sender=self.component)

    def serializemessageenvelope(self, msg):
        '''serialize message envelope'''
        return self._schemamsg.dumps(msg).data

    def jsonserialize(self, sch, msg):
        '''serialize a message with schema. returns string'''
        smessage = sch.dumps(msg)
        return smessage.data

    def serialize(self, entity):
        '''serialize any entity
        only need schema, message class not needed
        '''
        if isinstance(entity, Miner):
            schema = MinerSchema()
            return schema.dumps(entity).data

        if isinstance(entity, Pool):
            schema = PoolSchema()
            return schema.dumps(entity).data
        return None

    def serializelist(self, listofentities):
        '''serialize a list of entities'''
        json_list = json.dumps([e.__dict__ for e in listofentities])
        return json_list

    def deserializelistofstrings(self, the_list):
        '''deserialize list of strings into list of miners'''
        results = []
        for item in the_list:
            miner = self.deserialize(MinerSchema(), self.safestring(item))
            results.append(miner)
        return results

    def deserializelist_withschema(self, schema, the_list):
        '''deserialize list of strings into entities'''
        results = []
        for item in the_list:
            entity = self.deserialize(schema, self.safestring(item))
            #todo: for pools the entry is a list
            results.append(entity)
        return results

    def deserialize(self, sch, msg):
        '''Output should be entity, not python json object
        msg parameter should be string
        '''
        if msg is None: return None
        return sch.loads(msg).data

    def deserializemessageenvelope(self, body):
        '''deserialize message envelope'''
        return self._schemamsg.load(json.loads(self.safestring(body))).data

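    #Illustrative round trip (sketch only; 'app', 'miner' and the 'restart' command
    #string are assumed here): an envelope is created, filled by one of the make_*
    #helpers, serialized for the bus, and decoded again on the consuming side.
    #
    #   env = app.createmessageenvelope()
    #   payload = app.serializemessageenvelope(env.make_minercommand(miner, 'restart'))
    #   decoded = app.messagedecodeminercommand(payload)
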
    def createmessagestats(self, miner, minerstats, minerpool):
        #always save the miner so the next guy can get latest changes
        #only put in cache if it came from cache
        if miner.store == 'mem':
            self.putminer(miner)
        message = self.createmessageenvelope()
        message = message.make_minerstats(miner, minerstats, minerpool)
        return self.serializemessageenvelope(message)

    def createmessagecommand(self, miner, command):
        '''create message command'''
        if miner.store == 'mem':
            self.putminer(miner)
        message = self.createmessageenvelope()
        message = message.make_minercommand(miner, command)
        return self.serializemessageenvelope(message)

    def messageencode(self, miner: Miner):
        '''command is optional, however should convert this call into minercommand'''
        #always save the miner so the next guy can get latest changes
        if miner.store == 'mem':
            self.putminer(miner)
        message = self.createmessageenvelope()
        message = message.make_minerstats(miner, minerstats=None, minerpool=None)
        return self._schemamsg.dumps(message).data

    def stamp(self, message):
        return '{0}:{1}: {2}'.format(self.now(), self.applicationid, message)

    def alert(self, message):
        '''send alert message'''
        return self.sendqueueitem(QueueEntry(QueueName.Q_ALERT, self.stamp(message), QueueType.broadcast))

    def send(self, q_name, message):
        '''send message to queue'''
        success = self.trypublish(q_name, message)
        return success

    def enqueue(self, queuelist):
        '''send a list of queue messages'''
        if queuelist is None:
            return
        if not queuelist.hasentries():
            return
        #todo: group by queuename
        for entry in queuelist.entries:
            self.sendqueueitem(entry)

    def sendqueueitem(self, entry):
        '''send one queue item'''
        if entry.eventtype == 'broadcast':
            send_result = self.trybroadcast(entry.queuename, entry.message)
            return send_result
        return self.send(entry.queuename, entry.message)

    def take_picture(self, file_name='fullcycle_camera.png'):
        pic = take_picture(file_name,
                           self.configuration('camera.size'),
                           self.configuration('camera.quality'),
                           self.configuration('camera.brightness'),
                           self.configuration('camera.contrast'))
        return pic

    def store_picture_cache(self, file_name):
        if os.path.isfile(file_name):
            with open(file_name, 'rb') as photofile:
                picdata = photofile.read()
            reading = SensorValue('fullcyclecamera', base64.b64encode(picdata), 'camera')
            #reading.sensor = self.sensor
            #self.sendsensor(reading)
            message = self.createmessageenvelope()
            sensorjson = message.jsonserialize(SensorValueSchema(), reading)
            self.tryputcache(CacheKeys.camera, sensorjson)

    def readtemperature(self):
        try:
            sensor_humid, sensor_temp = readtemperature()
            if sensor_temp is not None:
                reading = SensorValue('fullcycletemp', sensor_temp, 'temperature')
                reading.sensor = self.sensor
                self.sendsensor(reading)
            if sensor_humid is not None:
                reading = SensorValue('fullcyclehumid', sensor_humid, 'humidity')
                reading.sensor = self.sensor
                self.sendsensor(reading)
            return sensor_humid, sensor_temp
        except BaseException as ex:
            self.logexception(ex)
        return None, None

    def sendsensor(self, reading):
        message = self.createmessageenvelope()
        sensorjson = message.jsonserialize(SensorValueSchema(), reading)
        self.sendqueueitem(QueueEntry(QueueName.Q_SENSOR, self.serializemessageenvelope(message.make_any('sensorvalue', sensorjson)), QueueType.broadcast))

    def sendtelegrammessage(self, message):
        if self.is_enabled_configuration('telegram'):
            sendalert(message, self.getservice('telegram'))

    def sendtelegramphoto(self, file_name):
        if os.path.isfile(file_name) and os.path.getsize(file_name) > 0:
            if self.is_enabled_configuration('telegram'):
                sendphoto(file_name, self.getservice('telegram'))

    def getsession(self):
        service = self.getservice(ServiceName.database)
        engine = create_engine(service.connection, echo=False)
        Session = sessionmaker(bind=engine)
        return Session()

    def log_mineractivity(self, minerlog):
        try:
            session = self.getsession()
            session.add(minerlog)
            session.commit()
            return True
        except BaseException as ex:
            self.logexception(ex)
        return False

    def save_miner(self, miner: Miner):
        found = self.getknownminer(miner)
        if found is None:
            self.addknownminer(miner)
            #miners = MinerRepository()
            #todo: add the miner to the json config
        else:
            found.updatefrom(miner)
            self.putminer(found)

    def save_pool(self, pool: Pool):
        sch = PoolSchema()
        pools = PoolRepository()
        pools.add(pool, self.getconfigfilename('config/pools.conf'), sch)

        #update the known pools
        for known in self.knownpools():
            if pool.is_same_as(known):
                oldkey = known.key
                known.named_pool = pool
                #this changes the pool key!
                known.user = pool.user
                #update the known pool (with new key)
                self.update_pool(oldkey, known)

def main():
    full_cycle = ApplicationService()
    full_cycle.loginfo('Full Cycle was run in a script')
    full_cycle.shutdown()

if __name__ == "__main__":
    main()
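
#To run this module directly (an assumption about project layout: the backend, domain,
#helpers and messaging packages need to resolve from the repository root):
#
#   python -m backend.fcmapp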