processors.api   F
last analyzed

Complexity

Total Complexity 70

Size/Duplication

Total Lines 541
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 70
eloc 274
dl 0
loc 541
rs 2.8
c 0
b 0
f 0

38 Methods

Rating   Name   Duplication   Size   Complexity  
A ProcessorsAPI.establish_connection() 0 16 3
A ProcessorsBaseAPI.is_running() 0 2 2
A OdinAPI.valid_rule_url() 0 3 2
A ProcessorsAPI.__init__() 0 14 1
A TextWithURL.to_JSON() 0 2 1
A ProcessorsAPI.start_server() 0 14 2
A OpenIEAPI.extract_base_entities() 0 5 1
A ProcessorsBaseAPI.annotate_from_sentences() 0 5 1
A DocumentWithRules.__init__() 0 4 1
A DocumentWithRules.to_JSON_dict() 0 5 1
A TextWithRules.to_JSON() 0 2 1
A OdinAPI.extract_from_document() 0 24 2
A DocumentWithURL.to_JSON() 0 2 1
A TextWithRules.__init__() 0 3 1
A OdinAPI.__init__() 0 2 1
A TextWithURL.__init__() 0 4 1
B ProcessorsAPI._resolve_jar_path() 0 30 8
A OpenIEAPI.__init__() 0 2 1
A OdinAPI._extract() 0 11 2
A OpenIEAPI.extract_entities() 0 5 1
A DocumentWithURL.__init__() 0 5 1
A ProcessorsAPI.__del__() 0 13 3
A DocumentWithURL.to_JSON_dict() 0 5 1
A ProcessorsBaseAPI.__init__() 0 20 1
A ProcessorsBaseAPI._check_server_version() 0 15 3
A ProcessorsAPI.stop_server() 0 17 3
A DocumentWithRules.to_JSON() 0 2 1
B ProcessorsAPI._download_jar() 0 23 5
A OpenIEAPI._extract() 0 12 2
A TextWithURL.to_JSON_dict() 0 5 1
A ProcessorsBaseAPI.make_address() 0 7 1
A OdinAPI.extract_from_text() 0 23 2
A ProcessorsBaseAPI.annotate() 0 5 1
A ProcessorsAPI._ensure_jar_path_exists() 0 4 2
A TextWithRules.to_JSON_dict() 0 5 1
A OpenIEAPI.extract_and_filter_entities() 0 5 1
A ProcessorsBaseAPI._prepare_log_file() 0 21 2
B ProcessorsAPI._start_server() 0 35 5

How to fix   Complexity   

Complexity

