1 | <?php |
||
2 | |||
3 | /** |
||
4 | * In this example you will see how XDeathMaxCountProcessor work. |
||
5 | * |
||
6 | * Prerequis |
||
7 | * First let's create the delaying exchange+queue |
||
8 | * |
||
9 | * Create an exchange `waiting_5` (type: `topic`) |
||
10 | * Create an queue `waiting_5` (x-dead-letter-exchange: ``, x-message-ttl: `5000`) |
||
11 | * Bind the exchange `waiting_5` to the queue `waiting_5` with routing_key `#` |
||
12 | * |
||
13 | * And then create the simple global queue. |
||
14 | * |
||
15 | * Create an queue `global` with (x-dead-letter-exchange: `waiting_5`, x-dead-letter-routing-key: `global`) |
||
16 | * |
||
17 | * Run this page and send manually a message |
||
18 | */ |
||
19 | require_once __DIR__.'/../vendor/autoload.php'; |
||
20 | |||
21 | use Swarrot\Broker\Message; |
||
22 | use Swarrot\Broker\MessageProvider\PeclPackageMessageProvider; |
||
23 | use Swarrot\Consumer; |
||
24 | use Swarrot\Processor\ProcessorInterface; |
||
25 | |||
26 | class FailProcessor implements ProcessorInterface |
||
27 | { |
||
28 | public function process(Message $message, array $options) |
||
29 | { |
||
30 | printf("Fail processor consume message #%d\n", $message->getId()); |
||
31 | |||
32 | throw new \Exception('This is my process exception.'); |
||
33 | } |
||
34 | } |
||
35 | |||
36 | class PrintLogger extends \Psr\Log\AbstractLogger |
||
37 | { |
||
38 | public function log($level, $message, array $context = []) |
||
39 | { |
||
40 | printf("[Log %s] %s\n", $level, $message); |
||
41 | } |
||
42 | } |
||
43 | |||
44 | $printLogger = new PrintLogger(); |
||
45 | |||
46 | $connection = new \AMQPConnection(); |
||
47 | $connection->connect(); |
||
48 | $channel = new \AMQPChannel($connection); |
||
49 | $queue = new \AMQPQueue($channel); |
||
50 | $queue->setName('global'); |
||
51 | |||
52 | $messageProvider = new PeclPackageMessageProvider($queue); |
||
53 | $stack = (new \Swarrot\Processor\Stack\Builder()) |
||
54 | ->push('\Swarrot\Processor\Ack\AckProcessor', $messageProvider, $printLogger) |
||
55 | ->push( |
||
56 | '\Swarrot\Processor\XDeath\XDeathMaxCountProcessor', |
||
57 | 'global', |
||
58 | function ($e, $message, $options) { |
||
0 ignored issues
–
show
|
|||
59 | if (end($message->getProperties()['headers']['x-death'])['count'] > 5) { |
||
60 | printf("XDeathMaxCountProcessor callback executed. Not rethrow original exception\n"); |
||
61 | // when you return false it not rethrow the catched exception |
||
62 | return false; |
||
63 | } |
||
64 | |||
65 | printf("XDeathMaxCountProcessor callback executed. Rethrow original exception\n"); |
||
66 | // when you return null it rethrow the catched exception |
||
67 | return; |
||
68 | }, |
||
69 | $printLogger |
||
70 | ); |
||
71 | |||
72 | $consumer = new Consumer( |
||
73 | $messageProvider, |
||
74 | $stack->resolve(new FailProcessor()), |
||
75 | null, |
||
76 | $printLogger |
||
77 | ); |
||
78 | |||
79 | echo '<pre>'; |
||
80 | try { |
||
81 | $consumer->consume([ |
||
82 | 'x_death_max_count' => 3, |
||
83 | ]); |
||
84 | } catch (\Exception $exception) { |
||
85 | printf("[%s] %s\n", get_class($exception), $exception->getMessage()); |
||
86 | } |
||
87 | echo '</pre>'; |
||
88 |
This check looks for parameters that have been defined for a function or method, but which are not used in the method body.