byceps.services.shop.order.order_service   F
last analyzed

Complexity

Total Complexity 87

Size/Duplication

Total Lines 911
Duplicated Lines 0 %

Test Coverage

Coverage 71.19%

Importance

Changes 0
Metric Value
eloc 579
dl 0
loc 911
ccs 210
cts 295
cp 0.7119
rs 2
c 0
b 0
f 0
wmc 87

45 Functions

Rating   Name   Duplication   Size   Complexity  
A set_shipped_flag() 0 14 2
A add_note() 0 7 1
A unset_shipped_flag() 0 14 2
A _persist_shipped_flag() 0 11 1
B _execute_article_creation_actions() 0 34 5
A count_open_orders() 0 9 1
A count_orders_per_payment_state() 0 15 2
A delete_order() 0 12 1
B cancel_order() 0 55 5
A update_line_item_processing_result() 0 12 2
A _update_payment_state() 0 9 1
A _execute_article_revocation_actions() 0 12 4
A mark_order_as_paid() 0 52 3
A _is_open() 0 2 1
A get_order() 0 5 1
A find_order_with_details_for_admin() 0 22 2
A _get_line_item_processing_state() 0 13 4
A _db_orders_to_transfer_objects_with_orderer_users() 0 13 1
A _is_processed() 0 2 1
A get_orders_placed_by_user() 0 16 1
A _is_invoiced() 0 2 1
A find_payment_method_label() 0 4 1
A _to_admin_order_list_items() 0 31 1
A get_orders_placed_by_user_for_storefront() 0 39 1
A _is_paid() 0 2 1
A _get_line_items() 0 13 2
A find_order_by_order_number() 0 11 2
A get_orders_for_order_numbers() 0 18 2
B get_orders_for_shop_paginated() 0 54 8
A get_order_ids_for_order_numbers() 0 16 2
A get_orders() 0 16 2
A _line_item_to_transfer_object() 0 20 1
A count_orders_per_payment_state_via_order_prefix() 0 17 2
A find_order() 0 9 2
A _db_order_to_transfer_object() 0 26 1
A _get_address() 0 6 1
A get_order_count_by_shop_id() 0 14 1
A _is_overdue() 0 4 1
A _find_order_entity() 0 5 1
A _get_order_entity() 0 10 2
B find_order_with_details() 0 45 2
A _is_canceled() 0 4 1
A get_payment_date() 0 10 2
A _get_order_state() 0 14 5
A has_user_placed_orders() 0 12 1

How to fix   Complexity   

Complexity

Complex classes like byceps.services.shop.order.order_service often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
byceps.services.shop.order.order_service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2014-2024 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from collections.abc import Sequence
10
import dataclasses
11 1
from datetime import datetime
12 1
from typing import Any
13 1
from uuid import UUID
14 1
15 1
from flask_babel import lazy_gettext
16
from moneyed import Currency, Money
17 1
from sqlalchemy import delete, select
18 1
import structlog
19 1
20 1
from byceps.database import db, paginate, Pagination
21
from byceps.events.shop import ShopOrderCanceledEvent, ShopOrderPaidEvent
22 1
from byceps.services.shop.article import article_service
23 1
from byceps.services.shop.article.models import ArticleType
24 1
from byceps.services.shop.shop.dbmodels import DbShop
25 1
from byceps.services.shop.shop.models import ShopID
26 1
from byceps.services.shop.storefront.models import StorefrontID
27 1
from byceps.services.ticketing.models.ticket import TicketCategoryID
28 1
from byceps.services.user import user_service
29 1
from byceps.services.user.models.user import User, UserID
30 1
from byceps.util.result import Err, Ok, Result
31 1
32 1
from . import (
33
    order_action_service,
34 1
    order_domain_service,
35
    order_invoice_service,
36
    order_log_service,
37
    order_payment_service,
38
)
39
from .actions import (
40
    ticket as ticket_actions,
41 1
    ticket_bundle as ticket_bundle_actions,
42
)
43
from .dbmodels.line_item import DbLineItem
44
from .dbmodels.log import DbOrderLogEntry
45 1
from .dbmodels.order import DbOrder
46 1
from .errors import (
47 1
    OrderAlreadyCanceledError,
48 1
    OrderAlreadyMarkedAsPaidError,
49 1
    OrderNotPaidError,
50 1
)
51 1
from .models.detailed_order import AdminDetailedOrder, DetailedOrder
52 1
from .models.log import OrderLogEntry
53
from .models.number import OrderNumber
54
from .models.order import (
55
    Address,
56
    AdminOrderListItem,
57
    LineItem,
58
    LineItemID,
59
    LineItemProcessingState,
60
    Order,
61
    OrderID,
62
    OrderState,
63
    PaymentState,
64 1
    SiteOrderListItem,
65 1
)
66
from .models.payment import AdditionalPaymentData
67
from .order_domain_service import OVERDUE_THRESHOLD
68 1
69
70
log = structlog.get_logger()
71 1
72
73
def add_note(order: Order, author: User, text: str) -> None:
    """Attach a note written by `author` to the order.

    The domain service turns the note into a log entry, which is
    stored as a database entry and committed right away.
    """
    note_entry = order_domain_service.add_note(order, author, text)
    db.session.add(order_log_service.to_db_entry(note_entry))
    db.session.commit()
