byceps.services.shop.order.order_service   F
last analyzed

Complexity

Total Complexity 85

Size/Duplication

Total Lines 892
Duplicated Lines 0 %

Test Coverage

Coverage 71.19%

Importance

Changes 0
Metric Value
eloc 567
dl 0
loc 892
ccs 210
cts 295
cp 0.7119
rs 2
c 0
b 0
f 0
wmc 85

44 Functions

Rating   Name   Duplication   Size   Complexity  
A get_orders_for_order_numbers() 0 18 2
A set_shipped_flag() 0 14 2
A add_note() 0 7 1
A unset_shipped_flag() 0 14 2
A _persist_shipped_flag() 0 11 1
A _is_open() 0 2 1
B _execute_article_creation_actions() 0 34 5
A get_order() 0 5 1
A count_open_orders() 0 9 1
A find_order_with_details_for_admin() 0 22 2
A count_orders_per_payment_state() 0 15 2
A _get_line_item_processing_state() 0 13 4
A _db_orders_to_transfer_objects_with_orderer_users() 0 13 1
A _is_processed() 0 2 1
A get_orders_placed_by_user() 0 16 1
A _is_invoiced() 0 2 1
A find_payment_method_label() 0 4 1
A _to_admin_order_list_items() 0 31 1
A get_orders_placed_by_user_for_storefront() 0 39 1
A delete_order() 0 12 1
A _is_paid() 0 2 1
A _get_line_items() 0 13 2
B cancel_order() 0 55 5
A find_order_by_order_number() 0 11 2
B get_orders_for_shop_paginated() 0 54 8
A get_order_ids_for_order_numbers() 0 16 2
A get_orders() 0 16 2
A _line_item_to_transfer_object() 0 20 1
A update_line_item_processing_result() 0 12 2
A find_order() 0 9 2
A _db_order_to_transfer_object() 0 26 1
A _get_address() 0 6 1
A get_order_count_by_shop_id() 0 14 1
A _update_payment_state() 0 9 1
A _is_overdue() 0 4 1
A _find_order_entity() 0 5 1
A _get_order_entity() 0 10 2
A _execute_article_revocation_actions() 0 12 4
A mark_order_as_paid() 0 52 3
B find_order_with_details() 0 45 2
A _is_canceled() 0 4 1
A get_payment_date() 0 10 2
A _get_order_state() 0 14 5
A has_user_placed_orders() 0 12 1

How to fix   Complexity   

Complexity

Complex classes like byceps.services.shop.order.order_service often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
byceps.services.shop.order.order_service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2014-2024 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from collections.abc import Sequence
10
import dataclasses
11 1
from datetime import datetime
12 1
from typing import Any
13 1
from uuid import UUID
14 1
15 1
from flask_babel import lazy_gettext
16
from moneyed import Currency, Money
17 1
from sqlalchemy import delete, select
18 1
import structlog
19 1
20 1
from byceps.database import db, paginate, Pagination
21
from byceps.events.shop import ShopOrderCanceledEvent, ShopOrderPaidEvent
22 1
from byceps.services.shop.article import article_service
23 1
from byceps.services.shop.article.models import ArticleType
24 1
from byceps.services.shop.shop.dbmodels import DbShop
25 1
from byceps.services.shop.shop.models import ShopID
26 1
from byceps.services.shop.storefront.models import StorefrontID
27 1
from byceps.services.ticketing.models.ticket import TicketCategoryID
28 1
from byceps.services.user import user_service
29 1
from byceps.services.user.models.user import User, UserID
30 1
from byceps.util.result import Err, Ok, Result
31 1
32 1
from . import (
33
    order_action_service,
34 1
    order_domain_service,
35
    order_invoice_service,
36
    order_log_service,
37
    order_payment_service,
38
)
39
from .actions import (
40
    ticket as ticket_actions,
41 1
    ticket_bundle as ticket_bundle_actions,
42
)
43
from .dbmodels.line_item import DbLineItem
44
from .dbmodels.log import DbOrderLogEntry
45 1
from .dbmodels.order import DbOrder
46 1
from .errors import (
47 1
    OrderAlreadyCanceledError,
48 1
    OrderAlreadyMarkedAsPaidError,
49 1
    OrderNotPaidError,
50 1
)
51 1
from .models.detailed_order import AdminDetailedOrder, DetailedOrder
52 1
from .models.log import OrderLogEntry
53
from .models.number import OrderNumber
54
from .models.order import (
55
    Address,
56
    AdminOrderListItem,
57
    LineItem,
58
    LineItemID,
59
    LineItemProcessingState,
60
    Order,
61
    OrderID,
62
    OrderState,
63
    PaymentState,
64 1
    SiteOrderListItem,
65 1
)
66
from .models.payment import AdditionalPaymentData
67
from .order_domain_service import OVERDUE_THRESHOLD
68 1
69
70
# Module-level structlog logger used for this module's log events.
log = structlog.get_logger()
71 1
72
73
def add_note(order: Order, author: User, text: str) -> None:
    """Attach a note written by `author` to the order and persist it.

    The domain service builds the log entry; this function converts it
    to its database representation and commits it.
    """
    note_entry = order_domain_service.add_note(order, author, text)

    db_note_entry = order_log_service.to_db_entry(note_entry)
    db.session.add(db_note_entry)
    db.session.commit()
