Passed
Push — master ( 269add...33a345 )
by Dave
01:22
created

backend.fcmapp.ApplicationService.serializelist()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 4
rs 10
c 0
b 0
f 0
1
'''Application Service layer for Full Cycle Mining
2
Gateway into most of application functionality'''
3
import sys
4
import os
5
import datetime
6
import logging
7
import json
8
import base64
9
from collections import defaultdict
10
import redis
11
import pika
12
from colorama import init, Fore
13
from sqlalchemy.orm import sessionmaker
14
from sqlalchemy import create_engine
15
import domain.minerstatistics
16
import domain.minerpool
17
from domain.mining import Miner, Pool, AvailablePool, MinerCurrentPool, MinerStatus
18
from domain.rep import MinerRepository, PoolRepository, LoginRepository, RuleParametersRepository, BaseRepository
19
#from domain.miningrules import RuleParameters
20
from domain.sensors import Sensor, SensorValue
21
from messaging.messages import Message, MessageSchema, MinerMessageSchema, ConfigurationMessageSchema
22
from messaging.sensormessages import SensorValueSchema
23
from messaging.schema import MinerSchema, MinerStatsSchema, MinerCurrentPoolSchema, AvailablePoolSchema, PoolSchema
24
from helpers.queuehelper import QueueName, Queue, QueueEntry, QueueType
25
from helpers.camerahelper import take_picture
26
from helpers.temperaturehelper import readtemperature
27
import backend.fcmutils as utils
28
from backend.fcmcache import Cache, CacheKeys
29
from backend.fcmbus import Bus
30
from backend.fcmcomponent import ComponentName
31
from backend.fcmservice import BaseService, PoolService, ServiceName, InfrastructureService, Configuration, Telegram
32
from backend.fcmminer import Antminer
33
34
class Component(object):
    '''A component is a unit of execution of FCM'''

    def __init__(self, componentname, option=''):
        '''Create the component and its backing application service.'''
        self.app = ApplicationService(component=componentname, option=option)
        #was a queue, now its a channel
        self.listeningqueue = None

    def listen(self):
        '''Block listening on the assigned channel; no-op when none is set.'''
        if not self.listeningqueue:
            return
        self.app.bus.listen(self.listeningqueue)
