Passed
Push — master ( 157197...45eeb8 )
by Dave
created 01:00

backend.fcmapp.ApplicationService.addknownsensor()    Rating: A

Complexity
    Conditions: 1

Size
    Total Lines: 3
    Code Lines: 3

Duplication
    Lines: 0
    Ratio: 0 %

Importance
    Changes: 0

Metric   Value
cc       1
eloc     3
nop      2
dl       0
loc      3
rs       10
c        0
b        0
f        0
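These size and complexity figures are the kind a static analyzer such as radon reports. A minimal sketch of checking comparable numbers locally, assuming the radon package is installed and that the listing below is saved as backend/fcmapp.py (both assumptions, not part of this report):

# Hypothetical local check with the radon package; 'backend/fcmapp.py' is an assumed path.
from radon.complexity import cc_visit
from radon.raw import analyze

with open('backend/fcmapp.py', encoding='utf-8') as src:
    source = src.read()

print(analyze(source))                      # raw size counts for the module (loc, lloc, sloc, ...)
for block in cc_visit(source):
    # classes expose a .methods list; plain functions are reported directly
    for method in getattr(block, 'methods', [block]):
        if method.name == 'addknownsensor':
            print(method.name, method.complexity)

The reviewed source follows.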
'''Application Service layer for Full Cycle Mining
Gateway into most of application functionality'''
import sys
import os
import datetime
import logging
import json
import base64
from collections import defaultdict
import pika
from colorama import init, Fore
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
import domain.minerstatistics
import domain.minerpool
from domain.mining import Miner, AvailablePool, MinerStatus
from domain.rep import MinerRepository, PoolRepository, LoginRepository, RuleParametersRepository, BaseRepository
#from domain.miningrules import RuleParameters
from messaging.messages import Message, MessageSchema, MinerMessageSchema, ConfigurationMessageSchema
from messaging.sensormessages import SensorValueSchema
from messaging.schema import MinerSchema, MinerStatsSchema, MinerCurrentPoolSchema
from helpers.queuehelper import QueueName, Queue, QueueEntry, QueueType
from helpers.temperaturehelper import readtemperature
import backend.fcmutils as utils
#import backend.fcmcamera
from backend.fcmcamera import CameraService
from backend.fcmcache import Cache, CacheKeys
from backend.fcmbus import Bus
from backend.fcmcomponent import ComponentName
from backend.fcmservice import BaseService, PoolService, ServiceName, InfrastructureService, Configuration, Telegram
from backend.fcmminer import Antminer
from backend.fcmsensors import SensorService

class Component(object):
    '''A component is a unit of execution of FCM'''
    def __init__(self, componentname, option=''):
        self.app = ApplicationService(component=componentname, option=option)
        #was a queue, now it's a channel
        self.listeningqueue = None

    def listen(self):
        if self.listeningqueue:
            self.app.bus.listen(self.listeningqueue)

class ApplicationService(BaseService):
    '''Application Services'''
    programnamefull = ''
    programname = ''
    component = ComponentName.fullcycle
    loglevel = 0
    #true if user passed in -now command line argument
    isrunnow = False
    #dictionary of queues managed by this app
    _queues = {}
    _channels = []
    #the startup directory
    __logger = None
    __logger_debug = None
    __logger_error = None

    def __init__(self, component=ComponentName.fullcycle, option=None, announceyourself=False):
        #TODO: call the one in parent before this
        self.homedirectory = os.path.dirname(__file__)
        self.initcache()
        self.setup_configuration()
        super().__init__(self.configuration, self.__cache)
        #for some reason super fails

        self.component = component
        if self.component == ComponentName.fullcycle:
            self.print('Starting FCM Init')
        self.initargs(option)
        self.startupstuff()
        if self.component == ComponentName.fullcycle:
            self.print('Starting FCM Configuration')
        self.initlogger()
        self.initmessaging()
        #this is slow. should be option to opt out of cache?
        if self.component == ComponentName.fullcycle:
            self.loginfo('Starting FCM Cache')
        self.initbus()
        self.init_application()
        self.init_sensors()

        if announceyourself:
            self.sendqueueitem(QueueEntry(QueueName.Q_LOG, self.stamp('Started {0}'.format(self.component)), QueueType.broadcast))

    def initargs(self, option):
        '''process command line arguments'''
        if sys.argv:
            self.programnamefull = sys.argv[0]
            self.programname = os.path.basename(self.programnamefull)
        firstarg = option
        if len(sys.argv) > 1:
            firstarg = sys.argv[1]
        if firstarg is not None:
            if firstarg == '-now':
                self.isrunnow = True

    def startupstuff(self):
        #used with colorama on windows
        init(autoreset=True)

    def initcache(self):
        '''start up cache'''
        try:
            cachelogin = self.getservice(ServiceName.cache)
            self.__cache = Cache(cachelogin)
        except Exception as ex:
            #cache is offline. try to run in degraded mode
            self.logexception(ex)

    def startup(self):
        self.initminercache()
        self.initpoolcache()

    def initbus(self):
        '''start up message bus'''
        login = self.getservice(ServiceName.messagebus)
        self.__bus = Bus(login)

    def init_sensors(self):
        self.sensors = SensorService(self.configuration, self.__cache)

    def init_application(self):
        self.antminer = Antminer(self.configuration, self.sshlogin())
        self.telegram = Telegram(self.configuration, self.getservice(ServiceName.telegram))
        self.pools = PoolService(self.configuration, self.__cache)
        self.camera = CameraService(self.configuration, self.__cache)

    @property
    def bus(self):
        return self.__bus

    #@property
    #def cache(self):
    #    return self.__cache

    @property
    def isdebug(self):
        return sys.flags.debug

    def setup_configuration(self):
        '''configuration is loaded once at startup'''
        raw = BaseRepository().readrawfile(self.getconfigfilename('config/fullcycle.conf'))
        config = json.loads(raw)

        self.configuration = Configuration(config)
        self.applicationid = self.configuration.get('applicationid')
        self.loglevel = self.configuration.get('loglevel')

    def initpoolcache(self):
        if self.__cache.get(CacheKeys.pools) is None:
            spools = PoolRepository().readrawfile(self.getconfigfilename('config/pools.conf'))
            self.__cache.tryputcache(CacheKeys.pools, spools)
        for pool in self.pools.get_all_pools():
            #pool isinstance of Pool
            availablepool = AvailablePool(pool.pool_type, pool, pool.url, pool.user, pool.password, pool.priority)
            minerpool = domain.minerpool.MinerPool(miner=None, priority=0, pool=availablepool)
            self.pools.putpool(pool)
            self.pools.add_pool(minerpool)

    def initminercache(self):
        '''put known miners into cache'''
        if self.__cache.get(CacheKeys.miners) is None:
            sminers = MinerRepository().readrawfile(self.getconfigfilename('config/miners.conf'))
            self.__cache.tryputcache(CacheKeys.miners, sminers)

        for miner in self.miners():
            #status is not persisted yet so init from name
            if miner.is_manually_disabled():
                miner.status = MinerStatus.Disabled
            if self.getminer(miner) is None:
                self.putminer(miner)

    def initlogger(self):
        '''set up logging application info'''
        self.__logger = self.setup_logger('fcmapp', 'fcm.log', logging.INFO)

        self.__logger_debug = self.setup_logger('fcmdebug', 'fcm.bug', logging.DEBUG)

        self.__logger_error = self.setup_logger('fcmerror', 'fcm.err', logging.ERROR)

    def setup_logger(self, logger_name, log_file, level=logging.INFO):
        '''start logger'''
        logr = logging.getLogger(logger_name)
        formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
        #by default will append. use mode='w' to overwrite
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        logr.addHandler(file_handler)
        #is setting stream necessary? the stream handler is created but never attached to the logger
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(formatter)
        logr.setLevel(level)
        return logr

    def loginfo(self, message):
        '''log informational message'''
        logmsg = '{0}: {1}'.format(self.programname, message)
        self.__logger.info(logmsg)
        print(message)

    def logerror(self, message):
        '''log error'''
        logmsg = '{0}: {1}'.format(self.programname, message)
        self.__logger_error.error(logmsg)
        print(Fore.RED+logmsg)

    def logdebug(self, message):
        '''log debug message'''
        if not self.loglevel or self.loglevel == 0:
            return
        logmsg = '{0}: {1}'.format(self.programname, message)
        self.__logger_debug.debug(logmsg)
        print(Fore.GREEN+logmsg)

    def print(self, message):
        '''echo message to screen'''
        print('{0}: {1}'.format(self.now(), message))

    def now(self):
        '''current time formatted as friendly string'''
        return utils.formattime(datetime.datetime.now())

    #region lookups
    #todo: move to configurations section
    def miners(self):
        '''configured miners'''
        miners = MinerRepository().readminers(self.getconfigfilename('config/miners.conf'))
        return miners

    def knownminers(self):
        '''for now just return a list of miners
        later should consider returning a list that is easily searched and filtered
        '''
        dknownminers = self.__cache.gethashset(CacheKeys.knownminers)
        if dknownminers is not None and dknownminers:
            #get list of miners from cache
            return utils.deserializelistofstrings(list(dknownminers.values()), MinerSchema())
        knownminers = self.miners()
        return knownminers

    def allminers(self):
        '''combined list of discovered miners and configured miners'''
        allminers = self.knownminers()
        for miner in self.miners():
            foundminer = [x for x in allminers if x.key() == miner.key()]
            if not foundminer:
                allminers.append(miner)
        return allminers

    def minersummary(self, max_number=10):
        '''show a summary of known miners
        '''
        mode = self.configuration.get('summary')
        if not mode:
            mode = 'auto'
        knownminers = self.knownminers()
        if len(knownminers) <= max_number:
            return '\n'.join([m.summary() for m in knownminers])
        groupbystatus = defaultdict(list)
        for miner in knownminers:
            groupbystatus[miner.status].append(miner)
        return '\n'.join(['{0}: {1}'.format(s, self.summary_by_status(s, groupbystatus[s])) for s in groupbystatus])

    def summary_by_status(self, key, minerlist):
        if key == 'online':
            return '{0} miners hash {1}'.format(self.summarize_count(minerlist), self.summarize_hash(minerlist))
        return self.summarize_count(minerlist)

    def summarize_count(self, minerlist):
        return len(minerlist)

    def summarize_hash(self, minerlist):
        return sum(miner.minerstats.currenthash for miner in minerlist)

    def addknownminer(self, miner):
        '''add miner to known miners list'''
        val = self.serialize(miner)
        self.__cache.putinhashset(CacheKeys.knownminers, miner.key(), val)

    def updateknownminer(self, miner):
        '''update known miner in cache'''
        sminer = self.__cache.getfromhashset(CacheKeys.knownminers, miner.key())
        memminer = utils.deserialize(MinerSchema(), utils.safestring(sminer))
        if memminer is None:
            memminer = miner
        else:
            #merge new values
            memminer.updatefrom(miner)
        val = self.serialize(memminer)
        self.__cache.putinhashset(CacheKeys.knownminers, miner.key(), val)

    def sshlogin(self):
        '''return contents of login file'''
        return self.readlogin('ftp.conf')

    def readlogin(self, filename):
        '''read login file configuration'''
        login = LoginRepository().readlogins(self.getconfigfilename('config/'+filename))
        return login

    def ruleparameters(self):
        '''rules parameters'''
        return RuleParametersRepository().readrules(self.getconfigfilename('config/'+'rules.conf'))

    def getservice(self, servicename):
        '''get a service by name. should be repository'''
        file_name = self.getconfigfilename('config/services.conf')
        with open(file_name, encoding='utf-8-sig') as config_file:
            content = json.loads(config_file.read())
        svc = None #dummy initializer to make Scrutinizer happy
        services = [InfrastructureService(**s) for s in content]
        return next((svc for svc in services if svc.name == servicename), None)

    def getservice_useroverride(self, servicename):
        service = self.getservice(servicename)
        service.user = self.component
        return service
    #endregion lookups

    def listen(self, qlisten):
        """Goes into listening mode on a queue"""
        try:
            self.bus.listen(qlisten)
        except KeyboardInterrupt:
            self.shutdown()
        except BaseException as unhandled:
            self.unhandledexception(unhandled)

    def registerqueue(self, qregister: Queue):
        '''register a queue'''
        self.logdebug(self.stamp('Registered queue {0}'.format(qregister.queue_name)))
        if qregister.queue_name not in self._queues.keys():
            self._queues[qregister.queue_name] = qregister

    def shutdown(self, exitstatus=0):
        '''shut down app services'''
        self.loginfo('Shutting down fcm app...')
        self.close_channels()
        self.closequeues()
        if self.__bus:
            self.bus.close()
        if self.__cache is not None:
            self.__cache.close()
        sys.exit(exitstatus)

    def closequeue(self, thequeue):
        '''close the queue'''
        if not thequeue:
            return
        try:
            if thequeue is not None:
                self.logdebug(self.stamp('closing queue {0}'.format(thequeue.queue_name)))
                thequeue.close()
            del self._queues[thequeue.queue_name]
        except Exception as ex:
            self.logexception(ex)

    def closequeues(self):
        '''close a bunch of queues'''
        for k in list(self._queues):
            self.closequeue(self._queues[k])

    def close_channel(self, chan):
        if not chan:
            return
        try:
            if chan.name in self._channels:
                self.logdebug(self.stamp('closing channel {0}'.format(chan.name)))
                chan.close()
                del self._channels[chan.name]
        except Exception as ex:
            self.logexception(ex)

    def close_channels(self):
        '''close all channels'''
        for chan in list(self._channels):
            self.close_channel(self._channels[chan])

    def unhandledexception(self, unhandled):
        '''what to do when there is an exception that app cannot handle'''
        self.logexception(unhandled)

    def exceptionmessage(self, ex):
        '''gets exception message even when it doesn't have one'''
        exc_type, _, exc_tb = sys.exc_info()
        exmsg = getattr(ex, 'message', repr(ex))
        return '{0}:{1}:{2}'.format(exc_type, exc_tb.tb_lineno, exmsg)

    def logexception(self, ex):
        '''log an exception'''
        self.logerror(self.exceptionmessage(ex))

    def sendlog(self, logmessage):
        '''send message to log queue'''
        item = QueueEntry(QueueName.Q_LOG, logmessage, 'broadcast')
        self.sendqueueitem(item)
        print(logmessage)

    def subscribe(self, name, callback, no_acknowledge=True, prefetch=1):
        '''subscribe to a queue'''
        chan = self.bus.subscribe(name, callback, no_acknowledge=no_acknowledge, prefetch_count=prefetch)
        print('Waiting for messages on {0}. To exit press CTRL+C'.format(name))
        return chan

    def listen_to_broadcast(self, broadcast_name, callback, no_acknowledge=True):
        thebroadcast = self.bus.subscribe_broadcast(broadcast_name, callback, no_acknowledge)
        print('Waiting for messages on {0}. To exit press CTRL+C'.format(broadcast_name))
        self.bus.listen(thebroadcast)
        #never returns because listen is blocking
        return thebroadcast

    def trypublish(self, queue_name, msg: str):
        '''publish a message to the queue'''
        try:
            self.bus.publish(queue_name, msg)
            return True
        except pika.exceptions.ConnectionClosed as ex:
            logmessage = 'Error publishing to {0} {1}'.format(queue_name, self.exceptionmessage(ex))
            self.logerror(logmessage)
            return False

    def trybroadcast(self, exchange_name, msg):
        '''broadcast a message to all queue listeners'''
        try:
            self.bus.broadcast(exchange_name, msg)
            return True
        except pika.exceptions.ConnectionClosed as conxex:
            self.logerror('Error broadcasting to {0} {1}'.format(exchange_name, self.exceptionmessage(conxex)))
            return False

    def queuestatus(self):
        """TODO: print queue status"""
        pass
        #for q in self._queues.values():
        #    print(q.queue_name,str(q._connection))

    def putminer(self, miner: Miner):
        '''put miner in cache'''
        if miner and miner.key():
            valu = self.serialize(miner)
            self.__cache.tryputcache('miner.{0}'.format(miner.key()), valu)

    def getminer(self, miner: Miner) -> Miner:
        '''strategies for getting miner from cache
        originally was key=miner.name but that was not good
        changed to key='miner.'+minerid
        '''
        valu = self.cache.trygetvaluefromcache('miner.{0}'.format(miner.key()))
        if valu is None:
            return None
        minerfromstore = utils.deserialize(MinerSchema(), utils.safestring(valu))
        if not minerfromstore.key():
            #do not allow entry with no key
            return None
        minerfromstore.store = 'mem'
        return minerfromstore

    def getknownminer(self, miner: Miner) -> Miner:
        '''get a known miner'''
        return self.getknownminerbykey(miner.key())

    def getminerbyname(self, minername):
        filtered = [x for x in self.miners() if x.name == minername]
        if filtered:
            return filtered[0]
        return None

    def getknownminerbykey(self, minername):
        str_miner = self.__cache.getfromhashset(CacheKeys.knownminers, minername)
        if str_miner is None:
            return None
        return utils.deserialize(MinerSchema(), utils.safestring(str_miner))

    def getknownminerbyname(self, minername):
        '''this could be slow if there are lots of miners'''
        known = self.knownminers()
        for miner in known:
            if miner.name == minername:
                return miner
        return None

    def putminerandstats(self, miner: Miner, minerstats, minerpool):
        '''put miner and status in cache'''
        self.putminer(miner)
        schema = MinerStatsSchema()
        valstats = schema.dumps(minerstats).data
        self.__cache.tryputcache(miner.key() + '.stats', valstats)
        schema = MinerCurrentPoolSchema()
        valpool = schema.dumps(minerpool).data
        self.__cache.tryputcache(miner.key() + '.pool', valpool)

    def getstats(self, miner: Miner):
        '''get stats entity'''
        #note: stats are stored above under miner.key() + '.stats' but read back here with miner.name
        valu = self.cache.trygetvaluefromcache(miner.name + '.stats')
        if valu is None:
            return None
        entity = domain.minerstatistics.MinerStatistics(miner, **utils.deserialize(MinerStatsSchema(), valu))
        return entity

    def getminerstatscached(self):
        '''iterator for cached stats'''
        for miner in self.miners():
            yield (self.getminer(miner), self.getstats(miner), self.pools.getpool(miner))

    def messagedecodeminer(self, body) -> Miner:
        '''deserialize a miner message'''
        message_envelope = super().deserializemessageenvelope(utils.safestring(body))
        schema = MinerMessageSchema()
        minermessage_dict = schema.load(message_envelope.bodyjson()).data
        minermessage_entity = schema.make_minermessage(minermessage_dict)
        miner = minermessage_entity.miner
        return miner

    def messagedecodeminerstats(self, body):
        '''deserialize miner stats'''
        message_envelope = super().deserializemessageenvelope(utils.safestring(body))
        schema = MinerMessageSchema()
        minermessage_dict = schema.load(message_envelope.bodyjson()).data
        minermessage_entity = schema.make_minermessage(minermessage_dict)
        return minermessage_entity

    def messagedecodeminercommand(self, body):
        '''deserialize a miner command'''
        message_envelope = super().deserializemessageenvelope(utils.safestring(body))
        schema = MinerMessageSchema()
        minermessage_dict = schema.load(message_envelope.bodyjson()).data
        minermessage_entity = schema.make_minermessage(minermessage_dict)
        return minermessage_entity

    def messagedecodesensor(self, body):
        '''deserialize a sensor value'''
        message_envelope = super().deserializemessageenvelope(utils.safestring(body))
        schema = SensorValueSchema()
        #minermessage_dict = schema.load(message_envelope.bodyjson()).data
        entity = schema.load(message_envelope.bodyjson()).data
        return message_envelope, entity

    def messagedecode_configuration(self, body):
        '''deserialize a configuration command'''
        message_envelope = super().deserializemessageenvelope(utils.safestring(body))
        schema = ConfigurationMessageSchema()
        configurationmessage_dict = schema.load(message_envelope.bodyjson()).data
        configurationmessage_entity = schema.make_configurationmessage(configurationmessage_dict)
        return configurationmessage_entity

    def createmessagestats(self, miner, minerstats, minerpool):
        #always save the miner so the next guy can get latest changes
        #only put in cache if it came from cache
        if miner.store == 'mem':
            self.putminer(miner)
        message = super().createmessageenvelope()
        message = message.make_minerstats(miner, minerstats, minerpool)
        return super().serializemessageenvelope(message)

    def createmessagecommand(self, miner, command):
        '''create message command'''
        if miner.store == 'mem':
            self.putminer(miner)
        message = super().createmessageenvelope()
        message = message.make_minercommand(miner, command)
        return super().serializemessageenvelope(message)

    def messageencode(self, miner: Miner):
        '''command is optional, however should convert this call into minercommand'''
        #always save the miner so the next guy can get latest changes
        if miner.store == 'mem':
            self.putminer(miner)
        message = super().createmessageenvelope()
        message = message.make_minerstats(miner, minerstats=None, minerpool=None)
        return self._schemamsg.dumps(message).data

    def stamp(self, message):
        return '{0}:{1}: {2}'.format(self.now(), self.applicationid, message)

    def alert(self, message):
        '''send alert message'''
        return self.sendqueueitem(QueueEntry(QueueName.Q_ALERT, self.stamp(message), QueueType.broadcast))

    def send(self, q_name, message):
        '''send message to queue'''
        success = self.trypublish(q_name, message)
        return success

    def enqueue(self, queuelist):
        '''send a list of queue messages'''
        if queuelist is None:
            return
        if not queuelist.hasentries():
            return
        #todo: group by queuename
        for entry in queuelist.entries:
            self.sendqueueitem(entry)

    def sendqueueitem(self, entry):
        '''send one queue item'''
        if entry.eventtype == 'broadcast':
            send_result = self.trybroadcast(entry.queuename, entry.message)
            return send_result
        return self.send(entry.queuename, entry.message)

    def readtemperature(self):
        try:
            sensor_humid, sensor_temp = readtemperature()
            if sensor_temp is not None:
                #flagged by the review (Comprehensibility Best Practice): the variable SensorValue does not seem to be defined in this module
                reading = SensorValue('fullcycletemp', sensor_temp, 'temperature')
                reading.sensor = self.sensor
                self.sendsensor(reading)
            if sensor_humid is not None:
                reading = SensorValue('fullcyclehumid', sensor_humid, 'humidity')
                reading.sensor = self.sensor
                self.sendsensor(reading)
            return sensor_humid, sensor_temp
        except BaseException as ex:
            self.logexception(ex)
        return None, None

    def sendsensor(self, reading):
        message = super().createmessageenvelope()
        sensorjson = message.jsonserialize(SensorValueSchema(), reading)
        self.sendqueueitem(QueueEntry(QueueName.Q_SENSOR, super().serializemessageenvelope(message.make_any('sensorvalue', sensorjson)), QueueType.broadcast))

    def getsession(self):
        service = self.getservice(ServiceName.database)
        engine = create_engine(service.connection, echo=False)
        Session = sessionmaker(bind=engine)
        return Session()

    def log_mineractivity(self, minerlog):
        try:
            session = self.getsession()
            session.add(minerlog)
            session.commit()
            return True
        except BaseException as ex:
            self.logexception(ex)
        return False

    def save_miner(self, miner: Miner):
        found = self.getknownminer(miner)
        if found is None:
            self.addknownminer(miner)
            #miners = MinerRepository()
            #todo: add the miner to the json config
        else:
            found.updatefrom(miner)
            self.putminer(found)

def main():
    full_cycle = ApplicationService()
    full_cycle.loginfo('Full Cycle was run in a script')
    full_cycle.shutdown()

if __name__ == "__main__":
    main()
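For orientation, a minimal sketch of how a Component wrapping this ApplicationService might be wired to a queue. The callback and its pika-style signature are illustrative assumptions, not part of the file above; only Component, ComponentName, QueueName, and subscribe() come from the listing.

# Hypothetical usage; the wiring and callback signature are illustrative only.
from backend.fcmapp import Component
from backend.fcmcomponent import ComponentName
from helpers.queuehelper import QueueName

def on_log(channel, method, properties, body):
    # print whatever arrives on the log queue
    print(body)

component = Component(ComponentName.fullcycle)
# subscribe returns a channel; Component.listen() hands it to the bus and blocks
component.listeningqueue = component.app.subscribe(QueueName.Q_LOG, on_log)
component.listen()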