Completed
Push — master ( 4c1319...a3584a )
by Gus
34s queued 23s
created

OpenIEAPI._extract()   A

Complexity

Conditions 2

Size

Total Lines 12

Duplication

Lines 12
Ratio 100 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 12
loc 12
rs 9.4285
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
from __future__ import unicode_literals
4
#from pkg_resources import resource_filename
5
from .utils import *
6
from .annotators import *
7
from .sentiment import SentimentAnalysisAPI
8
from .serialization import JSONSerializer
9
import os
10
import shlex
11
import subprocess as sp
12
import requests
13
import re
14
import time
15
import sys
16
import logging
17
import warnings
18
19
20
class ProcessorsBaseAPI(object):
    """
    Manages a connection with processors-server and provides an interface to the API.

    Parameters
    ----------
    port : int
        The port the server is running on or should be started on.  Default is 8888.
    hostname : str
        The host name to use for the server.  Default is "localhost".
    log_file: str
        The path for the log file.  Default is py-processors.log in the user's home directory.

    Methods
    -------
    annotate(text)
        Produces a Document from the provided `text` using the default processor.
    clu.annotate(text)
        Produces a Document from the provided `text` using CluProcessor.
    fastnlp.annotate(text)
        Produces a Document from the provided `text` using FastNLPProcessor.
    bionlp.annotate(text)
        Produces a Document from the provided `text` using BioNLPProcessor.
    annotate_from_sentences(sentences)
        Produces a Document from `sentences` (a list of text split into sentences). Uses the default processor.
    fastnlp.annotate_from_sentences(sentences)
        Produces a Document from `sentences` (a list of text split into sentences). Uses FastNLPProcessor.
    bionlp.annotate_from_sentences(sentences)
        Produces a Document from `sentences` (a list of text split into sentences). Uses BioNLPProcessor.
    corenlp.sentiment.score_sentence(sentence)
        Produces a sentiment score for the provided `sentence` (an instance of Sentence).
    corenlp.sentiment.score_document(doc)
        Produces sentiment scores for the provided `doc` (an instance of Document).  One score is produced for each sentence.
    corenlp.sentiment.score_segmented_text(sentences)
        Produces sentiment scores for the provided `sentences` (a list of text segmented into sentences).  One score is produced for item in `sentences`.
    odin.extract_from_text(text, rules)
        Produces a list of Mentions for matches of the provided `rules` on the `text`.  `rules` can be a string of Odin rules, or a url ending in `.yml` or `.yaml`.
    odin.extract_from_document(doc, rules)
        Produces a list of Mentions for matches of the provided `rules` on the `doc` (an instance of Document).  `rules` can be a string of Odin rules, or a url ending in .yml or yaml.
    openie.extract_entities(ds)
        Extracts and expands Entities from a Sentence or Document.
    openie.extract_and_filter_entities(ds)
        Extracts, expands, and filters Entities from a Sentence or Document.
    openie.extract_base_entities(ds)
        Extracts non-expanded Entities from a Sentence or Document.
    """
    PORT = 8888
    HOST = "localhost"
    LOG = full_path(os.path.join(os.path.expanduser("~"), "py-processors.log"))

    def __init__(self, **kwargs):
        self.hostname = kwargs.get("hostname", ProcessorsBaseAPI.HOST)
        self.port = kwargs.get("port", ProcessorsBaseAPI.PORT)
        # sets self.address from the hostname and port
        self.make_address(self.hostname, self.port)
        # processors
        self.default = Processor(self.address)
        self.clu = CluProcessor(self.address)
        self.fastnlp = FastNLPProcessor(self.address)
        self.bionlp = BioNLPProcessor(self.address)
        # sentiment
        self.sentiment = SentimentAnalysisAPI(self.address)
        # odin
        self.odin = OdinAPI(self.address)
        # openie
        self.openie = OpenIEAPI(self.address)
        self.logger = logging.getLogger(__name__)
        # use this class's own default rather than reaching into a subclass
        self.log_file = self._prepare_log_file(kwargs.get("log_file", ProcessorsBaseAPI.LOG))

    def make_address(self, hostname, port):
        """
        Stores `hostname` and `port` and rebuilds `self.address` from them.

        NOTE(review): the processor/odin/openie helpers created in `__init__`
        were handed the address at construction time; whether they pick up a
        later change depends on their implementation -- confirm before relying on it.
        """
        self.hostname = hostname
        self.port = port
        self.address = "http://{}:{}".format(self.hostname, self.port)

    def _prepare_log_file(self, lf):
        """
        Configure logger and return file path for logging

        Parameters
        ----------
        lf : str or None
            Path of the log file.  A falsy value selects the class default (`LOG`).

        Returns
        -------
        str
            The (user-expanded) path of the log file.
        """
        log_file = os.path.expanduser(lf) if lf else ProcessorsBaseAPI.LOG
        self.logger.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(levelname)s - %(message)s")
        # The logger is shared by every instance created in this module, so only
        # attach handlers that are not already present; otherwise each new
        # instance would duplicate every log line.
        attached = self.logger.handlers

        # console handler at INFO level
        # (FileHandler subclasses StreamHandler, so it must be excluded explicitly)
        has_console = any(
            isinstance(h, logging.StreamHandler) and not isinstance(h, logging.FileHandler)
            for h in attached
        )
        if not has_console:
            console = logging.StreamHandler()
            console.setLevel(logging.INFO)
            console.setFormatter(formatter)
            self.logger.addHandler(console)

        # debug file handler at DEBUG level
        target = os.path.abspath(log_file)
        has_file = any(
            isinstance(h, logging.FileHandler) and h.baseFilename == target
            for h in attached
        )
        if not has_file:
            file_handler = logging.FileHandler(log_file, "w")
            file_handler.setLevel(logging.DEBUG)
            file_handler.setFormatter(formatter)
            self.logger.addHandler(file_handler)
        return log_file

    def annotate(self, text):
        """
        Uses default processor (CoreNLP) to annotate text.  Included for backwards compatibility.
        """
        return self.default.annotate(text)

    def annotate_from_sentences(self, sentences):
        """
        Uses default processor (CoreNLP) to annotate a list of segmented sentences.
        """
        return self.default.annotate_from_sentences(sentences)

    def is_running(self):
        """
        Reports whether annotating a short test string yields a truthy result.
        """
        return bool(self.annotate("Blah"))

    def _check_server_version(self):
        """
        Checks server version to see if it meets the recommendations

        Emits a warning when the version differs from the recommended one or
        cannot be determined; logs an info message when it matches.
        """
        # avoid circular imports by delaying this import
        from .__init__ import __ps_rec__
        try:
            service_address = "{}/version".format(self.address)
            server_version = post_json(service_address, None)["version"]
            if str(__ps_rec__) != str(server_version):
                warnings.warn("Recommended server version is {}, but server version is {}".format(__ps_rec__, server_version))
            else:
                self.logger.info("Server version meets recommendations (v{})".format(__ps_rec__))
        except Exception:
            # best effort: a failed version check must not break the connection
            warnings.warn("Unable to determine server version.  Recommended version is {}".format(__ps_rec__))
