mongomantic.core.base_repository   A
last analyzed

Complexity

Total Complexity 30

Size/Duplication

Total Lines 181
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 30
eloc 108
dl 0
loc 181
rs 10
c 0
b 0
f 0

11 Methods

Rating   Name   Duplication   Size   Complexity  
A BaseRepository.collection() 0 4 1
A BaseRepository._create_indexes() 0 9 3
A BaseRepository.model() 0 4 1
A BaseRepository.indexes() 0 4 1
A BaseRepository._get_collection() 0 10 4
A BaseRepository._process_kwargs() 0 20 5
A ABRepositoryMeta.__new__() 0 10 4
A BaseRepository.aggregate() 0 8 3
A BaseRepository.save() 0 11 2
A BaseRepository.get() 0 27 3
A BaseRepository.find() 0 32 3
1
from typing import Any, Dict, Iterator, List, Tuple, Type
2
3
from abc import ABCMeta
4
5
from bson import ObjectId
6
from bson.objectid import InvalidId
7
from mongomantic.core.index import Index
8
from pymongo.collection import Collection
9
10
from .database import MongomanticClient
11
from .errors import (
12
    DoesNotExistError,
13
    FieldDoesNotExistError,
14
    IndexCreationError,
15
    InvalidQueryError,
16
    MultipleObjectsReturnedError,
17
    WriteError,
18
)
19
from .mongo_model import MongoDBModel
20
21
22
class ABRepositoryMeta(ABCMeta):
    """Metaclass that validates repository classes when they are defined.

    A class created through this metaclass must declare an inner ``Meta`` class
    in its own body, and that ``Meta`` must itself define truthy ``model`` and
    ``collection`` attributes. The checks run at class-creation time so that an
    incomplete repository fails on definition rather than on first use.
    """

    def __new__(cls, name: str, bases: Tuple[type, ...], namespace: Dict[str, Any], **kwds: Any):
        """Build the class, then verify its ``Meta`` declaration.

        Args:
            name: Name of the class being created.
            bases: Base classes of the class being created.
            namespace: Class body namespace.
            kwds: Extra keyword arguments forwarded to ``ABCMeta.__new__``.

        Raises:
            NotImplementedError: If the class body has no truthy ``Meta``, or if
                that ``Meta`` does not define truthy ``model`` and ``collection``.

        Returns:
            The newly created class.
        """
        repo_cls = super().__new__(cls, name, bases, namespace, **kwds)

        # Look only at the class's own __dict__: a Meta inherited from a base
        # class does not count as a declaration.
        inner_meta = repo_cls.__dict__.get("Meta", False)
        if not inner_meta:
            raise NotImplementedError("Internal 'Meta' not implemented")

        # Same rule for the Meta attributes: they must be set on Meta itself.
        declared = inner_meta.__dict__
        for required in ("model", "collection"):
            if not declared.get(required, False):
                raise NotImplementedError("'model' or 'collection' properties are missing from internal Meta class")

        return repo_cls
