Completed
Pull Request — master (#11)
by Gus
23s
created

ProcessorsAPI._start_server()   B

Complexity

Conditions 5

Size

Total Lines 35

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 5
dl 0
loc 35
rs 8.0894
c 3
b 0
f 0
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
from __future__ import unicode_literals
4
#from pkg_resources import resource_filename
5
from .utils import *
6
from .annotators import *
7
from .sentiment import SentimentAnalysisAPI
8
from .serialization import JSONSerializer
9
import os
10
import shlex
11
import subprocess as sp
12
import requests
13
import re
14
import time
15
import sys
16
import logging
17
import warnings
18
19
20
class ProcessorsBaseAPI(object):
    """
    Manages a connection with processors-server and provides an interface to the API.

    Parameters
    ----------
    port : int
        The port the server is running on or should be started on.  Default is 8888.
    hostname : str
        The host name to use for the server.  Default is "localhost".
    log_file: str
        The path for the log file.  Default is py-processors.log in the user's home directory.

    Methods
    -------
    annotate(text)
        Produces a Document from the provided `text` using the default processor.
    clu.annotate(text)
        Produces a Document from the provided `text` using CluProcessor.
    fastnlp.annotate(text)
        Produces a Document from the provided `text` using FastNLPProcessor.
    bionlp.annotate(text)
        Produces a Document from the provided `text` using BioNLPProcessor.
    annotate_from_sentences(sentences)
        Produces a Document from `sentences` (a list of text split into sentences). Uses the default processor.
    fastnlp.annotate_from_sentences(sentences)
        Produces a Document from `sentences` (a list of text split into sentences). Uses FastNLPProcessor.
    bionlp.annotate_from_sentences(sentences)
        Produces a Document from `sentences` (a list of text split into sentences). Uses BioNLPProcessor.
    corenlp.sentiment.score_sentence(sentence)
        Produces a sentiment score for the provided `sentence` (an instance of Sentence).
    corenlp.sentiment.score_document(doc)
        Produces sentiment scores for the provided `doc` (an instance of Document).  One score is produced for each sentence.
    corenlp.sentiment.score_segmented_text(sentences)
        Produces sentiment scores for the provided `sentences` (a list of text segmented into sentences).  One score is produced for item in `sentences`.
    odin.extract_from_text(text, rules)
        Produces a list of Mentions for matches of the provided `rules` on the `text`.  `rules` can be a string of Odin rules, or a url ending in `.yml` or `.yaml`.
    odin.extract_from_document(doc, rules)
        Produces a list of Mentions for matches of the provided `rules` on the `doc` (an instance of Document).  `rules` can be a string of Odin rules, or a url ending in .yml or yaml.
    """
    PORT = 8888
    HOST = "localhost"
    LOG = full_path(os.path.join(os.path.expanduser("~"), "py-processors.log"))

    def __init__(self, **kwargs):
        """
        Build the service clients for the configured address and set up logging.

        Recognized keyword arguments are `hostname`, `port` and `log_file`.
        """
        self.hostname = kwargs.get("hostname", ProcessorsBaseAPI.HOST)
        self.port = kwargs.get("port", ProcessorsBaseAPI.PORT)
        self.make_address(self.hostname, self.port)
        # processors
        self.default = Processor(self.address)
        self.clu = CluProcessor(self.address)
        self.fastnlp = FastNLPProcessor(self.address)
        self.bionlp = BioNLPProcessor(self.address)
        # sentiment
        self.sentiment = SentimentAnalysisAPI(self.address)
        # odin
        self.odin = OdinAPI(self.address)
        self.logger = logging.getLogger(__name__)
        # Refer to this class's own LOG rather than reaching into a subclass.
        self.log_file = self._prepare_log_file(kwargs.get("log_file", ProcessorsBaseAPI.LOG))

    def make_address(self, hostname, port):
        """
        Store `hostname` and `port` and rebuild `self.address` from them.
        """
        self.hostname = hostname
        self.port = port
        self.address = "http://{}:{}".format(self.hostname, self.port)

    def _prepare_log_file(self, lf):
        """
        Configure the module logger and return the file path used for logging.

        Handlers are attached to the logger at most once: a console handler at
        INFO level, and one DEBUG-level file handler per distinct log file.

        Parameters
        ----------
        lf : str or None
            Requested log file path (`~` is expanded).  A falsy value selects
            the class default.

        Returns
        -------
        str
            The log file path that was configured.
        """
        log_file = os.path.expanduser(lf) if lf else ProcessorsBaseAPI.LOG
        self.logger.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(levelname)s - %(message)s")

        # The logger is shared by every instance (it is looked up by module
        # name), so adding handlers unconditionally would emit each record
        # once per instance created.
        # Exact type check on purpose: FileHandler subclasses StreamHandler.
        has_console = any(type(h) is logging.StreamHandler for h in self.logger.handlers)
        if not has_console:
            console = logging.StreamHandler()
            console.setLevel(logging.INFO)
            console.setFormatter(formatter)
            self.logger.addHandler(console)

        # FileHandler stores the absolute path of its target in baseFilename.
        target = os.path.abspath(log_file)
        has_file = any(
            isinstance(h, logging.FileHandler) and h.baseFilename == target
            for h in self.logger.handlers
        )
        if not has_file:
            file_handler = logging.FileHandler(log_file, "w")
            file_handler.setLevel(logging.DEBUG)
            file_handler.setFormatter(formatter)
            self.logger.addHandler(file_handler)
        return log_file

    def annotate(self, text):
        """
        Uses default processor (CoreNLP) to annotate text.  Included for backwards compatibility.
        """
        return self.default.annotate(text)

    def annotate_from_sentences(self, sentences):
        """
        Uses default processor (CoreNLP) to annotate a list of segmented sentences.
        """
        return self.default.annotate_from_sentences(sentences)

    def is_running(self):
        """
        Return True when a probe `annotate` call yields a truthy result.
        """
        return bool(self.annotate("Blah"))

    def _check_server_version(self):
        """
        Checks server version to see if it meets the recommendations.

        Emits a warning when the version reported by `<address>/version`
        differs from the recommended one, or when it cannot be determined.
        """
        # avoid circular imports by delaying this import
        from .__init__ import __ps_rec__
        try:
            service_address = "{}/version".format(self.address)
            server_version = post_json(service_address, None)["version"]
            if str(__ps_rec__) != str(server_version):
                warnings.warn("Recommended server version is {}, but server version is {}".format(__ps_rec__, server_version))
            else:
                self.logger.info("Server version meets recommendations (v{})".format(__ps_rec__))
        except Exception:
            # Best effort: any failure to query or parse the version is
            # reported as a warning rather than raised.
            warnings.warn("Unable to determine server version.  Recommended version is {}".format(__ps_rec__))
143
144
145
class ProcessorsAPI(ProcessorsBaseAPI):
    """
    Manages a connection with the processors-server jar and provides an interface to the API.

    Parameters
    ----------
    timeout : int
        The number of seconds to wait for the server to initialize.  Default is 120.
    jvm_mem : str
        The maximum amount of memory to allocate to the JVM for the server.  Default is "-Xmx3G".
    jar_path : str
        The path to the processors-server jar.  Default is the jar installed with the package.
    keep_alive : bool
        Whether or not to keep the server running when ProcessorsAPI instance goes out of scope.  Default is false (server is shut down).
    log_file: str
        The path for the log file.  Default is py-processors.log in the user's home directory.

    Methods
    -------
    start_server(jar_path, **kwargs)
        Starts the server using the provided `jar_path`.  Optionally takes hostname, port, jvm_mem, and timeout.
    stop_server()
        Attempts to stop the server running at self.address.
    """

    PROC_VAR = 'PROCESSORS_SERVER'
    TIMEOUT = 120
    # save to lib loc
    DEFAULT_JAR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "processors-server.jar")
    JVM_MEM = "-Xmx3G"

    def __init__(self, **kwargs):
        """
        Configure the instance, resolve the jar path and try to connect.

        Recognized keyword arguments (besides those of the base class) are
        `timeout`, `jvm_mem`, `keep_alive` and `jar_path`.
        """
        super(ProcessorsAPI, self).__init__(**kwargs)
        self.timeout = kwargs.get("timeout", ProcessorsAPI.TIMEOUT)
        self.jvm_mem = kwargs.get("jvm_mem", ProcessorsAPI.JVM_MEM)
        # placeholders: mem, jar path, port, host
        self._start_command = "java {mem} -cp {jp} NLPServer --port {port} --host {host}"
        # whether or not to stop the server when the object is destroyed
        self.keep_alive = kwargs.get("keep_alive", False)
        # how long to wait between requests
        self.wait_time = 2
        # set self.jar_path
        self.jar_path = ProcessorsAPI.DEFAULT_JAR
        self._resolve_jar_path(kwargs.get("jar_path", self.jar_path))
        # attempt to establish connection with server
        self.establish_connection()

    def establish_connection(self):
        """
        Connect to a running server, or try to start one.

        If `is_running()` reports a server, its version is checked.
        Otherwise the jar path is resolved and the server is started; any
        exception raised while doing so is logged as a warning, not raised.
        """
        if self.is_running():
            self.logger.info("Connection with server established!")
            self._check_server_version()
            return
        try:
            # resolve jar path if server is not already running
            self._resolve_jar_path(self.jar_path)
            # Attempt to start the server
            self._start_server()
        except Exception as e:
            # Logger.warn is deprecated (removed in Python 3.13); use warning.
            self.logger.warning("Unable to start server. Please start the server manually with .start_server(jar_path=\"path/to/processors-server.jar\")")
            self.logger.warning("\n{}".format(e))

    def _resolve_jar_path(self, jar_path=None):
        """
        Attempts to preferentially set value of self.jar_path.

        Order of preference: the given `jar_path` (or the default jar) if it
        exists on disk; the path in the PROCESSORS_SERVER environment
        variable; otherwise the jar is downloaded to the default location.
        """
        jar_path = jar_path or ProcessorsAPI.DEFAULT_JAR

        # Preference 1: use the candidate path if it is valid
        jp = full_path(jar_path)
        if os.path.exists(jp):
            self.jar_path = jp

        # Preference 2: if a PROCESSORS_SERVER environment variable is defined, check its validity
        current_missing = not self.jar_path or not os.path.exists(self.jar_path)
        if current_missing and ProcessorsAPI.PROC_VAR in os.environ:
            self.logger.info("Using path given via ${}".format(ProcessorsAPI.PROC_VAR))
            jp = full_path(os.environ[ProcessorsAPI.PROC_VAR])
            if os.path.exists(jp):
                self.jar_path = jp
            else:
                self.jar_path = None
                self.logger.warning("WARNING: {0} path is invalid.  \nPlease verify this entry in your environment:\n\texport {0}=/path/to/processors-server.jar".format(ProcessorsAPI.PROC_VAR))

        # Preference 3: attempt to use the processors-sever.jar (download if not found)
        if not self.jar_path or not os.path.exists(self.jar_path):
            self.logger.info("No jar found.  Downloading to {} ...".format(ProcessorsAPI.DEFAULT_JAR))
            ProcessorsAPI._download_jar()
            self.jar_path = ProcessorsAPI.DEFAULT_JAR

    def start_server(self, jar_path, **kwargs):
        """
        Starts processors-sever.jar

        Parameters
        ----------
        jar_path : str
            Path to the processors-server jar.
        **kwargs
            Optional `port`, `hostname`, `jvm_mem` and `timeout` (seconds)
            overrides; values not given keep their current setting.

        Raises
        ------
        Exception
            If `jar_path` resolves to an empty value, the jar does not exist,
            or the server does not respond before the timeout.
        """
        self.jvm_mem = kwargs.get("jvm_mem", self.jvm_mem)
        # Fall back to the current timeout.  The previous fallback was
        # self.jvm_mem, so float("-Xmx3G") raised ValueError whenever no
        # timeout was passed.  The value is kept in seconds (no halving):
        # _start_server already divides it by the polling interval.
        self.timeout = int(float(kwargs.get("timeout", self.timeout)))
        # Keep self.address in step with any hostname/port override.
        # NOTE(review): the processor/sentiment/odin clients created in the
        # base __init__ were given the address at construction time; confirm
        # whether they need rebuilding when the address changes here.
        self.make_address(kwargs.get("hostname", self.hostname),
                          kwargs.get("port", self.port))
        jp = full_path(jar_path)
        if jp:
            self.jar_path = jp
            self._start_server()
        else:
            raise Exception("Please provide jar_path=\"path/to/processors-server.jar\"")

    def stop_server(self, port=None):
        """
        Sends a poison pill to the server and waits for shutdown response

        Parameters
        ----------
        port : int, optional
            Port of the server to stop.  Defaults to `self.port`.

        Returns
        -------
        bool
            True if the shutdown request completed without raising,
            False otherwise.
        """
        port = port or self.port
        address = "http://{}:{}".format(self.hostname, port)
        shutdown_address = "{}/shutdown".format(address)
        # attempt shutdown
        try:
            response = requests.post(shutdown_address)
            if response:
                print(response.content.decode("utf-8"))
            return True
        # will fail if the server is already down
        except Exception as e:
            self.logger.debug("Shutdown request to {} failed: {}".format(shutdown_address, e))
        return False

    def _ensure_jar_path_exists(self):
        """
        Raise an Exception unless `self.jar_path` points at an existing file.
        """
        if not self.jar_path or not os.path.exists(self.jar_path):
            raise Exception("jar not found at {}".format(self.jar_path))

    def _start_server(self, port=None):
        """
        "Private" method called by start_server()

        Launches the server as a subprocess and polls it every
        `self.wait_time` seconds until it responds or `self.timeout` seconds
        have been used up.

        Returns
        -------
        bool
            True once the server answers a probe request.

        Raises
        ------
        Exception
            If the jar is missing or the server never responds.
        """
        # does the jar exist?
        self._ensure_jar_path_exists()

        if port:
            self.make_address(self.hostname, port)
        # build the command
        cmd = self._start_command.format(mem=self.jvm_mem, jp=self.jar_path, port=self.port, host=self.hostname)
        # Use one handle for both streams: two independent 'wb' handles on
        # the same file overwrite each other from offset 0 and were never
        # closed.  Append mode avoids truncating what the logger's file
        # handler has already written.  The child keeps its own copy of the
        # descriptor, so the parent can close its handle straight away.
        with open(self.log_file, 'ab') as server_log:
            self._process = sp.Popen(shlex.split(cmd),
                                     shell=False,
                                     stderr=sp.STDOUT,
                                     stdout=server_log,
                                     universal_newlines=True)

        self.logger.info("Starting processors-server ({}) ...".format(cmd))
        print("\nWaiting for server...")

        progressbar_length = int(self.timeout/self.wait_time)
        for i in range(progressbar_length):
            if self.annotate("blah"):
                print("\n\nConnection with processors-server established ({})".format(self.address))
                return True
            sys.stdout.write("\r[{:{}}]".format('='*i, progressbar_length))
            # no newline is written, so flush to make the bar visible
            sys.stdout.flush()
            time.sleep(self.wait_time)

        # if the server still hasn't started, raise an Exception
        raise Exception("Couldn't connect to processors-server. Is the port in use?")

    @staticmethod
    def _download_jar(jar_url=None):
        """
        Download the processors-server jar to `ProcessorsAPI.DEFAULT_JAR`.

        Parameters
        ----------
        jar_url : str, optional
            Location of the jar.  Defaults to the package's SERVER_JAR_URL.

        Raises
        ------
        requests.HTTPError
            If the server answers with an error status.
        """
        from .__init__ import SERVER_JAR_URL
        jar_url = jar_url or SERVER_JAR_URL
        # download processors-server.jar
        ppjar = ProcessorsAPI.DEFAULT_JAR
        print("Downloading {} from {} ...".format(ppjar, jar_url))
        response = requests.get(jar_url, stream=True, headers={'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'})
        # Fail before touching the disk so an error page is never saved as
        # the jar.
        response.raise_for_status()
        # The header may be absent (e.g. chunked transfer); 0 means unknown.
        total_length = int(response.headers.get('content-length') or 0)
        dl = 0
        with open(ppjar, "wb") as handle:
            for data in response.iter_content(chunk_size=2048):
                # write data to disk
                handle.write(data)
                # do we know the total file size?
                if total_length:
                    # count the chunk first so the final report reaches 100%
                    dl += len(data)
                    percent_complete = min(100, int(100 * float(dl) / float(total_length)))
                    sys.stdout.write("\r{}% complete".format(percent_complete))
                    sys.stdout.flush()

        print("\nDownload Complete! {}".format(ppjar))

    def __del__(self):
        """
        Stop server unless otherwise specified
        """
        # A partially constructed instance (no keep_alive yet) never got as
        # far as starting a server, so leave things alone in that case.
        if getattr(self, "keep_alive", True):
            return
        try:
            stopped = self.stop_server()
        except Exception as e:
            stopped = False
            logger = getattr(self, "logger", None)
            if logger is not None:
                logger.debug(e)
        # stop_server reports failure through its return value, so success
        # is only announced when the request actually went through.
        if stopped:
            print("Successfully shut down processors-server!")
        else:
            print("Couldn't kill processors-server.  Was server started externally?")
