Code Duplication    Length = 20-20 lines in 2 locations

src/Extraload/Extractor/QueuedExtractor.php 1 location

@@ 7-26 (lines=20) @@
4
5
use Ko\AmqpBroker;
6
7
class QueuedExtractor implements ExtractorInterface
8
{
9
    private $extractor;
10
    private $broker;
11
    private $proucer;
12
13
    public function __construct(ExtractorIteratorInterface $extractor, AmqpBroker $broker, $proucer)
14
    {
15
        $this->extractor = $extractor;
16
        $this->broker = $broker;
17
        $this->proucer = $proucer;
18
    }
19
20
    public function extract()
21
    {
22
        while (null !== $extracted = $this->extractor->extract()) {
23
            $this->broker->getProducer($this->proucer)->publish(serialize($extracted));
24
        }
25
    }
26
}
27

src/Extraload/Transformer/TransformerConsumer.php 1 location

@@ 7-26 (lines=20) @@
4
5
use Ko\AmqpBroker;
6
7
class TransformerConsumer implements TransformerInterface
8
{
9
    private $transformer;
10
    private $broker;
11
    private $producer;
12
13
    public function __construct(TransformerInterface $transformer, AmqpBroker $broker, $producer)
14
    {
15
        $this->transformer = $transformer;
16
        $this->broker = $broker;
17
        $this->producer = $producer;
18
    }
19
20
    public function transform($data = null)
21
    {
22
        $this->broker->getProducer($this->producer)->publish(serialize(
23
            $this->transformer->transform($data)
24
        ));
25
    }
26
}
27