Passed
Push — main ( 379767...245673 )
by Rami
01:35
created

BaseRepository.model()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
from typing import Dict, Iterator, List, Tuple, Type
2
3
from abc import ABC, ABCMeta, abstractmethod
4
5
from bson import ObjectId
6
from bson.objectid import InvalidId
7
8
from .database import MongomanticClient
9
from .errors import (
10
    DoesNotExistError,
11
    FieldDoesNotExistError,
12
    InvalidQueryError,
13
    MultipleObjectsReturnedError,
14
    WriteError,
15
)
16
from .mongo_model import MongoDBModel
17
18
19
class ABRepositoryMeta(ABCMeta):
    """Metaclass that validates repository class definitions.

    A class created through this metaclass must declare, in its own body, a
    nested ``Meta`` whose own namespace contains truthy ``model`` and
    ``collection`` entries. Class creation fails with ``NotImplementedError``
    otherwise, so mistakes surface at definition time.
    """

    def __new__(cls, name, bases, dct):
        repository = super().__new__(cls, name, bases, dct)

        # Only the namespace of the class being created is inspected, so a
        # ``Meta`` inherited from a base class does not satisfy the check.
        inner_meta = repository.__dict__.get("Meta", False)
        if not inner_meta:
            raise NotImplementedError("Internal 'Meta' not implemented")

        # Likewise, 'model' and 'collection' must be set directly on Meta.
        meta_namespace = inner_meta.__dict__
        if not meta_namespace.get("model", False) or not meta_namespace.get("collection", False):
            raise NotImplementedError("'model' or 'collection' properties are missing from internal Meta class")

        return repository
37
38
39
class BaseRepository(metaclass=ABRepositoryMeta):
    """Base class for repositories that store and query models in MongoDB.

    Concrete repositories declare an inner ``Meta`` class with ``model`` and
    ``collection``. Every operation is a classmethod that works on the
    collection obtained from ``MongomanticClient.db``.
    """

    class Meta:
        @property
        def model(self) -> Type[MongoDBModel]:
            """Model class that subclasses MongoDBModel"""
            raise NotImplementedError

        @property
        def collection(self) -> str:
            """String representing the MongoDB collection to use when storing this model"""
            raise NotImplementedError

    @classmethod
    def _get_collection(cls):
        """Return the collection object named by ``Meta.collection``."""
        # ``__getattr__`` is invoked directly (rather than ``getattr``) so the
        # name is always resolved by the database object's dynamic lookup and
        # never by one of its regular attributes.
        return MongomanticClient.db.__getattr__(cls.Meta.collection)

    @classmethod
    def process_kwargs(cls, kwargs: Dict) -> Tuple:
        """Update keyword arguments from human readable to mongo specific.

        ``kwargs`` is modified in place: ``id`` is replaced by an ``_id``
        ObjectId, and the reserved ``projection``, ``skip`` and ``limit``
        entries are removed so that only filter fields remain.

        Args:
            kwargs: Filter keyword arguments, possibly including reserved names

        Raises:
            InvalidQueryError: If ``id`` cannot be converted to an ObjectId
            FieldDoesNotExistError: If a remaining key is not a field of the model

        Returns:
            Tuple: ``(projection, skip, limit)``, defaulting to ``(None, 0, 0)``
        """
        if "id" in kwargs:
            raw_id = str(kwargs.pop("id"))
            try:
                kwargs["_id"] = ObjectId(raw_id)
            except InvalidId as e:
                raise InvalidQueryError(f"Invalid ObjectId {raw_id}.") from e

        projection = kwargs.pop("projection", None)
        skip = kwargs.pop("skip", 0)
        limit = kwargs.pop("limit", 0)

        # NOTE(review): a translated "_id" key is validated here as well;
        # confirm that the model's ``__fields__`` contains it.
        for key in kwargs:
            if key not in cls.Meta.model.__fields__:
                raise FieldDoesNotExistError(f"Field {key} does not exist for model {cls.Meta.model}")

        return projection, skip, limit

    @classmethod
    def save(cls, model) -> Type[MongoDBModel]:
        """Saves object in MongoDB.

        Args:
            model: Object exposing ``to_mongo()``, which yields the document to insert

        Raises:
            WriteError: If serialising or inserting the document fails

        Returns:
            Type[MongoDBModel]: Model rebuilt from the inserted document, including its ``_id``
        """
        try:
            document = model.to_mongo()
            res = cls._get_collection().insert_one(document)
        except Exception as e:
            raise WriteError(f"Error inserting document: \n{e}") from e

        if res is None:
            raise WriteError("Error inserting document")

        document["_id"] = res.inserted_id
        return cls.Meta.model.from_mongo(document)

    @classmethod
    def get(cls, **kwargs) -> Type[MongoDBModel]:
        """Get a unique document based on some filter.

        Args:
            kwargs: Filter keyword arguments

        Raises:
            DoesNotExistError: If object not found
            MultipleObjectsReturnedError: If more than one object matches filter

        Returns:
            Type[MongoDBModel]: Matching model
        """
        # Called for its in-place effect on kwargs; the returned
        # projection/skip/limit values are not used by this method.
        cls.process_kwargs(kwargs)

        # Two results are enough to tell "unique" from "ambiguous".
        cursor = cls._get_collection().find(filter=kwargs, limit=2)

        try:
            document = next(cursor)
        except StopIteration:
            raise DoesNotExistError("Document not found") from None

        no_more = object()
        if next(cursor, no_more) is not no_more:
            raise MultipleObjectsReturnedError("2 or more items returned, instead of 1")

        return cls.Meta.model.from_mongo(document)

    @classmethod
    def find(cls, **kwargs) -> Iterator[Type[MongoDBModel]]:
        """Queries database and filters on kwargs provided.

        Args:
            kwargs: Filter keyword arguments

            Reserved *optional* field names:
            projection: can either be a list of field names that should be returned in the result set
                        or a dict specifying the fields to include or exclude. If projection is a list
                        "_id" will always be returned. Use a dict to exclude fields from the result
                        (e.g. projection={'_id': False}).
            skip: the number of documents to omit when returning results
            limit: the maximum number of results to return

        Note that invalid query errors may not be detected until the generator is consumed.
        This is because the query is not executed until the result is needed.

        Raises:
            InvalidQueryError: In case one or more arguments were invalid

        Yields:
            Iterator[Type[MongoDBModel]]: Generator that wraps PyMongo cursor and transforms documents to models
        """
        projection, skip, limit = cls.process_kwargs(kwargs)

        try:
            results = cls._get_collection().find(
                filter=kwargs, projection=projection, skip=skip, limit=limit
            )
            for result in results:
                yield cls.Meta.model.from_mongo(result)
        except Exception as e:
            raise InvalidQueryError(f"Invalid argument types: {e}") from e

    @classmethod
    def aggregate(cls, pipeline: List[Dict]) -> Iterator[Type[MongoDBModel]]:
        """Runs an aggregation pipeline and converts each result to a model.

        The pipeline is not executed until the generator is consumed, so
        errors surface at iteration time.

        Args:
            pipeline: Aggregation stages passed unchanged to the collection's ``aggregate``

        Raises:
            InvalidQueryError: If running the pipeline or converting a result fails

        Yields:
            Iterator[Type[MongoDBModel]]: Models built from each aggregation result
        """
        try:
            results = cls._get_collection().aggregate(pipeline)
            for result in results:
                yield cls.Meta.model.from_mongo(result)
        except Exception as e:
            raise InvalidQueryError(f"Error executing pipeline: {e}") from e
159