Passed
Push — master (de0845...2128be)
by Konstantin
03:03

ocrd_network.processing_server   A

Complexity

Total Complexity 31

Size/Duplication

Total Lines 315
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 31
eloc 215
dl 0
loc 315
rs 9.92
c 0
b 0
f 0

13 Methods

Rating   Name   Duplication   Size   Complexity  
A ProcessingServer.processor_list() 0 10 4
A ProcessingServer.stop_deployed_agents() 0 2 1
A ProcessingServer.start() 0 31 2
A ProcessingServer.create_processing_message() 0 16 1
A ProcessingServer.on_startup() 0 2 1
A ProcessingServer.on_shutdown() 0 7 1
A ProcessingServer.create_message_queues() 0 9 3
A ProcessingServer.connect_publisher() 0 18 2
B ProcessingServer.__init__() 0 79 1
A ProcessingServer.list_processors() 0 4 1
A ProcessingServer.get_processor_info() 0 9 2
A ProcessingServer.get_job() 0 10 2
C ProcessingServer.push_processor_job() 0 65 10
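
The complexity figures above can be approximated locally. A minimal sketch, assuming radon as the measuring tool (an assumption: the report's own toolchain and its rating formula, which also seems to weigh method size, are not stated here):

# Sketch: per-method cyclomatic complexity, assuming radon (not confirmed by this report).
# radon's cc_rank() grades on complexity alone, so its letter grades will not always
# match the ratings above, which also appear to factor in method size.
from radon.complexity import cc_visit, cc_rank

with open('ocrd_network/processing_server.py') as f:
    source = f.read()

for block in cc_visit(source):
    # block.fullname is e.g. 'ProcessingServer.push_processor_job'
    print(f'{cc_rank(block.complexity)}  {block.fullname}  complexity={block.complexity}')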
from typing import Dict, List
import uvicorn

from fastapi import FastAPI, status, Request, HTTPException
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse

from pika.exceptions import ChannelClosedByBroker

from ocrd_utils import getLogger, get_ocrd_tool_json
from ocrd_validators import ParameterValidator
from .database import (
    db_get_processing_job,
    db_get_workspace,
    initiate_database
)
from .deployer import Deployer
from .deployment_config import ProcessingServerConfig
from .rabbitmq_utils import RMQPublisher, OcrdProcessingMessage
from .models import (
    DBProcessorJob,
    PYJobInput,
    PYJobOutput,
    StateEnum
)
from .utils import generate_created_time, generate_id

class ProcessingServer(FastAPI):
    """FastAPI app to make OCR-D processor calls

    The Processing Server receives calls conforming to the OCR-D web API regarding the
    processing part. It can trigger OCR-D processor runs and provides endpoints to discover
    processors and to watch job status.
    The Processing Server does not execute the processors itself; it starts up a queue and a
    database and delegates the calls to processing workers. The workers are started by the
    Processing Server, and all communication with them goes through the queue.
    """

    def __init__(self, config_path: str, host: str, port: int) -> None:
        super().__init__(on_startup=[self.on_startup], on_shutdown=[self.on_shutdown],
                         title='OCR-D Processing Server',
                         description='OCR-D processing and processors')
        self.log = getLogger(__name__)
        self.hostname = host
        self.port = port
        self.config = ProcessingServerConfig(config_path)
        self.deployer = Deployer(self.config)
        self.mongodb_url = None
        self.rmq_host = self.config.queue.address
        self.rmq_port = self.config.queue.port
        self.rmq_vhost = '/'
        self.rmq_username = self.config.queue.credentials[0]
        self.rmq_password = self.config.queue.credentials[1]

        # Assigned when `connect_publisher` is called on the running instance
        self.rmq_publisher = None

        # This list holds all processors mentioned in the config file
        self._processor_list = None

        # Create routes
        self.router.add_api_route(
            path='/stop',
            endpoint=self.stop_deployed_agents,
            methods=['POST'],
            tags=['tools'],
            summary='Stop database, queue and processing-workers',
        )

        self.router.add_api_route(
            path='/processor/{processor_name}',
            endpoint=self.push_processor_job,
            methods=['POST'],
            tags=['processing'],
            status_code=status.HTTP_200_OK,
            summary='Submit a job to this processor',
            response_model=PYJobOutput,
            response_model_exclude_unset=True,
            response_model_exclude_none=True
        )

        self.router.add_api_route(
            path='/processor/{processor_name}/{job_id}',
            endpoint=self.get_job,
            methods=['GET'],
            tags=['processing'],
            status_code=status.HTTP_200_OK,
            summary='Get information about a job based on its ID',
            response_model=PYJobOutput,
            response_model_exclude_unset=True,
            response_model_exclude_none=True
        )

        self.router.add_api_route(
            path='/processor/{processor_name}',
            endpoint=self.get_processor_info,
            methods=['GET'],
            tags=['processing', 'discovery'],
            status_code=status.HTTP_200_OK,
            summary='Get information about this processor',
        )

        self.router.add_api_route(
            path='/processor',
            endpoint=self.list_processors,
            methods=['GET'],
            tags=['processing', 'discovery'],
            status_code=status.HTTP_200_OK,
            summary='Get a list of all available processors',
        )

        @self.exception_handler(RequestValidationError)
        async def validation_exception_handler(request: Request, exc: RequestValidationError):
            exc_str = f'{exc}'.replace('\n', ' ').replace('   ', ' ')
            self.log.error(f'{request}: {exc_str}')
            content = {'status_code': 10422, 'message': exc_str, 'data': None}
            return JSONResponse(content=content, status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)

    def start(self) -> None:
        """ Deploy agents (database, queue, workers) and start the processing server with uvicorn
        """
        try:
            rabbitmq_hostinfo = self.deployer.deploy_rabbitmq(
                image='rabbitmq:3-management', detach=True, remove=True)

            # Assign the credentials to the RabbitMQ URL parameter
            rabbitmq_url = f'amqp://{self.rmq_username}:{self.rmq_password}@{rabbitmq_hostinfo}'

            mongodb_hostinfo = self.deployer.deploy_mongodb(
                image='mongo', detach=True, remove=True)

            self.mongodb_url = f'mongodb://{mongodb_hostinfo}'

            # Initialize the RMQPublisher and connect it to the RabbitMQ server
            self.connect_publisher()

            self.log.debug(f'Creating message queues on RabbitMQ instance url: {rabbitmq_url}')
            self.create_message_queues()

            # Deploy the processing hosts on which the processing workers run
            # Note: A deployed processing worker starts listening to a message queue with id
            #       processor.name
            self.deployer.deploy_hosts(rabbitmq_url, self.mongodb_url)
        except Exception:
            self.log.error('Error during startup of processing server. '
                           'Trying to kill parts of incompletely deployed service')
            self.deployer.kill_all()
            raise
        uvicorn.run(self, host=self.hostname, port=int(self.port))

    async def on_startup(self):
        await initiate_database(db_url=self.mongodb_url)

    async def on_shutdown(self) -> None:
        """
        - hosts and pids should be stored somewhere
        - ensure queue is empty or processor is not currently running
        - connect to hosts and kill pids
        """
        await self.stop_deployed_agents()

    async def stop_deployed_agents(self) -> None:
        self.deployer.kill_all()

    def connect_publisher(self, enable_acks: bool = True) -> None:
        self.log.info(f'Connecting RMQPublisher to RabbitMQ server: '
                      f'{self.rmq_host}:{self.rmq_port}{self.rmq_vhost}')
        self.rmq_publisher = RMQPublisher(
            host=self.rmq_host,
            port=self.rmq_port,
            vhost=self.rmq_vhost
        )
        self.log.debug(f'RMQPublisher authenticates with username: '
                       f'{self.rmq_username}, password: {self.rmq_password}')
        self.rmq_publisher.authenticate_and_connect(
            username=self.rmq_username,
            password=self.rmq_password
        )
        if enable_acks:
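            # Presumably switches the channel into RabbitMQ publisher-confirm mode
            # (pika's confirm delivery), so each publish is acknowledged by the broker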
            self.rmq_publisher.enable_delivery_confirmations()
            self.log.info('Delivery confirmations are enabled')
        self.log.info('Successfully connected RMQPublisher.')

    def create_message_queues(self) -> None:
        """Create a message queue for each `processor.name` occurring in the config file
        """
        for host in self.config.hosts:
            for processor in host.processors:
                # The existence/validity of processor.name is not checked here.
                # Even if an OCR-D processor of that name does not exist, the queue is created
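                # Queue name == processor name: a worker deployed for processor.name
                # consumes from the queue of the same name (see the note in start())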
                self.log.info(f'Creating a message queue with id: {processor.name}')
                self.rmq_publisher.create_queue(queue_name=processor.name)

    @property
    def processor_list(self):
        if self._processor_list:
            return self._processor_list
        res = set()
        for host in self.config.hosts:
            for processor in host.processors:
                res.add(processor.name)
        self._processor_list = list(res)
        return self._processor_list

    @staticmethod
    def create_processing_message(job: DBProcessorJob) -> OcrdProcessingMessage:
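        # Build the queue message from the DB job entry; it is YAML-serialized via
        # OcrdProcessingMessage.encode_yml before publishing (see push_processor_job)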
        processing_message = OcrdProcessingMessage(
            job_id=job.job_id,
            processor_name=job.processor_name,
            created_time=generate_created_time(),
            path_to_mets=job.path_to_mets,
            workspace_id=job.workspace_id,
            input_file_grps=job.input_file_grps,
            output_file_grps=job.output_file_grps,
            page_id=job.page_id,
            parameters=job.parameters,
            result_queue_name=job.result_queue_name,
            callback_url=job.callback_url,
        )
        return processing_message

    async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJobOutput:
        """ Queue a processor job
        """
        if not self.rmq_publisher:
            raise Exception('RMQPublisher is not connected')

        if processor_name not in self.processor_list:
            try:
                # Only check whether the queue exists; if not, ChannelClosedByBroker is raised
                self.rmq_publisher.create_queue(processor_name, passive=True)
            except ChannelClosedByBroker as error:
                self.log.warning(f"Process queue with id '{processor_name}' does not exist: {error}")
                # Reconnect the publisher - not efficient, but works
                # TODO: Revisit when a reconnection strategy is implemented
                self.connect_publisher(enable_acks=True)
                raise HTTPException(
                    status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                    detail=f"Process queue with id '{processor_name}' does not exist"
                )

        # Validate parameters
        ocrd_tool = get_ocrd_tool_json(processor_name)
        if not ocrd_tool:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Processor '{processor_name}' not available. Empty or missing ocrd_tool"
            )
        report = ParameterValidator(ocrd_tool).validate(dict(data.parameters))
        if not report.is_valid:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=report.errors)

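        # XOR check: exactly one of path_to_mets and workspace_id must be set,
        # i.e. reject requests where both are set or both are empty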
        if bool(data.path_to_mets) == bool(data.workspace_id):
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail="Either 'path_to_mets' or 'workspace_id' must be provided, but not both"
            )
        # This check is done to return early in case
        # the workspace_id is provided but does not exist in the DB
        elif data.workspace_id:
            try:
                await db_get_workspace(data.workspace_id)
            except ValueError:
                raise HTTPException(
                    status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                    detail=f"Workspace with id '{data.workspace_id}' does not exist"
                )

        job = DBProcessorJob(
            **data.dict(exclude_unset=True, exclude_none=True),
            job_id=generate_id(),
            processor_name=processor_name,
            state=StateEnum.queued
        )
        await job.insert()
        processing_message = self.create_processing_message(job)
        encoded_processing_message = OcrdProcessingMessage.encode_yml(processing_message)

        try:
            self.rmq_publisher.publish_to_queue(processor_name, encoded_processing_message)
        except Exception as error:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f'RMQPublisher has failed: {error}'
            )
        return job.to_job_output()

    async def get_processor_info(self, processor_name: str) -> Dict:
        """ Return a processor's ocrd-tool.json
        """
        if processor_name not in self.processor_list:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail='Processor not available'
            )
        return get_ocrd_tool_json(processor_name)

    async def get_job(self, processor_name: str, job_id: str) -> PYJobOutput:
        """ Return processing job information from the database
        """
        try:
            job = await db_get_processing_job(job_id)
            return job.to_job_output()
        except ValueError:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"Processing job with id '{job_id}' of processor type '{processor_name}' does not exist"
            )

    async def list_processors(self) -> List[str]:
        """ Return a list of all available processors
        """
        return self.processor_list
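
For reference, a hypothetical client session against the routes registered in __init__ above. Host, port, the processor name and the request-body field names are assumptions (they mirror how push_processor_job reads PYJobInput, whose definition is not part of this file):

# Hypothetical client sketch; none of these concrete values come from this module.
import requests

SERVER = 'http://localhost:8080'  # assumed host:port of a running ProcessingServer

# Discovery: list all processors, then fetch one ocrd-tool.json
print(requests.get(f'{SERVER}/processor').json())
print(requests.get(f'{SERVER}/processor/ocrd-dummy').json())

# Submit a job: exactly one of path_to_mets / workspace_id may be set
job = requests.post(f'{SERVER}/processor/ocrd-dummy', json={
    'path_to_mets': '/data/workspace/mets.xml',   # assumed PYJobInput field names
    'input_file_grps': ['OCR-D-IMG'],
    'output_file_grps': ['OCR-D-BIN'],
    'parameters': {},
}).json()

# Poll the job status by its ID (assumes PYJobOutput exposes job_id)
print(requests.get(f"{SERVER}/processor/ocrd-dummy/{job['job_id']}").json())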