1
|
|
|
""" |
2
|
|
|
Enarksh |
3
|
|
|
|
4
|
|
|
Copyright 2013-2016 Set Based IT Consultancy |
5
|
|
|
|
6
|
|
|
Licence MIT |
7
|
|
|
""" |
8
|
|
|
import os |
9
|
|
|
import traceback |
10
|
|
|
import sys |
11
|
|
|
|
12
|
|
|
import zmq |
13
|
|
|
|
14
|
|
|
import enarksh |
15
|
|
|
from enarksh.DataLayer import DataLayer |
16
|
|
|
|
17
|
|
|
|
18
|
|
|
class Logger: |
|
|
|
|
19
|
|
|
_instance = None |
20
|
|
|
|
21
|
|
|
# ------------------------------------------------------------------------------------------------------------------ |
22
|
|
|
def __init__(self): |
23
|
|
|
Logger._instance = self |
24
|
|
|
|
25
|
|
|
self._events = [] |
26
|
|
|
""" |
27
|
|
|
A list of events that needs be be processed. |
28
|
|
|
|
29
|
|
|
:type: list |
30
|
|
|
""" |
31
|
|
|
|
32
|
|
|
self._exit_flag = False |
33
|
|
|
""" |
34
|
|
|
If set the logger must terminate. |
35
|
|
|
|
36
|
|
|
:type: bool |
37
|
|
|
""" |
38
|
|
|
|
39
|
|
|
self._zmq_context = None |
40
|
|
|
""" |
41
|
|
|
The ZMQ context. |
42
|
|
|
|
43
|
|
|
:type: |
44
|
|
|
""" |
45
|
|
|
|
46
|
|
|
self._zmq_pull_socket = None |
47
|
|
|
""" |
48
|
|
|
ZMQ socket for asynchronous incoming messages. |
49
|
|
|
|
50
|
|
|
:type: |
51
|
|
|
""" |
52
|
|
|
|
53
|
|
|
# ------------------------------------------------------------------------------------------------------------------ |
54
|
|
|
def main(self): |
|
|
|
|
55
|
|
|
self._startup() |
56
|
|
|
|
57
|
|
|
while not self._exit_flag or self._events: |
58
|
|
|
try: |
59
|
|
|
self._read_messages() |
60
|
|
|
|
61
|
|
|
# Connect to the database and start a transaction. |
62
|
|
|
DataLayer.connect() |
63
|
|
|
DataLayer.start_transaction() |
64
|
|
|
|
65
|
|
|
while self._events: |
66
|
|
|
event = self._events.pop() |
67
|
|
|
|
68
|
|
|
if event['type'] == 'logfile': |
69
|
|
|
self._event_handler_log_file(event['message']) |
70
|
|
|
|
71
|
|
|
elif event['type'] == 'exit': |
72
|
|
|
self._event_handler_exit() |
73
|
|
|
|
74
|
|
|
else: |
75
|
|
|
raise Exception("Unknown event type '%s'." % event['type']) |
76
|
|
|
|
77
|
|
|
# Commit the transaction and disconnect form the database. |
78
|
|
|
DataLayer.commit() |
79
|
|
|
DataLayer.disconnect() |
80
|
|
|
|
81
|
|
|
except Exception as exception1: |
|
|
|
|
82
|
|
|
try: |
83
|
|
|
DataLayer.rollback() |
84
|
|
|
DataLayer.disconnect() |
85
|
|
|
except Exception as exception2: |
|
|
|
|
86
|
|
|
print(exception2, file=sys.stderr) |
87
|
|
|
traceback.print_exc(file=sys.stderr) |
88
|
|
|
print(exception1, file=sys.stderr) |
89
|
|
|
traceback.print_exc(file=sys.stderr) |
90
|
|
|
|
91
|
|
|
# ------------------------------------------------------------------------------------------------------------------ |
92
|
|
|
@staticmethod |
93
|
|
|
def daemonize(): |
|
|
|
|
94
|
|
|
enarksh.daemonize(os.path.join(enarksh.HOME, 'var/lock/loggerd.pid'), |
95
|
|
|
'/dev/null', |
96
|
|
|
os.path.join(enarksh.HOME, 'var/log/loggerd.log'), |
97
|
|
|
os.path.join(enarksh.HOME, 'var/log/loggerd.log')) |
98
|
|
|
|
99
|
|
|
# ------------------------------------------------------------------------------------------------------------------ |
100
|
|
|
def _add_event(self, event): |
101
|
|
|
""" |
102
|
|
|
Adds an event to the event queue. |
103
|
|
|
|
104
|
|
|
:param dict event: |
105
|
|
|
""" |
106
|
|
|
self._events.append(event) |
107
|
|
|
|
108
|
|
|
# ------------------------------------------------------------------------------------------------------------------ |
109
|
|
|
def _event_handler_exit(self): |
110
|
|
|
""" |
111
|
|
|
Handles an exit message from the controller. |
112
|
|
|
""" |
113
|
|
|
self._exit_flag = True |
114
|
|
|
|
115
|
|
|
# ------------------------------------------------------------------------------------------------------------------ |
116
|
|
|
def _event_handler_log_file(self, message): |
117
|
|
|
""" |
118
|
|
|
Handles a new log file available message from the spawner. |
119
|
|
|
|
120
|
|
|
:param dict message: |
121
|
|
|
""" |
122
|
|
|
print("%s %s %s" % (message['rnd_id'], message['name'], message['total_size'])) |
123
|
|
|
|
124
|
|
|
if message['total_size'] > 0: |
125
|
|
|
# Read the log file or log files and concatenate if necessary. |
126
|
|
|
with open(message['filename1'], 'rb') as f: |
|
|
|
|
127
|
|
|
log = f.read() |
128
|
|
|
|
129
|
|
|
if message['filename2']: |
130
|
|
|
with open(message['filename2'], 'rb') as f: |
|
|
|
|
131
|
|
|
buf2 = f.read() |
132
|
|
|
else: |
133
|
|
|
buf2 = '' |
134
|
|
|
|
135
|
|
|
# Compute the number of skipped bytes. |
136
|
|
|
skipped = message['total_size'] - len(log) - len(buf2) |
137
|
|
|
|
138
|
|
|
if skipped != 0: |
139
|
|
|
# Add a newline to the end of the buffer, if required. |
140
|
|
|
if log[-1:] != '\n': |
141
|
|
|
log += '\n' |
142
|
|
|
|
143
|
|
|
# Note: This concatenation doesn't work for multi byte character sets. |
144
|
|
|
log += '\n' |
145
|
|
|
log += "Enarksh: Skipped {0} bytes.\n".format(skipped) |
146
|
|
|
log += '\n' |
147
|
|
|
|
148
|
|
|
log += buf2 |
149
|
|
|
|
150
|
|
|
blb_id = DataLayer.enk_blob_insert_blob(message['name'], 'text/plain', log) |
151
|
|
|
|
152
|
|
|
else: |
153
|
|
|
blb_id = None |
154
|
|
|
|
155
|
|
|
if message['name'] == 'out': |
156
|
|
|
DataLayer.enk_back_run_node_update_log(message['rnd_id'], blb_id, message['total_size']) |
157
|
|
|
elif message['name'] == 'err': |
158
|
|
|
DataLayer.enk_back_run_node_update_err(message['rnd_id'], blb_id, message['total_size']) |
159
|
|
|
else: |
160
|
|
|
raise Exception("Unknown output name '%s'." % message['name']) |
161
|
|
|
|
162
|
|
|
# Remove the (temporary) log files. |
163
|
|
|
if message['filename1']: |
164
|
|
|
os.unlink(message['filename1']) |
165
|
|
|
if message['filename2']: |
166
|
|
|
os.unlink(message['filename2']) |
167
|
|
|
|
168
|
|
|
# ------------------------------------------------------------------------------------------------------------------ |
169
|
|
|
def _read_messages(self): |
170
|
|
|
""" |
171
|
|
|
Reads messages from other processes (i.e. spawner and controller). |
172
|
|
|
""" |
173
|
|
|
message = self._zmq_pull_socket.recv_json() |
174
|
|
|
|
175
|
|
|
if message['type'] == 'log_file': |
176
|
|
|
event = {'type': 'logfile', |
177
|
|
|
'message': message} |
178
|
|
|
self._add_event(event) |
179
|
|
|
|
180
|
|
|
elif message['type'] == 'exit': |
181
|
|
|
event = {'type': 'exit'} |
182
|
|
|
self._add_event(event) |
183
|
|
|
|
184
|
|
|
else: |
185
|
|
|
raise Exception("Unknown event type '%s'." % message['type']) |
186
|
|
|
|
187
|
|
|
# ------------------------------------------------------------------------------------------------------------------ |
188
|
|
|
@staticmethod |
189
|
|
|
def _create_pid_file(): |
190
|
|
|
""" |
191
|
|
|
Creates a PID file and writes the PID of the logger to this file. |
192
|
|
|
""" |
193
|
|
|
filename = os.path.join(enarksh.ENK_LOCK_DIR, 'logger.pid') |
194
|
|
|
with open(filename, 'wt') as f: |
|
|
|
|
195
|
|
|
f.write(str(os.getpid())) |
196
|
|
|
|
197
|
|
|
# ------------------------------------------------------------------------------------------------------------------ |
198
|
|
|
@staticmethod |
199
|
|
|
def _remove_pid_file(): |
200
|
|
|
""" |
201
|
|
|
Removes the PID file. |
202
|
|
|
""" |
203
|
|
|
filename = os.path.join(enarksh.ENK_LOCK_DIR, 'logger.pid') |
204
|
|
|
os.unlink(filename) |
205
|
|
|
|
206
|
|
|
# ------------------------------------------------------------------------------------------------------------------ |
207
|
|
|
def _shutdown(self): |
208
|
|
|
""" |
209
|
|
|
Performs the necessary actions for stopping the logger. |
210
|
|
|
""" |
211
|
|
|
# Commit the last transaction and close the connection to the database. |
212
|
|
|
DataLayer.commit() |
213
|
|
|
DataLayer.disconnect() |
214
|
|
|
|
215
|
|
|
# Remove the PID file. |
216
|
|
|
self._remove_pid_file() |
217
|
|
|
|
218
|
|
|
# Log stop of the logger. |
219
|
|
|
print('Stop logger') |
220
|
|
|
|
221
|
|
|
# ------------------------------------------------------------------------------------------------------------------ |
222
|
|
|
def _startup(self): |
223
|
|
|
""" |
224
|
|
|
Performs the necessary actions for starting up the logger. |
225
|
|
|
""" |
226
|
|
|
# Log the start of the logger. |
227
|
|
|
print('Start logger') |
228
|
|
|
|
229
|
|
|
# Set database configuration options. |
230
|
|
|
DataLayer.config['host'] = enarksh.MYSQL_HOSTNAME |
231
|
|
|
DataLayer.config['user'] = enarksh.MYSQL_USERNAME |
232
|
|
|
DataLayer.config['password'] = enarksh.MYSQL_PASSWORD |
233
|
|
|
DataLayer.config['database'] = enarksh.MYSQL_SCHEMA |
234
|
|
|
DataLayer.config['port'] = enarksh.MYSQL_PORT |
235
|
|
|
|
236
|
|
|
# Setup ZMQ. |
237
|
|
|
self._zmq_init() |
238
|
|
|
|
239
|
|
|
# Create our PID file. |
240
|
|
|
self._create_pid_file() |
241
|
|
|
|
242
|
|
|
# ------------------------------------------------------------------------------------------------------------------ |
243
|
|
|
def _zmq_init(self): |
244
|
|
|
""" |
245
|
|
|
Initializes the ZMQ socket. |
246
|
|
|
""" |
247
|
|
|
self._zmq_context = zmq.Context() |
248
|
|
|
|
249
|
|
|
# Create socket for asynchronous incoming messages. |
250
|
|
|
self._zmq_pull_socket = self._zmq_context.socket(zmq.PULL) |
|
|
|
|
251
|
|
|
self._zmq_pull_socket.bind(enarksh.LOGGER_PULL_END_POINT) |
252
|
|
|
|
253
|
|
|
|
254
|
|
|
# ---------------------------------------------------------------------------------------------------------------------- |
255
|
|
|
|
The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:
If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.