145
146
147
class ProcessorsAPI(ProcessorsBaseAPI):
    """
    Manages a connection with the processors-server jar and provides an interface to the API.

    Parameters
    ----------
    timeout : int
        The number of seconds to wait for the server to initialize.  Default is 120.
    jvm_mem : str
        The maximum amount of memory to allocate to the JVM for the server.  Default is "-Xmx3G".
    jar_path : str
        The path to the processors-server jar.  Default is the jar installed with the package.
    keep_alive : bool
        Whether or not to keep the server running when ProcessorsAPI instance goes out of scope.  Default is false (server is shut down).
    log_file: str
        The path for the log file.  Default is py-processors.log in the user's home directory.

    Methods
    -------
    start_server(jar_path, **kwargs)
        Starts the server using the provided `jar_path`.  Optionally takes hostname, port, jvm_mem, and timeout.
    stop_server()
        Attempts to stop the server running at self.address.
    """

    PROC_VAR = 'PROCESSORS_SERVER'
    TIMEOUT = 120
    # save to lib loc
    DEFAULT_JAR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "processors-server.jar")
    JVM_MEM = "-Xmx3G"

    def __init__(self, **kwargs):
        super(ProcessorsAPI, self).__init__(**kwargs)
        self.timeout = kwargs.get("timeout", ProcessorsAPI.TIMEOUT)
        self.jvm_mem = kwargs.get("jvm_mem", ProcessorsAPI.JVM_MEM)
        self._start_command = "java {mem} -cp {jp} NLPServer --port {port} --host {host}" # mem, jar path, port, host
        # whether or not to stop the server when the object is destroyed
        self.keep_alive = kwargs.get("keep_alive", False)
        # how long to wait between requests
        self.wait_time = 2
        # set self.jar_path
        self.jar_path = ProcessorsAPI.DEFAULT_JAR
        self._resolve_jar_path(kwargs.get("jar_path", self.jar_path))
        # attempt to establish connection with server
        self.establish_connection()

    def establish_connection(self):
        """
        Attempt to connect to a server; if none responds, try to start one.

        Failures while starting the server are logged as warnings rather than raised.
        """
        if self.is_running():
            self.logger.info("Connection with server established!")
            self._check_server_version()
            return
        try:
            # resolve jar path if server is not already running
            self._resolve_jar_path(self.jar_path)
            # Attempt to start the server
            self._start_server()
        except Exception as e:
            self.logger.warning("Unable to start server. Please start the server manually with .start_server(jar_path=\"path/to/processors-server.jar\")")
            self.logger.warning("\n{}".format(e))

    def _resolve_jar_path(self, jar_path=None):
        """
        Attempts to preferentially set value of self.jar_path

        Order of preference: the given `jar_path` (or the default jar) if it
        exists, then the path in the PROCESSORS_SERVER environment variable,
        and finally a fresh download to the default location.
        """
        # Preference 1: if a .jar is given, check to see if the path is valid
        candidate = full_path(jar_path or ProcessorsAPI.DEFAULT_JAR)
        if os.path.exists(candidate):
            self.jar_path = candidate

        # Preference 2: if a PROCESSORS_SERVER environment variable is defined, check its validity
        current_missing = not self.jar_path or not os.path.exists(self.jar_path)
        if current_missing and ProcessorsAPI.PROC_VAR in os.environ:
            self.logger.info("Using path given via ${}".format(ProcessorsAPI.PROC_VAR))
            env_jar = full_path(os.environ[ProcessorsAPI.PROC_VAR])
            if os.path.exists(env_jar):
                self.jar_path = env_jar
            else:
                self.jar_path = None
                self.logger.warning("WARNING: {0} path is invalid.  \nPlease verify this entry in your environment:\n\texport {0}=/path/to/processors-server.jar".format(ProcessorsAPI.PROC_VAR))

        # Preference 3: attempt to use the processors-sever.jar (download if not found)
        if not self.jar_path or not os.path.exists(self.jar_path):
            self.logger.info("No jar found.  Downloading to {} ...".format(ProcessorsAPI.DEFAULT_JAR))
            ProcessorsAPI._download_jar()
            self.jar_path = ProcessorsAPI.DEFAULT_JAR

    def start_server(self, jar_path, **kwargs):
        """
        Starts processors-sever.jar

        Parameters
        ----------
        jar_path : str
            Path to the processors-server jar.
        port, hostname, jvm_mem, timeout : optional keyword arguments
            Override the corresponding instance settings; each defaults to
            the value currently stored on the instance.

        Raises
        ------
        Exception
            If no usable `jar_path` is given, or the server cannot be reached
            (see `_start_server`).
        """
        self.port = kwargs.get("port", self.port)
        self.hostname = kwargs.get("hostname", self.hostname)
        self.jvm_mem = kwargs.get("jvm_mem", self.jvm_mem)
        # Default to the current timeout (the previous default was the JVM
        # memory flag, which cannot be converted to a number).  The value is
        # kept in seconds, as in __init__; _start_server divides by wait_time.
        self.timeout = int(float(kwargs.get("timeout", self.timeout)))
        jp = full_path(jar_path)
        if not jp:
            raise Exception("Please provide jar_path=\"path/to/processors-server.jar\"")
        self.jar_path = jp
        self._start_server()

    def stop_server(self, port=None):
        """
        Sends a poison pill to the server and waits for shutdown response

        Parameters
        ----------
        port : int, optional
            Port of the server to stop.  Defaults to `self.port`.

        Returns
        -------
        bool
            True if the shutdown request was delivered, False if it failed
            (e.g. the server is already down).
        """
        port = port or self.port
        address = "http://{}:{}".format(self.hostname, port)
        shutdown_address = "{}/shutdown".format(address)
        # attempt shutdown
        try:
            response = requests.post(shutdown_address)
            if response:
                print(response.content.decode("utf-8"))
            return True
        # will fail if the server is already down
        except Exception as e:
            self.logger.debug(e)
        return False

    def _ensure_jar_path_exists(self):
        """
        Raises an Exception if there is no file at `self.jar_path`.
        """
        if not os.path.exists(self.jar_path):
            raise Exception("jar not found at {}".format(self.jar_path))

    def _start_server(self, port=None):
        """
        "Private" method called by start_server()

        Launches the server process and polls it until it answers or
        `self.timeout` seconds have elapsed.

        Returns
        -------
        bool
            True once the server answers an annotation request.

        Raises
        ------
        Exception
            If the jar is missing or the server does not answer in time.
        """
        # does the jar exist?
        self._ensure_jar_path_exists()

        if port:
            # NOTE(review): self.address is not rebuilt here -- confirm whether
            # callers expect make_address() to be invoked as well.
            self.port = port
        # build the command
        cmd = self._start_command.format(mem=self.jvm_mem, jp=self.jar_path, port=self.port, host=self.hostname)
        # Open the log once and route stderr into stdout.  The child keeps its
        # own copy of the descriptor, so the parent's handle can be closed
        # immediately instead of being leaked.
        with open(self.log_file, 'wb') as server_log:
            self._process = sp.Popen(shlex.split(cmd),
                                     shell=False,
                                     stderr=sp.STDOUT,
                                     stdout=server_log,
                                     universal_newlines=True)

        self.logger.info("Starting processors-server ({}) ...".format(cmd))
        print("\nWaiting for server...")

        progressbar_length = int(self.timeout/self.wait_time)
        for i in range(progressbar_length):
            if self.annotate("blah"):
                print("\n\nConnection with processors-server established ({})".format(self.address))
                return True
            sys.stdout.write("\r[{:{}}]".format('='*i, progressbar_length))
            # no newline is written, so flush to make the progress bar visible
            sys.stdout.flush()
            time.sleep(self.wait_time)

        # if the server still hasn't started, raise an Exception
        raise Exception("Couldn't connect to processors-server. Is the port in use?")

    @staticmethod
    def _download_jar(jar_url=None):
        """
        Downloads processors-server.jar to `ProcessorsAPI.DEFAULT_JAR`.

        Parameters
        ----------
        jar_url : str, optional
            Where to download the jar from.  Defaults to `SERVER_JAR_URL`.

        Raises
        ------
        requests.HTTPError
            If the server answers with an error status; this prevents an
            error page from being saved as the jar.
        """
        from .__init__ import SERVER_JAR_URL
        jar_url = jar_url or SERVER_JAR_URL
        # download processors-server.jar
        ppjar = ProcessorsAPI.DEFAULT_JAR
        dl = 0
        print("Downloading {} from {} ...".format(ppjar, jar_url))
        response = requests.get(jar_url, stream=True, headers={'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'})
        response.raise_for_status()
        # the content-length header is optional; 0 means "size unknown"
        content_length = response.headers.get('content-length')
        total_length = int(content_length) if content_length else 0
        with open(ppjar, "wb") as handle:
            for data in response.iter_content(chunk_size=2048):
                # do we know the total file size?
                if total_length:
                    percent_complete = int(100 * float(dl) / float(total_length))
                    if percent_complete % 5 == 0:
                        sys.stdout.write("\r{}% complete".format(percent_complete))
                        sys.stdout.flush()
                    dl += len(data)
                # write data to disk
                handle.write(data)

        print("\nDownload Complete! {}".format(ppjar))

    def __del__(self):
        """
        Stop server unless otherwise specified
        """
        # keep_alive is missing when __init__ raised before assigning it;
        # in that case there is nothing this instance could shut down.
        if getattr(self, "keep_alive", True):
            return
        try:
            stopped = self.stop_server()
        except Exception as e:
            self.logger.debug(e)
            stopped = False
        # stop_server reports failure through its return value, not an exception
        if stopped:
            print("Successfully shut down processors-server!")
        else:
            print("Couldn't kill processors-server.  Was server started externally?")
