GitHub Access Token became invalid

It seems that the GitHub access token used for retrieving details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Test Setup Failed
Push — master ( aee9c5...6ca25e )
by P.R.
03:23
created

Controller.daemonize()   A

Complexity

Conditions 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 6
ccs 0
cts 2
cp 0
crap 2
rs 9.4285
1
"""
2
Enarksh
3
4
Copyright 2013-2016 Set Based IT Consultancy
5
6
Licence MIT
7
"""
8
import gc
9
import os
10
import pwd
11
12
import zmq
13
14
import enarksh
15
from enarksh.DataLayer import DataLayer
16
from enarksh.controller import resource
17
from enarksh.controller.Schedule import Schedule
18
from enarksh.controller.event_handler.DynamicWorkerDefinitionMessageEventHandler import \
19
    DynamicWorkerDefinitionMessageEventHandler
20
from enarksh.controller.event_handler.EventQueueEmptyEventHandler import EventQueueEmptyEventHandler
21
from enarksh.controller.event_handler.JobFinishedMessageEventHandler import JobFinishedMessageEventHandler
22
from enarksh.controller.event_handler.RequestNodeActionMessageEventHandler import RequestNodeActionMessageEventHandler
23
from enarksh.controller.event_handler.RequestPossibleNodeActionsMessageEventHandler import \
24
    RequestPossibleNodeActionsMessageEventHandler
25
from enarksh.controller.event_handler.ScheduleDefinitionMessageEventHandler import ScheduleDefinitionMessageEventHandler
26
from enarksh.controller.message.DynamicWorkerDefinitionMessage import DynamicWorkerDefinitionMessage
27
from enarksh.controller.message.JobFinishedMessage import JobFinishedMessage
28
from enarksh.controller.message.RequestNodeActionMessage import RequestNodeActionMessage
29
from enarksh.controller.message.RequestPossibleNodeActionsMessage import RequestPossibleNodeActionsMessage
30
from enarksh.controller.message.ScheduleDefinitionMessage import ScheduleDefinitionMessage
31
from enarksh.event.EventActor import EventActor
32
from enarksh.event.EventController import EventController
33
from enarksh.message.MessageController import MessageController
34
35
36
class Controller(EventActor):
    """
    The controller daemon of Enarksh.

    Holds the event controller, the message controller, the resources defined at host level, and the currently
    loaded schedules. The daemon is run by calling main().
    """
0 ignored issues
show
Coding Style introduced by
This class should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below is an example for a method:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend reading PEP 257: Docstring Conventions.

Loading history...
37
    # ------------------------------------------------------------------------------------------------------------------
38
    def __init__(self):
        """
        Object constructor.

        Creates the event controller, initialises the EventActor base class, creates the message controller, and
        starts with no host level resources and no loaded schedules.
        """
        # NOTE(review): the event controller is created before EventActor.__init__() runs; presumably the base class
        # relies on it already existing -- keep this order unless confirmed otherwise.
        self.event_controller = EventController()
        """
        The event controller; its loop is run by main().

        :type: enarksh.event.EventController.EventController
        """

        EventActor.__init__(self)

        self.message_controller = MessageController()
        """
        The message controller at which end points, message types, and message listeners are registered.

        :type: enarksh.message.MessageController.MessageController
        """

        self.host_resources = dict()
        """
        The resources defined at host level, keyed by resource ID (rsc_id).

        :type: dict
        """

        self.schedules = dict()
        """
        The currently loaded schedules, keyed by schedule ID (sch_id).

        :type: dict[int,enarksh.controller.Schedule.Schedule]
        """
71
72
    # ------------------------------------------------------------------------------------------------------------------
