|
1
|
|
|
""" |
|
2
|
|
|
Enarksh |
|
3
|
|
|
|
|
4
|
|
|
Copyright 2013-2016 Set Based IT Consultancy |
|
5
|
|
|
|
|
6
|
|
|
Licence MIT |
|
7
|
|
|
""" |
|
8
|
|
|
import os |
|
9
|
|
|
import traceback |
|
10
|
|
|
import sys |
|
11
|
|
|
|
|
12
|
|
|
import zmq |
|
13
|
|
|
|
|
14
|
|
|
import enarksh |
|
15
|
|
|
from enarksh.DataLayer import DataLayer |
|
16
|
|
|
|
|
17
|
|
|
|
|
18
|
|
|
class Logger: |
|
|
|
|
|
|
19
|
|
|
_instance = None |
|
20
|
|
|
|
|
21
|
|
|
# ------------------------------------------------------------------------------------------------------------------ |
|
22
|
|
|
def __init__(self): |
|
23
|
|
|
Logger._instance = self |
|
24
|
|
|
|
|
25
|
|
|
self._events = [] |
|
26
|
|
|
""" |
|
27
|
|
|
A list of events that needs be be processed. |
|
28
|
|
|
|
|
29
|
|
|
:type: list |
|
30
|
|
|
""" |
|
31
|
|
|
|
|
32
|
|
|
self._exit_flag = False |
|
33
|
|
|
""" |
|
34
|
|
|
If set the logger must terminate. |
|
35
|
|
|
|
|
36
|
|
|
:type: bool |
|
37
|
|
|
""" |
|
38
|
|
|
|
|
39
|
|
|
self._zmq_context = None |
|
40
|
|
|
""" |
|
41
|
|
|
The ZMQ context. |
|
42
|
|
|
|
|
43
|
|
|
:type: |
|
44
|
|
|
""" |
|
45
|
|
|
|
|
46
|
|
|
self._zmq_pull_socket = None |
|
47
|
|
|
""" |
|
48
|
|
|
ZMQ socket for asynchronous incoming messages. |
|
49
|
|
|
|
|
50
|
|
|
:type: |
|
51
|
|
|
""" |
|
52
|
|
|
|
|
53
|
|
|
# ------------------------------------------------------------------------------------------------------------------ |
|
54
|
|
|
def main(self): |
|
|
|
|
|
|
55
|
|
|
self._startup() |
|
56
|
|
|
|
|
57
|
|
|
while not self._exit_flag or self._events: |
|
58
|
|
|
try: |
|
59
|
|
|
self._read_messages() |
|
60
|
|
|
|
|
61
|
|
|
# Connect to the database and start a transaction. |
|
62
|
|
|
DataLayer.connect() |
|
63
|
|
|
DataLayer.start_transaction() |
|
64
|
|
|
|
|
65
|
|
|
while self._events: |
|
66
|
|
|
event = self._events.pop() |
|
67
|
|
|
|
|
68
|
|
|
if event['type'] == 'logfile': |
|
69
|
|
|
self._event_handler_log_file(event['message']) |
|
70
|
|
|
|
|
71
|
|
|
elif event['type'] == 'exit': |
|
72
|
|
|
self._event_handler_exit() |
|
73
|
|
|
|
|
74
|
|
|
else: |
|
75
|
|
|
raise Exception("Unknown event type '%s'." % event['type']) |
|
76
|
|
|
|
|
77
|
|
|
# Commit the transaction and disconnect form the database. |
|
78
|
|
|
DataLayer.commit() |
|
79
|
|
|
DataLayer.disconnect() |
|
80
|
|
|
|
|
81
|
|
|
except Exception as exception1: |
|
|
|
|
|
|
82
|
|
|
try: |
|
83
|
|
|
DataLayer.rollback() |
|
84
|
|
|
DataLayer.disconnect() |
|
85
|
|
|
except Exception as exception2: |
|
|
|
|
|
|
86
|
|
|
print(exception2, file=sys.stderr) |
|
87
|
|
|
traceback.print_exc(file=sys.stderr) |
|
88
|
|
|
print(exception1, file=sys.stderr) |
|
89
|
|
|
traceback.print_exc(file=sys.stderr) |
|
90
|
|
|
|
|
91
|
|
|
# ------------------------------------------------------------------------------------------------------------------ |
|
92
|
|
|
@staticmethod |
|
93
|
|
|
def daemonize(): |
|
|
|
|
|
|
94
|
|
|
enarksh.daemonize(os.path.join(enarksh.HOME, 'var/lock/loggerd.pid'), |
|
95
|
|
|
'/dev/null', |
|
96
|
|
|
os.path.join(enarksh.HOME, 'var/log/loggerd.log'), |
|
97
|
|
|
os.path.join(enarksh.HOME, 'var/log/loggerd.log')) |
|
98
|
|
|
|
|
99
|
|
|
# ------------------------------------------------------------------------------------------------------------------ |
|
100
|
|
|
def _add_event(self, event): |
|
101
|
|
|
""" |
|
102
|
|
|
Adds an event to the event queue. |
|
103
|
|
|
|
|
104
|
|
|
:param dict event: |
|
105
|
|
|
""" |
|
106
|
|
|
self._events.append(event) |
|
107
|
|
|
|
|
108
|
|
|
# ------------------------------------------------------------------------------------------------------------------ |
|
109
|
|
|
def _event_handler_exit(self): |
|
110
|
|
|
""" |
|
111
|
|
|
Handles an exit message from the controller. |
|
112
|
|
|
""" |
|
113
|
|
|
self._exit_flag = True |
|
114
|
|
|
|
|
115
|
|
|
# ------------------------------------------------------------------------------------------------------------------ |
|
116
|
|
|
def _event_handler_log_file(self, message): |
|
117
|
|
|
""" |
|
118
|
|
|
Handles a new log file available message from the spawner. |
|
119
|
|
|
|
|
120
|
|
|
:param dict message: |
|
121
|
|
|
""" |
|
122
|
|
|
print("%s %s %s" % (message['rnd_id'], message['name'], message['total_size'])) |
|
123
|
|
|
|
|
124
|
|
|
if message['total_size'] > 0: |
|
125
|
|
|
# Read the log file or log files and concatenate if necessary. |
|
126
|
|
|
with open(message['filename1'], 'rb') as f: |
|
|
|
|
|
|
127
|
|
|
log = f.read() |
|
128
|
|
|
|
|
129
|
|
|
if message['filename2']: |
|
130
|
|
|
with open(message['filename2'], 'rb') as f: |
|
|
|
|
|
|
131
|
|
|
buf2 = f.read() |
|
132
|
|
|
else: |
|
133
|
|
|
buf2 = '' |
|
134
|
|
|
|
|
135
|
|
|
# Compute the number of skipped bytes. |
|
136
|
|
|
skipped = message['total_size'] - len(log) - len(buf2) |
|
137
|
|
|
|
|
138
|
|
|
if skipped != 0: |
|
139
|
|
|
# Add a newline to the end of the buffer, if required. |
|
140
|
|
|
if log[-1:] != '\n': |
|
141
|
|
|
log += '\n' |
|
142
|
|
|
|
|
143
|
|
|
# Note: This concatenation doesn't work for multi byte character sets. |
|
144
|
|
|
log += '\n' |
|
145
|
|
|
log += "Enarksh: Skipped {0} bytes.\n".format(skipped) |
|
146
|
|
|
log += '\n' |
|
147
|
|
|
|
|
148
|
|
|
log += buf2 |
|
149
|
|
|
|
|
150
|
|
|
blb_id = DataLayer.enk_blob_insert_blob(message['name'], 'text/plain', log) |
|
151
|
|
|
|
|
152
|
|
|
else: |
|
153
|
|
|
blb_id = None |
|
154
|
|
|
|
|
155
|
|
|
if message['name'] == 'out': |
|
156
|
|
|
DataLayer.enk_back_run_node_update_log(message['rnd_id'], blb_id, message['total_size']) |
|
157
|
|
|
elif message['name'] == 'err': |
|
158
|
|
|
DataLayer.enk_back_run_node_update_err(message['rnd_id'], blb_id, message['total_size']) |
|
159
|
|
|
else: |
|
160
|
|
|
raise Exception("Unknown output name '%s'." % message['name']) |
|
161
|
|
|
|
|
162
|
|
|
# Remove the (temporary) log files. |
|
163
|
|
|
if message['filename1']: |
|
164
|
|
|
os.unlink(message['filename1']) |
|
165
|
|
|
if message['filename2']: |
|
166
|
|
|
os.unlink(message['filename2']) |
|
167
|
|
|
|
|
168
|
|
|
# ------------------------------------------------------------------------------------------------------------------ |
|
169
|
|
|
def _read_messages(self): |
|
170
|
|
|
""" |
|
171
|
|
|
Reads messages from other processes (i.e. spawner and controller). |
|
172
|
|
|
""" |
|
173
|
|
|
message = self._zmq_pull_socket.recv_json() |
|
174
|
|
|
|
|
175
|
|
|
if message['type'] == 'log_file': |
|
176
|
|
|
event = {'type': 'logfile', |
|
177
|
|
|
'message': message} |
|
178
|
|
|
self._add_event(event) |
|
179
|
|
|
|
|
180
|
|
|
elif message['type'] == 'exit': |
|
181
|
|
|
event = {'type': 'exit'} |
|
182
|
|
|
self._add_event(event) |
|
183
|
|
|
|
|
184
|
|
|
else: |
|
185
|
|
|
raise Exception("Unknown event type '%s'." % message['type']) |
|
186
|
|
|
|
|
187
|
|
|
# ------------------------------------------------------------------------------------------------------------------ |
|
188
|
|
|
@staticmethod |
|
189
|
|
|
def _create_pid_file(): |
|
190
|
|
|
""" |
|
191
|
|
|
Creates a PID file and writes the PID of the logger to this file. |
|
192
|
|
|
""" |
|
193
|
|
|
filename = os.path.join(enarksh.ENK_LOCK_DIR, 'logger.pid') |
|
194
|
|
|
with open(filename, 'wt') as f: |
|
|
|
|
|
|
195
|
|
|
f.write(str(os.getpid())) |
|
196
|
|
|
|
|
197
|
|
|
# ------------------------------------------------------------------------------------------------------------------ |
|
198
|
|
|
@staticmethod |
|
199
|
|
|
def _remove_pid_file(): |
|
200
|
|
|
""" |
|
201
|
|
|
Removes the PID file. |
|
202
|
|
|
""" |
|
203
|
|
|
filename = os.path.join(enarksh.ENK_LOCK_DIR, 'logger.pid') |
|
204
|
|
|
os.unlink(filename) |
|
205
|
|
|
|
|
206
|
|
|
# ------------------------------------------------------------------------------------------------------------------ |
|
207
|
|
|
def _shutdown(self): |
|
208
|
|
|
""" |
|
209
|
|
|
Performs the necessary actions for stopping the logger. |
|
210
|
|
|
""" |
|
211
|
|
|
# Commit the last transaction and close the connection to the database. |
|
212
|
|
|
DataLayer.commit() |
|
213
|
|
|
DataLayer.disconnect() |
|
214
|
|
|
|
|
215
|
|
|
# Remove the PID file. |
|
216
|
|
|
self._remove_pid_file() |
|
217
|
|
|
|
|
218
|
|
|
# Log stop of the logger. |
|
219
|
|
|
print('Stop logger') |
|
220
|
|
|
|
|
221
|
|
|
# ------------------------------------------------------------------------------------------------------------------ |
|
222
|
|
|
def _startup(self): |
|
223
|
|
|
""" |
|
224
|
|
|
Performs the necessary actions for starting up the logger. |
|
225
|
|
|
""" |
|
226
|
|
|
# Log the start of the logger. |
|
227
|
|
|
print('Start logger') |
|
228
|
|
|
|
|
229
|
|
|
# Set database configuration options. |
|
230
|
|
|
DataLayer.config['host'] = enarksh.MYSQL_HOSTNAME |
|
231
|
|
|
DataLayer.config['user'] = enarksh.MYSQL_USERNAME |
|
232
|
|
|
DataLayer.config['password'] = enarksh.MYSQL_PASSWORD |
|
233
|
|
|
DataLayer.config['database'] = enarksh.MYSQL_SCHEMA |
|
234
|
|
|
DataLayer.config['port'] = enarksh.MYSQL_PORT |
|
235
|
|
|
|
|
236
|
|
|
# Setup ZMQ. |
|
237
|
|
|
self._zmq_init() |
|
238
|
|
|
|
|
239
|
|
|
# Create our PID file. |
|
240
|
|
|
self._create_pid_file() |
|
241
|
|
|
|
|
242
|
|
|
# ------------------------------------------------------------------------------------------------------------------ |
|
243
|
|
|
def _zmq_init(self): |
|
244
|
|
|
""" |
|
245
|
|
|
Initializes the ZMQ socket. |
|
246
|
|
|
""" |
|
247
|
|
|
self._zmq_context = zmq.Context() |
|
248
|
|
|
|
|
249
|
|
|
# Create socket for asynchronous incoming messages. |
|
250
|
|
|
self._zmq_pull_socket = self._zmq_context.socket(zmq.PULL) |
|
|
|
|
|
|
251
|
|
|
self._zmq_pull_socket.bind(enarksh.LOGGER_PULL_END_POINT) |
|
252
|
|
|
|
|
253
|
|
|
|
|
254
|
|
|
# ---------------------------------------------------------------------------------------------------------------------- |
|
255
|
|
|
|
The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:
If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.