|
1
|
|
|
<?php |
|
2
|
|
|
|
|
3
|
|
|
namespace Shippinno\Job\Infrastructure\Ui\Console\Laravel\Command; |
|
4
|
|
|
|
|
5
|
|
|
use Doctrine\ORM\EntityManager; |
|
6
|
|
|
use Illuminate\Console\Command; |
|
7
|
|
|
use Illuminate\Contracts\Console\Application; |
|
8
|
|
|
use Illuminate\Contracts\Container\Container; |
|
9
|
|
|
use Interop\Queue\PsrConsumer; |
|
10
|
|
|
use Interop\Queue\PsrContext; |
|
11
|
|
|
use Interop\Queue\PsrMessage; |
|
12
|
|
|
use JMS\Serializer\Serializer; |
|
13
|
|
|
use Shippinno\Job\Application\Job\Job; |
|
14
|
|
|
use Shippinno\Job\Application\Job\JobRunner; |
|
15
|
|
|
use Shippinno\Job\Domain\Model\JobFailedException; |
|
16
|
|
|
|
|
17
|
|
|
class JobConsume extends Command |
|
18
|
|
|
{ |
|
19
|
|
|
/** |
|
20
|
|
|
* {@inheritdoc} |
|
21
|
|
|
*/ |
|
22
|
|
|
protected $signature = 'job:consume'; |
|
23
|
|
|
|
|
24
|
|
|
/** |
|
25
|
|
|
* @var PsrContext |
|
26
|
|
|
*/ |
|
27
|
|
|
private $context; |
|
28
|
|
|
|
|
29
|
|
|
/** |
|
30
|
|
|
* @var Serializer |
|
31
|
|
|
*/ |
|
32
|
|
|
private $serializer; |
|
33
|
|
|
|
|
34
|
|
|
/** |
|
35
|
|
|
* @var EntityManager |
|
36
|
|
|
*/ |
|
37
|
|
|
private $entityManager; |
|
38
|
|
|
|
|
39
|
|
|
/** |
|
40
|
|
|
* @var Container |
|
41
|
|
|
*/ |
|
42
|
|
|
private $container; |
|
43
|
|
|
|
|
44
|
|
|
/** |
|
45
|
|
|
* @param PsrContext $context |
|
46
|
|
|
* @param Serializer $serializer |
|
47
|
|
|
* @param EntityManager $entityManager |
|
48
|
|
|
* @param Container $container |
|
49
|
|
|
*/ |
|
50
|
3 |
|
public function __construct( |
|
51
|
|
|
PsrContext $context, |
|
52
|
|
|
Serializer $serializer, |
|
53
|
|
|
EntityManager $entityManager, |
|
|
|
|
|
|
54
|
|
|
Container $container |
|
55
|
|
|
) { |
|
56
|
3 |
|
parent::__construct(); |
|
57
|
3 |
|
$this->context = $context; |
|
58
|
3 |
|
$this->serializer = $serializer; |
|
59
|
3 |
|
$this->entityManager = $entityManager; |
|
60
|
3 |
|
$this->container = $container; |
|
61
|
3 |
|
} |
|
62
|
|
|
|
|
63
|
|
|
/** |
|
64
|
|
|
* @throws \Doctrine\Common\Persistence\Mapping\MappingException |
|
65
|
|
|
* @throws \Doctrine\ORM\ORMException |
|
66
|
|
|
* @throws \Doctrine\ORM\OptimisticLockException |
|
67
|
|
|
*/ |
|
68
|
|
|
public function handle() |
|
69
|
|
|
{ |
|
70
|
|
|
$consumer = $this->createConsumer(); |
|
71
|
|
|
while (true) { |
|
72
|
|
|
$this->consume($consumer); |
|
73
|
|
|
} |
|
74
|
|
|
} |
|
75
|
|
|
|
|
76
|
|
|
/** |
|
77
|
|
|
* @return PsrConsumer |
|
78
|
|
|
*/ |
|
79
|
|
|
protected function createConsumer(): PsrConsumer |
|
80
|
|
|
{ |
|
81
|
|
|
$queue = $this->context->createQueue('test'); |
|
82
|
|
|
$consumer = $this->context->createConsumer($queue); |
|
83
|
|
|
|
|
84
|
|
|
return $consumer; |
|
85
|
|
|
} |
|
86
|
|
|
|
|
87
|
|
|
/** |
|
88
|
|
|
* @param PsrConsumer $consumer |
|
89
|
|
|
* @throws \Doctrine\Common\Persistence\Mapping\MappingException |
|
90
|
|
|
* @throws \Doctrine\ORM\ORMException |
|
91
|
|
|
* @throws \Doctrine\ORM\OptimisticLockException |
|
92
|
|
|
*/ |
|
93
|
3 |
|
protected function consume(PsrConsumer $consumer) |
|
94
|
|
|
{ |
|
95
|
3 |
|
$this->entityManager->clear(); |
|
96
|
3 |
|
$message = $consumer->receive(); |
|
97
|
3 |
|
if (null === $message) { |
|
98
|
|
|
return; |
|
99
|
|
|
} |
|
100
|
3 |
|
$messageBody = json_decode($message->getBody()); |
|
101
|
|
|
/** @var Job $job */ |
|
102
|
3 |
|
$job = $this->serializer->deserialize($messageBody->body, $messageBody->name, 'json'); |
|
103
|
3 |
|
if ($message->getProperty('attempts') > $job->maxAttempts()) { |
|
104
|
1 |
|
$consumer->reject($message); |
|
105
|
|
|
} |
|
106
|
|
|
/** @var JobRunner $jobRunner */ |
|
107
|
3 |
|
$jobRunner = $this->container->make($job->jobRunner()); |
|
108
|
|
|
try { |
|
109
|
3 |
|
$jobRunner->run($job); |
|
110
|
1 |
|
$consumer->acknowledge($message); |
|
111
|
2 |
|
} catch (JobFailedException $e) { |
|
112
|
2 |
|
$message->setProperty( |
|
113
|
2 |
|
'attempts', |
|
114
|
2 |
|
intval($message->getProperty('attempts', 0)) ?: 0 |
|
115
|
|
|
); |
|
116
|
2 |
|
if ($job->reattemptDelay() > 0) { |
|
117
|
|
|
$message = $this->delayMessage($message, $job->reattemptDelay()); |
|
118
|
|
|
} |
|
119
|
2 |
|
$consumer->reject($message, true); |
|
120
|
3 |
|
} finally { |
|
121
|
3 |
|
$this->entityManager->flush(); |
|
122
|
|
|
} |
|
123
|
3 |
|
} |
|
124
|
|
|
|
|
125
|
|
|
/** |
|
126
|
|
|
* @param PsrMessage $message |
|
127
|
|
|
* @param int $delay |
|
128
|
|
|
* @return PsrMessage |
|
129
|
|
|
*/ |
|
130
|
|
|
protected function delayMessage(PsrMessage $message, int $delay) |
|
131
|
|
|
{ |
|
132
|
|
|
if (method_exists($message, 'setDelaySeconds')) { |
|
133
|
|
|
$message->setDelaySeconds($delay); |
|
|
|
|
|
|
134
|
|
|
} |
|
135
|
|
|
|
|
136
|
|
|
return $message; |
|
137
|
|
|
} |
|
138
|
|
|
} |
|
139
|
|
|
|
The
EntityManagermight become unusable for example if a transaction is rolled back and it gets closed. Let’s assume that somewhere in your application, or in a third-party library, there is code such as the following:If that code throws an exception and the
EntityManageris closed. Any other code which depends on the same instance of theEntityManagerduring this request will fail.On the other hand, if you instead inject the
ManagerRegistry, thegetManager()method guarantees that you will always get a usable manager instance.