Completed
Branch master (a553dc)
by
unknown
26:33
created

KafkaHandler   B

Complexity

Total Complexity 36

Size/Duplication

Total Lines 231
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 9

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 231
rs 8.8
wmc 36
lcom 1
cbo 9

8 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 7 1
B factory() 0 30 6
A write() 0 6 2
C handleBatch() 0 25 7
D send() 0 41 9
B getRandomPartition() 0 30 6
A addMessages() 0 11 3
A warning() 0 6 2
1
<?php
2
/**
3
 * This program is free software; you can redistribute it and/or modify
4
 * it under the terms of the GNU General Public License as published by
5
 * the Free Software Foundation; either version 2 of the License, or
6
 * (at your option) any later version.
7
 *
8
 * This program is distributed in the hope that it will be useful,
9
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
 * GNU General Public License for more details.
12
 *
13
 * You should have received a copy of the GNU General Public License along
14
 * with this program; if not, write to the Free Software Foundation, Inc.,
15
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16
 * http://www.gnu.org/copyleft/gpl.html
17
 *
18
 * @file
19
 */
20
21
namespace MediaWiki\Logger\Monolog;
22
23
use Kafka\MetaDataFromKafka;
24
use Kafka\Produce;
25
use Kafka\Protocol\Decoder;
26
use MediaWiki\Logger\LoggerFactory;
27
use Monolog\Handler\AbstractProcessingHandler;
28
use Monolog\Logger;
29
use Psr\Log\LoggerInterface;
30
31
/**
 * Log handler sends log events to a kafka server.
 *
 * Constructor options array arguments:
 * * alias: map from monolog channel to kafka topic name. When no
 *    alias exists the topic "monolog_$channel" will be used.
 * * swallowExceptions: Swallow exceptions that occur while talking to
 *    kafka. Defaults to false.
 * * logExceptions: Log exceptions talking to kafka here. Either null,
 *    the name of a channel to log to, or an object implementing
 *    Psr\Log\LoggerInterface. Defaults to null. A channel name is only
 *    turned into a logger by factory(); the constructor stores the
 *    value untouched, and anything that is not a LoggerInterface is
 *    ignored when warnings are emitted.
 * * requireAck: Handed to Produce::setRequireAck() by factory() when
 *    present in the options passed to it.
 *
 * Additional options understood by factory() only:
 * * sendTimeout: Timeout, in seconds, applied to the kafka client's
 *    send stream options. Fractional values are allowed.
 * * recvTimeout: Timeout, in seconds, applied to the kafka client's
 *    receive stream options. Fractional values are allowed.
 *
 * Requires the nmred/kafka-php library, version >= 1.3.0
 *
 * @since 1.26
 * @author Erik Bernhardson <[email protected]>
 * @copyright © 2015 Erik Bernhardson and Wikimedia Foundation.
 */
class KafkaHandler extends AbstractProcessingHandler {
	/**
	 * @var Produce Sends requests to kafka
	 */
	protected $produce;

	/**
	 * @var array Handler configuration: the defaults overlaid with the
	 *  options given to the constructor
	 */
	protected $options;

	/**
	 * @var array Map from topic name to the partition this request produces
	 *  to. A null value records that no partition was available for the topic.
	 */
	protected $partitions = [];

	/**
	 * @var array defaults for constructor options
	 */
	private static $defaultOptions = [
		'alias' => [], // map from monolog channel to kafka topic
		'swallowExceptions' => false, // swallow exceptions sending records
		'logExceptions' => null, // A PSR3 logger to inform about errors
		'requireAck' => 0,
	];

	/**
	 * @param Produce $produce Kafka instance to produce through
	 * @param array $options optional handler configuration; merged over the
	 *  class defaults, so unknown keys are kept as given
	 * @param int $level The minimum logging level at which this handler will be triggered
	 * @param bool $bubble Whether the messages that are handled can bubble up the stack or not
	 */
	public function __construct(
		Produce $produce, array $options, $level = Logger::DEBUG, $bubble = true
	) {
		parent::__construct( $level, $bubble );
		$this->produce = $produce;
		$this->options = array_merge( self::$defaultOptions, $options );
	}