80 1
81
82
def set_shipped_flag(order: Order, initiator: User) -> Result[None, str]:
    """Mark the order as shipped.

    Return an error, without persisting anything, if the domain service
    rejects the change.
    """
    result = order_domain_service.set_shipped_flag(order, initiator)
    if result.is_err():
        return Err(result.unwrap_err())

    shipped_log_entry = result.unwrap()

    # The time of the log entry is stored as the order's processing time.
    _persist_shipped_flag(shipped_log_entry, shipped_log_entry.occurred_at)

    return Ok(None)
96 1
97
98
def unset_shipped_flag(order: Order, initiator: User) -> Result[None, str]:
    """Mark the order as not shipped.

    Return an error, without persisting anything, if the domain service
    rejects the change.
    """
    result = order_domain_service.unset_shipped_flag(order, initiator)
    if result.is_err():
        return Err(result.unwrap_err())

    unshipped_log_entry = result.unwrap()

    # Passing `None` clears the order's processing timestamp.
    _persist_shipped_flag(unshipped_log_entry, None)

    return Ok(None)
112 1
113
114
def _persist_shipped_flag(
    log_entry: OrderLogEntry, processed_at: datetime | None
) -> None:
    """Store the log entry and the order's new processing timestamp.

    Both changes are committed together. Raise `ValueError` if the
    order referenced by the log entry is unknown.
    """
    db_order = _get_order_entity(log_entry.order_id)

    db.session.add(order_log_service.to_db_entry(log_entry))
    db_order.processed_at = processed_at

    db.session.commit()
125 1
126
127
def cancel_order(
    order_id: OrderID, initiator: User, reason: str
) -> Result[tuple[Order, ShopOrderCanceledEvent], OrderAlreadyCanceledError]:
    """Cancel the order.

    The article quantities reserved by the order are made available
    again. If the order had already been paid, the revocation actions
    for its articles are executed after the cancellation has been
    committed.

    Raise `ValueError` if the order ID is unknown.
    """
    db_order = _get_order_entity(order_id)

    orderer = user_service.get_user(db_order.placed_by_id)
    order = _db_order_to_transfer_object(db_order, orderer)

    now = datetime.utcnow()

    result = order_domain_service.cancel_order(
        order, orderer, now, reason, initiator
    )
    if result.is_err():
        return Err(result.unwrap_err())

    event, log_entry = result.unwrap()

    # Has to be decided before the payment state is overwritten below.
    if _is_paid(db_order):
        new_payment_state = PaymentState.canceled_after_paid
    else:
        new_payment_state = PaymentState.canceled_before_paid

    _update_payment_state(db_order, new_payment_state, now, initiator)
    db_order.cancellation_reason = reason

    db.session.add(order_log_service.to_db_entry(log_entry))

    # Release the stock this order had reserved.
    for db_line_item in db_order.line_items:
        article_service.increase_quantity(
            db_line_item.article.id, db_line_item.quantity, commit=False
        )

    db.session.commit()

    canceled_order = _db_order_to_transfer_object(db_order, orderer)

    was_paid_before = new_payment_state == PaymentState.canceled_after_paid
    if was_paid_before:
        _execute_article_revocation_actions(canceled_order, initiator)

    log.info('Order canceled', shop_order_canceled_event=event)

    return Ok((canceled_order, event))
