Completed
Push — master ( 41d21e...c78683 )
by Gus
54s queued 18s
created

ProcessorsBaseAPI   A

Complexity

Total Complexity 11

Size/Duplication

Total Lines 123
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
dl 0
loc 123
rs 10
c 1
b 0
f 0
wmc 11

7 Methods

Rating   Name   Duplication   Size   Complexity  
A make_address() 0 7 1
A _prepare_log_file() 0 21 2
A annotate_from_sentences() 0 5 1
A __init__() 0 18 1
A is_running() 0 2 2
A _check_server_version() 0 15 3
A annotate() 0 5 1
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
from __future__ import unicode_literals

import json
import logging
import os
import re
import shlex
import subprocess as sp
import sys
import time
import warnings

import requests
from six.moves.urllib.request import urlretrieve

#from pkg_resources import resource_filename
from .utils import *
from .annotators import *
from .sentiment import SentimentAnalysisAPI
from .serialization import JSONSerializer
19
20
21
class ProcessorsBaseAPI(object):
    """
    Manages a connection with processors-server and provides an interface to the API.

    Parameters
    ----------
    port : int
        The port the server is running on or should be started on.  Default is 8888.
    hostname : str
        The host name to use for the server.  Default is "localhost".
    log_file: str
        The path for the log file.  Default is py-processors.log in the user's home directory.

    Methods
    -------
    annotate(text)
        Produces a Document from the provided `text` using the default processor.
    clu.annotate(text)
        Produces a Document from the provided `text` using CluProcessor.
    fastnlp.annotate(text)
        Produces a Document from the provided `text` using FastNLPProcessor.
    bionlp.annotate(text)
        Produces a Document from the provided `text` using BioNLPProcessor.
    annotate_from_sentences(sentences)
        Produces a Document from `sentences` (a list of text split into sentences). Uses the default processor.
    fastnlp.annotate_from_sentences(sentences)
        Produces a Document from `sentences` (a list of text split into sentences). Uses FastNLPProcessor.
    bionlp.annotate_from_sentences(sentences)
        Produces a Document from `sentences` (a list of text split into sentences). Uses BioNLPProcessor.
    corenlp.sentiment.score_sentence(sentence)
        Produces a sentiment score for the provided `sentence` (an instance of Sentence).
    corenlp.sentiment.score_document(doc)
        Produces sentiment scores for the provided `doc` (an instance of Document).  One score is produced for each sentence.
    corenlp.sentiment.score_segmented_text(sentences)
        Produces sentiment scores for the provided `sentences` (a list of text segmented into sentences).  One score is produced for each item in `sentences`.
    odin.extract_from_text(text, rules)
        Produces a list of Mentions for matches of the provided `rules` on the `text`.  `rules` can be a string of Odin rules, or a url ending in `.yml` or `.yaml`.
    odin.extract_from_document(doc, rules)
        Produces a list of Mentions for matches of the provided `rules` on the `doc` (an instance of Document).  `rules` can be a string of Odin rules, or a url ending in `.yml` or `.yaml`.
    """
    PORT = 8888
    HOST = "localhost"
    LOG = full_path(os.path.join(os.path.expanduser("~"), "py-processors.log"))

    def __init__(self, **kwargs):
        # make_address() records hostname, port and the derived base address
        self.make_address(
            kwargs.get("hostname", ProcessorsBaseAPI.HOST),
            kwargs.get("port", ProcessorsBaseAPI.PORT)
        )
        # processors
        self.default = Processor(self.address)
        self.clu = CluProcessor(self.address)
        self.fastnlp = FastNLPProcessor(self.address)
        self.bionlp = BioNLPProcessor(self.address)
        # sentiment
        self.sentiment = SentimentAnalysisAPI(self.address)
        # odin
        self.odin = OdinAPI(self.address)
        self.logger = logging.getLogger(__name__)
        # use this class's own default rather than reaching for a subclass
        self.log_file = self._prepare_log_file(kwargs.get("log_file", ProcessorsBaseAPI.LOG))

    def make_address(self, hostname, port):
        """
        Store `hostname` and `port` and rebuild `self.address` from them.
        """
        self.hostname = hostname
        self.port = port
        self.address = "http://{}:{}".format(self.hostname, self.port)

    def _prepare_log_file(self, lf):
        """
        Configure logger and return file path for logging

        Parameters
        ----------
        lf : str or None
            Requested log file path (`~` is expanded).  A falsy value selects the class default.

        Returns
        -------
        str
            The path the debug file handler writes to.
        """
        log_file = os.path.expanduser(lf) if lf else ProcessorsBaseAPI.LOG
        self.logger.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(levelname)s - %(message)s")

        # logging.getLogger(__name__) hands every instance the same logger
        # object, so unconditionally adding handlers would duplicate each
        # message once per instance created.  Only attach what is missing.
        log_target = os.path.abspath(log_file)
        has_console = False
        has_file = False
        for existing in self.logger.handlers:
            # FileHandler subclasses StreamHandler, so test for it first
            if isinstance(existing, logging.FileHandler):
                has_file = has_file or existing.baseFilename == log_target
            elif isinstance(existing, logging.StreamHandler):
                has_console = True

        if not has_console:
            # console handler reports INFO and above
            console = logging.StreamHandler()
            console.setLevel(logging.INFO)
            console.setFormatter(formatter)
            self.logger.addHandler(console)

        if not has_file:
            # file handler records everything from DEBUG up
            debug_file = logging.FileHandler(log_file, "w")
            debug_file.setLevel(logging.DEBUG)
            debug_file.setFormatter(formatter)
            self.logger.addHandler(debug_file)

        return log_file

    def annotate(self, text):
        """
        Uses default processor (CoreNLP) to annotate text.  Included for backwards compatibility.
        """
        return self.default.annotate(text)

    def annotate_from_sentences(self, sentences):
        """
        Uses default processor (CoreNLP) to annotate a list of segmented sentences.
        """
        return self.default.annotate_from_sentences(sentences)

    def is_running(self):
        """
        Report whether a test annotation request produced a truthy result.
        """
        return bool(self.annotate("Blah"))

    def _check_server_version(self):
        """
        Checks server version to see if it meets the recommendations
        """
        # avoid circular imports by delaying this import
        from .__init__ import __ps_rec__
        try:
            service_address = "{}/version".format(self.address)
            server_version = post_json(service_address, None)["version"]
            if str(__ps_rec__) != str(server_version):
                warnings.warn("Recommended server version is {}, but server version is {}".format(__ps_rec__, server_version))
            else:
                self.logger.info("Server version meets recommendations (v{})".format(__ps_rec__))
        except Exception:
            # best effort: any failure to query the version is only a warning
            warnings.warn("Unable to determine server version.  Recommended version is {}".format(__ps_rec__))
