SourceProcessExecutor::getObjectPayload()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
crap 1
1
<?php
2
3
namespace TreeHouse\IoBundle\Bridge\WorkerBundle\Executor;
4
5
use Psr\Log\LoggerInterface;
6
use Psr\Log\LogLevel;
7
use Symfony\Component\OptionsResolver\Exception\InvalidArgumentException;
8
use Symfony\Component\OptionsResolver\Options;
9
use Symfony\Component\OptionsResolver\OptionsResolver;
10
use TreeHouse\IoBundle\Exception\SourceLinkException;
11
use TreeHouse\IoBundle\Exception\SourceProcessException;
12
use TreeHouse\IoBundle\Model\SourceInterface;
13
use TreeHouse\IoBundle\Source\SourceManagerInterface;
14
use TreeHouse\IoBundle\Source\SourceProcessorInterface;
15
use TreeHouse\WorkerBundle\Executor\AbstractExecutor;
16
use TreeHouse\WorkerBundle\Executor\ObjectPayloadInterface;
17
18
/**
19
 * Worker job to link this source to existing entities, or create a
20
 * new entity when an entity with the same features doesn't already
21
 * exist.
22
 *
23
 * Source process jobs are added to the queue by the SourceModificationListener
24
 */
25
class SourceProcessExecutor extends AbstractExecutor implements ObjectPayloadInterface
{
    /**
     * Name under which this executor is registered; returned by getName().
     */
    const NAME = 'source.process';

    /**
     * Used to look up sources by id and to flush freshly linked sources.
     *
     * @var SourceManagerInterface
     */
    protected $sourceManager;

    /**
     * Performs the actual link/unlink/process operations on a source.
     *
     * @var SourceProcessorInterface
     */
    protected $processor;

    /**
     * Receives debug output and the messages collected on a failed source.
     *
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Stores the collaborators this executor delegates to.
     *
     * @param SourceManagerInterface   $sourceManager
     * @param SourceProcessorInterface $processor
     * @param LoggerInterface          $logger
     */
    public function __construct(SourceManagerInterface $sourceManager, SourceProcessorInterface $processor, LoggerInterface $logger)
    {
        $this->logger = $logger;
        $this->processor = $processor;
        $this->sourceManager = $sourceManager;
    }

    /**
     * Returns the value of the NAME constant.
     *
     * @inheritdoc
     */
    public function getName()
    {
        return self::NAME;
    }

    /**
     * Builds the job payload for a source: a single-element list holding its id.
     *
     * @param SourceInterface $object
     *
     * @return int[]
     */
    public function getObjectPayload($object)
    {
        $sourceId = $object->getId();

        return [$sourceId];
    }

    /**
     * Tells whether the given value is a source this executor can build a payload for.
     *
     * @param mixed $object
     *
     * @return bool True only when $object implements SourceInterface
     */
    public function supportsObject($object)
    {
        if ($object instanceof SourceInterface) {
            return true;
        }

        return false;
    }

    /**
     * Declares payload index 0 as a required, numeric option and installs a
     * normalizer that swaps the id for the source found via findSource().
     * The normalizer throws an InvalidArgumentException when no source is found.
     *
     * @inheritdoc
     */
    public function configurePayload(OptionsResolver $resolver)
    {
        $idToSource = function (Options $options, $value) {
            $found = $this->findSource($value);

            if ($found === null) {
                throw new InvalidArgumentException(sprintf('Could not find source with id %d', $value));
            }

            return $found;
        };

        $resolver->setRequired(0);
        $resolver->setAllowedTypes(0, 'numeric');
        $resolver->setNormalizer(0, $idToSource);
    }

    /**
     * Links (when necessary) and processes the source carried in the payload.
     *
     * A blocked source is unlinked and not processed. Otherwise the source's
     * messages are cleared, the source is linked if the processor reports it
     * is not linked yet, and then processed. A source that had to be linked
     * here is flushed through the source manager after successful processing.
     *
     * Link and process exceptions are not rethrown: they are stored as
     * messages on the source, and all messages on the source are then logged.
     *
     * @param array $payload Payload whose first element is the source
     *
     * @return bool True when processing finished without a link/process
     *              exception; false for a blocked source or a caught exception
     */
    public function execute(array $payload)
    {
        /** @var SourceInterface $source */
        $source = $payload[0];

        if ($source->isBlocked()) {
            $this->logger->debug('Source is blocked');
            $this->processor->unlink($source);

            return false;
        }

        // start from a clean slate so only this run's messages get logged
        $source->setMessages([]);

        try {
            $alreadyLinked = $this->processor->isLinked($source);

            // processing requires a linked source, so link it first
            if (!$alreadyLinked) {
                $this->logger->debug('Linking source first');
                $this->processor->link($source);
            }

            $this->processor->process($source);

            // a source that was linked during this run still has to be flushed
            if (!$alreadyLinked) {
                $this->sourceManager->flush($source);
            }

            return true;
        } catch (SourceLinkException $linkError) {
            $text = sprintf('Could not link source (%d): %s', $source->getId(), $linkError->getMessage());
            $this->setMessage($source, 'link', $text);
        } catch (SourceProcessException $processError) {
            $text = sprintf('Error while processing source (%d): %s', $source->getId(), $processError->getMessage());
            $this->setMessage($source, 'process', $text);
        }

        // only reached after a caught exception: report everything collected on the source
        foreach ($source->getMessages() as $key => $byLevel) {
            foreach ($byLevel as $level => $line) {
                $this->logger->log($level, sprintf('[%s] %s', $key, $line));
            }
        }

        return false;
    }

    /**
     * Looks up a source through the source manager.
     *
     * @param int $sourceId
     *
     * @return SourceInterface|null Whatever the source manager returns for the
     *                              id; configurePayload() treats null as "not found"
     */
    protected function findSource($sourceId)
    {
        $source = $this->sourceManager->findById($sourceId);

        return $source;
    }

    /**
     * Stores a message under the given key and level on the source, leaving
     * the messages stored under other keys untouched.
     *
     * @param SourceInterface $source
     * @param string          $key
     * @param string          $message
     * @param string          $level   Defaults to LogLevel::ERROR
     */
    protected function setMessage(SourceInterface $source, $key, $message, $level = LogLevel::ERROR)
    {
        $current = $source->getMessages();
        $current[$key][$level] = $message;

        $source->setMessages($current);
    }
}
186