355
356
357
class OdinAPI(object):
    """
    API for performing rule-based information extraction with Odin.

    Parameters
    ----------
    address : str
        The base address for the API (i.e., everything preceding `/api/..`)

    """

    # Raw string: "\." in a plain literal is an invalid escape sequence and
    # triggers a warning on current Python versions.  The pattern is unchanged.
    validator = re.compile(r"^(https?|ftp):.+?\.?ya?ml$")

    def __init__(self, address):
        self._service = "{}/api/odin/extract".format(address)

    def _extract(self, json_data):
        """
        Post `json_data` to the extraction service and decode the reply.

        Parameters
        ----------
        json_data : str
            JSON request body.

        Returns
        -------
        The result of `JSONSerializer.mentions_from_JSON` on the reply, or
        None (after printing an OdinError) when the reply has an "error" key.
        """
        # Imported here so the method does not depend on `json` reaching the
        # module namespace through a star import.
        import json

        mns_json = post_json(self._service, json_data)
        if "error" not in mns_json:
            return JSONSerializer.mentions_from_JSON(mns_json)

        error_msg = mns_json["error"]
        original_msg = json.loads(json_data)
        # the request carries either inline rules or a url pointing to them
        rules = original_msg.get("rules", original_msg.get("url", None))
        oe = OdinError(rules=rules, message=error_msg)
        print(oe)
        return None

    @staticmethod
    def valid_rule_url(url):
        """
        Return True if `url` starts with http:, https: or ftp: and ends in
        `yml` or `yaml`, False otherwise.
        """
        return bool(OdinAPI.validator.match(url))

    def extract_from_text(self, text, rules):
        """
        Sends text to the server with rules for information extraction (IE).

        Parameters
        ----------
        text : str
            `rules` will be applied to this `text`.
        rules : str
            Either Odin rules provided as a `yaml` string, or a url pointing to a `yaml` file of rules.

        Returns
        -------
        [processors.odin.Mention] or None
            Rule matches produce a list of `processors.odin.Mention`.
        """
        if OdinAPI.valid_rule_url(rules):
            # this is actually a URL to a yaml file
            container = TextWithURL(text, rules)
        else:
            container = TextWithRules(text, rules)
        return self._extract(container.to_JSON())

    def extract_from_document(self, doc, rules):
        """
        Sends a `processors.ds.Document` (`doc`) to the server with rules for information extraction (IE).

        Parameters
        ----------
        doc : processors.ds.Document
            `rules` will be applied to this `processors.ds.Document`.
        rules : str
            Either Odin rules provided as a `yaml` string, or a url pointing to a `yaml` file of rules.

        Returns
        -------
        [processors.odin.Mention] or None
            Rule matches produce a list of `processors.odin.Mention`.

        """
        if OdinAPI.valid_rule_url(rules):
            # this is actually a URL to a yaml file
            container = DocumentWithURL(doc, rules)
        else:
            container = DocumentWithRules(doc, rules)
        return self._extract(container.to_JSON())
