Passed
Push — master (520b75...157197) by Dave
created 01:18

backend.fcmapp.ApplicationService.tryputcache()   A

Complexity
  Conditions: 4

Size
  Total Lines: 8
  Code Lines: 7

Duplication
  Lines: 0
  Ratio: 0%

Importance
  Changes: 0
Metric  Value
cc      4
eloc    7
nop     3
dl      0
loc     8
rs      10
c       0
b       0
f       0
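The listing below does not define tryputcache() itself; it only calls it through self.__cache.tryputcache(key, value). As a reading aid, here is a minimal, self-contained sketch of what a "try put" wrapper with roughly these metrics (3 parameters, 4 conditions, about 8 code lines) could look like; the function body and the cache_client parameter are illustrative assumptions, not the code under review:

def tryputcache(cache_client, key, value):
    '''Best-effort cache write: never raise back to the caller.'''
    if cache_client is None:              #no cache configured, run degraded
        return False
    if key is None or value is None:      #nothing sensible to store
        return False
    try:
        cache_client.set(key, value)      #placeholder for the underlying client call
        return True
    except Exception:                     #cache offline or write failed
        return False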
'''Application Service layer for Full Cycle Mining
Gateway into most of application functionality'''
import sys
import os
import datetime
import logging
import json
import base64
from collections import defaultdict
import pika
from colorama import init, Fore
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
import domain.minerstatistics
import domain.minerpool
from domain.mining import Miner, AvailablePool, MinerStatus
from domain.rep import MinerRepository, PoolRepository, LoginRepository, RuleParametersRepository, BaseRepository
#from domain.miningrules import RuleParameters
from domain.sensors import Sensor, SensorValue
from messaging.messages import Message, MessageSchema, MinerMessageSchema, ConfigurationMessageSchema
from messaging.sensormessages import SensorValueSchema
from messaging.schema import MinerSchema, MinerStatsSchema, MinerCurrentPoolSchema
from helpers.queuehelper import QueueName, Queue, QueueEntry, QueueType
from helpers.camerahelper import take_picture
from helpers.temperaturehelper import readtemperature
import backend.fcmutils as utils
from backend.fcmcache import Cache, CacheKeys
from backend.fcmbus import Bus
from backend.fcmcomponent import ComponentName
from backend.fcmservice import BaseService, PoolService, ServiceName, InfrastructureService, Configuration, Telegram
from backend.fcmminer import Antminer

class Component(object):
    '''A component is a unit of execution of FCM'''
    def __init__(self, componentname, option=''):
        self.app = ApplicationService(component=componentname, option=option)
        #was a queue, now it's a channel
        self.listeningqueue = None

    def listen(self):
        if self.listeningqueue:
            self.app.bus.listen(self.listeningqueue)

class ApplicationService(BaseService):
    '''Application Services'''
    programnamefull = ''
    programname = ''
    component = ComponentName.fullcycle
    loglevel = 0
    #true if user passed in the -now command line argument
    isrunnow = False
    #dictionary of queues managed by this app
    _queues = {}
    #channels keyed by channel name (close_channel deletes by name)
    _channels = {}
    #application loggers
    __logger = None
    __logger_debug = None
    __logger_error = None

    def __init__(self, component=ComponentName.fullcycle, option=None, announceyourself=False):
        super(ApplicationService, self).__init__()
        self.component = component
        if self.component == ComponentName.fullcycle:
            self.print('Starting FCM Init')
        self.initargs(option)
        self.startupstuff()
        if self.component == ComponentName.fullcycle:
            self.print('Starting FCM Configuration')
        self.setup_configuration()
        self.initlogger()
        self.initmessaging()
        #this is slow; consider an option to opt out of the cache
        if self.component == ComponentName.fullcycle:
            self.loginfo('Starting FCM Cache')
        self.initcache()
        self.initbus()
        self.init_application()
        self.init_sensors()

        if announceyourself:
            self.sendqueueitem(QueueEntry(QueueName.Q_LOG, self.stamp('Started {0}'.format(self.component)), QueueType.broadcast))

    def initargs(self, option):
        '''process command line arguments'''
        if sys.argv:
            self.programnamefull = sys.argv[0]
            self.programname = os.path.basename(self.programnamefull)
        firstarg = option
        if len(sys.argv) > 1:
            firstarg = sys.argv[1]
        if firstarg is not None:
            if firstarg == '-now':
                self.isrunnow = True

    def startupstuff(self):
        #used with colorama on windows
        init(autoreset=True)

    def initmessaging(self):
        '''start up messaging'''
        self._schemamsg = MessageSchema()

    def initcache(self):
        '''start up cache'''
        try:
            cachelogin = self.getservice(ServiceName.cache)
            self.__cache = Cache(cachelogin)
        except Exception as ex:
            #cache is offline. try to run in degraded mode
            self.logexception(ex)

    def startup(self):
        self.initminercache()
        self.initpoolcache()

    def initbus(self):
        '''start up message bus'''
        login = self.getservice(ServiceName.messagebus)
        self.__bus = Bus(login)

    @property
    def bus(self):
        return self.__bus

    @property
    def cache(self):
        return self.__cache

    def init_sensors(self):
        self.sensor = Sensor('controller', 'DHT22', 'controller')

    def init_application(self):
        self.antminer = Antminer(self.configuration, self.sshlogin())
        self.telegram = Telegram(self.configuration, self.getservice(ServiceName.telegram))
        self.pools = PoolService(self.cache)

    @property
    def isdebug(self):
        return sys.flags.debug

    def setup_configuration(self):
        '''configuration is loaded once at startup'''
        raw = BaseRepository().readrawfile(self.getconfigfilename('config/fullcycle.conf'))
        config = json.loads(raw)

        self.configuration = Configuration(config)
        self.applicationid = self.configuration.get('applicationid')
        self.loglevel = self.configuration.get('loglevel')

    def initpoolcache(self):
        if self.__cache.get(CacheKeys.pools) is None:
            spools = PoolRepository().readrawfile(self.getconfigfilename('config/pools.conf'))
            self.__cache.tryputcache(CacheKeys.pools, spools)
        for pool in self.pools.get_all_pools():
            #pool is an instance of Pool
            availablepool = AvailablePool(pool.pool_type, pool, pool.url, pool.user, pool.password, pool.priority)
            minerpool = domain.minerpool.MinerPool(miner=None, priority=0, pool=availablepool)
            self.pools.putpool(pool)
            self.pools.add_pool(minerpool)

    def initminercache(self):
        '''put known miners into cache'''
        if self.__cache.get(CacheKeys.miners) is None:
            sminers = MinerRepository().readrawfile(self.getconfigfilename('config/miners.conf'))
            self.__cache.tryputcache(CacheKeys.miners, sminers)

        for miner in self.miners():
            #status is not persisted yet so init from name
            if miner.is_manually_disabled():
                miner.status = MinerStatus.Disabled
            if self.getminer(miner) is None:
                self.putminer(miner)

    def initlogger(self):
        '''set up application loggers'''
        self.__logger = self.setup_logger('fcmapp', 'fcm.log', logging.INFO)

        self.__logger_debug = self.setup_logger('fcmdebug', 'fcm.bug', logging.DEBUG)

        self.__logger_error = self.setup_logger('fcmerror', 'fcm.err', logging.ERROR)

    def setup_logger(self, logger_name, log_file, level=logging.INFO):
        '''start logger'''
        logr = logging.getLogger(logger_name)
        formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
        #by default will append. use mode='w' to overwrite
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        logr.addHandler(file_handler)
        #note: the stream handler is built but never attached; console echo happens via print() in the log helpers
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(formatter)
        logr.setLevel(level)
        return logr

    def loginfo(self, message):
        '''log informational message'''
        logmsg = '{0}: {1}'.format(self.programname, message)
        self.__logger.info(logmsg)
        print(message)

    def logerror(self, message):
        '''log error'''
        logmsg = '{0}: {1}'.format(self.programname, message)
        self.__logger_error.error(logmsg)
        print(Fore.RED+logmsg)

    def logdebug(self, message):
        '''log debug message'''
        if not self.loglevel:
            return
        logmsg = '{0}: {1}'.format(self.programname, message)
        self.__logger_debug.debug(logmsg)
        print(Fore.GREEN+logmsg)

    def print(self, message):
        '''echo message to screen'''
        print('{0}: {1}'.format(self.now(), message))

    def now(self):
        '''current time formatted as friendly string'''
        return utils.formattime(datetime.datetime.now())

    #region lookups
    #todo: move to configurations section
    def miners(self):
        '''configured miners'''
        miners = MinerRepository().readminers(self.getconfigfilename('config/miners.conf'))
        return miners

    def knownminers(self):
        '''for now, just return a list of miners;
        later, consider returning a structure that is easily searched and filtered
        '''
        dknownminers = self.__cache.gethashset(CacheKeys.knownminers)
        if dknownminers:
            #get list of miners from cache
            return utils.deserializelistofstrings(list(dknownminers.values()), MinerSchema())
        knownminers = self.miners()
        return knownminers

    def allminers(self):
        '''combined list of discovered miners and configured miners'''
        allminers = self.knownminers()
        for miner in self.miners():
            foundminer = [x for x in allminers if x.key() == miner.key()]
            if not foundminer:
                allminers.append(miner)
        return allminers

    def knownsensors(self):
        dknownsensors = self.__cache.gethashset(CacheKeys.knownsensors)
        if dknownsensors:
            return utils.deserializelist_withschema(SensorValueSchema(), list(dknownsensors.values()))
        return None

    def addknownsensor(self, sensorvalue):
        val = utils.jsonserialize(SensorValueSchema(), sensorvalue)
        self.__cache.putinhashset(CacheKeys.knownsensors, sensorvalue.sensorid, val)

    def minersummary(self, max_number=10):
        '''show a summary of known miners'''
        mode = self.configuration.get('summary')
        if not mode:
            mode = 'auto'
        knownminers = self.knownminers()
        if len(knownminers) <= max_number:
            return '\n'.join([m.summary() for m in knownminers])
        groupbystatus = defaultdict(list)
        for miner in knownminers:
            groupbystatus[miner.status].append(miner)
        return '\n'.join(['{0}: {1}'.format(s, self.summary_by_status(s, groupbystatus[s])) for s in groupbystatus])

    def summary_by_status(self, key, minerlist):
        if key == 'online':
            return '{0} miners hash {1}'.format(self.summarize_count(minerlist), self.summarize_hash(minerlist))
        return self.summarize_count(minerlist)

    def summarize_count(self, minerlist):
        return len(minerlist)

    def summarize_hash(self, minerlist):
        return sum(miner.minerstats.currenthash for miner in minerlist)

    def addknownminer(self, miner):
        '''add miner to known miners list'''
        val = self.serialize(miner)
        self.__cache.putinhashset(CacheKeys.knownminers, miner.key(), val)

    def updateknownminer(self, miner):
        '''update known miner in cache'''
        sminer = self.__cache.getfromhashset(CacheKeys.knownminers, miner.key())
        memminer = utils.deserialize(MinerSchema(), utils.safestring(sminer))
        if memminer is None:
            memminer = miner
        else:
            #merge new values
            memminer.updatefrom(miner)
        val = self.serialize(memminer)
        self.__cache.putinhashset(CacheKeys.knownminers, miner.key(), val)

    def sshlogin(self):
        '''return contents of login file'''
        return self.readlogin('ftp.conf')

    def readlogin(self, filename):
        '''read login file configuration'''
        login = LoginRepository().readlogins(self.getconfigfilename('config/'+filename))
        return login

    def ruleparameters(self):
        '''rules parameters'''
        return RuleParametersRepository().readrules(self.getconfigfilename('config/rules.conf'))

    def getservice(self, servicename):
        '''get a service by name. should be a repository'''
        file_name = self.getconfigfilename('config/services.conf')
        with open(file_name, encoding='utf-8-sig') as config_file:
            content = json.loads(config_file.read())
        svc = None #dummy initializer to keep Scrutinizer happy
        services = [InfrastructureService(**s) for s in content]
        return next((svc for svc in services if svc.name == servicename), None)

    def getservice_useroverride(self, servicename):
        service = self.getservice(servicename)
        service.user = self.component
        return service
    #endregion lookups

    def listen(self, qlisten):
        """Goes into listening mode on a queue"""
        try:
            self.bus.listen(qlisten)
        except KeyboardInterrupt:
            self.shutdown()
        except BaseException as unhandled:
            self.unhandledexception(unhandled)

    def registerqueue(self, qregister: Queue):
        '''register a queue'''
        self.logdebug(self.stamp('Registered queue {0}'.format(qregister.queue_name)))
        if qregister.queue_name not in self._queues:
            self._queues[qregister.queue_name] = qregister

    def shutdown(self, exitstatus=0):
        '''shut down app services'''
        self.loginfo('Shutting down fcm app...')
        self.close_channels()
        self.closequeues()
        if self.__bus:
            self.bus.close()
        if self.__cache is not None:
            self.__cache.close()
        sys.exit(exitstatus)

    def closequeue(self, thequeue):
        '''close the queue'''
        if not thequeue:
            return
        try:
            self.logdebug(self.stamp('closing queue {0}'.format(thequeue.queue_name)))
            thequeue.close()
            del self._queues[thequeue.queue_name]
        except Exception as ex:
            self.logexception(ex)

    def closequeues(self):
        '''close a bunch of queues'''
        for k in list(self._queues):
            self.closequeue(self._queues[k])

    def close_channel(self, chan):
        if not chan:
            return
        try:
            if chan.name in self._channels:
                self.logdebug(self.stamp('closing channel {0}'.format(chan.name)))
                chan.close()
                del self._channels[chan.name]
        except Exception as ex:
            self.logexception(ex)

    def close_channels(self):
        '''close all channels'''
        for name in list(self._channels):
            self.close_channel(self._channels[name])

    def unhandledexception(self, unhandled):
        '''what to do when there is an exception that the app cannot handle'''
        self.logexception(unhandled)

    def exceptionmessage(self, ex):
        '''gets the exception message even when it doesn't have one'''
        exc_type, _, exc_tb = sys.exc_info()
        exmsg = getattr(ex, 'message', repr(ex))
        return '{0}:{1}:{2}'.format(exc_type, exc_tb.tb_lineno, exmsg)

    def logexception(self, ex):
        '''log an exception'''
        self.logerror(self.exceptionmessage(ex))

    def sendlog(self, logmessage):
        '''send message to log queue'''
        item = QueueEntry(QueueName.Q_LOG, logmessage, 'broadcast')
        self.sendqueueitem(item)
        print(logmessage)

    def subscribe(self, name, callback, no_acknowledge=True, prefetch=1):
        '''subscribe to a queue'''
        chan = self.bus.subscribe(name, callback, no_acknowledge=no_acknowledge, prefetch_count=prefetch)
        print('Waiting for messages on {0}. To exit press CTRL+C'.format(name))
        return chan

    def listen_to_broadcast(self, broadcast_name, callback, no_acknowledge=True):
        thebroadcast = self.bus.subscribe_broadcast(broadcast_name, callback, no_acknowledge)
        print('Waiting for messages on {0}. To exit press CTRL+C'.format(broadcast_name))
        self.bus.listen(thebroadcast)
        #never returns because listen is blocking
        return thebroadcast

    def trypublish(self, queue_name, msg: str):
        '''publish a message to the queue'''
        try:
            self.bus.publish(queue_name, msg)
            return True
        except pika.exceptions.ConnectionClosed as ex:
            logmessage = 'Error publishing to {0} {1}'.format(queue_name, self.exceptionmessage(ex))
            self.logerror(logmessage)
            return False

    def trybroadcast(self, exchange_name, msg):
        '''broadcast a message to all queue listeners'''
        try:
            self.bus.broadcast(exchange_name, msg)
            return True
        except pika.exceptions.ConnectionClosed as conxex:
            self.logerror('Error broadcasting to {0} {1}'.format(exchange_name, self.exceptionmessage(conxex)))
            return False

    def queuestatus(self):
        """TODO: print queue status"""
        pass
        #for q in self._queues.values():
        #    print(q.queue_name,str(q._connection))

    def putminer(self, miner: Miner):
        '''put miner in cache'''
        if miner and miner.key():
            valu = self.serialize(miner)
            self.__cache.tryputcache('miner.{0}'.format(miner.key()), valu)

    def getminer(self, miner: Miner) -> Miner:
        '''get miner from cache.
        The cache key was originally miner.name, but that was not good;
        it was changed to 'miner.' + miner key.
        '''
        valu = self.cache.trygetvaluefromcache('miner.{0}'.format(miner.key()))
        if valu is None:
            return None
        minerfromstore = utils.deserialize(MinerSchema(), utils.safestring(valu))
        if not minerfromstore.key():
            #do not allow entry with no key
            return None
        minerfromstore.store = 'mem'
        return minerfromstore

    def getknownminer(self, miner: Miner) -> Miner:
        '''get a known miner'''
        return self.getknownminerbykey(miner.key())

    def getminerbyname(self, minername):
        filtered = [x for x in self.miners() if x.name == minername]
        if filtered:
            return filtered[0]
        return None

    def getknownminerbykey(self, minername):
        str_miner = self.__cache.getfromhashset(CacheKeys.knownminers, minername)
        if str_miner is None:
            return None
        return utils.deserialize(MinerSchema(), utils.safestring(str_miner))

    def getknownminerbyname(self, minername):
        '''this could be slow if there are lots of miners'''
        known = self.knownminers()
        for miner in known:
            if miner.name == minername:
                return miner
        return None

    def putminerandstats(self, miner: Miner, minerstats, minerpool):
        '''put miner and stats in cache'''
        self.putminer(miner)
        schema = MinerStatsSchema()
        valstats = schema.dumps(minerstats).data
        self.__cache.tryputcache(miner.key() + '.stats', valstats)
        schema = MinerCurrentPoolSchema()
        valpool = schema.dumps(minerpool).data
        self.__cache.tryputcache(miner.key() + '.pool', valpool)

    def getstats(self, miner: Miner):
        '''get stats entity'''
        #note: putminerandstats writes stats under miner.key(); this lookup uses miner.name
        valu = self.cache.trygetvaluefromcache(miner.name + '.stats')
        if valu is None:
            return None
        entity = domain.minerstatistics.MinerStatistics(miner, **utils.deserialize(MinerStatsSchema(), valu))
        return entity

    def getminerstatscached(self):
        '''iterator for cached stats'''
        for miner in self.miners():
            yield (self.getminer(miner), self.getstats(miner), self.pools.getpool(miner))

    def messagedecodeminer(self, body) -> Miner:
        '''deserialize a miner message'''
        message_envelope = self.deserializemessageenvelope(utils.safestring(body))
        schema = MinerMessageSchema()
        minermessage_dict = schema.load(message_envelope.bodyjson()).data
        minermessage_entity = schema.make_minermessage(minermessage_dict)
        miner = minermessage_entity.miner
        return miner

    def messagedecodeminerstats(self, body):
        '''deserialize miner stats'''
        message_envelope = self.deserializemessageenvelope(utils.safestring(body))
        schema = MinerMessageSchema()
        minermessage_dict = schema.load(message_envelope.bodyjson()).data
        minermessage_entity = schema.make_minermessage(minermessage_dict)
        return minermessage_entity

    def messagedecodeminercommand(self, body):
        '''deserialize a miner command'''
        message_envelope = self.deserializemessageenvelope(utils.safestring(body))
        schema = MinerMessageSchema()
        minermessage_dict = schema.load(message_envelope.bodyjson()).data
        minermessage_entity = schema.make_minermessage(minermessage_dict)
        return minermessage_entity

    def messagedecodesensor(self, body):
        '''deserialize a sensor value'''
        message_envelope = self.deserializemessageenvelope(utils.safestring(body))
        schema = SensorValueSchema()
        #minermessage_dict = schema.load(message_envelope.bodyjson()).data
        entity = schema.load(message_envelope.bodyjson()).data
        return message_envelope, entity

    def messagedecode_configuration(self, body):
        '''deserialize a configuration command'''
        message_envelope = self.deserializemessageenvelope(utils.safestring(body))
        schema = ConfigurationMessageSchema()
        configurationmessage_dict = schema.load(message_envelope.bodyjson()).data
        configurationmessage_entity = schema.make_configurationmessage(configurationmessage_dict)
        return configurationmessage_entity

    def createmessageenvelope(self):
        '''create message envelope'''
        return Message(timestamp=datetime.datetime.utcnow(), sender=self.component)

    def serializemessageenvelope(self, msg):
        '''serialize message envelope'''
        return self._schemamsg.dumps(msg).data

    def deserializemessageenvelope(self, body):
        '''deserialize message envelope'''
        return self._schemamsg.load(json.loads(utils.safestring(body))).data

    def createmessagestats(self, miner, minerstats, minerpool):
        #always save the miner so the next consumer can get the latest changes
        #only put in cache if it came from cache
        if miner.store == 'mem':
            self.putminer(miner)
        message = self.createmessageenvelope()
        message = message.make_minerstats(miner, minerstats, minerpool)
        return self.serializemessageenvelope(message)

    def createmessagecommand(self, miner, command):
        '''create message command'''
        if miner.store == 'mem':
            self.putminer(miner)
        message = self.createmessageenvelope()
        message = message.make_minercommand(miner, command)
        return self.serializemessageenvelope(message)

    def messageencode(self, miner: Miner):
        '''command is optional; this call should eventually be converted into a miner command'''
        #always save the miner so the next consumer can get the latest changes
        if miner.store == 'mem':
            self.putminer(miner)
        message = self.createmessageenvelope()
        message = message.make_minerstats(miner, minerstats=None, minerpool=None)
        return self._schemamsg.dumps(message).data

    def stamp(self, message):
        return '{0}:{1}: {2}'.format(self.now(), self.applicationid, message)

    def alert(self, message):
        '''send alert message'''
        return self.sendqueueitem(QueueEntry(QueueName.Q_ALERT, self.stamp(message), QueueType.broadcast))

    def send(self, q_name, message):
        '''send message to queue'''
        success = self.trypublish(q_name, message)
        return success

    def enqueue(self, queuelist):
        '''send a list of queue messages'''
        if queuelist is None:
            return
        if not queuelist.hasentries():
            return
        #todo: group by queuename
        for entry in queuelist.entries:
            self.sendqueueitem(entry)

    def sendqueueitem(self, entry):
        '''send one queue item'''
        if entry.eventtype == 'broadcast':
            send_result = self.trybroadcast(entry.queuename, entry.message)
            return send_result
        return self.send(entry.queuename, entry.message)

    def take_picture(self, file_name='fullcycle_camera.png'):
        pic = take_picture(file_name,
                           self.configuration.get('camera.size'),
                           self.configuration.get('camera.quality'),
                           self.configuration.get('camera.brightness'),
                           self.configuration.get('camera.contrast'))
        return pic

    def store_picture_cache(self, file_name):
        if os.path.isfile(file_name):
            with open(file_name, 'rb') as photofile:
                picdata = photofile.read()
            reading = SensorValue('fullcyclecamera', base64.b64encode(picdata), 'camera')
            #reading.sensor = self.sensor
            #self.sendsensor(reading)
            message = self.createmessageenvelope()
            sensorjson = message.jsonserialize(SensorValueSchema(), reading)
            self.__cache.tryputcache(CacheKeys.camera, sensorjson)

    def readtemperature(self):
        try:
            sensor_humid, sensor_temp = readtemperature()
            if sensor_temp is not None:
                reading = SensorValue('fullcycletemp', sensor_temp, 'temperature')
                reading.sensor = self.sensor
                self.sendsensor(reading)
            if sensor_humid is not None:
                reading = SensorValue('fullcyclehumid', sensor_humid, 'humidity')
                reading.sensor = self.sensor
                self.sendsensor(reading)
            return sensor_humid, sensor_temp
        except BaseException as ex:
            self.logexception(ex)
        return None, None

    def sendsensor(self, reading):
        message = self.createmessageenvelope()
        sensorjson = message.jsonserialize(SensorValueSchema(), reading)
        self.sendqueueitem(QueueEntry(QueueName.Q_SENSOR, self.serializemessageenvelope(message.make_any('sensorvalue', sensorjson)), QueueType.broadcast))

    def getsession(self):
        service = self.getservice(ServiceName.database)
        engine = create_engine(service.connection, echo=False)
        Session = sessionmaker(bind=engine)
        return Session()

    def log_mineractivity(self, minerlog):
        try:
            session = self.getsession()
            session.add(minerlog)
            session.commit()
            return True
        except BaseException as ex:
            self.logexception(ex)
        return False

    def save_miner(self, miner: Miner):
        found = self.getknownminer(miner)
        if found is None:
            self.addknownminer(miner)
            #miners = MinerRepository()
            #todo: add the miner to the json config
        else:
            found.updatefrom(miner)
            self.putminer(found)

def main():
    full_cycle = ApplicationService()
    full_cycle.loginfo('Full Cycle was run in a script')
    full_cycle.shutdown()

if __name__ == "__main__":
    main()
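For context, here is a short usage sketch built only from the classes above: a Component wraps an ApplicationService, subscribes a callback to a queue, and then blocks in listen(). The component name, the chosen queue, and the pika-style callback signature are illustrative assumptions, not taken from the rest of the repository.

from helpers.queuehelper import QueueName
from backend.fcmapp import Component

def on_log(channel, method, properties, body):
    '''callback invoked by the bus for each message (pika-style signature assumed)'''
    print(body)

component = Component('example')
#subscribe() returns a channel; Component.listen() then blocks on that channel
component.listeningqueue = component.app.subscribe(QueueName.Q_LOG, on_log)
component.listen()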