354
355
356
class OdinAPI(object):
    """
    API for performing rule-based information extraction with Odin.

    Parameters
    ----------
    address : str
        The base address for the API (i.e., everything preceding `/api/..`)

    """

    # raw string: "\." is an invalid escape sequence in a normal string literal
    validator = re.compile(r"^(https?|ftp):.+?\.?ya?ml$")

    def __init__(self, address):
        self._service = "{}/api/odin/extract".format(address)

    def _extract(self, json_data):
        """
        Posts `json_data` to the Odin service and converts the response to mentions.

        Parameters
        ----------
        json_data : str
            JSON-encoded request, as produced by one of the container classes' `to_JSON()`.

        Returns
        -------
        The result of `JSONSerializer.mentions_from_JSON`, or None if the
        response contains an "error" entry (the error is printed).
        """
        mns_json = post_json(self._service, json_data)
        if "error" not in mns_json:
            return JSONSerializer.mentions_from_JSON(mns_json)
        # report the failure together with the rules (or rule url) that caused it
        original_msg = json.loads(json_data)
        rules = original_msg.get("rules", original_msg.get("url", None))
        print(OdinError(rules=rules, message=mns_json["error"]))
        return None

    @staticmethod
    def valid_rule_url(url):
        """
        Returns True if `url` is an http(s) or ftp address ending in `yml` or `yaml`.
        """
        return bool(OdinAPI.validator.match(url))

    def extract_from_text(self, text, rules):
        """
        Sends text to the server with rules for information extraction (IE).

        Parameters
        ----------
        text : str
            `rules` will be applied to this `text`.
        rules : str
            Either Odin rules provided as a `yaml` string, or a url pointing to a `yaml` file of rules.

        Returns
        -------
        [processors.odin.Mention] or None
            Rule matches produce a list of `processors.odin.Mention`.
        """
        if OdinAPI.valid_rule_url(rules):
            # this is actually a URL to a yaml file
            container = TextWithURL(text, rules)
        else:
            container = TextWithRules(text, rules)
        return self._extract(container.to_JSON())

    def extract_from_document(self, doc, rules):
        """
        Sends a `processors.ds.Document` (`doc`) to the server with rules for information extraction (IE).

        Parameters
        ----------
        doc : processors.ds.Document
            `rules` will be applied to this `processors.ds.Document`.
        rules : str
            Either Odin rules provided as a `yaml` string, or a url pointing to a `yaml` file of rules.

        Returns
        -------
        [processors.odin.Mention] or None
            Rule matches produce a list of `processors.odin.Mention`.

        """
        if OdinAPI.valid_rule_url(rules):
            # this is actually a URL to a yaml file
            container = DocumentWithURL(doc, rules)
        else:
            container = DocumentWithRules(doc, rules)
        return self._extract(container.to_JSON())
