byceps.services.shop.order.order_service   A
last analyzed

Complexity

Total Complexity 41

Size/Duplication

Total Lines 464
Duplicated Lines 0 %

Test Coverage

Coverage 75.14%

Importance

Changes 0
Metric Value
eloc 279
dl 0
loc 464
ccs 136
cts 181
cp 0.7514
rs 9.1199
c 0
b 0
f 0
wmc 41

23 Functions

Rating   Name   Duplication   Size   Complexity  
A get_order() 0 5 1
A count_open_orders() 0 9 1
A find_order_with_details_for_admin() 0 22 2
A count_orders_per_payment_state() 0 15 2
A _to_admin_order_list_items() 0 11 1
A find_order_by_order_number() 0 11 2
A get_orders_for_order_numbers() 0 18 2
A get_order_ids_for_order_numbers() 0 16 2
B get_orders_for_shop_paginated() 0 54 8
A get_orders() 0 16 2
A count_orders_per_payment_state_via_order_prefix() 0 17 2
A find_order() 0 9 2
A get_order_count_by_shop_id() 0 14 1
A find_order_with_details() 0 22 2
A get_db_order() 0 10 2
A _find_db_order() 0 5 1
A get_orders_placed_by_user() 0 16 1
A find_payment_method_label() 0 4 1
A get_orders_placed_by_user_for_storefront() 0 24 1
A _db_orders_to_transfer_objects_with_orderer_users() 0 11 1
A get_overdue_orders() 0 25 1
A get_payment_date() 0 10 2
A has_user_placed_orders() 0 12 1

How to fix   Complexity   

Complexity

Complex classes like byceps.services.shop.order.order_service often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
byceps.services.shop.order.order_service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2014-2025 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from collections.abc import Sequence
10
import dataclasses
11 1
from datetime import datetime, timedelta
12 1
13 1
from flask_babel import lazy_gettext
14 1
from sqlalchemy import select
15 1
16
from byceps.database import db, paginate, Pagination
17 1
from byceps.services.shop.invoice import order_invoice_service
18 1
from byceps.services.shop.shop.dbmodels import DbShop
19 1
from byceps.services.shop.shop.models import ShopID
20 1
from byceps.services.shop.storefront.models import StorefrontID
21
from byceps.services.user import user_service
22 1
from byceps.services.user.models.user import UserID
23 1
from byceps.util.result import Err, Ok, Result
24 1
25 1
from . import (
26 1
    order_payment_service,
27 1
)
28 1
from .dbmodels.order import DbOrder
29 1
from .errors import (
30 1
    OrderNotPaidError,
31 1
)
32 1
from .models.detailed_order import AdminDetailedOrder, DetailedOrder
33
from .models.number import OrderNumber
34 1
from .models.order import (
35
    AdminOrderListItem,
36
    Order,
37
    OrderID,
38
    PaymentState,
39
    SiteOrderListItem,
40
)
41 1
from .order_domain_service import OVERDUE_THRESHOLD
42
from .order_helper_service import (
43
    to_admin_order_list_item,
44
    to_detailed_order,
45 1
    to_order,
46 1
    to_site_order_list_item,
47 1
)
48 1
49 1
50 1
def count_open_orders(shop_id: ShopID) -> int:
    """Count the shop's orders whose payment state is open.

    A missing (falsy) scalar result is reported as zero.
    """
    stmt = (
        select(db.func.count(DbOrder.id))
        .filter_by(shop_id=shop_id)
        .filter_by(_payment_state=PaymentState.open.name)
    )

    open_order_count = db.session.scalar(stmt)
    return open_order_count or 0