144
145
146
class ProcessorsAPI(ProcessorsBaseAPI):
    """
    Manages a connection with the processors-server jar and provides an interface to the API.

    Parameters
    ----------
    timeout : int
        The number of seconds to wait for the server to initialize.  Default is 120.
    jvm_mem : str
        The maximum amount of memory to allocate to the JVM for the server.  Default is "-Xmx3G".
    jar_path : str
        The path to the processors-server jar.  Default is the jar installed with the package.
    keep_alive : bool
        Whether or not to keep the server running when ProcessorsAPI instance goes out of scope.  Default is false (server is shut down).
    log_file: str
        The path for the log file.  Default is py-processors.log in the user's home directory.

    Methods
    -------
    start_server(jar_path, **kwargs)
        Starts the server using the provided `jar_path`.  Optionally takes hostname, port, jvm_mem, and timeout.
    stop_server()
        Attempts to stop the server running at self.address.
    """

    PROC_VAR = 'PROCESSORS_SERVER'
    TIMEOUT = 120
    # save to lib loc
    DEFAULT_JAR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "processors-server.jar")
    JVM_MEM = "-Xmx3G"

    def __init__(self, **kwargs):
        super(ProcessorsAPI, self).__init__(**kwargs)
        self.timeout = kwargs.get("timeout", ProcessorsAPI.TIMEOUT)
        self.jvm_mem = kwargs.get("jvm_mem", ProcessorsAPI.JVM_MEM)
        self._start_command = "java {mem} -cp {jp} NLPServer --port {port} --host {host}" # mem, jar path, port, host
        # whether or not to stop the server when the object is destroyed
        self.keep_alive = kwargs.get("keep_alive", False)
        # how long to wait between requests
        self.wait_time = 2
        # set self.jar_path
        self.jar_path = ProcessorsAPI.DEFAULT_JAR
        self._resolve_jar_path(kwargs.get("jar_path", self.jar_path))
        # attempt to establish connection with server
        self.establish_connection()

    def establish_connection(self):
        """
        Connect to a running server, or try to start one if none responds.

        Failures to start the server are logged as warnings rather than raised.
        """
        if self.is_running():
            self.logger.info("Connection with server established!")
            self._check_server_version()
        else:
            try:
                # resolve jar path if server is not already running
                self._resolve_jar_path(self.jar_path)
                # Attempt to start the server
                self._start_server()
            except Exception as e:
                # Logger.warn is a deprecated alias; Logger.warning is the supported name
                self.logger.warning("Unable to start server. Please start the server manually with .start_server(jar_path=\"path/to/processors-server.jar\")")
                self.logger.warning("\n{}".format(e))

    def _resolve_jar_path(self, jar_path=None):
        """
        Attempts to preferentially set value of self.jar_path

        Order of preference: the given `jar_path` (if it exists), the path in
        the PROCESSORS_SERVER environment variable, then the default jar
        location (downloading the jar when nothing usable was found).
        """
        jar_path = jar_path or ProcessorsAPI.DEFAULT_JAR

        # Preference 1: if a .jar is given, check to see if the path is valid
        jp = full_path(jar_path)
        if os.path.exists(jp):
            self.jar_path = jp

        # Preference 2: if a PROCESSORS_SERVER environment variable is defined, check its validity
        # (self.jar_path may be None; os.path.exists(None) raises on Python 3)
        have_jar = bool(self.jar_path) and os.path.exists(self.jar_path)
        if not have_jar and ProcessorsAPI.PROC_VAR in os.environ:
            self.logger.info("Using path given via ${}".format(ProcessorsAPI.PROC_VAR))
            jp = full_path(os.environ[ProcessorsAPI.PROC_VAR])
            # check if path is valid
            if os.path.exists(jp):
                self.jar_path = jp
            else:
                self.jar_path = None
                self.logger.warning("WARNING: {0} path is invalid.  \nPlease verify this entry in your environment:\n\texport {0}=/path/to/processors-server.jar".format(ProcessorsAPI.PROC_VAR))

        # Preference 3: attempt to use the processors-server.jar (download if not found)
        if not self.jar_path or not os.path.exists(self.jar_path):
            self.logger.info("No jar found.  Downloading to {} ...".format(ProcessorsAPI.DEFAULT_JAR))
            ProcessorsAPI._download_jar()
            self.jar_path = ProcessorsAPI.DEFAULT_JAR

    def start_server(self, jar_path, **kwargs):
        """
        Starts processors-server.jar

        Parameters
        ----------
        jar_path : str
            Path to the processors-server jar.
        **kwargs
            Optional `port`, `hostname`, `jvm_mem` and `timeout` (seconds);
            each defaults to the value currently stored on the instance.

        Raises
        ------
        Exception
            If no usable `jar_path` is given, or the server cannot be started.
        """
        # keep hostname, port and self.address in step with each other
        # NOTE(review): the annotator objects built in __init__ were handed the
        # previous address; whether they follow this change is not visible here.
        self.make_address(
            kwargs.get("hostname", self.hostname),
            kwargs.get("port", self.port)
        )
        self.jvm_mem = kwargs.get("jvm_mem", self.jvm_mem)
        # fall back to the current timeout (falling back to jvm_mem made
        # float("-Xmx3G") raise ValueError whenever timeout was omitted)
        self.timeout = int(float(kwargs.get("timeout", self.timeout)))
        jp = full_path(jar_path)
        if jp:
            self.jar_path = jp
            self._start_server()
        else:
            raise Exception("Please provide jar_path=\"path/to/processors-server.jar\"")

    def stop_server(self, port=None):
        """
        Sends a poison pill to the server and waits for shutdown response

        Parameters
        ----------
        port : int, optional
            Port of the server to shut down.  Defaults to `self.port`.

        Returns
        -------
        bool
            True if the shutdown request went through, False if it failed.
        """
        port = port or self.port
        address = "http://{}:{}".format(self.hostname, port)
        shutdown_address = "{}/shutdown".format(address)
        # attempt shutdown
        try:
            response = requests.post(shutdown_address)
            if response:
                print(response.content.decode("utf-8"))
            return True
        # will fail if the server is already down
        except Exception as e:
            # still best effort, but leave a trace in the debug log
            self.logger.debug("Shutdown request to {} failed: {}".format(shutdown_address, e))
        return False

    def _ensure_jar_path_exists(self):
        """
        Raise an Exception if nothing exists at `self.jar_path`.
        """
        if not os.path.exists(self.jar_path):
            raise Exception("jar not found at {}".format(self.jar_path))

    def _start_server(self, port=None):
        """
        "Private" method called by start_server()

        Launches the server process and polls it until it answers or roughly
        `self.timeout` seconds have elapsed.

        Returns
        -------
        bool
            True once the server answers a test annotation request.

        Raises
        ------
        Exception
            If the jar is missing or the server never responds.
        """
        # does the jar exist?
        self._ensure_jar_path_exists()

        if port:
            # keep self.address consistent with the new port
            self.make_address(self.hostname, port)
        # build the command
        cmd = self._start_command.format(mem=self.jvm_mem, jp=self.jar_path, port=self.port, host=self.hostname)
        # Send both streams through ONE handle: two independent 'wb' handles on
        # the same file overwrite each other's output.  The child process gets
        # its own copy of the descriptor, so ours can be closed immediately.
        with open(self.log_file, 'wb') as server_log:
            self._process = sp.Popen(shlex.split(cmd),
                                     shell=False,
                                     stderr=sp.STDOUT,
                                     stdout=server_log,
                                     universal_newlines=True)

        self.logger.info("Starting processors-server ({}) ...".format(cmd))
        print("\nWaiting for server...")

        progressbar_length = int(self.timeout/self.wait_time)
        for i in range(progressbar_length):
            success = self.annotate("blah")
            if success:
                print("\n\nConnection with processors-server established ({})".format(self.address))
                return True
            sys.stdout.write("\r[{:{}}]".format('='*i, progressbar_length))
            # without a flush the bar may not appear until the loop ends
            sys.stdout.flush()
            time.sleep(self.wait_time)

        # if the server still hasn't started, raise an Exception
        raise Exception("Couldn't connect to processors-server. Is the port in use?")

    @staticmethod
    def _download_jar(jar_url=None):
        """
        Download processors-server.jar to `ProcessorsAPI.DEFAULT_JAR`.

        Parameters
        ----------
        jar_url : str, optional
            Where to fetch the jar from.  Defaults to the package's SERVER_JAR_URL.
        """
        # avoid circular imports by delaying this import
        from .__init__ import SERVER_JAR_URL
        jar_url = jar_url or SERVER_JAR_URL
        ppjar = ProcessorsAPI.DEFAULT_JAR

        def dlProgress(count, blockSize, totalSize):
            if totalSize > 0:
                # the final block usually overshoots the total; cap at 100
                percent = min(100, int(count*blockSize*100/totalSize))
                sys.stdout.write("\r{}% complete".format(percent))
            else:
                # size unknown: a percentage would divide by zero or go negative
                sys.stdout.write("\r{} bytes downloaded".format(count*blockSize))
            sys.stdout.flush()

        print("Downloading {} from {} ...".format(ppjar, jar_url))
        urlretrieve(jar_url, ppjar, reporthook=dlProgress)
        print("\nDownload Complete! {}".format(ppjar))

    def __del__(self):
        """
        Stop server unless otherwise specified
        """
        # keep_alive is missing when __init__ failed early; there is nothing
        # to shut down in that case
        if getattr(self, "keep_alive", True):
            return
        try:
            stopped = self.stop_server()
        except Exception as e:
            self.logger.debug(e)
            stopped = False
        # report what actually happened instead of always claiming success
        if stopped:
            print("Successfully shut down processors-server!")
        else:
            print("Couldn't kill processors-server.  Was server started externally?")
