Passed
Push — master ( ab513a...1713a6 )
by Peter
05:21 queued 03:17
created

daemon.WorkerThread.run()   B

Complexity

Conditions 5

Size

Total Lines 48
Code Lines 30

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 30
nop 1
dl 0
loc 48
rs 8.2082
c 0
b 0
f 0
1
''' 
2
This is the connector daemon for all backend services. It talks to the FuzzEd web application
3
and retrieves new jobs. Based on the retrieved job type, the corresponding backend executable
4
is called.
5
6
This script takes the path of the config file 'daemon.ini' as command-line argument. 
7
If not given, the config file is searched in the current directory. 
8
It is generated through 'fab build.configs' from the central settings file.
9
10
If you want to run this daemon on a development machine for backend services,
11
use 'fab run.backend'.
12
'''
13
14
import ConfigParser
15
import base64
16
import sys
17
import logging
18
import urllib2
19
import tempfile
20
import shutil
21
import os
22
import threading
23
import json
24
from SimpleXMLRPCServer import SimpleXMLRPCServer
25
26
import requests
27
28
29
30
# Initial configuration of logging: everything from DEBUG upwards is emitted.
# A file handler is additionally attached in the main section below, once the
# log file location is known from the configuration.
31
logging.basicConfig(level=logging.DEBUG)
32
logger = logging.getLogger('FuzzEd')
33
34
# Backend settings, keyed by the 'job_kind' value of each 'backend_*' section
# of the configuration file. Filled in by the main section below.
backends = {}
35
# Key/value pairs of the 'server' configuration section, set by the main
# section below.
options = {}
36
37
# Switched on by the '--testing' command-line argument; when set, incoming
# job URLs are rewritten to point to http://localhost:8081/.
useTestServer = False
38
39
class WorkerThread(threading.Thread):
    """
    Thread that processes a single backend job.

    The thread downloads the job input from the job URL, runs the backend
    executable configured for the job type, and reports the exit code
    (plus the result file on success) back to the same job URL.
    """
    jobtype = ""
    joburl = ""

    def __init__(self, jobtype, joburl):
        """
        :param jobtype: Key into the module-level ``backends`` dictionary.
        :param joburl:  URL the input is fetched from and the result is sent to.
        """
        threading.Thread.__init__(self)
        self.jobtype = jobtype
        self.joburl = joburl

    def sendResult(self, exit_code, file_data=None, file_name=None):
        """
        Send the job result to the job URL as a JSON-encoded PATCH request.

        The result file is only attached when both ``file_data`` and
        ``file_name`` are given and non-empty.

        :param exit_code: Exit code to report to the frontend.
        :param file_data: Raw content of the result file (optional).
        :param file_name: Name of the result file (optional).
        :rtype : None
        """
        results = {'exit_code': exit_code}
        if file_data and file_name:
            results['file_name'] = file_name
            results['file_data'] = base64.b64encode(file_data)
        logger.debug("Sending result data to %s", self.joburl)
        headers = {'content-type': 'application/json'}
        # NOTE(review): verify=False disables TLS certificate verification.
        r = requests.patch(self.joburl, data=json.dumps(results), verify=False, headers=headers)
        if r.text:
            logger.debug("Data sent, response was: "+str(r.text))

    def run(self):
        """
        Fetch the job input, run the backend executable and upload the result.

        Any exception is reported to the frontend as exit code -1. The
        temporary working directory is removed in all cases.
        """
        # Must be bound before the try block, the finally clause relies on it.
        tmpdir = None
        try:
            logger.info("Working for job URL: "+self.joburl)

            # Create tmp directory and the input file inside of it
            tmpdir = tempfile.mkdtemp()
            tmpfile = tempfile.NamedTemporaryFile(dir=tmpdir, delete=False)

            # Fetch input data and store it; close both handles even if the
            # download fails half-way.
            try:
                response = urllib2.urlopen(self.joburl)
                try:
                    tmpfile.write(response.read())
                finally:
                    response.close()
            finally:
                tmpfile.close()

            # The trick is that we do not need to know the operational details
            # of this job here, since the calling convention comes from daemon.ini
            # and the input file format is determined by the web server on download.
            # All backend executables are just expected to follow the same
            # command-line pattern as render.py.
            backend = backends[self.jobtype]
            output_file = backend['output']
            output_path = tmpdir+os.sep+output_file
            cmd = "%s %s %s %s %s"%(backend['executable'],
                                    tmpfile.name,
                                    output_path,
                                    tmpdir,
                                    backend['log_file'])
            logger.info("Running "+cmd)

            # Run command synchronously and wait for the exit code.
            # os.system() is kept on purpose: its return value is what the
            # frontend has been receiving as 'exit_code' so far.
            exit_code = os.system(cmd)
            if exit_code == 0:
                logger.info("Exit code 0, preparing result upload")
                if output_file.startswith("*"):
                    # Explicit check instead of assert, which vanishes with -O.
                    raise NotImplementedError("multiple result file upload not implemented")
                with open(output_path, "rb") as fd:
                    data = fd.read()
                self.sendResult(0, data, output_file)
            else:
                logger.error("Error on execution: Exit code "+str(exit_code))
                logger.error("Saving input file for later reference: /tmp/lastinput.xml")
                try:
                    shutil.copy(tmpfile.name, "/tmp/lastinput.xml")
                except (IOError, OSError) as e:
                    # Keeping the input is best-effort, the exit code must
                    # still reach the frontend.
                    logger.error("Could not save input file: "+str(e))
                self.sendResult(exit_code)

        except Exception as e:
            logger.debug('Exception, delivering -1 exit code to frontend: '+str(e))
            try:
                self.sendResult(-1)
            except Exception:
                # The frontend is unreachable as well; log it rather than
                # letting the exception escape the thread.
                logger.exception("Could not deliver error result to %s", self.joburl)

        finally:
            if tmpdir is not None:
                shutil.rmtree(tmpdir, ignore_errors=True)