60
61
62
def count_orders_per_payment_state(shop_id: ShopID) -> dict[PaymentState, int]:
    """Return the shop's order count for each payment state.

    Every member of `PaymentState` is present in the result; states
    that do not show up in the query result map to zero.
    """
    stmt = (
        select(DbOrder._payment_state, db.func.count(DbOrder.id))
        .filter(DbOrder.shop_id == shop_id)
        .group_by(DbOrder._payment_state)
    )

    # Start from zero for all states so that states without any rows
    # are still reported.
    totals = {payment_state: 0 for payment_state in PaymentState}

    for state_name, order_count in db.session.execute(stmt).all():
        # The column holds the member *name*, hence the lookup by name.
        totals[PaymentState[state_name]] = order_count

    return totals
77
78
79
def count_orders_per_payment_state_via_order_prefix(
    order_number_prefix: str,
) -> dict[PaymentState, int]:
    """Count orders with the order number prefix, grouped by payment state.

    The prefix is matched literally: `%` and `_` characters it may
    contain are escaped instead of being interpreted as SQL `LIKE`
    wildcards.

    Every member of `PaymentState` is present in the result; states
    that do not show up in the query result map to zero.
    """
    counts_by_payment_state = dict.fromkeys(PaymentState, 0)

    # `autoescape=True` keeps wildcard characters inside the prefix from
    # widening the match to unrelated order numbers.
    has_prefix = DbOrder.order_number.startswith(
        order_number_prefix, autoescape=True
    )

    rows = db.session.execute(
        select(DbOrder._payment_state, db.func.count(DbOrder.id))
        .filter(has_prefix)
        .group_by(DbOrder._payment_state)
    ).all()

    for payment_state_str, count in rows:
        # The column holds the member *name*, hence the lookup by name.
        payment_state = PaymentState[payment_state_str]
        counts_by_payment_state[payment_state] = count

    return counts_by_payment_state
96 1
97
98
def _find_db_order(order_id: OrderID) -> DbOrder | None:
    """Look up the order database entity by its primary key.

    Return `None` if the session yields no entity for that ID.
    """
    db_order = db.session.get(DbOrder, order_id)
    return db_order
103
104
105
def get_db_order(order_id: OrderID) -> DbOrder:
    """Return the order database entity with that ID.

    Raise `ValueError` if no such entity is found.
    """
    found = _find_db_order(order_id)

    if found is not None:
        return found

    raise ValueError(f'Unknown order ID "{order_id}"')
115
116
117
def find_order(order_id: OrderID) -> Order | None:
    """Return the order with that ID as a transfer object.

    Return `None` if no order with that ID is found. The user who
    placed the order is loaded and attached during the conversion.
    """
    found = _find_db_order(order_id)
    if found is None:
        return None

    placed_by = user_service.get_user(found.placed_by_id)

    return to_order(found, placed_by)
126
127
128
def get_order(order_id: OrderID) -> Order:
    """Return the order with that ID as a transfer object.

    Raise `ValueError` (via `get_db_order`) if the order is unknown.
    """
    existing = get_db_order(order_id)
    placed_by = user_service.get_user(existing.placed_by_id)

    return to_order(existing, placed_by)
133 1
134
135 1
def find_order_with_details(order_id: OrderID) -> DetailedOrder | None:
    """Return the detailed order with that ID, or `None` if not found.

    The order's line items are loaded together with the order, and the
    user who placed it is fetched including their avatar.
    """
    stmt = (
        select(DbOrder)
        .options(db.joinedload(DbOrder.line_items))
        .filter_by(id=order_id)
    )

    # The result is uniquified because the line items are joined-loaded.
    found = db.session.scalars(stmt).unique().one_or_none()
    if found is None:
        return None

    orderer = user_service.get_user(found.placed_by_id, include_avatar=True)

    return to_detailed_order(found, orderer)
