ocrd_network.processing_server (rating: F)

Complexity
Total Complexity: 103

Size/Duplication
Total Lines: 893
Duplicated Lines: 0 %

Importance
Changes: 0

Metric   Value
wmc      103
eloc     658
dl       0
loc      893
rs       1.942
c        0
b        0
f        0

39 Methods

Rating   Name   Duplication   Size   Complexity  
A ProcessingServer.start() 0 23 2
B ProcessingServer.__init__() 0 64 2
A ProcessingServer.home_page() 0 7 1
B ProcessingServer.get_network_agent_ocrd_tool() 0 15 6
A ProcessingServer.network_agent_exists_server() 0 3 1
B ProcessingServer.push_job_to_network_agent() 0 14 6
A ProcessingServer.stop_deployed_agents() 0 2 1
A ProcessingServer.query_ocrd_tool_json_from_server() 0 6 2
B ProcessingServer.run_workflow() 0 65 3
A ProcessingServer.upload_workflow() 0 21 3
A ProcessingServer.validate_tasks_agents_existence() 0 14 4
A ProcessingServer.add_api_routes_others() 0 32 1
B ProcessingServer.add_api_routes_processing() 0 57 1
A ProcessingServer.on_startup() 0 3 1
B ProcessingServer.add_api_routes_workflow() 0 55 1
A ProcessingServer.list_processors() 0 7 1
A ProcessingServer._cancel_cached_dependent_jobs() 0 4 1
A ProcessingServer.push_job_to_processor_server() 0 4 1
A ProcessingServer.push_job_to_processing_queue() 0 15 3
A ProcessingServer.forward_tcp_request_to_uds_mets_server() 0 12 1
A ProcessingServer.replace_workflow() 0 15 2
A ProcessingServer.on_shutdown() 0 7 1
A ProcessingServer.kill_mets_server_zombies() 0 3 1
B ProcessingServer.task_sequence_to_processing_jobs() 0 35 5
A ProcessingServer.push_cached_jobs_to_agents() 0 20 5
A ProcessingServer._unlock_pages_of_workspace() 0 7 1
C ProcessingServer._produce_workflow_status_simple_response() 0 17 9
B ProcessingServer._consume_cached_jobs_of_workspace() 0 36 7
A ProcessingServer.get_workflow_info() 0 8 1
A ProcessingServer.network_agent_exists_worker() 0 8 2
A ProcessingServer.validate_agent_type_and_existence() 0 12 4
B ProcessingServer.remove_job_from_request_cache() 0 34 5
A ProcessingServer.get_processor_job() 0 2 1
A ProcessingServer.download_workflow() 0 9 2
A ProcessingServer.get_workflow_info_simple() 0 13 1
A ProcessingServer._lock_pages_of_workspace() 0 8 1
C ProcessingServer.validate_and_forward_job_to_network_agent() 0 94 8
A ProcessingServer._produce_workflow_status_response() 0 17 4
A ProcessingServer.get_processor_job_log() 0 2 1

How to fix: Complexity

Complex classes like ocrd_network.processing_server often do a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
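For illustration, here is a minimal sketch (not part of the analyzed module; the names ProcessingRoutes and register_processing_routes are hypothetical) of how the route registration that ProcessingServer currently performs in its add_api_routes_* methods could be pulled out into its own class:

from typing import Callable, List

from fastapi import APIRouter, FastAPI, status


class ProcessingRoutes:
    """Owns the /processor discovery endpoint instead of ProcessingServer."""

    def __init__(self, list_processors: Callable[[], List[str]]) -> None:
        self._list_processors = list_processors
        self.router = APIRouter()
        self.router.add_api_route(
            path="/processor",
            endpoint=self.list_processors,
            methods=["GET"],
            status_code=status.HTTP_200_OK,
            summary="Get a list of all available processors"
        )

    async def list_processors(self) -> List[str]:
        # Delegate to whatever callable the server injects (e.g. a Deployer lookup).
        return self._list_processors()


def register_processing_routes(app: FastAPI, list_processors: Callable[[], List[str]]) -> None:
    # Inside ProcessingServer.__init__() this call would replace add_api_routes_processing().
    app.include_router(ProcessingRoutes(list_processors).router)

The same pattern could be repeated for the workflow and miscellaneous routes, shrinking ProcessingServer to wiring and business logic.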

from datetime import datetime
from os import getpid
from pathlib import Path
from typing import Dict, List, Optional, Union
from uvicorn import run as uvicorn_run

from fastapi import APIRouter, FastAPI, File, HTTPException, Request, status, UploadFile
from fastapi.exceptions import RequestValidationError
from fastapi.responses import FileResponse, JSONResponse, PlainTextResponse

from ocrd.task_sequence import ProcessorTask
from ocrd_utils import initLogging, getLogger
from .constants import AgentType, JobState, ServerApiTags
from .database import (
    initiate_database,
    db_get_processing_job,
    db_get_processing_jobs,
    db_update_processing_job,
    db_update_workspace,
    db_get_workflow_script,
    db_find_first_workflow_script_by_content
)
from .runtime_data import Deployer
from .logging_utils import configure_file_handler_with_formatter, get_processing_server_logging_file_path
from .models import (
    DBProcessorJob,
    DBWorkflowJob,
    DBWorkflowScript,
    PYJobInput,
    PYJobOutput,
    PYResultMessage,
    PYWorkflowJobOutput
)
from .rabbitmq_utils import (
    check_if_queue_exists,
    connect_rabbitmq_publisher,
    create_message_queues,
    OcrdProcessingMessage
)
from .server_cache import CacheLockedPages, CacheProcessingRequests
from .server_utils import (
    create_processing_message,
    create_workspace_if_not_exists,
    forward_job_to_processor_server,
    _get_processor_job,
    _get_processor_job_log,
    get_page_ids_list,
    get_workflow_content,
    get_from_database_workspace,
    get_from_database_workflow_job,
    kill_mets_server_zombies,
    parse_workflow_tasks,
    raise_http_exception,
    request_processor_server_tool_json,
    validate_and_return_mets_path,
    validate_first_task_input_file_groups_existence,
    validate_job_input,
    validate_workflow
)
from .tcp_to_uds_mets_proxy import MetsServerProxy
from .utils import (
    load_ocrd_all_tool_json,
    expand_page_ids,
    generate_id,
    generate_workflow_content,
    generate_workflow_content_hash
)

class ProcessingServer(FastAPI):
    """FastAPI app to make ocr-d processor calls

    The Processing-Server receives calls conforming to the ocr-d webapi regarding the processing
    part. It can run ocrd-processors and provides endpoints to discover processors and watch the job
    status.
    The Processing-Server does not execute the processors itself but starts up a queue and a
    database to delegate the calls to processing workers. They are started by the Processing-Server
    and the communication goes through the queue.
    """

    def __init__(self, config_path: str, host: str, port: int) -> None:
        self.title = "OCR-D Processing Server"
        super().__init__(
            title=self.title,
            on_startup=[self.on_startup],
            on_shutdown=[self.on_shutdown],
            description="OCR-D Processing Server"
        )
        initLogging()
        self.log = getLogger("ocrd_network.processing_server")
        log_file = get_processing_server_logging_file_path(pid=getpid())
        configure_file_handler_with_formatter(self.log, log_file=log_file, mode="a")

        self.log.info(f"Loading ocrd all tool json")
        self.ocrd_all_tool_json = load_ocrd_all_tool_json()
        self.hostname = host
        self.port = port

        # The deployer is used for:
        # - deploying agents when the Processing Server is started
        # - retrieving runtime data of agents
        self.deployer = Deployer(config_path)
        # Used for forwarding Mets Server TCP requests to UDS requests
        self.mets_server_proxy = MetsServerProxy()
        self.use_tcp_mets = self.deployer.use_tcp_mets
        # If set, all Mets Server UDS requests are multiplexed over TCP
        # Used by processing workers and/or processor servers to report back the results
        if self.deployer.internal_callback_url:
            host = self.deployer.internal_callback_url
            self.internal_job_callback_url = f"{host.rstrip('/')}/result_callback"
            self.multiplexing_endpoint = f"{host.rstrip('/')}/tcp_mets"
        else:
            self.internal_job_callback_url = f"http://{host}:{port}/result_callback"
            self.multiplexing_endpoint = f"http://{host}:{port}/tcp_mets"

        self.mongodb_url = None
        self.rabbitmq_url = None
        self.rmq_data = {
            "host": self.deployer.data_queue.host,
            "port": self.deployer.data_queue.port,
            "vhost": "/",
            "username": self.deployer.data_queue.cred_username,
            "password": self.deployer.data_queue.cred_password
        }

        # Gets assigned when `connect_rabbitmq_publisher()` is called on the working object
        self.rmq_publisher = None

        # Used for keeping track of cached processing requests
        self.cache_processing_requests = CacheProcessingRequests()

        # Used for keeping track of locked/unlocked pages of a workspace
        self.cache_locked_pages = CacheLockedPages()

        self.add_api_routes_others()
        self.add_api_routes_processing()
        self.add_api_routes_workflow()

        @self.exception_handler(RequestValidationError)
        async def validation_exception_handler(request: Request, exc: RequestValidationError):
            exc_str = f'{exc}'.replace('\n', ' ').replace('   ', ' ')
            self.log.error(f'{request}: {exc_str}')
            content = {'status_code': 10422, 'message': exc_str, 'data': None}
            return JSONResponse(content=content, status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)

    def start(self) -> None:
        """ deploy agents (db, queue, workers) and start the processing server with uvicorn
        """
        try:
            self.rabbitmq_url = self.deployer.deploy_rabbitmq()
            self.mongodb_url = self.deployer.deploy_mongodb()

            # The RMQPublisher is initialized and a connection to the RabbitMQ is performed
            self.rmq_publisher = connect_rabbitmq_publisher(self.log, self.rmq_data, enable_acks=True)

            queue_names = self.deployer.find_matching_network_agents(
                worker_only=True, str_names_only=True, unique_only=True
            )
            self.log.info(f"Creating message queues on RabbitMQ instance url: {self.rabbitmq_url}")
            create_message_queues(logger=self.log, rmq_publisher=self.rmq_publisher, queue_names=queue_names)

            self.deployer.deploy_network_agents(mongodb_url=self.mongodb_url, rabbitmq_url=self.rabbitmq_url)
        except Exception as error:
            self.log.exception(f"Failed to start the Processing Server, error: {error}")
            self.log.warning("Trying to stop previously deployed services and network agents.")
            self.deployer.stop_all()
            raise
        uvicorn_run(self, host=self.hostname, port=int(self.port))

    async def on_startup(self):
        self.log.info(f"Initializing the Database on: {self.mongodb_url}")
        await initiate_database(db_url=self.mongodb_url)

    async def on_shutdown(self) -> None:
        """
        - hosts and pids should be stored somewhere
        - ensure queue is empty or processor is not currently running
        - connect to hosts and kill pids
        """
        await self.stop_deployed_agents()

    def add_api_routes_others(self):
        others_router = APIRouter()
        others_router.add_api_route(
            path="/",
            endpoint=self.home_page,
            methods=["GET"],
            status_code=status.HTTP_200_OK,
            summary="Get information about the processing server"
        )
        others_router.add_api_route(
            path="/stop",
            endpoint=self.stop_deployed_agents,
            methods=["POST"],
            tags=[ServerApiTags.TOOLS],
            summary="Stop database, queue and processing-workers"
        )
        others_router.add_api_route(
            path="/tcp_mets",
            methods=["POST"],
            endpoint=self.forward_tcp_request_to_uds_mets_server,
            tags=[ServerApiTags.WORKSPACE],
            summary="Forward a TCP request to UDS mets server"
        )
        others_router.add_api_route(
            path="/kill_mets_server_zombies",
            endpoint=self.kill_mets_server_zombies,
            methods=["DELETE"],
            tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="!! Workaround Do Not Use Unless You Have A Reason !! Kill all METS servers on this machine that have been created more than 60 minutes ago."
        )
        self.include_router(others_router)

    def add_api_routes_processing(self):
        processing_router = APIRouter()
        processing_router.add_api_route(
            path="/processor",
            endpoint=self.list_processors,
            methods=["GET"],
            tags=[ServerApiTags.PROCESSING, ServerApiTags.DISCOVERY],
            status_code=status.HTTP_200_OK,
            summary="Get a list of all available processors"
        )
        processing_router.add_api_route(
            path="/processor/info/{processor_name}",
            endpoint=self.get_network_agent_ocrd_tool,
            methods=["GET"],
            tags=[ServerApiTags.PROCESSING, ServerApiTags.DISCOVERY],
            status_code=status.HTTP_200_OK,
            summary="Get information about this processor"
        )
        processing_router.add_api_route(
            path="/processor/run/{processor_name}",
            endpoint=self.validate_and_forward_job_to_network_agent,
            methods=["POST"],
            tags=[ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Submit a job to this processor",
            response_model=PYJobOutput,
            response_model_exclude_unset=True,
            response_model_exclude_none=True
        )
        processing_router.add_api_route(
            path="/processor/job/{job_id}",
            endpoint=self.get_processor_job,
            methods=["GET"],
            tags=[ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Get information about a job based on its ID",
            response_model=PYJobOutput,
            response_model_exclude_unset=True,
            response_model_exclude_none=True
        )
        processing_router.add_api_route(
            path="/processor/log/{job_id}",
            endpoint=self.get_processor_job_log,
            methods=["GET"],
            tags=[ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Get the log file of a job id"
        )
        processing_router.add_api_route(
            path="/result_callback",
            endpoint=self.remove_job_from_request_cache,
            methods=["POST"],
            tags=[ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Callback used by a worker or processor server for reporting result of a processing request"
        )
        self.include_router(processing_router)

    def add_api_routes_workflow(self):
        workflow_router = APIRouter()
        workflow_router.add_api_route(
            path="/workflow",
            endpoint=self.upload_workflow,
            methods=["POST"],
            tags=[ServerApiTags.WORKFLOW],
            status_code=status.HTTP_201_CREATED,
            summary="Upload/Register a new workflow script"
        )
        workflow_router.add_api_route(
            path="/workflow/{workflow_id}",
            endpoint=self.download_workflow,
            methods=["GET"],
            tags=[ServerApiTags.WORKFLOW],
            status_code=status.HTTP_200_OK,
            summary="Download a workflow script"
        )
        workflow_router.add_api_route(
            path="/workflow/{workflow_id}",
            endpoint=self.replace_workflow,
            methods=["PUT"],
            tags=[ServerApiTags.WORKFLOW],
            status_code=status.HTTP_200_OK,
            summary="Update/Replace a workflow script"
        )
        workflow_router.add_api_route(
            path="/workflow/run",
            endpoint=self.run_workflow,
            methods=["POST"],
            tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Run a workflow",
            response_model=PYWorkflowJobOutput,
            response_model_exclude_defaults=True,
            response_model_exclude_unset=True,
            response_model_exclude_none=True
        )
        workflow_router.add_api_route(
            path="/workflow/job-simple/{workflow_job_id}",
            endpoint=self.get_workflow_info_simple,
            methods=["GET"],
            tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Get simplified overall job status"
        )
        workflow_router.add_api_route(
            path="/workflow/job/{workflow_job_id}",
            endpoint=self.get_workflow_info,
            methods=["GET"],
            tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Get information about a workflow run"
        )
        self.include_router(workflow_router)

    async def forward_tcp_request_to_uds_mets_server(self, request: Request) -> Dict:
        """Forward a mets-server request

        A processor calls a mets-related method such as add_file with ClientSideOcrdMets. This sends
        a request to this endpoint. The request contains all information necessary to make a call
        to the uds-mets-server. `MetsServerProxy` uses this information to make the call to the
        uds-mets-server, which is reachable locally (local to the processing-server).
        """
        request_body = await request.json()
        ws_dir_path = request_body["workspace_path"]
        self.deployer.start_uds_mets_server(ws_dir_path=ws_dir_path)
        return self.mets_server_proxy.forward_tcp_request(request_body=request_body)

    async def home_page(self):
        message = f"The home page of the {self.title}"
        json_message = {
            "message": message,
            "time": datetime.now().strftime("%Y-%m-%d %H:%M")
        }
        return json_message

    async def stop_deployed_agents(self) -> None:
        self.deployer.stop_all()

    def query_ocrd_tool_json_from_server(self, processor_name: str) -> Dict:
        processor_server_base_url = self.deployer.resolve_processor_server_url(processor_name)
        if processor_server_base_url == '':
            message = f"Processor Server URL of '{processor_name}' not found"
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message=message)
        return request_processor_server_tool_json(self.log, processor_server_base_url=processor_server_base_url)

    async def get_network_agent_ocrd_tool(
        self, processor_name: str, agent_type: AgentType = AgentType.PROCESSING_WORKER
    ) -> Dict:
        ocrd_tool = {}
        error_message = f"Network agent of type '{agent_type}' for processor '{processor_name}' not found."
        if agent_type != AgentType.PROCESSING_WORKER and agent_type != AgentType.PROCESSOR_SERVER:
            message = f"Unknown agent type: {agent_type}, {type(agent_type)}"
            raise_http_exception(self.log, status_code=status.HTTP_501_NOT_IMPLEMENTED, message=message)
        if agent_type == AgentType.PROCESSING_WORKER:
            ocrd_tool = self.ocrd_all_tool_json.get(processor_name, None)
        if agent_type == AgentType.PROCESSOR_SERVER:
            ocrd_tool = self.query_ocrd_tool_json_from_server(processor_name)
        if not ocrd_tool:
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, error_message)
        return ocrd_tool

    def network_agent_exists_server(self, processor_name: str) -> bool:
        processor_server_url = self.deployer.resolve_processor_server_url(processor_name)
        return bool(processor_server_url)

    def network_agent_exists_worker(self, processor_name: str) -> bool:
        # TODO: Reconsider and refactor this.
        #  Added ocrd-dummy by default if not available for the integration tests.
        #  A proper Processing Worker / Processor Server registration endpoint
        #  is needed on the Processing Server side
        if processor_name == 'ocrd-dummy':
            return True
        return bool(check_if_queue_exists(self.log, self.rmq_data, processor_name=processor_name))

    def validate_agent_type_and_existence(self, processor_name: str, agent_type: AgentType) -> None:
        agent_exists = False
        if agent_type == AgentType.PROCESSOR_SERVER:
            agent_exists = self.network_agent_exists_server(processor_name=processor_name)
        elif agent_type == AgentType.PROCESSING_WORKER:
            agent_exists = self.network_agent_exists_worker(processor_name=processor_name)
        else:
            message = f"Unknown agent type: {agent_type}, {type(agent_type)}"
            raise_http_exception(self.log, status_code=status.HTTP_501_NOT_IMPLEMENTED, message=message)
        if not agent_exists:
            message = f"Network agent of type '{agent_type}' for processor '{processor_name}' not found."
            raise_http_exception(self.log, status.HTTP_422_UNPROCESSABLE_ENTITY, message)

    async def validate_and_forward_job_to_network_agent(self, processor_name: str, data: PYJobInput) -> PYJobOutput:
        # Append the processor name to the request itself
        data.processor_name = processor_name
        self.validate_agent_type_and_existence(processor_name=data.processor_name, agent_type=data.agent_type)
        if data.job_id:
            message = f"Processing request job id field is set but must not be: {data.job_id}"
            raise_http_exception(self.log, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
        # Generate processing job id
        data.job_id = generate_id()
        ocrd_tool = await self.get_network_agent_ocrd_tool(
            processor_name=data.processor_name,
            agent_type=data.agent_type
        )
        validate_job_input(self.log, data.processor_name, ocrd_tool, data)

        if data.workspace_id:
            # just a check whether the workspace exists in the database or not
            await get_from_database_workspace(self.log, data.workspace_id)
        else:  # data.path_to_mets provided instead
            await create_workspace_if_not_exists(self.log, mets_path=data.path_to_mets)

        workspace_key = data.path_to_mets if data.path_to_mets else data.workspace_id
        # initialize the request counter for the workspace_key
        self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=0)

        # This check is done to return early in case a workspace_id is provided
        # but the abs mets path cannot be queried from the DB
        request_mets_path = await validate_and_return_mets_path(self.log, data)

        page_ids = expand_page_ids(data.page_id)

        # A flag whether the current request must be cached
        # This is set to true if for any output file group there
        # is a page_id value that has been previously locked
        cache_current_request = False

        # Check if there are any dependencies of the current request
        if data.depends_on:
            cache_current_request = await self.cache_processing_requests.is_caching_required(data.depends_on)

        # No need for further check of locked pages dependency
        # if the request should be already cached
        if not cache_current_request:
            # Check if there are any locked pages for the current request
            cache_current_request = self.cache_locked_pages.check_if_locked_pages_for_output_file_grps(
                workspace_key=workspace_key,
                output_file_grps=data.output_file_grps,
                page_ids=page_ids
            )

        if cache_current_request:
            # Cache the received request
            self.cache_processing_requests.cache_request(workspace_key, data)

            # Create a cached job DB entry
            db_cached_job = DBProcessorJob(
                **data.dict(exclude_unset=True, exclude_none=True),
                internal_callback_url=self.internal_job_callback_url,
                state=JobState.cached
            )
            await db_cached_job.insert()
            return db_cached_job.to_job_output()

        # Lock the pages in the request
        self.cache_locked_pages.lock_pages(
            workspace_key=workspace_key,
            output_file_grps=data.output_file_grps,
            page_ids=page_ids
        )

        # Start a UDS Mets Server with the current workspace
        ws_dir_path = str(Path(request_mets_path).parent)
        mets_server_url = self.deployer.start_uds_mets_server(ws_dir_path=ws_dir_path)
        if self.use_tcp_mets:
            # let workers talk to mets server via tcp instead of using unix-socket
            mets_server_url = self.multiplexing_endpoint

        # Assign the mets server url in the database (workers read mets_server_url from db)
        await db_update_workspace(
            workspace_id=data.workspace_id,
            workspace_mets_path=data.path_to_mets,
            mets_server_url=mets_server_url
        )

        # Create a queued job DB entry
        db_queued_job = DBProcessorJob(
            **data.dict(exclude_unset=True, exclude_none=True),
            internal_callback_url=self.internal_job_callback_url,
            state=JobState.queued
        )
        await db_queued_job.insert()
        self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=1)
        job_output = await self.push_job_to_network_agent(data=data, db_job=db_queued_job)
        return job_output

    async def push_job_to_network_agent(self, data: PYJobInput, db_job: DBProcessorJob) -> PYJobOutput:
        if data.agent_type != AgentType.PROCESSING_WORKER and data.agent_type != AgentType.PROCESSOR_SERVER:
            message = f"Unknown agent type: {data.agent_type}, {type(data.agent_type)}"
            raise_http_exception(self.log, status_code=status.HTTP_501_NOT_IMPLEMENTED, message=message)
        job_output = None
        self.log.debug(f"Pushing to {data.agent_type}: {data.processor_name}, {data.page_id}, {data.job_id}")
        if data.agent_type == AgentType.PROCESSING_WORKER:
            job_output = await self.push_job_to_processing_queue(db_job=db_job)
        if data.agent_type == AgentType.PROCESSOR_SERVER:
            job_output = await self.push_job_to_processor_server(job_input=data)
        if not job_output:
            message = f"Failed to create job output for job input: {data}"
            raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message)
        return job_output

    async def push_job_to_processing_queue(self, db_job: DBProcessorJob) -> PYJobOutput:
        if not self.rmq_publisher:
            message = "The Processing Server has no connection to RabbitMQ Server. RMQPublisher is not connected."
            raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message)
        processing_message = create_processing_message(self.log, db_job)
        try:
            encoded_message = OcrdProcessingMessage.encode_yml(processing_message)
            self.rmq_publisher.publish_to_queue(queue_name=db_job.processor_name, message=encoded_message)
        except Exception as error:
            message = (
                f"Processing server has failed to push processing message to queue: {db_job.processor_name}, "
                f"Processing message: {processing_message.__dict__}"
            )
            raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message, error)
        return db_job.to_job_output()

    async def push_job_to_processor_server(self, job_input: PYJobInput) -> PYJobOutput:
        processor_server_base_url = self.deployer.resolve_processor_server_url(job_input.processor_name)
        return await forward_job_to_processor_server(
            self.log, job_input=job_input, processor_server_base_url=processor_server_base_url
        )

    async def get_processor_job(self, job_id: str) -> PYJobOutput:
        return await _get_processor_job(self.log, job_id)

    async def get_processor_job_log(self, job_id: str) -> FileResponse:
        return await _get_processor_job_log(self.log, job_id)

    async def _lock_pages_of_workspace(
        self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]
    ) -> None:
        # Lock the output file group pages for the current request
        self.cache_locked_pages.lock_pages(
            workspace_key=workspace_key,
            output_file_grps=output_file_grps,
            page_ids=page_ids
        )

    async def _unlock_pages_of_workspace(
        self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]
    ) -> None:
        self.cache_locked_pages.unlock_pages(
            workspace_key=workspace_key,
            output_file_grps=output_file_grps,
            page_ids=page_ids
        )

    async def push_cached_jobs_to_agents(self, processing_jobs: List[PYJobInput]) -> None:
        if not len(processing_jobs):
            self.log.debug("No processing jobs were consumed from the requests cache")
            return
        for data in processing_jobs:
            self.log.info(f"Changing the job status of: {data.job_id} from {JobState.cached} to {JobState.queued}")
            db_consumed_job = await db_update_processing_job(job_id=data.job_id, state=JobState.queued)
            workspace_key = data.path_to_mets if data.path_to_mets else data.workspace_id

            # Lock the output file group pages for the current request
            await self._lock_pages_of_workspace(
                workspace_key=workspace_key,
                output_file_grps=data.output_file_grps,
                page_ids=expand_page_ids(data.page_id)
            )

            self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=1)
            job_output = await self.push_job_to_network_agent(data=data, db_job=db_consumed_job)
            if not job_output:
                self.log.exception(f"Failed to create job output for job input data: {data}")

    async def _cancel_cached_dependent_jobs(self, workspace_key: str, job_id: str) -> None:
        await self.cache_processing_requests.cancel_dependent_jobs(
            workspace_key=workspace_key,
            processing_job_id=job_id
        )

    async def _consume_cached_jobs_of_workspace(
        self, workspace_key: str, mets_server_url: str, path_to_mets: str
    ) -> List[PYJobInput]:
        # decrease the internal cache counter by 1
        request_counter = self.cache_processing_requests.update_request_counter(
            workspace_key=workspace_key, by_value=-1
        )
        self.log.debug(f"Internal processing job cache counter value: {request_counter}")
        if (workspace_key not in self.cache_processing_requests.processing_requests or
            not len(self.cache_processing_requests.processing_requests[workspace_key])):
            if request_counter <= 0:
                # Shut down the Mets Server for the workspace_key since no
                # more internal callbacks are expected for that workspace
                self.log.debug(f"Stopping the mets server: {mets_server_url}")
                self.deployer.stop_uds_mets_server(mets_server_url=mets_server_url, path_to_mets=path_to_mets)

                try:
                    # The queue is empty - delete it
                    del self.cache_processing_requests.processing_requests[workspace_key]
                except KeyError:
                    self.log.warning(f"Trying to delete non-existing internal queue with key: {workspace_key}")

                # For debugging purposes it is good to see if any locked pages are left
                self.log.debug(f"Contents of the locked pages cache for: {workspace_key}")
                locked_pages = self.cache_locked_pages.get_locked_pages(workspace_key=workspace_key)
                for output_file_grp in locked_pages:
                    self.log.debug(f"{output_file_grp}: {locked_pages[output_file_grp]}")
            else:
                self.log.debug(f"Internal request cache is empty but waiting for {request_counter} result callbacks.")
            return []
        # Check whether the internal queue for the workspace key still exists
        if workspace_key not in self.cache_processing_requests.processing_requests:
            self.log.debug(f"No internal queue available for workspace with key: {workspace_key}")
            return []
        consumed_requests = await self.cache_processing_requests.consume_cached_requests(workspace_key=workspace_key)
        return consumed_requests

    async def remove_job_from_request_cache(self, result_message: PYResultMessage):
        result_job_id = result_message.job_id
        result_job_state = result_message.state
        path_to_mets = result_message.path_to_mets
        workspace_id = result_message.workspace_id
        self.log.info(f"Result job_id: {result_job_id}, state: {result_job_state}")

        db_workspace = await get_from_database_workspace(self.log, workspace_id, path_to_mets)
        mets_server_url = db_workspace.mets_server_url
        workspace_key = path_to_mets if path_to_mets else workspace_id

        if result_job_state == JobState.failed:
            await self._cancel_cached_dependent_jobs(workspace_key, result_job_id)

        if result_job_state != JobState.success:
            # TODO: Handle other potential error cases
            pass

        try:
            db_result_job = await db_get_processing_job(result_job_id)
            # Unlock the output file group pages for the result processing request
            await self._unlock_pages_of_workspace(
                workspace_key=workspace_key,
                output_file_grps=db_result_job.output_file_grps,
                page_ids=expand_page_ids(db_result_job.page_id)
            )
        except ValueError as error:
            message = f"Processing result job with id '{result_job_id}' not found in the DB."
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error)

        consumed_cached_jobs = await self._consume_cached_jobs_of_workspace(
            workspace_key=workspace_key, mets_server_url=mets_server_url, path_to_mets=path_to_mets
        )
        await self.push_cached_jobs_to_agents(processing_jobs=consumed_cached_jobs)

    async def list_processors(self) -> List[str]:
        # There is no caching on the Processing Server side
        processor_names_list = self.deployer.find_matching_network_agents(
            docker_only=False, native_only=False, worker_only=False, server_only=False,
            str_names_only=True, unique_only=True, sort=True
        )
        return processor_names_list

    async def task_sequence_to_processing_jobs(
        self,
        tasks: List[ProcessorTask],
        mets_path: str,
        page_id: str,
        agent_type: AgentType = AgentType.PROCESSING_WORKER
    ) -> List[PYJobOutput]:
        temp_file_group_cache = {}
        responses = []
        for task in tasks:
            # Find dependent jobs of the current task
            dependent_jobs = []
            for input_file_grp in task.input_file_grps:
                if input_file_grp in temp_file_group_cache:
                    dependent_jobs.append(temp_file_group_cache[input_file_grp])
            # NOTE: The `task.mets_path` and `task.page_id` is not utilized in low level
            # Thus, setting these two flags in the ocrd process workflow file has no effect
            job_input_data = PYJobInput(
                processor_name=task.executable,
                path_to_mets=mets_path,
                input_file_grps=task.input_file_grps,
                output_file_grps=task.output_file_grps,
                page_id=page_id,
                parameters=task.parameters,
                agent_type=agent_type,
                depends_on=dependent_jobs,
            )
            response = await self.validate_and_forward_job_to_network_agent(
                processor_name=job_input_data.processor_name,
                data=job_input_data
            )
            for file_group in task.output_file_grps:
                temp_file_group_cache[file_group] = response.job_id
            responses.append(response)
        return responses

    def validate_tasks_agents_existence(self, tasks: List[ProcessorTask], agent_type: AgentType) -> None:
        missing_agents = []
        for task in tasks:
            try:
                self.validate_agent_type_and_existence(processor_name=task.executable, agent_type=agent_type)
            except HTTPException:
                # catching the error is not relevant here
                missing_agents.append({task.executable, agent_type})
        if missing_agents:
            message = (
                "Workflow validation has failed. The desired network agents not found. "
                f"Missing processing agents: {missing_agents}"
            )
            raise_http_exception(self.log, status.HTTP_406_NOT_ACCEPTABLE, message)

    async def run_workflow(
        self,
        mets_path: str,
        workflow: Union[UploadFile, None] = File(None),
        workflow_id: str = None,
        agent_type: AgentType = AgentType.PROCESSING_WORKER,
        page_id: str = None,
        page_wise: bool = False,
        workflow_callback_url: str = None
    ) -> PYWorkflowJobOutput:
        await create_workspace_if_not_exists(self.log, mets_path=mets_path)
        workflow_content = await get_workflow_content(self.log, workflow_id, workflow)
        processing_tasks = parse_workflow_tasks(self.log, workflow_content)

        # Validate the input file groups of the first task in the workflow
        validate_first_task_input_file_groups_existence(self.log, mets_path, processing_tasks[0].input_file_grps)

        # Validate existence of agents (processing workers/processor servers)
        # for the ocr-d processors referenced inside tasks
        self.validate_tasks_agents_existence(processing_tasks, agent_type)

        page_ids = get_page_ids_list(self.log, mets_path, page_id)

        # TODO: Reconsider this, the compact page range may not always work if the page_ids are hashes!
        compact_page_range = f"{page_ids[0]}..{page_ids[-1]}"

        if not page_wise:
            responses = await self.task_sequence_to_processing_jobs(
                tasks=processing_tasks,
                mets_path=mets_path,
                page_id=compact_page_range,
                agent_type=agent_type
            )
            processing_job_ids = [response.job_id for response in responses]
            db_workflow_job = DBWorkflowJob(
                job_id=generate_id(),
                page_id=compact_page_range,
                page_wise=page_wise,
                processing_job_ids={compact_page_range: processing_job_ids},
                path_to_mets=mets_path,
                workflow_callback_url=workflow_callback_url
            )
            await db_workflow_job.insert()
            return db_workflow_job.to_job_output()

        all_pages_job_ids = {}
        for current_page in page_ids:
            responses = await self.task_sequence_to_processing_jobs(
                tasks=processing_tasks,
                mets_path=mets_path,
                page_id=current_page,
                agent_type=agent_type
            )
            processing_job_ids = [response.job_id for response in responses]
            all_pages_job_ids[current_page] = processing_job_ids
        db_workflow_job = DBWorkflowJob(
            job_id=generate_id(),
            page_id=compact_page_range,
            page_wise=page_wise,
            processing_job_ids=all_pages_job_ids,
            path_to_mets=mets_path,
            workflow_callback_url=workflow_callback_url
        )
        await db_workflow_job.insert()
        return db_workflow_job.to_job_output()

    @staticmethod
    def _produce_workflow_status_response(processing_jobs: List[DBProcessorJob]) -> Dict:
        response = {}
        failed_tasks = {}
        failed_tasks_key = "failed-processor-tasks"
        for p_job in processing_jobs:
            response.setdefault(p_job.processor_name, {})
            response[p_job.processor_name].setdefault(p_job.state.value, 0)
            response[p_job.processor_name][p_job.state.value] += 1
            if p_job.state == JobState.failed:
                if failed_tasks_key not in response:
                    response[failed_tasks_key] = failed_tasks
                failed_tasks.setdefault(p_job.processor_name, [])
                failed_tasks[p_job.processor_name].append(
                    {"job_id": p_job.job_id, "page_id": p_job.page_id}
                )
        return response

    @staticmethod
    def _produce_workflow_status_simple_response(processing_jobs: List[DBProcessorJob]) -> JobState:
        workflow_job_state = JobState.unset
        success_jobs = 0
        for p_job in processing_jobs:
            if p_job.state == JobState.cached or p_job.state == JobState.queued:
                continue
            if p_job.state == JobState.failed or p_job.state == JobState.cancelled:
                workflow_job_state = JobState.failed
                break
            if p_job.state == JobState.running:
                workflow_job_state = JobState.running
            if p_job.state == JobState.success:
                success_jobs += 1
        if len(processing_jobs) == success_jobs:
            workflow_job_state = JobState.success
        return workflow_job_state

    async def get_workflow_info(self, workflow_job_id) -> Dict:
        """ Return list of a workflow's processor jobs
        """
        workflow_job = await get_from_database_workflow_job(self.log, workflow_job_id)
        job_ids: List[str] = [job_id for lst in workflow_job.processing_job_ids.values() for job_id in lst]
        jobs = await db_get_processing_jobs(job_ids)
        response = self._produce_workflow_status_response(processing_jobs=jobs)
        return response

    async def kill_mets_server_zombies(self, minutes_ago : Optional[int] = None, dry_run : Optional[bool] = None) -> List[int]:
        pids_killed = kill_mets_server_zombies(minutes_ago=minutes_ago, dry_run=dry_run)
        return pids_killed

    async def get_workflow_info_simple(self, workflow_job_id) -> Dict[str, JobState]:
        """
        Simplified version of `get_workflow_info` that returns a single state for the entire workflow.
        - If a single processing job fails, the entire workflow job status is set to FAILED.
        - If any processing job is running, regardless of other states such as QUEUED and CACHED,
        the entire workflow job status is set to RUNNING.
        - Only if all processing jobs have finished successfully is the workflow job status set to SUCCESS.
        """
        workflow_job = await get_from_database_workflow_job(self.log, workflow_job_id)
        job_ids: List[str] = [job_id for lst in workflow_job.processing_job_ids.values() for job_id in lst]
        jobs = await db_get_processing_jobs(job_ids)
        workflow_job_state = self._produce_workflow_status_simple_response(processing_jobs=jobs)
        return {"state": workflow_job_state}

    async def upload_workflow(self, workflow: UploadFile) -> Dict[str, str]:
        """ Store a script for a workflow in the database
        """
        workflow_content = await generate_workflow_content(workflow)
        validate_workflow(self.log, workflow_content)
        content_hash = generate_workflow_content_hash(workflow_content)
        try:
            db_workflow_script = await db_find_first_workflow_script_by_content(content_hash)
            if db_workflow_script:
                message = f"The same workflow script already exists, workflow id: {db_workflow_script.workflow_id}"
                raise_http_exception(self.log, status.HTTP_409_CONFLICT, message)
        except ValueError:
            pass
        workflow_id = generate_id()
        db_workflow_script = DBWorkflowScript(
            workflow_id=workflow_id,
            content=workflow_content,
            content_hash=content_hash
        )
        await db_workflow_script.insert()
        return {"workflow_id": workflow_id}

    async def replace_workflow(self, workflow_id, workflow: UploadFile) -> Dict[str, str]:
        """ Update a workflow script file in the database
        """
        try:
            db_workflow_script = await db_get_workflow_script(workflow_id)
            workflow_content = await generate_workflow_content(workflow)
            validate_workflow(self.log, workflow_content)
            db_workflow_script.content = workflow_content
            content_hash = generate_workflow_content_hash(workflow_content)
            db_workflow_script.content_hash = content_hash
            await db_workflow_script.save()
            return {"workflow_id": db_workflow_script.workflow_id}
        except ValueError as error:
            message = f"Workflow script not existing for id '{workflow_id}'."
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error)

    async def download_workflow(self, workflow_id) -> PlainTextResponse:
        """ Load workflow-script from the database
        """
        try:
            workflow = await db_get_workflow_script(workflow_id)
            return PlainTextResponse(workflow.content)
        except ValueError as error:
            message = f"Workflow script not existing for id '{workflow_id}'."
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error)
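
For orientation, a minimal launch sketch based on the constructor and start() method listed above; the configuration path and address are hypothetical placeholders, not taken from this report:

from ocrd_network.processing_server import ProcessingServer

if __name__ == "__main__":
    # Hypothetical config path; a real deployment supplies its own
    # processing-server configuration (queue, database, agents).
    server = ProcessingServer(config_path="processing_server_config.yml", host="localhost", port=8000)
    # start() deploys RabbitMQ, MongoDB and the configured network agents, then serves via uvicorn.
    server.start()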