Total Complexity | 6 |
Total Lines | 50 |
Duplicated Lines | 0 % |
Changes | 0 |
1 | """Parallel processing functionality for Annif""" |
||
2 | |||
3 | |||
4 | import multiprocessing |
||
5 | import multiprocessing.dummy |
||
6 | |||
7 | |||
class ProjectSuggestMap:
    """Map Document objects to per-project suggestion results.

    Wraps one or more projects, identified by ID and looked up through a
    registry, so that a single picklable object can be handed to a
    multiprocessing pool's map functions.
    """

    def __init__(self, registry, project_ids, backend_params, limit,
                 threshold):
        # All state is stored as plain attributes so the instance can be
        # shipped to worker processes.
        self.registry = registry
        self.project_ids = project_ids
        self.backend_params = backend_params
        self.limit = limit
        self.threshold = threshold

    def _suggest_with_project(self, project_id, text):
        """Return the filtered suggestions of one project for *text*."""
        project = self.registry.get_project(project_id)
        raw_hits = project.suggest(text, self.backend_params)
        return raw_hits.filter(project.subjects, self.limit, self.threshold)

    def suggest(self, doc):
        """Run every wrapped project on *doc*.

        Returns a tuple ``(hits_by_project, doc.uris, doc.labels)`` where
        ``hits_by_project`` maps each project ID (in ``project_ids`` order)
        to the result of filtering that project's suggestions with the
        configured ``limit`` and ``threshold``.
        """
        hits_by_project = {
            project_id: self._suggest_with_project(project_id, doc.text)
            for project_id in self.project_ids
        }
        return hits_by_project, doc.uris, doc.labels
||
34 | |||
35 | |||
def get_pool(n_jobs):
    """Return the jobs argument and the pool class for *n_jobs* parallel jobs.

    The result is a ``(n_jobs, pool_class)`` tuple. The first element is the
    value to pass as the ``processes`` argument when constructing
    ``pool_class``:

    * ``n_jobs`` is ``None`` or less than 1: ``(None, multiprocessing.Pool)``,
      so the pool chooses its own worker count.
    * ``n_jobs == 1``: ``(1, multiprocessing.dummy.Pool)``, a thread-based
      pool that avoids subprocess overhead.
    * otherwise: ``(n_jobs, multiprocessing.Pool)``.
    """
    if n_jobs is None or n_jobs < 1:
        # processes=None makes multiprocessing.Pool use os.cpu_count() workers
        return None, multiprocessing.Pool
    if n_jobs == 1:
        # use the dummy wrapper around threading to avoid subprocess overhead
        return n_jobs, multiprocessing.dummy.Pool
    return n_jobs, multiprocessing.Pool
||
50 |