44
45
class ApplicationService(BaseService):
    '''Application Services: gateway into configuration, logging, caching,
    messaging and persistence for all Full Cycle components.'''
    programnamefull = ''
    programname = ''
    component = ComponentName.fullcycle
    loglevel = 0
    #true if user passed in -now command line argument
    isrunnow = False
    #dictionary of queues managed by this app (fresh copy made per instance in __init__)
    _queues = {}
    #channels keyed by channel name. Was a list, but close_channel deletes by
    #name and close_channels indexes by key, which only works on a mapping.
    _channels = {}
    #private log handles, created by initlogger
    __logger = None
    __logger_debug = None
    __logger_error = None

    def __init__(self, component=ComponentName.fullcycle, option=None, announceyourself=False):
        super(ApplicationService, self).__init__()
        self.component = component
        #fresh mutable containers so instances never share class-level state
        self._queues = {}
        self._channels = {}
        #degraded-mode default; initcache may fail and leave the cache unset
        self.__cache = None
        if self.component == ComponentName.fullcycle:
            self.print('Starting FCM Init')
        self.initargs(option)
        self.startupstuff()
        if self.component == ComponentName.fullcycle:
            self.print('Starting FCM Configuration')
        self.setup_configuration()
        self.initlogger()
        self.initmessaging()
        #this is slow. should be option to opt out of cache?
        if self.component == ComponentName.fullcycle:
            self.loginfo('Starting FCM Cache')
        self.initcache()
        self.initbus()
        self.init_application()
        self.init_sensors()

        if announceyourself:
            self.sendqueueitem(QueueEntry(QueueName.Q_LOG, self.stamp('Started {0}'.format(self.component)), QueueType.broadcast))

    def initargs(self, option):
        '''process command line arguments'''
        if sys.argv:
            self.programnamefull = sys.argv[0]
            self.programname = os.path.basename(self.programnamefull)
        firstarg = option
        if len(sys.argv) > 1:
            #command line wins over the option passed in by the caller
            firstarg = sys.argv[1]
        if firstarg == '-now':
            self.isrunnow = True

    def startupstuff(self):
        '''one-time console setup; used with colorama on windows'''
        init(autoreset=True)

    def initmessaging(self):
        '''start up messaging'''
        self._schemamsg = MessageSchema()

    def initcache(self):
        '''start up cache'''
        try:
            cachelogin = self.getservice(ServiceName.cache)
            self.__cache = Cache(cachelogin)
        except Exception as ex:
            #cache is offline. try to run in degraded mode (self.__cache stays None)
            self.logexception(ex)

    def startup(self):
        '''prime the cache with configured miners and pools'''
        self.initminercache()
        self.initpoolcache()

    def initbus(self):
        '''start up message bus'''
        login = self.getservice(ServiceName.messagebus)
        self.__bus = Bus(login)

    @property
    def bus(self):
        '''message bus created by initbus'''
        return self.__bus

    @property
    def cache(self):
        '''cache client, or None when running degraded (see initcache)'''
        return self.__cache

    def init_sensors(self):
        '''configure the controller temperature/humidity sensor'''
        self.sensor = Sensor('controller', 'DHT22', 'controller')

    def init_application(self):
        '''wire up application-level services (miner API, telegram, pools)'''
        self.antminer = Antminer(self.configuration, self.sshlogin())
        self.telegram = Telegram(self.configuration, self.getservice(ServiceName.telegram))
        self.pools = PoolService(self.cache)

    @property
    def isdebug(self):
        '''True when the interpreter was started in debug mode (-d)'''
        return sys.flags.debug

    def setup_configuration(self):
        '''configuration is loaded once at startup'''
        raw = BaseRepository().readrawfile(self.getconfigfilename('config/fullcycle.conf'))
        config = json.loads(raw)

        self.configuration = Configuration(config)
        self.applicationid = self.configuration.get('applicationid')
        self.loglevel = self.configuration.get('loglevel')

    def initpoolcache(self):
        '''put known pools into cache'''
        if self.__cache.get(CacheKeys.pools) is None:
            spools = PoolRepository().readrawfile(self.getconfigfilename('config/pools.conf'))
            self.tryputcache(CacheKeys.pools, spools)
        for pool in self.pools.get_all_pools():
            #pool isinstance of Pool
            availablepool = AvailablePool(pool.pool_type, pool, pool.url, pool.user, pool.password, pool.priority)
            minerpool = domain.minerpool.MinerPool(miner=None, priority=0, pool=availablepool)
            self.pools.putpool(pool)
            self.pools.add_pool(minerpool)

    def initminercache(self):
        '''put known miners into cache'''
        if self.__cache.get(CacheKeys.miners) is None:
            sminers = MinerRepository().readrawfile(self.getconfigfilename('config/miners.conf'))
            self.tryputcache(CacheKeys.miners, sminers)

        for miner in self.miners():
            #status is not persisted yet so init from name
            if miner.is_manually_disabled():
                miner.status = MinerStatus.Disabled
            if self.getminer(miner) is None:
                self.putminer(miner)

    def initlogger(self):
        '''set up logging application info'''
        self.__logger = self.setup_logger('fcmapp', 'fcm.log', logging.INFO)
        self.__logger_debug = self.setup_logger('fcmdebug', 'fcm.bug', logging.DEBUG)
        self.__logger_error = self.setup_logger('fcmerror', 'fcm.err', logging.ERROR)

    def setup_logger(self, logger_name, log_file, level=logging.INFO):
        '''start a named file logger and return it'''
        logr = logging.getLogger(logger_name)
        formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
        #by default will append. use mode='w' to overwrite
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        logr.addHandler(file_handler)
        #NOTE(review): a StreamHandler used to be created here but was never
        #attached (dead code, removed). Console echo is done via print() in
        #the log helper methods instead.
        logr.setLevel(level)
        return logr

    def loginfo(self, message):
        '''log informational message'''
        logmsg = '{0}: {1}'.format(self.programname, message)
        self.__logger.info(logmsg)
        print(message)

    def logerror(self, message):
        '''log error'''
        logmsg = '{0}: {1}'.format(self.programname, message)
        self.__logger_error.error(logmsg)
        print(Fore.RED+logmsg)

    def logdebug(self, message):
        '''log debug message; no-op unless a non-zero loglevel is configured'''
        if not self.loglevel:
            return
        logmsg = '{0}: {1}'.format(self.programname, message)
        self.__logger_debug.debug(logmsg)
        print(Fore.GREEN+logmsg)

    def print(self, message):
        '''echo message to screen'''
        print('{0}: {1}'.format(self.now(), message))

    def now(self):
        '''current time formatted as friendly string'''
        return utils.formattime(datetime.datetime.now())

    #region lookups
    #todo: move to configurations section
    def miners(self):
        '''configured miners'''
        return MinerRepository().readminers(self.getconfigfilename('config/miners.conf'))

    def knownminers(self):
        '''for now just return a list of miners
        later should consider returning a list that is easily searched and filtered
        '''
        dknownminers = self.__cache.gethashset(CacheKeys.knownminers)
        if dknownminers:
            #get list of miners from cache
            #fix: the schema was accidentally passed to list() instead of
            #to the deserializer (misplaced closing parenthesis)
            return utils.deserializelistofstrings(list(dknownminers.values()), MinerSchema())
        return self.miners()

    def allminers(self):
        '''combined list of discovered miners and configured miners'''
        allminers = self.knownminers()
        for miner in self.miners():
            foundminer = [x for x in allminers if x.key() == miner.key()]
            if not foundminer:
                allminers.append(miner)
        return allminers

    def knownsensors(self):
        '''sensor values seen so far, or None when none are cached'''
        dknownsensors = self.__cache.gethashset(CacheKeys.knownsensors)
        if dknownsensors:
            return utils.deserializelist_withschema(SensorValueSchema(), list(dknownsensors.values()))
        return None

    def addknownsensor(self, sensorvalue):
        '''remember a sensor reading in the cache, keyed by sensor id'''
        val = utils.jsonserialize(SensorValueSchema(), sensorvalue)
        self.__cache.putinhashset(CacheKeys.knownsensors, sensorvalue.sensorid, val)

    def minersummary(self, max_number=10):
        '''show a summary of known miners'''
        #NOTE(review): mode is computed but not yet used anywhere below
        mode = self.configuration.get('summary')
        if not mode:
            mode = 'auto'
        knownminers = self.knownminers()
        if len(knownminers) <= max_number:
            return '\n'.join([m.summary() for m in knownminers])
        #too many miners to list individually; group them by status
        groupbystatus = defaultdict(list)
        for miner in knownminers:
            groupbystatus[miner.status].append(miner)
        return '\n'.join(['{0}: {1}'.format(s, self.summary_by_status(s, groupbystatus[s])) for s in groupbystatus])

    def summary_by_status(self, key, minerlist):
        '''one-line summary for a status group'''
        if key == 'online':
            return '{0} miners hash {1}'.format(self.summarize_count(minerlist), self.summarize_hash(minerlist))
        return self.summarize_count(minerlist)

    def summarize_count(self, minerlist):
        '''number of miners in the list'''
        return len(minerlist)

    def summarize_hash(self, minerlist):
        '''total current hash rate over the listed miners'''
        return sum(miner.minerstats.currenthash for miner in minerlist)

    def addknownminer(self, miner):
        '''add miner to known miners list'''
        val = self.serialize(miner)
        self.__cache.putinhashset(CacheKeys.knownminers, miner.key(), val)

    def updateknownminer(self, miner):
        '''update known miner in cache'''
        sminer = self.__cache.getfromhashset(CacheKeys.knownminers, miner.key())
        memminer = utils.deserialize(MinerSchema(), utils.safestring(sminer))
        if memminer is None:
            memminer = miner
        else:
            #merge new values
            memminer.updatefrom(miner)
        val = self.serialize(memminer)
        self.__cache.putinhashset(CacheKeys.knownminers, miner.key(), val)

    def sshlogin(self):
        '''return contents of login file'''
        return self.readlogin('ftp.conf')

    def readlogin(self, filename):
        '''read login file configuration'''
        return LoginRepository().readlogins(self.getconfigfilename('config/'+filename))

    def ruleparameters(self):
        '''rules parameters'''
        return RuleParametersRepository().readrules(self.getconfigfilename('config/'+'rules.conf'))

    def getservice(self, servicename):
        '''get a service by name. should be repository'''
        file_name = self.getconfigfilename('config/services.conf')
        with open(file_name, encoding='utf-8-sig') as config_file:
            content = json.loads(config_file.read())
        services = [InfrastructureService(**s) for s in content]
        return next((svc for svc in services if svc.name == servicename), None)

    def getservice_useroverride(self, servicename):
        '''get a service with the user overridden by this component name'''
        service = self.getservice(servicename)
        service.user = self.component
        return service
    #endregion lookups

    def listen(self, qlisten):
        """Goes into listening mode on a queue"""
        try:
            self.bus.listen(qlisten)
        except KeyboardInterrupt:
            self.shutdown()
        except BaseException as unhandled:
            self.unhandledexception(unhandled)

    def registerqueue(self, qregister: Queue):
        '''register a queue'''
        self.logdebug(self.stamp('Registered queue {0}'.format(qregister.queue_name)))
        if qregister.queue_name not in self._queues:
            self._queues[qregister.queue_name] = qregister

    def shutdown(self, exitstatus=0):
        '''shut down app services'''
        self.loginfo('Shutting down fcm app...')
        self.close_channels()
        self.closequeues()
        if self.__bus:
            self.bus.close()
        if self.__cache is not None:
            self.__cache.close()
        sys.exit(exitstatus)

    def closequeue(self, thequeue):
        '''close the queue and forget it'''
        if not thequeue:
            return
        try:
            self.logdebug(self.stamp('closing queue {0}'.format(thequeue.queue_name)))
            thequeue.close()
            del self._queues[thequeue.queue_name]
        except Exception as ex:
            self.logexception(ex)

    def closequeues(self):
        '''close a bunch of queues'''
        #iterate a snapshot of the keys; closequeue mutates self._queues
        for k in list(self._queues):
            self.closequeue(self._queues[k])

    def close_channel(self, chan):
        '''close one channel and forget it'''
        if not chan:
            return
        try:
            if chan.name in self._channels:
                self.logdebug(self.stamp('closing channel {0}'.format(chan.name)))
                chan.close()
                del self._channels[chan.name]
        except Exception as ex:
            self.logexception(ex)

    def close_channels(self):
        '''close all channels'''
        #iterate a snapshot of the keys; close_channel mutates self._channels
        for chan in list(self._channels):
            self.close_channel(self._channels[chan])

    def unhandledexception(self, unhandled):
        '''what to do when there is an exception that app cannot handle'''
        self.logexception(unhandled)

    def exceptionmessage(self, ex):
        '''gets exception message even when it doesnt have one'''
        exc_type, _, exc_tb = sys.exc_info()
        exmsg = getattr(ex, 'message', repr(ex))
        #guard: traceback is None when called outside an active except block
        lineno = exc_tb.tb_lineno if exc_tb is not None else '?'
        return '{0}:{1}:{2}'.format(exc_type, lineno, exmsg)

    def logexception(self, ex):
        '''log an exception'''
        self.logerror(self.exceptionmessage(ex))

    def sendlog(self, logmessage):
        '''send message to log queue'''
        item = QueueEntry(QueueName.Q_LOG, logmessage, 'broadcast')
        self.sendqueueitem(item)
        print(logmessage)

    def subscribe(self, name, callback, no_acknowledge=True, prefetch=1):
        '''subscribe to a queue'''
        chan = self.bus.subscribe(name, callback, no_acknowledge=no_acknowledge, prefetch_count=prefetch)
        print('Waiting for messages on {0}. To exit press CTRL+C'.format(name))
        return chan

    def listen_to_broadcast(self, broadcast_name, callback, no_acknowledge=True):
        '''subscribe to a broadcast exchange and block listening on it'''
        thebroadcast = self.bus.subscribe_broadcast(broadcast_name, callback, no_acknowledge)
        print('Waiting for messages on {0}. To exit press CTRL+C'.format(broadcast_name))
        self.bus.listen(thebroadcast)
        #never returns becuase listen is blocking
        return thebroadcast

    def trypublish(self, queue_name, msg: str):
        '''publish a message to the queue; False if the bus connection is closed'''
        try:
            self.bus.publish(queue_name, msg)
            return True
        except pika.exceptions.ConnectionClosed as ex:
            logmessage = 'Error publishing to {0} {1}'.format(queue_name, self.exceptionmessage(ex))
            self.logerror(logmessage)
            return False

    def trybroadcast(self, exchange_name, msg):
        '''broadcast a message to all queue listeners'''
        try:
            self.bus.broadcast(exchange_name, msg)
            return True
        except pika.exceptions.ConnectionClosed as conxex:
            self.logerror('Error broadcasting to {0} {1}'.format(exchange_name, self.exceptionmessage(conxex)))
            return False

    def queuestatus(self):
        """TODO:print queue status"""
        #for q in self._queues.values():
        #    print(q.queue_name,str(q._connection))

    def putminer(self, miner: Miner):
        '''put miner in cache'''
        if miner and miner.key():
            valu = self.serialize(miner)
            self.tryputcache('miner.{0}'.format(miner.key()), valu)

    def getminer(self, miner: Miner) -> Miner:
        '''strategies for getting miner from cache
        originally was key=miner.name but that was not good
        changed to key='miner.'+minerid
        '''
        valu = self.cache.trygetvaluefromcache('miner.{0}'.format(miner.key()))
        if valu is None:
            return None
        minerfromstore = utils.deserialize(MinerSchema(), utils.safestring(valu))
        if not minerfromstore.key():
            #do not allow entry with no key
            return None
        minerfromstore.store = 'mem'
        return minerfromstore

    def getknownminer(self, miner: Miner) -> Miner:
        '''get a known miner'''
        return self.getknownminerbykey(miner.key())

    def getminerbyname(self, minername):
        '''first configured miner with the given name, or None'''
        filtered = [x for x in self.miners() if x.name == minername]
        if filtered:
            return filtered[0]
        return None

    def getknownminerbykey(self, minername):
        '''known miner from the cache hashset, or None'''
        str_miner = self.__cache.getfromhashset(CacheKeys.knownminers, minername)
        if str_miner is None:
            return None
        return utils.deserialize(MinerSchema(), utils.safestring(str_miner))

    def getknownminerbyname(self, minername):
        '''this could be slow if there are lots of miners'''
        for miner in self.knownminers():
            if miner.name == minername:
                return miner
        return None

    def tryputcache(self, key, value):
        '''put value in cache key; tolerates a missing or offline cache'''
        if value is None:
            return
        try:
            if self.__cache is not None:
                self.__cache.set(key, value)
        except redis.exceptions.ConnectionError as ex:
            self.logexception(ex)

    def putminerandstats(self, miner: Miner, minerstats, minerpool):
        '''put miner and status in cache'''
        self.putminer(miner)
        schema = MinerStatsSchema()
        valstats = schema.dumps(minerstats).data
        self.tryputcache(miner.key() + '.stats', valstats)
        schema = MinerCurrentPoolSchema()
        valpool = schema.dumps(minerpool).data
        self.tryputcache(miner.key() + '.pool', valpool)

    def getstats(self, miner: Miner):
        '''get stats entity'''
        #fix: read with the same key putminerandstats writes
        #(miner.key(), not miner.name) so cached stats are actually found
        valu = self.cache.trygetvaluefromcache(miner.key() + '.stats')
        if valu is None:
            return None
        return domain.minerstatistics.MinerStatistics(miner, **utils.deserialize(MinerStatsSchema(), valu))

    def getminerstatscached(self):
        '''iterator for cached stats'''
        for miner in self.miners():
            yield (self.getminer(miner), self.getstats(miner), self.pools.getpool(miner))

    def _decode_minermessage(self, body):
        '''deserialize a miner message envelope into its message entity'''
        message_envelope = self.deserializemessageenvelope(utils.safestring(body))
        schema = MinerMessageSchema()
        minermessage_dict = schema.load(message_envelope.bodyjson()).data
        return schema.make_minermessage(minermessage_dict)

    def messagedecodeminer(self, body) -> Miner:
        '''deserialize a miner message'''
        return self._decode_minermessage(body).miner

    def messagedecodeminerstats(self, body):
        '''deserialize miner stats'''
        return self._decode_minermessage(body)

    def messagedecodeminercommand(self, body):
        '''deserialize  miner command'''
        return self._decode_minermessage(body)

    def messagedecodesensor(self, body):
        '''deserialize sensor value '''
        message_envelope = self.deserializemessageenvelope(utils.safestring(body))
        schema = SensorValueSchema()
        entity = schema.load(message_envelope.bodyjson()).data
        return message_envelope, entity

    def messagedecode_configuration(self, body):
        '''deserialize  configuration command'''
        message_envelope = self.deserializemessageenvelope(utils.safestring(body))
        schema = ConfigurationMessageSchema()
        configurationmessage_dict = schema.load(message_envelope.bodyjson()).data
        return schema.make_configurationmessage(configurationmessage_dict)

    def createmessageenvelope(self):
        '''create message envelope'''
        return Message(timestamp=datetime.datetime.utcnow(), sender=self.component)

    def serializemessageenvelope(self, msg):
        '''serialize message envelope'''
        return self._schemamsg.dumps(msg).data

    def deserializemessageenvelope(self, body):
        '''deserialize message envelope'''
        return self._schemamsg.load(json.loads(utils.safestring(body))).data

    def createmessagestats(self, miner, minerstats, minerpool):
        '''serialized stats message for a miner'''
        #always save the miner so the next guy can get latest changes
        #only put in cache if it came from cache
        if miner.store == 'mem':
            self.putminer(miner)
        message = self.createmessageenvelope()
        message = message.make_minerstats(miner, minerstats, minerpool)
        return self.serializemessageenvelope(message)

    def createmessagecommand(self, miner, command):
        '''create message command'''
        if miner.store == 'mem':
            self.putminer(miner)
        message = self.createmessageenvelope()
        message = message.make_minercommand(miner, command)
        return self.serializemessageenvelope(message)

    def messageencode(self, miner: Miner):
        '''command is optional, however should convert this call into minercommand'''
        #always save the miner so the next guy can get latest changes
        if miner.store == 'mem':
            self.putminer(miner)
        message = self.createmessageenvelope()
        message = message.make_minerstats(miner, minerstats=None, minerpool=None)
        return self._schemamsg.dumps(message).data

    def stamp(self, message):
        '''prefix a message with timestamp and application id'''
        return '{0}:{1}: {2}'.format(self.now(), self.applicationid, message)

    def alert(self, message):
        '''send alert message'''
        return self.sendqueueitem(QueueEntry(QueueName.Q_ALERT, self.stamp(message), QueueType.broadcast))

    def send(self, q_name, message):
        '''send message to queue; True on success'''
        return self.trypublish(q_name, message)

    def enqueue(self, queuelist):
        '''send a list of queue messages'''
        if queuelist is None:
            return
        if not queuelist.hasentries():
            return
        #todo: group by queuename
        for entry in queuelist.entries:
            self.sendqueueitem(entry)

    def sendqueueitem(self, entry):
        '''send one queue item'''
        if entry.eventtype == 'broadcast':
            return self.trybroadcast(entry.queuename, entry.message)
        return self.send(entry.queuename, entry.message)

    def take_picture(self, file_name='fullcycle_camera.png'):
        '''take a photo with the configured camera settings'''
        return take_picture(file_name,
                            self.configuration.get('camera.size'),
                            self.configuration.get('camera.quality'),
                            self.configuration.get('camera.brightness'),
                            self.configuration.get('camera.contrast'))

    def store_picture_cache(self, file_name):
        '''cache a camera image file as a base64 sensor reading'''
        if os.path.isfile(file_name):
            with open(file_name, 'rb') as photofile:
                picdata = photofile.read()
            reading = SensorValue('fullcyclecamera', base64.b64encode(picdata), 'camera')
            message = self.createmessageenvelope()
            sensorjson = message.jsonserialize(SensorValueSchema(), reading)
            self.tryputcache(CacheKeys.camera, sensorjson)

    def readtemperature(self):
        '''read the temperature/humidity sensor and publish any readings'''
        try:
            #module-level helper from helpers.temperaturehelper, not this method
            sensor_humid, sensor_temp = readtemperature()
            if sensor_temp is not None:
                reading = SensorValue('fullcycletemp', sensor_temp, 'temperature')
                reading.sensor = self.sensor
                self.sendsensor(reading)
            if sensor_humid is not None:
                reading = SensorValue('fullcyclehumid', sensor_humid, 'humidity')
                reading.sensor = self.sensor
                self.sendsensor(reading)
            return sensor_humid, sensor_temp
        except BaseException as ex:
            self.logexception(ex)
        return None, None

    def sendsensor(self, reading):
        '''broadcast a sensor reading on the sensor queue'''
        message = self.createmessageenvelope()
        sensorjson = message.jsonserialize(SensorValueSchema(), reading)
        self.sendqueueitem(QueueEntry(QueueName.Q_SENSOR, self.serializemessageenvelope(message.make_any('sensorvalue', sensorjson)), QueueType.broadcast))

    def getsession(self):
        '''new sqlalchemy session on the configured database'''
        service = self.getservice(ServiceName.database)
        engine = create_engine(service.connection, echo=False)
        return sessionmaker(bind=engine)()

    def log_mineractivity(self, minerlog):
        '''persist a miner activity record; True on success'''
        try:
            session = self.getsession()
            session.add(minerlog)
            session.commit()
            return True
        except BaseException as ex:
            self.logexception(ex)
        return False

    def save_miner(self, miner: Miner):
        '''add a new miner, or merge updates into the already-known one'''
        found = self.getknownminer(miner)
        if found is None:
            self.addknownminer(miner)
            #todo:add the miner to the json config
        else:
            found.updatefrom(miner)
            self.putminer(found)
697
698
def main():
    '''Entry point when the module is run as a script.'''
    app = ApplicationService()
    app.loginfo('Full Cycle was run in a script')
    app.shutdown()


if __name__ == "__main__":
    main()
705