ProcessingServer.on_startup()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 3
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
from datetime import datetime
2
from os import getpid
3
from pathlib import Path
4
from typing import Dict, List, Optional, Union
5
from uvicorn import run as uvicorn_run
6
7
from fastapi import APIRouter, FastAPI, File, HTTPException, Request, status, UploadFile
8
from fastapi.exceptions import RequestValidationError
9
from fastapi.responses import FileResponse, JSONResponse, PlainTextResponse
10
11
from ocrd.task_sequence import ProcessorTask
12
from ocrd_utils import initLogging, getLogger
13
from .constants import JobState, ServerApiTags
14
from .database import (
15
    initiate_database,
16
    db_get_processing_job,
17
    db_get_processing_jobs,
18
    db_update_processing_job,
19
    db_update_workspace,
20
    db_get_workflow_script,
21
    db_find_first_workflow_script_by_content
22
)
23
from .runtime_data import Deployer
24
from .logging_utils import configure_file_handler_with_formatter, get_processing_server_logging_file_path
25
from .models import (
26
    DBProcessorJob,
27
    DBWorkflowJob,
28
    DBWorkflowScript,
29
    PYJobInput,
30
    PYJobOutput,
31
    PYResultMessage,
32
    PYWorkflowJobOutput
33
)
34
from .rabbitmq_utils import (
35
    check_if_queue_exists,
36
    connect_rabbitmq_publisher,
37
    get_message_queues,
38
    OcrdProcessingMessage
39
)
40
from .server_cache import CacheLockedPages, CacheProcessingRequests
41
from .server_utils import (
42
    create_processing_message,
43
    create_workspace_if_not_exists,
44
    _get_processor_job,
45
    _get_processor_job_log,
46
    get_page_ids_list,
47
    get_workflow_content,
48
    get_from_database_workspace,
49
    get_from_database_workflow_job,
50
    kill_mets_server_zombies,
51
    parse_workflow_tasks,
52
    raise_http_exception,
53
    validate_and_return_mets_path,
54
    validate_first_task_input_file_groups_existence,
55
    validate_job_input,
56
    validate_workflow
57
)
58
from .tcp_to_uds_mets_proxy import MetsServerProxy
59
from .utils import (
60
    load_ocrd_all_tool_json,
61
    expand_page_ids,
62
    generate_id,
63
    generate_workflow_content,
64
    generate_workflow_content_hash
65
)
66
67
68
class ProcessingServer(FastAPI):
69
    """FastAPI app to make ocr-d processor calls
70
71
    The Processing-Server receives calls conforming to the ocr-d webapi regarding the processing
72
    part. It can run ocrd-processors and provides endpoints to discover processors and watch the job
73
    status.
74
    The Processing-Server does not execute the processors itself but starts up a queue and a
75
    database to delegate the calls to processing workers. They are started by the Processing-Server
76
    and the communication goes through the queue.
77
    """