182 1
183
184
def mark_order_as_paid(
    order_id: OrderID,
    payment_method: str,
    initiator: User,
    *,
    additional_payment_data: AdditionalPaymentData | None = None,
) -> Result[tuple[Order, ShopOrderPaidEvent], OrderAlreadyMarkedAsPaidError]:
    """Mark the order as paid.

    A payment over the order's total amount is recorded, the order's
    payment method and payment state are updated, and the creation
    actions for the ordered articles are executed after the change has
    been committed.

    Return an error, without recording a payment, if the domain service
    rejects the request.

    Raise `ValueError` if the order ID is unknown.
    """
    db_order = _get_order_entity(order_id)

    orderer_user = user_service.get_user(db_order.placed_by_id)
    order = _db_order_to_transfer_object(db_order, orderer_user)

    occurred_at = datetime.utcnow()

    # Validate first so that a rejected request does not leave a payment
    # record behind.
    mark_order_as_paid_result = order_domain_service.mark_order_as_paid(
        order,
        orderer_user,
        occurred_at,
        payment_method,
        additional_payment_data,
        initiator,
    )
    if mark_order_as_paid_result.is_err():
        return Err(mark_order_as_paid_result.unwrap_err())

    event, log_entry = mark_order_as_paid_result.unwrap()

    order_payment_service.add_payment(
        order,
        occurred_at,
        payment_method,
        order.total_amount,
        initiator,
        additional_payment_data if additional_payment_data is not None else {},
    )

    db_order.payment_method = payment_method
    _update_payment_state(db_order, PaymentState.paid, occurred_at, initiator)

    db_log_entry = order_log_service.to_db_entry(log_entry)
    db.session.add(db_log_entry)

    db.session.commit()

    paid_order = _db_order_to_transfer_object(db_order, orderer_user)

    _execute_article_creation_actions(paid_order, initiator)

    log.info('Order paid', shop_order_paid_event=event)

    return Ok((paid_order, event))
236 1
237
238
def _update_payment_state(
239 1
    db_order: DbOrder,
240
    state: PaymentState,
241
    updated_at: datetime,
242
    initiator: User,
243
) -> None:
244
    db_order.payment_state = state
245 1
    db_order.payment_state_updated_at = updated_at
246 1
    db_order.payment_state_updated_by_id = initiator.id
247 1
248
249
def _execute_article_creation_actions(order: Order, initiator: User) -> None:
    """Run the creation actions for the order's line items.

    Tickets or ticket bundles are created for line items of the
    respective article types. Afterwards, the creation actions
    registered with the order action service are executed.
    """
    ticket_related_types = (ArticleType.ticket, ArticleType.ticket_bundle)

    # Actions determined by the article type.
    for line_item in order.line_items:
        article_type = line_item.article_type
        if article_type not in ticket_related_types:
            continue

        article = article_service.get_article(line_item.article_id)
        type_params = article.type_params

        ticket_category_id = TicketCategoryID(
            UUID(str(type_params['ticket_category_id']))
        )

        if article_type == ArticleType.ticket:
            ticket_actions.create_tickets(
                order, line_item, ticket_category_id, initiator
            )
        elif article_type == ArticleType.ticket_bundle:
            tickets_per_bundle = int(type_params['ticket_quantity'])
            ticket_bundle_actions.create_ticket_bundles(
                order,
                line_item,
                ticket_category_id,
                tickets_per_bundle,
                initiator,
            )

    # Actions registered for the article numbers.
    order_action_service.execute_creation_actions(order, initiator)
283 1
284
285
def _execute_article_revocation_actions(order: Order, initiator: User) -> None:
    """Run the revocation actions for the order's line items.

    Tickets or ticket bundles are revoked for line items of the
    respective article types. Afterwards, the revocation actions
    registered with the order action service are executed.
    """
    # Actions determined by the article type.
    for line_item in order.line_items:
        article_type = line_item.article_type

        if article_type == ArticleType.ticket:
            ticket_actions.revoke_tickets(order, line_item, initiator)
            continue

        if article_type == ArticleType.ticket_bundle:
            ticket_bundle_actions.revoke_ticket_bundles(
                order, line_item, initiator
            )

    # Actions registered for the article numbers.
    order_action_service.execute_revocation_actions(order, initiator)
297 1
298
299
def update_line_item_processing_result(
    line_item_id: LineItemID, data: dict[str, Any]
) -> None:
    """Store processing result data for the line item.

    The line item's processing timestamp is set to the current UTC
    time and the change is committed.

    Raise `ValueError` if the line item ID is unknown.
    """
    db_line_item = db.session.get(DbLineItem, line_item_id)
    if db_line_item is None:
        raise ValueError(f'Unknown line item ID "{line_item_id}"')

    db_line_item.processing_result = data
    db_line_item.processed_at = datetime.utcnow()

    db.session.commit()