80 1
81
82
def set_shipped_flag(order: Order, initiator: User) -> Result[None, str]:
    """Flag the order as shipped.

    Returns `Err` carrying the domain service's error if the flag
    cannot be set; otherwise the change is persisted and `Ok(None)` is
    returned.
    """
    result = order_domain_service.set_shipped_flag(order, initiator)
    if result.is_err():
        return Err(result.unwrap_err())

    shipped_entry = result.unwrap()

    # The log entry's timestamp is also stored as the order's
    # processing time.
    _persist_shipped_flag(shipped_entry, shipped_entry.occurred_at)

    return Ok(None)
96 1
97
98
def unset_shipped_flag(order: Order, initiator: User) -> Result[None, str]:
    """Flag the order as not shipped.

    Returns `Err` carrying the domain service's error if the flag
    cannot be unset; otherwise the change is persisted and `Ok(None)`
    is returned.
    """
    result = order_domain_service.unset_shipped_flag(order, initiator)
    if result.is_err():
        return Err(result.unwrap_err())

    unshipped_entry = result.unwrap()

    # Passing `None` clears the order's processing time.
    _persist_shipped_flag(unshipped_entry, None)

    return Ok(None)
112 1
113
114
def _persist_shipped_flag(
    log_entry: OrderLogEntry, processed_at: datetime | None
) -> None:
    """Store the log entry and set the order's `processed_at` value.

    The order is looked up via the log entry's `order_id`; a
    `ValueError` is raised by `_get_order_entity` if it is unknown.
    """
    order_entity = _get_order_entity(log_entry.order_id)

    db.session.add(order_log_service.to_db_entry(log_entry))

    order_entity.processed_at = processed_at

    # One commit covers both the log entry and the order update.
    db.session.commit()
125 1
126
127
def cancel_order(
    order_id: OrderID, initiator: User, reason: str
) -> Result[tuple[Order, ShopOrderCanceledEvent], OrderAlreadyCanceledError]:
    """Cancel the order.

    Reserved quantities of articles from that order are made available
    again.

    Returns `Err` with the domain service's error if the cancellation
    is rejected; otherwise `Ok` with the canceled order and the
    corresponding event.

    Raises `ValueError` (from `_get_order_entity`) if no order with
    that ID exists.
    """
    db_order = _get_order_entity(order_id)

    orderer_user = user_service.get_user(db_order.placed_by_id)
    order = _db_order_to_transfer_object(db_order, orderer_user)

    # One timestamp is shared by the domain call and the payment state
    # update below.
    occurred_at = datetime.utcnow()

    cancel_order_result = order_domain_service.cancel_order(
        order,
        orderer_user,
        occurred_at,
        reason,
        initiator,
    )
    if cancel_order_result.is_err():
        return Err(cancel_order_result.unwrap_err())

    event, log_entry = cancel_order_result.unwrap()

    # Determine the target state from the current payment state before
    # it is overwritten below.
    payment_state_to = (
        PaymentState.canceled_after_paid
        if _is_paid(db_order)
        else PaymentState.canceled_before_paid
    )

    _update_payment_state(db_order, payment_state_to, occurred_at, initiator)
    db_order.cancellation_reason = reason

    db_log_entry = order_log_service.to_db_entry(log_entry)
    db.session.add(db_log_entry)

    # Make the reserved quantity of articles available again.
    # `commit=False` is passed; the commit is issued once below.
    for db_line_item in db_order.line_items:
        article_service.increase_quantity(
            db_line_item.article.id, db_line_item.quantity, commit=False
        )

    db.session.commit()

    # Rebuild the transfer object so it reflects the canceled state.
    canceled_order = _db_order_to_transfer_object(db_order, orderer_user)

    # Revocation actions only run for orders that had been paid.
    if payment_state_to == PaymentState.canceled_after_paid:
        _execute_article_revocation_actions(canceled_order, initiator)

    log.info('Order canceled', shop_order_canceled_event=event)

    return Ok((canceled_order, event))