78
79
    def __init__(self, config_path: str, host: str, port: int) -> None:
        """Initialize the Processing Server FastAPI app.

        Args:
            config_path: path to the deployment configuration consumed by `Deployer`
            host: hostname/interface the server will bind to
            port: TCP port the server will listen on
        """
        self.title = "OCR-D Processing Server"
        super().__init__(
            title=self.title,
            on_startup=[self.on_startup],
            on_shutdown=[self.on_shutdown],
            description="OCR-D Processing Server"
        )
        initLogging()
        self.log = getLogger("ocrd_network.processing_server")
        # One log file per server process, appended to across restarts of handlers
        log_file = get_processing_server_logging_file_path(pid=getpid())
        configure_file_handler_with_formatter(self.log, log_file=log_file, mode="a")

        self.log.info("Loading ocrd-all-tool.json")
        # Aggregated ocrd-tool.json of all known processors, used by discovery endpoints
        self.ocrd_all_tool_json = load_ocrd_all_tool_json()
        self.hostname = host
        self.port = port

        # The deployer is used for:
        # - deploying agents when the Processing Server is started
        # - retrieving runtime data of agents
        self.deployer = Deployer(config_path)
        # Used for forwarding Mets Server TCP requests to UDS requests
        self.mets_server_proxy = MetsServerProxy()
        self.use_tcp_mets = self.deployer.use_tcp_mets
        # If set, all Mets Server UDS requests are multiplexed over TCP
        # Used by processing workers to report back the results
        if self.deployer.internal_callback_url:
            # NOTE: rebinding `host` here shadows the constructor argument on purpose
            host = self.deployer.internal_callback_url
            self.internal_job_callback_url = f"{host.rstrip('/')}/result_callback"
            self.multiplexing_endpoint = f"{host.rstrip('/')}/tcp_mets"
        else:
            self.internal_job_callback_url = f"http://{host}:{port}/result_callback"
            self.multiplexing_endpoint = f"http://{host}:{port}/tcp_mets"

        # Deployment URLs are filled in by `start()` once the services are up
        self.mongodb_url = None
        self.rabbitmq_url = None
        # Connection parameters for the RabbitMQ publisher
        self.rmq_data = {
            "host": self.deployer.data_queue.host,
            "port": self.deployer.data_queue.port,
            "vhost": "/",
            "username": self.deployer.data_queue.cred_username,
            "password": self.deployer.data_queue.cred_password
        }

        # Gets assigned when `connect_rabbitmq_publisher()` is called on the working object
        self.rmq_publisher = None

        # Used for keeping track of cached processing requests
        self.cache_processing_requests = CacheProcessingRequests()

        # Used for keeping track of locked/unlocked pages of a workspace
        self.cache_locked_pages = CacheLockedPages()

        self.add_api_routes_others()
        self.add_api_routes_processing()
        self.add_api_routes_workflow()

        @self.exception_handler(RequestValidationError)
        async def validation_exception_handler(request: Request, exc: RequestValidationError):
            # Flatten the multi-line validation error into a single log/response line
            exc_str = f'{exc}'.replace('\n', ' ').replace('   ', ' ')
            self.log.error(f'{request}: {exc_str}')
            content = {'status_code': 10422, 'message': exc_str, 'data': None}
            return JSONResponse(content=content, status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
143
144
    def start(self) -> None:
        """ deploy agents (db, queue, workers) and start the processing server with uvicorn

        Deployment order matters: RabbitMQ and MongoDB must be reachable before
        the publisher connects and the workers are deployed. On any failure,
        everything deployed so far is stopped and the exception is re-raised.
        """
        try:
            self.rabbitmq_url = self.deployer.deploy_rabbitmq()
            self.mongodb_url = self.deployer.deploy_mongodb()

            # The RMQPublisher is initialized and a connection to the RabbitMQ is performed
            self.rmq_publisher = connect_rabbitmq_publisher(self.log, self.rmq_data, enable_acks=True)

            self.deployer.deploy_workers(mongodb_url=self.mongodb_url, rabbitmq_url=self.rabbitmq_url)
        except Exception as error:
            self.log.exception(f"Failed to start the Processing Server, error: {error}")
            self.log.warning("Trying to stop previously deployed services and workers.")
            self.deployer.stop_all()
            raise
        # Blocks until the server is shut down
        uvicorn_run(self, host=self.hostname, port=int(self.port))
161
162
    async def on_startup(self):
        """Initialize the MongoDB connection once the ASGI app has started."""
        db_url = self.mongodb_url
        self.log.info(f"Initializing the Database on: {db_url}")
        await initiate_database(db_url=db_url)
165
166
    async def on_shutdown(self) -> None:
        """Stop all deployed agents when the ASGI app shuts down.

        - hosts and pids should be stored somewhere
        - ensure queue is empty or processor is not currently running
        - connect to hosts and kill pids
        """
        await self.stop_deployed_agents()
173
174
    def add_api_routes_others(self):
        """Register miscellaneous endpoints: home page, stop, mets proxy, zombie cleanup."""
        others_router = APIRouter()
        # One spec dict per route, registered in declaration order
        route_specs = [
            dict(
                path="/",
                endpoint=self.home_page,
                methods=["GET"],
                status_code=status.HTTP_200_OK,
                summary="Get information about the processing server"
            ),
            dict(
                path="/stop",
                endpoint=self.stop_deployed_agents,
                methods=["POST"],
                tags=[ServerApiTags.TOOLS],
                summary="Stop database, queue and processing-workers"
            ),
            dict(
                path="/tcp_mets",
                methods=["POST"],
                endpoint=self.forward_tcp_request_to_uds_mets_server,
                tags=[ServerApiTags.WORKSPACE],
                summary="Forward a TCP request to UDS mets server"
            ),
            dict(
                path="/kill_mets_server_zombies",
                endpoint=self.kill_mets_server_zombies,
                methods=["DELETE"],
                tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
                status_code=status.HTTP_200_OK,
                summary="!! Workaround Do Not Use Unless You Have A Reason "
                        "!! Kill all METS servers on this machine that have been created more than 60 minutes ago."
            ),
        ]
        for spec in route_specs:
            others_router.add_api_route(**spec)
        self.include_router(others_router)
207
208
    def add_api_routes_processing(self):
        """Register processor discovery, job submission/status, and result-callback endpoints."""
        processing_router = APIRouter()
        # One spec dict per route, registered in declaration order
        route_specs = [
            dict(
                path="/processor",
                endpoint=self.list_processors,
                methods=["GET"],
                tags=[ServerApiTags.PROCESSING, ServerApiTags.DISCOVERY],
                status_code=status.HTTP_200_OK,
                summary="Get a list of all available processors"
            ),
            dict(
                path="/processor/info/{processor_name}",
                endpoint=self.get_worker_ocrd_tool,
                methods=["GET"],
                tags=[ServerApiTags.PROCESSING, ServerApiTags.DISCOVERY],
                status_code=status.HTTP_200_OK,
                summary="Get information about this processor"
            ),
            dict(
                path="/processor/run/{processor_name}",
                endpoint=self.validate_and_forward_job_to_worker,
                methods=["POST"],
                tags=[ServerApiTags.PROCESSING],
                status_code=status.HTTP_200_OK,
                summary="Submit a job to this processor",
                response_model=PYJobOutput,
                response_model_exclude_unset=True,
                response_model_exclude_none=True
            ),
            dict(
                path="/processor/job/{job_id}",
                endpoint=self.get_processor_job,
                methods=["GET"],
                tags=[ServerApiTags.PROCESSING],
                status_code=status.HTTP_200_OK,
                summary="Get information about a job based on its ID",
                response_model=PYJobOutput,
                response_model_exclude_unset=True,
                response_model_exclude_none=True
            ),
            dict(
                path="/processor/log/{job_id}",
                endpoint=self.get_processor_job_log,
                methods=["GET"],
                tags=[ServerApiTags.PROCESSING],
                status_code=status.HTTP_200_OK,
                summary="Get the log file of a job id"
            ),
            dict(
                path="/result_callback",
                endpoint=self.remove_job_from_request_cache,
                methods=["POST"],
                tags=[ServerApiTags.PROCESSING],
                status_code=status.HTTP_200_OK,
                summary="Callback used by a worker for reporting result of a processing request"
            ),
        ]
        for spec in route_specs:
            processing_router.add_api_route(**spec)
        self.include_router(processing_router)
265
266
    def add_api_routes_workflow(self):
        """Register workflow script management and workflow execution endpoints."""
        workflow_router = APIRouter()
        # One spec dict per route, registered in declaration order
        route_specs = [
            dict(
                path="/workflow",
                endpoint=self.upload_workflow,
                methods=["POST"],
                tags=[ServerApiTags.WORKFLOW],
                status_code=status.HTTP_201_CREATED,
                summary="Upload/Register a new workflow script"
            ),
            dict(
                path="/workflow/{workflow_id}",
                endpoint=self.download_workflow,
                methods=["GET"],
                tags=[ServerApiTags.WORKFLOW],
                status_code=status.HTTP_200_OK,
                summary="Download a workflow script"
            ),
            dict(
                path="/workflow/{workflow_id}",
                endpoint=self.replace_workflow,
                methods=["PUT"],
                tags=[ServerApiTags.WORKFLOW],
                status_code=status.HTTP_200_OK,
                summary="Update/Replace a workflow script"
            ),
            dict(
                path="/workflow/run",
                endpoint=self.run_workflow,
                methods=["POST"],
                tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
                status_code=status.HTTP_200_OK,
                summary="Run a workflow",
                response_model=PYWorkflowJobOutput,
                response_model_exclude_defaults=True,
                response_model_exclude_unset=True,
                response_model_exclude_none=True
            ),
            dict(
                path="/workflow/job-simple/{workflow_job_id}",
                endpoint=self.get_workflow_info_simple,
                methods=["GET"],
                tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
                status_code=status.HTTP_200_OK,
                summary="Get simplified overall job status"
            ),
            dict(
                path="/workflow/job/{workflow_job_id}",
                endpoint=self.get_workflow_info,
                methods=["GET"],
                tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
                status_code=status.HTTP_200_OK,
                summary="Get information about a workflow run"
            ),
        ]
        for spec in route_specs:
            workflow_router.add_api_route(**spec)
        self.include_router(workflow_router)
321
322
    async def forward_tcp_request_to_uds_mets_server(self, request: Request) -> Dict:
        """Forward mets-server-request

        A processor calls a mets related method like add_file with ClientSideOcrdMets.
        This sends a request to this endpoint; the request body carries everything
        needed to call the uds-mets-server. `MetsServerProxy` performs the actual
        call to the uds-mets-server reachable locally from the processing-server.
        """
        body = await request.json()
        workspace_dir = body["workspace_path"]
        # Ensure a UDS mets server is running for this workspace before forwarding
        self.deployer.start_uds_mets_server(ws_dir_path=workspace_dir)
        return self.mets_server_proxy.forward_tcp_request(request_body=body)
334
335
    async def home_page(self):
        """Return a small JSON greeting including the current server time."""
        return {
            "message": f"The home page of the {self.title}",
            "time": datetime.now().strftime("%Y-%m-%d %H:%M")
        }
342
343
    async def stop_deployed_agents(self) -> None:
        """Stop the deployed database, queue, and processing workers."""
        self.deployer.stop_all()
345
346
    async def get_worker_ocrd_tool(self, processor_name: str) -> Dict:
        """Return the ocrd-tool section for the given processor.

        Looks the processor up in the aggregated ocrd-all-tool.json loaded at
        startup. Raises an HTTP 404 error if the processor is unknown.
        """
        # Fix: the previous dead assignment `ocrd_tool = {}` was immediately
        # overwritten; a single `.get()` (default None is implicit) suffices.
        ocrd_tool = self.ocrd_all_tool_json.get(processor_name)
        if not ocrd_tool:
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND,
                                 f"Processing Worker '{processor_name}' not found.")
        return ocrd_tool
