Completed
Push — master ( ec85f3...61f9ec )
by Gus
01:25 queued 29s
created

ProcessorsAPI._resolve_jar_path()   D

Complexity

Conditions 8

Size

Total Lines 30

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 8
c 3
b 0
f 0
dl 0
loc 30
rs 4
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
from __future__ import unicode_literals
4
#from pkg_resources import resource_filename
5
from six.moves.urllib.request import urlretrieve
6
from .utils import *
7
from .annotators import *
8
from .sentiment import SentimentAnalysisAPI
9
from .serialization import JSONSerializer
10
import os
11
import shlex
12
import subprocess as sp
13
import requests
14
import re
15
import time
16
import sys
17
import logging
18
import warnings
19
20
21
class ProcessorsAPI(object):

    """
    Manages a connection with the processors-server jar and provides an interface to the API.

    Parameters
    ----------
    port : int
        The port the server is running on or should be started on.  Default is 8886.
    hostname : str
        The host name to use for the server.  Default is "localhost".
    timeout : int
        The number of seconds to wait for the server to initialize.  Default is 120.
    jvm_mem : str
        The maximum amount of memory to allocate to the JVM for the server.  Default is "-Xmx3G".
    jar_path : str
        The path to the processors-server jar.  Default is the jar installed with the package.
        The jar is only resolved when no server is already running.  If the given path does
        not exist, the path in $PROCESSORS_SERVER is tried, then the default jar (downloaded
        if missing).
    keep_alive : bool
        Whether or not to keep the server running when ProcessorsAPI instance goes out of scope.  Default is false (server is shut down).
    log_file: str
        The path for the log file.  Default is py-processors.log in the user's home directory.

    Methods
    -------
    annotate(text)
        Produces a Document from the provided `text` using the default processor.
    clu.annotate(text)
        Produces a Document from the provided `text` using CluProcessor.
    fastnlp.annotate(text)
        Produces a Document from the provided `text` using FastNLPProcessor.
    bionlp.annotate(text)
        Produces a Document from the provided `text` using BioNLPProcessor.
    annotate_from_sentences(sentences)
        Produces a Document from `sentences` (a list of text split into sentences). Uses the default processor.
    fastnlp.annotate_from_sentences(sentences)
        Produces a Document from `sentences` (a list of text split into sentences). Uses FastNLPProcessor.
    bionlp.annotate_from_sentences(sentences)
        Produces a Document from `sentences` (a list of text split into sentences). Uses BioNLPProcessor.
    corenlp.sentiment.score_sentence(sentence)
        Produces a sentiment score for the provided `sentence` (an instance of Sentence).
    corenlp.sentiment.score_document(doc)
        Produces sentiment scores for the provided `doc` (an instance of Document).  One score is produced for each sentence.
    corenlp.sentiment.score_segmented_text(sentences)
        Produces sentiment scores for the provided `sentences` (a list of text segmented into sentences).  One score is produced for item in `sentences`.
    odin.extract_from_text(text, rules)
        Produces a list of Mentions for matches of the provided `rules` on the `text`.  `rules` can be a string of Odin rules, or a url ending in `.yml` or `.yaml`.
    odin.extract_from_document(doc, rules)
        Produces a list of Mentions for matches of the provided `rules` on the `doc` (an instance of Document).  `rules` can be a string of Odin rules, or a url ending in .yml or yaml.
    start_server(jar_path, **kwargs)
        Starts the server using the provided `jar_path`.  Optionally takes hostname, port, jvm_mem, and timeout.
    stop_server()
        Attempts to stop the server running at self.address.
    """

    PROC_VAR = 'PROCESSORS_SERVER'
    TIMEOUT = 120
    # save to lib loc
    DEFAULT_JAR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "processors-server.jar")
    PORT = 8886
    JVM_MEM = "-Xmx3G"
    HOST = "localhost"
    LOG = full_path(os.path.join(os.path.expanduser("~"), "py-processors.log"))

    def __init__(self, **kwargs):
        # handles for a server process we launch ourselves (and its log file)
        self._process = None
        self._server_log = None
        self.hostname = kwargs.get("hostname", ProcessorsAPI.HOST)
        self.port = kwargs.get("port", ProcessorsAPI.PORT)
        self.timeout = kwargs.get("timeout", ProcessorsAPI.TIMEOUT)
        self.jvm_mem = kwargs.get("jvm_mem", ProcessorsAPI.JVM_MEM)
        # whether or not to stop the server when the object is destroyed
        self.keep_alive = kwargs.get("keep_alive", False)
        # how long (seconds) to wait between connection attempts
        self.wait_time = 2
        # address, processors, sentiment and odin clients
        self._configure_services(self.hostname, self.port)
        self.logger = logging.getLogger(__name__)
        self.log_file = self._prepare_log_file(kwargs.get("log_file", ProcessorsAPI.LOG))
        # Only record the requested jar here.  Resolving it may trigger a download,
        # which establish_connection() does only if no server is already running.
        self.jar_path = kwargs.get("jar_path") or ProcessorsAPI.DEFAULT_JAR
        # attempt to establish connection with server
        self.establish_connection()

    def _configure_services(self, hostname, port):
        """
        Point self.address and every service client at `hostname`:`port`.

        The clients capture the address when constructed, so they must be rebuilt
        whenever the host or port changes.
        """
        self.make_address(hostname, port)
        # processors
        self.default = Processor(self.address)
        self.clu = CluProcessor(self.address)
        self.fastnlp = FastNLPProcessor(self.address)
        self.bionlp = BioNLPProcessor(self.address)
        # sentiment
        self.sentiment = SentimentAnalysisAPI(self.address)
        # odin
        self.odin = OdinAPI(self.address)

    def _check_server_version(self):
        """
        Checks server version to see if it meets the recommendations.

        Emits a warning (never raises) on a mismatch or if the version can't be read.
        """
        # avoid circular imports by delaying this import
        from .__init__ import __ps_rec__
        try:
            service_address = "{}/version".format(self.address)
            server_version = post_json(service_address, None)["version"]
            if str(__ps_rec__) != str(server_version):
                warnings.warn("Recommended server version is {}, but server version is {}".format(__ps_rec__, server_version))
            else:
                self.logger.info("Server version meets recommendations (v{})".format(__ps_rec__))
        except Exception:
            warnings.warn("Unable to determine server version.  Recommended version is {}".format(__ps_rec__))

    def _prepare_log_file(self, lf):
        """
        Configure logger and return file path for logging.

        Installs an INFO-level console handler and a DEBUG-level file handler
        (truncating `lf`, or ProcessorsAPI.LOG when `lf` is empty).
        """
        log_file = ProcessorsAPI.LOG if not lf else os.path.expanduser(lf)
        self.logger.setLevel(logging.DEBUG)
        # logging.getLogger(__name__) is shared by every instance: drop handlers a
        # previous instance installed, otherwise each message is emitted once per instance
        for old in list(self.logger.handlers):
            if getattr(old, "_py_processors_handler", False):
                self.logger.removeHandler(old)
                old.close()
        formatter = logging.Formatter("%(levelname)s - %(message)s")
        # console handler at info level
        console = logging.StreamHandler()
        console.setLevel(logging.INFO)
        # debug file handler
        file_handler = logging.FileHandler(log_file, "w")
        file_handler.setLevel(logging.DEBUG)
        for handler in (console, file_handler):
            handler.setFormatter(formatter)
            handler._py_processors_handler = True
            self.logger.addHandler(handler)
        return log_file

    def annotate(self, text):
        """
        Uses default processor (CoreNLP) to annotate text.  Included for backwards compatibility.
        """
        return self.default.annotate(text)

    def annotate_from_sentences(self, sentences):
        """
        Uses default processor (CoreNLP) to annotate a list of segmented sentences.
        """
        return self.default.annotate_from_sentences(sentences)

    def is_running(self):
        """
        Return True if the server answers a trivial annotation request.
        """
        return bool(self.annotate("Blah"))

    def establish_connection(self):
        """
        Attempt to connect to a server (assumes server is running).

        If no server answers, resolve the jar path (downloading the default jar if
        needed) and try to launch one.  Launch failures are logged, not raised.
        """
        if self.is_running():
            self.logger.info("Connection with server established!")
            self._check_server_version()
            return
        try:
            # resolve jar path only now that we know a server must be launched
            self._resolve_jar_path(self.jar_path)
            # Attempt to start the server
            self._start_server()
        except Exception as e:
            self.logger.warning("Unable to start server. Please start the server manually with .start_server(jar_path=\"path/to/processors-server.jar\")")
            self.logger.warning("\n{}".format(e))

    def _resolve_jar_path(self, jar_path=None):
        """
        Set self.jar_path to the first usable processors-server jar.

        Preference order:
          1. `jar_path`, if given and the file exists
          2. the path in the $PROCESSORS_SERVER environment variable, if it exists
          3. ProcessorsAPI.DEFAULT_JAR (downloaded first if missing)

        Returns
        -------
        str
            The resolved path (also stored in self.jar_path).
        """
        # Preference 1: an explicitly provided jar
        if jar_path:
            jp = full_path(jar_path)
            if os.path.exists(jp):
                self.jar_path = jp
                return self.jar_path
            if jar_path != ProcessorsAPI.DEFAULT_JAR:
                self.logger.warning("No jar found at {}".format(jp))

        # Preference 2: if a PROCESSORS_SERVER environment variable is defined, check its validity
        env_path = os.environ.get(ProcessorsAPI.PROC_VAR)
        if env_path:
            jp = full_path(env_path)
            if os.path.exists(jp):
                self.logger.info("Using path given via ${}".format(ProcessorsAPI.PROC_VAR))
                self.jar_path = jp
                return self.jar_path
            self.logger.warning("WARNING: {0} path is invalid.  \nPlease verify this entry in your environment:\n\texport {0}=/path/to/processors-server.jar".format(ProcessorsAPI.PROC_VAR))

        # Preference 3: use the packaged processors-server.jar (download if not found)
        if not os.path.exists(ProcessorsAPI.DEFAULT_JAR):
            self.logger.info("No jar found.  Downloading to {} ...".format(ProcessorsAPI.DEFAULT_JAR))
            ProcessorsAPI._download_jar()
        self.jar_path = ProcessorsAPI.DEFAULT_JAR
        return self.jar_path

    def start_server(self, jar_path, **kwargs):
        """
        Starts processors-server.jar

        Parameters
        ----------
        jar_path : str
            Path to the processors-server jar.
        **kwargs
            Optional overrides: hostname, port, jvm_mem, timeout (seconds).

        Raises
        ------
        Exception
            If `jar_path` is empty, the jar is missing, or the server doesn't come up in time.
        """
        if not jar_path:
            raise Exception("Please provide jar_path=\"path/to/processors-server.jar\"")
        self.jvm_mem = kwargs.get("jvm_mem", self.jvm_mem)
        # default to the current timeout (previously defaulted to jvm_mem, so float() always failed)
        self.timeout = int(float(kwargs.get("timeout", self.timeout)))
        # clients are bound to an address, so rebuild them for the (possibly) new host/port
        self._configure_services(kwargs.get("hostname", self.hostname), kwargs.get("port", self.port))
        jp = full_path(jar_path)
        if not jp:
            raise Exception("Please provide jar_path=\"path/to/processors-server.jar\"")
        self.jar_path = jp
        return self._start_server()

    def stop_server(self, port=None):
        """
        Sends a poison pill to the server and waits for shutdown response.

        Returns True if the shutdown request was delivered, False if the server could
        not be reached (e.g. it is already down).
        """
        port = port or self.port
        shutdown_address = "http://{}:{}/shutdown".format(self.hostname, port)
        try:
            response = requests.post(shutdown_address)
        # will fail if the server is already down
        except requests.exceptions.RequestException:
            return False
        if response:
            print(response.content.decode("utf-8", "replace"))
        self._close_server_log()
        return True

    def _close_server_log(self):
        """
        Close our handle on the server's log file, if one is open.
        """
        log = getattr(self, "_server_log", None)
        if log is not None and not log.closed:
            log.close()
        self._server_log = None

    def _ensure_jar_path_exists(self):
        """
        Raise if self.jar_path is unset or does not point at an existing file.
        """
        if not self.jar_path or not os.path.exists(self.jar_path):
            raise Exception("jar not found at {}".format(self.jar_path))

    def _build_start_command(self):
        """
        Argument list used to launch the server.

        Built as a list rather than a shlex-split string so that jar paths containing
        spaces survive intact.  jvm_mem may hold several JVM flags.
        """
        return (["java"] + shlex.split(self.jvm_mem) +
                ["-cp", self.jar_path, "NLPServer",
                 "--port", str(self.port), "--host", self.hostname])

    def _start_server(self, port=None):
        """
        "Private" method called by start_server()

        Launches the jar and polls until the server answers, the process exits, or
        self.timeout seconds elapse.  Returns True once connected; raises otherwise.
        """
        # does the jar exist?
        self._ensure_jar_path_exists()
        if port:
            self._configure_services(self.hostname, port)
        cmd = self._build_start_command()
        # One append-mode handle shared by stdout and stderr.  Two truncating handles
        # on the same file overwrite each other's output (and the logger's).
        self._close_server_log()
        self._server_log = open(self.log_file, "ab")
        self._process = sp.Popen(cmd,
                                 shell=False,
                                 stdout=self._server_log,
                                 stderr=sp.STDOUT)

        self.logger.info("Starting processors-server ({}) ...".format(" ".join(cmd)))
        print("\nWaiting for server...")

        # at least one attempt, even if timeout < wait_time
        progressbar_length = max(1, int(self.timeout / self.wait_time))
        for i in range(progressbar_length):
            if self.annotate("blah"):
                print("\n\nConnection with processors-server established ({})".format(self.address))
                return True
            # no point waiting out the timeout if the JVM already died
            if self._process.poll() is not None:
                raise Exception("processors-server exited with code {}.  See {}".format(self._process.returncode, self.log_file))
            sys.stdout.write("\r[{:{}}]".format('=' * i, progressbar_length))
            sys.stdout.flush()
            time.sleep(self.wait_time)

        # if the server still hasn't started, raise an Exception
        raise Exception("Couldn't connect to processors-server. Is the port in use?")

    def make_address(self, hostname, port):
        """
        Update hostname, port and the derived base URL (self.address).
        """
        self.hostname = hostname
        self.port = port
        self.address = "http://{}:{}".format(self.hostname, self.port)

    @staticmethod
    def _download_jar(jar_url=None):
        """
        Download the server jar (default: SERVER_JAR_URL) to ProcessorsAPI.DEFAULT_JAR.
        """
        from .__init__ import SERVER_JAR_URL
        jar_url = jar_url or SERVER_JAR_URL
        ppjar = ProcessorsAPI.DEFAULT_JAR
        # Download under a temporary name so an interrupted download never leaves a
        # truncated jar at DEFAULT_JAR, which would later be mistaken for a valid jar.
        partial = ppjar + ".part"

        def dl_progress(count, block_size, total_size):
            # urlretrieve reports total_size == -1 when the size is unknown
            if total_size > 0:
                percent = min(100, int(count * block_size * 100 / total_size))
                sys.stdout.write("\r{}% complete".format(percent))
            else:
                sys.stdout.write("\r{} bytes downloaded".format(count * block_size))
            sys.stdout.flush()

        print("Downloading {} from {} ...".format(ppjar, jar_url))
        try:
            urlretrieve(jar_url, partial, reporthook=dl_progress)
        except Exception:
            if os.path.exists(partial):
                os.remove(partial)
            raise
        if os.path.exists(ppjar):
            os.remove(ppjar)
        os.rename(partial, ppjar)
        print("\nDownload Complete! {}".format(ppjar))

    def __del__(self):
        """
        Stop server unless otherwise specified
        """
        # __init__ may have raised before keep_alive was assigned; do nothing then
        if getattr(self, "keep_alive", True):
            return
        try:
            stopped = self.stop_server()
        except Exception as e:
            logger = getattr(self, "logger", None)
            if logger is not None:
                logger.debug(e)
            stopped = False
        self._close_server_log()
        if stopped:
            print("Successfully shut down processors-server!")
        else:
            print("Couldn't kill processors-server.  Was server started externally?")