182 1
183
184
def mark_order_as_paid(
    order_id: OrderID,
    payment_method: str,
    initiator: User,
    *,
    additional_payment_data: AdditionalPaymentData | None = None,
) -> Result[tuple[Order, ShopOrderPaidEvent], OrderAlreadyMarkedAsPaidError]:
    """Mark the order as paid.

    The domain-level check runs before the payment is recorded, so a
    rejected call (e.g. the order is already marked as paid) returns
    `Err` without a payment having been added for the order.

    On success, a payment over the order's total amount is added, the
    order's payment method and payment state are updated, the log
    entry is stored, and the article creation actions are executed.
    Returns `Ok` with the paid order and the corresponding event.

    Raises `ValueError` (from `_get_order_entity`) if no order with
    that ID exists.
    """
    db_order = _get_order_entity(order_id)

    orderer_user = user_service.get_user(db_order.placed_by_id)
    order = _db_order_to_transfer_object(db_order, orderer_user)

    # One timestamp is shared by the domain call, the payment and the
    # payment state update.
    occurred_at = datetime.utcnow()

    # Validate first: nothing must be recorded for a rejected request.
    mark_order_as_paid_result = order_domain_service.mark_order_as_paid(
        order,
        orderer_user,
        occurred_at,
        payment_method,
        additional_payment_data,
        initiator,
    )
    if mark_order_as_paid_result.is_err():
        return Err(mark_order_as_paid_result.unwrap_err())

    event, log_entry = mark_order_as_paid_result.unwrap()

    order_payment_service.add_payment(
        order,
        occurred_at,
        payment_method,
        order.total_amount,
        initiator,
        additional_payment_data if additional_payment_data is not None else {},
    )

    db_order.payment_method = payment_method
    _update_payment_state(db_order, PaymentState.paid, occurred_at, initiator)

    db_log_entry = order_log_service.to_db_entry(log_entry)
    db.session.add(db_log_entry)

    db.session.commit()

    # Rebuild the transfer object so it reflects the paid state.
    paid_order = _db_order_to_transfer_object(db_order, orderer_user)

    _execute_article_creation_actions(paid_order, initiator)

    log.info('Order paid', shop_order_paid_event=event)

    return Ok((paid_order, event))
236 1
237
238
def _update_payment_state(
    db_order: DbOrder,
    state: PaymentState,
    updated_at: datetime,
    initiator: User,
) -> None:
    """Set the order's payment state along with when and by whom it was
    changed.

    Only attributes of the entity are assigned; nothing is committed
    here.
    """
    db_order.payment_state = state
    db_order.payment_state_updated_at = updated_at
    db_order.payment_state_updated_by_id = initiator.id
247 1
248
249
def _execute_article_creation_actions(order: Order, initiator: User) -> None:
    """Run the creation actions for the order's line items.

    Tickets and ticket bundles are created for line items of the
    respective article type; afterwards the actions registered with
    the order action service are executed.
    """
    ticketing_types = (ArticleType.ticket, ArticleType.ticket_bundle)

    # based on article type
    for line_item in order.line_items:
        article_type = line_item.article_type
        if article_type not in ticketing_types:
            continue

        article = article_service.get_article(line_item.article_id)
        type_params = article.type_params

        ticket_category_id = TicketCategoryID(
            UUID(str(type_params['ticket_category_id']))
        )

        if article_type == ArticleType.ticket:
            ticket_actions.create_tickets(
                order, line_item, ticket_category_id, initiator
            )
        elif article_type == ArticleType.ticket_bundle:
            tickets_per_bundle = int(type_params['ticket_quantity'])
            ticket_bundle_actions.create_ticket_bundles(
                order,
                line_item,
                ticket_category_id,
                tickets_per_bundle,
                initiator,
            )

    # based on order action registered for article number
    order_action_service.execute_creation_actions(order, initiator)
283 1
284
285
def _execute_article_revocation_actions(order: Order, initiator: User) -> None:
    """Run the revocation actions for the order's line items.

    Tickets and ticket bundles are revoked for line items of the
    respective article type; afterwards the actions registered with
    the order action service are executed.
    """
    # based on article type
    for item in order.line_items:
        item_type = item.article_type
        if item_type == ArticleType.ticket:
            ticket_actions.revoke_tickets(order, item, initiator)
        elif item_type == ArticleType.ticket_bundle:
            ticket_bundle_actions.revoke_ticket_bundles(order, item, initiator)

    # based on order action registered for article number
    order_action_service.execute_revocation_actions(order, initiator)