353
354
    def exists_worker(self, processor_name: str) -> bool:
        """Report whether a worker queue exists for the given processor name."""
        # TODO: Reconsider and refactor this.
        #  Added ocrd-dummy by default if not available for the integration tests.
        #  A proper Processing Worker registration endpoint is needed on the Processing Server side
        if processor_name == 'ocrd-dummy':
            return True
        queue_found = check_if_queue_exists(self.log, self.rmq_data, processor_name=processor_name)
        return bool(queue_found)
361
362
    def validate_worker_existence(self, processor_name: str) -> None:
        """Raise an HTTP 422 error if no worker exists for the given processor."""
        if not self.exists_worker(processor_name=processor_name):
            raise_http_exception(
                self.log,
                status.HTTP_422_UNPROCESSABLE_ENTITY,
                f"Processing Worker '{processor_name}' not found."
            )
367
368
    async def validate_and_forward_job_to_worker(self, processor_name: str, data: PYJobInput) -> PYJobOutput:
        """Validate a processing request and either cache it or queue it to a worker.

        The request is cached (state `cached`) when it depends on still-running
        jobs or when one of its target pages is currently locked by another job;
        otherwise its pages are locked, a mets server is started for the
        workspace, and the job is queued (state `queued`) and pushed to RabbitMQ.

        Args:
            data: the job request; `data.job_id` must be unset (it is generated here)

        Returns:
            the job output of the cached or queued DB job entry
        """
        # Append the processor name to the request itself
        data.processor_name = processor_name
        self.validate_worker_existence(processor_name=data.processor_name)
        if data.job_id:
            message = f"Processing request job id field is set but must not be: {data.job_id}"
            raise_http_exception(self.log, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
        # Generate processing job id
        data.job_id = generate_id()
        ocrd_tool = await self.get_worker_ocrd_tool(processor_name=data.processor_name)
        validate_job_input(self.log, data.processor_name, ocrd_tool, data)

        if data.workspace_id:
            # just a check whether the workspace exists in the database or not
            await get_from_database_workspace(self.log, data.workspace_id)
        else:  # data.path_to_mets provided instead
            await create_workspace_if_not_exists(self.log, mets_path=data.path_to_mets)

        # The workspace is keyed by its mets path when given, else by its id
        workspace_key = data.path_to_mets if data.path_to_mets else data.workspace_id
        # initialize the request counter for the workspace_key
        self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=0)

        # This check is done to return early in case a workspace_id is provided
        # but the abs mets path cannot be queried from the DB
        request_mets_path = await validate_and_return_mets_path(self.log, data)

        page_ids = expand_page_ids(data.page_id)

        # A flag whether the current request must be cached
        # This is set to true if for any output file group there
        # is a page_id value that has been previously locked
        cache_current_request = False

        # Check if there are any dependencies of the current request
        if data.depends_on:
            cache_current_request = await self.cache_processing_requests.is_caching_required(data.depends_on)

        # No need for further check of locked pages dependency
        # if the request should be already cached
        if not cache_current_request:
            # Check if there are any locked pages for the current request
            cache_current_request = self.cache_locked_pages.check_if_locked_pages_for_output_file_grps(
                workspace_key=workspace_key,
                output_file_grps=data.output_file_grps,
                page_ids=page_ids
            )

        if cache_current_request:
            # Cache the received request
            self.cache_processing_requests.cache_request(workspace_key, data)

            # Create a cached job DB entry
            db_cached_job = DBProcessorJob(
                **data.dict(exclude_unset=True, exclude_none=True),
                internal_callback_url=self.internal_job_callback_url,
                state=JobState.cached
            )
            await db_cached_job.insert()
            # Cached jobs are re-submitted later via `push_cached_jobs_to_workers`
            return db_cached_job.to_job_output()

        # Lock the pages in the request
        self.cache_locked_pages.lock_pages(
            workspace_key=workspace_key,
            output_file_grps=data.output_file_grps,
            page_ids=page_ids
        )

        # Start a UDS Mets Server with the current workspace
        ws_dir_path = str(Path(request_mets_path).parent)
        mets_server_url = self.deployer.start_uds_mets_server(ws_dir_path=ws_dir_path)
        if self.use_tcp_mets:
            # let workers talk to mets server via tcp instead of using unix-socket
            mets_server_url = self.multiplexing_endpoint

        # Assign the mets server url in the database (workers read mets_server_url from db)
        await db_update_workspace(
            workspace_id=data.workspace_id,
            workspace_mets_path=data.path_to_mets,
            mets_server_url=mets_server_url
        )

        # Create a queued job DB entry
        db_queued_job = DBProcessorJob(
            **data.dict(exclude_unset=True, exclude_none=True),
            internal_callback_url=self.internal_job_callback_url,
            state=JobState.queued
        )
        await db_queued_job.insert()
        # Count this request as in-flight for the workspace before pushing it
        self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=1)
        job_output = await self.push_job_to_worker(data=data, db_job=db_queued_job)
        return job_output
