ocrd.mets_server.MpxReq.add_file()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 5
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
"""
2
# METS server functionality
3
"""
4
import os
5
import re
6
from os import _exit, chmod
7
import signal
8
from typing import Dict, Optional, Union, List, Tuple
9
from time import sleep
10
from pathlib import Path
11
from subprocess import Popen, run as subprocess_run
12
from urllib.parse import urlparse
13
import socket
14
import atexit
15
16
from fastapi import FastAPI, Request, Form, Response
17
from fastapi.responses import JSONResponse
18
from requests import Session as requests_session
19
from requests.exceptions import ConnectionError
20
from requests_unixsocket import Session as requests_unixsocket_session
21
from pydantic import BaseModel, Field, ValidationError
22
23
import uvicorn
24
25
from ocrd_models import OcrdFile, ClientSideOcrdFile, OcrdAgent, ClientSideOcrdAgent
26
from ocrd_utils import getLogger
27
28
29
#
30
# Models
31
#
32
33
34
class OcrdFileModel(BaseModel):
    """
    Pydantic model of a single METS file as exchanged with the METS server.
    """
    file_grp: str = Field()
    file_id: str = Field()
    mimetype: str = Field()
    page_id: Optional[str] = Field()
    url: Optional[str] = Field()
    local_filename: Optional[str] = Field()

    @staticmethod
    def create(
        file_grp: str, file_id: str, page_id: Optional[str], url: Optional[str],
        local_filename: Optional[Union[str, Path]], mimetype: str
    ):
        """
        Build an :py:class:`OcrdFileModel` from native values.

        ``local_filename`` may be a :py:class:`pathlib.Path`; it is stored as ``str``.
        Any falsy value (``None``, ``""``) is normalized to ``None``.
        """
        if local_filename:
            normalized_filename = str(local_filename)
        else:
            normalized_filename = None
        return OcrdFileModel(
            file_grp=file_grp,
            file_id=file_id,
            page_id=page_id,
            mimetype=mimetype,
            url=url,
            local_filename=normalized_filename,
        )
51
52
53
class OcrdAgentModel(BaseModel):
    """
    Pydantic model of a METS agent as exchanged with the METS server.

    ``otherrole`` and ``othertype`` are nullable: in METS, ``OTHERROLE`` and
    ``OTHERTYPE`` only accompany ``ROLE="OTHER"`` and ``TYPE="OTHER"``. Agents
    read from a METS file (e.g. ``TYPE="ORGANIZATION"``) may therefore have no
    value there.
    """
    name: str = Field()
    type: str = Field()
    role: str = Field()
    otherrole: Optional[str] = Field()
    # Nullable like ``otherrole``. A plain ``str`` made ``GET /agent`` fail
    # response validation for any agent without an OTHERTYPE attribute.
    othertype: Optional[str] = Field()
    notes: Optional[List[Tuple[Dict[str, str], Optional[str]]]] = Field()

    @staticmethod
    def create(
        name: str, _type: str, role: str, otherrole: Optional[str], othertype: Optional[str],
        notes: List[Tuple[Dict[str, str], Optional[str]]]
    ):
        """
        Build an :py:class:`OcrdAgentModel` from native ``OcrdAgent``-style values.

        ``_type`` maps to the ``type`` field (``OcrdAgent`` uses the underscore name).
        """
        return OcrdAgentModel(name=name, type=_type, role=role, otherrole=otherrole, othertype=othertype, notes=notes)
67
68
69
class OcrdFileListModel(BaseModel):
    """
    Pydantic model wrapping a list of :py:class:`OcrdFileModel` (response of ``GET /file``).
    """
    files: List[OcrdFileModel] = Field()

    @staticmethod
    def create(files: List[OcrdFile]):
        """
        Convert native ``OcrdFile`` objects into an :py:class:`OcrdFileListModel`.
        """
        file_models = [
            OcrdFileModel.create(
                file_grp=ocrd_file.fileGrp,
                file_id=ocrd_file.ID,
                mimetype=ocrd_file.mimetype,
                page_id=ocrd_file.pageId,
                url=ocrd_file.url,
                local_filename=ocrd_file.local_filename,
            )
            for ocrd_file in files
        ]
        return OcrdFileListModel(files=file_models)