Complex classes like processors.api often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
from __future__ import unicode_literals
3
#from pkg_resources import resource_filename
4
from .utils import *
5
from .annotators import *
6
from .sentiment import SentimentAnalysisAPI
7
from .serialization import JSONSerializer
8
import os
9
import shlex
10
import subprocess as sp
11
import requests
12
import re
13
import time
14
import sys
15
import logging
16
import warnings
17
18
19
class ProcessorsBaseAPI(object):
    """
    Manages a connection with processors-server and provides an interface to the API.

    Parameters
    ----------
    port : int
        The port the server is running on or should be started on.  Default is 8888.
    hostname : str
        The host name to use for the server.  Default is "localhost".
    log_file: str
        The path for the log file.  Default is py-processors.log in the user's home directory.

    Methods
    -------
    annotate(text)
        Produces a Document from the provided `text` using the default processor.
    clu.annotate(text)
        Produces a Document from the provided `text` using CluProcessor.
    fastnlp.annotate(text)
        Produces a Document from the provided `text` using FastNLPProcessor.
    bionlp.annotate(text)
        Produces a Document from the provided `text` using BioNLPProcessor.
    annotate_from_sentences(sentences)
        Produces a Document from `sentences` (a list of text split into sentences). Uses the default processor.
    fastnlp.annotate_from_sentences(sentences)
        Produces a Document from `sentences` (a list of text split into sentences). Uses FastNLPProcessor.
    bionlp.annotate_from_sentences(sentences)
        Produces a Document from `sentences` (a list of text split into sentences). Uses BioNLPProcessor.
    corenlp.sentiment.score_sentence(sentence)
        Produces a sentiment score for the provided `sentence` (an instance of Sentence).
    corenlp.sentiment.score_document(doc)
        Produces sentiment scores for the provided `doc` (an instance of Document).  One score is produced for each sentence.
    corenlp.sentiment.score_segmented_text(sentences)
        Produces sentiment scores for the provided `sentences` (a list of text segmented into sentences).  One score is produced for each item in `sentences`.
    odin.extract_from_text(text, rules)
        Produces a list of Mentions for matches of the provided `rules` on the `text`.  `rules` can be a string of Odin rules, or a url ending in `.yml` or `.yaml`.
    odin.extract_from_document(doc, rules)
        Produces a list of Mentions for matches of the provided `rules` on the `doc` (an instance of Document).  `rules` can be a string of Odin rules, or a url ending in `.yml` or `.yaml`.
    """
    PORT = 8888
    HOST = "localhost"
    LOG = full_path(os.path.join(os.path.expanduser("~"), "py-processors.log"))

    def __init__(self, **kwargs):
        """
        Build the service clients for the configured address and set up logging.

        Recognized keyword arguments are `hostname`, `port` and `log_file`;
        anything else is ignored here.
        """
        self.hostname = kwargs.get("hostname", ProcessorsBaseAPI.HOST)
        self.port = kwargs.get("port", ProcessorsBaseAPI.PORT)
        self.make_address(self.hostname, self.port)
        # processors
        self.default = Processor(self.address)
        self.clu = CluProcessor(self.address)
        self.fastnlp = FastNLPProcessor(self.address)
        self.bionlp = BioNLPProcessor(self.address)
        # sentiment
        self.sentiment = SentimentAnalysisAPI(self.address)
        # odin
        self.odin = OdinAPI(self.address)
        # openie
        self.openie = OpenIEAPI(self.address)
        self.logger = logging.getLogger(__name__)
        # Use this class's own default rather than reaching into a subclass.
        self.log_file = self._prepare_log_file(kwargs.get("log_file", ProcessorsBaseAPI.LOG))

    def make_address(self, hostname, port):
        """
        Store `hostname` and `port` and rebuild `self.address` from them.
        """
        # update hostname
        self.hostname = hostname
        # update port
        self.port = port
        # update address
        self.address = "http://{}:{}".format(self.hostname, self.port)

    def _prepare_log_file(self, lf):
        """
        Configure logger and return file path for logging.

        Parameters
        ----------
        lf : str or None
            Requested log file path.  A falsy value selects the class default.

        Returns
        -------
        str
            The path the file handler writes to.
        """
        log_file = ProcessorsBaseAPI.LOG if not lf else os.path.expanduser(lf)
        self.logger.setLevel(logging.DEBUG)

        formatter = logging.Formatter("%(levelname)s - %(message)s")
        # console handler reports INFO and above
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        # file handler records everything from DEBUG up (file is truncated)
        file_handler = logging.FileHandler(log_file, "w")
        file_handler.setLevel(logging.DEBUG)

        # The logger is looked up by module name, so every instance shares it.
        # Remove the handlers a previous instance installed; otherwise each
        # new instance would duplicate every message and leak a file handle.
        # Handlers attached by other code are left alone.
        stale_handlers = [h for h in self.logger.handlers
                          if getattr(h, "_py_processors_handler", False)]
        for stale in stale_handlers:
            self.logger.removeHandler(stale)
            stale.close()

        for handler in (console_handler, file_handler):
            handler.setFormatter(formatter)
            handler._py_processors_handler = True
            self.logger.addHandler(handler)
        return log_file

    def annotate(self, text):
        """
        Uses default processor (CoreNLP) to annotate text.  Included for backwards compatibility.
        """
        return self.default.annotate(text)

    def annotate_from_sentences(self, sentences):
        """
        Uses default processor (CoreNLP) to annotate a list of segmented sentences.
        """
        return self.default.annotate_from_sentences(sentences)

    def is_running(self):
        """
        Report whether annotating a probe string yields a truthy result.
        """
        return bool(self.annotate("Blah"))

    def _check_server_version(self):
        """
        Checks server version to see if it meets the recommendations.

        Emits a warning when the versions differ or when the server version
        cannot be determined; never raises for a failed lookup.
        """
        # avoid circular imports by delaying this import
        from .__init__ import __ps_rec__
        try:
            service_address = "{}/version".format(self.address)
            server_version = post_json(service_address, None)["version"]
            if str(__ps_rec__) != str(server_version):
                warnings.warn("Recommended server version is {}, but server version is {}".format(__ps_rec__, server_version))
            else:
                self.logger.info("Server version meets recommendations (v{})".format(__ps_rec__))
        except Exception:
            # best effort: any failure here is reported as "unknown version"
            warnings.warn("Unable to determine server version.  Recommended version is {}".format(__ps_rec__))