437
438
#############################################
439
# Containers for Odin data
440 View Code Duplication
# transmitted to the server for processing
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
441
#############################################
442
443
class TextWithRules(object):
    """
    Request payload pairing raw text with inline Odin rules.

    Parameters
    ----------
    text : str
        The text the rules are applied to.
    rules : str
        The rules, stored as given.
    """

    def __init__(self, text, rules):
        self.text = text
        self.rules = rules

    def to_JSON_dict(self):
        """Return the payload as a dict with "text" and "rules" keys."""
        return {"text": self.text, "rules": self.rules}

    def to_JSON(self):
        """Return the payload as a JSON string (sorted keys, 4-space indent)."""
        # Imported here so the method does not depend on `json` reaching the
        # module namespace through a star import.
        import json
        return json.dumps(self.to_JSON_dict(), sort_keys=True, indent=4)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
457
458
class TextWithURL(object):
    """
    Request payload pairing raw text with a url pointing to Odin rules.

    Parameters
    ----------
    text : str
        The text the rules are applied to.
    url : str
        Location of the rules file.  Stored as given; it is not validated.
    """

    def __init__(self, text, url):
        self.text = text
        # TODO: throw exception if url is invalid
        self.url = url

    def to_JSON_dict(self):
        """Return the payload as a dict with "text" and "url" keys."""
        return {"text": self.text, "url": self.url}

    def to_JSON(self):
        """Return the payload as a JSON string (sorted keys, 4-space indent)."""
        # Imported here so the method does not depend on `json` reaching the
        # module namespace through a star import.
        import json
        return json.dumps(self.to_JSON_dict(), sort_keys=True, indent=4)
