Completed
Push — master ( 352e2e...4c1319 )
by Gus
8s
created

ProcessorsAPI._download_jar()   B

Complexity

Conditions 5

Size

Total Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 4
Bugs 0 Features 0
Metric Value
cc 5
dl 0
loc 23
rs 8.2508
c 4
b 0
f 0
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
from __future__ import unicode_literals
4
#from pkg_resources import resource_filename
5
from .utils import *
6
from .annotators import *
7
from .sentiment import SentimentAnalysisAPI
8
from .serialization import JSONSerializer
9
import os
10
import shlex
11
import subprocess as sp
12
import requests
13
import re
14
import time
15
import sys
16
import logging
17
import warnings
18
19
20
class ProcessorsBaseAPI(object):
    """
    Manages a connection with processors-server and provides an interface to the API.

    Parameters
    ----------
    port : int
        The port the server is running on or should be started on.  Default is `ProcessorsBaseAPI.PORT` (8888).
    hostname : str
        The host name to use for the server.  Default is "localhost".
    log_file: str
        The path for the log file.  Default is py-processors.log in the user's home directory.

    Methods
    -------
    annotate(text)
        Produces a Document from the provided `text` using the default processor.
    clu.annotate(text)
        Produces a Document from the provided `text` using CluProcessor.
    fastnlp.annotate(text)
        Produces a Document from the provided `text` using FastNLPProcessor.
    bionlp.annotate(text)
        Produces a Document from the provided `text` using BioNLPProcessor.
    annotate_from_sentences(sentences)
        Produces a Document from `sentences` (a list of text split into sentences). Uses the default processor.
    fastnlp.annotate_from_sentences(sentences)
        Produces a Document from `sentences` (a list of text split into sentences). Uses FastNLPProcessor.
    bionlp.annotate_from_sentences(sentences)
        Produces a Document from `sentences` (a list of text split into sentences). Uses BioNLPProcessor.
    corenlp.sentiment.score_sentence(sentence)
        Produces a sentiment score for the provided `sentence` (an instance of Sentence).
    corenlp.sentiment.score_document(doc)
        Produces sentiment scores for the provided `doc` (an instance of Document).  One score is produced for each sentence.
    corenlp.sentiment.score_segmented_text(sentences)
        Produces sentiment scores for the provided `sentences` (a list of text segmented into sentences).  One score is produced for each item in `sentences`.
    odin.extract_from_text(text, rules)
        Produces a list of Mentions for matches of the provided `rules` on the `text`.  `rules` can be a string of Odin rules, or a url ending in `.yml` or `.yaml`.
    odin.extract_from_document(doc, rules)
        Produces a list of Mentions for matches of the provided `rules` on the `doc` (an instance of Document).  `rules` can be a string of Odin rules, or a url ending in `.yml` or `.yaml`.
    """
    PORT = 8888
    HOST = "localhost"
    LOG = full_path(os.path.join(os.path.expanduser("~"), "py-processors.log"))

    def __init__(self, **kwargs):
        self.hostname = kwargs.get("hostname", ProcessorsBaseAPI.HOST)
        self.port = kwargs.get("port", ProcessorsBaseAPI.PORT)
        self.make_address(self.hostname, self.port)
        # processors
        self.default = Processor(self.address)
        self.clu = CluProcessor(self.address)
        self.fastnlp = FastNLPProcessor(self.address)
        self.bionlp = BioNLPProcessor(self.address)
        # sentiment
        self.sentiment = SentimentAnalysisAPI(self.address)
        # odin
        self.odin = OdinAPI(self.address)
        self.logger = logging.getLogger(__name__)
        # resolve LOG through the instance so the base class does not have to
        # name its own subclass
        self.log_file = self._prepare_log_file(kwargs.get("log_file", self.LOG))

    def make_address(self, hostname, port):
        """
        Store `hostname` and `port` and rebuild `self.address` from them.
        """
        self.hostname = hostname
        self.port = port
        self.address = "http://{}:{}".format(self.hostname, self.port)

    def _prepare_log_file(self, lf):
        """
        Configure logger and return file path for logging

        Parameters
        ----------
        lf : str or None
            Requested log file path (`~` is expanded).  A falsy value selects the class default (`LOG`).

        Returns
        -------
        str
            The path of the log file in use.
        """
        log_file = self.LOG if not lf else os.path.expanduser(lf)
        self.logger.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(levelname)s - %(message)s")

        # logging.getLogger(__name__) is shared by every instance, so only attach
        # handlers that are not already present; otherwise each new instance
        # would duplicate every log message.
        has_console = any(
            isinstance(h, logging.StreamHandler) and not isinstance(h, logging.FileHandler)
            for h in self.logger.handlers
        )
        if not has_console:
            # console handler reports INFO and above
            console = logging.StreamHandler()
            console.setLevel(logging.INFO)
            console.setFormatter(formatter)
            self.logger.addHandler(console)

        target = os.path.abspath(log_file)
        has_file = any(
            isinstance(h, logging.FileHandler) and h.baseFilename == target
            for h in self.logger.handlers
        )
        if not has_file:
            # file handler records everything (DEBUG and above)
            file_handler = logging.FileHandler(log_file, "w")
            file_handler.setLevel(logging.DEBUG)
            file_handler.setFormatter(formatter)
            self.logger.addHandler(file_handler)
        return log_file

    def annotate(self, text):
        """
        Uses default processor (CoreNLP) to annotate text.  Included for backwards compatibility.
        """
        return self.default.annotate(text)

    def annotate_from_sentences(self, sentences):
        """
        Uses default processor (CoreNLP) to annotate a list of segmented sentences.
        """
        return self.default.annotate_from_sentences(sentences)

    def is_running(self):
        """
        Report whether the server answers an annotation request.

        Returns
        -------
        bool
            True if annotating a probe string produced a truthy result, False otherwise
            (including when the request itself fails at the HTTP layer).
        """
        try:
            return bool(self.annotate("Blah"))
        except requests.exceptions.RequestException as e:
            # an unreachable server simply means "not running"
            self.logger.debug(e)
            return False

    def _check_server_version(self):
        """
        Checks server version to see if it meets the recommendations

        Emits a warning when the version differs from the recommended one or
        cannot be determined; never raises for a failed version lookup.
        """
        # avoid circular imports by delaying this import
        from .__init__ import __ps_rec__
        try:
            service_address = "{}/version".format(self.address)
            server_version = post_json(service_address, None)["version"]
            if str(__ps_rec__) != str(server_version):
                warnings.warn("Recommended server version is {}, but server version is {}".format(__ps_rec__, server_version))
            else:
                self.logger.info("Server version meets recommendations (v{})".format(__ps_rec__))
        except Exception as e:
            # best-effort check: keep the cause in the debug log instead of dropping it
            self.logger.debug(e)
            warnings.warn("Unable to determine server version.  Recommended version is {}".format(__ps_rec__))