73
    @staticmethod
    def __set_unprivileged_user():
        """
        Switches the real and effective user and group of this process to those of the account 'enarksh'.

        The saved set-user-ID and the saved set-group-ID are set to 0.

        :raises KeyError: If the account 'enarksh' does not exist in the password database.
        """
        account = pwd.getpwnam('enarksh')

        # The group is changed before the user, matching the original order of the two calls.
        # NOTE(review): the saved IDs remain 0, which presumably lets the process regain root later -- confirm that
        # this is intended.
        os.setresgid(account.pw_gid, account.pw_gid, 0)
        os.setresuid(account.pw_uid, account.pw_uid, 0)
82
83
    # ------------------------------------------------------------------------------------------------------------------
84
    def __create_host_resources(self):
        """
        Creates the resources defined at host level and stores them in host_resources, keyed by resource ID.
        """
        for row in DataLayer.enk_back_get_host_resources():
            # Create the resource object first, then file it under its rsc_id.
            host_resource = resource.create_resource(row)
            self.host_resources[row['rsc_id']] = host_resource
91
92
    # ------------------------------------------------------------------------------------------------------------------
93
    def __startup(self):
        """
        Performs the necessary actions for starting the controller daemon.

        Announces the start on stdout, configures and opens the database connection, initialises the controller data
        in the database, creates the host level resources, switches to the unprivileged user, and commits the
        transaction.
        """
        print('Start controller.')

        # Database configuration options. Autocommit is off: the work below runs in one explicit transaction.
        options = (('host', enarksh.MYSQL_HOSTNAME),
                   ('user', enarksh.MYSQL_USERNAME),
                   ('password', enarksh.MYSQL_PASSWORD),
                   ('database', enarksh.MYSQL_SCHEMA),
                   ('port', enarksh.MYSQL_PORT),
                   ('autocommit', False))
        for option, value in options:
            DataLayer.config[option] = value

        # Open the connection to MySQL and begin the transaction.
        DataLayer.connect()
        DataLayer.start_transaction()

        # Sanitise the data in the database.
        DataLayer.enk_back_controller_init()

        # Build the resources that are defined at host level.
        self.__create_host_resources()

        # Switch to the unprivileged user and group once the steps above are done.
        self.__set_unprivileged_user()

        # Only the transaction is committed here; this method does not close the connection.
        DataLayer.commit()
122
123
    # ------------------------------------------------------------------------------------------------------------------
124
    @staticmethod
125
    def __shutdown():
126
        """
127
        Performs the necessary actions for stopping the controller.
128
        """
129
        # Log stop of the controller.
130
        print('Stop controller')
131
132
    # ------------------------------------------------------------------------------------------------------------------
133
    def slot_schedule_termination(self, event, rst_id, _listener_data):
134
        """
135
136
        :param enarksh.event.Event.Event event: The event.
137
        :param int rst_id: The run status of the schedule.
138
        :param * _listener_data: Not used
139
        """
140
        del _listener_data
141
142
        schedule = event.source
143
144
        print("Schedule %s has terminated with status %s" % (schedule.sch_id, rst_id))
145
146
        self.unload_schedule(schedule.sch_id)
147
148
    # ------------------------------------------------------------------------------------------------------------------
149
    def load_schedule(self, sch_id):
        """
        Loads a schedule, subscribes to its termination event, and registers it under its ID.

        :param int sch_id: The ID of the schedule.

        :rtype: enarksh.controller.Schedule.Schedule
        """
        print("Loading schedule '%s'." % sch_id)

        loaded = Schedule(sch_id, self.host_resources)

        # Get notified when the schedule terminates, so that it can be unloaded again.
        loaded.event_schedule_termination.register_listener(self.slot_schedule_termination)

        self.schedules[sch_id] = loaded

        return loaded
165
166
    # ------------------------------------------------------------------------------------------------------------------
167
    def unload_schedule(self, sch_id):
168
        """
169
        :param int sch_id:
170
        """
171
        print("Unloading schedule '%s'." % sch_id)
172
173
        if sch_id in self.schedules:
174
            # Remove the schedule.
175
            del self.schedules[sch_id]
176
177
            gc.collect()
178
179
    # ------------------------------------------------------------------------------------------------------------------