345
346
347
class OdinAPI(object):
    """
    API for performing rule-based information extraction with Odin.

    Parameters
    ----------
    address : str
        The base address for the API (i.e., everything preceding `/api/..`)

    """

    # raw string so that `\.` reaches the regex engine as written instead of
    # being treated as an (invalid) string escape
    validator = re.compile(r"^(https?|ftp):.+?\.?ya?ml$")

    def __init__(self, address):
        self._service = "{}/api/odin/extract".format(address)

    def _extract(self, json_data):
        """
        Post `json_data` to the extraction service and deserialize the reply.

        Returns the deserialized mentions, or None (after printing an
        OdinError) when the response carries an "error" entry.
        """
        mns_json = post_json(self._service, json_data)
        if "error" in mns_json:
            error_msg = mns_json["error"]
            original_msg = json.loads(json_data)
            # the request held either inline rules or a url pointing at them
            rules = original_msg.get("rules", original_msg.get("url", None))
            oe = OdinError(rules=rules, message=error_msg)
            print(oe)
            return None
        return JSONSerializer.mentions_from_JSON(mns_json)

    @staticmethod
    def valid_rule_url(url):
        """
        Return True if `url` matches `OdinAPI.validator`, False otherwise.
        """
        return OdinAPI.validator.match(url) is not None

    def extract_from_text(self, text, rules):
        """
        Sends text to the server with rules for information extraction (IE).

        Parameters
        ----------
        text : str
            `rules` will be applied to this `text`.
        rules : str
            Either Odin rules provided as a `yaml` string, or a url pointing to a `yaml` file of rules.

        Returns
        -------
        [processors.odin.Mention] or None
            Rule matches produce a list of `processors.odin.Mention`.
        """
        if OdinAPI.valid_rule_url(rules):
            # this is actually a URL to a yaml file
            container = TextWithURL(text, rules)
        else:
            container = TextWithRules(text, rules)
        return self._extract(container.to_JSON())

    def extract_from_document(self, doc, rules):
        """
        Sends a `processors.ds.Document` (`doc`) to the server with rules for information extraction (IE).

        Parameters
        ----------
        doc : processors.ds.Document
            `rules` will be applied to this `processors.ds.Document`.
        rules : str
            Either Odin rules provided as a `yaml` string, or a url pointing to a `yaml` file of rules.

        Returns
        -------
        [processors.odin.Mention] or None
            Rule matches produce a list of `processors.odin.Mention`.

        """
        if OdinAPI.valid_rule_url(rules):
            # this is actually a URL to a yaml file
            container = DocumentWithURL(doc, rules)
        else:
            container = DocumentWithRules(doc, rules)
        return self._extract(container.to_JSON())