110
111
class JobServer(SimpleXMLRPCServer):
    """
    XML-RPC server that accepts 'start_job' calls and starts one
    WorkerThread per accepted job.
    """

    def __init__(self, conf):
        """
        Bind the server and register the 'start_job' XML-RPC function.

        :param conf: Parsed configuration. Not used here; host and port are
                     read from the module-level ``options`` dictionary.
        """
        address = (options['backend_daemon_host'], int(options['backend_daemon_port']))
        SimpleXMLRPCServer.__init__(self, address)
        self.register_function(self.handle_request, 'start_job')

    # NOTE(review): this name shadows BaseServer.handle_request() from
    # SocketServer, which takes no arguments. Kept for compatibility; do not
    # drive this server through the single-request handle_request() loop.
    def handle_request(self, jobtype, joburl):
        """
        Handle a 'start_job' call by starting a worker thread for the job.

        :param jobtype: Job kind, must be a key of the module-level ``backends``.
        :param joburl:  URL of the job on the frontend.
        :return: True if a worker was started, False for an unknown job type.
        """
        logger.debug("Received %s job at %s"%(jobtype, joburl))
        if useTestServer:
            logger.debug("Patching job URL for test server support")
            # 'scheme://host/path' splits into ['scheme:', '', 'host', 'path'];
            # a URL without any path has no fourth element.
            parts = joburl.split('/',3)
            path = parts[3] if len(parts) > 3 else ""
            joburl = "http://localhost:8081/"+path      # live test server URL from Django docs
        if jobtype not in backends:
            logger.error("Unknown job type "+jobtype)
            return False
        # Start worker thread for this task
        worker = WorkerThread(jobtype, joburl)
        worker.start()
        return True
131
132
if __name__ == '__main__':
    # Read configuration. At most one argument is accepted: either the
    # '--testing' switch or the path of the INI file.
    if len(sys.argv) > 2:
        sys.exit("Usage: %s [--testing | <path to daemon.ini>]" % sys.argv[0])
    # Use default INI file in local directory unless a path is provided
    config_path = './daemon.ini'
    useTestServer = False
    if len(sys.argv) == 2:
        if sys.argv[1] == "--testing":
            useTestServer = True
        else:
            # Use provided INI file
            config_path = sys.argv[1]
    conf = ConfigParser.ConfigParser()
    with open(config_path) as config_file:
        conf.readfp(config_file)
    # Initialize logging, based on settings
    logger.addHandler(logging.FileHandler(conf.get('server','backend_log_file')))
    # Read backends from configuration: every 'backend_*' section describes
    # one backend, the 'server' section holds the daemon options.
    for section in conf.sections():
        if section.startswith('backend_'):
            settings = dict(conf.items(section))
            backends[settings['job_kind']] = settings
        elif section == 'server':
            options = dict(conf.items('server'))
    logger.info("Configured backends: "+str(backends.keys()))
    logger.info("Options: "+str(options))
    # Start server
    server = JobServer(conf)
    server.serve_forever()
161