459
460
    async def push_job_to_worker(self, data: PYJobInput, db_job: DBProcessorJob) -> PYJobOutput:
        """Publish the job's processing message to its worker queue.

        Raises an HTTP 500 error if no job output could be produced.
        """
        # Fix: removed the dead `job_output = None` assignment that was
        # unconditionally overwritten on the next call.
        self.log.debug(f"Pushing to Processing Worker: {data.processor_name}, {data.page_id}, {data.job_id}")
        job_output = await self.push_job_to_processing_queue(db_job=db_job)
        if not job_output:
            message = f"Failed to create job output for job input: {data}"
            raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message)
        return job_output
468
469
    async def push_job_to_processing_queue(self, db_job: DBProcessorJob) -> PYJobOutput:
        """Encode a processing message for `db_job` and publish it to RabbitMQ.

        Raises an HTTP 500 error when the publisher is not connected or when
        encoding/publishing fails.
        """
        if not self.rmq_publisher:
            message = "The Processing Server has no connection to RabbitMQ Server. RMQPublisher is not connected."
            raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message)
        processing_message = create_processing_message(self.log, db_job)
        try:
            # The queue name is the processor name; the message is YAML-encoded
            self.rmq_publisher.publish_to_queue(
                queue_name=db_job.processor_name,
                message=OcrdProcessingMessage.encode_yml(processing_message)
            )
        except Exception as error:
            message = (
                f"Processing server has failed to push processing message to queue: {db_job.processor_name}, "
                f"Processing message: {processing_message.__dict__}"
            )
            raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message, error)
        return db_job.to_job_output()