157
158
159
def find_order_with_details_for_admin(
    order_id: OrderID,
) -> AdminDetailedOrder | None:
    """Return the admin view of the detailed order, or `None` if not found.

    The result carries every field of the `DetailedOrder` plus the
    order's invoices and payments.
    """
    base_order = find_order_with_details(order_id)
    if base_order is None:
        return None

    order_invoices = order_invoice_service.get_invoices_for_order(
        base_order.id
    )
    order_payments = order_payment_service.get_payments_for_order(
        base_order.id
    )

    # Carry over all dataclass fields of the `DetailedOrder` object
    # as-is (shallow, attribute by attribute).
    inherited_attributes = {}
    for field in dataclasses.fields(base_order):
        inherited_attributes[field.name] = getattr(base_order, field.name)

    return AdminDetailedOrder(
        invoices=order_invoices,
        payments=order_payments,
        **inherited_attributes,
    )
182 1
183
184
def find_order_by_order_number(order_number: OrderNumber) -> Order | None:
    """Return the order carrying that order number as a transfer object.

    Return `None` if no order has that number.
    """
    stmt = select(DbOrder).filter_by(order_number=order_number)

    found = db.session.execute(stmt).scalar_one_or_none()
    if found is None:
        return None

    placed_by = user_service.get_user(found.placed_by_id)

    return to_order(found, placed_by)
195 1
196 1
197
def get_orders_for_order_numbers(
    order_numbers: set[OrderNumber],
) -> list[Order]:
    """Return the orders matching any of the given order numbers.

    An empty set short-circuits to an empty list without querying the
    database.
    """
    if not order_numbers:
        return []

    stmt = (
        select(DbOrder)
        .options(db.joinedload(DbOrder.line_items))
        .filter(DbOrder.order_number.in_(order_numbers))
    )

    # The result is uniquified because the line items are joined-loaded.
    matching_db_orders = db.session.scalars(stmt).unique().all()

    return _db_orders_to_transfer_objects_with_orderer_users(
        matching_db_orders
    )
215
216
217 1
def get_order_ids_for_order_numbers(
    order_numbers: set[OrderNumber],
) -> dict[OrderNumber, OrderID]:
    """Map each of the given order numbers to its order's ID.

    Only order numbers returned by the query appear in the mapping. An
    empty set short-circuits to an empty mapping without querying the
    database.
    """
    if not order_numbers:
        return {}

    stmt = select(DbOrder.id, DbOrder.order_number).filter(
        DbOrder.order_number.in_(order_numbers)
    )

    ids_by_number = {}
    for order_id, order_number in db.session.execute(stmt).all():
        ids_by_number[order_number] = order_id

    return ids_by_number
234 1
235
236 1
def get_order_count_by_shop_id() -> dict[ShopID, int]:
    """Return the number of orders per shop, keyed by shop ID.

    Shops are outer-joined with orders, so shops without any orders
    are included with a count of zero.
    """
    stmt = (
        select(DbShop.id, db.func.count(DbOrder.shop_id))
        .outerjoin(DbOrder)
        .group_by(DbShop.id)
    )

    rows = db.session.execute(stmt).unique().tuples().all()

    # Each row is a `(shop ID, order count)` pair.
    return dict(rows)
250 1
251
252 1
def get_orders(order_ids: frozenset[OrderID]) -> list[Order]:
    """Return the orders with the given IDs as transfer objects.

    An empty set short-circuits to an empty list without querying the
    database.
    """
    if not order_ids:
        return []

    stmt = (
        select(DbOrder)
        .options(db.joinedload(DbOrder.line_items))
        .filter(DbOrder.id.in_(order_ids))
    )

    # The result is uniquified because the line items are joined-loaded.
    matching_db_orders = db.session.scalars(stmt).unique().all()

    return _db_orders_to_transfer_objects_with_orderer_users(
        matching_db_orders
    )