473
474
class DocumentWithRules(object):
    """
    Request payload pairing a document with inline Odin rules.

    Parameters
    ----------
    document
        Object providing `to_JSON_dict()`.  Stored as given; its type is
        not checked.
    rules : str
        The rules, stored as given.
    """

    def __init__(self, document, rules):
        # TODO: throw exception if isinstance(document, Document) is False
        self.document = document
        self.rules = rules

    def to_JSON_dict(self):
        """Return the payload as a dict with "document" and "rules" keys."""
        return {"document": self.document.to_JSON_dict(), "rules": self.rules}

    def to_JSON(self):
        """Return the payload as a JSON string (sorted keys, 4-space indent)."""
        # Imported here so the method does not depend on `json` reaching the
        # module namespace through a star import.
        import json
        return json.dumps(self.to_JSON_dict(), sort_keys=True, indent=4)
489
490
class DocumentWithURL(object):
    """
    Request payload pairing a document with a url pointing to Odin rules.

    Parameters
    ----------
    document
        Object providing `to_JSON_dict()`.  Stored as given; its type is
        not checked.
    url : str
        Location of the rules file.  Stored as given; it is not validated.
    """

    def __init__(self, document, url):
        # TODO: throw exception if isinstance(document, Document) is False
        self.document = document
        # TODO: throw exception if url is invalid
        self.url = url

    def to_JSON_dict(self):
        """Return the payload as a dict with "document" and "url" keys."""
        return {"document": self.document.to_JSON_dict(), "url": self.url}

    def to_JSON(self):
        """Return the payload as a JSON string (sorted keys, 4-space indent)."""
        # Imported here so the method does not depend on `json` reaching the
        # module namespace through a star import.
        import json
        return json.dumps(self.to_JSON_dict(), sort_keys=True, indent=4)
506