| Total Complexity | 6 | 
| Total Lines | 50 | 
| Duplicated Lines | 0 % | 
| Changes | 0 | ||
| 1 | """Parallel processing functionality for Annif"""  | 
            ||
| 2 | |||
| 3 | |||
| 4 | import multiprocessing  | 
            ||
| 5 | import multiprocessing.dummy  | 
            ||
| 6 | |||
| 7 | |||
class ProjectSuggestMap:
    """A utility class that can be used to wrap one or more projects and
    provide a mapping method that converts Document objects to suggestions.
    Intended to be used with the multiprocessing module."""

    def __init__(
            self,
            registry,
            project_ids,
            backend_params,
            limit,
            threshold):
        """Store the settings that every suggest() call will reuse.

        registry       -- object whose get_project(project_id) returns a project
        project_ids    -- iterable of project identifiers to query per document
        backend_params -- passed unchanged as the second argument of
                          project.suggest()
        limit          -- passed unchanged to the hits' filter() method
        threshold      -- passed unchanged to the hits' filter() method
        """
        self.registry = registry
        self.project_ids = project_ids
        self.backend_params = backend_params
        self.limit = limit
        self.threshold = threshold

    def suggest(self, doc):
        """Run every wrapped project on doc.text and filter the results.

        Returns a 3-tuple (filtered_hits, doc.uris, doc.labels), where
        filtered_hits is a dict mapping each project id to the value returned
        by filtering that project's hits with the project's subjects and the
        configured limit and threshold.
        """
        filtered_hits = {}
        for project_id in self.project_ids:
            # projects are looked up by id on every call instead of being
            # stored on the instance
            project = self.registry.get_project(project_id)
            hits = project.suggest(doc.text, self.backend_params)
            filtered_hits[project_id] = hits.filter(
                project.subjects, self.limit, self.threshold)
        return (filtered_hits, doc.uris, doc.labels)
| 34 | |||
| 35 | |||
def get_pool(n_jobs):
    """Return the jobs argument for a pool constructor, and a suitable
    multiprocessing pool class, for the given amount of parallel jobs.

    The result is a 2-tuple (n_jobs, pool_class), in that order:

    n_jobs < 1  -- (None, multiprocessing.Pool)
    n_jobs == 1 -- (1, multiprocessing.dummy.Pool)
    n_jobs > 1  -- (n_jobs, multiprocessing.Pool)
    """

    if n_jobs < 1:
        # None leaves the choice of worker count to the pool itself
        return None, multiprocessing.Pool

    if n_jobs == 1:
        # use the dummy wrapper around threading to avoid subprocess overhead
        return n_jobs, multiprocessing.dummy.Pool

    return n_jobs, multiprocessing.Pool
| 50 |