329
330
331
class OdinAPI(object):
    """
    API for performing rule-based information extraction with Odin.

    Parameters
    ----------
    address : str
        The base address for the API (i.e., everything preceding `/api/..`)

    """

    # A rules "url" uses http(s)/ftp and ends in `.yml` or `.yaml`.  Raw string so
    # `\.` is a literal dot; it was previously optional (`\.?`), which let
    # e.g. "http://host/rulesyml" be treated as a URL.
    validator = re.compile(r"^(https?|ftp):.+\.ya?ml$")

    def __init__(self, address):
        self._service = "{}/api/odin/extract".format(address)

    def _extract(self, json_data):
        """
        POST `json_data` to the Odin service and deserialize the returned mentions.

        If the server response contains an "error" key, an OdinError describing the
        rules (or rules url) is printed and None is returned.
        """
        mns_json = post_json(self._service, json_data)
        if "error" in mns_json:
            error_msg = mns_json["error"]
            original_msg = json.loads(json_data)
            rules = original_msg.get("rules", original_msg.get("url", None))
            oe = OdinError(rules=rules, message=error_msg)
            print(oe)
            return None
        return JSONSerializer.mentions_from_JSON(mns_json)

    @staticmethod
    def valid_rule_url(url):
        """
        Return True if `url` looks like an http(s)/ftp url to a `.yml`/`.yaml` file.
        """
        return bool(OdinAPI.validator.match(url))

    def extract_from_text(self, text, rules):
        """
        Sends text to the server with rules for information extraction (IE).

        Parameters
        ----------
        text : str
            `rules` will be applied to this `text`.
        rules : str
            Either Odin rules provided as a `yaml` string, or a url pointing to a `yaml` file of rules.

        Returns
        -------
        [processors.odin.Mention] or None
            Rule matches produce a list of `processors.odin.Mention`.
        """
        if OdinAPI.valid_rule_url(rules):
            # this is actually a URL to a yaml file
            container = TextWithURL(text, rules)
        else:
            container = TextWithRules(text, rules)
        return self._extract(container.to_JSON())

    def extract_from_document(self, doc, rules):
        """
        Sends a `processors.ds.Document` (`doc`) to the server with rules for information extraction (IE).

        Parameters
        ----------
        doc : processors.ds.Document
            `rules` will be applied to this `processors.ds.Document`.
        rules : str
            Either Odin rules provided as a `yaml` string, or a url pointing to a `yaml` file of rules.

        Returns
        -------
        [processors.odin.Mention] or None
            Rule matches produce a list of `processors.odin.Mention`.

        """
        if OdinAPI.valid_rule_url(rules):
            # this is actually a URL to a yaml file
            container = DocumentWithURL(doc, rules)
        else:
            container = DocumentWithRules(doc, rules)
        return self._extract(container.to_JSON())