436
437
438
class OpenIEAPI(object):
    """
    API for extracting Entities through the server's OpenIE endpoints.

    Parameters
    ----------
    address : str
        The base address for the API (i.e., everything preceding `/api/..`)
    """

    def __init__(self, address):
        self._service = "{}/api/openie/entities/".format(address)

    @staticmethod
    def _serialize(ds):
        """
        Encodes `ds` (anything providing `to_JSON_dict()`) as a JSON string.
        """
        return json.dumps(ds.to_JSON_dict(), sort_keys=True, indent=None)

    def _extract(self, endpoint, json_data):
        """
        Posts `json_data` to `endpoint` below `/api/openie/entities/`.

        Parameters
        ----------
        endpoint : str
            Name of the endpoint, appended to the service address.
        json_data : str
            JSON-encoded request body.

        Returns
        -------
        The result of `JSONSerializer.mentions_from_JSON`, or None if the
        response contains an "error" entry (the error is printed).
        """
        api_endpoint = self._service + endpoint
        mns_json = post_json(api_endpoint, json_data)
        if "error" in mns_json:
            print(mns_json["error"])
            return None
        return JSONSerializer.mentions_from_JSON(mns_json)

    def extract_entities(self, ds):
        """
        Extracts and expands Entities from a Sentence or Document
        """
        return self._extract(endpoint="extract", json_data=OpenIEAPI._serialize(ds))

    def extract_and_filter_entities(self, ds):
        """
        Extracts, expands, and filters Entities from a Sentence or Document
        """
        return self._extract(endpoint="extract-filter", json_data=OpenIEAPI._serialize(ds))

    def extract_base_entities(self, ds):
        """
        Extracts non-expanded Entities from a Sentence or Document
        """
        return self._extract(endpoint="base-extract", json_data=OpenIEAPI._serialize(ds))