268
269
270 1
def get_orders_for_shop_paginated(
    shop_id: ShopID,
    page: int,
    per_page: int,
    *,
    search_term=None,
    only_payment_state: PaymentState | None = None,
    only_overdue: bool | None = None,
    only_processed: bool | None = None,
) -> Pagination:
    """Return one page of the shop's orders, newest first.

    Optional filters:

    - `search_term`: if truthy, only orders whose order number
      contains the term (case-insensitively) are returned.
    - `only_payment_state`: if given, only orders in that payment
      state are returned.
    - `only_overdue`: only considered if `only_payment_state` is
      `PaymentState.open`. `True` selects orders older than
      `OVERDUE_THRESHOLD`, `False` selects the ones that are not.
    - `only_processed`: if given, only orders that require processing
      are returned; `True` selects the processed ones among them,
      `False` the ones not yet processed.

    The items of the returned pagination are admin order list items.
    """
    stmt = (
        select(DbOrder)
        .options(db.joinedload(DbOrder.line_items))
        .filter_by(shop_id=shop_id)
        .order_by(DbOrder.created_at.desc())
    )

    if search_term:
        stmt = stmt.filter(DbOrder.order_number.ilike(f'%{search_term}%'))

    if only_payment_state is not None:
        stmt = stmt.filter_by(_payment_state=only_payment_state.name)

    # The overdue filter applies to open orders only.
    overdue_filter_requested = (
        only_payment_state is not None
        and only_payment_state == PaymentState.open
        and only_overdue is not None
    )
    if overdue_filter_requested:
        now = datetime.utcnow()
        overdue_from = DbOrder.created_at + OVERDUE_THRESHOLD
        stmt = stmt.filter(
            overdue_from < now if only_overdue else overdue_from >= now
        )

    if only_processed is not None:
        stmt = stmt.filter(DbOrder.processing_required == True)  # noqa: E712
        stmt = stmt.filter(
            DbOrder.processed_at.is_not(None)
            if only_processed
            else DbOrder.processed_at.is_(None)
        )

    pagination = paginate(stmt, page, per_page)

    # Swap the database entities for list item transfer objects.
    pagination.items = _to_admin_order_list_items(pagination.items)

    return pagination
324
325
326
def _to_admin_order_list_items(
    db_orders: list[DbOrder],
) -> list[AdminOrderListItem]:
    """Convert order database entities to admin order list items.

    The users who placed the orders are fetched in one call,
    including their avatars.
    """
    placed_by_ids = {db_order.placed_by_id for db_order in db_orders}
    users_by_id = user_service.get_users_indexed_by_id(
        placed_by_ids, include_avatars=True
    )

    list_items = []
    for db_order in db_orders:
        list_items.append(to_admin_order_list_item(db_order, users_by_id))

    return list_items
338
339 1
340
def get_overdue_orders(
    shop_id: ShopID, older_than: timedelta, *, limit: int | None = None
) -> list[Order]:
    """Return overdue orders for that shop, ordered by creation date.

    An order counts as overdue here if its payment state is open and
    it was created more than `older_than` ago. The oldest orders come
    first.

    If `limit` is given, at most that many orders are returned;
    otherwise all matching orders are returned.
    """
    now = datetime.utcnow()

    stmt = (
        select(DbOrder)
        .options(db.joinedload(DbOrder.line_items))
        .filter_by(shop_id=shop_id)
        .filter_by(_payment_state=PaymentState.open.name)
        .filter(DbOrder.created_at + older_than < now)
        .order_by(DbOrder.created_at)
    )

    # Honor the `limit` argument, which used to be silently ignored.
    if limit is not None:
        stmt = stmt.limit(limit)

    # The result is uniquified because the line items are joined-loaded.
    db_orders = db.session.scalars(stmt).unique().all()

    # Fetch all orderers in one call instead of one lookup per order.
    orderer_ids = {db_order.placed_by_id for db_order in db_orders}
    orderers_by_id = user_service.get_users_indexed_by_id(orderer_ids)

    return [
        to_order(db_order, orderers_by_id[db_order.placed_by_id])
        for db_order in db_orders
    ]
366
367 1
368
def get_orders_placed_by_user(user_id: UserID) -> list[Order]:
    """Return all orders placed by the user, newest first."""
    stmt = (
        select(DbOrder)
        .options(db.joinedload(DbOrder.line_items))
        .filter_by(placed_by_id=user_id)
        .order_by(DbOrder.created_at.desc())
    )

    # The result is uniquified because the line items are joined-loaded.
    users_db_orders = db.session.scalars(stmt).unique().all()

    return _db_orders_to_transfer_objects_with_orderer_users(users_db_orders)
