Test Failed
Push — master ( 0bbeee...5fd70b )
by Hirofumi
02:10
created

ConsumeStoredJobService::execute()   C

Complexity

Conditions 8
Paths 26

Size

Total Lines 43
Code Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 43
c 0
b 0
f 0
rs 5.3846
cc 8
eloc 32
nc 26
nop 1
1
<?php
2
3
namespace Shippinno\Job\Application\Messaging;
4
5
use Interop\Queue\PsrConsumer;
6
use Interop\Queue\PsrContext;
7
use Interop\Queue\PsrMessage;
8
use Shippinno\Job\Application\Job\JobRunnerRegistry;
9
use Shippinno\Job\Domain\Model\AbandonedJobMessage;
10
use Shippinno\Job\Domain\Model\AbandonedJobMessageStore;
11
use Shippinno\Job\Domain\Model\JobFailedException;
12
use Shippinno\Job\Domain\Model\JobRunnerNotRegisteredException;
13
use Shippinno\Job\Domain\Model\JobSerializer;
14
use Shippinno\Job\Domain\Model\JobStore;
15
use Shippinno\Job\Domain\Model\StoredJobSerializer;
16
17
/**
 * Receives one stored-job message from a queue, runs the job it carries and
 * settles the message according to the outcome:
 *
 *  - success: dependent jobs are appended to the job store and the message is
 *    acknowledged;
 *  - no runner registered for the job class: the message is stored as
 *    abandoned and rejected;
 *  - JobFailedException: the message is requeued until the job's maximum
 *    number of attempts is exceeded, after which it is stored as abandoned
 *    and rejected.
 */
class ConsumeStoredJobService
{
    /**
     * Turns a raw message body back into a stored job.
     *
     * @var StoredJobSerializer
     */
    private $storedJobSerializer;

    /**
     * Queue context used to create the queue and its consumer.
     *
     * @var PsrContext
     */
    private $context;

    /**
     * Turns a stored job's body and name back into a job object.
     *
     * @var JobSerializer
     */
    private $jobSerializer;

    /**
     * Looks up the runner for a job by the job's class name.
     *
     * @var JobRunnerRegistry
     */
    private $jobRunnerRegistry;

    /**
     * Receives the dependent jobs of a job that ran successfully.
     *
     * @var JobStore
     */
    private $jobStore;

    /**
     * Receives messages that will not be attempted any more.
     *
     * @var AbandonedJobMessageStore
     */
    private $abandonedJobMessageStore;

    /**
     * @param PsrContext $context
     * @param StoredJobSerializer $storedJobSerializer
     * @param JobSerializer $jobSerializer
     * @param JobRunnerRegistry $jobRunnerRegistry
     * @param JobStore $jobStore
     * @param AbandonedJobMessageStore $abandonedJobMessageStore
     */
    public function __construct(
        PsrContext $context,
        StoredJobSerializer $storedJobSerializer,
        JobSerializer $jobSerializer,
        JobRunnerRegistry $jobRunnerRegistry,
        JobStore $jobStore,
        AbandonedJobMessageStore $abandonedJobMessageStore
    ) {
        $this->context = $context;
        $this->storedJobSerializer = $storedJobSerializer;
        $this->jobSerializer = $jobSerializer;
        $this->jobRunnerRegistry = $jobRunnerRegistry;
        $this->jobStore = $jobStore;
        $this->abandonedJobMessageStore = $abandonedJobMessageStore;
    }

    /**
     * Consumes a single message from the given queue and runs its job.
     *
     * Returns without doing anything when the consumer yields no message.
     * Only JobRunnerNotRegisteredException and JobFailedException are handled
     * here; any other exception (including one raised while deserializing the
     * message) propagates to the caller with the message left unsettled.
     *
     * @param string $queueName
     */
    public function execute(string $queueName)
    {
        $consumer = $this->context->createConsumer($this->context->createQueue($queueName));
        $message = $consumer->receive();
        if (null === $message) {
            return;
        }

        $storedJob = $this->storedJobSerializer->deserialize($message->getBody());
        $job = $this->jobSerializer->deserialize($storedJob->body(), $storedJob->name());

        try {
            $jobRunner = $this->jobRunnerRegistry->get(get_class($job));
        } catch (JobRunnerNotRegisteredException $e) {
            // Nothing can ever run this job, so retrying would be pointless.
            $this->abandon($consumer, $message, $e->__toString());

            return;
        }

        try {
            $jobRunner->run($job);
            $this->appendDependentJobs($job);
            $consumer->acknowledge($message);
        } catch (JobFailedException $e) {
            $this->retryOrAbandon($consumer, $message, $job, $e);
        }
    }

    /**
     * Applies the given delay to the message when the concrete message class
     * supports it; otherwise the message is returned untouched.
     *
     * @param PsrMessage $message
     * @param int $delay
     * @return PsrMessage
     */
    protected function delayMessage(PsrMessage $message, int $delay)
    {
        // setDelaySeconds() is not declared on PsrMessage (static analysis
        // reports it as a missing method), so it is feature-detected at
        // runtime and only called on message classes that actually define it.
        if (method_exists($message, 'setDelaySeconds')) {
            $message->setDelaySeconds($delay);
        }

        return $message;
    }

    /**
     * Records the message as abandoned together with the reason, then rejects
     * it without requeueing.
     *
     * @param PsrConsumer $consumer
     * @param PsrMessage $message
     * @param string $reason
     */
    private function abandon(PsrConsumer $consumer, PsrMessage $message, string $reason)
    {
        $this->abandonedJobMessageStore->add(
            new AbandonedJobMessage($message->getBody(), $reason)
        );
        $consumer->reject($message);
    }

    /**
     * Appends every dependent job of the given job to the job store.
     *
     * @param object $job job exposing dependentJobs()
     */
    private function appendDependentJobs($job)
    {
        $dependentJobs = $job->dependentJobs();
        if (count($dependentJobs) > 0) {
            foreach ($dependentJobs as $dependentJob) {
                $this->jobStore->append($dependentJob);
            }
        }
    }

    /**
     * Handles a failed run: requeues the message for another attempt, or
     * abandons it once the attempt counter exceeds the job's maximum.
     *
     * @param PsrConsumer $consumer
     * @param PsrMessage $message
     * @param object $job job exposing maxAttempts() and reattemptDelay()
     * @param JobFailedException $e
     */
    private function retryOrAbandon(PsrConsumer $consumer, PsrMessage $message, $job, JobFailedException $e)
    {
        // The counter lives in the 'attempts' message property; a message
        // without that property is treated as having had zero attempts.
        $attempts = $message->getProperty('attempts', 0) + 1;
        if ($attempts > $job->maxAttempts()) {
            $this->abandon($consumer, $message, $e->__toString());

            return;
        }

        $message->setProperty('attempts', $attempts);
        if ($job->reattemptDelay() > 0) {
            $message = $this->delayMessage($message, $job->reattemptDelay());
        }

        // NOTE(review): whether the updated 'attempts' property and the delay
        // survive a reject-with-requeue depends on the transport - confirm for
        // the transport in use.
        $consumer->reject($message, true);
    }
}
134