473
474
#############################################
475
# Containers for Odin data
476
# transmitted to the server for processing
477
#############################################
478
479
class TextWithRules(object):
    """
    Container pairing raw text with Odin rules given as a string.

    Parameters
    ----------
    text : str
        The text the rules should be applied to.
    rules : str
        The Odin rules.
    """

    def __init__(self, text, rules):
        self.text = text
        self.rules = rules

    def to_JSON_dict(self):
        """
        Returns a dict with the keys "text" and "rules".
        """
        return {"text": self.text, "rules": self.rules}

    def to_JSON(self):
        """
        Returns `to_JSON_dict()` encoded as a JSON string with sorted keys.
        """
        return json.dumps(self.to_JSON_dict(), sort_keys=True, indent=None)
493
494
class TextWithURL(object):
    """
    Container pairing raw text with the url of a rule file.

    Parameters
    ----------
    text : str
        The text the rules should be applied to.
    url : str
        Address of the rule file.  The url is stored as given, without validation.
    """

    def __init__(self, text, url):
        self.text = text
        # TODO: throw exception if url is invalid
        self.url = url

    def to_JSON_dict(self):
        """
        Returns a dict with the keys "text" and "url".
        """
        return {"text": self.text, "url": self.url}

    def to_JSON(self):
        """
        Returns `to_JSON_dict()` encoded as a JSON string with sorted keys.
        """
        return json.dumps(self.to_JSON_dict(), sort_keys=True, indent=None)