411
412
#############################################
413
# Containers for Odin data
414
# transmitted to the server for processing
415
#############################################
416
417
class TextWithRules(object):
    """
    Odin request payload: raw `text` together with inline `rules` (a yaml string).
    """

    def __init__(self, text, rules):
        self.text = text
        self.rules = rules

    def to_JSON_dict(self):
        """Return the payload as a dict with "text" and "rules" keys."""
        return {"text": self.text, "rules": self.rules}

    def to_JSON(self):
        """Serialize the payload as indented JSON with sorted keys."""
        payload = self.to_JSON_dict()
        return json.dumps(payload, sort_keys=True, indent=4)
431
432
class TextWithURL(object):
    """
    Odin request payload: raw `text` together with a `url` pointing at a rules file.
    """

    def __init__(self, text, url):
        self.text = text
        # TODO: throw exception if url is invalid
        self.url = url

    def to_JSON_dict(self):
        """Return the payload as a dict with "text" and "url" keys."""
        return {"text": self.text, "url": self.url}

    def to_JSON(self):
        """Serialize the payload as indented JSON with sorted keys."""
        payload = self.to_JSON_dict()
        return json.dumps(payload, sort_keys=True, indent=4)
447
448
class DocumentWithRules(object):
    """
    Odin request payload: an already-annotated `document` plus inline `rules`.

    `document` must provide a `to_JSON_dict()` method.
    """

    def __init__(self, document, rules):
        # TODO: throw exception if isinstance(document, Document) is False
        self.document = document
        self.rules = rules

    def to_JSON_dict(self):
        """Return the payload as a dict with "document" and "rules" keys."""
        doc_dict = self.document.to_JSON_dict()
        return {"document": doc_dict, "rules": self.rules}

    def to_JSON(self):
        """Serialize the payload as indented JSON with sorted keys."""
        payload = self.to_JSON_dict()
        return json.dumps(payload, sort_keys=True, indent=4)
463
464
class DocumentWithURL(object):
    """
    Odin request payload: an already-annotated `document` plus a `url` to a rules file.

    `document` must provide a `to_JSON_dict()` method.
    """

    def __init__(self, document, url):
        # TODO: throw exception if isinstance(document, Document) is False
        self.document = document
        # TODO: throw exception if url is invalid
        self.url = url

    def to_JSON_dict(self):
        """Return the payload as a dict with "document" and "url" keys."""
        doc_dict = self.document.to_JSON_dict()
        return {"document": doc_dict, "url": self.url}

    def to_JSON(self):
        """Serialize the payload as indented JSON with sorted keys."""
        payload = self.to_JSON_dict()
        return json.dumps(payload, sort_keys=True, indent=4)
480