297 1
298
299
def update_line_item_processing_result(
    line_item_id: LineItemID, data: dict[str, Any]
) -> None:
    """Store processing result data for the line item.

    The line item's `processed_at` value is set to the current UTC
    time and the change is committed.

    Raises `ValueError` if no line item with that ID exists.
    """
    line_item_entity = db.session.get(DbLineItem, line_item_id)
    if line_item_entity is None:
        raise ValueError(f'Unknown line item ID "{line_item_id}"')

    line_item_entity.processing_result = data
    line_item_entity.processed_at = datetime.utcnow()

    db.session.commit()
311 1
312
313
def delete_order(order: Order) -> None:
    """Delete the order together with its dependent records.

    Payments, log entries and line items are removed before the order
    row itself; the deletion is then committed and logged.
    """
    order_payment_service.delete_payments_for_order(order.id)

    delete_statements = (
        delete(DbOrderLogEntry).filter_by(order_id=order.id),
        delete(DbLineItem).filter_by(order_number=order.order_number),
        delete(DbOrder).filter_by(id=order.id),
    )
    # Executed in the order given above: dependents first, order last.
    for stmt in delete_statements:
        db.session.execute(stmt)

    db.session.commit()

    log.info('Order deleted', order_number=order.order_number)
325
326
327
def count_open_orders(shop_id: ShopID) -> int:
    """Count the shop's orders whose payment state is `open`.

    Returns `0` if the query yields no value.
    """
    stmt = (
        select(db.func.count(DbOrder.id))
        .filter_by(shop_id=shop_id)
        .filter_by(_payment_state=PaymentState.open.name)
    )

    open_order_count = db.session.scalar(stmt)

    return open_order_count or 0
337 1
338
339 1
def count_orders_per_payment_state(shop_id: ShopID) -> dict[PaymentState, int]:
    """Return the shop's order counts keyed by payment state.

    Every payment state is present in the result; states without
    orders map to `0`.
    """
    stmt = (
        select(DbOrder._payment_state, db.func.count(DbOrder.id))
        .filter(DbOrder.shop_id == shop_id)
        .group_by(DbOrder._payment_state)
    )

    # Start from zero for all states, then fill in the queried counts.
    counts = dict.fromkeys(PaymentState, 0)
    counts.update(
        {
            PaymentState[state_name]: order_count
            for state_name, order_count in db.session.execute(stmt).all()
        }
    )

    return counts
354 1
355
356
def _find_order_entity(order_id: OrderID) -> DbOrder | None:
    """Look up the order database entity by its ID.

    Returns `None` if no such entity is found.
    """
    order_entity = db.session.get(DbOrder, order_id)
    return order_entity
361 1
362
363
def _get_order_entity(order_id: OrderID) -> DbOrder:
    """Look up the order database entity by its ID.

    Raises `ValueError` if no such entity is found.
    """
    order_entity = _find_order_entity(order_id)

    if order_entity is not None:
        return order_entity

    raise ValueError(f'Unknown order ID "{order_id}"')
373 1
374
375 1
def find_order(order_id: OrderID) -> Order | None:
    """Look up the order by its ID and return it as transfer object.

    Returns `None` if no order has that ID.
    """
    order_entity = _find_order_entity(order_id)
    if order_entity is None:
        return None

    placed_by = user_service.get_user(order_entity.placed_by_id)

    return _db_order_to_transfer_object(order_entity, placed_by)
384 1
385
386 1
def get_order(order_id: OrderID) -> Order:
    """Look up the order by its ID and return it as transfer object.

    Raises `ValueError` (from `_get_order_entity`) if no order has that
    ID.
    """
    order_entity = _get_order_entity(order_id)
    placed_by = user_service.get_user(order_entity.placed_by_id)

    return _db_order_to_transfer_object(order_entity, placed_by)