143
144
145
class ProcessorsAPI(ProcessorsBaseAPI):
    """
    Manages a connection with the processors-server jar and provides an interface to the API.

    Parameters
    ----------
    timeout : int
        The number of seconds to wait for the server to initialize.  Default is 120.
    jvm_mem : str
        The maximum amount of memory to allocate to the JVM for the server.  Default is "-Xmx3G".
    jar_path : str
        The path to the processors-server jar.  Default is the jar installed with the package.
    keep_alive : bool
        Whether or not to keep the server running when ProcessorsAPI instance goes out of scope.  Default is false (server is shut down).
    log_file: str
        The path for the log file.  Default is py-processors.log in the user's home directory.

    Methods
    -------
    start_server(jar_path, **kwargs)
        Starts the server using the provided `jar_path`.  Optionally takes hostname, port, jvm_mem, and timeout.
    stop_server()
        Attempts to stop the server running at self.address.
    """

    PROC_VAR = 'PROCESSORS_SERVER'
    TIMEOUT = 120
    # save to lib loc
    DEFAULT_JAR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "processors-server.jar")
    JVM_MEM = "-Xmx3G"

    def __init__(self, **kwargs):
        super(ProcessorsAPI, self).__init__(**kwargs)
        self.timeout = kwargs.get("timeout", ProcessorsAPI.TIMEOUT)
        self.jvm_mem = kwargs.get("jvm_mem", ProcessorsAPI.JVM_MEM)
        self._start_command = "java {mem} -cp {jp} NLPServer --port {port} --host {host}" # mem, jar path, port, host
        # whether or not to stop the server when the object is destroyed
        self.keep_alive = kwargs.get("keep_alive", False)
        # how long to wait between requests
        self.wait_time = 2
        # set self.jar_path
        self.jar_path = ProcessorsAPI.DEFAULT_JAR
        self._resolve_jar_path(kwargs.get("jar_path", self.jar_path))
        # attempt to establish connection with server
        self.establish_connection()

    def establish_connection(self):
        """
        Connect to an already-running server, or try to start one.

        Failures while starting the server are logged as warnings rather than raised.
        """
        if self.is_running():
            self.logger.info("Connection with server established!")
            self._check_server_version()
        else:
            try:
                # resolve jar path if server is not already running
                self._resolve_jar_path(self.jar_path)
                # Attempt to start the server
                self._start_server()
            except Exception as e:
                self.logger.warning("Unable to start server. Please start the server manually with .start_server(jar_path=\"path/to/processors-server.jar\")")
                self.logger.warning("\n{}".format(e))

    def _resolve_jar_path(self, jar_path=None):
        """
        Attempts to preferentially set value of self.jar_path

        Order of preference: the given `jar_path` (or the packaged default),
        then the path in the PROCESSORS_SERVER environment variable, then a
        fresh download to `ProcessorsAPI.DEFAULT_JAR`.
        """
        jar_path = jar_path or ProcessorsAPI.DEFAULT_JAR

        # Preference 1: if a .jar is given, check to see if the path is valid
        jp = full_path(jar_path)
        if os.path.exists(jp):
            self.jar_path = jp

        # Preference 2: if a PROCESSORS_SERVER environment variable is defined, check its validity
        have_jar = bool(self.jar_path) and os.path.exists(self.jar_path)
        if not have_jar and ProcessorsAPI.PROC_VAR in os.environ:
            self.logger.info("Using path given via ${}".format(ProcessorsAPI.PROC_VAR))
            jp = full_path(os.environ[ProcessorsAPI.PROC_VAR])
            if os.path.exists(jp):
                self.jar_path = jp
            else:
                self.jar_path = None
                self.logger.warning("WARNING: {0} path is invalid.  \nPlease verify this entry in your environment:\n\texport {0}=/path/to/processors-server.jar".format(ProcessorsAPI.PROC_VAR))

        # Preference 3: attempt to use the processors-sever.jar (download if not found)
        if not self.jar_path or not os.path.exists(self.jar_path):
            self.logger.info("No jar found.  Downloading to {} ...".format(ProcessorsAPI.DEFAULT_JAR))
            ProcessorsAPI._download_jar()
            self.jar_path = ProcessorsAPI.DEFAULT_JAR

    def start_server(self, jar_path, **kwargs):
        """
        Starts processors-sever.jar

        Parameters
        ----------
        jar_path : str
            Path to the processors-server jar.
        port, hostname, jvm_mem, timeout : optional keyword arguments
            Override the corresponding instance settings before starting.

        Raises
        ------
        Exception
            If no usable `jar_path` is given, the jar is missing, or the server does not come up.
        """
        self.port = kwargs.get("port", self.port)
        self.hostname = kwargs.get("hostname", self.hostname)
        self.jvm_mem = kwargs.get("jvm_mem", self.jvm_mem)
        # Only touch the timeout when the caller supplied one.  The old fallback
        # was self.jvm_mem (e.g. "-Xmx3G"), which made float() raise ValueError.
        # An explicitly supplied timeout is still halved, as before.
        if "timeout" in kwargs:
            self.timeout = int(float(kwargs["timeout"]) / 2)
        # keep self.address in step with a changed port/hostname
        self.make_address(self.hostname, self.port)
        jp = full_path(jar_path) if jar_path else None
        if jp:
            self.jar_path = jp
            self._start_server()
        else:
            raise Exception("Please provide jar_path=\"path/to/processors-server.jar\"")

    def stop_server(self, port=None):
        """
        Sends a poison pill to the server and waits for shutdown response

        Parameters
        ----------
        port : int, optional
            Port of the server to stop.  Defaults to `self.port`.

        Returns
        -------
        bool
            True if the shutdown request completed, False if it failed
            (e.g. because the server is already down).
        """
        port = port or self.port
        address = "http://{}:{}".format(self.hostname, port)
        shutdown_address = "{}/shutdown".format(address)
        try:
            response = requests.post(shutdown_address)
            if response:
                print(response.content.decode("utf-8"))
            return True
        # will fail if the server is already down
        except Exception as e:
            self.logger.debug(e)
        return False

    def _ensure_jar_path_exists(self):
        """
        Raise an Exception unless `self.jar_path` points at an existing file.
        """
        if not self.jar_path or not os.path.exists(self.jar_path):
            raise Exception("jar not found at {}".format(self.jar_path))

    def _start_server(self, port=None):
        """
        "Private" method called by start_server()

        Launches the jar in a subprocess and polls the server until it answers
        or `self.timeout` is exhausted.

        Returns
        -------
        bool
            True once the server answers an annotation request.

        Raises
        ------
        Exception
            If the jar is missing, the server process exits early, or no
            connection could be made within the timeout.
        """
        # does the jar exist?
        self._ensure_jar_path_exists()

        if port:
            self.port = port
            # NOTE: the annotators built in the base __init__ were given the
            # address that was current at construction time; only self.address
            # is refreshed here.
            self.make_address(self.hostname, self.port)
        # build the command
        cmd = self._start_command.format(mem=self.jvm_mem, jp=self.jar_path, port=self.port, host=self.hostname)
        self.logger.info("Starting processors-server ({}) ...".format(cmd))
        # One handle for both streams (stderr merged into stdout), opened in
        # append mode so the server's output does not truncate what has already
        # been written to the log.  The child keeps its own copy of the
        # descriptor, so the parent can close its handle right after spawning.
        with open(self.log_file, 'ab') as server_log:
            self._process = sp.Popen(shlex.split(cmd),
                                     shell=False,
                                     stderr=sp.STDOUT,
                                     stdout=server_log,
                                     universal_newlines=True)

        print("\nWaiting for server...")

        progressbar_length = int(self.timeout/self.wait_time)
        for i in range(progressbar_length):
            if self.annotate("blah"):
                print("\n\nConnection with processors-server established ({})".format(self.address))
                return True
            # stop waiting as soon as the server process is known to have died
            exit_code = self._process.poll()
            if exit_code is not None:
                raise Exception("processors-server exited unexpectedly (exit code {}).  See {} for details.".format(exit_code, self.log_file))
            sys.stdout.write("\r[{:{}}]".format('='*i, progressbar_length))
            sys.stdout.flush()
            time.sleep(self.wait_time)

        # if the server still hasn't started, raise an Exception
        raise Exception("Couldn't connect to processors-server. Is the port in use?")

    @staticmethod
    def _download_jar(jar_url=None):
        """
        Download processors-server.jar to `ProcessorsAPI.DEFAULT_JAR`.

        Parameters
        ----------
        jar_url : str, optional
            Location of the jar.  Defaults to the package's `SERVER_JAR_URL`.

        Raises
        ------
        requests.exceptions.HTTPError
            If the server answers with an error status.
        """
        from .__init__ import SERVER_JAR_URL
        jar_url = jar_url or SERVER_JAR_URL
        ppjar = ProcessorsAPI.DEFAULT_JAR
        print("Downloading {} from {} ...".format(ppjar, jar_url))
        response = requests.get(jar_url, stream=True, headers={'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'})
        # never write an HTML error page to disk as if it were the jar
        response.raise_for_status()
        # the header may be absent; 0 means "size unknown" and disables progress output
        total_length = int(response.headers.get('content-length') or 0)
        dl = 0
        last_reported = None
        try:
            with open(ppjar, "wb") as handle:
                for data in response.iter_content(chunk_size=2048):
                    # write data to disk
                    handle.write(data)
                    dl += len(data)
                    # do we know the total file size?
                    if total_length:
                        percent_complete = int(100 * float(dl) / float(total_length))
                        # report each 5% step once instead of once per chunk
                        if percent_complete % 5 == 0 and percent_complete != last_reported:
                            last_reported = percent_complete
                            sys.stdout.write("\r{}% complete".format(percent_complete))
                            sys.stdout.flush()
        except Exception:
            # a truncated jar would later be mistaken for a valid one
            if os.path.exists(ppjar):
                os.remove(ppjar)
            raise

        print("\nDownload Complete! {}".format(ppjar))

    def __del__(self):
        """
        Stop server unless otherwise specified
        """
        # keep_alive is missing when __init__ failed part-way; treat that as
        # "leave everything alone" rather than raising inside the finalizer
        if getattr(self, "keep_alive", True):
            return
        try:
            if self.stop_server():
                print("Successfully shut down processors-server!")
            else:
                print("Couldn't kill processors-server.  Was server started externally?")
        except Exception as e:
            self.logger.debug(e)
            print("Couldn't kill processors-server.  Was server started externally?")
