Passed
Pull Request — master (#1069), by unknown, created 02:52
ProcessingServer.start()   A

Complexity: Conditions 2
Size: Total Lines 25, Code Lines 17
Duplication: Lines 0, Ratio 0 %
Importance: Changes 0

Metric  Value
eloc    17
dl      0
loc     25
rs      9.55
c       0
b       0
f       0
cc      2
nop     1
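
The reporting tool behind these numbers is not identified in this export. As a rough, hypothetical way to reproduce comparable raw-size and complexity figures (loc, cc) for this module, the radon library could be used; the file path below is an assumption:

from radon.raw import analyze
from radon.complexity import cc_visit

# Hypothetical path to the module analysed in this report
with open("ocrd_network/processing_server.py", encoding="utf-8") as f:
    source = f.read()

raw = analyze(source)           # raw.loc, raw.sloc, raw.comments, raw.blank, ...
for block in cc_visit(source):  # per-function/method cyclomatic complexity
    print(block.name, block.complexity)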
import json
import requests
import httpx
from typing import Dict, List
import uvicorn

from fastapi import FastAPI, status, Request, HTTPException
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse

from pika.exceptions import ChannelClosedByBroker
from ocrd_utils import getLogger
from .database import (
    initiate_database,
    db_get_processing_job,
    db_get_workspace,
    db_update_workspace,
)
from .deployer import Deployer
from .models import (
    DBProcessorJob,
    PYJobInput,
    PYJobOutput,
    PYResultMessage,
    StateEnum
)
from .rabbitmq_utils import (
    RMQPublisher,
    OcrdProcessingMessage
)
from .server_utils import (
    _get_processor_job,
    expand_page_ids,
    validate_and_return_mets_path,
    validate_job_input,
)
from .utils import (
    download_ocrd_all_tool_json,
    generate_created_time,
    generate_id
)


class ProcessingServer(FastAPI):
    """FastAPI app to make ocr-d processor calls

    The Processing-Server receives calls conforming to the ocr-d webapi regarding the processing
    part. It can run ocrd-processors and provides endpoints to discover processors and watch the job
    status.
    The Processing-Server does not execute the processors itself but starts up a queue and a
    database to delegate the calls to processing workers. They are started by the Processing-Server
    and the communication goes through the queue.
    """

    def __init__(self, config_path: str, host: str, port: int) -> None:
        super().__init__(on_startup=[self.on_startup], on_shutdown=[self.on_shutdown],
                         title='OCR-D Processing Server',
                         description='OCR-D processing and processors')
        self.log = getLogger(__name__)
        self.log.info(f"Downloading ocrd all tool json")
        self.ocrd_all_tool_json = download_ocrd_all_tool_json(
            ocrd_all_url="https://ocr-d.de/js/ocrd-all-tool.json"
        )
        self.hostname = host
        self.port = port
        # The deployer is used for:
        # - deploying agents when the Processing Server is started
        # - retrieving runtime data of agents
        self.deployer = Deployer(config_path)
        self.mongodb_url = None
        # TODO: Combine these under a single URL, rabbitmq_utils needs an update
        self.rmq_host = self.deployer.data_queue.address
        self.rmq_port = self.deployer.data_queue.port
        self.rmq_vhost = '/'
        self.rmq_username = self.deployer.data_queue.username
        self.rmq_password = self.deployer.data_queue.password

        # Gets assigned when `connect_publisher` is called on the working object
        self.rmq_publisher = None

        # Used for buffering/caching processing requests in the Processing Server
        # Key: `workspace_id` or `path_to_mets` depending on which is provided
        # Value: Queue that holds PYInputJob elements
        self.processing_requests_cache = {}

        # Used by processing workers and/or processor servers to report back the results
        if self.deployer.internal_callback_url:
            host = self.deployer.internal_callback_url
            self.internal_job_callback_url = f'{host.rstrip("/")}/result_callback'
        else:
            self.internal_job_callback_url = f'http://{host}:{port}/result_callback'

        # Create routes
        self.router.add_api_route(
            path='/stop',
            endpoint=self.stop_deployed_agents,
            methods=['POST'],
            tags=['tools'],
            summary='Stop database, queue and processing-workers',
        )

        self.router.add_api_route(
            path='/processor/{processor_name}',
            endpoint=self.push_processor_job,
            methods=['POST'],
            tags=['processing'],
            status_code=status.HTTP_200_OK,
            summary='Submit a job to this processor',
            response_model=PYJobOutput,
            response_model_exclude_unset=True,
            response_model_exclude_none=True
        )

        self.router.add_api_route(
            path='/processor/{processor_name}/{job_id}',
            endpoint=self.get_processor_job,
            methods=['GET'],
            tags=['processing'],
            status_code=status.HTTP_200_OK,
            summary='Get information about a job based on its ID',
            response_model=PYJobOutput,
            response_model_exclude_unset=True,
            response_model_exclude_none=True
        )

        self.router.add_api_route(
            path='/result_callback',
            endpoint=self.remove_from_request_cache,
            methods=['POST'],
            tags=['processing'],
            status_code=status.HTTP_200_OK,
            summary='Callback used by a worker or processor server for reporting result of a processing request',
        )

        self.router.add_api_route(
            path='/processor/{processor_name}',
            endpoint=self.get_processor_info,
            methods=['GET'],
            tags=['processing', 'discovery'],
            status_code=status.HTTP_200_OK,
            summary='Get information about this processor',
        )

        self.router.add_api_route(
            path='/processor',
            endpoint=self.list_processors,
            methods=['GET'],
            tags=['processing', 'discovery'],
            status_code=status.HTTP_200_OK,
            summary='Get a list of all available processors',
        )

        @self.exception_handler(RequestValidationError)
        async def validation_exception_handler(request: Request, exc: RequestValidationError):
            exc_str = f'{exc}'.replace('\n', ' ').replace('   ', ' ')
            self.log.error(f'{request}: {exc_str}')
            content = {'status_code': 10422, 'message': exc_str, 'data': None}
            return JSONResponse(content=content, status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)

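    # Illustrative sketch (not part of this file): once the server is running, a client
    # could submit a job through the '/processor/{processor_name}' route registered above.
    # The host, port, processor name, METS path and file groups below are assumptions:
    #
    #   import httpx
    #   response = httpx.post(
    #       "http://localhost:8080/processor/ocrd-dummy",
    #       json={
    #           "path_to_mets": "/data/workspace/mets.xml",
    #           "input_file_grps": ["OCR-D-IMG"],
    #           "output_file_grps": ["OCR-D-DUMMY"],
    #           "agent_type": "worker",
    #       },
    #   )
    #   job_id = response.json()["job_id"]
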
    def start(self) -> None:
        """ deploy agents (db, queue, workers) and start the processing server with uvicorn
        """
        try:
            self.deployer.deploy_rabbitmq(image='rabbitmq:3-management', detach=True, remove=True)
            rabbitmq_url = self.deployer.data_queue.url

            self.deployer.deploy_mongodb(image='mongo', detach=True, remove=True)
            self.mongodb_url = self.deployer.data_mongo.url

            # The RMQPublisher is initialized and a connection to the RabbitMQ is performed
            self.connect_publisher()
            self.log.debug(f'Creating message queues on RabbitMQ instance url: {rabbitmq_url}')
            self.create_message_queues()

            self.deployer.deploy_hosts(
                mongodb_url=self.mongodb_url,
                rabbitmq_url=rabbitmq_url
            )
        except Exception:
            self.log.error('Error during startup of processing server. '
                           'Trying to kill parts of incompletely deployed service')
            self.deployer.kill_all()
            raise
        uvicorn.run(self, host=self.hostname, port=int(self.port))

    async def on_startup(self):
        await initiate_database(db_url=self.mongodb_url)

    async def on_shutdown(self) -> None:
        """
        - hosts and pids should be stored somewhere
        - ensure queue is empty or processor is not currently running
        - connect to hosts and kill pids
        """
        await self.stop_deployed_agents()

    async def stop_deployed_agents(self) -> None:
        self.deployer.kill_all()

    def connect_publisher(self, enable_acks: bool = True) -> None:
        self.log.info(f'Connecting RMQPublisher to RabbitMQ server: '
                      f'{self.rmq_host}:{self.rmq_port}{self.rmq_vhost}')
        self.rmq_publisher = RMQPublisher(
            host=self.rmq_host,
            port=self.rmq_port,
            vhost=self.rmq_vhost
        )
        self.log.debug(f'RMQPublisher authenticates with username: '
                       f'{self.rmq_username}, password: {self.rmq_password}')
        self.rmq_publisher.authenticate_and_connect(
            username=self.rmq_username,
            password=self.rmq_password
        )
        if enable_acks:
            self.rmq_publisher.enable_delivery_confirmations()
            self.log.info('Delivery confirmations are enabled')
        self.log.info('Successfully connected RMQPublisher.')

    def create_message_queues(self) -> None:
        """ Create the message queues based on the occurrence of
        `workers.name` in the config file.
        """

        # TODO: Remove
        """
        queue_names = set([])
        for data_host in self.deployer.data_hosts:
            for data_worker in data_host.data_workers:
                queue_names.add(data_worker.processor_name)
        """

        # The abstract version of the above lines
        queue_names = self.deployer.find_matching_processors(
            worker_only=True,
            str_names_only=True,
            unique_only=True
        )

        for queue_name in queue_names:
            # The existence/validity of the worker.name is not tested.
            # Even if an ocr-d processor does not exist, the queue is created
            self.log.info(f'Creating a message queue with id: {queue_name}')
            self.rmq_publisher.create_queue(queue_name=queue_name)

    @staticmethod
    def create_processing_message(job: DBProcessorJob) -> OcrdProcessingMessage:
        processing_message = OcrdProcessingMessage(
            job_id=job.job_id,
            processor_name=job.processor_name,
            created_time=generate_created_time(),
            path_to_mets=job.path_to_mets,
            workspace_id=job.workspace_id,
            input_file_grps=job.input_file_grps,
            output_file_grps=job.output_file_grps,
            page_id=job.page_id,
            parameters=job.parameters,
            result_queue_name=job.result_queue_name,
            callback_url=job.callback_url,
            internal_callback_url=job.internal_callback_url
        )
        return processing_message

    def check_if_queue_exists(self, processor_name):
        try:
            # Only checks if the process queue exists, if not raises ChannelClosedByBroker
            self.rmq_publisher.create_queue(processor_name, passive=True)
        except ChannelClosedByBroker as error:
            self.log.warning(f"Process queue with id '{processor_name}' not existing: {error}")
            # Reconnect publisher - not efficient, but works
            # TODO: Revisit when reconnection strategy is implemented
            self.connect_publisher(enable_acks=True)
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"Process queue with id '{processor_name}' not existing"
            )

    def check_if_locked_pages_for_output_file_grps(
            self,
            locked_ws_pages: Dict,
            output_file_grps: List[str],
            page_ids: List[str]
    ) -> bool:
        for output_fileGrp in output_file_grps:
            self.log.debug(f"Checking output file group: {output_fileGrp}")
            if output_fileGrp in locked_ws_pages:
                self.log.debug(f"Locked workspace pages has entry for output file group: {output_fileGrp}")
                if "all_pages" in locked_ws_pages[output_fileGrp]:
                    self.log.debug(f"Caching the received request due to locked output file grp pages")
                    return True
                # If there are request page ids that are already locked
                if not set(locked_ws_pages[output_fileGrp]).isdisjoint(page_ids):
                    self.log.debug(f"Caching the received request due to locked output file grp pages")
                    return True

    def lock_pages(self, locked_ws_pages: Dict, output_file_grps: List[str], page_ids: List[str]):
        for output_fileGrp in output_file_grps:
            if output_fileGrp not in locked_ws_pages:
                self.log.debug(f"Creating an empty list for output file grp: {output_fileGrp}")
                locked_ws_pages[output_fileGrp] = []
            # The page id list is not empty - only some pages are in the request
            if page_ids:
                self.log.debug(f"Locking pages for `{output_fileGrp}`: {page_ids}")
                locked_ws_pages[output_fileGrp].extend(page_ids)
            else:
                # Lock all pages with a single value
                self.log.debug(f"Locking all pages for `{output_fileGrp}`")
                locked_ws_pages[output_fileGrp].append("all_pages")

    def unlock_pages(self, locked_ws_pages: Dict, output_file_grps: List[str], page_ids: List[str]):
        for output_fileGrp in output_file_grps:
            if output_fileGrp in locked_ws_pages:
                if page_ids:
                    # Unlock the previously locked pages
                    self.log.debug(f"Unlocking pages of `{output_fileGrp}`: {page_ids}")
                    locked_ws_pages[output_fileGrp] = [x for x in locked_ws_pages[output_fileGrp] if
                                                       x not in page_ids]
                    self.log.debug(f"Remaining locked pages of `{output_fileGrp}`: {locked_ws_pages[output_fileGrp]}")
                else:
                    # Remove the single variable used to indicate all pages are locked
                    self.log.debug(f"Unlocking all pages for: {output_fileGrp}")
                    locked_ws_pages[output_fileGrp].remove("all_pages")

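    # Illustrative sketch (not part of this file): how a workspace's `pages_locked` dict
    # evolves under lock_pages()/unlock_pages(); the file groups and page ids are made up:
    #
    #   locked_ws_pages = {}
    #   self.lock_pages(locked_ws_pages, ["OCR-D-BIN"], ["PHYS_0001", "PHYS_0002"])
    #   # -> {"OCR-D-BIN": ["PHYS_0001", "PHYS_0002"]}
    #   self.lock_pages(locked_ws_pages, ["OCR-D-SEG"], [])
    #   # -> an empty page id list locks all pages: {"OCR-D-SEG": ["all_pages"], ...}
    #   self.unlock_pages(locked_ws_pages, ["OCR-D-BIN"], ["PHYS_0001"])
    #   # -> {"OCR-D-BIN": ["PHYS_0002"], "OCR-D-SEG": ["all_pages"]}
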
    # Returns true if all dependent jobs' states are success, else false
    async def check_if_job_dependencies_met(self, dependencies: List[str]) -> bool:
        # Check the states of all dependent jobs
        for dependency_job_id in dependencies:
            self.log.debug(f"dependency_job_id: {dependency_job_id}")
            try:
                dependency_job_state = (await db_get_processing_job(dependency_job_id)).state
            except ValueError:
                # job_id not (yet) in db. Dependency not met
                return False
            self.log.debug(f"dependency_job_state: {dependency_job_state}")
            # Found a dependent job whose state is not success
            if dependency_job_state != StateEnum.success:
                return False
        return True

    async def find_next_requests_from_internal_queue(self, internal_queue: List[PYJobInput]) -> List[PYJobInput]:
        found_requests = []
        for i, current_element in enumerate(internal_queue):
            # Request has other job dependencies
            if current_element.depends_on:
                self.log.debug(f"current_element: {current_element}")
                self.log.debug(f"job dependencies: {current_element.depends_on}")
                satisfied_dependencies = await self.check_if_job_dependencies_met(current_element.depends_on)
                self.log.debug(f"satisfied dependencies: {satisfied_dependencies}")
                if not satisfied_dependencies:
                    continue
            # Consume the request from the internal queue
            found_request = internal_queue.pop(i)
            self.log.debug(f"found cached request to be processed: {found_request}")
            found_requests.append(found_request)
        return found_requests

    def query_ocrd_tool_json_from_server(self, processor_name):
        processor_server_url = self.deployer.resolve_processor_server_url(processor_name)
        if not processor_server_url:
            self.log.exception(f"Processor Server of '{processor_name}' is not available")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Processor Server of '{processor_name}' is not available"
            )
        # Request the tool json from the Processor Server
        response = requests.get(
            processor_server_url,
            headers={'Content-Type': 'application/json'}
        )
        if not response.status_code == 200:
            self.log.exception(f"Failed to retrieve '{processor_name}' from: {processor_server_url}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Failed to retrieve '{processor_name}' from: {processor_server_url}"
            )
        ocrd_tool = response.json()
        return ocrd_tool, processor_server_url

    async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJobOutput:
        if data.job_id:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"Job id field is set but must not be: {data.job_id}"
            )
        # Generate processing job id
        data.job_id = generate_id()

        # Append the processor name to the request itself
        data.processor_name = processor_name

        if data.agent_type not in ['worker', 'server']:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"Unknown network agent with value: {data.agent_type}"
            )
        workspace_db = await db_get_workspace(
            workspace_id=data.workspace_id,
            workspace_mets_path=data.path_to_mets
        )
        if not workspace_db:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"Workspace with id: {data.workspace_id} or path: {data.path_to_mets} not found"
            )

        # Since the path is not resolved yet,
        # the return value is not important for the Processing Server
        await validate_and_return_mets_path(self.log, data)

        page_ids = expand_page_ids(data.page_id)

        # A flag whether the current request must be cached
        # This is set to true if for any output fileGrp there
        # is a page_id value that has been previously locked
        cache_current_request = False

        # Check if there are any dependencies of the current request
        if data.depends_on:
            if not await self.check_if_job_dependencies_met(data.depends_on):
                self.log.debug(f"Caching the received request due to job dependencies")
                cache_current_request = True

        locked_ws_pages = workspace_db.pages_locked

        # No need for further check of locked pages dependency
        # if the request should be already cached
        if not cache_current_request:
            # Check if there are any locked pages for the current request
            cache_current_request = self.check_if_locked_pages_for_output_file_grps(
                locked_ws_pages=locked_ws_pages,
                output_file_grps=data.output_file_grps,
                page_ids=page_ids
            )

        if cache_current_request:
            workspace_key = data.workspace_id if data.workspace_id else data.path_to_mets
            # If a record queue of this workspace_id does not exist in the requests cache
            if not self.processing_requests_cache.get(workspace_key, None):
                self.log.debug(f"Creating an internal queue for workspace_key: {workspace_key}")
                self.processing_requests_cache[workspace_key] = []
            self.log.debug(f"Caching the processing request: {data}")
            # Add the processing request to the end of the internal queue
            self.processing_requests_cache[workspace_key].append(data)

            return PYJobOutput(
                job_id=data.job_id,
                processor_name=processor_name,
                workspace_id=data.workspace_id,
                workspace_path=data.path_to_mets,
                state=StateEnum.cached
            )
        else:
            # Update locked pages by locking the pages in the request
            self.lock_pages(
                locked_ws_pages=locked_ws_pages,
                output_file_grps=data.output_file_grps,
                page_ids=page_ids
            )

            # Update the locked pages dictionary in the database
            await db_update_workspace(
                workspace_id=data.workspace_id,
                workspace_mets_path=data.path_to_mets,
                pages_locked=locked_ws_pages
            )

        # Create a DB entry
        job = DBProcessorJob(
            **data.dict(exclude_unset=True, exclude_none=True),
            internal_callback_url=self.internal_job_callback_url,
            state=StateEnum.queued
        )
        await job.insert()

        job_output = None
        if data.agent_type == 'worker':
            ocrd_tool = await self.get_processor_info(processor_name)
            validate_job_input(self.log, processor_name, ocrd_tool, data)
            processing_message = self.create_processing_message(job)
            await self.push_to_processing_queue(processor_name, processing_message)
            job_output = job.to_job_output()
        if data.agent_type == 'server':
            ocrd_tool, processor_server_url = self.query_ocrd_tool_json_from_server(processor_name)
            validate_job_input(self.log, processor_name, ocrd_tool, data)
            job_output = await self.push_to_processor_server(processor_name, processor_server_url, data)
        if not job_output:
            self.log.exception('Failed to create job output')
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail='Failed to create job output'
            )
        return job_output

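    # Illustrative sketch (not part of this file): chaining two requests on the same
    # workspace via `depends_on`. The second request is cached by push_processor_job()
    # until the first job reaches StateEnum.success; names, paths and port are assumptions:
    #
    #   first = httpx.post("http://localhost:8080/processor/ocrd-dummy",
    #                      json={"path_to_mets": "/data/workspace/mets.xml",
    #                            "input_file_grps": ["OCR-D-IMG"],
    #                            "output_file_grps": ["OCR-D-DUMMY"],
    #                            "agent_type": "worker"}).json()
    #   second = httpx.post("http://localhost:8080/processor/ocrd-dummy",
    #                       json={"path_to_mets": "/data/workspace/mets.xml",
    #                             "input_file_grps": ["OCR-D-DUMMY"],
    #                             "output_file_grps": ["OCR-D-DUMMY2"],
    #                             "agent_type": "worker",
    #                             "depends_on": [first["job_id"]]}).json()
    #   # second["state"] is expected to report the cached state until the dependency succeeds
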
    # TODO: Revisit and remove duplications between push_to_* methods
    async def push_to_processing_queue(self, processor_name: str, processing_message: OcrdProcessingMessage):
        if not self.rmq_publisher:
            raise Exception('RMQPublisher is not connected')
        deployed_processors = self.deployer.find_matching_processors(
            worker_only=True,
            str_names_only=True,
            unique_only=True
        )
        if processor_name not in deployed_processors:
            self.check_if_queue_exists(processor_name)

        encoded_processing_message = OcrdProcessingMessage.encode_yml(processing_message)
        try:
            self.rmq_publisher.publish_to_queue(processor_name, encoded_processing_message)
        except Exception as error:
            self.log.exception(f'RMQPublisher has failed: {error}')
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f'RMQPublisher has failed: {error}'
            )

    async def push_to_processor_server(
            self,
            processor_name: str,
            processor_server_url: str,
            job_input: PYJobInput
    ) -> PYJobOutput:
        try:
            json_data = json.dumps(job_input.dict(exclude_unset=True, exclude_none=True))
        except Exception as e:
            self.log.exception(f"Failed to json dump the PYJobInput, error: {e}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Failed to json dump the PYJobInput, error: {e}"
            )

        # TODO: The amount of pages should come as a request input
        # TODO: cf https://github.com/OCR-D/core/pull/1030/files#r1152551161
        #  currently, use 200 as a default
        amount_of_pages = 200
        request_timeout = 20.0 * amount_of_pages  # 20 sec timeout per page
        # Post a processing job to the Processor Server asynchronously
        timeout = httpx.Timeout(timeout=request_timeout, connect=30.0)
        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.post(
                processor_server_url,
                headers={'Content-Type': 'application/json'},
                json=json.loads(json_data)
            )

        if not response.status_code == 202:
            self.log.exception(f"Failed to post '{processor_name}' job to: {processor_server_url}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Failed to post '{processor_name}' job to: {processor_server_url}"
            )
        job_output = response.json()
        return job_output

    async def get_processor_job(self, processor_name: str, job_id: str) -> PYJobOutput:
        return await _get_processor_job(self.log, processor_name, job_id)

    async def remove_from_request_cache(self, result_message: PYResultMessage):
        job_id = result_message.job_id
        state = result_message.state
        path_to_mets = result_message.path_to_mets
        workspace_id = result_message.workspace_id

        self.log.debug(f"Received result for job with id: {job_id} has state: {state}")

        if state == StateEnum.failed:
            # TODO: Call the callback to the Workflow server if the current processing step has failed
            pass

        if state != StateEnum.success:
            # TODO: Handle other potential error cases
            pass

        job_db = await db_get_processing_job(job_id)
        if not job_db:
            self.log.exception(f"Processing job with id: {job_id} not found in DB")
        job_output_file_grps = job_db.output_file_grps
        job_page_ids = expand_page_ids(job_db.page_id)

        # Read DB workspace entry
        workspace_db = await db_get_workspace(
            workspace_id=workspace_id,
            workspace_mets_path=path_to_mets
        )
        if not workspace_db:
            self.log.exception(f"Workspace with id: {workspace_id} or path: {path_to_mets} not found in DB")

        locked_ws_pages = workspace_db.pages_locked
        # Update locked pages by unlocking the pages in the request
        self.unlock_pages(
            locked_ws_pages=locked_ws_pages,
            output_file_grps=job_output_file_grps,
            page_ids=job_page_ids
        )

        # Update the locked pages dictionary in the database
        await db_update_workspace(
            workspace_id=workspace_id,
            workspace_mets_path=path_to_mets,
            pages_locked=locked_ws_pages
        )

        # Take the next request from the cache (if any available)
        workspace_key = workspace_id if workspace_id else path_to_mets

        if workspace_key not in self.processing_requests_cache:
            self.log.debug(f"No internal queue available for workspace with key: {workspace_key}")
            return

        if not len(self.processing_requests_cache[workspace_key]):
            # The queue is empty - delete it
            try:
                del self.processing_requests_cache[workspace_key]
            except KeyError:
                self.log.warning(f"Trying to delete non-existing internal queue with key: {workspace_key}")
            return

        consumed_requests = await self.find_next_requests_from_internal_queue(
            internal_queue=self.processing_requests_cache[workspace_key]
        )

        if not len(consumed_requests):
            self.log.debug("No data was consumed from the internal queue")
            return

        for data in consumed_requests:
            processor_name = data.processor_name
            # Create a DB entry
            job = DBProcessorJob(
                **data.dict(exclude_unset=True, exclude_none=True),
                internal_callback_url=self.internal_job_callback_url,
                state=StateEnum.queued
            )
            await job.insert()

            job_output = None
            if data.agent_type == 'worker':
                ocrd_tool = await self.get_processor_info(processor_name)
                validate_job_input(self.log, processor_name, ocrd_tool, data)
                processing_message = self.create_processing_message(job)
                await self.push_to_processing_queue(processor_name, processing_message)
                job_output = job.to_job_output()
            if data.agent_type == 'server':
                ocrd_tool, processor_server_url = self.query_ocrd_tool_json_from_server(processor_name)
                validate_job_input(self.log, processor_name, ocrd_tool, data)
                job_output = await self.push_to_processor_server(processor_name, processor_server_url, data)
            if not job_output:
                self.log.exception(f'Failed to create job output for job input data: {data}')

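    # Illustrative sketch (not part of this file): the kind of payload a Processing Worker
    # or Processor Server would POST to the '/result_callback' route above; the values are
    # made up and the exact StateEnum serialization is an assumption:
    #
    #   {
    #       "job_id": "f5c9a1d2",
    #       "state": "SUCCESS",
    #       "path_to_mets": "/data/workspace/mets.xml",
    #       "workspace_id": null
    #   }
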
    async def get_processor_info(self, processor_name) -> Dict:
        """ Return a processor's ocrd-tool.json
        """
        ocrd_tool = self.ocrd_all_tool_json.get(processor_name, None)
        if not ocrd_tool:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Ocrd tool JSON of '{processor_name}' not available!"
            )

        # TODO: Returns the ocrd tool json even of processors
        #  that are not deployed. This may or may not be desired.
        return ocrd_tool

    async def list_processors(self) -> List[str]:
        # There is no caching on the Processing Server side
        processor_names_list = self.deployer.find_matching_processors(
            docker_only=False,
            native_only=False,
            worker_only=False,
            server_only=False,
            str_names_only=True,
            unique_only=True
        )
        return processor_names_list
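
A minimal way to launch this server, assuming a Processing Server configuration file (database, queue and worker hosts) exists at the given path; the import path, config file name, host and port below are assumptions:

# Hypothetical usage of the class defined above
from ocrd_network.processing_server import ProcessingServer

server = ProcessingServer(config_path="ps_config.yml", host="localhost", port=8080)
# Deploys RabbitMQ, MongoDB and the configured workers, then serves the API via uvicorn
server.start()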