Passed
Pull Request — master (#1269)
by
unknown
02:42
created

ocrd_network.client_utils   A

Complexity

Total Complexity 16

Size/Duplication

Total Lines 82
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 16
eloc 61
dl 0
loc 82
rs 10
c 0
b 0
f 0

8 Functions

Rating   Name   Duplication   Size   Complexity  
A poll_wf_status_till_timeout_fail_or_success() 0 2 1
A get_ps_processing_job_status() 0 7 1
A poll_job_status_till_timeout_fail_or_success() 0 2 1
A verify_server_protocol() 0 5 3
A post_ps_workflow_request() 0 13 1
A post_ps_processing_request() 0 11 1
A get_ps_workflow_job_status() 0 7 1
B _poll_endpoint_status() 0 14 7
1
from requests import get as request_get, post as request_post
2
from time import sleep
3
from .constants import JobState, NETWORK_PROTOCOLS
4
5
6
def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries: int, wait: int):
7
    if job_type not in ["workflow", "processor"]:
8
        raise ValueError(f"Unknown job type '{job_type}', expected 'workflow' or 'processor'")
9
    job_state = JobState.unset
10
    while tries > 0:
11
        sleep(wait)
12
        if job_type == "processor":
13
            job_state = get_ps_processing_job_status(ps_server_host, job_id)
14
        if job_type == "workflow":
15
            job_state = get_ps_workflow_job_status(ps_server_host, job_id)
16
        if job_state == JobState.success or job_state == JobState.failed:
17
            break
18
        tries -= 1
19
    return job_state
20
21
22
def poll_job_status_till_timeout_fail_or_success(ps_server_host: str, job_id: str, tries: int, wait: int) -> JobState:
23
    return _poll_endpoint_status(ps_server_host, job_id, "processor", tries, wait)
24
25
26
def poll_wf_status_till_timeout_fail_or_success(ps_server_host: str, job_id: str, tries: int, wait: int) -> JobState:
27
    return _poll_endpoint_status(ps_server_host, job_id, "workflow", tries, wait)
28
29
30
def get_ps_processing_job_status(ps_server_host: str, processing_job_id: str) -> str:
31
    request_url = f"{ps_server_host}/processor/job/{processing_job_id}"
32
    response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"})
33
    assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}"
34
    job_state = response.json()["state"]
35
    assert job_state
36
    return job_state
37
38
39
def get_ps_workflow_job_status(ps_server_host: str, workflow_job_id: str) -> str:
40
    request_url = f"{ps_server_host}/workflow/job-simple/{workflow_job_id}"
41
    response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"})
42
    assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}"
43
    job_state = response.json()["state"]
44
    assert job_state
45
    return job_state
46
47
48
def post_ps_processing_request(ps_server_host: str, processor: str, job_input: dict) -> str:
49
    request_url = f"{ps_server_host}/processor/run/{processor}"
50
    response = request_post(
51
        url=request_url,
52
        headers={"accept": "application/json; charset=utf-8"},
53
        json=job_input
54
    )
55
    assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}"
56
    processing_job_id = response.json()["job_id"]
57
    assert processing_job_id
58
    return processing_job_id
59
60
61
# TODO: Can be extended to include other parameters such as page_wise
62
def post_ps_workflow_request(ps_server_host: str, path_to_wf: str, path_to_mets: str) -> str:
63
    request_url = f"{ps_server_host}/workflow/run?mets_path={path_to_mets}&page_wise=True"
64
    response = request_post(
65
        url=request_url,
66
        headers={"accept": "application/json; charset=utf-8"},
67
        files={"workflow": open(path_to_wf, "rb")}
68
    )
69
    # print(response.json())
70
    # print(response.__dict__)
71
    assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}"
72
    wf_job_id = response.json()["job_id"]
73
    assert wf_job_id
74
    return wf_job_id
75
76
77
def verify_server_protocol(address: str):
78
    for protocol in NETWORK_PROTOCOLS:
79
        if address.startswith(protocol):
80
            return
81
    raise ValueError(f"Wrong/Missing protocol in the server address: {address}, must be one of: {NETWORK_PROTOCOLS}")
82