1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace TreeHouse\IoBundle\Bridge\WorkerBundle\Executor; |
4
|
|
|
|
5
|
|
|
use Psr\Log\LoggerInterface; |
6
|
|
|
use Psr\Log\LogLevel; |
7
|
|
|
use Symfony\Component\OptionsResolver\Exception\InvalidArgumentException; |
8
|
|
|
use Symfony\Component\OptionsResolver\Options; |
9
|
|
|
use Symfony\Component\OptionsResolver\OptionsResolver; |
10
|
|
|
use TreeHouse\IoBundle\Exception\SourceLinkException; |
11
|
|
|
use TreeHouse\IoBundle\Exception\SourceProcessException; |
12
|
|
|
use TreeHouse\IoBundle\Model\SourceInterface; |
13
|
|
|
use TreeHouse\IoBundle\Source\SourceManagerInterface; |
14
|
|
|
use TreeHouse\IoBundle\Source\SourceProcessorInterface; |
15
|
|
|
use TreeHouse\WorkerBundle\Executor\AbstractExecutor; |
16
|
|
|
use TreeHouse\WorkerBundle\Executor\ObjectPayloadInterface; |
17
|
|
|
|
18
|
|
|
/**
 * Worker job that links a source to existing entities, or creates a
 * new entity when no entity with the same features exists yet.
 *
 * Jobs of this type are put on the queue by the SourceModificationListener.
 */
class SourceProcessExecutor extends AbstractExecutor implements ObjectPayloadInterface
{
    const NAME = 'source.process';

    /**
     * @var SourceManagerInterface
     */
    protected $sourceManager;

    /**
     * @var SourceProcessorInterface
     */
    protected $processor;

    /**
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * @param SourceManagerInterface   $sourceManager Used to look sources up by id and to flush them
     * @param SourceProcessorInterface $processor     Performs the link, unlink and process operations
     * @param LoggerInterface          $logger        Receives debug output and the messages collected on a source
     */
    public function __construct(SourceManagerInterface $sourceManager, SourceProcessorInterface $processor, LoggerInterface $logger)
    {
        $this->logger = $logger;
        $this->processor = $processor;
        $this->sourceManager = $sourceManager;
    }

    /**
     * @inheritdoc
     */
    public function getName()
    {
        return self::NAME;
    }

    /**
     * Builds the job payload for a source: a one-element list holding its id.
     *
     * @param SourceInterface $object
     *
     * @return int[]
     */
    public function getObjectPayload($object)
    {
        $sourceId = $object->getId();

        return [$sourceId];
    }

    /**
     * Tells whether the given object can be turned into a payload by this executor.
     *
     * @param mixed $object
     *
     * @return bool True only for SourceInterface instances
     */
    public function supportsObject($object)
    {
        return ($object instanceof SourceInterface);
    }

    /**
     * Declares the payload layout: index 0 is a required, numeric source id
     * which gets normalized into the matching source object.
     *
     * @inheritdoc
     */
    public function configurePayload(OptionsResolver $resolver)
    {
        // Swaps the raw id for the source itself; an unknown id invalidates the payload.
        $toSource = function (Options $options, $value) {
            $found = $this->findSource($value);

            if (null !== $found) {
                return $found;
            }

            throw new InvalidArgumentException(sprintf('Could not find source with id %d', $value));
        };

        $resolver->setRequired(0);
        $resolver->setAllowedTypes(0, 'numeric');
        $resolver->setNormalizer(0, $toSource);
    }

    /**
     * Links (when needed) and processes the source found at index 0 of the payload.
     *
     * A blocked source is unlinked instead of processed. Link and process
     * failures are recorded as messages on the source and written to the logger.
     *
     * @param array $payload Resolved payload; index 0 holds the source (see configurePayload)
     *
     * @return bool True when the source was processed, false when it was blocked or an error occurred
     */
    public function execute(array $payload)
    {
        /** @var SourceInterface $source */
        $source = $payload[0];

        if ($source->isBlocked()) {
            $this->logger->debug('Source is blocked');
            $this->processor->unlink($source);

            return false;
        }

        // clear whatever messages are currently stored on the source
        $source->setMessages([]);

        try {
            // remember the initial link state: it decides both linking and flushing
            $wasLinked = $this->processor->isLinked($source);

            if (!$wasLinked) {
                $this->logger->debug('Linking source first');
                $this->processor->link($source);
            }

            $this->processor->process($source);

            // a source that had to be linked in this run is flushed right away
            if (!$wasLinked) {
                $this->sourceManager->flush($source);
            }

            return true;
        } catch (SourceLinkException $e) {
            $error = sprintf('Could not link source (%d): %s', $source->getId(), $e->getMessage());
            $this->setMessage($source, 'link', $error);
        } catch (SourceProcessException $e) {
            $error = sprintf('Error while processing source (%d): %s', $source->getId(), $e->getMessage());
            $this->setMessage($source, 'process', $error);
        }

        // only reached after a failure: pass every message on the source to the logger
        foreach ($source->getMessages() as $key => $byLevel) {
            foreach ($byLevel as $level => $text) {
                $this->logger->log($level, sprintf('[%s] %s', $key, $text));
            }
        }

        return false;
    }

    /**
     * Looks a source up by id through the source manager.
     *
     * @param int $sourceId
     *
     * @return SourceInterface|null The manager's result; configurePayload treats null as "not found"
     */
    protected function findSource($sourceId)
    {
        $source = $this->sourceManager->findById($sourceId);

        return $source;
    }

    /**
     * Stores a message under the given key and level, leaving the other keys untouched.
     *
     * @param SourceInterface $source
     * @param string          $key     Message group, e.g. 'link' or 'process'
     * @param string          $message
     * @param string          $level   Log level the message is later logged with
     */
    protected function setMessage(SourceInterface $source, $key, $message, $level = LogLevel::ERROR)
    {
        $current = $source->getMessages();
        $current[$key][$level] = $message;

        $source->setMessages($current);
    }
}
186
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.