	/**
	 * Constructs the necessary support objects and returns a KafkaHandler
	 * instance.
	 *
	 * @param string[] $kafkaServers
	 * @param array $options Handler options; see the class documentation.
	 *  'sendTimeout', 'recvTimeout' and 'requireAck' are applied to the
	 *  kafka producer here, and a string 'logExceptions' is replaced with
	 *  the logger for that channel.
	 * @param int $level The minimum logging level at which this handler will be triggered
	 * @param bool $bubble Whether the messages that are handled can bubble up the stack or not
	 * @return KafkaHandler
	 */
	public static function factory(
		$kafkaServers, array $options = [], $level = Logger::DEBUG, $bubble = true
	) {
		$metadata = new MetaDataFromKafka( $kafkaServers );
		$produce = new Produce( $metadata );

		// Option name => prefix of the matching kafka client stream options.
		// The whole timeout is expressed in microseconds, so the seconds
		// component of the stream option is explicitly zeroed.
		$timeouts = [ 'sendTimeout' => 'Send', 'recvTimeout' => 'Recv' ];
		foreach ( $timeouts as $option => $prefix ) {
			if ( isset( $options[$option] ) ) {
				$produce->getClient()->setStreamOption( $prefix . 'TimeoutSec', 0 );
				$produce->getClient()->setStreamOption(
					$prefix . 'TimeoutUSec',
					intval( $options[$option] * 1000000 )
				);
			}
		}

		// Allow the error logger to be configured by channel name.
		if ( isset( $options['logExceptions'] ) && is_string( $options['logExceptions'] ) ) {
			$options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
		}

		if ( isset( $options['requireAck'] ) ) {
			$produce->setRequireAck( $options['requireAck'] );
		}

		return new self( $produce, $options, $level, $bubble );
	}

	/**
	 * {@inheritDoc}
	 *
	 * Queues the formatted record for its channel and sends it immediately.
	 * Records whose formatted value is null are dropped.
	 */
	protected function write( array $record ) {
		if ( $record['formatted'] !== null ) {
			$this->addMessages( $record['channel'], [ $record['formatted'] ] );
			$this->send();
		}
	}

	/**
	 * {@inheritDoc}
	 *
	 * Records below the handler level are skipped. The remaining records
	 * are processed, grouped by channel, formatted and queued per channel,
	 * then everything queued is sent with a single send() call.
	 */
	public function handleBatch( array $batch ) {
		$channels = [];
		foreach ( $batch as $record ) {
			if ( $record['level'] < $this->level ) {
				continue;
			}
			$channels[$record['channel']][] = $this->processRecord( $record );
		}

		$formatter = $this->getFormatter();
		foreach ( $channels as $channel => $records ) {
			$messages = [];
			foreach ( $records as $record ) {
				$message = $formatter->format( $record );
				// A formatter may return null to drop a record.
				if ( $message !== null ) {
					$messages[] = $message;
				}
			}
			if ( $messages ) {
				$this->addMessages( $channel, $messages );
			}
		}

		$this->send();
	}