144
145
146
class ProcessorsAPI(ProcessorsBaseAPI):
    """
    Manages a connection with the processors-server jar and provides an interface to the API.

    Parameters
    ----------
    timeout : int
        The number of seconds to wait for the server to initialize.  Default is 120.
    jvm_mem : str
        The maximum amount of memory to allocate to the JVM for the server.  Default is "-Xmx3G".
    jar_path : str
        The path to the processors-server jar.  Default is the jar installed with the package.
    keep_alive : bool
        Whether or not to keep the server running when ProcessorsAPI instance goes out of scope.  Default is false (server is shut down).
    log_file: str
        The path for the log file.  Default is py-processors.log in the user's home directory.

    Methods
    -------
    start_server(jar_path, **kwargs)
        Starts the server using the provided `jar_path`.  Optionally takes hostname, port, jvm_mem, and timeout.
    stop_server()
        Attempts to stop the server running at self.address.
    """

    PROC_VAR = 'PROCESSORS_SERVER'
    TIMEOUT = 120
    # save to lib loc
    DEFAULT_JAR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "processors-server.jar")
    JVM_MEM = "-Xmx3G"

    def __init__(self, **kwargs):
        """
        Configure the instance, resolve the jar path and try to connect
        (starting the server if nothing answers).
        """
        super(ProcessorsAPI, self).__init__(**kwargs)
        self.timeout = kwargs.get("timeout", ProcessorsAPI.TIMEOUT)
        self.jvm_mem = kwargs.get("jvm_mem", ProcessorsAPI.JVM_MEM)
        self._start_command = "java {mem} -cp {jp} NLPServer --port {port} --host {host}" # mem, jar path, port, host
        # whether or not to stop the server when the object is destroyed
        self.keep_alive = kwargs.get("keep_alive", False)
        # how long to wait between requests
        self.wait_time = 2
        # set self.jar_path
        self.jar_path = ProcessorsAPI.DEFAULT_JAR
        self._resolve_jar_path(kwargs.get("jar_path", self.jar_path))
        # attempt to establish connection with server
        self.establish_connection()

    def establish_connection(self):
        """
        Attempt to connect to a server, starting one if none responds.

        Failures while starting the server are logged as warnings rather than
        raised.
        """
        if self.is_running():
            self.logger.info("Connection with server established!")
            self._check_server_version()
            return
        try:
            # resolve jar path if server is not already running
            self._resolve_jar_path(self.jar_path)
            # Attempt to start the server
            self._start_server()
        except Exception as e:
            # Logger.warn was removed in Python 3.13; warning() is the supported spelling
            self.logger.warning("Unable to start server. Please start the server manually with .start_server(jar_path=\"path/to/processors-server.jar\")")
            self.logger.warning("\n{}".format(e))

    def _jar_is_present(self):
        """
        Return True when `self.jar_path` is set and points at an existing path.
        """
        current = getattr(self, "jar_path", None)
        # guard against None: os.path.exists(None) raises TypeError on Python 3
        return bool(current) and os.path.exists(current)

    def _resolve_jar_path(self, jar_path=None):
        """
        Attempts to preferentially set value of self.jar_path.

        Order of preference: the given `jar_path` (if it exists), the path in
        the PROCESSORS_SERVER environment variable (if it exists), and finally
        the default jar location, downloading the jar when it is missing.
        """
        jar_path = jar_path or ProcessorsAPI.DEFAULT_JAR

        # Preference 1: if a .jar is given, check to see if the path is valid
        jp = full_path(jar_path)
        if os.path.exists(jp):
            self.jar_path = jp

        # Preference 2: if a PROCESSORS_SERVER environment variable is defined, check its validity
        if not self._jar_is_present() and ProcessorsAPI.PROC_VAR in os.environ:
            self.logger.info("Using path given via ${}".format(ProcessorsAPI.PROC_VAR))
            jp = full_path(os.environ[ProcessorsAPI.PROC_VAR])
            # check if path is valid
            if os.path.exists(jp):
                self.jar_path = jp
            else:
                self.jar_path = None
                self.logger.warning("WARNING: {0} path is invalid.  \nPlease verify this entry in your environment:\n\texport {0}=/path/to/processors-server.jar".format(ProcessorsAPI.PROC_VAR))

        # Preference 3: attempt to use the processors-sever.jar (download if not found)
        if not self._jar_is_present():
            self.logger.info("No jar found.  Downloading to {} ...".format(ProcessorsAPI.DEFAULT_JAR))
            ProcessorsAPI._download_jar()
            self.jar_path = ProcessorsAPI.DEFAULT_JAR

    def start_server(self, jar_path, **kwargs):
        """
        Starts processors-sever.jar

        Parameters
        ----------
        jar_path : str
            Path to the processors-server jar.
        **kwargs
            Optional `port`, `hostname`, `jvm_mem` and `timeout` (seconds)
            overrides; each defaults to the instance's current value.

        Raises
        ------
        Exception
            If no `jar_path` is given, or the server cannot be started.
        """
        # NOTE(review): self.address and the processor clients created in
        # __init__ are not rebuilt here, so they appear to keep targeting the
        # original host/port -- confirm whether that is intended.
        self.port = kwargs.get("port", self.port)
        self.hostname = kwargs.get("hostname", self.hostname)
        self.jvm_mem = kwargs.get("jvm_mem", self.jvm_mem)
        # Default to the current timeout (the old default was the JVM memory
        # flag, which float() cannot parse) and keep it in seconds, matching
        # how __init__ stores it.
        self.timeout = int(float(kwargs.get("timeout", self.timeout)))
        jp = full_path(jar_path) if jar_path else None
        if not jp:
            raise Exception("Please provide jar_path=\"path/to/processors-server.jar\"")
        self.jar_path = jp
        self._start_server()

    def stop_server(self, port=None):
        """
        Sends a poison pill to the server and waits for shutdown response

        Parameters
        ----------
        port : int, optional
            Port of the server to stop.  Defaults to `self.port`.

        Returns
        -------
        bool
            True if the shutdown request completed, False if it failed
            (e.g. the server is already down).
        """
        port = port or self.port
        address = "http://{}:{}".format(self.hostname, port)
        shutdown_address = "{}/shutdown".format(address)
        # attempt shutdown
        try:
            response = requests.post(shutdown_address)
            if response:
                print(response.content.decode("utf-8"))
            return True
        # will fail if the server is already down
        except Exception as e:
            self.logger.debug("Shutdown request to {} failed: {}".format(shutdown_address, e))
        return False

    def _ensure_jar_path_exists(self):
        """
        Raise an Exception unless `self.jar_path` points at an existing path.
        """
        if not self._jar_is_present():
            raise Exception("jar not found at {}".format(self.jar_path))

    def _start_server(self, port=None):
        """
        "Private" method called by start_server()

        Launches the jar in a subprocess and polls until an annotation request
        succeeds or `self.timeout` seconds have elapsed.

        Returns
        -------
        bool
            True once the server answers.

        Raises
        ------
        Exception
            If the jar is missing, the process exits early, or the server
            does not answer in time.
        """
        # does the jar exist?
        self._ensure_jar_path_exists()

        if port:
            self.port = port
        # build the command
        cmd = self._start_command.format(mem=self.jvm_mem, jp=self.jar_path, port=self.port, host=self.hostname)
        # Use a single handle for both streams: two independent truncating
        # handles on the same file overwrite each other's output.  The child
        # keeps its own copy of the descriptor, so the parent's handle can be
        # closed as soon as the process has been spawned.
        with open(self.log_file, 'wb') as server_log:
            self._process = sp.Popen(shlex.split(cmd),
                                     shell=False,
                                     stdout=server_log,
                                     stderr=sp.STDOUT,
                                     universal_newlines=True)

        self.logger.info("Starting processors-server ({}) ...".format(cmd))
        print("\nWaiting for server...")

        progressbar_length = int(self.timeout/self.wait_time)
        for i in range(progressbar_length):
            if self.annotate("blah"):
                print("\n\nConnection with processors-server established ({})".format(self.address))
                return True
            # no point waiting out the timeout if the JVM has already died
            if self._process.poll() is not None:
                raise Exception("processors-server exited before accepting connections (exit code {}).  See {} for details.".format(self._process.returncode, self.log_file))
            sys.stdout.write("\r[{:{}}]".format('='*i, progressbar_length))
            sys.stdout.flush()
            time.sleep(self.wait_time)

        # if the server still hasn't started, raise an Exception
        raise Exception("Couldn't connect to processors-server. Is the port in use?")

    @staticmethod
    def _download_jar(jar_url=None):
        """
        Download the processors-server jar to `ProcessorsAPI.DEFAULT_JAR`.

        Parameters
        ----------
        jar_url : str, optional
            Location of the jar.  Defaults to the package's SERVER_JAR_URL.

        Raises
        ------
        requests.exceptions.HTTPError
            If the server answers with an error status.
        """
        from .__init__ import SERVER_JAR_URL
        jar_url = jar_url or SERVER_JAR_URL
        # download processors-server.jar
        ppjar = ProcessorsAPI.DEFAULT_JAR
        print("Downloading {} from {} ...".format(ppjar, jar_url))
        response = requests.get(jar_url, stream=True, headers={'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'})
        # don't save an HTML error page as if it were the jar
        response.raise_for_status()
        # the header is optional; 0 means "size unknown, skip progress output"
        content_length = response.headers.get('content-length')
        total_length = int(content_length) if content_length else 0
        dl = 0
        last_reported = None
        try:
            with open(ppjar, "wb") as handle:
                for data in response.iter_content(chunk_size=2048):
                    # write data to disk
                    handle.write(data)
                    dl += len(data)
                    # do we know the total file size?
                    if total_length:
                        percent_complete = int(100 * float(dl) / float(total_length))
                        if percent_complete % 5 == 0 and percent_complete != last_reported:
                            last_reported = percent_complete
                            sys.stdout.write("\r{}% complete".format(percent_complete))
                            sys.stdout.flush()
        except Exception:
            # a truncated jar would later be mistaken for a valid one
            if os.path.exists(ppjar):
                os.remove(ppjar)
            raise

        print("\nDownload Complete! {}".format(ppjar))

    def __del__(self):
        """
        Stop server unless otherwise specified
        """
        # keep_alive is missing when __init__ failed early; in that case there
        # is nothing this instance could have started, so do nothing.
        if getattr(self, "keep_alive", True):
            return
        try:
            stopped = self.stop_server()
        except Exception as e:
            self.logger.debug(e)
            stopped = False
        if stopped:
            print("Successfully shut down processors-server!")
        else:
            print("Couldn't kill processors-server.  Was server started externally?")