83
84
85
class OcrdFileGroupListModel(BaseModel):
    """
    Pydantic model wrapping the list of file group names (response of ``GET /file_groups``).
    """
    file_groups: List[str] = Field()

    @staticmethod
    def create(file_groups: List[str]):
        """
        Wrap ``file_groups`` in an :py:class:`OcrdFileGroupListModel`.
        """
        model = OcrdFileGroupListModel(file_groups=file_groups)
        return model
91
92
93
class OcrdPageListModel(BaseModel):
    """
    Pydantic model wrapping the list of physical page IDs (response of ``GET /physical_pages``).
    """
    physical_pages: List[str] = Field()

    @staticmethod
    def create(physical_pages: List[str]):
        """
        Wrap ``physical_pages`` in an :py:class:`OcrdPageListModel`.
        """
        model = OcrdPageListModel(physical_pages=physical_pages)
        return model
99
100
101
class OcrdAgentListModel(BaseModel):
    """
    Pydantic model wrapping a list of :py:class:`OcrdAgentModel` (response of ``GET /agent``).
    """
    agents: List[OcrdAgentModel] = Field()

    @staticmethod
    def create(agents: List[OcrdAgent]):
        """
        Convert native ``OcrdAgent`` objects into an :py:class:`OcrdAgentListModel`.
        """
        agent_models = [
            OcrdAgentModel.create(
                name=agent.name,
                _type=agent.type,
                role=agent.role,
                otherrole=agent.otherrole,
                othertype=agent.othertype,
                notes=agent.notes,
            )
            for agent in agents
        ]
        return OcrdAgentListModel(agents=agent_models)
