Passed
Push — master ( f25e4d...a2be21 )
by Benjamin
03:57 queued 01:51
created

UdpTransport::sendMessageInChunks()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 31
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 16
CRAP Score 3

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
eloc 15
c 1
b 0
f 0
nc 3
nop 1
dl 0
loc 31
ccs 16
cts 16
cp 1
crap 3
rs 9.7666
1
<?php
2
3
/*
4
 * This file is part of the php-gelf package.
5
 *
6
 * (c) Benjamin Zikarsky <http://benjamin-zikarsky.de>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace Gelf\Transport;
13
14
use Gelf\MessageInterface as Message;
15
use Gelf\Encoder\CompressedJsonEncoder as DefaultEncoder;
16
use InvalidArgumentException;
17
use RuntimeException;
18
19
/**
20
 * UdpTransport allows the transfer of GELF-messages to an compatible
21
 * GELF-UDP-backend as described in
22
 * https://github.com/Graylog2/graylog2-docs/wiki/GELF
23
 *
24
 * It can also act as a direct publisher
25
 *
26
 * @author Benjamin Zikarsky <[email protected]>
27
 */
28
class UdpTransport extends AbstractTransport
29
{
30
    const CHUNK_GELF_ID = "\x1e\x0f";
31
    const CHUNK_MAX_COUNT = 128; // as per GELF spec
32
    const CHUNK_SIZE_LAN = 8154;
33
    const CHUNK_SIZE_WAN = 1420;
34
    const CHUNK_HEADER_LENGTH = 12; // GELF ID (2b), id (8b) , sequence (2b)
35
36
    const DEFAULT_HOST = "127.0.0.1";
37
    const DEFAULT_PORT = 12201;
38
39
    /**
40
     * @var int
41
     */
42
    protected $chunkSize;
43
44
    /**
45
     * @var StreamSocketClient
46
     */
47
    protected $socketClient;
48
49
    /**
50
     * Class constructor
51
     *
52
     * @param string $host when NULL or empty DEFAULT_HOST is used
53
     * @param int $port when NULL or empty DEFAULT_PORT is used
54
     * @param int $chunkSize defaults to CHUNK_SIZE_WAN,
55
     *                          0 disables chunks completely
56
     */
57 5
    public function __construct(
58
        $host = self::DEFAULT_HOST,
59
        $port = self::DEFAULT_PORT,
60
        $chunkSize = self::CHUNK_SIZE_WAN
61
    ) {
62
        // allow NULL-like values for fallback on default
63 5
        $host = $host ?: self::DEFAULT_HOST;
64 5
        $port = $port ?: self::DEFAULT_PORT;
65
66 5
        $this->socketClient = new StreamSocketClient('udp', $host, $port);
67 5
        $this->chunkSize = $chunkSize;
68
69 5
        $this->messageEncoder = new DefaultEncoder();
70
71 5
        if ($chunkSize > 0 && $chunkSize <= self::CHUNK_HEADER_LENGTH) {
72
            throw new InvalidArgumentException('Chunk-size has to exceed ' . self::CHUNK_HEADER_LENGTH
73
                . ' which is the number of bytes reserved for the chunk-header');
74
        }
75 5
    }
76
77
    /**
78
     * Sends a Message over this transport
79
     *
80
     * @param Message $message
81
     *
82
     * @return int the number of UDP packets sent
83
     */
84 3
    public function send(Message $message)
85
    {
86 3
        $rawMessage = $this->getMessageEncoder()->encode($message);
87
88
        // test if we need to split the message to multiple chunks
89
        // chunkSize == 0 allows for an unlimited packet-size, and therefore
90
        // disables chunking
91 3
        if ($this->chunkSize && strlen($rawMessage) > $this->chunkSize) {
92 2
            return $this->sendMessageInChunks($rawMessage);
93
        }
94
95
        // send message in one packet
96 1
        $this->socketClient->write($rawMessage);
97
98 1
        return 1;
99
    }
100
101
    /**
102
     * Sends given string in multiple chunks
103
     *
104
     * @param string $rawMessage
105
     * @return int
106
     *
107
     * @throws RuntimeException on too large messages which would exceed the
108
     * maximum number of possible chunks
109
     */
110 2
    protected function sendMessageInChunks($rawMessage)
111
    {
112
        // split to chunks
113 2
        $chunks = str_split($rawMessage, $this->chunkSize - self::CHUNK_HEADER_LENGTH);
114 2
        $numChunks = count($chunks);
115
116 2
        if ($numChunks > self::CHUNK_MAX_COUNT) {
117 1
            throw new RuntimeException(
118 1
                sprintf(
119 1
                    "Message is too big. Chunk count exceeds %d",
120 1
                    self::CHUNK_MAX_COUNT
121
                )
122
            );
123
        }
124
125
        // generate a random 8byte-message-id
126 1
        $messageId = substr(md5(uniqid("", true), true), 0, 8);
127
128
        // send chunks with a correct chunk-header
129
        // @link http://graylog2.org/gelf#specs
130 1
        foreach ($chunks as $idx => $chunk) {
131 1
            $data = self::CHUNK_GELF_ID            // GELF chunk magic bytes
132 1
                . $messageId                       // unique message id
133 1
                . pack('CC', $idx, $numChunks)     // sequence information
134 1
                . $chunk                           // chunk-data
135
            ;
136
137 1
            $this->socketClient->write($data);
138
        }
139
140 1
        return $numChunks;
141
    }
142
}
143