353
354
355
class OdinAPI(object):
    """
    API for performing rule-based information extraction with Odin.

    Parameters
    ----------
    address : str
        The base address for the API (i.e., everything preceding `/api/..`)

    """

    # Raw string so `\.` is a regex escape rather than an invalid string
    # escape, and the dot is mandatory so only urls that really end in
    # `.yml` / `.yaml` are treated as rule locations.
    validator = re.compile(r"^(https?|ftp):.+?\.ya?ml$")

    def __init__(self, address):
        self._service = "{}/api/odin/extract".format(address)

    def _extract(self, json_data):
        """
        POST `json_data` to the Odin service and convert the response.

        Returns
        -------
        The result of `JSONSerializer.mentions_from_JSON`, or None when the
        response contains an "error" key (the error is printed).
        """
        mns_json = post_json(self._service, json_data)
        if "error" in mns_json:
            error_msg = mns_json["error"]
            original_msg = json.loads(json_data)
            # report whichever of the rules / url the request carried
            rules = original_msg.get("rules", original_msg.get("url", None))
            oe = OdinError(rules=rules, message=error_msg)
            print(oe)
            return None
        return JSONSerializer.mentions_from_JSON(mns_json)

    @staticmethod
    def valid_rule_url(url):
        """
        Return True when `url` is an http(s)/ftp url ending in `.yml` or `.yaml`.
        """
        return bool(OdinAPI.validator.match(url))

    def extract_from_text(self, text, rules):
        """
        Sends text to the server with rules for information extraction (IE).

        Parameters
        ----------
        text : str
            `rules` will be applied to this `text`.
        rules : str
            Either Odin rules provided as a `yaml` string, or a url pointing to a `yaml` file of rules.

        Returns
        -------
        [processors.odin.Mention] or None
            Rule matches produce a list of `processors.odin.Mention`.
        """
        if OdinAPI.valid_rule_url(rules):
            # this is actually a URL to a yaml file
            container = TextWithURL(text, rules)
        else:
            container = TextWithRules(text, rules)
        return self._extract(container.to_JSON())

    def extract_from_document(self, doc, rules):
        """
        Sends a `processors.ds.Document` (`doc`) to the server with rules for information extraction (IE).

        Parameters
        ----------
        doc : processors.ds.Document
            `rules` will be applied to this `processors.ds.Document`.
        rules : str
            Either Odin rules provided as a `yaml` string, or a url pointing to a `yaml` file of rules.

        Returns
        -------
        [processors.odin.Mention] or None
            Rule matches produce a list of `processors.odin.Mention`.

        """
        if OdinAPI.valid_rule_url(rules):
            # this is actually a URL to a yaml file
            container = DocumentWithURL(doc, rules)
        else:
            container = DocumentWithRules(doc, rules)
        return self._extract(container.to_JSON())
