Passed
Pull Request — main (#32)
by Rami
01:09
created

mongomantic.core.base_repository   A

Complexity

Total Complexity 32

Size/Duplication

Total Lines 185
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 32
eloc 111
dl 0
loc 185
rs 9.84
c 0
b 0
f 0

11 Methods

Rating   Name   Duplication   Size   Complexity  
A BaseRepository.collection() 0 4 1
A BaseRepository.aggregate() 0 8 3
A BaseRepository._create_indexes() 0 9 3
A BaseRepository.model() 0 4 1
A BaseRepository.save() 0 15 4
A BaseRepository.indexes() 0 4 1
A BaseRepository._get_collection() 0 10 4
A BaseRepository.get() 0 27 3
A BaseRepository._process_kwargs() 0 20 5
A BaseRepository.find() 0 32 3
A ABRepositoryMeta.__new__() 0 10 4
1
from typing import Dict, Iterator, List, Tuple, Type
2
3
from abc import ABCMeta
4
5
from bson import ObjectId
6
from bson.objectid import InvalidId
7
from mongomantic.core.index import Index
8
from pymongo.collection import Collection
9
10
from .database import MongomanticClient
11
from .errors import (
12
    DoesNotExistError,
13
    FieldDoesNotExistError,
14
    IndexCreationError,
15
    InvalidQueryError,
16
    MultipleObjectsReturnedError,
17
    WriteError,
18
)
19
from .mongo_model import MongoDBModel
20
21
22
class ABRepositoryMeta(ABCMeta):
23
    """Abstract Base Repository Metaclass
24
25
    This Metaclass ensures that any concrete implementations of BaseRepository
26
    include all necessary definitions, in order to decrease user errors.
27
    """
28
29
    def __new__(cls, name, bases, dct):
30
        base_repo = super().__new__(cls, name, bases, dct)
31
        meta = base_repo.__dict__.get("Meta", False)
32
        if not meta:
33
            raise NotImplementedError("Internal 'Meta' not implemented")
34
        # Check existence of model and collection
35
        if not (meta.__dict__.get("model", False) and meta.__dict__.get("collection", False)):
36
            raise NotImplementedError("'model' or 'collection' properties are missing from internal Meta class")
37
38
        return base_repo
39
40
41
class BaseRepository(metaclass=ABRepositoryMeta):
42
    class Meta:
43
        @property
44
        def model(self) -> Type[MongoDBModel]:
45
            """Model class that subclasses MongoDBModel"""
46
            raise NotImplementedError
47
48
        @property
49
        def collection(self) -> str:
50
            """String representing the MongoDB collection to use when storing this model"""
51
            raise NotImplementedError
52
53
        @property
54
        def indexes(self) -> List[Index]:
55
            """List of MongoDB indexes that should be setup for this particular model"""
56
            raise NotImplementedError
57
58
    @classmethod
59
    def _get_collection(cls) -> Collection:
60
        """Returns a reference to the MongoDB collection, and initializes indexes if first time"""
61
        if not hasattr(cls, "_indexes") or cls._indexes is None:
62
            cls._indexes = True  # State to know that already checked
63
64
            if getattr(cls.Meta, "auto_create_index", True):
65
                cls._create_indexes()
66
67
        return MongomanticClient.db.__getattr__(cls.Meta.collection)
68
69
    @classmethod
70
    def _create_indexes(cls):
71
        indexes = getattr(cls.Meta, "indexes", False)
72
        if indexes:
73
            try:
74
                pymongo_indexes = [index.to_pymongo() for index in indexes]
75
                cls._get_collection().create_indexes(pymongo_indexes)
76
            except Exception as e:
77
                raise IndexCreationError(f"Failed to create indexes: {e}")
78
79
    @classmethod
80
    def _process_kwargs(cls, kwargs: Dict) -> Tuple:
81
        """Update keyword arguments from human readable to mongo specific"""
82
        if "id" in kwargs:
83
            try:
84
                oid = str(kwargs.pop("id"))
85
                oid = ObjectId(oid)
86
                kwargs["_id"] = oid
87
            except InvalidId:
88
                raise InvalidQueryError(f"Invalid ObjectId {oid}.")
89
90
        projection = kwargs.pop("projection", None)
91
        skip = kwargs.pop("skip", 0)
92
        limit = kwargs.pop("limit", 0)
93
94
        for key in kwargs:
95
            if key not in cls.Meta.model.__fields__:
96
                raise FieldDoesNotExistError(f"Field {key} does not exist for model {cls.Meta.model}")
97
98
        return projection, skip, limit
99
100
    @classmethod
101
    def save(cls, model) -> Type[MongoDBModel]:
102
        """Saves object in MongoDB"""
103
        try:
104
            document = model.to_mongo()
105
            res = cls._get_collection().insert_one(document)
106
        except Exception as e:
107
            res = None
108
            raise WriteError(f"Error inserting document: \n{e}")
109
        else:
110
            if res is None:
111
                raise WriteError("Error inserting document")
112
113
        document["_id"] = res.inserted_id
114
        return cls.Meta.model.from_mongo(document)
115
116
    @classmethod
117
    def get(cls, **kwargs) -> Type[MongoDBModel]:
118
        """Get a unique document based on some filter.
119
120
        Args:
121
            kwargs: Filter keyword arguments
122
123
        Raises:
124
            DoesNotExistError: If object not found
125
            MultipleObjectsReturnedError: If more than one object matches filter
126
127
        Returns:
128
            Type[MongoDBModel]: Matching model
129
        """
130
        cls._process_kwargs(kwargs)
131
132
        try:
133
            res = cls._get_collection().find(filter=kwargs, limit=2)
134
            document = next(res)
135
        except StopIteration:
136
            raise DoesNotExistError("Document not found")
137
138
        try:
139
            res = next(res)
140
            raise MultipleObjectsReturnedError("2 or more items returned, instead of 1")
141
        except StopIteration:
142
            return cls.Meta.model.from_mongo(document)
143
144
    @classmethod
145
    def find(cls, **kwargs) -> Iterator[Type[MongoDBModel]]:
146
        """Queries database and filters on kwargs provided.
147
148
        Args:
149
            kwargs: Filter keyword arguments
150
151
            Reserved *optional* field names:
152
            projection: can either be a list of field names that should be returned in the result set
153
                        or a dict specifying the fields to include or exclude. If projection is a list
154
                        “_id” will always be returned. Use a dict to exclude fields from the result
155
                        (e.g. projection={‘_id’: False}).
156
            skip: the number of documents to omit when returning results
157
            limit: the maximum number of results to return
158
159
        Note that invalid query errors may not be detected until the generator is consumed.
160
        This is because the query is not executed until the result is needed.
161
162
        Raises:
163
            InvalidQueryError: In case one or more arguments were invalid
164
165
        Yields:
166
            Iterator[Type[MongoDBModel]]: Generator that wraps PyMongo cursor and transforms documents to models
167
        """
168
        projection, skip, limit = cls._process_kwargs(kwargs)
169
170
        try:
171
            results = cls._get_collection().find(filter=kwargs, projection=projection, skip=skip, limit=limit)
172
            for result in results:
173
                yield cls.Meta.model.from_mongo(result)
174
        except Exception as e:
175
            raise InvalidQueryError(f"Invalid argument types: {e}")
176
177
    @classmethod
178
    def aggregate(cls, pipeline: List[Dict]):
179
        try:
180
            results = cls._get_collection().aggregate(pipeline)
181
            for result in results:
182
                yield cls.Meta.model.from_mongo(result)
183
        except Exception as e:
184
            raise InvalidQueryError(f"Error executing pipeline: {e}")
185