311 1
312
313
def delete_order(order: Order) -> None:
    """Delete the order together with its payments, log entries, and
    line items.
    """
    order_payment_service.delete_payments_for_order(order.id)

    # Rows referring to the order are removed before the order itself.
    delete_statements = (
        delete(DbOrderLogEntry).filter_by(order_id=order.id),
        delete(DbLineItem).filter_by(order_number=order.order_number),
        delete(DbOrder).filter_by(id=order.id),
    )
    for stmt in delete_statements:
        db.session.execute(stmt)

    db.session.commit()

    log.info('Order deleted', order_number=order.order_number)
325
326
327
def count_open_orders(shop_id: ShopID) -> int:
    """Return how many of the shop's orders are in payment state `open`."""
    stmt = (
        select(db.func.count(DbOrder.id))
        .filter_by(shop_id=shop_id)
        .filter_by(_payment_state=PaymentState.open.name)
    )

    open_order_count = db.session.scalar(stmt)

    # Fall back to zero if the query yields no value.
    return open_order_count or 0
337 1
338
339 1
def count_orders_per_payment_state(shop_id: ShopID) -> dict[PaymentState, int]:
    """Return the number of the shop's orders for each payment state.

    Every payment state is present in the result; states without
    orders map to zero.
    """
    counts = {payment_state: 0 for payment_state in PaymentState}

    stmt = (
        select(DbOrder._payment_state, db.func.count(DbOrder.id))
        .filter(DbOrder.shop_id == shop_id)
        .group_by(DbOrder._payment_state)
    )

    # The column holds the state's name; map it back to the enum member.
    counts.update(
        (PaymentState[state_name], order_count)
        for state_name, order_count in db.session.execute(stmt).all()
    )

    return counts
354 1
355
356
def count_orders_per_payment_state_via_order_prefix(
    order_number_prefix: str,
) -> dict[PaymentState, int]:
    """Count orders with the order number prefix, grouped by payment state.

    Every payment state is present in the result; states without
    matching orders map to zero.

    The prefix is matched literally: `%` and `_` inside it are escaped
    and thus not treated as `LIKE` wildcards.
    """
    counts_by_payment_state = dict.fromkeys(PaymentState, 0)

    rows = db.session.execute(
        select(DbOrder._payment_state, db.func.count(DbOrder.id))
        .filter(
            DbOrder.order_number.startswith(
                order_number_prefix, autoescape=True
            )
        )
        .group_by(DbOrder._payment_state)
    ).all()

    for payment_state_str, count in rows:
        # The column holds the state's name; map it back to the enum member.
        payment_state = PaymentState[payment_state_str]
        counts_by_payment_state[payment_state] = count

    return counts_by_payment_state
373 1
374
375 1
def _find_order_entity(order_id: OrderID) -> DbOrder | None:
    """Look up the order database entity by its id.

    Return `None` if there is no such order.
    """
    db_order = db.session.get(DbOrder, order_id)
    return db_order
380 1
381 1
382
def _get_order_entity(order_id: OrderID) -> DbOrder:
    """Look up the order database entity by its id.

    Raise `ValueError` if there is no such order.
    """
    db_order = _find_order_entity(order_id)

    if db_order is not None:
        return db_order

    raise ValueError(f'Unknown order ID "{order_id}"')
392
393 1
394
def find_order(order_id: OrderID) -> Order | None:
    """Return the order with that id, or `None` if it does not exist."""
    db_order = _find_order_entity(order_id)
    if db_order is None:
        return None

    # The transfer object carries the user who placed the order.
    placed_by = user_service.get_user(db_order.placed_by_id)

    return _db_order_to_transfer_object(db_order, placed_by)
403
404
405 1
def get_order(order_id: OrderID) -> Order:
    """Return the order with that id.

    Raise `ValueError` if it does not exist.
    """
    db_order = _get_order_entity(order_id)

    placed_by = user_service.get_user(db_order.placed_by_id)

    return _db_order_to_transfer_object(db_order, placed_by)