39
40
41
class BaseRepository(metaclass=ABRepositoryMeta):
    """Base class for repositories that store ``MongoDBModel`` objects in MongoDB.

    Concrete repositories subclass this class and declare an inner ``Meta`` with
    ``model`` and ``collection``. ``Meta`` may also define ``indexes`` (a list of
    ``Index``) and ``auto_create_index`` (treated as ``True`` when absent). Every
    operation is a classmethod, so repositories are used without instantiation.
    """

    class Meta:
        @property
        def model(self) -> Type[MongoDBModel]:
            """Model class that subclasses MongoDBModel"""
            raise NotImplementedError

        @property
        def collection(self) -> str:
            """String representing the MongoDB collection to use when storing this model"""
            raise NotImplementedError

        @property
        def indexes(self) -> List[Index]:
            """List of MongoDB indexes that should be setup for this particular model"""
            raise NotImplementedError

    @classmethod
    def _get_collection(cls) -> Collection:
        """Returns a reference to the MongoDB collection, and initializes indexes if first time"""
        if getattr(cls, "_indexes", None) is None:
            # Flag first: _create_indexes() calls back into this method, so the
            # flag must already be set to avoid recursing forever.
            cls._indexes = True  # State to know that already checked

            if getattr(cls.Meta, "auto_create_index", True):
                cls._create_indexes()

        # Subscript access instead of calling the __getattr__ dunder directly.
        return MongomanticClient.db[cls.Meta.collection]

    @classmethod
    def _create_indexes(cls) -> None:
        """Create the indexes listed in ``Meta.indexes``, if any.

        Does nothing when ``Meta`` has no ``indexes`` attribute or it is falsy.

        Raises:
            IndexCreationError: If converting or creating the indexes fails for any reason
        """
        indexes = getattr(cls.Meta, "indexes", False)
        if not indexes:
            return

        try:
            pymongo_indexes = [index.to_pymongo() for index in indexes]
            cls._get_collection().create_indexes(pymongo_indexes)
        except Exception as e:
            raise IndexCreationError(f"Failed to create indexes: {e}") from e

    @classmethod
    def _process_kwargs(cls, kwargs: Dict) -> Tuple:
        """Update keyword arguments from human readable to mongo specific

        The dict is modified in place: ``id`` is replaced by an ``ObjectId`` stored
        under ``_id``, and the reserved names ``projection``, ``skip`` and ``limit``
        are removed, so that what remains can be used as a filter.

        Args:
            kwargs: Keyword arguments received by a query method

        Raises:
            InvalidQueryError: If ``id`` cannot be converted to an ObjectId
            FieldDoesNotExistError: If a remaining key is not a field of ``Meta.model``

        Returns:
            Tuple: ``(projection, skip, limit)``, defaulting to ``(None, 0, 0)``
        """
        if "id" in kwargs:
            raw_id = str(kwargs.pop("id"))
            try:
                kwargs["_id"] = ObjectId(raw_id)
            except InvalidId as e:
                raise InvalidQueryError(f"Invalid ObjectId {raw_id}.") from e

        projection = kwargs.pop("projection", None)
        skip = kwargs.pop("skip", 0)
        limit = kwargs.pop("limit", 0)

        # NOTE(review): an "_id" key added above is checked here as well;
        # confirm the model's __fields__ contains it.
        for key in kwargs:
            if key not in cls.Meta.model.__fields__:
                raise FieldDoesNotExistError(f"Field {key} does not exist for model {cls.Meta.model}")

        return projection, skip, limit

    @classmethod
    def save(cls, model) -> MongoDBModel:
        """Saves object in MongoDB

        Args:
            model: Object to store; it must provide ``to_mongo()``

        Raises:
            WriteError: If serializing or inserting the document fails

        Returns:
            MongoDBModel: Model rebuilt from the stored document, including the inserted ``_id``
        """
        try:
            document = model.to_mongo()
            res = cls._get_collection().insert_one(document)
        except Exception as e:
            raise WriteError(f"Error inserting document: \n{e}") from e

        document["_id"] = res.inserted_id
        return cls.Meta.model.from_mongo(document)

    @classmethod
    def get(cls, **kwargs) -> MongoDBModel:
        """Get a unique document based on some filter.

        Args:
            kwargs: Filter keyword arguments. The reserved names ``projection``,
                    ``skip`` and ``limit`` are removed from the filter and ignored.

        Raises:
            DoesNotExistError: If object not found
            MultipleObjectsReturnedError: If more than one object matches filter

        Returns:
            MongoDBModel: Matching model
        """
        cls._process_kwargs(kwargs)

        # Two documents at most are needed to tell "one match" from "several".
        cursor = cls._get_collection().find(filter=kwargs, limit=2)

        document = next(cursor, None)
        if document is None:
            raise DoesNotExistError("Document not found")

        if next(cursor, None) is not None:
            raise MultipleObjectsReturnedError("2 or more items returned, instead of 1")

        return cls.Meta.model.from_mongo(document)

    @classmethod
    def find(cls, **kwargs) -> Iterator[MongoDBModel]:
        """Queries database and filters on kwargs provided.

        Args:
            kwargs: Filter keyword arguments

            Reserved *optional* field names:
            projection: can either be a list of field names that should be returned in the result set
                        or a dict specifying the fields to include or exclude. If projection is a list
                        "_id" will always be returned. Use a dict to exclude fields from the result
                        (e.g. projection={"_id": False}).
            skip: the number of documents to omit when returning results
            limit: the maximum number of results to return

        Note that invalid query errors may not be detected until the generator is consumed.
        This is because the query is not executed until the result is needed.

        Raises:
            InvalidQueryError: In case one or more arguments were invalid
            FieldDoesNotExistError: If a filter key is not a field of the model

        Yields:
            Iterator[MongoDBModel]: Generator that wraps PyMongo cursor and transforms documents to models
        """
        projection, skip, limit = cls._process_kwargs(kwargs)

        try:
            results = cls._get_collection().find(filter=kwargs, projection=projection, skip=skip, limit=limit)
            for result in results:
                yield cls.Meta.model.from_mongo(result)
        except Exception as e:
            raise InvalidQueryError(f"Invalid argument types: {e}") from e

    @classmethod
    def aggregate(cls, pipeline: List[Dict]) -> Iterator[MongoDBModel]:
        """Runs an aggregation pipeline on the collection.

        This is a generator, so the pipeline is not executed until iteration starts.

        Args:
            pipeline: Aggregation pipeline stages, passed as-is to the collection's ``aggregate``

        Raises:
            InvalidQueryError: If executing the pipeline or converting a result fails

        Yields:
            Iterator[MongoDBModel]: Each result converted with the model's ``from_mongo``
        """
        try:
            results = cls._get_collection().aggregate(pipeline)
            for result in results:
                yield cls.Meta.model.from_mongo(result)
        except Exception as e:
            raise InvalidQueryError(f"Error executing pipeline: {e}") from e
181