	/**
	 * Send any records in the kafka client internal queue.
	 *
	 * Failures are reported through warning(). They are only thrown when
	 * the 'swallowExceptions' option is falsy.
	 *
	 * @throws \Kafka\Exception When the kafka client fails to send
	 * @throws \RuntimeException When the response reports a non-zero error
	 *  code for at least one partition
	 */
	protected function send() {
		try {
			$response = $this->produce->send();
		} catch ( \Kafka\Exception $e ) {
			$ignore = $this->warning(
				'Error sending records to kafka: {exception}',
				[ 'exception' => $e ] );
			if ( !$ignore ) {
				throw $e;
			}
			return;
		}

		// A boolean response has no per-partition results to inspect.
		if ( is_bool( $response ) ) {
			return;
		}

		$errors = [];
		foreach ( $response as $topicName => $partitionResponse ) {
			foreach ( $partitionResponse as $info ) {
				if ( $info['errCode'] === 0 ) {
					// no error
					continue;
				}
				$errors[] = sprintf(
					'Error producing to %s (errno %d): %s',
					$topicName,
					$info['errCode'],
					Decoder::getError( $info['errCode'] )
				);
			}
		}

		if ( $errors ) {
			$error = implode( "\n", $errors );
			if ( !$this->warning( $error ) ) {
				throw new \RuntimeException( $error );
			}
		}
	}

	/**
	 * Picks a partition for the topic and remembers it, so every message
	 * this handler instance produces to the topic uses the same partition.
	 *
	 * A metadata lookup that fails with an exception is not remembered and
	 * is attempted again on the next call. A lookup that succeeds but
	 * yields no partitions is remembered as null.
	 *
	 * @param string $topic Name of topic to get partition for
	 * @return int|null The random partition to produce to for this request,
	 *  or null if a partition could not be determined.
	 * @throws \Kafka\Exception When fetching the partitions fails and
	 *  exceptions are not swallowed
	 * @throws \RuntimeException When the topic has no available partitions
	 *  and exceptions are not swallowed
	 */
	protected function getRandomPartition( $topic ) {
		if ( !array_key_exists( $topic, $this->partitions ) ) {
			try {
				$partitions = $this->produce->getAvailablePartitions( $topic );
			} catch ( \Kafka\Exception $e ) {
				$ignore = $this->warning(
					'Error getting metadata for kafka topic {topic}: {exception}',
					[ 'topic' => $topic, 'exception' => $e ] );
				if ( $ignore ) {
					return null;
				}
				throw $e;
			}
			if ( $partitions ) {
				$key = array_rand( $partitions );
				$this->partitions[$topic] = $partitions[$key];
			} else {
				// Attach the client's view of the topic to the log context.
				$details = $this->produce->getClient()->getTopicDetail( $topic );
				$ignore = $this->warning(
					'No partitions available for kafka topic {topic}',
					[ 'topic' => $topic, 'kafka' => $details ]
				);
				if ( !$ignore ) {
					throw new \RuntimeException( "No partitions available for kafka topic $topic" );
				}
				$this->partitions[$topic] = null;
			}
		}
		return $this->partitions[$topic];
	}

	/**
	 * Adds records for a channel to the Kafka client internal queue.
	 *
	 * The topic is the configured alias for the channel, or
	 * "monolog_$channel" when no alias exists. When no partition can be
	 * determined for the topic the records are silently discarded.
	 *
	 * @param string $channel Name of Monolog channel records belong to
	 * @param array $records List of records to append
	 */
	protected function addMessages( $channel, array $records ) {
		if ( isset( $this->options['alias'][$channel] ) ) {
			$topic = $this->options['alias'][$channel];
		} else {
			$topic = "monolog_$channel";
		}
		$partition = $this->getRandomPartition( $topic );
		if ( $partition !== null ) {
			$this->produce->setMessages( $topic, $partition, $records );
		}
	}

	/**
	 * Reports a problem to the 'logExceptions' logger, if one is configured,
	 * and tells the caller whether the problem should be swallowed.
	 *
	 * @param string $message PSR3 compatible message string
	 * @param array $context PSR3 compatible log context
	 * @return bool true if caller should ignore warning
	 */
	protected function warning( $message, array $context = [] ) {
		if ( $this->options['logExceptions'] instanceof LoggerInterface ) {
			$this->options['logExceptions']->warning( $message, $context );
		}
		return $this->options['swallowExceptions'];
	}
}
280