| Total Complexity | 6 |
| Total Lines | 50 |
| Duplicated Lines | 0 % |
| Changes | 0 |
| 1 | """Parallel processing functionality for Annif""" |
||
| 2 | |||
| 3 | |||
| 4 | import multiprocessing |
||
| 5 | import multiprocessing.dummy |
||
| 6 | |||
| 7 | |||
class ProjectSuggestMap:
    """Callable-style helper that turns Document objects into suggestions.

    Wraps one or more projects (looked up by id from a registry) so that
    its ``suggest`` method can be used as the mapping function of a
    multiprocessing pool."""

    def __init__(
            self,
            registry,
            project_ids,
            backend_params,
            limit,
            threshold):
        """Store the registry, the ids of the projects to query, and the
        parameters passed on when suggesting and filtering."""
        self.registry = registry
        self.project_ids = project_ids
        self.backend_params = backend_params
        self.limit = limit
        self.threshold = threshold

    def _filtered_suggestions(self, project_id, doc):
        """Return the filtered suggestions of one project for doc."""
        project = self.registry.get_project(project_id)
        raw_hits = project.suggest(doc.text, self.backend_params)
        return raw_hits.filter(project.subjects, self.limit, self.threshold)

    def suggest(self, doc):
        """Suggest subjects for doc with every wrapped project.

        Returns a 3-tuple: a dict mapping each project id to that
        project's filtered suggestions for ``doc.text``, followed by
        ``doc.uris`` and ``doc.labels`` passed through unchanged."""
        suggestions_by_project = {
            pid: self._filtered_suggestions(pid, doc)
            for pid in self.project_ids
        }
        return suggestions_by_project, doc.uris, doc.labels
||
| 34 | |||
| 35 | |||
def get_pool(n_jobs):
    """Pick a multiprocessing pool class for the requested parallelism.

    Returns a ``(jobs, pool_class)`` tuple, where ``jobs`` is the value
    to give to the pool class constructor:

    * ``n_jobs < 1``: ``(None, multiprocessing.Pool)``
    * ``n_jobs == 1``: ``(1, multiprocessing.dummy.Pool)``
    * otherwise: ``(n_jobs, multiprocessing.Pool)``"""

    if n_jobs < 1:
        # the jobs argument is replaced by None for the process pool
        return None, multiprocessing.Pool

    # a single job uses the thread-based dummy pool to avoid the overhead
    # of starting a subprocess
    chosen_pool = (
        multiprocessing.dummy.Pool if n_jobs == 1 else multiprocessing.Pool
    )
    return n_jobs, chosen_pool
||
| 50 |