Completed
Push — master ( d73237...40effa )
by Olivier
05:28 queued 11s
created

PrintLogger   A

Complexity

Total Complexity 1

Size/Duplication

Total Lines 7
Duplicated Lines 0 %

Coupling/Cohesion

Components 0
Dependencies 1

Importance

Changes 0
Metric Value
wmc 1
lcom 0
cbo 1
dl 0
loc 7
rs 10
c 0
b 0
f 0

1 Method

Rating   Name   Duplication   Size   Complexity  
A log() 0 4 1
1
<?php
2
3
/**
4
 * In this example you will see how XDeathMaxCountProcessor works.
5
 *
6
 * Prerequisites
7
 * First let's create the delaying exchange+queue
8
 *
9
 * Create an exchange `waiting_5` (type: `topic`)
10
 * Create a queue `waiting_5` (x-dead-letter-exchange: ``, x-message-ttl: `5000`)
11
 * Bind the exchange `waiting_5` to the queue `waiting_5` with routing_key `#`
12
 *
13
 * And then create the simple global queue.
14
 *
15
 * Create a queue `global` with (x-dead-letter-exchange: `waiting_5`, x-dead-letter-routing-key: `global`)
16
 *
17
 * Run this page and manually send a message
18
 */
19
require_once __DIR__.'/../vendor/autoload.php';
20
21
use Swarrot\Consumer;
22
use Swarrot\Broker\Message;
23
use Swarrot\Processor\ProcessorInterface;
24
use Swarrot\Broker\MessageProvider\PeclPackageMessageProvider;
25
26
/**
 * Processor that announces the message it received and then always fails.
 */
class FailProcessor implements ProcessorInterface
{
    /**
     * Prints the id of the given message, then throws.
     *
     * @param Message $message Message whose id is printed.
     * @param array   $options Accepted but not used.
     *
     * @throws \Exception Always, after the line has been printed.
     */
    public function process(Message $message, array $options)
    {
        $messageId = $message->getId();
        echo sprintf("Fail processor consume message #%d\n", $messageId);

        throw new \Exception('This is my process exception.');
    }
}
35
36
/**
 * Logger built on \Psr\Log\AbstractLogger that prints every record to the output.
 */
class PrintLogger extends \Psr\Log\AbstractLogger
{
    /**
     * Prints one record as "[Log <level>] <message>" followed by a newline.
     *
     * @param mixed $level   Value shown inside the "[Log ...]" prefix.
     * @param mixed $message Value shown after the prefix.
     * @param array $context Accepted but not used.
     */
    public function log($level, $message, array $context = [])
    {
        echo sprintf("[Log %s] %s\n", $level, $message);
    }
}
43
44
// Logger handed to both processors and to the consumer; it prints every record.
$printLogger = new PrintLogger();

// Open an AMQP connection (no connection parameters are given) and point
// at the queue named `global`.
$connection = new \AMQPConnection();
$connection->connect();
$channel = new \AMQPChannel($connection);
$queue = new \AMQPQueue($channel);
$queue->setName('global');

$messageProvider = new PeclPackageMessageProvider($queue);

// Processor stack: AckProcessor first, then XDeathMaxCountProcessor for the
// `global` queue with a callback that decides whether the exception is rethrown.
$stack = (new \Swarrot\Processor\Stack\Builder())
    ->push('\Swarrot\Processor\Ack\AckProcessor', $messageProvider, $printLogger)
    ->push(
        '\Swarrot\Processor\XDeath\XDeathMaxCountProcessor',
        'global',
        // Only the exception and the message are declared; any further
        // argument passed to the callback is not needed and is ignored.
        function ($e, $message) {
            // end() takes its argument by reference, so the x-death entries
            // are copied into a real variable before being passed to it.
            $properties = $message->getProperties();
            $xDeath = isset($properties['headers']['x-death'])
                ? $properties['headers']['x-death']
                : [];
            $lastDeath = end($xDeath);
            // A missing header or entry counts as zero, which takes the rethrow path.
            $deathCount = isset($lastDeath['count']) ? $lastDeath['count'] : 0;

            if ($deathCount > 5) {
                printf("XDeathMaxCountProcessor callback executed. Not rethrow original exception\n");

                // Returning false means the caught exception is not rethrown.
                return false;
            }

            printf("XDeathMaxCountProcessor callback executed. Rethrow original exception\n");

            // Returning null means the caught exception is rethrown.
            return null;
        },
        $printLogger
    );

// The always-failing processor sits at the bottom of the stack.
$consumer = new Consumer(
    $messageProvider,
    $stack->resolve(new FailProcessor()),
    null,
    $printLogger
);

// The output is wrapped in <pre> so the printed lines keep their line breaks
// when this script is viewed as a web page.
echo '<pre>';
try {
    $consumer->consume([
        'x_death_max_count' => 3,
    ]);
} catch (\Exception $exception) {
    // Report whatever exception escaped the consumer instead of letting it abort the page.
    printf("[%s] %s\n", get_class($exception), $exception->getMessage());
}
echo '</pre>';
88