Passed
Push — master ( e73242...f49b80 )
by Hirofumi
02:35
created

JobConsume::delayMessage()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 8
ccs 0
cts 4
cp 0
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 4
nc 2
nop 2
crap 6
1
<?php
2
3
namespace Shippinno\Job\Infrastructure\Ui\Console\Laravel\Command;
4
5
use Doctrine\Common\Persistence\ManagerRegistry;
6
use Doctrine\ORM\EntityManager;
7
use Illuminate\Console\Command;
8
use Illuminate\Contracts\Container\Container;
9
use Interop\Queue\PsrConsumer;
10
use Interop\Queue\PsrContext;
11
use Interop\Queue\PsrMessage;
12
use JMS\Serializer\SerializerBuilder;
13
use Shippinno\Job\Application\Job\JobRunner;
14
use Shippinno\Job\Domain\Model\Job;
15
use Shippinno\Job\Domain\Model\JobFailedException;
16
use Shippinno\Job\Infrastructure\Serialization\JMS\BuildsSerializer;
17
18
class JobConsume extends Command
19
{
20
    use BuildsSerializer;
21
22
    /**
23
     * {@inheritdoc}
24
     */
25
    protected $signature = 'job:consume';
26
27
    /**
28
     * @var PsrContext
29
     */
30
    private $context;
31
32
    /**
33
     * @var Container
34
     */
35
    private $container;
36
37
    /**
38
     * @var ManagerRegistry|null
39
     */
40
    private $managerRegistry;
41
42
    /**
43
     * @param PsrContext $context
44
     * @param SerializerBuilder $serializerBuilder
45
     * @param Container $container
46
     * @param ManagerRegistry|null $managerRegistry
47
     */
48 3
    public function __construct(
49
        PsrContext $context,
50
        SerializerBuilder $serializerBuilder,
51
        Container $container,
52
        ManagerRegistry $managerRegistry = null
53
    ) {
54 3
        parent::__construct();
55 3
        $this->context = $context;
56 3
        $this->buildSerializer($serializerBuilder);
57 3
        $this->container = $container;
58 3
        $this->managerRegistry = $managerRegistry;
59 3
    }
60
61
    /**
62
     * @throws \Doctrine\Common\Persistence\Mapping\MappingException
63
     * @throws \Doctrine\ORM\ORMException
64
     * @throws \Doctrine\ORM\OptimisticLockException
65
     */
66
    public function handle()
67
    {
68
        $consumer = $this->createConsumer();
69
        while (true) {
70
            $this->consume($consumer);
71
        }
72
    }
73
74
    /**
75
     * @return PsrConsumer
76
     */
77
    protected function createConsumer(): PsrConsumer
78
    {
79
        $queue = $this->context->createQueue('test');
80
        $consumer = $this->context->createConsumer($queue);
81
82
        return $consumer;
83
    }
84
85
    /**
86
     * @param PsrConsumer $consumer
87
     */
88 3
    protected function consume(PsrConsumer $consumer)
89
    {
90 3
        if (null !== $this->managerRegistry) {
91 3
            $this->managerRegistry->getManager()->clear();
92
        }
93 3
        $message = $consumer->receive();
94 3
        if (null === $message) {
95
            return;
96
        }
97 3
        $messageBody = json_decode($message->getBody());
98
        /** @var Job $job */
99 3
        $job = $this->serializer->deserialize($messageBody->body, $messageBody->name, 'json');
100 3
        $attempts = $message->getProperty('attempts', 0) + 1;
101 3
        if ($attempts > $job->maxAttempts()) {
102 1
            $consumer->reject($message);
103 1
            return;
104
        }
105
        /** @var JobRunner $jobRunner */
106 2
        $jobRunner = $this->container->make($job->jobRunner());
107
        try {
108 2
            $jobRunner->run($job);
109 1
            $consumer->acknowledge($message);
110 1
        } catch (JobFailedException $e) {
111 1
            $message->setProperty('attempts', $attempts);
112 1
            if ($job->reattemptDelay() > 0) {
113
                $message = $this->delayMessage($message, $job->reattemptDelay());
114
            }
115 1
            $consumer->reject($message, true);
116 2
        } finally {
117 2
            if (null !== $this->managerRegistry) {
118 2
                $this->managerRegistry->getManager()->flush();
119
            }
120
        }
121 2
    }
122
123
    /**
124
     * @param PsrMessage $message
125
     * @param int $delay
126
     * @return PsrMessage
127
     */
128
    protected function delayMessage(PsrMessage $message, int $delay)
129
    {
130
        if (method_exists($message, 'setDelaySeconds')) {
131
            $message->setDelaySeconds($delay);
0 ignored issues
show
Bug introduced by
The method setDelaySeconds() does not seem to exist on object<Interop\Queue\PsrMessage>.

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
132
        }
133
134
        return $message;
135
    }
136
}
137