427
428
#############################################
429
# Containers for Odin data
430
# transmitted to the server for processing
431
#############################################
432
433
class TextWithRules(object):
    """
    Pairs a text with inline Odin rules for transmission to the server.
    """

    def __init__(self, text, rules):
        self.text = text
        self.rules = rules

    def to_JSON_dict(self):
        """
        Return a dict with the `text` and `rules` entries.
        """
        return {"text": self.text, "rules": self.rules}

    def to_JSON(self):
        """
        Return the dict form serialized as JSON (sorted keys, 4-space indent).
        """
        return json.dumps(self.to_JSON_dict(), sort_keys=True, indent=4)
447
448
class TextWithURL(object):
    """
    Pairs a text with the url of an Odin rule file for transmission to the server.
    """

    def __init__(self, text, url):
        self.text = text
        # TODO: throw exception if url is invalid
        self.url = url

    def to_JSON_dict(self):
        """
        Return a dict with the `text` and `url` entries.
        """
        return {"text": self.text, "url": self.url}

    def to_JSON(self):
        """
        Return the dict form serialized as JSON (sorted keys, 4-space indent).
        """
        return json.dumps(self.to_JSON_dict(), sort_keys=True, indent=4)
463
464
class DocumentWithRules(object):
    """
    Pairs a document with inline Odin rules for transmission to the server.

    `document` must provide a `to_JSON_dict()` method.
    """

    def __init__(self, document, rules):
        # TODO: throw exception if isinstance(document, Document) is False
        self.document = document
        self.rules = rules

    def to_JSON_dict(self):
        """
        Return a dict with the document's own dict form and the `rules`.
        """
        return {"document": self.document.to_JSON_dict(), "rules": self.rules}

    def to_JSON(self):
        """
        Return the dict form serialized as JSON (sorted keys, 4-space indent).
        """
        return json.dumps(self.to_JSON_dict(), sort_keys=True, indent=4)
479
480
class DocumentWithURL(object):
    """
    Pairs a document with the url of an Odin rule file for transmission to the server.

    `document` must provide a `to_JSON_dict()` method.
    """

    def __init__(self, document, url):
        # TODO: throw exception if isinstance(document, Document) is False
        self.document = document
        # TODO: throw exception if url is invalid
        self.url = url

    def to_JSON_dict(self):
        """
        Return a dict with the document's own dict form and the `url`.
        """
        return {"document": self.document.to_JSON_dict(), "url": self.url}

    def to_JSON(self):
        """
        Return the dict form serialized as JSON (sorted keys, 4-space indent).
        """
        return json.dumps(self.to_JSON_dict(), sort_keys=True, indent=4)
496