484
485
    async def get_processor_job(self, job_id: str) -> PYJobOutput:
        """Look up and return the processing job with the given id."""
        job_output = await _get_processor_job(self.log, job_id)
        return job_output
487
488
    async def get_processor_job_log(self, job_id: str) -> FileResponse:
        """Return the log file of the processing job with the given id."""
        log_response = await _get_processor_job_log(self.log, job_id)
        return log_response
490
491
    async def _lock_pages_of_workspace(
        self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]
    ) -> None:
        """Lock the given pages of the output file groups for `workspace_key`."""
        self.cache_locked_pages.lock_pages(
            workspace_key=workspace_key, output_file_grps=output_file_grps, page_ids=page_ids
        )
500
501
    async def _unlock_pages_of_workspace(
        self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]
    ) -> None:
        """Unlock the given pages of the output file groups for `workspace_key`."""
        self.cache_locked_pages.unlock_pages(
            workspace_key=workspace_key, output_file_grps=output_file_grps, page_ids=page_ids
        )
509
510
    async def push_cached_jobs_to_workers(self, processing_jobs: List[PYJobInput]) -> None:
        """Promote consumed cached jobs to queued state and push them to workers."""
        if not processing_jobs:
            self.log.debug("No processing jobs were consumed from the requests cache")
            return
        for job_input in processing_jobs:
            self.log.info(f"Changing the job status of: {job_input.job_id} from {JobState.cached} to {JobState.queued}")
            db_job = await db_update_processing_job(job_id=job_input.job_id, state=JobState.queued)
            workspace_key = job_input.path_to_mets if job_input.path_to_mets else job_input.workspace_id

            # Lock the output file group pages for the current request
            await self._lock_pages_of_workspace(
                workspace_key=workspace_key,
                output_file_grps=job_input.output_file_grps,
                page_ids=expand_page_ids(job_input.page_id)
            )

            # Count this job as in-flight again before pushing it to the queue
            self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=1)
            job_output = await self.push_job_to_worker(data=job_input, db_job=db_job)
            if not job_output:
                self.log.exception(f"Failed to create job output for job input data: {job_input}")
530
531
    async def _cancel_cached_dependent_jobs(self, workspace_key: str, job_id: str) -> None:
        """Cancel all cached jobs that depend on the given (failed) job."""
        await self.cache_processing_requests.cancel_dependent_jobs(
            workspace_key=workspace_key, processing_job_id=job_id
        )