391 1
392
393 1
def find_order_with_details(order_id: OrderID) -> DetailedOrder | None:
    """Look up the order by its ID and return it with details.

    The line items are loaded together with the order; the orderer is
    fetched with `include_avatar=True`.  Returns `None` if no order has
    that ID.
    """
    stmt = (
        select(DbOrder)
        .options(db.joinedload(DbOrder.line_items))
        .filter_by(id=order_id)
    )

    order_entity = db.session.scalars(stmt).unique().one_or_none()
    if order_entity is None:
        return None

    orderer = user_service.get_user(
        order_entity.placed_by_id, include_avatar=True
    )

    return DetailedOrder(
        id=order_entity.id,
        created_at=order_entity.created_at,
        shop_id=order_entity.shop_id,
        storefront_id=order_entity.storefront_id,
        order_number=order_entity.order_number,
        placed_by=orderer,
        company=order_entity.company,
        first_name=order_entity.first_name,
        last_name=order_entity.last_name,
        address=_get_address(order_entity),
        total_amount=order_entity.total_amount,
        line_items=_get_line_items(order_entity),
        payment_method=order_entity.payment_method,
        payment_state=order_entity.payment_state,
        state=_get_order_state(order_entity),
        is_open=_is_open(order_entity),
        is_canceled=_is_canceled(order_entity),
        is_paid=_is_paid(order_entity),
        is_invoiced=_is_invoiced(order_entity),
        is_overdue=_is_overdue(order_entity),
        is_processing_required=order_entity.processing_required,
        is_processed=_is_processed(order_entity),
        cancellation_reason=order_entity.cancellation_reason,
    )
439 1
440
441
def find_order_with_details_for_admin(
    order_id: OrderID,
) -> AdminDetailedOrder | None:
    """Look up the order by its ID and return it with admin details.

    The result carries all fields of the `DetailedOrder` plus the
    order's invoices and payments.  Returns `None` if no order has that
    ID.
    """
    base_order = find_order_with_details(order_id)
    if base_order is None:
        return None

    invoices = order_invoice_service.get_invoices_for_order(base_order.id)
    payments = order_payment_service.get_payments_for_order(base_order.id)

    # Shallow-copy every dataclass field of the detailed order.
    base_attributes: dict[str, Any] = {}
    for field in dataclasses.fields(base_order):
        base_attributes[field.name] = getattr(base_order, field.name)

    return AdminDetailedOrder(
        invoices=invoices,
        payments=payments,
        **base_attributes,
    )
464
465
466
def find_order_by_order_number(order_number: OrderNumber) -> Order | None:
    """Look up the order by its order number.

    Returns `None` if no order has that order number.
    """
    stmt = select(DbOrder).filter_by(order_number=order_number)

    order_entity = db.session.execute(stmt).scalar_one_or_none()
    if order_entity is None:
        return None

    placed_by = user_service.get_user(order_entity.placed_by_id)

    return _db_order_to_transfer_object(order_entity, placed_by)
477
478
479
def get_orders_for_order_numbers(
    order_numbers: set[OrderNumber],
) -> list[Order]:
    """Return the orders matching any of the given order numbers.

    An empty set yields an empty list without querying the database.
    """
    if not order_numbers:
        return []

    stmt = (
        select(DbOrder)
        .options(db.joinedload(DbOrder.line_items))
        .filter(DbOrder.order_number.in_(order_numbers))
    )

    order_entities = db.session.scalars(stmt).unique().all()

    return _db_orders_to_transfer_objects_with_orderer_users(order_entities)
497
498
499
def get_order_ids_for_order_numbers(
    order_numbers: set[OrderNumber],
) -> dict[OrderNumber, OrderID]:
    """Map the given order numbers to their order IDs.

    Only order numbers found in the database appear in the result.  An
    empty set yields an empty dict without querying the database.
    """
    if not order_numbers:
        return {}

    stmt = select(DbOrder.id, DbOrder.order_number).filter(
        DbOrder.order_number.in_(order_numbers)
    )

    rows = db.session.execute(stmt).all()

    return {found_number: found_id for found_id, found_number in rows}
516
517
518
def get_order_count_by_shop_id() -> dict[ShopID, int]:
    """Return the number of orders per shop, keyed by shop ID.

    Shops are outer-joined with orders, so shops without orders are
    included with a count of 0.
    """
    stmt = (
        select(DbShop.id, db.func.count(DbOrder.shop_id))
        .outerjoin(DbOrder)
        .group_by(DbShop.id)
    )

    rows = db.session.execute(stmt).unique().tuples().all()

    return dict(rows)
532
533 1
534 1
def get_orders(order_ids: frozenset[OrderID]) -> list[Order]:
    """Return the orders matching any of the given IDs.

    An empty set yields an empty list without querying the database.
    """
    if not order_ids:
        return []

    stmt = (
        select(DbOrder)
        .options(db.joinedload(DbOrder.line_items))
        .filter(DbOrder.id.in_(order_ids))
    )

    order_entities = db.session.scalars(stmt).unique().all()

    return _db_orders_to_transfer_objects_with_orderer_users(order_entities)
