Passed
Push — master ( 18ecf8...407c02 )
by Konstantin
03:05
created

ocrd_network.client_utils   A

Complexity

Total Complexity 19

Size/Duplication

Total Lines 102
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 19
eloc 75
dl 0
loc 102
rs 10
c 0
b 0
f 0

11 Functions

Rating   Name   Duplication   Size   Complexity  
A poll_wf_status_till_timeout_fail_or_success() 0 2 1
A poll_job_status_till_timeout_fail_or_success() 0 2 1
B _poll_endpoint_status() 0 14 7
A get_ps_processing_job_status() 0 7 1
A verify_server_protocol() 0 5 3
A post_ps_workflow_request() 0 13 1
A post_ps_processing_request() 0 11 1
A get_ps_deployed_processor_ocrd_tool() 0 5 1
A get_ps_processing_job_log() 0 4 1
A get_ps_workflow_job_status() 0 7 1
A get_ps_deployed_processors() 0 5 1
1
from requests import get as request_get, post as request_post
2
from time import sleep
3
from .constants import JobState, NETWORK_PROTOCOLS
4
5
6
def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries: int, wait: int):
    """Poll a job's status endpoint until it finishes or the tries run out.

    Each iteration first sleeps for ``wait`` seconds and then queries the
    status getter matching ``job_type``. Polling stops as soon as the
    reported state equals ``JobState.success`` or ``JobState.failed``.

    Args:
        ps_server_host: Base address of the server, passed to the status getters.
        job_id: Identifier of the job to poll.
        job_type: Either ``"workflow"`` or ``"processor"``.
        tries: Maximum number of polls; no poll happens if it is not positive.
        wait: Value handed to ``time.sleep`` before every poll.

    Returns:
        The last state obtained, or ``JobState.unset`` if no poll was made.

    Raises:
        ValueError: if ``job_type`` is neither ``"workflow"`` nor ``"processor"``.
    """
    if job_type != "workflow" and job_type != "processor":
        raise ValueError(f"Unknown job type '{job_type}', expected 'workflow' or 'processor'")

    current_state = JobState.unset
    remaining = tries
    while remaining > 0:
        # The pause comes before the request, so even the first poll is delayed.
        sleep(wait)
        if job_type == "processor":
            current_state = get_ps_processing_job_status(ps_server_host, job_id)
        else:
            # job_type was validated above, so this branch is the workflow case.
            current_state = get_ps_workflow_job_status(ps_server_host, job_id)
        finished = current_state == JobState.success or current_state == JobState.failed
        if finished:
            return current_state
        remaining -= 1
    return current_state
20
21
22
def poll_job_status_till_timeout_fail_or_success(ps_server_host: str, job_id: str, tries: int, wait: int) -> JobState:
    """Poll the status of a processing job via ``_poll_endpoint_status``.

    Delegates with the job type fixed to ``"processor"`` and returns whatever
    state the polling helper reports.
    """
    return _poll_endpoint_status(
        ps_server_host=ps_server_host,
        job_id=job_id,
        job_type="processor",
        tries=tries,
        wait=wait,
    )
24
25
26
def poll_wf_status_till_timeout_fail_or_success(ps_server_host: str, job_id: str, tries: int, wait: int) -> JobState:
    """Poll the status of a workflow job via ``_poll_endpoint_status``.

    Delegates with the job type fixed to ``"workflow"`` and returns whatever
    state the polling helper reports.
    """
    return _poll_endpoint_status(
        ps_server_host=ps_server_host,
        job_id=job_id,
        job_type="workflow",
        tries=tries,
        wait=wait,
    )
28
29
30
def get_ps_deployed_processors(ps_server_host: str):
    """Query the ``/processor`` endpoint of the server and return its JSON body.

    Args:
        ps_server_host: Base address of the server; the endpoint path is appended to it.

    Returns:
        The decoded JSON payload of the response.

    Raises:
        AssertionError: if the server does not answer with HTTP 200.
    """
    request_url = f"{ps_server_host}/processor"
    response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"})
    # Raised explicitly (not via `assert`) so the check survives `python -O`;
    # the exception type is kept for callers that already catch it.
    if response.status_code != 200:
        raise AssertionError(f"Processing server: {request_url}, {response.status_code}")
    return response.json()
35
36
37
def get_ps_deployed_processor_ocrd_tool(ps_server_host: str, processor_name: str):
    """Query ``/processor/info/{processor_name}`` and return its JSON body.

    Args:
        ps_server_host: Base address of the server; the endpoint path is appended to it.
        processor_name: Name of the processor, inserted verbatim into the URL path.

    Returns:
        The decoded JSON payload of the response.

    Raises:
        AssertionError: if the server does not answer with HTTP 200.
    """
    request_url = f"{ps_server_host}/processor/info/{processor_name}"
    response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"})
    # Raised explicitly (not via `assert`) so the check survives `python -O`;
    # the exception type is kept for callers that already catch it.
    if response.status_code != 200:
        raise AssertionError(f"Processing server: {request_url}, {response.status_code}")
    return response.json()
42
43
44
def get_ps_processing_job_log(ps_server_host: str, processing_job_id: str):
    """Request ``/processor/log/{processing_job_id}`` and return the raw response.

    The response object is handed back unchanged; neither the status code nor
    the body is inspected here.
    """
    json_headers = {"accept": "application/json; charset=utf-8"}
    return request_get(url=f"{ps_server_host}/processor/log/{processing_job_id}", headers=json_headers)