410
411
412 1
def find_order_with_details(order_id: OrderID) -> DetailedOrder | None:
    """Return the detailed order with that id, or `None` if not found.

    The line items are loaded together with the order, and the orderer
    is looked up including the avatar.
    """
    stmt = (
        select(DbOrder)
        .options(db.joinedload(DbOrder.line_items))
        .filter_by(id=order_id)
    )

    db_order = db.session.scalars(stmt).unique().one_or_none()
    if db_order is None:
        return None

    orderer = user_service.get_user(
        db_order.placed_by_id, include_avatar=True
    )

    return DetailedOrder(
        id=db_order.id,
        created_at=db_order.created_at,
        shop_id=db_order.shop_id,
        storefront_id=db_order.storefront_id,
        order_number=db_order.order_number,
        placed_by=orderer,
        company=db_order.company,
        first_name=db_order.first_name,
        last_name=db_order.last_name,
        address=_get_address(db_order),
        total_amount=db_order.total_amount,
        line_items=_get_line_items(db_order),
        payment_method=db_order.payment_method,
        payment_state=db_order.payment_state,
        state=_get_order_state(db_order),
        is_open=_is_open(db_order),
        is_canceled=_is_canceled(db_order),
        is_paid=_is_paid(db_order),
        is_invoiced=_is_invoiced(db_order),
        is_overdue=_is_overdue(db_order),
        is_processing_required=db_order.processing_required,
        is_processed=_is_processed(db_order),
        cancellation_reason=db_order.cancellation_reason,
    )
458
459
460
def find_order_with_details_for_admin(
    order_id: OrderID,
) -> AdminDetailedOrder | None:
    """Return the detailed order with that id, extended by its invoices
    and payments, or `None` if not found.
    """
    detailed_order = find_order_with_details(order_id)
    if detailed_order is None:
        return None

    found_order_id = detailed_order.id
    invoices = order_invoice_service.get_invoices_for_order(found_order_id)
    payments = order_payment_service.get_payments_for_order(found_order_id)

    # Take over every field value of the `DetailedOrder` object as-is
    # (shallow, no conversion of nested objects).
    base_attributes = {}
    for field in dataclasses.fields(detailed_order):
        base_attributes[field.name] = getattr(detailed_order, field.name)

    return AdminDetailedOrder(
        invoices=invoices,
        payments=payments,
        **base_attributes,
    )
483
484
485
def find_order_by_order_number(order_number: OrderNumber) -> Order | None:
    """Return the order with that order number, or `None` if there is
    none.
    """
    stmt = select(DbOrder).filter_by(order_number=order_number)

    db_order = db.session.execute(stmt).scalar_one_or_none()
    if db_order is None:
        return None

    placed_by = user_service.get_user(db_order.placed_by_id)

    return _db_order_to_transfer_object(db_order, placed_by)
496 1
497
498
def get_orders_for_order_numbers(
    order_numbers: set[OrderNumber],
) -> list[Order]:
    """Return the orders with those order numbers.

    No query is issued for an empty set of order numbers.
    """
    if not order_numbers:
        return []

    stmt = (
        select(DbOrder)
        .options(db.joinedload(DbOrder.line_items))
        .filter(DbOrder.order_number.in_(order_numbers))
    )

    db_orders = db.session.scalars(stmt).unique().all()

    return _db_orders_to_transfer_objects_with_orderer_users(db_orders)
516
517
518
def get_order_ids_for_order_numbers(
    order_numbers: set[OrderNumber],
) -> dict[OrderNumber, OrderID]:
    """Return the order IDs for those order numbers, indexed by order
    number.

    No query is issued for an empty set of order numbers.
    """
    if not order_numbers:
        return {}

    stmt = select(DbOrder.id, DbOrder.order_number).filter(
        DbOrder.order_number.in_(order_numbers)
    )

    order_ids_by_number = {}
    for order_id, order_number in db.session.execute(stmt).all():
        order_ids_by_number[order_number] = order_id

    return order_ids_by_number
535
536
537
def get_order_count_by_shop_id() -> dict[ShopID, int]:
    """Return the number of orders per shop, indexed by shop ID.

    Shops without orders are included with a count of zero.
    """
    # The outer join keeps shops without any orders in the result.
    stmt = (
        select(DbShop.id, db.func.count(DbOrder.shop_id))
        .outerjoin(DbOrder)
        .group_by(DbShop.id)
    )

    rows = db.session.execute(stmt).unique().tuples().all()

    return dict(rows)
551
552
553
def get_orders(order_ids: frozenset[OrderID]) -> list[Order]:
    """Return the orders with these ids.

    No query is issued for an empty set of ids.
    """
    if not order_ids:
        return []

    stmt = (
        select(DbOrder)
        .options(db.joinedload(DbOrder.line_items))
        .filter(DbOrder.id.in_(order_ids))
    )

    db_orders = db.session.scalars(stmt).unique().all()

    return _db_orders_to_transfer_objects_with_orderer_users(db_orders)