550
551
552
def get_orders_for_shop_paginated(
    shop_id: ShopID,
    page: int,
    per_page: int,
    *,
    search_term=None,
    only_payment_state: PaymentState | None = None,
    only_overdue: bool | None = None,
    only_processed: bool | None = None,
) -> Pagination:
    """Return a page of the shop's orders, newest first.

    Optional filters:

    - `search_term`: case-insensitive substring match on the order
      number.
    - `only_payment_state`: only orders in that payment state.
    - `only_overdue`: only considered when `only_payment_state` is
      `PaymentState.open`; selects orders that are (or are not) older
      than `OVERDUE_THRESHOLD`.
    - `only_processed`: restricts to orders that require processing
      and have (or have not) been processed.

    The page's items are converted to `AdminOrderListItem` objects.
    """
    stmt = (
        select(DbOrder)
        .options(db.joinedload(DbOrder.line_items))
        .filter_by(shop_id=shop_id)
        .order_by(DbOrder.created_at.desc())
    )

    if search_term:
        stmt = stmt.filter(DbOrder.order_number.ilike(f'%{search_term}%'))

    if only_payment_state is not None:
        stmt = stmt.filter_by(_payment_state=only_payment_state.name)

        is_open_filter = only_payment_state == PaymentState.open
        if is_open_filter and (only_overdue is not None):
            now = datetime.utcnow()
            overdue_from = DbOrder.created_at + OVERDUE_THRESHOLD

            overdue_clause = (
                overdue_from < now if only_overdue else overdue_from >= now
            )
            stmt = stmt.filter(overdue_clause)

    if only_processed is not None:
        processed_clause = (
            DbOrder.processed_at.is_not(None)
            if only_processed
            else DbOrder.processed_at.is_(None)
        )
        stmt = stmt.filter(DbOrder.processing_required == True)  # noqa: E712
        stmt = stmt.filter(processed_clause)

    pagination = paginate(stmt, page, per_page)

    # Replace the database entities with list item transfer objects.
    pagination.items = _to_admin_order_list_items(pagination.items)

    return pagination
606
607
608
def _to_admin_order_list_items(
    db_orders: list[DbOrder],
) -> list[AdminOrderListItem]:
    """Convert order entities to admin list items.

    All orderers are fetched with a single lookup (avatars included)
    and matched to the orders by user ID.
    """
    users_by_id = user_service.get_users_indexed_by_id(
        {db_order.placed_by_id for db_order in db_orders},
        include_avatars=True,
    )

    return [
        AdminOrderListItem(
            id=db_order.id,
            created_at=db_order.created_at,
            order_number=db_order.order_number,
            placed_by=users_by_id[db_order.placed_by_id],
            first_name=db_order.first_name,
            last_name=db_order.last_name,
            total_amount=db_order.total_amount,
            payment_state=db_order.payment_state,
            state=_get_order_state(db_order),
            is_open=_is_open(db_order),
            is_canceled=_is_canceled(db_order),
            is_paid=_is_paid(db_order),
            is_invoiced=_is_invoiced(db_order),
            is_overdue=_is_overdue(db_order),
            is_processing_required=db_order.processing_required,
            is_processed=_is_processed(db_order),
        )
        for db_order in db_orders
    ]
639
640
641
def get_orders_placed_by_user(user_id: UserID) -> list[Order]:
    """Return the orders placed by the user, newest first."""
    stmt = (
        select(DbOrder)
        .options(db.joinedload(DbOrder.line_items))
        .filter_by(placed_by_id=user_id)
        .order_by(DbOrder.created_at.desc())
    )

    order_entities = db.session.scalars(stmt).unique().all()

    return _db_orders_to_transfer_objects_with_orderer_users(order_entities)
657
658
659
def get_orders_placed_by_user_for_storefront(
    user_id: UserID, storefront_id: StorefrontID
) -> list[SiteOrderListItem]:
    """Return the orders the user placed through that storefront,
    newest first, as site list items.
    """
    stmt = (
        select(DbOrder)
        .options(db.joinedload(DbOrder.line_items))
        .filter_by(storefront_id=storefront_id)
        .filter_by(placed_by_id=user_id)
        .order_by(DbOrder.created_at.desc())
    )
    order_entities = db.session.scalars(stmt).unique().all()

    # Fetch all orderers with a single lookup.
    users_by_id = user_service.get_users_indexed_by_id(
        {db_order.placed_by_id for db_order in order_entities}
    )

    return [
        SiteOrderListItem(
            id=db_order.id,
            created_at=db_order.created_at,
            order_number=db_order.order_number,
            placed_by=users_by_id[db_order.placed_by_id],
            total_amount=db_order.total_amount,
            payment_state=db_order.payment_state,
            state=_get_order_state(db_order),
            is_open=_is_open(db_order),
            is_canceled=_is_canceled(db_order),
            is_paid=_is_paid(db_order),
            is_overdue=_is_overdue(db_order),
        )
        for db_order in order_entities
    ]