113
114
115
#
116
# Client
117
#
118
119
120
class ClientSideOcrdMets:
    """
    Partial substitute for :py:class:`ocrd_models.ocrd_mets.OcrdMets` which provides for
    :py:meth:`ocrd_models.ocrd_mets.OcrdMets.find_files`,
    :py:meth:`ocrd_models.ocrd_mets.OcrdMets.find_all_files`,
    :py:meth:`ocrd_models.ocrd_mets.OcrdMets.add_agent`,
    :py:meth:`ocrd_models.ocrd_mets.OcrdMets.agents`, and
    :py:meth:`ocrd_models.ocrd_mets.OcrdMets.add_file` to query via HTTP a
    :py:class:`ocrd.mets_server.OcrdMetsServer`.

    Two transport modes exist:

    * direct mode: requests go straight to the METS server, over TCP when ``url``
      starts with ``http://``, otherwise over the Unix domain socket at ``url``;
    * multiplexing mode (TCP URL containing ``tcp_mets``): every call is sent as
      ``POST`` to ``url`` with a body built by :py:class:`MpxReq`. The workspace
      path tells the receiving side which workspace is meant, so it is mandatory.
    """

    def __init__(self, url, workspace_path: Optional[str] = None):
        self.protocol = "tcp" if url.startswith("http://") else "uds"
        self.log = getLogger(f"ocrd.models.ocrd_mets.client.{url}")
        self.url = url if self.protocol == "tcp" else f'http+unix://{url.replace("/", "%2F").replace(".", "%2E")}'
        self.ws_dir_path = workspace_path if workspace_path else None

        if self.protocol == "tcp" and "tcp_mets" in self.url:
            self.multiplexing_mode = True
            if not self.ws_dir_path:
                # Must be set since this path is the way to multiplex among multiple workspaces on the PS side
                raise ValueError("ClientSideOcrdMets runs in multiplexing mode but the workspace dir path is not set!")
        else:
            self.multiplexing_mode = False

    @property
    def session(self) -> Union[requests_session, requests_unixsocket_session]:
        """A new (unshared) HTTP session matching the transport protocol."""
        return requests_session() if self.protocol == "tcp" else requests_unixsocket_session()

    def _request(self, method: str, url: str, **kwargs):
        """
        Send one HTTP request on a fresh session and close that session afterwards.

        Without closing, every call leaked a session with its connection pool.
        Requests are not streamed, so the body is already read when the session
        closes, and ``.text`` / ``.json()`` keep working on the returned response.
        """
        with self.session as session:
            return session.request(method, url, **kwargs)

    def _mpx_request(self, request_body: Dict):
        """Send a :py:class:`MpxReq` request body to the multiplexing endpoint."""
        return self._request("POST", self.url, json=request_body)

    def __getattr__(self, name):
        # Only called for attributes not found normally: anything this client
        # does not implement fails loudly instead of silently misbehaving.
        raise NotImplementedError(f"ClientSideOcrdMets has no access to '{name}' - try without METS server")

    def __str__(self):
        return f"<ClientSideOcrdMets[url={self.url}]>"

    def save(self):
        """
        Request writing the changes to the file system
        """
        if not self.multiplexing_mode:
            return self._request("PUT", self.url).text
        return self._mpx_request(MpxReq.save(self.ws_dir_path)).json()["text"]

    def stop(self):
        """
        Request stopping the mets server
        """
        try:
            if not self.multiplexing_mode:
                return self._request("DELETE", self.url).text
            return self._mpx_request(MpxReq.stop(self.ws_dir_path)).json()["text"]
        except ConnectionError:
            # Expected because we exit the process without returning
            pass

    def reload(self):
        """
        Request reloading of the mets file from the file system
        """
        if not self.multiplexing_mode:
            return self._request("POST", f"{self.url}/reload").text
        return self._mpx_request(MpxReq.reload(self.ws_dir_path)).json()["text"]

    @property
    def unique_identifier(self):
        """The METS unique identifier as reported by the server (text)."""
        if not self.multiplexing_mode:
            return self._request("GET", f"{self.url}/unique_identifier").text
        return self._mpx_request(MpxReq.unique_identifier(self.ws_dir_path)).json()["text"]

    @property
    def workspace_path(self):
        """The server-side workspace directory; also cached in ``self.ws_dir_path``."""
        if not self.multiplexing_mode:
            self.ws_dir_path = self._request("GET", f"{self.url}/workspace_path").text
        else:
            self.ws_dir_path = self._mpx_request(MpxReq.workspace_path(self.ws_dir_path)).json()["text"]
        return self.ws_dir_path

    @property
    def physical_pages(self) -> List[str]:
        """IDs of the physical pages in the METS."""
        if not self.multiplexing_mode:
            return self._request("GET", f"{self.url}/physical_pages").json()["physical_pages"]
        return self._mpx_request(MpxReq.physical_pages(self.ws_dir_path)).json()["physical_pages"]

    @property
    def file_groups(self):
        """Names of the file groups in the METS."""
        if not self.multiplexing_mode:
            return self._request("GET", f"{self.url}/file_groups").json()["file_groups"]
        return self._mpx_request(MpxReq.file_groups(self.ws_dir_path)).json()["file_groups"]

    @property
    def agents(self):
        """The METS agents as :py:class:`ClientSideOcrdAgent` objects."""
        if not self.multiplexing_mode:
            agent_dicts = self._request("GET", f"{self.url}/agent").json()["agents"]
        else:
            agent_dicts = self._mpx_request(MpxReq.agents(self.ws_dir_path)).json()["agents"]

        for agent_dict in agent_dicts:
            # the REST model calls it "type", the agent constructor expects "_type"
            agent_dict["_type"] = agent_dict.pop("type")
        return [ClientSideOcrdAgent(None, **agent_dict) for agent_dict in agent_dicts]

    def add_agent(self, **kwargs):
        """
        Add an agent (keyword arguments as for :py:meth:`OcrdAgentModel.create`).

        NOTE: in direct mode the raw HTTP response is returned; in multiplexing
        mode the :py:class:`OcrdAgentModel` that was sent is returned.
        """
        agent = OcrdAgentModel.create(**kwargs)
        if not self.multiplexing_mode:
            return self._request("POST", f"{self.url}/agent", json=agent.dict())
        self._mpx_request(MpxReq.add_agent(self.ws_dir_path, agent.dict())).json()
        return agent

    def find_files(self, **kwargs):
        """
        Yield :py:class:`ClientSideOcrdFile` objects for files matching ``kwargs``.

        Accepts the native ``OcrdMets`` keyword names (``pageId``, ``ID``,
        ``fileGrp``) as well as the REST names. Being a generator, the request is
        only sent once iteration starts.
        """
        self.log.debug("find_files(%s)", kwargs)
        # translate from native OcrdMets kwargs to OcrdMetsServer REST params
        for native_name, rest_name in (("pageId", "page_id"), ("ID", "file_id"), ("fileGrp", "file_grp")):
            if native_name in kwargs:
                kwargs[rest_name] = kwargs.pop(native_name)

        if not self.multiplexing_mode:
            r = self._request(method="GET", url=f"{self.url}/file", params={**kwargs})
        else:
            r = self._mpx_request(MpxReq.find_files(self.ws_dir_path, {**kwargs}))

        for f in r.json()["files"]:
            yield ClientSideOcrdFile(
                None, ID=f["file_id"], pageId=f["page_id"], fileGrp=f["file_grp"], url=f["url"],
                local_filename=f["local_filename"], mimetype=f["mimetype"]
            )

    def find_all_files(self, *args, **kwargs):
        """Like :py:meth:`find_files`, but return a list."""
        return list(self.find_files(*args, **kwargs))

    def add_file(
        self, file_grp, content=None, ID=None, url=None, local_filename=None, mimetype=None, pageId=None, **kwargs
    ):
        """
        Register a file with the METS server and return it as a :py:class:`ClientSideOcrdFile`.

        ``content`` is accepted for signature compatibility but not sent. Extra
        ``kwargs`` (e.g. ``force``) are forwarded to the server.

        Raises:
            RuntimeError: if the server does not answer with a success status.
        """
        data = OcrdFileModel.create(
            file_grp=file_grp,
            # translate from native OcrdMets kwargs to OcrdMetsServer REST params
            file_id=ID, page_id=pageId,
            mimetype=mimetype, url=url, local_filename=local_filename
        )
        # add force+ignore
        kwargs = {**kwargs, **data.dict()}

        if not self.multiplexing_mode:
            r = self._request("POST", f"{self.url}/file", data=kwargs)
        else:
            r = self._mpx_request(MpxReq.add_file(self.ws_dir_path, kwargs))
        if not r.ok:
            raise RuntimeError(f"Failed to add file ({str(data)}): {r.json()}")

        return ClientSideOcrdFile(
            None, fileGrp=file_grp,
            ID=ID, pageId=pageId,
            url=url, mimetype=mimetype, local_filename=local_filename
        )
