Passed
Pull Request — master (#1069)
created by Konstantin · 02:45

ocrd_network.processing_server (Rating: F)

Complexity

Total Complexity 60

Size/Duplication

Total Lines 572
Duplicated Lines 0 %

Importance

Changes 0
Metric  Value
wmc     60
eloc    383
dl      0
loc     572
rs      3.6
c       0
b       0
f       0

17 Methods

Rating  Name                                                  Duplication  Size  Complexity
A       ProcessingServer.stop_deployed_agents()               0            2     1
A       ProcessingServer.query_ocrd_tool_json_from_server()   0            21    3
A       ProcessingServer.start()                              0            25    2
A       ProcessingServer.create_processing_message()          0            16    1
A       ProcessingServer.on_startup()                         0            2     1
A       ProcessingServer.check_if_queue_exists()              0            12    2
A       ProcessingServer.on_shutdown()                        0            7     1
A       ProcessingServer.create_message_queues()              0            25    2
A       ProcessingServer.connect_publisher()                  0            18    2
B       ProcessingServer.__init__()                           0            97    1
A       ProcessingServer.push_to_processing_queue()           0            19    4
A       ProcessingServer.push_to_processor_server()           0            32    4
A       ProcessingServer.get_processor_job()                  0            2     1
F       ProcessingServer.push_processor_job()                 0            111   17
A       ProcessingServer.list_processors()                    0            11    1
A       ProcessingServer.get_processor_info()                 0            13    2
F       ProcessingServer.remove_from_request_cache()          0            85    15

How to fix Complexity

Complex classes like ocrd_network.processing_server often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate and is often faster.
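In this class, the two F-rated methods, ProcessingServer.push_processor_job() and ProcessingServer.remove_from_request_cache(), both manipulate the internal processing_requests_cache dictionary and the per-workspace page locks directly. A minimal, purely illustrative sketch of an Extract Class step is shown below; the ProcessingRequestCache name and its methods are hypothetical and not part of ocrd_network:

# Illustrative only: a possible Extract Class target for the request cache and
# page-lock checks shared by push_processor_job() and remove_from_request_cache().
# The class name and its method names are hypothetical and do not exist in ocrd_network.
from queue import Queue
from typing import Any, Dict, List, Optional, Set


class ProcessingRequestCache:
    """Buffers processing requests per workspace key (workspace_id or path_to_mets)."""

    def __init__(self) -> None:
        # Key: workspace_id or path_to_mets; Value: Queue of job-input objects
        self._queues: Dict[str, Queue] = {}

    @staticmethod
    def has_locked_pages(locked_ws_pages: Dict[str, Set[str]],
                         output_file_grps: List[str], page_ids: List[str]) -> bool:
        """True if any requested output fileGrp has 'all_pages' or one of page_ids locked."""
        for file_grp in output_file_grps:
            locked = locked_ws_pages.get(file_grp, set())
            if "all_pages" in locked or not locked.isdisjoint(page_ids):
                return True
        return False

    def cache_request(self, workspace_key: str, data: Any) -> None:
        """Queue a request that cannot run yet because its pages are locked."""
        self._queues.setdefault(workspace_key, Queue()).put(data)

    def next_request(self, workspace_key: str) -> Optional[Any]:
        """Pop the next buffered request, dropping the queue once it is empty."""
        queue = self._queues.get(workspace_key)
        if queue is None:
            return None
        if queue.empty():
            del self._queues[workspace_key]
            return None
        return queue.get()

Both methods could then delegate the cache bookkeeping to such an object, which alone would remove several branches from each of them.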

import json
import requests
import httpx
from typing import Dict, List, Optional
import uvicorn
from queue import Queue

from fastapi import FastAPI, status, Request, HTTPException
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse

from pika.exceptions import ChannelClosedByBroker
from ocrd_utils import getLogger
from .database import (
    initiate_database,
    db_get_processing_job,
    db_get_workspace,
    db_update_workspace,
)
from .deployer import Deployer
from .models import (
    DBProcessorJob,
    PYJobInput,
    PYJobOutput,
    StateEnum
)
from .rabbitmq_utils import (
    RMQPublisher,
    OcrdProcessingMessage
)
from .server_utils import (
    _get_processor_job,
    expand_page_ids,
    validate_and_return_mets_path,
    validate_job_input,
)
from .utils import (
    download_ocrd_all_tool_json,
    generate_created_time,
    generate_id
)


class ProcessingServer(FastAPI):
    """FastAPI app to make ocr-d processor calls

    The Processing-Server receives calls conforming to the ocr-d webapi regarding the processing
    part. It can run ocrd-processors and provides endpoints to discover processors and watch the job
    status.
    The Processing-Server does not execute the processors itself but starts up a queue and a
    database to delegate the calls to processing workers. They are started by the Processing-Server
    and the communication goes through the queue.
    """

    def __init__(self, config_path: str, host: str, port: int) -> None:
        super().__init__(on_startup=[self.on_startup], on_shutdown=[self.on_shutdown],
                         title='OCR-D Processing Server',
                         description='OCR-D processing and processors')
        self.log = getLogger(__name__)
        self.log.info(f"Downloading ocrd all tool json")
        self.ocrd_all_tool_json = download_ocrd_all_tool_json(
            ocrd_all_url="https://ocr-d.de/js/ocrd-all-tool.json"
        )
        self.hostname = host
        self.port = port
        # The deployer is used for:
        # - deploying agents when the Processing Server is started
        # - retrieving runtime data of agents
        self.deployer = Deployer(config_path)
        self.mongodb_url = None
        # TODO: Combine these under a single URL, rabbitmq_utils needs an update
        self.rmq_host = self.deployer.data_queue.address
        self.rmq_port = self.deployer.data_queue.port
        self.rmq_vhost = '/'
        self.rmq_username = self.deployer.data_queue.username
        self.rmq_password = self.deployer.data_queue.password

        # Gets assigned when `connect_publisher` is called on the working object
        self.rmq_publisher = None

        # Used for buffering/caching processing requests in the Processing Server
        # Key: `workspace_id` or `path_to_mets` depending on which is provided
        # Value: Queue that holds PYInputJob elements
        self.processing_requests_cache = {}

        # Create routes
        self.router.add_api_route(
            path='/stop',
            endpoint=self.stop_deployed_agents,
            methods=['POST'],
            tags=['tools'],
            summary='Stop database, queue and processing-workers',
        )

        self.router.add_api_route(
            path='/processor/{processor_name}',
            endpoint=self.push_processor_job,
            methods=['POST'],
            tags=['processing'],
            status_code=status.HTTP_200_OK,
            summary='Submit a job to this processor',
            response_model=PYJobOutput,
            response_model_exclude_unset=True,
            response_model_exclude_none=True
        )

        self.router.add_api_route(
            path='/processor/{processor_name}/{job_id}',
            endpoint=self.get_processor_job,
            methods=['GET'],
            tags=['processing'],
            status_code=status.HTTP_200_OK,
            summary='Get information about a job based on its ID',
            response_model=PYJobOutput,
            response_model_exclude_unset=True,
            response_model_exclude_none=True
        )

        self.router.add_api_route(
            path='/processor/result_callback',
            endpoint=self.remove_from_request_cache,
            methods=['POST'],
            tags=['processing'],
            status_code=status.HTTP_200_OK,
            summary='Callback used by a worker or processor server for reporting result of a processing request',
        )

        self.router.add_api_route(
            path='/processor/{processor_name}',
            endpoint=self.get_processor_info,
            methods=['GET'],
            tags=['processing', 'discovery'],
            status_code=status.HTTP_200_OK,
            summary='Get information about this processor',
        )

        self.router.add_api_route(
            path='/processor',
            endpoint=self.list_processors,
            methods=['GET'],
            tags=['processing', 'discovery'],
            status_code=status.HTTP_200_OK,
            summary='Get a list of all available processors',
        )

        @self.exception_handler(RequestValidationError)
        async def validation_exception_handler(request: Request, exc: RequestValidationError):
            exc_str = f'{exc}'.replace('\n', ' ').replace('   ', ' ')
            self.log.error(f'{request}: {exc_str}')
            content = {'status_code': 10422, 'message': exc_str, 'data': None}
            return JSONResponse(content=content, status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)

    def start(self) -> None:
        """ deploy agents (db, queue, workers) and start the processing server with uvicorn
        """
        try:
            self.deployer.deploy_rabbitmq(image='rabbitmq:3-management', detach=True, remove=True)
            rabbitmq_url = self.deployer.data_queue.url

            self.deployer.deploy_mongodb(image='mongo', detach=True, remove=True)
            self.mongodb_url = self.deployer.data_mongo.url

            # The RMQPublisher is initialized and a connection to the RabbitMQ is performed
            self.connect_publisher()
            self.log.debug(f'Creating message queues on RabbitMQ instance url: {rabbitmq_url}')
            self.create_message_queues()

            self.deployer.deploy_hosts(
                mongodb_url=self.mongodb_url,
                rabbitmq_url=rabbitmq_url
            )
        except Exception:
            self.log.error('Error during startup of processing server. '
                           'Trying to kill parts of incompletely deployed service')
            self.deployer.kill_all()
            raise
        uvicorn.run(self, host=self.hostname, port=int(self.port))

    async def on_startup(self):
        await initiate_database(db_url=self.mongodb_url)

    async def on_shutdown(self) -> None:
        """
        - hosts and pids should be stored somewhere
        - ensure queue is empty or processor is not currently running
        - connect to hosts and kill pids
        """
        await self.stop_deployed_agents()

    async def stop_deployed_agents(self) -> None:
        self.deployer.kill_all()

    def connect_publisher(self, enable_acks: bool = True) -> None:
        self.log.info(f'Connecting RMQPublisher to RabbitMQ server: '
                      f'{self.rmq_host}:{self.rmq_port}{self.rmq_vhost}')
        self.rmq_publisher = RMQPublisher(
            host=self.rmq_host,
            port=self.rmq_port,
            vhost=self.rmq_vhost
        )
        self.log.debug(f'RMQPublisher authenticates with username: '
                       f'{self.rmq_username}, password: {self.rmq_password}')
        self.rmq_publisher.authenticate_and_connect(
            username=self.rmq_username,
            password=self.rmq_password
        )
        if enable_acks:
            self.rmq_publisher.enable_delivery_confirmations()
            self.log.info('Delivery confirmations are enabled')
        self.log.info('Successfully connected RMQPublisher.')

    def create_message_queues(self) -> None:
        """ Create the message queues based on the occurrence of
        `workers.name` in the config file.
        """

        # TODO: Remove
        """
        queue_names = set([])
        for data_host in self.deployer.data_hosts:
            for data_worker in data_host.data_workers:
                queue_names.add(data_worker.processor_name)
        """

        # The abstract version of the above lines
        queue_names = self.deployer.find_matching_processors(
            worker_only=True,
            str_names_only=True,
            unique_only=True
        )

        for queue_name in queue_names:
            # The existence/validity of the worker.name is not tested.
            # Even if an ocr-d processor does not exist, the queue is created
            self.log.info(f'Creating a message queue with id: {queue_name}')
            self.rmq_publisher.create_queue(queue_name=queue_name)

    @staticmethod
    def create_processing_message(job: DBProcessorJob) -> OcrdProcessingMessage:
        processing_message = OcrdProcessingMessage(
            job_id=job.job_id,
            processor_name=job.processor_name,
            created_time=generate_created_time(),
            path_to_mets=job.path_to_mets,
            workspace_id=job.workspace_id,
            input_file_grps=job.input_file_grps,
            output_file_grps=job.output_file_grps,
            page_id=job.page_id,
            parameters=job.parameters,
            result_queue_name=job.result_queue_name,
            callback_url=job.callback_url,
        )
        return processing_message

    def check_if_queue_exists(self, processor_name):
        try:
            # Only checks if the process queue exists, if not raises ChannelClosedByBroker
            self.rmq_publisher.create_queue(processor_name, passive=True)
        except ChannelClosedByBroker as error:
            self.log.warning(f"Process queue with id '{processor_name}' not existing: {error}")
            # Reconnect publisher - not efficient, but works
            # TODO: Revisit when reconnection strategy is implemented
            self.connect_publisher(enable_acks=True)
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"Process queue with id '{processor_name}' not existing"
            )

    def query_ocrd_tool_json_from_server(self, processor_name):
        processor_server_url = self.deployer.resolve_processor_server_url(processor_name)
        if not processor_server_url:
            self.log.exception(f"Processor Server of '{processor_name}' is not available")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Processor Server of '{processor_name}' is not available"
            )
        # Request the tool json from the Processor Server
        response = requests.get(
            processor_server_url,
            headers={'Content-Type': 'application/json'}
        )
        if not response.status_code == 200:
            self.log.exception(f"Failed to retrieve '{processor_name}' from: {processor_server_url}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Failed to retrieve '{processor_name}' from: {processor_server_url}"
            )
        ocrd_tool = response.json()
        return ocrd_tool, processor_server_url

    async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJobOutput:
        if data.job_id:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"Job id field is set but must not be: {data.job_id}"
            )
        # Generate processing job id
        data.job_id = generate_id()

        if data.agent_type not in ['worker', 'server']:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"Unknown network agent with value: {data.agent_type}"
            )
        workspace_db = await db_get_workspace(
            workspace_id=data.workspace_id,
            workspace_mets_path=data.path_to_mets
        )
        if not workspace_db:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"Workspace with id: {data.workspace_id} or path: {data.path_to_mets} not found"
            )

        # Since the path is not resolved yet,
        # the return value is not important for the Processing Server
        await validate_and_return_mets_path(self.log, data)

        page_ids = expand_page_ids(data.page_id)

        # A flag whether the current request must be cached
        # This is set to true if for any output fileGrp there
        # is a page_id value that has been previously locked
        cache_current_request = False

        # Check if there are any locked pages for the current request
        locked_ws_pages = workspace_db.pages_locked
        for output_fileGrp in data.output_file_grps:
            if output_fileGrp in locked_ws_pages:
                if "all_pages" in locked_ws_pages[output_fileGrp]:
                    cache_current_request = True
                    break
                # If there are request page ids that are already locked
                if not locked_ws_pages[output_fileGrp].isdisjoint(page_ids):
                    cache_current_request = True
                    break

        if cache_current_request:
            # Append the processor name to the request itself
            data.processor_name = processor_name

            workspace_key = data.workspace_id if data.workspace_id else data.path_to_mets
            # If a record queue of this workspace_id does not exist in the requests cache
            if not self.processing_requests_cache.get(workspace_key, None):
                self.processing_requests_cache[workspace_key] = Queue()
            # Add the processing request to the internal queue
            self.processing_requests_cache[workspace_key].put(data)

            return PYJobOutput(
                job_id=data.job_id,
                processor_name=processor_name,
                workspace_id=data.workspace_id,
                workspace_path=data.path_to_mets,
                state=StateEnum.cached
            )
        else:
            # Update locked pages by locking the pages in the request
            for output_fileGrp in data.output_file_grps:
                if output_fileGrp not in locked_ws_pages:
                    locked_ws_pages[output_fileGrp] = set()
                # The page id list is not empty - only some pages are in the request
                if page_ids:
                    locked_ws_pages[output_fileGrp].update(page_ids)
                else:
                    # Lock all pages with a single value
                    locked_ws_pages[output_fileGrp].add("all_pages")

            # Update the locked pages dictionary in the database
            await db_update_workspace(
                workspace_id=data.workspace_id,
                workspace_mets_path=data.path_to_mets,
                pages_locked=locked_ws_pages
            )

        # Create a DB entry
        job = DBProcessorJob(
            **data.dict(exclude_unset=True, exclude_none=True),
            processor_name=processor_name,
            internal_callback_url=f"/processor/result_callback",
            state=StateEnum.queued
        )
        await job.insert()

        job_output = None
        if data.agent_type == 'worker':
            ocrd_tool = await self.get_processor_info(processor_name)
            validate_job_input(self.log, processor_name, ocrd_tool, data)
            processing_message = self.create_processing_message(job)
            await self.push_to_processing_queue(processor_name, processing_message)
            job_output = job.to_job_output()
        if data.agent_type == 'server':
            ocrd_tool, processor_server_url = self.query_ocrd_tool_json_from_server(processor_name)
            validate_job_input(self.log, processor_name, ocrd_tool, data)
            job_output = await self.push_to_processor_server(processor_name, processor_server_url, data)
        if not job_output:
            self.log.exception('Failed to create job output')
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail='Failed to create job output'
            )
        return job_output

    # TODO: Revisit and remove duplications between push_to_* methods
    async def push_to_processing_queue(self, processor_name: str, processing_message: OcrdProcessingMessage):
        if not self.rmq_publisher:
            raise Exception('RMQPublisher is not connected')
        deployed_processors = self.deployer.find_matching_processors(
            worker_only=True,
            str_names_only=True,
            unique_only=True
        )
        if processor_name not in deployed_processors:
            self.check_if_queue_exists(processor_name)

        encoded_processing_message = OcrdProcessingMessage.encode_yml(processing_message)
        try:
            self.rmq_publisher.publish_to_queue(processor_name, encoded_processing_message)
        except Exception as error:
            self.log.exception(f'RMQPublisher has failed: {error}')
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f'RMQPublisher has failed: {error}'
            )

    async def push_to_processor_server(self, processor_name: str, processor_server_url: str, job_input: PYJobInput) -> PYJobOutput:
        try:
            json_data = json.dumps(job_input.dict(exclude_unset=True, exclude_none=True))
        except Exception as e:
            self.log.exception(f"Failed to json dump the PYJobInput, error: {e}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Failed to json dump the PYJobInput, error: {e}"
            )

        # TODO: The amount of pages should come as a request input
        # TODO: cf https://github.com/OCR-D/core/pull/1030/files#r1152551161
        #  currently, use 200 as a default
        amount_of_pages = 200
        request_timeout = 20.0 * amount_of_pages  # 20 sec timeout per page
        # Post a processing job to the Processor Server asynchronously
        timeout = httpx.Timeout(timeout=request_timeout, connect=30.0)
        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.post(
                processor_server_url,
                headers={'Content-Type': 'application/json'},
                json=json.loads(json_data)
            )

        if not response.status_code == 202:
            self.log.exception(f"Failed to post '{processor_name}' job to: {processor_server_url}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Failed to post '{processor_name}' job to: {processor_server_url}"
            )
        job_output = response.json()
        return job_output

    async def get_processor_job(self, processor_name: str, job_id: str) -> PYJobOutput:
        return await _get_processor_job(self.log, processor_name, job_id)

    async def remove_from_request_cache(self, job_id: str, state: StateEnum,
                                        workspace_id: Optional[str], path_to_mets: Optional[str]):
        if state == StateEnum.failed:
            # TODO: Call the callback to the Workflow server if the current processing step has failed
            pass

        if state != StateEnum.success:
            # TODO: Handle other potential error cases
            pass

        job_db = await db_get_processing_job(job_id)
        if not job_db:
            self.log.exception(f"Processing job with id: {job_id} not found in DB")
        job_output_file_grps = job_db.output_file_grps
        job_page_ids = expand_page_ids(job_db.page_id)

        # Read DB workspace entry
        workspace_db = await db_get_workspace(
            workspace_id=workspace_id,
            workspace_mets_path=path_to_mets
        )
        if not workspace_db:
            self.log.exception(f"Workspace with id: {workspace_id} or path: {path_to_mets} not found in DB")

        # Update locked pages by unlocking the pages in the request
        locked_ws_pages = workspace_db.pages_locked
        for output_fileGrp in job_output_file_grps:
            if output_fileGrp in locked_ws_pages:
                if job_page_ids:
                    # Unlock the previously locked pages
                    locked_ws_pages[output_fileGrp].difference_update(set(job_page_ids))
                else:
                    # Remove the single variable used to indicate all pages are locked
                    locked_ws_pages[output_fileGrp].remove("all_pages")

        # Update the locked pages dictionary in the database
        await db_update_workspace(
            workspace_id=workspace_id,
            workspace_mets_path=path_to_mets,
            pages_locked=locked_ws_pages
        )

        # Take the next request from the cache (if any available)
        workspace_key = workspace_id if workspace_id else path_to_mets

        if workspace_key not in self.processing_requests_cache:
            # No internal queue available for that workspace
            return

        if self.processing_requests_cache[workspace_key].empty():
            # The queue is empty - delete it
            try:
                del self.processing_requests_cache[workspace_key]
            except KeyError as ex:
                self.log.warning(f"Trying to delete non-existing internal queue with key: {workspace_key}")
            return

        # Process the next request in the internal queue
        # TODO: Refactor and optimize the duplications here
        #  and last lines in `push_processor_job` method
        data = self.processing_requests_cache[workspace_key].get()
        processor_name = data.processor_name

        # Create a DB entry
        job = DBProcessorJob(
            **data.dict(exclude_unset=True, exclude_none=True),
            processor_name=processor_name,
            internal_callback_url=f"/processor/result_callback",
            state=StateEnum.queued
        )
        await job.insert()

        job_output = None
        if data.agent_type == 'worker':
            ocrd_tool = await self.get_processor_info(processor_name)
            validate_job_input(self.log, processor_name, ocrd_tool, data)
            processing_message = self.create_processing_message(job)
            await self.push_to_processing_queue(processor_name, processing_message)
            job_output = job.to_job_output()
        if data.agent_type == 'server':
            ocrd_tool, processor_server_url = self.query_ocrd_tool_json_from_server(processor_name)
            validate_job_input(self.log, processor_name, ocrd_tool, data)
            job_output = await self.push_to_processor_server(processor_name, processor_server_url, data)
        if not job_output:
            self.log.exception(f'Failed to create job output for job input data: {data}')

    async def get_processor_info(self, processor_name) -> Dict:
        """ Return a processor's ocrd-tool.json
        """
        ocrd_tool = self.ocrd_all_tool_json.get(processor_name, None)
        if not ocrd_tool:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Ocrd tool JSON of '{processor_name}' not available!"
            )

        # TODO: Returns the ocrd tool json even of processors
        #  that are not deployed. This may or may not be desired.
        return ocrd_tool

    async def list_processors(self) -> List[str]:
        # There is no caching on the Processing Server side
        processor_names_list = self.deployer.find_matching_processors(
            docker_only=False,
            native_only=False,
            worker_only=False,
            server_only=False,
            str_names_only=True,
            unique_only=True
        )
        return processor_names_list
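
The TODO comments inside remove_from_request_cache() already note that its final block duplicates the tail of push_processor_job(). One possible consolidation is sketched below as a method intended to live inside ProcessingServer, reusing the module's existing imports; the helper name _submit_job_to_agent is hypothetical and not part of the module:

    # Hypothetical helper (name is illustrative) bundling the duplicated
    # "create DB entry, then dispatch to worker queue or Processor Server" tail
    # of push_processor_job() and remove_from_request_cache().
    async def _submit_job_to_agent(self, processor_name: str, data: PYJobInput) -> Optional[PYJobOutput]:
        # Create a DB entry for the queued job
        job = DBProcessorJob(
            **data.dict(exclude_unset=True, exclude_none=True),
            processor_name=processor_name,
            internal_callback_url="/processor/result_callback",
            state=StateEnum.queued
        )
        await job.insert()

        job_output = None
        if data.agent_type == 'worker':
            ocrd_tool = await self.get_processor_info(processor_name)
            validate_job_input(self.log, processor_name, ocrd_tool, data)
            processing_message = self.create_processing_message(job)
            await self.push_to_processing_queue(processor_name, processing_message)
            job_output = job.to_job_output()
        if data.agent_type == 'server':
            ocrd_tool, processor_server_url = self.query_ocrd_tool_json_from_server(processor_name)
            validate_job_input(self.log, processor_name, ocrd_tool, data)
            job_output = await self.push_to_processor_server(processor_name, processor_server_url, data)
        return job_output

Both call sites could then delegate to this single helper and keep only their own error handling, which is where most of the complexity of the two F-rated methods overlaps.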
572