435
436
437
class OpenIEAPI(object):
    """
    Client for the entity-extraction endpoints under `/api/openie/entities/`.

    Parameters
    ----------
    address : str
        The base address for the API (everything preceding `/api/..`).
    """

    def __init__(self, address):
        self._service = "{}/api/openie/entities/".format(address)

    def _extract(self, endpoint, json_data):
        """
        POST `json_data` to `endpoint` beneath the service address.

        Returns the result of `JSONSerializer.mentions_from_JSON`, or None
        (after printing the message) when the response has an "error" key.
        """
        # /api/openie/entities/<endpoint>
        reply = post_json(self._service + endpoint, json_data)
        if "error" not in reply:
            return JSONSerializer.mentions_from_JSON(reply)
        print(reply["error"])
        return None

    def extract_entities(self, ds):
        """
        Extracts and expands Entities from a Sentence or Document
        """
        payload = json.dumps(ds.to_JSON_dict(), sort_keys=True, indent=None)
        return self._extract(endpoint="extract", json_data=payload)

    def extract_and_filter_entities(self, ds):
        """
        Extracts, expands, and filters Entities from a Sentence or Document
        """
        payload = json.dumps(ds.to_JSON_dict(), sort_keys=True, indent=None)
        return self._extract(endpoint="extract-filter", json_data=payload)

    def extract_base_entities(self, ds):
        """
        Extracts non-expanded Entities from a Sentence or Document
        """
        payload = json.dumps(ds.to_JSON_dict(), sort_keys=True, indent=None)
        return self._extract(endpoint="base-extract", json_data=payload)
