ProcessingServer.home_page()   A

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0

Metric  Value
eloc    6
dl      0
loc     7
rs      10
c       0
b       0
f       0
cc      1
nop     1
1
from datetime import datetime
2
from os import getpid
3
from pathlib import Path
4
from typing import Dict, List, Optional, Union
5
from uvicorn import run as uvicorn_run
6
7
from fastapi import APIRouter, FastAPI, File, HTTPException, Request, status, UploadFile
8
from fastapi.exceptions import RequestValidationError
9
from fastapi.responses import FileResponse, JSONResponse, PlainTextResponse
10
11
from ocrd.task_sequence import ProcessorTask
12
from ocrd_utils import initLogging, getLogger
13
from .constants import AgentType, JobState, ServerApiTags
14
from .database import (
15
    initiate_database,
16
    db_get_processing_job,
17
    db_get_processing_jobs,
18
    db_update_processing_job,
19
    db_update_workspace,
20
    db_get_workflow_script,
21
    db_find_first_workflow_script_by_content
22
)
23
from .runtime_data import Deployer
24
from .logging_utils import configure_file_handler_with_formatter, get_processing_server_logging_file_path
25
from .models import (
26
    DBProcessorJob,
27
    DBWorkflowJob,
28
    DBWorkflowScript,
29
    PYJobInput,
30
    PYJobOutput,
31
    PYResultMessage,
32
    PYWorkflowJobOutput
33
)
34
from .rabbitmq_utils import (
35
    check_if_queue_exists,
36
    connect_rabbitmq_publisher,
37
    create_message_queues,
38
    OcrdProcessingMessage
39
)
40
from .server_cache import CacheLockedPages, CacheProcessingRequests
41
from .server_utils import (
42
    create_processing_message,
43
    create_workspace_if_not_exists,
44
    forward_job_to_processor_server,
45
    _get_processor_job,
46
    _get_processor_job_log,
47
    get_page_ids_list,
48
    get_workflow_content,
49
    get_from_database_workspace,
50
    get_from_database_workflow_job,
51
    kill_mets_server_zombies,
52
    parse_workflow_tasks,
53
    raise_http_exception,
54
    request_processor_server_tool_json,
55
    validate_and_return_mets_path,
56
    validate_first_task_input_file_groups_existence,
57
    validate_job_input,
58
    validate_workflow
59
)
60
from .tcp_to_uds_mets_proxy import MetsServerProxy
61
from .utils import (
62
    load_ocrd_all_tool_json,
63
    expand_page_ids,
64
    generate_id,
65
    generate_workflow_content,
66
    generate_workflow_content_hash
67
)
68
69
70
class ProcessingServer(FastAPI):
71
    """FastAPI app to make ocr-d processor calls
72
73
    The Processing-Server receives calls conforming to the ocr-d webapi regarding the processing
74
    part. It can run ocrd-processors and provides endpoints to discover processors and watch the job
75
    status.
76
    The Processing-Server does not execute the processors itself but starts up a queue and a
77
    database to delegate the calls to processing workers. They are started by the Processing-Server
78
    and the communication goes through the queue.
79
    """
80
81
    def __init__(self, config_path: str, host: str, port: int) -> None:
82
        self.title = "OCR-D Processing Server"
83
        super().__init__(
84
            title=self.title,
85
            on_startup=[self.on_startup],
86
            on_shutdown=[self.on_shutdown],
87
            description="OCR-D Processing Server"
88
        )
89
        initLogging()
90
        self.log = getLogger("ocrd_network.processing_server")
91
        log_file = get_processing_server_logging_file_path(pid=getpid())
92
        configure_file_handler_with_formatter(self.log, log_file=log_file, mode="a")
93
94
        self.log.info("Loading ocrd-all-tool.json")
95
        self.ocrd_all_tool_json = load_ocrd_all_tool_json()
96
        self.hostname = host
97
        self.port = port
98
99
        # The deployer is used for:
100
        # - deploying agents when the Processing Server is started
101
        # - retrieving runtime data of agents
102
        self.deployer = Deployer(config_path)
103
        # Used for forwarding Mets Server TCP requests to UDS requests
104
        self.mets_server_proxy = MetsServerProxy()
105
        self.use_tcp_mets = self.deployer.use_tcp_mets
106
        # If set, all Mets Server UDS requests are multiplexed over TCP
107
        # Used by processing workers and/or processor servers to report back the results
108
        if self.deployer.internal_callback_url:
109
            host = self.deployer.internal_callback_url
110
            self.internal_job_callback_url = f"{host.rstrip('/')}/result_callback"
111
            self.multiplexing_endpoint = f"{host.rstrip('/')}/tcp_mets"
112
        else:
113
            self.internal_job_callback_url = f"http://{host}:{port}/result_callback"
114
            self.multiplexing_endpoint = f"http://{host}:{port}/tcp_mets"
115
116
        self.mongodb_url = None
117
        self.rabbitmq_url = None
118
        self.rmq_data = {
119
            "host": self.deployer.data_queue.host,
120
            "port": self.deployer.data_queue.port,
121
            "vhost": "/",
122
            "username": self.deployer.data_queue.cred_username,
123
            "password": self.deployer.data_queue.cred_password
124
        }
125
126
        # Gets assigned when `connect_rabbitmq_publisher()` is called on the working object
127
        self.rmq_publisher = None
128
129
        # Used for keeping track of cached processing requests
130
        self.cache_processing_requests = CacheProcessingRequests()
131
132
        # Used for keeping track of locked/unlocked pages of a workspace
133
        self.cache_locked_pages = CacheLockedPages()
134
135
        self.add_api_routes_others()
136
        self.add_api_routes_processing()
137
        self.add_api_routes_workflow()
138
139
        @self.exception_handler(RequestValidationError)
140
        async def validation_exception_handler(request: Request, exc: RequestValidationError):
141
            exc_str = f'{exc}'.replace('\n', ' ').replace('   ', ' ')
142
            self.log.error(f'{request}: {exc_str}')
143
            content = {'status_code': 10422, 'message': exc_str, 'data': None}
144
            return JSONResponse(content=content, status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
145
146
    def start(self) -> None:
147
        """ deploy agents (db, queue, workers) and start the processing server with uvicorn
148
        """
149
        try:
150
            self.rabbitmq_url = self.deployer.deploy_rabbitmq()
151
            self.mongodb_url = self.deployer.deploy_mongodb()
152
153
            # The RMQPublisher is initialized and a connection to the RabbitMQ is performed
154
            self.rmq_publisher = connect_rabbitmq_publisher(self.log, self.rmq_data, enable_acks=True)
155
156
            queue_names = self.deployer.find_matching_network_agents(
157
                worker_only=True, str_names_only=True, unique_only=True
158
            )
159
            self.log.info(f"Creating message queues on RabbitMQ instance url: {self.rabbitmq_url}")
160
            create_message_queues(logger=self.log, rmq_publisher=self.rmq_publisher, queue_names=queue_names)
161
162
            self.deployer.deploy_network_agents(mongodb_url=self.mongodb_url, rabbitmq_url=self.rabbitmq_url)
163
        except Exception as error:
164
            self.log.exception(f"Failed to start the Processing Server, error: {error}")
165
            self.log.warning("Trying to stop previously deployed services and network agents.")
166
            self.deployer.stop_all()
167
            raise
168
        uvicorn_run(self, host=self.hostname, port=int(self.port))
169
170
    async def on_startup(self):
171
        self.log.info(f"Initializing the Database on: {self.mongodb_url}")
172
        await initiate_database(db_url=self.mongodb_url)
173
174
    async def on_shutdown(self) -> None:
175
        """
176
        - hosts and pids should be stored somewhere
177
        - ensure queue is empty or processor is not currently running
178
        - connect to hosts and kill pids
179
        """
180
        await self.stop_deployed_agents()
181
182
    def add_api_routes_others(self):
183
        others_router = APIRouter()
184
        others_router.add_api_route(
185
            path="/",
186
            endpoint=self.home_page,
187
            methods=["GET"],
188
            status_code=status.HTTP_200_OK,
189
            summary="Get information about the processing server"
190
        )
191
        others_router.add_api_route(
192
            path="/stop",
193
            endpoint=self.stop_deployed_agents,
194
            methods=["POST"],
195
            tags=[ServerApiTags.TOOLS],
196
            summary="Stop database, queue and processing-workers"
197
        )
198
        others_router.add_api_route(
199
            path="/tcp_mets",
200
            methods=["POST"],
201
            endpoint=self.forward_tcp_request_to_uds_mets_server,
202
            tags=[ServerApiTags.WORKSPACE],
203
            summary="Forward a TCP request to UDS mets server"
204
        )
205
        others_router.add_api_route(
206
            path="/kill_mets_server_zombies",
207
            endpoint=self.kill_mets_server_zombies,
208
            methods=["DELETE"],
209
            tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
210
            status_code=status.HTTP_200_OK,
211
            summary="!! Workaround Do Not Use Unless You Have A Reason "
212
            "!! Kill all METS servers on this machine that have been created more than 60 minutes ago."
213
        )
214
        self.include_router(others_router)
215
216
    def add_api_routes_processing(self):
217
        processing_router = APIRouter()
218
        processing_router.add_api_route(
219
            path="/processor",
220
            endpoint=self.list_processors,
221
            methods=["GET"],
222
            tags=[ServerApiTags.PROCESSING, ServerApiTags.DISCOVERY],
223
            status_code=status.HTTP_200_OK,
224
            summary="Get a list of all available processors"
225
        )
226
        processing_router.add_api_route(
227
            path="/processor/info/{processor_name}",
228
            endpoint=self.get_network_agent_ocrd_tool,
229
            methods=["GET"],
230
            tags=[ServerApiTags.PROCESSING, ServerApiTags.DISCOVERY],
231
            status_code=status.HTTP_200_OK,
232
            summary="Get information about this processor"
233
        )
234
        processing_router.add_api_route(
235
            path="/processor/run/{processor_name}",
236
            endpoint=self.validate_and_forward_job_to_network_agent,
237
            methods=["POST"],
238
            tags=[ServerApiTags.PROCESSING],
239
            status_code=status.HTTP_200_OK,
240
            summary="Submit a job to this processor",
241
            response_model=PYJobOutput,
242
            response_model_exclude_unset=True,
243
            response_model_exclude_none=True
244
        )
245
        processing_router.add_api_route(
246
            path="/processor/job/{job_id}",
247
            endpoint=self.get_processor_job,
248
            methods=["GET"],
249
            tags=[ServerApiTags.PROCESSING],
250
            status_code=status.HTTP_200_OK,
251
            summary="Get information about a job based on its ID",
252
            response_model=PYJobOutput,
253
            response_model_exclude_unset=True,
254
            response_model_exclude_none=True
255
        )
256
        processing_router.add_api_route(
257
            path="/processor/log/{job_id}",
258
            endpoint=self.get_processor_job_log,
259
            methods=["GET"],
260
            tags=[ServerApiTags.PROCESSING],
261
            status_code=status.HTTP_200_OK,
262
            summary="Get the log file of a job id"
263
        )
264
        processing_router.add_api_route(
265
            path="/result_callback",
266
            endpoint=self.remove_job_from_request_cache,
267
            methods=["POST"],
268
            tags=[ServerApiTags.PROCESSING],
269
            status_code=status.HTTP_200_OK,
270
            summary="Callback used by a worker or processor server for reporting result of a processing request"
271
        )
272
        self.include_router(processing_router)
273
274
    def add_api_routes_workflow(self):
275
        workflow_router = APIRouter()
276
        workflow_router.add_api_route(
277
            path="/workflow",
278
            endpoint=self.upload_workflow,
279
            methods=["POST"],
280
            tags=[ServerApiTags.WORKFLOW],
281
            status_code=status.HTTP_201_CREATED,
282
            summary="Upload/Register a new workflow script"
283
        )
284
        workflow_router.add_api_route(
285
            path="/workflow/{workflow_id}",
286
            endpoint=self.download_workflow,
287
            methods=["GET"],
288
            tags=[ServerApiTags.WORKFLOW],
289
            status_code=status.HTTP_200_OK,
290
            summary="Download a workflow script"
291
        )
292
        workflow_router.add_api_route(
293
            path="/workflow/{workflow_id}",
294
            endpoint=self.replace_workflow,
295
            methods=["PUT"],
296
            tags=[ServerApiTags.WORKFLOW],
297
            status_code=status.HTTP_200_OK,
298
            summary="Update/Replace a workflow script"
299
        )
300
        workflow_router.add_api_route(
301
            path="/workflow/run",
302
            endpoint=self.run_workflow,
303
            methods=["POST"],
304
            tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
305
            status_code=status.HTTP_200_OK,
306
            summary="Run a workflow",
307
            response_model=PYWorkflowJobOutput,
308
            response_model_exclude_defaults=True,
309
            response_model_exclude_unset=True,
310
            response_model_exclude_none=True
311
        )
312
        workflow_router.add_api_route(
313
            path="/workflow/job-simple/{workflow_job_id}",
314
            endpoint=self.get_workflow_info_simple,
315
            methods=["GET"],
316
            tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
317
            status_code=status.HTTP_200_OK,
318
            summary="Get simplified overall job status"
319
        )
320
        workflow_router.add_api_route(
321
            path="/workflow/job/{workflow_job_id}",
322
            endpoint=self.get_workflow_info,
323
            methods=["GET"],
324
            tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
325
            status_code=status.HTTP_200_OK,
326
            summary="Get information about a workflow run"
327
        )
328
        self.include_router(workflow_router)
329
330
    async def forward_tcp_request_to_uds_mets_server(self, request: Request) -> Dict:
331
        """Forward mets-server-request
332
333
        A processor calls a mets related method like add_file with ClientSideOcrdMets. This sends
334
        a request to this endpoint. This request contains all information necessary to make a call
335
        to the uds-mets-server. `MetsServerProxy` uses this information to make the call
336
        to the uds-mets-server, which is reachable locally (local to the processing-server).
337
        """
338
        request_body = await request.json()
339
        ws_dir_path = request_body["workspace_path"]
340
        self.deployer.start_uds_mets_server(ws_dir_path=ws_dir_path)
341
        return self.mets_server_proxy.forward_tcp_request(request_body=request_body)
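    # Illustrative note on the /tcp_mets request body (a sketch inferred from how the body is
    # consumed above, not a definitive schema): the JSON must at least carry "workspace_path",
    # which is used to (re)start the UDS METS server for that workspace; all remaining fields
    # are passed through unchanged and interpreted by MetsServerProxy.forward_tcp_request(),
    # e.g. {"workspace_path": "/data/my_workspace", ...}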
342
343
    async def home_page(self):
344
        message = f"The home page of the {self.title}"
345
        json_message = {
346
            "message": message,
347
            "time": datetime.now().strftime("%Y-%m-%d %H:%M")
348
        }
349
        return json_message
350
351
    async def stop_deployed_agents(self) -> None:
352
        self.deployer.stop_all()
353
354
    def query_ocrd_tool_json_from_server(self, processor_name: str) -> Dict:
355
        processor_server_base_url = self.deployer.resolve_processor_server_url(processor_name)
356
        if processor_server_base_url == '':
357
            message = f"Processor Server URL of '{processor_name}' not found"
358
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message=message)
359
        return request_processor_server_tool_json(self.log, processor_server_base_url=processor_server_base_url)
360
361
    async def get_network_agent_ocrd_tool(
362
        self, processor_name: str, agent_type: AgentType = AgentType.PROCESSING_WORKER
363
    ) -> Dict:
364
        ocrd_tool = {}
365
        error_message = f"Network agent of type '{agent_type}' for processor '{processor_name}' not found."
366
        if agent_type != AgentType.PROCESSING_WORKER and agent_type != AgentType.PROCESSOR_SERVER:
367
            message = f"Unknown agent type: {agent_type}, {type(agent_type)}"
368
            raise_http_exception(self.log, status_code=status.HTTP_501_NOT_IMPLEMENTED, message=message)
369
        if agent_type == AgentType.PROCESSING_WORKER:
370
            ocrd_tool = self.ocrd_all_tool_json.get(processor_name, None)
371
        if agent_type == AgentType.PROCESSOR_SERVER:
372
            ocrd_tool = self.query_ocrd_tool_json_from_server(processor_name)
373
        if not ocrd_tool:
374
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, error_message)
375
        return ocrd_tool
376
377
    def network_agent_exists_server(self, processor_name: str) -> bool:
378
        processor_server_url = self.deployer.resolve_processor_server_url(processor_name)
379
        return bool(processor_server_url)
380
381
    def network_agent_exists_worker(self, processor_name: str) -> bool:
382
        # TODO: Reconsider and refactor this.
383
        #  Added ocrd-dummy by default if not available for the integration tests.
384
        #  A proper Processing Worker / Processor Server registration endpoint
385
        #  is needed on the Processing Server side
386
        if processor_name == 'ocrd-dummy':
387
            return True
388
        return bool(check_if_queue_exists(self.log, self.rmq_data, processor_name=processor_name))
389
390
    def validate_agent_type_and_existence(self, processor_name: str, agent_type: AgentType) -> None:
391
        agent_exists = False
392
        if agent_type == AgentType.PROCESSOR_SERVER:
393
            agent_exists = self.network_agent_exists_server(processor_name=processor_name)
394
        elif agent_type == AgentType.PROCESSING_WORKER:
395
            agent_exists = self.network_agent_exists_worker(processor_name=processor_name)
396
        else:
397
            message = f"Unknown agent type: {agent_type}, {type(agent_type)}"
398
            raise_http_exception(self.log, status_code=status.HTTP_501_NOT_IMPLEMENTED, message=message)
399
        if not agent_exists:
400
            message = f"Network agent of type '{agent_type}' for processor '{processor_name}' not found."
401
            raise_http_exception(self.log, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
402
403
    async def validate_and_forward_job_to_network_agent(self, processor_name: str, data: PYJobInput) -> PYJobOutput:
404
        # Append the processor name to the request itself
405
        data.processor_name = processor_name
406
        self.validate_agent_type_and_existence(processor_name=data.processor_name, agent_type=data.agent_type)
407
        if data.job_id:
408
            message = f"Processing request job id field is set but must not be: {data.job_id}"
409
            raise_http_exception(self.log, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
410
        # Generate processing job id
411
        data.job_id = generate_id()
412
        ocrd_tool = await self.get_network_agent_ocrd_tool(
413
            processor_name=data.processor_name,
414
            agent_type=data.agent_type
415
        )
416
        validate_job_input(self.log, data.processor_name, ocrd_tool, data)
417
418
        if data.workspace_id:
419
            # just a check whether the workspace exists in the database or not
420
            await get_from_database_workspace(self.log, data.workspace_id)
421
        else:  # data.path_to_mets provided instead
422
            await create_workspace_if_not_exists(self.log, mets_path=data.path_to_mets)
423
424
        workspace_key = data.path_to_mets if data.path_to_mets else data.workspace_id
425
        # initialize the request counter for the workspace_key
426
        self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=0)
427
428
        # This check is done to return early in case a workspace_id is provided
429
        # but the abs mets path cannot be queried from the DB
430
        request_mets_path = await validate_and_return_mets_path(self.log, data)
431
432
        page_ids = expand_page_ids(data.page_id)
433
434
        # A flag whether the current request must be cached
435
        # This is set to true if for any output file group there
436
        # is a page_id value that has been previously locked
437
        cache_current_request = False
438
439
        # Check if there are any dependencies of the current request
440
        if data.depends_on:
441
            cache_current_request = await self.cache_processing_requests.is_caching_required(data.depends_on)
442
443
        # No need for further check of locked pages dependency
444
        # if the request should be already cached
445
        if not cache_current_request:
446
            # Check if there are any locked pages for the current request
447
            cache_current_request = self.cache_locked_pages.check_if_locked_pages_for_output_file_grps(
448
                workspace_key=workspace_key,
449
                output_file_grps=data.output_file_grps,
450
                page_ids=page_ids
451
            )
452
453
        if cache_current_request:
454
            # Cache the received request
455
            self.cache_processing_requests.cache_request(workspace_key, data)
456
457
            # Create a cached job DB entry
458
            db_cached_job = DBProcessorJob(
459
                **data.dict(exclude_unset=True, exclude_none=True),
460
                internal_callback_url=self.internal_job_callback_url,
461
                state=JobState.cached
462
            )
463
            await db_cached_job.insert()
464
            return db_cached_job.to_job_output()
465
466
        # Lock the pages in the request
467
        self.cache_locked_pages.lock_pages(
468
            workspace_key=workspace_key,
469
            output_file_grps=data.output_file_grps,
470
            page_ids=page_ids
471
        )
472
473
        # Start a UDS Mets Server with the current workspace
474
        ws_dir_path = str(Path(request_mets_path).parent)
475
        mets_server_url = self.deployer.start_uds_mets_server(ws_dir_path=ws_dir_path)
476
        if self.use_tcp_mets:
477
            # let workers talk to mets server via tcp instead of using unix-socket
478
            mets_server_url = self.multiplexing_endpoint
479
480
        # Assign the mets server url in the database (workers read mets_server_url from db)
481
        await db_update_workspace(
482
            workspace_id=data.workspace_id,
483
            workspace_mets_path=data.path_to_mets,
484
            mets_server_url=mets_server_url
485
        )
486
487
        # Create a queued job DB entry
488
        db_queued_job = DBProcessorJob(
489
            **data.dict(exclude_unset=True, exclude_none=True),
490
            internal_callback_url=self.internal_job_callback_url,
491
            state=JobState.queued
492
        )
493
        await db_queued_job.insert()
494
        self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=1)
495
        job_output = await self.push_job_to_network_agent(data=data, db_job=db_queued_job)
496
        return job_output
497
498
    async def push_job_to_network_agent(self, data: PYJobInput, db_job: DBProcessorJob) -> PYJobOutput:
499
        if data.agent_type != AgentType.PROCESSING_WORKER and data.agent_type != AgentType.PROCESSOR_SERVER:
500
            message = f"Unknown agent type: {data.agent_type}, {type(data.agent_type)}"
501
            raise_http_exception(self.log, status_code=status.HTTP_501_NOT_IMPLEMENTED, message=message)
502
        job_output = None
503
        self.log.debug(f"Pushing to {data.agent_type}: {data.processor_name}, {data.page_id}, {data.job_id}")
504
        if data.agent_type == AgentType.PROCESSING_WORKER:
505
            job_output = await self.push_job_to_processing_queue(db_job=db_job)
506
        if data.agent_type == AgentType.PROCESSOR_SERVER:
507
            job_output = await self.push_job_to_processor_server(job_input=data)
508
        if not job_output:
509
            message = f"Failed to create job output for job input: {data}"
510
            raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message)
511
        return job_output
512
513
    async def push_job_to_processing_queue(self, db_job: DBProcessorJob) -> PYJobOutput:
514
        if not self.rmq_publisher:
515
            message = "The Processing Server has no connection to RabbitMQ Server. RMQPublisher is not connected."
516
            raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message)
517
        processing_message = create_processing_message(self.log, db_job)
518
        try:
519
            encoded_message = OcrdProcessingMessage.encode_yml(processing_message)
520
            self.rmq_publisher.publish_to_queue(queue_name=db_job.processor_name, message=encoded_message)
521
        except Exception as error:
522
            message = (
523
                f"Processing server has failed to push processing message to queue: {db_job.processor_name}, "
524
                f"Processing message: {processing_message.__dict__}"
525
            )
526
            raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message, error)
527
        return db_job.to_job_output()
528
529
    async def push_job_to_processor_server(self, job_input: PYJobInput) -> PYJobOutput:
530
        processor_server_base_url = self.deployer.resolve_processor_server_url(job_input.processor_name)
531
        return await forward_job_to_processor_server(
532
            self.log, job_input=job_input, processor_server_base_url=processor_server_base_url
533
        )
534
535
    async def get_processor_job(self, job_id: str) -> PYJobOutput:
536
        return await _get_processor_job(self.log, job_id)
537
538
    async def get_processor_job_log(self, job_id: str) -> FileResponse:
539
        return await _get_processor_job_log(self.log, job_id)
540
541
    async def _lock_pages_of_workspace(
542
        self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]
543
    ) -> None:
544
        # Lock the output file group pages for the current request
545
        self.cache_locked_pages.lock_pages(
546
            workspace_key=workspace_key,
547
            output_file_grps=output_file_grps,
548
            page_ids=page_ids
549
        )
550
551
    async def _unlock_pages_of_workspace(
552
        self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]
553
    ) -> None:
554
        self.cache_locked_pages.unlock_pages(
555
            workspace_key=workspace_key,
556
            output_file_grps=output_file_grps,
557
            page_ids=page_ids
558
        )
559
560
    async def push_cached_jobs_to_agents(self, processing_jobs: List[PYJobInput]) -> None:
561
        if not len(processing_jobs):
562
            self.log.debug("No processing jobs were consumed from the requests cache")
563
            return
564
        for data in processing_jobs:
565
            self.log.info(f"Changing the job status of: {data.job_id} from {JobState.cached} to {JobState.queued}")
566
            db_consumed_job = await db_update_processing_job(job_id=data.job_id, state=JobState.queued)
567
            workspace_key = data.path_to_mets if data.path_to_mets else data.workspace_id
568
569
            # Lock the output file group pages for the current request
570
            await self._lock_pages_of_workspace(
571
                workspace_key=workspace_key,
572
                output_file_grps=data.output_file_grps,
573
                page_ids=expand_page_ids(data.page_id)
574
            )
575
576
            self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=1)
577
            job_output = await self.push_job_to_network_agent(data=data, db_job=db_consumed_job)
578
            if not job_output:
579
                self.log.exception(f"Failed to create job output for job input data: {data}")
580
581
    async def _cancel_cached_dependent_jobs(self, workspace_key: str, job_id: str) -> None:
582
        await self.cache_processing_requests.cancel_dependent_jobs(
583
            workspace_key=workspace_key,
584
            processing_job_id=job_id
585
        )
586
587
    async def _consume_cached_jobs_of_workspace(
588
        self, workspace_key: str, mets_server_url: str, path_to_mets: str
589
    ) -> List[PYJobInput]:
590
        # decrease the internal cache counter by 1
591
        request_counter = self.cache_processing_requests.update_request_counter(
592
            workspace_key=workspace_key, by_value=-1
593
        )
594
        self.log.debug(f"Internal processing job cache counter value: {request_counter}")
595
        if (workspace_key not in self.cache_processing_requests.processing_requests or
596
            not len(self.cache_processing_requests.processing_requests[workspace_key])):
597
            if request_counter <= 0:
598
                # Shut down the Mets Server for the workspace_key since no
599
                # more internal callbacks are expected for that workspace
600
                self.log.debug(f"Stopping the mets server: {mets_server_url}")
601
                self.deployer.stop_uds_mets_server(mets_server_url=mets_server_url, path_to_mets=path_to_mets)
602
603
                try:
604
                    # The queue is empty - delete it
605
                    del self.cache_processing_requests.processing_requests[workspace_key]
606
                except KeyError:
607
                    self.log.warning(f"Trying to delete non-existing internal queue with key: {workspace_key}")
608
609
                # For debugging purposes it is good to see if any locked pages are left
610
                self.log.debug(f"Contents of the locked pages cache for: {workspace_key}")
611
                locked_pages = self.cache_locked_pages.get_locked_pages(workspace_key=workspace_key)
612
                for output_file_grp in locked_pages:
613
                    self.log.debug(f"{output_file_grp}: {locked_pages[output_file_grp]}")
614
            else:
615
                self.log.debug(f"Internal request cache is empty but waiting for {request_counter} result callbacks.")
616
            return []
617
        # Check whether the internal queue for the workspace key still exists
618
        if workspace_key not in self.cache_processing_requests.processing_requests:
619
            self.log.debug(f"No internal queue available for workspace with key: {workspace_key}")
620
            return []
621
        consumed_requests = await self.cache_processing_requests.consume_cached_requests(workspace_key=workspace_key)
622
        return consumed_requests
623
624
    async def remove_job_from_request_cache(self, result_message: PYResultMessage):
625
        result_job_id = result_message.job_id
626
        result_job_state = result_message.state
627
        path_to_mets = result_message.path_to_mets
628
        workspace_id = result_message.workspace_id
629
        self.log.info(f"Result job_id: {result_job_id}, state: {result_job_state}")
630
631
        db_workspace = await get_from_database_workspace(self.log, workspace_id, path_to_mets)
632
        mets_server_url = db_workspace.mets_server_url
633
        workspace_key = path_to_mets if path_to_mets else workspace_id
634
635
        if result_job_state == JobState.failed:
636
            await self._cancel_cached_dependent_jobs(workspace_key, result_job_id)
637
638
        if result_job_state != JobState.success:
639
            # TODO: Handle other potential error cases
640
            pass
641
642
        try:
643
            db_result_job = await db_get_processing_job(result_job_id)
644
            # Unlock the output file group pages for the result processing request
645
            await self._unlock_pages_of_workspace(
646
                workspace_key=workspace_key,
647
                output_file_grps=db_result_job.output_file_grps,
648
                page_ids=expand_page_ids(db_result_job.page_id)
649
            )
650
        except ValueError as error:
651
            message = f"Processing result job with id '{result_job_id}' not found in the DB."
652
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error)
653
654
        consumed_cached_jobs = await self._consume_cached_jobs_of_workspace(
655
            workspace_key=workspace_key, mets_server_url=mets_server_url, path_to_mets=path_to_mets
656
        )
657
        await self.push_cached_jobs_to_agents(processing_jobs=consumed_cached_jobs)
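    # Illustrative callback payload (a sketch based on the fields read from `result_message`
    # above; the authoritative schema is PYResultMessage in .models). A worker or processor
    # server would POST something like the following to /result_callback:
    #   {"job_id": "<processing job id>", "state": "SUCCESS",
    #    "path_to_mets": "/data/my_workspace/mets.xml", "workspace_id": null}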
658
659
    async def list_processors(self) -> List[str]:
660
        # There is no caching on the Processing Server side
661
        processor_names_list = self.deployer.find_matching_network_agents(
662
            docker_only=False, native_only=False, worker_only=False, server_only=False,
663
            str_names_only=True, unique_only=True, sort=True
664
        )
665
        return processor_names_list
666
667
    async def task_sequence_to_processing_jobs(
668
        self,
669
        tasks: List[ProcessorTask],
670
        mets_path: str,
671
        page_id: str,
672
        agent_type: AgentType = AgentType.PROCESSING_WORKER
673
    ) -> List[PYJobOutput]:
674
        temp_file_group_cache = {}
675
        responses = []
676
        for task in tasks:
677
            # Find dependent jobs of the current task
678
            dependent_jobs = []
679
            for input_file_grp in task.input_file_grps:
680
                if input_file_grp in temp_file_group_cache:
681
                    dependent_jobs.append(temp_file_group_cache[input_file_grp])
682
            # NOTE: `task.mets_path` and `task.page_id` are not utilized at the lower level.
683
            # Thus, setting these two fields in the ocrd process workflow file has no effect.
684
            job_input_data = PYJobInput(
685
                processor_name=task.executable,
686
                path_to_mets=mets_path,
687
                input_file_grps=task.input_file_grps,
688
                output_file_grps=task.output_file_grps,
689
                page_id=page_id,
690
                parameters=task.parameters,
691
                agent_type=agent_type,
692
                depends_on=dependent_jobs,
693
            )
694
            response = await self.validate_and_forward_job_to_network_agent(
695
                processor_name=job_input_data.processor_name,
696
                data=job_input_data
697
            )
698
            for file_group in task.output_file_grps:
699
                temp_file_group_cache[file_group] = response.job_id
700
            responses.append(response)
701
        return responses
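    # Worked example of the dependency chaining above (hypothetical task names): for the task
    # sequence "ocrd-a -I OCR-D-IMG -O OCR-D-A" followed by "ocrd-b -I OCR-D-A -O OCR-D-B",
    # the first job's id is cached under "OCR-D-A", so the second job is submitted with
    # depends_on=[<job id of ocrd-a>] and may be cached by
    # validate_and_forward_job_to_network_agent() while that dependency is still pending.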
702
703
    def validate_tasks_agents_existence(self, tasks: List[ProcessorTask], agent_type: AgentType) -> None:
704
        missing_agents = []
705
        for task in tasks:
706
            try:
707
                self.validate_agent_type_and_existence(processor_name=task.executable, agent_type=agent_type)
708
            except HTTPException:
709
                # catching the error is not relevant here
710
                missing_agents.append({task.executable, agent_type})
711
        if missing_agents:
712
            message = (
713
                "Workflow validation has failed. The desired network agents not found. "
714
                f"Missing processing agents: {missing_agents}"
715
            )
716
            raise_http_exception(self.log, status.HTTP_406_NOT_ACCEPTABLE, message)
717
718
    async def run_workflow(
719
        self,
720
        mets_path: str,
721
        workflow: Union[UploadFile, str, None] = File(None),
722
        workflow_id: str = None,
723
        agent_type: AgentType = AgentType.PROCESSING_WORKER,
724
        page_id: str = None,
725
        page_wise: bool = False,
726
        workflow_callback_url: str = None
727
    ) -> PYWorkflowJobOutput:
728
        await create_workspace_if_not_exists(self.log, mets_path=mets_path)
729
        workflow_content = await get_workflow_content(self.log, workflow_id, workflow)
730
        processing_tasks = parse_workflow_tasks(self.log, workflow_content)
731
732
        # Validate the input file groups of the first task in the workflow
733
        validate_first_task_input_file_groups_existence(self.log, mets_path, processing_tasks[0].input_file_grps)
734
735
        # Validate existence of agents (processing workers/processor servers)
736
        # for the ocr-d processors referenced inside tasks
737
        self.validate_tasks_agents_existence(processing_tasks, agent_type)
738
739
        # for page_wise mode, we need to expand the list of pages
740
        # for the database, it's better to keep a short string
741
        page_id = page_id or ''
742
        page_ids = get_page_ids_list(self.log, mets_path, page_id)
743
744
        if not page_wise:
745
            responses = await self.task_sequence_to_processing_jobs(
746
                tasks=processing_tasks,
747
                mets_path=mets_path,
748
                page_id=page_id,
749
                agent_type=agent_type
750
            )
751
            processing_job_ids = [response.job_id for response in responses]
752
            db_workflow_job = DBWorkflowJob(
753
                job_id=generate_id(),
754
                page_id=page_id,
755
                page_wise=page_wise,
756
                processing_job_ids={page_id: processing_job_ids},
757
                path_to_mets=mets_path,
758
                workflow_callback_url=workflow_callback_url
759
            )
760
            await db_workflow_job.insert()
761
            return db_workflow_job.to_job_output()
762
763
        all_pages_job_ids = {}
764
        for current_page in page_ids:
765
            responses = await self.task_sequence_to_processing_jobs(
766
                tasks=processing_tasks,
767
                mets_path=mets_path,
768
                page_id=current_page,
769
                agent_type=agent_type
770
            )
771
            processing_job_ids = [response.job_id for response in responses]
772
            all_pages_job_ids[current_page] = processing_job_ids
773
        db_workflow_job = DBWorkflowJob(
774
            job_id=generate_id(),
775
            page_id=page_id,
776
            page_wise=page_wise,
777
            processing_job_ids=all_pages_job_ids,
778
            path_to_mets=mets_path,
779
            workflow_callback_url=workflow_callback_url
780
        )
781
        await db_workflow_job.insert()
782
        return db_workflow_job.to_job_output()
783
784
    @staticmethod
785
    def _produce_workflow_status_response(processing_jobs: List[DBProcessorJob]) -> Dict:
786
        response = {}
787
        failed_tasks = {}
788
        failed_tasks_key = "failed-processor-tasks"
789
        for p_job in processing_jobs:
790
            response.setdefault(p_job.processor_name, {})
791
            response[p_job.processor_name].setdefault(p_job.state.value, 0)
792
            response[p_job.processor_name][p_job.state.value] += 1
793
            if p_job.state == JobState.failed:
794
                if failed_tasks_key not in response:
795
                    response[failed_tasks_key] = failed_tasks
796
                failed_tasks.setdefault(p_job.processor_name, [])
797
                failed_tasks[p_job.processor_name].append(
798
                    {"job_id": p_job.job_id, "page_id": p_job.page_id}
799
                )
800
        return response
801
802
    @staticmethod
803
    def _produce_workflow_status_simple_response(processing_jobs: List[DBProcessorJob]) -> JobState:
804
        workflow_job_state = JobState.unset
805
        success_jobs = 0
806
        for p_job in processing_jobs:
807
            if p_job.state == JobState.cached or p_job.state == JobState.queued:
808
                continue
809
            if p_job.state == JobState.failed or p_job.state == JobState.cancelled:
810
                workflow_job_state = JobState.failed
811
                break
812
            if p_job.state == JobState.running:
813
                workflow_job_state = JobState.running
814
            if p_job.state == JobState.success:
815
                success_jobs += 1
816
        if len(processing_jobs) == success_jobs:
817
            workflow_job_state = JobState.success
818
        return workflow_job_state
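    # Worked examples of the aggregation above: jobs in states (success, success) -> success;
    # (success, failed, running) -> failed (the loop stops at the first failed/cancelled job);
    # (success, running, queued) -> running; (queued, cached) -> unset, since no job is
    # running, failed, or finished yet.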
819
820
    async def get_workflow_info(self, workflow_job_id) -> Dict:
821
        """ Return list of a workflow's processor jobs
822
        """
823
        workflow_job = await get_from_database_workflow_job(self.log, workflow_job_id)
824
        job_ids: List[str] = [job_id for lst in workflow_job.processing_job_ids.values() for job_id in lst]
825
        jobs = await db_get_processing_jobs(job_ids)
826
        response = self._produce_workflow_status_response(processing_jobs=jobs)
827
        return response
828
829
    async def kill_mets_server_zombies(self, minutes_ago: Optional[int] = None, dry_run: Optional[bool] = None) -> List[int]:
830
        pids_killed = kill_mets_server_zombies(minutes_ago=minutes_ago, dry_run=dry_run)
831
        return pids_killed
832
833
    async def get_workflow_info_simple(self, workflow_job_id) -> Dict[str, JobState]:
834
        """
835
        Simplified version of `get_workflow_info` that returns a single state for the entire workflow.
836
        - If a single processing job fails, the entire workflow job status is set to FAILED.
837
        - If there are any processing jobs running, regardless of other states such as QUEUED and CACHED,
838
        the entire workflow job status is set to RUNNING.
839
        - Only if all processing jobs have finished successfully is the workflow job status set to SUCCESS.
840
        """
841
        workflow_job = await get_from_database_workflow_job(self.log, workflow_job_id)
842
        job_ids: List[str] = [job_id for lst in workflow_job.processing_job_ids.values() for job_id in lst]
843
        jobs = await db_get_processing_jobs(job_ids)
844
        workflow_job_state = self._produce_workflow_status_simple_response(processing_jobs=jobs)
845
        return {"state": workflow_job_state}
846
847
    async def upload_workflow(self, workflow: Union[UploadFile, str]) -> Dict[str, str]:
848
        """ Store a script for a workflow in the database
849
        """
850
        if isinstance(workflow, str):
851
            with open(workflow) as wf_file:
852
                workflow_content = wf_file.read()
853
        else:
854
            workflow_content = await generate_workflow_content(workflow)
855
        validate_workflow(self.log, workflow_content)
856
        content_hash = generate_workflow_content_hash(workflow_content)
857
        try:
858
            db_workflow_script = await db_find_first_workflow_script_by_content(content_hash)
859
            if db_workflow_script:
860
                message = f"The same workflow script already exists, workflow id: {db_workflow_script.workflow_id}"
861
                raise_http_exception(self.log, status.HTTP_409_CONFLICT, message)
862
        except ValueError:
863
            pass
864
        workflow_id = generate_id()
865
        db_workflow_script = DBWorkflowScript(
866
            workflow_id=workflow_id,
867
            content=workflow_content,
868
            content_hash=content_hash
869
        )
870
        await db_workflow_script.insert()
871
        return {"workflow_id": workflow_id}
872
873
    async def replace_workflow(self, workflow_id, workflow: Union[UploadFile, str]) -> Dict[str, str]:
874
        """ Update a workflow script file in the database
875
        """
876
        try:
877
            db_workflow_script = await db_get_workflow_script(workflow_id)
878
            if isinstance(workflow, str):
879
                with open(workflow) as wf_file:
880
                    workflow_content = wf_file.read()
881
            else:
882
                workflow_content = await generate_workflow_content(workflow)
883
            validate_workflow(self.log, workflow_content)
884
            db_workflow_script.content = workflow_content
885
            content_hash = generate_workflow_content_hash(workflow_content)
886
            db_workflow_script.content_hash = content_hash
887
            await db_workflow_script.save()
888
            return {"workflow_id": db_workflow_script.workflow_id}
889
        except ValueError as error:
890
            message = f"Workflow script not existing for id '{workflow_id}'."
891
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error)
892
893
    async def download_workflow(self, workflow_id) -> PlainTextResponse:
894
        """ Load workflow-script from the database
895
        """
896
        try:
897
            workflow = await db_get_workflow_script(workflow_id)
898
            return PlainTextResponse(workflow.content)
899
        except ValueError as error:
900
            message = f"Workflow script not existing for id '{workflow_id}'."
901
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error)
902