569
570
571
def get_orders_for_shop_paginated(
    shop_id: ShopID,
    page: int,
    per_page: int,
    *,
    search_term=None,
    only_payment_state: PaymentState | None = None,
    only_overdue: bool | None = None,
    only_processed: bool | None = None,
) -> Pagination:
    """Return a page of the shop's orders, newest first.

    Optional filters:

    - `search_term`: matched case-insensitively anywhere in the order
      number
    - `only_payment_state`: only orders in that payment state
    - `only_overdue`: only considered together with payment state
      `open`; selects orders whose creation date plus the overdue
      threshold lies before now (`True`) or does not (`False`)
    - `only_processed`: restricts to orders that require processing and
      have (`True`) or have not (`False`) been processed

    The items of the returned pagination are admin order list items.
    """
    stmt = (
        select(DbOrder)
        .options(db.joinedload(DbOrder.line_items))
        .filter_by(shop_id=shop_id)
        .order_by(DbOrder.created_at.desc())
    )

    if search_term:
        stmt = stmt.filter(DbOrder.order_number.ilike(f'%{search_term}%'))

    if only_payment_state is not None:
        stmt = stmt.filter_by(_payment_state=only_payment_state.name)

        filter_by_overdue = (
            only_payment_state == PaymentState.open
            and only_overdue is not None
        )
        if filter_by_overdue:
            now = datetime.utcnow()
            overdue_from = DbOrder.created_at + OVERDUE_THRESHOLD

            stmt = stmt.filter(
                (overdue_from < now) if only_overdue else (overdue_from >= now)
            )

    if only_processed is not None:
        stmt = stmt.filter(DbOrder.processing_required == True)  # noqa: E712

        processed_criterion = (
            DbOrder.processed_at.is_not(None)
            if only_processed
            else DbOrder.processed_at.is_(None)
        )
        stmt = stmt.filter(processed_criterion)

    orders_page = paginate(stmt, page, per_page)

    # Swap the database entities for list item transfer objects.
    orders_page.items = _to_admin_order_list_items(orders_page.items)

    return orders_page
625
626
627
def _to_admin_order_list_items(
    db_orders: list[DbOrder],
) -> list[AdminOrderListItem]:
    """Convert order database entities to admin order list items.

    The orderers (including avatars) are fetched in one go for all
    orders.
    """
    orderers_by_id = user_service.get_users_indexed_by_id(
        {db_order.placed_by_id for db_order in db_orders},
        include_avatars=True,
    )

    return [
        AdminOrderListItem(
            id=db_order.id,
            created_at=db_order.created_at,
            order_number=db_order.order_number,
            placed_by=orderers_by_id[db_order.placed_by_id],
            first_name=db_order.first_name,
            last_name=db_order.last_name,
            total_amount=db_order.total_amount,
            payment_state=db_order.payment_state,
            state=_get_order_state(db_order),
            is_open=_is_open(db_order),
            is_canceled=_is_canceled(db_order),
            is_paid=_is_paid(db_order),
            is_invoiced=_is_invoiced(db_order),
            is_overdue=_is_overdue(db_order),
            is_processing_required=db_order.processing_required,
            is_processed=_is_processed(db_order),
        )
        for db_order in db_orders
    ]
658
659
660
def get_orders_placed_by_user(user_id: UserID) -> list[Order]:
    """Return the orders placed by the user, newest first."""
    stmt = (
        select(DbOrder)
        .options(db.joinedload(DbOrder.line_items))
        .filter_by(placed_by_id=user_id)
        .order_by(DbOrder.created_at.desc())
    )

    db_orders = db.session.scalars(stmt).unique().all()

    return _db_orders_to_transfer_objects_with_orderer_users(db_orders)