384 1
385
386 1
def get_orders_placed_by_user_for_storefront(
    user_id: UserID, storefront_id: StorefrontID
) -> list[SiteOrderListItem]:
    """Return the user's orders placed through that storefront.

    The orders are returned as site order list items, newest first.
    """
    stmt = (
        select(DbOrder)
        .options(db.joinedload(DbOrder.line_items))
        .filter_by(storefront_id=storefront_id)
        .filter_by(placed_by_id=user_id)
        .order_by(DbOrder.created_at.desc())
    )

    # The result is uniquified because the line items are joined-loaded.
    matching_db_orders = db.session.scalars(stmt).unique().all()

    placed_by_ids = {
        db_order.placed_by_id for db_order in matching_db_orders
    }
    users_by_id = user_service.get_users_indexed_by_id(placed_by_ids)

    list_items = []
    for db_order in matching_db_orders:
        list_items.append(to_site_order_list_item(db_order, users_by_id))

    return list_items
411
412 1
413
def has_user_placed_orders(user_id: UserID, shop_id: ShopID) -> bool:
    """Tell whether the user has placed at least one order in that shop."""
    stmt = (
        select(db.func.count(DbOrder.id))
        .filter_by(shop_id=shop_id)
        .filter_by(placed_by_id=user_id)
    )

    # Treat a missing scalar result like a count of zero.
    placed_order_count = db.session.scalar(stmt) or 0

    return placed_order_count > 0
425
426
427
# Translatable labels for the payment methods known to this module,
# keyed by payment method identifier. The `lazy_gettext` calls are kept
# as plain literals, one per entry.
_PAYMENT_METHOD_LABELS = {
    'bank_transfer': lazy_gettext('bank transfer'),
    'cash': lazy_gettext('cash'),
    'direct_debit': lazy_gettext('direct debit'),
    'free': lazy_gettext('free'),
}


def find_payment_method_label(payment_method: str) -> str | None:
    """Return a label for the payment method.

    If no (truthy) label is registered for the payment method, the
    given payment method identifier itself is returned.
    """
    return _PAYMENT_METHOD_LABELS.get(payment_method) or payment_method
439 1
440
441
def get_payment_date(order_id: OrderID) -> Result[datetime, OrderNotPaidError]:
    """Return the date the order has been marked as paid.

    The value is read from the order's `payment_state_updated_at`
    column. If the query yields no (truthy) value, an
    `OrderNotPaidError` is returned as error result.
    """
    # NOTE(review): Only the timestamp is looked at, not the payment
    # state itself. Presumably the timestamp is also set on other
    # payment state changes; verify against the code that updates it.
    stmt = select(DbOrder.payment_state_updated_at).filter_by(id=order_id)

    state_updated_at = db.session.scalar(stmt)

    if state_updated_at:
        return Ok(state_updated_at)

    return Err(OrderNotPaidError())
451
452
453
def _db_orders_to_transfer_objects_with_orderer_users(
    db_orders: Sequence[DbOrder], *, include_avatars: bool = False
) -> list[Order]:
    """Convert order database entities to order transfer objects.

    The users who placed the orders are fetched in one call and
    attached to the resulting orders. Their avatars are only requested
    if `include_avatars` is `True`.

    The result has the same length and order as `db_orders`.
    """
    orderer_ids = {db_order.placed_by_id for db_order in db_orders}

    # Forward the caller's choice; the flag used to be ignored in favor
    # of a hard-coded `True`.
    orderers_by_id = user_service.get_users_indexed_by_id(
        orderer_ids, include_avatars=include_avatars
    )

    return [
        to_order(db_order, orderers_by_id[db_order.placed_by_id])
        for db_order in db_orders
    ]
465