Passed
Push — main ( bfab89...552a10 )
by Jochen
01:55
created

weitersager.irc._sort_channels_by_name()   A

Complexity

Conditions 2

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 2.5

Importance

Changes 0
Metric Value
cc 2
eloc 2
nop 1
dl 0
loc 2
ccs 1
cts 2
cp 0.5
crap 2.5
rs 10
c 0
b 0
f 0
"""
weitersager.irc
~~~~~~~~~~~~~~~

Internet Relay Chat

:Copyright: 2007-2021 Jochen Kupperschmidt
:License: MIT, see LICENSE for details.
"""
10
11 1
import ssl
12 1
from typing import Any, List, Optional, Set, Union
13
14 1
from irc.bot import ServerSpec, SingleServerIRCBot
15 1
from irc.connection import Factory
16 1
from jaraco.stream.buffer import LenientDecodingLineBuffer
17
18 1
from .config import IrcChannel, IrcConfig, IrcServer
19 1
from .signals import channel_joined
20 1
from .util import log, start_thread
21
22
23 1
class Bot(SingleServerIRCBot):
    """An IRC bot to forward messages to IRC channels."""

    def __init__(
        self,
        server: IrcServer,
        nickname: str,
        realname: str,
        channels: Set[IrcChannel],
    ) -> None:
        """Configure the connection to `server`.

        The channels are only remembered here; they are joined in
        `on_welcome`.
        """
        log('Connecting to IRC server {0.host}:{0.port:d} ...', server)

        server_spec = ServerSpec(server.host, server.port, server.password)

        if server.ssl:
            # `ssl.wrap_socket` is deprecated since Python 3.7 and has
            # been removed in Python 3.12. Wrap the socket through an
            # `SSLContext` instead. The default context verifies the
            # server certificate and, given `server_hostname`, checks
            # the hostname and sends SNI.
            ssl_context = ssl.create_default_context()
            factory = Factory(
                wrapper=lambda sock: ssl_context.wrap_socket(
                    sock, server_hostname=server.host
                )
            )
        else:
            factory = Factory()

        SingleServerIRCBot.__init__(
            self, [server_spec], nickname, realname, connect_factory=factory
        )

        if server.rate_limit is not None:
            log(
                'IRC send rate limit set to {:.2f} messages per second.',
                server.rate_limit,
            )
            self.connection.set_rate_limit(server.rate_limit)
        else:
            log('No IRC send rate limit set.')

        # Avoid `UnicodeDecodeError` on non-UTF-8 messages.
        self.connection.buffer_class = LenientDecodingLineBuffer

        # Note: `self.channels` already exists in super class.
        self.channels_to_join = channels

    def start(self) -> None:
        """Connect to the server, in a separate thread."""
        # The thread is named after the class.
        start_thread(super().start, self.__class__.__name__)

    def get_version(self) -> str:
        """Return this on CTCP VERSION requests."""
        return 'Weitersager'

    def on_welcome(self, conn, event) -> None:
        """Join channels after connect."""
        log('Connected to {}:{:d}.', *conn.socket.getpeername())

        # Join in name order so that the log output is deterministic.
        channels = _sort_channels_by_name(self.channels_to_join)
        log('Channels to join: {}', ', '.join(c.name for c in channels))

        for channel in channels:
            log('Joining channel {} ...', channel.name)
            # A channel without a password is joined with an empty key.
            conn.join(channel.name, channel.password or '')

    def on_nicknameinuse(self, conn, event) -> None:
        """Choose another nickname if conflicting."""
        # Append an underscore and request the resulting nickname.
        self._nickname += '_'
        conn.nick(self._nickname)

    def on_join(self, conn, event) -> None:
        """Successfully joined channel."""
        joined_nick = event.source.nick
        channel_name = event.target

        # Only announce joins of the bot itself, not of other users.
        if joined_nick == self._nickname:
            log('Joined IRC channel: {}', channel_name)
            channel_joined.send(channel_name=channel_name)

    def on_badchannelkey(self, conn, event) -> None:
        """Channel could not be joined due to wrong password."""
        channel_name = event.arguments[0]
        log('Cannot join channel {} (bad key).', channel_name)

    def say(
        self,
        sender: Optional[Any],
        *,
        channel_name: Optional[str] = None,
        text: Optional[str] = None,
    ) -> None:
        """Say message on channel.

        `sender` is accepted but not used.
        """
        self.connection.privmsg(channel_name, text)
103
104
105 1
class DummyBot:
    """A fake bot that logs messages instead of sending them to IRC."""

    def __init__(self, channels: Set[IrcChannel]) -> None:
        """Remember the channels to pretend to join."""
        self.channels = channels

    def start(self) -> None:
        """Announce every channel as joined, in order of channel name."""
        ordered_channels = _sort_channels_by_name(self.channels)

        # No connection is made; only the "joined" signal is emitted.
        for irc_channel in ordered_channels:
            channel_joined.send(channel_name=irc_channel.name)

    def say(
        self,
        sender: Optional[Any],
        *,
        channel_name: Optional[str] = None,
        text: Optional[str] = None,
    ) -> None:
        """Log the message, prefixed with the channel name.

        `sender` is accepted but not used.
        """
        log('{}> {}', channel_name, text)

    def disconnect(self, msg: str) -> None:
        """Log the shutdown; `msg` is accepted but not used."""
        # Mirrors the signature of
        # `irc.bot.SingleServerIRCBot.disconnect`.
        log('Shutting down bot ...')
128
129
130 1
def _sort_channels_by_name(channels: Set[IrcChannel]) -> List[IrcChannel]:
    """Return the channels as a new list, ordered by channel name."""
    # `sorted` already returns a new list; no extra `list()` copy needed.
    return sorted(channels, key=lambda channel: channel.name)
132
133
134 1
def create_bot(config: IrcConfig) -> Union[Bot, DummyBot]:
    """Create and return an IRC bot according to the configuration.

    A real `Bot` is returned if a server is configured, a `DummyBot`
    otherwise.
    """
    server = config.server

    if server is not None:
        return Bot(server, config.nickname, config.realname, config.channels)

    # Without a server there is nothing to connect to.
    log('No IRC server specified; will write to STDOUT instead.')
    return DummyBot(config.channels)
141