676 1
677
678
def get_orders_placed_by_user_for_storefront(
    user_id: UserID, storefront_id: StorefrontID
) -> list[SiteOrderListItem]:
    """Return the orders the user placed through that storefront,
    newest first.
    """
    stmt = (
        select(DbOrder)
        .options(db.joinedload(DbOrder.line_items))
        .filter_by(storefront_id=storefront_id)
        .filter_by(placed_by_id=user_id)
        .order_by(DbOrder.created_at.desc())
    )

    db_orders = db.session.scalars(stmt).unique().all()

    orderers_by_id = user_service.get_users_indexed_by_id(
        {db_order.placed_by_id for db_order in db_orders}
    )

    return [
        SiteOrderListItem(
            id=db_order.id,
            created_at=db_order.created_at,
            order_number=db_order.order_number,
            placed_by=orderers_by_id[db_order.placed_by_id],
            total_amount=db_order.total_amount,
            payment_state=db_order.payment_state,
            state=_get_order_state(db_order),
            is_open=_is_open(db_order),
            is_canceled=_is_canceled(db_order),
            is_paid=_is_paid(db_order),
            is_overdue=_is_overdue(db_order),
        )
        for db_order in db_orders
    ]
717
718 1
719
def has_user_placed_orders(user_id: UserID, shop_id: ShopID) -> bool:
    """Tell whether the user has placed at least one order in that shop."""
    stmt = (
        select(db.func.count(DbOrder.id))
        .filter_by(shop_id=shop_id)
        .filter_by(placed_by_id=user_id)
    )

    # Fall back to zero if the query yields no value.
    order_count = db.session.scalar(stmt) or 0

    return order_count > 0
731
732
733
# Translatable labels for the payment methods known to this module,
# keyed by payment method identifier.
_PAYMENT_METHOD_LABELS = {
    'bank_transfer': lazy_gettext('bank transfer'),
    'cash': lazy_gettext('cash'),
    'direct_debit': lazy_gettext('direct debit'),
    'free': lazy_gettext('free'),
}


def find_payment_method_label(payment_method: str) -> str | None:
    """Return a label for the payment method.

    If no label is registered for the payment method, the given value
    itself is returned.
    """
    return _PAYMENT_METHOD_LABELS.get(payment_method) or payment_method
745
746
747
def get_payment_date(order_id: OrderID) -> Result[datetime, OrderNotPaidError]:
    """Return the date the order has been marked as paid.

    Return an error if the order is unknown or is not in payment state
    `paid`. The stored timestamp is the time of the *latest* payment
    state change (it is also written on cancellation), so it only
    denotes the payment date while the order is in state `paid`.
    """
    row = db.session.execute(
        select(
            DbOrder._payment_state, DbOrder.payment_state_updated_at
        ).filter(DbOrder.id == order_id)
    ).one_or_none()

    if row is None:
        return Err(OrderNotPaidError())

    # The state column holds the payment state's name.
    payment_state_name, payment_state_updated_at = row

    if payment_state_name != PaymentState.paid.name:
        return Err(OrderNotPaidError())

    if not payment_state_updated_at:
        return Err(OrderNotPaidError())

    return Ok(payment_state_updated_at)
757
758
759 1
def _db_order_to_transfer_object(db_order: DbOrder, placed_by: User) -> Order:
    """Build an `Order` transfer object from the database entity.

    The orderer is passed in by the caller; address, line items, and
    the state flags are derived from the entity.
    """
    address = _get_address(db_order)
    line_items = _get_line_items(db_order)
    order_state = _get_order_state(db_order)

    return Order(
        id=db_order.id,
        created_at=db_order.created_at,
        shop_id=db_order.shop_id,
        storefront_id=db_order.storefront_id,
        order_number=db_order.order_number,
        placed_by=placed_by,
        company=db_order.company,
        first_name=db_order.first_name,
        last_name=db_order.last_name,
        address=address,
        total_amount=db_order.total_amount,
        line_items=line_items,
        payment_method=db_order.payment_method,
        payment_state=db_order.payment_state,
        state=order_state,
        is_open=_is_open(db_order),
        is_canceled=_is_canceled(db_order),
        is_paid=_is_paid(db_order),
        is_invoiced=_is_invoiced(db_order),
        is_overdue=_is_overdue(db_order),
        is_processing_required=db_order.processing_required,
        is_processed=_is_processed(db_order),
        cancellation_reason=db_order.cancellation_reason,
    )
786
787
788
def _db_orders_to_transfer_objects_with_orderer_users(
    db_orders: Sequence[DbOrder], *, include_avatars=False
) -> list[Order]:
    """Convert order database entities to transfer objects.

    The orderers are fetched in one go for all orders.
    """
    # NOTE(review): `include_avatars` is not used; avatars are always
    # requested. Honoring the parameter would change what existing
    # callers relying on the default receive, so confirm the intended
    # behavior before changing this.
    orderers_by_id = user_service.get_users_indexed_by_id(
        {db_order.placed_by_id for db_order in db_orders},
        include_avatars=True,
    )

    orders = []
    for db_order in db_orders:
        placed_by = orderers_by_id[db_order.placed_by_id]
        orders.append(_db_order_to_transfer_object(db_order, placed_by))

    return orders