509
510
class DocumentWithRules(object):
    """
    Container pairing a document with Odin rules given as a string.

    Parameters
    ----------
    document : object providing `to_JSON_dict()`
        The document the rules should be applied to.  Its type is not checked.
    rules : str
        The Odin rules.
    """

    def __init__(self, document, rules):
        # TODO: throw exception if isinstance(document, Document) is False
        self.document = document
        self.rules = rules

    def to_JSON_dict(self):
        """
        Returns a dict with the keys "document" (the document's own
        `to_JSON_dict()`) and "rules".
        """
        return {"document": self.document.to_JSON_dict(), "rules": self.rules}

    def to_JSON(self):
        """
        Returns `to_JSON_dict()` encoded as a JSON string with sorted keys.
        """
        return json.dumps(self.to_JSON_dict(), sort_keys=True, indent=None)
525
526
class DocumentWithURL(object):
    """
    Container pairing a document with the url of a rule file.

    Parameters
    ----------
    document : object providing `to_JSON_dict()`
        The document the rules should be applied to.  Its type is not checked.
    url : str
        Address of the rule file.  The url is stored as given, without validation.
    """

    def __init__(self, document, url):
        # TODO: throw exception if isinstance(document, Document) is False
        self.document = document
        # TODO: throw exception if url is invalid
        self.url = url

    def to_JSON_dict(self):
        """
        Returns a dict with the keys "document" (the document's own
        `to_JSON_dict()`) and "url".
        """
        return {"document": self.document.to_JSON_dict(), "url": self.url}

    def to_JSON(self):
        """
        Returns `to_JSON_dict()` encoded as a JSON string with sorted keys.
        """
        return json.dumps(self.to_JSON_dict(), sort_keys=True, indent=None)
542