352
353
354
class OdinAPI(object):
    """
    API for performing rule-based information extraction with Odin.

    Parameters
    ----------
    address : str
        The base address for the API (i.e., everything preceding `/api/..`)

    """

    # raw string: "\." in a plain literal is an invalid escape sequence
    validator = re.compile(r"^(https?|ftp):.+?\.?ya?ml$")

    def __init__(self, address):
        self._service = "{}/api/odin/extract".format(address)

    def _extract(self, json_data):
        """
        Post `json_data` to the extraction service and convert the response.

        Returns
        -------
        [processors.odin.Mention] or None
            None when the response contains an "error" entry (the error is printed).
        """
        mns_json = post_json(self._service, json_data)
        if "error" in mns_json:
            error_msg = mns_json["error"]
            original_msg = json.loads(json_data)
            # report whichever the request carried: inline rules or a rules url
            rules = original_msg.get("rules", original_msg.get("url", None))
            oe = OdinError(rules=rules, message=error_msg)
            print(oe)
            return None
        else:
            return JSONSerializer.mentions_from_JSON(mns_json)

    @staticmethod
    def valid_rule_url(url):
        """
        Return True if `url` matches the http(s)/ftp yaml-url pattern in `OdinAPI.validator`.
        """
        return bool(OdinAPI.validator.match(url))

    def extract_from_text(self, text, rules):
        """
        Sends text to the server with rules for information extraction (IE).

        Parameters
        ----------
        text : str
            `rules` will be applied to this `text`.
        rules : str
            Either Odin rules provided as a `yaml` string, or a url pointing to a `yaml` file of rules.

        Returns
        -------
        [processors.odin.Mention] or None
            Rule matches produce a list of `processors.odin.Mention`.
        """
        if OdinAPI.valid_rule_url(rules):
            # this is actually a URL to a yaml file
            container = TextWithURL(text, rules)
        else:
            container = TextWithRules(text, rules)
        return self._extract(container.to_JSON())

    def extract_from_document(self, doc, rules):
        """
        Sends a `processors.ds.Document` (`doc`) to the server with rules for information extraction (IE).

        Parameters
        ----------
        doc : processors.ds.Document
            `rules` will be applied to this `processors.ds.Document`.
        rules : str
            Either Odin rules provided as a `yaml` string, or a url pointing to a `yaml` file of rules.

        Returns
        -------
        [processors.odin.Mention] or None
            Rule matches produce a list of `processors.odin.Mention`.

        """
        if OdinAPI.valid_rule_url(rules):
            # this is actually a URL to a yaml file
            container = DocumentWithURL(doc, rules)
        else:
            container = DocumentWithRules(doc, rules)
        return self._extract(container.to_JSON())