698
699
700
def has_user_placed_orders(user_id: UserID, shop_id: ShopID) -> bool:
    """Tell whether the user has placed at least one order in that
    shop.
    """
    stmt = (
        select(db.func.count(DbOrder.id))
        .filter_by(shop_id=shop_id)
        .filter_by(placed_by_id=user_id)
    )

    order_count = db.session.scalar(stmt) or 0

    return order_count > 0
712 1
713
714 1
# Lazily translated labels for the known payment methods, keyed by the
# payment method identifier.  Looked up by `find_payment_method_label`.
_PAYMENT_METHOD_LABELS = {
    'bank_transfer': lazy_gettext('bank transfer'),
    'cash': lazy_gettext('cash'),
    'direct_debit': lazy_gettext('direct debit'),
    'free': lazy_gettext('free'),
}
720
721
722 1
def find_payment_method_label(payment_method: str) -> str | None:
    """Return a label for the payment method.

    Falls back to the given payment method identifier itself if no
    (non-empty) label is registered for it.
    """
    label = _PAYMENT_METHOD_LABELS.get(payment_method)
    if label:
        return label

    return payment_method
726
727 1
728
def get_payment_date(order_id: OrderID) -> Result[datetime, OrderNotPaidError]:
    """Return the date the order has been marked as paid.

    The payment state's update timestamp is only meaningful as the
    payment date while the order is in the `paid` state: canceling an
    order writes the same column.  The query is therefore restricted to
    orders in the `paid` state.

    Returns `Err(OrderNotPaidError())` if the order is not in the
    `paid` state, has no such timestamp, or does not exist.
    """
    paid_at = db.session.scalar(
        select(DbOrder.payment_state_updated_at)
        .filter_by(id=order_id)
        .filter_by(_payment_state=PaymentState.paid.name)
    )

    if paid_at is None:
        return Err(OrderNotPaidError())

    return Ok(paid_at)
738
739
740
def _db_order_to_transfer_object(db_order: DbOrder, placed_by: User) -> Order:
    """Build an `Order` transfer object from the database entity.

    `placed_by` is used as the orderer; derived values (address, line
    items, state and the `is_*` flags) are computed from the entity.
    """
    # Derived values first, plain attribute copies in the constructor.
    address = _get_address(db_order)
    line_items = _get_line_items(db_order)
    state = _get_order_state(db_order)

    flags = {
        'is_open': _is_open(db_order),
        'is_canceled': _is_canceled(db_order),
        'is_paid': _is_paid(db_order),
        'is_invoiced': _is_invoiced(db_order),
        'is_overdue': _is_overdue(db_order),
        'is_processing_required': db_order.processing_required,
        'is_processed': _is_processed(db_order),
    }

    return Order(
        id=db_order.id,
        created_at=db_order.created_at,
        shop_id=db_order.shop_id,
        storefront_id=db_order.storefront_id,
        order_number=db_order.order_number,
        placed_by=placed_by,
        company=db_order.company,
        first_name=db_order.first_name,
        last_name=db_order.last_name,
        address=address,
        total_amount=db_order.total_amount,
        line_items=line_items,
        payment_method=db_order.payment_method,
        payment_state=db_order.payment_state,
        state=state,
        cancellation_reason=db_order.cancellation_reason,
        **flags,
    )
767
768
769
def _db_orders_to_transfer_objects_with_orderer_users(
    db_orders: Sequence[DbOrder], *, include_avatars: bool = True
) -> list[Order]:
    """Convert order entities to transfer objects including orderers.

    All orderers are fetched with a single lookup and matched to the
    orders by user ID.

    `include_avatars` is forwarded to the user lookup.  It defaults to
    `True` so that callers omitting the argument keep receiving users
    with avatars, as the lookup always requested them regardless of
    this argument before.
    """
    orderer_ids = {db_order.placed_by_id for db_order in db_orders}
    orderers_by_id = user_service.get_users_indexed_by_id(
        orderer_ids, include_avatars=include_avatars
    )

    return [
        _db_order_to_transfer_object(
            db_order, orderers_by_id[db_order.placed_by_id]
        )
        for db_order in db_orders
    ]