536
537
    async def _consume_cached_jobs_of_workspace(
        self, workspace_key: str, mets_server_url: str, path_to_mets: str
    ) -> List[PYJobInput]:
        """Consume cached processing requests for a workspace after a result callback.

        Decrements the in-flight request counter; when no cached requests remain
        and the counter reaches zero, the workspace's mets server is stopped and
        its internal queue is deleted.

        Returns:
            the list of cached requests ready to be re-submitted (possibly empty)
        """
        # decrease the internal cache counter by 1
        request_counter = self.cache_processing_requests.update_request_counter(
            workspace_key=workspace_key, by_value=-1
        )
        self.log.debug(f"Internal processing job cache counter value: {request_counter}")
        if (workspace_key not in self.cache_processing_requests.processing_requests or
            not len(self.cache_processing_requests.processing_requests[workspace_key])):
            if request_counter <= 0:
                # Shut down the Mets Server for the workspace_key since no
                # more internal callbacks are expected for that workspace
                self.log.debug(f"Stopping the mets server: {mets_server_url}")
                self.deployer.stop_uds_mets_server(mets_server_url=mets_server_url, path_to_mets=path_to_mets)

                try:
                    # The queue is empty - delete it
                    del self.cache_processing_requests.processing_requests[workspace_key]
                except KeyError:
                    self.log.warning(f"Trying to delete non-existing internal queue with key: {workspace_key}")

                # For debugging purposes it is good to see if any locked pages are left
                self.log.debug(f"Contents of the locked pages cache for: {workspace_key}")
                locked_pages = self.cache_locked_pages.get_locked_pages(workspace_key=workspace_key)
                for output_file_grp in locked_pages:
                    self.log.debug(f"{output_file_grp}: {locked_pages[output_file_grp]}")
            else:
                self.log.debug(f"Internal request cache is empty but waiting for {request_counter} result callbacks.")
            return []
        # Check whether the internal queue for the workspace key still exists
        # NOTE(review): this check looks unreachable — reaching this line already
        # implies the key is present (the branch above returns otherwise); kept
        # as-is since no behavior change is intended here.
        if workspace_key not in self.cache_processing_requests.processing_requests:
            self.log.debug(f"No internal queue available for workspace with key: {workspace_key}")
            return []
        consumed_requests = await self.cache_processing_requests.consume_cached_requests(workspace_key=workspace_key)
        return consumed_requests
573
574
    async def remove_job_from_request_cache(self, result_message: PYResultMessage):
        """Handle a worker's result callback for a finished processing job.

        Unlocks the job's pages, cancels cached dependents on failure, consumes
        any now-runnable cached requests for the workspace, and pushes them to
        their workers.

        Args:
            result_message: the worker's report (job id, state, workspace reference)
        """
        result_job_id = result_message.job_id
        result_job_state = result_message.state
        path_to_mets = result_message.path_to_mets
        workspace_id = result_message.workspace_id
        self.log.info(f"Result job_id: {result_job_id}, state: {result_job_state}")

        db_workspace = await get_from_database_workspace(self.log, workspace_id, path_to_mets)
        mets_server_url = db_workspace.mets_server_url
        # Same keying convention as in `validate_and_forward_job_to_worker`
        workspace_key = path_to_mets if path_to_mets else workspace_id

        if result_job_state == JobState.failed:
            # Jobs depending on the failed one can never run - cancel them
            await self._cancel_cached_dependent_jobs(workspace_key, result_job_id)

        if result_job_state != JobState.success:
            # TODO: Handle other potential error cases
            pass

        try:
            db_result_job = await db_get_processing_job(result_job_id)
            # Unlock the output file group pages for the result processing request
            await self._unlock_pages_of_workspace(
                workspace_key=workspace_key,
                output_file_grps=db_result_job.output_file_grps,
                page_ids=expand_page_ids(db_result_job.page_id)
            )
        except ValueError as error:
            message = f"Processing result job with id '{result_job_id}' not found in the DB."
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error)

        consumed_cached_jobs = await self._consume_cached_jobs_of_workspace(
            workspace_key=workspace_key, mets_server_url=mets_server_url, path_to_mets=path_to_mets
        )
        await self.push_cached_jobs_to_workers(processing_jobs=consumed_cached_jobs)
608
609
    async def list_processors(self) -> List[str]:
        """Return the names of all worker message queues, i.e. available processors."""
        queue_names = get_message_queues(self.log, self.rmq_data)
        return queue_names
611
612
    async def task_sequence_to_processing_jobs(
        self,
        tasks: List[ProcessorTask],
        mets_path: str,
        page_id: str,
    ) -> List[PYJobOutput]:
        """Submit each workflow task as a processing job, wiring up dependencies.

        A task depends on the jobs that produced its input file groups; the
        mapping from output file group to producing job id is tracked locally.
        """
        file_group_to_job_id = {}
        responses = []
        for task in tasks:
            # Collect the jobs that produced any of this task's input file groups
            dependent_jobs = [
                file_group_to_job_id[input_grp]
                for input_grp in task.input_file_grps
                if input_grp in file_group_to_job_id
            ]
            # NOTE: The `task.mets_path` and `task.page_id` is not utilized in low level
            # Thus, setting these two flags in the ocrd process workflow file has no effect
            job_input = PYJobInput(
                processor_name=task.executable,
                path_to_mets=mets_path,
                input_file_grps=task.input_file_grps,
                output_file_grps=task.output_file_grps,
                page_id=page_id,
                parameters=task.parameters,
                depends_on=dependent_jobs,
            )
            response = await self.validate_and_forward_job_to_worker(
                processor_name=job_input.processor_name, data=job_input
            )
            # Record this job as the producer of its output file groups
            for output_grp in task.output_file_grps:
                file_group_to_job_id[output_grp] = response.job_id
            responses.append(response)
        return responses
