Passed
Push — develop ( 591d1d...018914 )
by
unknown
01:26
created

create_provider()   A

Complexity

Conditions 4

Size

Total Lines 19
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 14
nop 3
dl 0
loc 19
rs 9.7
c 0
b 0
f 0
1
from typing import Mapping
2
3
from skosprovider.registry import Registry
4
from sqlalchemy.orm import Session
5
6
from atramhasis import mappers
7
from atramhasis.data.datamanagers import ProviderDataManager
8
from atramhasis.data.models import Provider
9
from atramhasis.errors import ValidationError
10
11
12
def create_provider(json_data: Mapping, session: Session, skos_registry: Registry) -> Provider:
13
    """Process a provider JSON into a newly stored Provider."""
14
    for provider in skos_registry.get_providers():
15
        if provider.get_vocabulary_uri() == json_data["conceptscheme_uri"]:
16
            raise ValidationError(
17
                "Provider could not be validated.",
18
                [{"conceptscheme_uri": "Collides with existing provider."}],
19
            )
20
    db_provider = mappers.map_provider(json_data)
21
    if not db_provider.id:
22
        # Store conceptscheme first so we can copy its id
23
        session.add(db_provider.conceptscheme)
24
        session.flush()
25
        db_provider.id = str(db_provider.conceptscheme.id)
26
27
    session.add(db_provider)
28
    session.flush()
29
30
    return db_provider
31
32
33
def update_provider(provider_id: str, json_data: Mapping, session: Session) -> Provider:
34
    """Process a JSON into to update an existing provider."""
35
    manager = ProviderDataManager(session)
36
    db_provider = manager.get_provider_by_id(provider_id)
37
    db_provider = mappers.map_provider(json_data, provider=db_provider)
38
    session.flush()
39
    return db_provider
40
41
42
def delete_provider(provider_id, session: Session) -> None:
43
    manager = ProviderDataManager(session)
44
    db_provider = manager.get_provider_by_id(provider_id)
45
    session.delete(db_provider)
46