434
435
#############################################
436
# Containers for Odin data
437
# transmitted to the server for processing
438
#############################################
439
440 View Code Duplication
class TextWithRules(object):
    """
    Container pairing a piece of text with a string of Odin rules.
    """

    def __init__(self, text, rules):
        self.text, self.rules = text, rules

    def to_JSON_dict(self):
        """
        Return the container's contents as a dict with "text" and "rules" keys.
        """
        return {"text": self.text, "rules": self.rules}

    def to_JSON(self):
        """
        Serialize the container to a JSON string (keys sorted, 4-space indent).
        """
        payload = self.to_JSON_dict()
        return json.dumps(payload, sort_keys=True, indent=4)
454
455
class TextWithURL(object):
    """
    Container pairing a piece of text with the url of a rule file.
    """

    def __init__(self, text, url):
        # TODO: throw exception if url is invalid
        self.text = text
        self.url = url

    def to_JSON_dict(self):
        """
        Return the container's contents as a dict with "text" and "url" keys.
        """
        return dict(text=self.text, url=self.url)

    def to_JSON(self):
        """
        Serialize the container to a JSON string (keys sorted, 4-space indent).
        """
        as_dict = self.to_JSON_dict()
        serialized = json.dumps(as_dict, sort_keys=True, indent=4)
        return serialized