472
473
#############################################
474
# Containers for Odin data
475
# transmitted to the server for processing
476
#############################################
477
478
class TextWithRules(object):
    """
    Pairs raw text with a string of rules for transmission to the server.
    """

    def __init__(self, text, rules):
        self.text, self.rules = text, rules

    def to_JSON_dict(self):
        """
        Return a dict with the "text" and "rules" entries.
        """
        return {"text": self.text, "rules": self.rules}

    def to_JSON(self):
        """
        Serialize `to_JSON_dict()` as a JSON string with sorted keys.
        """
        payload = self.to_JSON_dict()
        return json.dumps(payload, sort_keys=True, indent=None)
492
493
class TextWithURL(object):
    """
    Pairs raw text with the url of a rules file for transmission to the server.
    """

    def __init__(self, text, url):
        # TODO: throw exception if url is invalid
        self.text, self.url = text, url

    def to_JSON_dict(self):
        """
        Return a dict with the "text" and "url" entries.
        """
        return {"text": self.text, "url": self.url}

    def to_JSON(self):
        """
        Serialize `to_JSON_dict()` as a JSON string with sorted keys.
        """
        payload = self.to_JSON_dict()
        return json.dumps(payload, sort_keys=True, indent=None)
508
509
class DocumentWithRules(object):
    """
    Pairs a document with a string of rules for transmission to the server.

    The document must provide a `to_JSON_dict()` method.
    """

    def __init__(self, document, rules):
        # TODO: throw exception if isinstance(document, Document) is False
        self.document, self.rules = document, rules

    def to_JSON_dict(self):
        """
        Return a dict with the document's JSON dict and the rules.
        """
        return {"document": self.document.to_JSON_dict(), "rules": self.rules}

    def to_JSON(self):
        """
        Serialize `to_JSON_dict()` as a JSON string with sorted keys.
        """
        payload = self.to_JSON_dict()
        return json.dumps(payload, sort_keys=True, indent=None)
524
525
class DocumentWithURL(object):
    """
    Pairs a document with the url of a rules file for transmission to the server.

    The document must provide a `to_JSON_dict()` method.
    """

    def __init__(self, document, url):
        # TODO: throw exception if isinstance(document, Document) is False
        # TODO: throw exception if url is invalid
        self.document, self.url = document, url

    def to_JSON_dict(self):
        """
        Return a dict with the document's JSON dict and the url.
        """
        return {"document": self.document.to_JSON_dict(), "url": self.url}

    def to_JSON(self):
        """
        Serialize `to_JSON_dict()` as a JSON string with sorted keys.
        """
        payload = self.to_JSON_dict()
        return json.dumps(payload, sort_keys=True, indent=None)
541