Passed
Push — main ( 4dce73...0d6f99 )
by Jochen
04:31
created

get_hour_ranges()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.4218

Importance

Changes 0
Metric Value
cc 1
eloc 4
dl 0
loc 5
rs 10
c 0
b 0
f 0
nop 1
ccs 1
cts 4
cp 0.25
crap 1.4218
1
"""
2
byceps.services.orga_presence.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2021 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10 1
from datetime import date, datetime
11 1
from itertools import groupby
12 1
from typing import Iterator, Sequence
13 1
from uuid import UUID
14
15 1
import pendulum
16 1
from pendulum import DateTime
17 1
from sqlalchemy import delete
18
19 1
from ...database import db
20 1
from ...typing import PartyID, UserID
21 1
from ...util.datetime.range import create_adjacent_ranges, DateTimeRange
22 1
from ...util.datetime.timezone import get_timezone_string
23
24 1
from ..party import service as party_service
25
26 1
from .dbmodels import (
27
    Presence as DbPresence,
28
    Task as DbTask,
29
    TimeSlot as DbTimeSlot,
30
)
31 1
from .transfer.models import PresenceTimeSlot, TaskTimeSlot, TimeSlot
32
33
34 1
def create_presence(
    party_id: PartyID, orga_id: UserID, starts_at: datetime, ends_at: datetime
) -> PresenceTimeSlot:
    """Store a new presence of the orga at the party.

    The party is fetched via the party service first, then the
    presence is added to the database session and committed. The
    stored presence is returned as a presence time slot.
    """
    # The ID of the fetched party is what ends up on the presence.
    party = party_service.get_party(party_id)

    db_presence = DbPresence(
        party_id=party.id,
        starts_at=starts_at,
        ends_at=ends_at,
        orga_id=orga_id,
    )

    db.session.add(db_presence)
    db.session.commit()

    return _presence_to_time_slot(db_presence)
47
48
49 1
def delete_time_slot(time_slot_id: UUID) -> None:
    """Delete the time slot with that ID and commit the deletion.

    No error is raised by this function itself if no time slot with
    that ID exists; the DELETE statement then simply matches no row.
    """
    db.session.execute(
        delete(DbTimeSlot)
        .where(DbTimeSlot.id == time_slot_id)
        # 'fetch' asks SQLAlchemy to determine the affected rows so that
        # matching objects already present in the session are
        # synchronized with the deletion.
        .execution_options(synchronize_session='fetch')
    )
    # Commit explicitly, as `create_presence` does after its write.
    # Without it, the deletion only lives in the current transaction
    # and is lost unless some caller happens to commit the session.
    db.session.commit()
56
57
58 1
def get_presences(party_id: PartyID) -> list[PresenceTimeSlot]:
    """Return the presences of that party as presence time slots."""
    query = (
        db.session
        .query(DbPresence)
        .filter_by(party_id=party_id)
        # Load each presence's orga together with the presence, as the
        # orga is read when converting to a time slot.
        .options(db.joinedload(DbPresence.orga))
    )
    db_presences = query.all()

    return list(map(_presence_to_time_slot, db_presences))
67
68
69 1
def get_tasks(party_id: PartyID) -> list[TaskTimeSlot]:
    """Return the tasks of that party as task time slots."""
    query = (
        db.session
        .query(DbTask)
        .filter_by(party_id=party_id)
    )
    db_tasks = query.all()

    return list(map(_task_to_time_slot, db_tasks))
77
78
79 1
def _presence_to_time_slot(presence: DbPresence) -> PresenceTimeSlot:
    """Build a presence time slot from a presence database entity.

    Passes the entity's ID, orga, start, and end (in that order) on to
    `PresenceTimeSlot.from_`.
    """
    fields = (
        presence.id,
        presence.orga,
        presence.starts_at,
        presence.ends_at,
    )
    return PresenceTimeSlot.from_(*fields)
86
87
88 1
def _task_to_time_slot(task: DbTask) -> TaskTimeSlot:
    """Build a task time slot from a task database entity.

    Passes the entity's ID, title, start, and end (in that order) on to
    `TaskTimeSlot.from_`.
    """
    fields = (
        task.id,
        task.title,
        task.starts_at,
        task.ends_at,
    )
    return TaskTimeSlot.from_(*fields)
95
96
97
# -------------------------------------------------------------------- #
98
99
100 1
def get_hour_ranges(time_slots: list[TimeSlot]) -> Iterator[DateTimeRange]:
    """Yield hour ranges based on the time slots.

    The hour starts between the earliest start and the latest end of
    the time slots' ranges are determined and turned into adjacent
    ranges.

    If no time slots are given, no ranges are yielded.
    """
    # Without this guard, an empty list would reach `min()`/`max()` in
    # the helpers and fail with an unhelpful `ValueError`.
    if not time_slots:
        return iter(())

    time_slot_ranges = [time_slot.range for time_slot in time_slots]
    hour_starts = _get_hour_starts(time_slot_ranges)
    return create_adjacent_ranges(hour_starts)
105
106
107 1
def _get_hour_starts(dt_ranges: Sequence[DateTimeRange]) -> Iterator[datetime]:
    """Return the hour starts covering the given date/time ranges.

    The hours run from the full hour at or before the earliest range
    start up to the latest range end, calculated in the local time
    zone and returned without time zone info.
    """
    earliest_start = _find_earliest_start(dt_ranges)
    latest_end = _find_latest_end(dt_ranges)

    # Truncate the first moment to the full hour so that every
    # generated value is the start of an hour.
    first_hour = _to_local_pendulum_datetime(earliest_start).set(
        minute=0, second=0, microsecond=0
    )
    last_moment = _to_local_pendulum_datetime(latest_end)

    hourly = pendulum.period(first_hour, last_moment).range('hours')

    return _to_datetimes_without_tzinfo(hourly)
117
118
119 1
def _find_earliest_start(dt_ranges: Sequence[DateTimeRange]) -> datetime:
    """Return the earliest start of the given ranges.

    Raises `ValueError` if no ranges are given.
    """
    starts = [dt_range.start for dt_range in dt_ranges]
    return min(starts)
121
122
123 1
def _find_latest_end(dt_ranges: Sequence[DateTimeRange]) -> datetime:
    """Return the latest end of the given ranges.

    Raises `ValueError` if no ranges are given.
    """
    ends = [dt_range.end for dt_range in dt_ranges]
    return max(ends)
125
126
127 1
def _to_local_pendulum_datetime(dt: datetime) -> DateTime:
    """Convert the date/time to a Pendulum one in the configured time zone.

    The time zone is whatever `get_timezone_string()` returns.
    """
    tz_str = get_timezone_string()
    pendulum_dt = pendulum.instance(dt)
    return pendulum_dt.in_tz(tz_str)
129
130
131 1
def _to_datetimes_without_tzinfo(dts: Sequence[DateTime]) -> Iterator[datetime]:
    """Yield each date/time with its time zone info removed.

    Only `tzinfo` is replaced; the date and time fields are kept as
    they are.
    """
    yield from (moment.replace(tzinfo=None) for moment in dts)
134
135
136 1
def get_days_and_hour_totals(
    hour_ranges: Sequence[DateTimeRange],
) -> Iterator[tuple[date, int]]:
    """Yield a (day, total of hour ranges starting on that day) pair per day.

    Only consecutive ranges starting on the same day are grouped, so a
    day shows up more than once if its ranges are not adjacent in
    `hour_ranges`.
    """
    grouped_by_start_day = groupby(
        hour_ranges, key=lambda dt_range: dt_range.start.date()
    )

    for day, ranges_of_day in grouped_by_start_day:
        # The group is a one-shot iterator; count its items.
        yield day, sum(1 for _ in ranges_of_day)
147