1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Swarrot\Broker\MessageProvider; |
4
|
|
|
|
5
|
|
|
use Stomp\Client; |
6
|
|
|
use Stomp\Exception\StompException; |
7
|
|
|
use Stomp\SimpleStomp; |
8
|
|
|
use Stomp\Transport\Message as StompMessage; |
9
|
|
|
use Swarrot\Broker\Message; |
10
|
|
|
|
11
|
|
|
class SimpleStompMessageProvider implements MessageProviderInterface |
12
|
|
|
{ |
13
|
|
|
/** |
14
|
|
|
* @var Client |
15
|
|
|
*/ |
16
|
|
|
private $client; |
17
|
|
|
|
18
|
|
|
/** |
19
|
|
|
* @var SimpleStomp |
20
|
|
|
*/ |
21
|
|
|
private $stomp; |
22
|
|
|
|
23
|
|
|
/** |
24
|
|
|
* @var string |
25
|
|
|
*/ |
26
|
|
|
private $destination; |
27
|
|
|
|
28
|
|
|
/** |
29
|
|
|
* @param Client $client |
30
|
|
|
* @param string $destination |
31
|
|
|
* @param null $subscriptionId |
32
|
|
|
* @param string $ack |
33
|
|
|
* @param null $selector |
34
|
|
|
* @param array $header |
35
|
|
|
*/ |
36
|
|
View Code Duplication |
public function __construct( |
|
|
|
|
37
|
|
|
Client $client, |
38
|
|
|
$destination, |
39
|
|
|
$subscriptionId = null, |
40
|
|
|
$ack = 'client', |
41
|
|
|
$selector = null, |
42
|
|
|
array $header = [] |
43
|
|
|
) { |
44
|
|
|
$this->client = $client; |
45
|
|
|
$this->destination = $destination; |
46
|
|
|
|
47
|
|
|
$this->stomp = new SimpleStomp($client); |
48
|
|
|
$this->stomp->subscribe($destination, $subscriptionId, $ack, $selector, $header); |
49
|
|
|
} |
50
|
|
|
|
51
|
|
View Code Duplication |
public function get() |
|
|
|
|
52
|
|
|
{ |
53
|
|
|
if ($frame = $this->stomp->read()) { |
54
|
|
|
return new Message($frame->getBody(), $frame->getHeaders()); |
55
|
|
|
} |
56
|
|
|
|
57
|
|
|
return null; |
58
|
|
|
} |
59
|
|
|
|
60
|
|
|
/** |
61
|
|
|
* @param Message $message |
62
|
|
|
*/ |
63
|
|
|
public function ack(Message $message) |
64
|
|
|
{ |
65
|
|
|
$this->stomp->ack(new StompMessage($message->getBody(), $message->getProperties())); |
66
|
|
|
} |
67
|
|
|
|
68
|
|
|
/** |
69
|
|
|
* @param Message $message |
70
|
|
|
* @param bool $requeue |
71
|
|
|
* |
72
|
|
|
* @throws StompException |
73
|
|
|
*/ |
74
|
|
|
public function nack(Message $message, $requeue = false) |
75
|
|
|
{ |
76
|
|
|
$protocol = $this->client->getProtocol(); |
77
|
|
|
if (null === $protocol) { |
78
|
|
|
throw new StompException('Stomp protocol is require to NACK Frames.'); |
79
|
|
|
} |
80
|
|
|
|
81
|
|
|
$this->client->sendFrame( |
82
|
|
|
$protocol->getNackFrame( |
83
|
|
|
new StompMessage($message->getBody(), $message->getProperties()), |
84
|
|
|
null, |
85
|
|
|
$requeue |
86
|
|
|
), |
87
|
|
|
false |
88
|
|
|
); |
89
|
|
|
} |
90
|
|
|
|
91
|
|
|
/** |
92
|
|
|
* @return string |
93
|
|
|
*/ |
94
|
|
|
public function getQueueName() |
95
|
|
|
{ |
96
|
|
|
return $this->destination; |
97
|
|
|
} |
98
|
|
|
} |
99
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.