180
    def reload_schedule(self, sch_id):
        """
        Unloads a schedule (if it is loaded) and loads it again.

        :param int sch_id: The ID of the schedule.

        :rtype: enarksh.controller.Schedule.Schedule
        """
        self.unload_schedule(sch_id)
        reloaded = self.load_schedule(sch_id)

        return reloaded
189
190
    # ------------------------------------------------------------------------------------------------------------------
191
    def get_schedule_by_sch_id(self, sch_id):
192
        """
193
        Returns a schedule.
194
195
        :param int sch_id: The ID of the schedule.
196
197
        :rtype: enarksh.controller.Schedule.Schedule
198
        """
199
        schedule = self.schedules.get(int(sch_id), None)
200
        if not schedule:
201
            # Load the schedule if the schedule is not currently loaded.
202
            schedule = self.load_schedule(sch_id)
203
204
        return schedule
205
206
    # ------------------------------------------------------------------------------------------------------------------
207
    def _register_sockets(self):
        """
        Registers the ZMQ end points used for communication with other processes in Enarksh.
        """
        register = self.message_controller.register_end_point

        # End point for receiving asynchronous incoming messages.
        register('pull', zmq.PULL, enarksh.CONTROLLER_PULL_END_POINT)

        # End point for lockstep incoming messages.
        register('lockstep', zmq.REP, enarksh.CONTROLLER_LOCKSTEP_END_POINT)

        # End point for sending asynchronous messages to the spawner.
        register('spawner', zmq.PUSH, enarksh.SPAWNER_PULL_END_POINT)

        # End point for sending asynchronous messages to the logger.
        register('logger', zmq.PUSH, enarksh.LOGGER_PULL_END_POINT)
222
223
    # ------------------------------------------------------------------------------------------------------------------
224
    def _register_message_types(self):
        """
        Registers all message types that the controller handles at the message controller.
        """
        handled_messages = (DynamicWorkerDefinitionMessage,
                            JobFinishedMessage,
                            RequestNodeActionMessage,
                            RequestPossibleNodeActionsMessage,
                            ScheduleDefinitionMessage)

        for message_class in handled_messages:
            self.message_controller.register_message_type(message_class.MESSAGE_TYPE)
233
234
    # ------------------------------------------------------------------------------------------------------------------
235
    def _register_events_handlers(self):
        """
        Registers the handlers for received messages at the message controller and the handler for the
        "event queue empty" event at the event controller. This controller is passed along with every registration.
        """
        # Each message class paired with the event handler class whose handle() processes it.
        message_routes = ((DynamicWorkerDefinitionMessage, DynamicWorkerDefinitionMessageEventHandler),
                          (JobFinishedMessage, JobFinishedMessageEventHandler),
                          (RequestNodeActionMessage, RequestNodeActionMessageEventHandler),
                          (RequestPossibleNodeActionsMessage, RequestPossibleNodeActionsMessageEventHandler),
                          (ScheduleDefinitionMessage, ScheduleDefinitionMessageEventHandler))

        for message_class, handler_class in message_routes:
            self.message_controller.register_listener(message_class.MESSAGE_TYPE, handler_class.handle, self)

        # Register other event handlers.
        self.event_controller.event_queue_empty.register_listener(EventQueueEmptyEventHandler.handle, self)
258
259
    # ------------------------------------------------------------------------------------------------------------------
260
    def main(self):
        """
        The main function of the controller: starts up, registers sockets, message types, and event handlers, runs
        the event loop, and shuts down once the loop returns.
        """
        preparation_steps = (self.__startup,
                             self._register_sockets,
                             self._register_message_types,
                             self._register_events_handlers)
        for step in preparation_steps:
            step()

        # NOTE(review): the meaning of the argument 5 is not visible in this file -- see MessageController.no_barking.
        self.message_controller.no_barking(5)

        self.event_controller.loop()

        self.__shutdown()
277
278
# ----------------------------------------------------------------------------------------------------------------------
279