645
646
    def validate_tasks_worker_existence(self, tasks: List[ProcessorTask]) -> None:
        """Validate that a Processing Worker exists for every task's processor.

        Raises an HTTP 406 error listing every missing worker.
        """
        missing_workers = []
        for task in tasks:
            try:
                self.validate_worker_existence(processor_name=task.executable)
            except HTTPException:
                # catching the error is not relevant here
                # Fix: previously `{task.executable}` appended a one-element SET,
                # so the error message rendered as e.g. [{'ocrd-foo'}].
                missing_workers.append(task.executable)
        if missing_workers:
            message = (
                "Workflow validation has failed. The desired Processing Worker was not found. "
                f"Missing Processing Workers: {missing_workers}"
            )
            raise_http_exception(self.log, status.HTTP_406_NOT_ACCEPTABLE, message)
660
661
    async def run_workflow(
        self,
        mets_path: str,
        workflow: Union[UploadFile, str, None] = File(None),
        workflow_id: str = None,
        page_id: str = None,
        page_wise: bool = False,
        workflow_callback_url: str = None
    ) -> PYWorkflowJobOutput:
        """Run a workflow (uploaded or referenced by id) on a workspace.

        :param mets_path: path to the mets file of the target workspace
        :param workflow: uploaded workflow script (alternative to `workflow_id`)
        :param workflow_id: id of a stored workflow script (alternative to `workflow`)
        :param page_id: page range to process; empty means all pages
        :param page_wise: when True, submit the full task sequence once per page
        :param workflow_callback_url: optional URL to notify when the workflow finishes
        :return: the persisted workflow job as job output
        """
        await create_workspace_if_not_exists(self.log, mets_path=mets_path)
        workflow_content = await get_workflow_content(self.log, workflow_id, workflow)
        processing_tasks = parse_workflow_tasks(self.log, workflow_content)

        # Validate the input file groups of the first task in the workflow
        validate_first_task_input_file_groups_existence(self.log, mets_path, processing_tasks[0].input_file_grps)

        # Validate existence of Processing Workers
        # for the ocr-d processors referenced inside tasks
        self.validate_tasks_worker_existence(processing_tasks)

        # for page_wise mode, we need to expand the list of pages
        # for the database, it's better to keep a short string
        page_id = page_id or ''
        page_ids = get_page_ids_list(self.log, mets_path, page_id)

        # In page_wise mode the task sequence is submitted once per expanded
        # page; otherwise exactly once for the whole (possibly empty) range.
        # This unifies the two previously duplicated submission/persistence
        # branches without changing the stored mapping shape.
        pages_to_submit = page_ids if page_wise else [page_id]
        all_pages_job_ids = {}
        for current_page in pages_to_submit:
            responses = await self.task_sequence_to_processing_jobs(
                tasks=processing_tasks,
                mets_path=mets_path,
                page_id=current_page,
            )
            all_pages_job_ids[current_page] = [response.job_id for response in responses]

        db_workflow_job = DBWorkflowJob(
            job_id=generate_id(),
            page_id=page_id,
            page_wise=page_wise,
            processing_job_ids=all_pages_job_ids,
            path_to_mets=mets_path,
            workflow_callback_url=workflow_callback_url
        )
        await db_workflow_job.insert()
        return db_workflow_job.to_job_output()
723
724
    @staticmethod
    def _produce_workflow_status_response(processing_jobs: List[DBProcessorJob]) -> Dict:
        """Aggregate job states per processor, with details of failed jobs.

        Returns a mapping from processor name to per-state counts; when at
        least one job failed, an extra "failed-processor-tasks" entry lists
        the failed job ids and page ids grouped by processor.
        """
        failed_tasks_key = "failed-processor-tasks"
        response = {}
        failed_tasks = {}
        for job in processing_jobs:
            state_counts = response.setdefault(job.processor_name, {})
            state_counts[job.state.value] = state_counts.get(job.state.value, 0) + 1
            if job.state == JobState.failed:
                # Attach the failure section lazily, only once it is needed
                if failed_tasks_key not in response:
                    response[failed_tasks_key] = failed_tasks
                failed_tasks.setdefault(job.processor_name, []).append(
                    {"job_id": job.job_id, "page_id": job.page_id}
                )
        return response
741
742
    @staticmethod
    def _produce_workflow_status_simple_response(processing_jobs: List[DBProcessorJob]) -> JobState:
        """Collapse the states of all processing jobs into one workflow state.

        FAILED wins as soon as any job failed or was cancelled; RUNNING is
        reported while at least one job runs; SUCCESS only when every job
        finished successfully; otherwise UNSET.
        NOTE(review): an empty job list also yields SUCCESS — confirm intended.
        """
        aggregated_state = JobState.unset
        succeeded = 0
        for job in processing_jobs:
            if job.state == JobState.cached or job.state == JobState.queued:
                # Not yet decisive for the aggregated state
                continue
            if job.state == JobState.failed or job.state == JobState.cancelled:
                aggregated_state = JobState.failed
                break
            if job.state == JobState.running:
                aggregated_state = JobState.running
            if job.state == JobState.success:
                succeeded += 1
        if succeeded == len(processing_jobs):
            aggregated_state = JobState.success
        return aggregated_state