802
803 1
804
def _get_address(db_order: DbOrder) -> Address:
    """Build an `Address` from the order's address columns."""
    address = Address(
        country=db_order.country,
        zip_code=db_order.zip_code,
        city=db_order.city,
        street=db_order.street,
    )
    return address
811
812
813
def _get_line_items(db_order: DbOrder) -> list[LineItem]:
    """Return the order's line items as transfer objects, sorted by
    article ID.
    """
    currency = db_order.currency
    order_is_canceled = _is_canceled(db_order)

    return sorted(
        (
            _line_item_to_transfer_object(
                db_line_item, currency, order_is_canceled
            )
            for db_line_item in db_order.line_items
        ),
        key=lambda line_item: line_item.article_id,
    )
826
827 1
828
def _is_overdue(db_order: DbOrder) -> bool:
    """Tell whether payment of the order is overdue.

    The decision is delegated to the domain service, based on the
    order's creation time and payment state.
    """
    created_at = db_order.created_at
    payment_state = db_order.payment_state
    return order_domain_service.is_overdue(created_at, payment_state)
833 1
834
835
def _line_item_to_transfer_object(
    db_line_item: DbLineItem, currency: Currency, is_order_canceled: bool
) -> LineItem:
    """Build a `LineItem` transfer object from the database entity.

    Unit price and line amount are wrapped as `Money` in the given
    currency; a missing processing result becomes an empty dictionary.
    """
    unit_price = Money(db_line_item.unit_price, currency)
    line_amount = Money(db_line_item.line_amount, currency)
    processing_state = _get_line_item_processing_state(
        db_line_item, is_order_canceled
    )

    return LineItem(
        id=db_line_item.id,
        order_number=db_line_item.order_number,
        article_id=db_line_item.article_id,
        article_number=db_line_item.article_number,
        article_type=db_line_item.article_type,
        name=db_line_item.name,
        unit_price=unit_price,
        tax_rate=db_line_item.tax_rate,
        quantity=db_line_item.quantity,
        line_amount=line_amount,
        processing_required=db_line_item.processing_required,
        processing_result=db_line_item.processing_result or {},
        processed_at=db_line_item.processed_at,
        processing_state=processing_state,
    )
857
858 1
859 1
def _get_line_item_processing_state(
    db_line_item: DbLineItem, is_order_canceled: bool
) -> LineItemProcessingState:
    """Derive the processing state of a line item.

    Checked in this order: no processing required, order canceled,
    already processed; anything else is pending.
    """
    if not db_line_item.processing_required:
        return LineItemProcessingState.not_applicable

    if is_order_canceled:
        return LineItemProcessingState.canceled

    if db_line_item.processed_at is None:
        return LineItemProcessingState.pending

    return LineItemProcessingState.complete
872
873 1
874 1
def _get_order_state(db_order: DbOrder) -> OrderState:
    """Derive the overall state of the order.

    A canceled order is `canceled`. A paid order that needs no
    processing, or has been processed, is `complete`. Everything else
    is `open`.
    """
    if _is_canceled(db_order):
        return OrderState.canceled

    processing_done = (
        not db_order.processing_required or _is_processed(db_order)
    )
    if _is_paid(db_order) and processing_done:
        return OrderState.complete

    return OrderState.open
888
889
890
def _is_open(db_order: DbOrder) -> bool:
    """Tell whether the order's payment state is `open`."""
    payment_state = db_order.payment_state
    return payment_state == PaymentState.open
892
893
894
def _is_canceled(db_order: DbOrder) -> bool:
    """Tell whether the order has been canceled, be it before or after
    payment.
    """
    canceled_states = {
        PaymentState.canceled_before_paid,
        PaymentState.canceled_after_paid,
    }
    return db_order.payment_state in canceled_states
899
900
901
def _is_paid(db_order: DbOrder) -> bool:
    """Tell whether the order's payment state is `paid`."""
    payment_state = db_order.payment_state
    return payment_state == PaymentState.paid
903
904
905
def _is_invoiced(db_order: DbOrder) -> bool:
906
    return db_order.invoice_created_at is not None
907
908
909
def _is_processed(db_order: DbOrder) -> bool:
910
    return db_order.processed_at is not None
911