324
325
326
class MpxReq:
    """Build the request bodies used for TCP forwarding (multiplexing mode).

    Every METS server call (``find_files``, ``workspace_path``, ...) has to be
    wrapped in a dedicated request body before it can be handed to
    `MetsServerProxy.forward_tcp_request`. The static methods below create
    these bodies.

    They live in a separate class so that they can be tested in isolation.
    """

    @staticmethod
    def __args_wrapper(
        workspace_path: str, method_type: str, response_type: str, request_url: str, request_data: dict
    ) -> Dict:
        """Assemble the forwarding envelope from its five parts."""
        return dict(
            workspace_path=workspace_path,
            method_type=method_type,
            response_type=response_type,
            request_url=request_url,
            request_data=request_data,
        )

    @staticmethod
    def save(ws_dir_path: str) -> Dict:
        """Envelope for ``PUT /`` (write the METS to disk); text response."""
        return MpxReq.__args_wrapper(ws_dir_path, "PUT", "text", "", {})

    @staticmethod
    def stop(ws_dir_path: str) -> Dict:
        """Envelope for ``DELETE /`` (stop the METS server); text response."""
        return MpxReq.__args_wrapper(ws_dir_path, "DELETE", "text", "", {})

    @staticmethod
    def reload(ws_dir_path: str) -> Dict:
        """Envelope for ``POST /reload``; text response."""
        return MpxReq.__args_wrapper(ws_dir_path, "POST", "text", "reload", {})

    @staticmethod
    def unique_identifier(ws_dir_path: str) -> Dict:
        """Envelope for ``GET /unique_identifier``; text response."""
        return MpxReq.__args_wrapper(ws_dir_path, "GET", "text", "unique_identifier", {})

    @staticmethod
    def workspace_path(ws_dir_path: str) -> Dict:
        """Envelope for ``GET /workspace_path``; text response."""
        return MpxReq.__args_wrapper(ws_dir_path, "GET", "text", "workspace_path", {})

    @staticmethod
    def physical_pages(ws_dir_path: str) -> Dict:
        """Envelope for ``GET /physical_pages``; dict response."""
        return MpxReq.__args_wrapper(ws_dir_path, "GET", "dict", "physical_pages", {})

    @staticmethod
    def file_groups(ws_dir_path: str) -> Dict:
        """Envelope for ``GET /file_groups``; dict response."""
        return MpxReq.__args_wrapper(ws_dir_path, "GET", "dict", "file_groups", {})

    @staticmethod
    def agents(ws_dir_path: str) -> Dict:
        """Envelope for ``GET /agent``; class response."""
        return MpxReq.__args_wrapper(ws_dir_path, "GET", "class", "agent", {})

    @staticmethod
    def add_agent(ws_dir_path: str, agent_model: Dict) -> Dict:
        """Envelope for ``POST /agent``; ``agent_model`` travels under the ``class`` key."""
        return MpxReq.__args_wrapper(ws_dir_path, "POST", "class", "agent", {"class": agent_model})

    @staticmethod
    def find_files(ws_dir_path: str, params: Dict) -> Dict:
        """Envelope for ``GET /file``; ``params`` travel as query parameters (``params`` key)."""
        return MpxReq.__args_wrapper(ws_dir_path, "GET", "class", "file", {"params": params})

    @staticmethod
    def add_file(ws_dir_path: str, data: Dict) -> Dict:
        """Envelope for ``POST /file``; ``data`` travels as form fields (``form`` key)."""
        return MpxReq.__args_wrapper(ws_dir_path, "POST", "class", "file", {"form": data})