759
760
    async def get_workflow_info(self, workflow_job_id) -> Dict:
        """Return the aggregated per-processor status of a workflow's jobs."""
        workflow_job = await get_from_database_workflow_job(self.log, workflow_job_id)
        # Flatten the per-page lists of job ids into one list
        job_ids: List[str] = [
            job_id
            for job_id_list in workflow_job.processing_job_ids.values()
            for job_id in job_id_list
        ]
        processing_jobs = await db_get_processing_jobs(job_ids)
        return self._produce_workflow_status_response(processing_jobs=processing_jobs)
768
769
    async def kill_mets_server_zombies(self, minutes_ago: Optional[int] = None, dry_run: Optional[bool] = None) -> List[int]:
        """Delegate zombie METS server cleanup; return the PIDs (to be) killed."""
        return kill_mets_server_zombies(minutes_ago=minutes_ago, dry_run=dry_run)
772
773
    async def get_workflow_info_simple(self, workflow_job_id) -> Dict[str, JobState]:
        """
        Simplified version of the `get_workflow_info` that returns a single state for the entire workflow.
        - If a single processing job fails, the entire workflow job status is set to FAILED.
        - If there are any processing jobs running, regardless of other states, such as QUEUED and CACHED,
        the entire workflow job status is set to RUNNING.
        - Only if all processing jobs have finished successfully is the workflow job status set to SUCCESS.
        """
        workflow_job = await get_from_database_workflow_job(self.log, workflow_job_id)
        # Flatten the per-page lists of job ids into one list
        job_ids: List[str] = [
            job_id
            for job_id_list in workflow_job.processing_job_ids.values()
            for job_id in job_id_list
        ]
        processing_jobs = await db_get_processing_jobs(job_ids)
        state = self._produce_workflow_status_simple_response(processing_jobs=processing_jobs)
        return {"state": state}
786
787
    async def upload_workflow(self, workflow: Union[UploadFile, str]) -> Dict[str, str]:
        """ Store a script for a workflow in the database

        :param workflow: an uploaded file, or a path to a local workflow script
        :return: mapping with the newly generated `workflow_id`
        :raises HTTPException: 409 CONFLICT when an identical script is already stored
        """
        if isinstance(workflow, str):
            # Read with an explicit encoding so the stored content (and its
            # hash) does not depend on the platform's default encoding
            with open(workflow, encoding="utf-8") as wf_file:
                workflow_content = wf_file.read()
        else:
            workflow_content = await generate_workflow_content(workflow)
        validate_workflow(self.log, workflow_content)
        content_hash = generate_workflow_content_hash(workflow_content)
        try:
            db_workflow_script = await db_find_first_workflow_script_by_content(content_hash)
            if db_workflow_script:
                message = f"The same workflow script already exists, workflow id: {db_workflow_script.workflow_id}"
                raise_http_exception(self.log, status.HTTP_409_CONFLICT, message)
        except ValueError:
            # No script with this content hash stored yet - safe to insert
            pass
        workflow_id = generate_id()
        db_workflow_script = DBWorkflowScript(
            workflow_id=workflow_id,
            content=workflow_content,
            content_hash=content_hash
        )
        await db_workflow_script.insert()
        return {"workflow_id": workflow_id}
812
813
    async def replace_workflow(self, workflow_id, workflow: Union[UploadFile, str]) -> Dict[str, str]:
        """ Update a workflow script file in the database

        :param workflow_id: id of the stored workflow script to replace
        :param workflow: an uploaded file, or a path to a local workflow script
        :return: mapping with the (unchanged) `workflow_id`
        :raises HTTPException: 404 NOT FOUND when no script exists for `workflow_id`
        """
        try:
            # Keep the try body minimal: only this lookup raises the ValueError
            # that means "no such workflow". Previously the whole method body
            # was wrapped, so a ValueError raised while reading or validating
            # the NEW content was misreported as a missing script.
            db_workflow_script = await db_get_workflow_script(workflow_id)
        except ValueError as error:
            message = f"Workflow script not existing for id '{workflow_id}'."
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error)
        if isinstance(workflow, str):
            # Explicit encoding keeps the content (and its hash) platform-independent
            with open(workflow, encoding="utf-8") as wf_file:
                workflow_content = wf_file.read()
        else:
            workflow_content = await generate_workflow_content(workflow)
        validate_workflow(self.log, workflow_content)
        db_workflow_script.content = workflow_content
        db_workflow_script.content_hash = generate_workflow_content_hash(workflow_content)
        await db_workflow_script.save()
        return {"workflow_id": db_workflow_script.workflow_id}
832
833
    async def download_workflow(self, workflow_id) -> PlainTextResponse:
        """Load a workflow script from the database and return it as plain text.

        Responds with 404 NOT FOUND when no script exists for `workflow_id`.
        """
        try:
            db_workflow = await db_get_workflow_script(workflow_id)
            return PlainTextResponse(db_workflow.content)
        except ValueError as error:
            message = f"Workflow script not existing for id '{workflow_id}'."
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error)
842