470
471
class DocumentWithRules(object):
    """
    Container pairing a document with a string of Odin rules.

    `document` must provide a `to_JSON_dict()` method.
    """

    def __init__(self, document, rules):
        # TODO: throw exception if isinstance(document, Document) is False
        self.document, self.rules = document, rules

    def to_JSON_dict(self):
        """
        Return a dict holding the document's own dict form under "document" and the rules under "rules".
        """
        return {
            "document": self.document.to_JSON_dict(),
            "rules": self.rules,
        }

    def to_JSON(self):
        """
        Serialize the container to a JSON string (keys sorted, 4-space indent).
        """
        return json.dumps(self.to_JSON_dict(), indent=4, sort_keys=True)
486
487
class DocumentWithURL(object):
    """
    Container pairing a document with the url of a rule file.

    `document` must provide a `to_JSON_dict()` method.
    """

    def __init__(self, document, url):
        # TODO: throw exception if isinstance(document, Document) is False
        # TODO: throw exception if url is invalid
        self.document = document
        self.url = url

    def to_JSON_dict(self):
        """
        Return a dict holding the document's own dict form under "document" and the url under "url".
        """
        document_dict = self.document.to_JSON_dict()
        return {"document": document_dict, "url": self.url}

    def to_JSON(self):
        """
        Serialize the container to a JSON string (keys sorted, 4-space indent).
        """
        contents = self.to_JSON_dict()
        return json.dumps(contents, sort_keys=True, indent=4)
503