404
405
#
406
# Server
407
#
408
409
410
class OcrdMetsServer:
    """
    Serve one workspace's METS through a FastAPI app.

    ``url`` starting with ``http://`` or ``https://`` selects TCP (host/port taken
    from the URL); any other value is treated as the path of a Unix domain socket.
    """

    def __init__(self, workspace, url):
        self.workspace = workspace
        self.url = url
        self.is_uds = not (url.startswith('http://') or url.startswith('https://'))
        self.log = getLogger(f'ocrd.models.ocrd_mets.server.{self.url}')

    @staticmethod
    def create_process(mets_server_url: str, ws_dir_path: str, log_file: str) -> int:
        """
        Spawn ``ocrd workspace -U URL -d DIR server start`` in a new session and return its PID.

        stdout and stderr of the child both go to ``log_file``, which is truncated first.

        Raises:
            RuntimeError: if the child has already exited after the 2 s start-up grace period.
        """
        from subprocess import STDOUT
        # Use one handle for both streams. Two independent handles ("w" and "a")
        # have separate offsets and can overwrite each other's output. The child
        # inherits the descriptor, so the parent's copy is closed right after
        # spawning (it used to leak).
        with open(file=log_file, mode="w") as log_fh:
            sub_process = Popen(
                args=["ocrd", "workspace", "-U", f"{mets_server_url}", "-d", f"{ws_dir_path}", "server", "start"],
                stdout=log_fh, stderr=STDOUT, cwd=ws_dir_path,
                shell=False, universal_newlines=True, start_new_session=True
            )
        # Wait for the mets server to start
        sleep(2)
        # poll() is None while the child still runs. Any return code, including 0,
        # means the server is already gone.
        if sub_process.poll() is not None:
            raise RuntimeError(f"Mets server starting failed. See {log_file} for errors")
        return sub_process.pid

    @staticmethod
    def kill_process(mets_server_pid: int):
        """
        Stop the process ``mets_server_pid``.

        Sends SIGINT, waits 3 s, then sends SIGKILL in case it is still alive.
        A process that does not exist (any more) is ignored.
        """
        try:
            os.kill(mets_server_pid, signal.SIGINT)
        except ProcessLookupError:
            # Already gone - nothing to stop.
            return
        sleep(3)
        try:
            os.kill(mets_server_pid, signal.SIGKILL)
        except ProcessLookupError:
            # Exited after SIGINT, as intended.
            pass

    def shutdown(self):
        """
        Remove the UDS socket file (if any), then send SIGTERM to the current process.

        Also registered with ``atexit`` for UDS servers. The socket file is removed
        *before* signalling. When no SIGTERM handler is installed (e.g. during
        interpreter exit), the default action terminates the process at once, so
        cleanup placed after ``os.kill`` would never run. Unlinking the path does
        not affect connections that are already open, so a pending response is
        still delivered.
        """
        pid = os.getpid()
        self.log.info(f"Shutdown method of mets server[{pid}] invoked, sending SIGTERM signal.")
        if self.is_uds:
            if Path(self.url).exists():
                self.log.warning(f"Due to a server shutdown, removing the existing UDS socket file: {self.url}")
                Path(self.url).unlink()
        os.kill(pid, signal.SIGTERM)

    def startup(self):
        """
        Build the FastAPI app exposing ``self.workspace`` and run it with uvicorn (blocks).
        """
        self.log.info("Configuring the Mets Server")

        workspace = self.workspace

        app = FastAPI(
            title="OCR-D METS Server",
            description="Providing simultaneous write-access to mets.xml for OCR-D",
        )

        @app.exception_handler(ValidationError)
        async def exception_handler_validation_error(request: Request, exc: ValidationError):
            return JSONResponse(status_code=400, content=exc.errors())

        @app.exception_handler(FileExistsError)
        async def exception_handler_file_exists(request: Request, exc: FileExistsError):
            return JSONResponse(status_code=400, content=str(exc))

        @app.exception_handler(re.error)
        async def exception_handler_invalid_regex(request: Request, exc: re.error):
            return JSONResponse(status_code=400, content=f'invalid regex: {exc}')

        @app.put(path='/')
        def save():
            """
            Write current changes to the file system
            """
            workspace.save_mets()
            response = Response(content="The Mets Server is writing changes to disk.", media_type='text/plain')
            self.log.debug(f"PUT / -> {response.__dict__}")
            return response

        @app.delete(path='/')
        def stop():
            """
            Stop the mets server
            """
            workspace.save_mets()
            response = Response(content="The Mets Server will shut down soon...", media_type='text/plain')
            self.shutdown()
            self.log.debug(f"DELETE / -> {response.__dict__}")
            return response

        @app.post(path='/reload')
        def workspace_reload_mets():
            """
            Reload mets file from the file system
            """
            workspace.reload_mets()
            response = Response(content=f"Reloaded from {workspace.directory}", media_type='text/plain')
            self.log.debug(f"POST /reload -> {response.__dict__}")
            return response

        @app.get(path='/unique_identifier', response_model=str)
        async def unique_identifier():
            response = Response(content=workspace.mets.unique_identifier, media_type='text/plain')
            self.log.debug(f"GET /unique_identifier -> {response.__dict__}")
            return response

        @app.get(path='/workspace_path', response_model=str)
        async def workspace_path():
            response = Response(content=workspace.directory, media_type="text/plain")
            self.log.debug(f"GET /workspace_path -> {response.__dict__}")
            return response

        # Registered once only: a second, identical ``/physical_pages`` route used
        # to follow here. It could never match and redefined this function name.
        @app.get(path='/physical_pages', response_model=OcrdPageListModel)
        async def physical_pages():
            response = {'physical_pages': workspace.mets.physical_pages}
            self.log.debug(f"GET /physical_pages -> {response}")
            return response

        @app.get(path='/file_groups', response_model=OcrdFileGroupListModel)
        async def file_groups():
            response = {'file_groups': workspace.mets.file_groups}
            self.log.debug(f"GET /file_groups -> {response}")
            return response

        @app.get(path='/agent', response_model=OcrdAgentListModel)
        async def agents():
            response = OcrdAgentListModel.create(workspace.mets.agents)
            self.log.debug(f"GET /agent -> {response.__dict__}")
            return response

        @app.post(path='/agent', response_model=OcrdAgentModel)
        async def add_agent(agent: OcrdAgentModel):
            kwargs = agent.dict()
            # the REST model calls it "type", the METS API expects "_type"
            kwargs['_type'] = kwargs.pop('type')
            workspace.mets.add_agent(**kwargs)
            response = agent
            self.log.debug(f"POST /agent -> {response.__dict__}")
            return response

        @app.get(path="/file", response_model=OcrdFileListModel)
        async def find_files(
            file_grp: Optional[str] = None,
            file_id: Optional[str] = None,
            page_id: Optional[str] = None,
            mimetype: Optional[str] = None,
            local_filename: Optional[str] = None,
            url: Optional[str] = None
        ):
            """
            Find files in the mets
            """
            found = workspace.mets.find_all_files(
                fileGrp=file_grp, ID=file_id, pageId=page_id, mimetype=mimetype, local_filename=local_filename, url=url
            )
            response = OcrdFileListModel.create(found)
            self.log.debug(f"GET /file -> {response.__dict__}")
            return response

        @app.post(path='/file', response_model=OcrdFileModel)
        async def add_file(
            file_grp: str = Form(),
            file_id: str = Form(),
            page_id: Optional[str] = Form(None),
            mimetype: str = Form(),
            url: Optional[str] = Form(None),
            local_filename: Optional[str] = Form(None),
            force: bool = Form(False),
        ):
            """
            Add a file
            """
            # Validate
            file_resource = OcrdFileModel.create(
                file_grp=file_grp, file_id=file_id, page_id=page_id, mimetype=mimetype, url=url,
                local_filename=local_filename
            )
            # Add to workspace
            kwargs = file_resource.dict()
            workspace.add_file(**kwargs, force=force)
            response = file_resource
            self.log.debug(f"POST /file -> {response.__dict__}")
            return response

        # ------------- #

        if self.is_uds:
            # Create socket and change to world-readable and -writable to avoid permission errors
            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server:
                server.bind(self.url)  # creates the socket file
            atexit.register(self.shutdown)
            # Log the mode that is actually applied (previously logged as 0o677).
            self.log.debug(f"chmod 0o666 {self.url}")
            chmod(self.url, 0o666)
            uvicorn_kwargs = {'uds': self.url}
        else:
            parsed = urlparse(self.url)
            uvicorn_kwargs = {'host': parsed.hostname, 'port': parsed.port}
        uvicorn_kwargs['log_config'] = None
        uvicorn_kwargs['access_log'] = False

        self.log.info("Starting the uvicorn Mets Server")
        uvicorn.run(app, **uvicorn_kwargs)
607