783
784 1
785
def _get_address(db_order: DbOrder) -> Address:
    """Build an `Address` from the order entity's address columns."""
    return Address(
        country=db_order.country,
        zip_code=db_order.zip_code,
        city=db_order.city,
        street=db_order.street,
    )
792
793 1
794
def _get_line_items(db_order: DbOrder) -> list[LineItem]:
    """Return the order's line items as transfer objects, sorted by
    article ID.
    """
    currency = db_order.currency
    canceled = _is_canceled(db_order)

    unsorted_items = [
        _line_item_to_transfer_object(db_line_item, currency, canceled)
        for db_line_item in db_order.line_items
    ]

    return sorted(unsorted_items, key=lambda item: item.article_id)
807 1
808
809
def _is_overdue(db_order: DbOrder) -> bool:
    """Tell whether payment of the order is overdue.

    The decision is delegated to the order domain service, based on
    the order's creation time and payment state.
    """
    created_at = db_order.created_at
    payment_state = db_order.payment_state

    return order_domain_service.is_overdue(created_at, payment_state)
814
815
816
def _line_item_to_transfer_object(
    db_line_item: DbLineItem, currency: Currency, is_order_canceled: bool
) -> LineItem:
    """Build a `LineItem` transfer object from the database entity.

    Unit price and line amount are wrapped as `Money` in the given
    currency; a missing processing result becomes an empty dict.
    """
    unit_price = Money(db_line_item.unit_price, currency)
    line_amount = Money(db_line_item.line_amount, currency)
    processing_state = _get_line_item_processing_state(
        db_line_item, is_order_canceled
    )

    return LineItem(
        id=db_line_item.id,
        order_number=db_line_item.order_number,
        article_id=db_line_item.article_id,
        article_number=db_line_item.article_number,
        article_type=db_line_item.article_type,
        name=db_line_item.name,
        unit_price=unit_price,
        tax_rate=db_line_item.tax_rate,
        quantity=db_line_item.quantity,
        line_amount=line_amount,
        processing_required=db_line_item.processing_required,
        processing_result=db_line_item.processing_result or {},
        processed_at=db_line_item.processed_at,
        processing_state=processing_state,
    )
838
839 1
840
def _get_line_item_processing_state(
    db_line_item: DbLineItem, is_order_canceled: bool
) -> LineItemProcessingState:
    """Derive the line item's processing state.

    Checked in this order: no processing required -> `not_applicable`;
    order canceled -> `canceled`; otherwise `complete` if the item has
    a processing time, else `pending`.
    """
    if not db_line_item.processing_required:
        return LineItemProcessingState.not_applicable

    if is_order_canceled:
        return LineItemProcessingState.canceled

    return (
        LineItemProcessingState.complete
        if db_line_item.processed_at is not None
        else LineItemProcessingState.pending
    )
853 1
854
855 1
def _get_order_state(db_order: DbOrder) -> OrderState:
    """Derive the overall order state.

    A canceled order is `canceled`.  A paid order is `complete` if it
    needs no processing or has been processed.  Everything else is
    `open`.
    """
    if _is_canceled(db_order):
        return OrderState.canceled

    if not _is_paid(db_order):
        return OrderState.open

    # Paid, but processing is still outstanding.
    if db_order.processing_required and not _is_processed(db_order):
        return OrderState.open

    return OrderState.complete
869 1
870 1
871
def _is_open(db_order: DbOrder) -> bool:
    """Tell whether the order's payment state is `open`."""
    payment_state = db_order.payment_state
    return payment_state == PaymentState.open
873 1
874 1
875
def _is_canceled(db_order: DbOrder) -> bool:
    """Tell whether the order has been canceled, before or after
    payment.
    """
    canceled_states = {
        PaymentState.canceled_before_paid,
        PaymentState.canceled_after_paid,
    }
    return db_order.payment_state in canceled_states
880
881
882
def _is_paid(db_order: DbOrder) -> bool:
    """Tell whether the order's payment state is `paid`."""
    payment_state = db_order.payment_state
    return payment_state == PaymentState.paid
884
885
886
def _is_invoiced(db_order: DbOrder) -> bool:
    """Tell whether an invoice creation time is set on the order."""
    invoice_created_at = db_order.invoice_created_at
    return invoice_created_at is not None
888
889
890
def _is_processed(db_order: DbOrder) -> bool:
    """Tell whether a processing time is set on the order."""
    processed_at = db_order.processed_at
    return processed_at is not None
892