48
49
50
def get_ps_processing_job_status(ps_server_host: str, processing_job_id: str) -> str:
    """Fetch the state of a processing job from ``/processor/job/{processing_job_id}``.

    Args:
        ps_server_host: Base address of the server; the endpoint path is appended to it.
        processing_job_id: Identifier of the job, inserted verbatim into the URL path.

    Returns:
        The value stored under the ``"state"`` key of the JSON response.

    Raises:
        AssertionError: if the server does not answer with HTTP 200 or the
            reported state is empty.
        KeyError: if the JSON response has no ``"state"`` key.
    """
    request_url = f"{ps_server_host}/processor/job/{processing_job_id}"
    response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"})
    # Raised explicitly (not via `assert`) so the checks survive `python -O`;
    # the exception type is kept for callers that already catch it.
    if response.status_code != 200:
        raise AssertionError(f"Processing server: {request_url}, {response.status_code}")
    job_state = response.json()["state"]
    if not job_state:
        raise AssertionError(f"Processing server: {request_url}, empty job state")
    return job_state
57
58
59
def get_ps_workflow_job_status(ps_server_host: str, workflow_job_id: str) -> str:
    """Fetch the state of a workflow job from ``/workflow/job-simple/{workflow_job_id}``.

    Args:
        ps_server_host: Base address of the server; the endpoint path is appended to it.
        workflow_job_id: Identifier of the job, inserted verbatim into the URL path.

    Returns:
        The value stored under the ``"state"`` key of the JSON response.

    Raises:
        AssertionError: if the server does not answer with HTTP 200 or the
            reported state is empty.
        KeyError: if the JSON response has no ``"state"`` key.
    """
    request_url = f"{ps_server_host}/workflow/job-simple/{workflow_job_id}"
    response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"})
    # Raised explicitly (not via `assert`) so the checks survive `python -O`;
    # the exception type is kept for callers that already catch it.
    if response.status_code != 200:
        raise AssertionError(f"Processing server: {request_url}, {response.status_code}")
    job_state = response.json()["state"]
    if not job_state:
        raise AssertionError(f"Processing server: {request_url}, empty job state")
    return job_state
66
67
68
def post_ps_processing_request(ps_server_host: str, processor: str, job_input: dict) -> str:
    """Submit a processing job to ``/processor/run/{processor}`` and return its id.

    Args:
        ps_server_host: Base address of the server; the endpoint path is appended to it.
        processor: Name of the processor, inserted verbatim into the URL path.
        job_input: Request payload, sent as the JSON body of the POST request.

    Returns:
        The value stored under the ``"job_id"`` key of the JSON response.

    Raises:
        AssertionError: if the server does not answer with HTTP 200 or the
            returned job id is empty.
        KeyError: if the JSON response has no ``"job_id"`` key.
    """
    request_url = f"{ps_server_host}/processor/run/{processor}"
    response = request_post(
        url=request_url,
        headers={"accept": "application/json; charset=utf-8"},
        json=job_input
    )
    # Raised explicitly (not via `assert`) so the checks survive `python -O`;
    # the exception type is kept for callers that already catch it.
    if response.status_code != 200:
        raise AssertionError(f"Processing server: {request_url}, {response.status_code}")
    processing_job_id = response.json()["job_id"]
    if not processing_job_id:
        raise AssertionError(f"Processing server: {request_url}, empty job id")
    return processing_job_id
79
80
81
# TODO: Can be extended to include other parameters such as page_wise
def post_ps_workflow_request(ps_server_host: str, path_to_wf: str, path_to_mets: str) -> str:
    """Submit a workflow job to ``/workflow/run`` and return its id.

    The workflow file is uploaded as the multipart field ``"workflow"``; the
    METS path is passed as the ``mets_path`` query parameter and ``page_wise``
    is always set to ``True``.

    Args:
        ps_server_host: Base address of the server; the endpoint path is appended to it.
        path_to_wf: Local path of the workflow file to upload.
        path_to_mets: METS path, placed verbatim into the query string.

    Returns:
        The value stored under the ``"job_id"`` key of the JSON response.

    Raises:
        OSError: if the workflow file cannot be opened.
        AssertionError: if the server does not answer with HTTP 200 or the
            returned job id is empty.
        KeyError: if the JSON response has no ``"job_id"`` key.
    """
    # NOTE(review): path_to_mets is interpolated without URL-encoding; paths
    # containing characters such as '&' or '#' may need escaping - confirm with callers.
    request_url = f"{ps_server_host}/workflow/run?mets_path={path_to_mets}&page_wise=True"
    # Open the workflow file in a context manager so the handle is closed even
    # when the request raises; previously it was never closed.
    with open(path_to_wf, "rb") as workflow_file:
        response = request_post(
            url=request_url,
            headers={"accept": "application/json; charset=utf-8"},
            files={"workflow": workflow_file}
        )
    # Raised explicitly (not via `assert`) so the checks survive `python -O`;
    # the exception type is kept for callers that already catch it.
    if response.status_code != 200:
        raise AssertionError(f"Processing server: {request_url}, {response.status_code}")
    wf_job_id = response.json()["job_id"]
    if not wf_job_id:
        raise AssertionError(f"Processing server: {request_url}, empty job id")
    return wf_job_id
95
96
97
def verify_server_protocol(address: str):
    """Check that ``address`` begins with one of the entries in ``NETWORK_PROTOCOLS``.

    Returns ``None`` when a matching prefix is found.

    Raises:
        ValueError: if ``address`` starts with none of the known protocols.
    """
    has_known_protocol = any(address.startswith(prefix) for prefix in NETWORK_PROTOCOLS)
    if has_known_protocol:
        return
    raise ValueError(f"Wrong/Missing protocol in the server address: {address}, must be one of: {NETWORK_PROTOCOLS}")
102