Code Duplication    Length = 39-42 lines in 3 locations

Transport/PubSub/SubscriberBuilder.php 1 location

@@ 12-53 (lines=42) @@
9
use Cmobi\RabbitmqBundle\Queue\QueueServiceInterface;
10
use Psr\Log\LoggerInterface;
11
12
/**
 * Builds Queue instances whose callback is a SubscriberQueueCallback.
 *
 * Only SubscriberQueueBag instances are accepted as queue configuration.
 */
class SubscriberBuilder implements QueueBuilderInterface
{
    /**
     * @var ConnectionManager
     */
    private $connectionManager;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Set to null by the constructor and not read anywhere in this class.
     *
     * Declared explicitly because assigning to an undeclared property creates a
     * dynamic property, which is deprecated as of PHP 8.2.
     *
     * @var mixed|null
     */
    private $channel;

    /**
     * @param ConnectionManager $connManager Connection manager handed to every built Queue.
     * @param LoggerInterface $logger Logger handed to every built Queue.
     * @param array $parameters Currently not used by this class.
     */
    public function __construct(
        ConnectionManager $connManager,
        LoggerInterface $logger,
        array $parameters
    ) {
        $this->connectionManager = $connManager;
        $this->logger = $logger;
        $this->channel = null;
    }

    /**
     * Creates a Queue for the given bag and attaches a SubscriberQueueCallback
     * wrapping the given service.
     *
     * @param mixed $queueName Not used by this builder.
     * @param QueueServiceInterface $queueService Service passed to the SubscriberQueueCallback.
     * @param QueueBagInterface $queueBag Must be a SubscriberQueueBag.
     * @return Queue
     * @throws \Exception When $queueBag is not a SubscriberQueueBag.
     */
    public function buildQueue($queueName, QueueServiceInterface $queueService, QueueBagInterface $queueBag)
    {
        // Reject bags of any other transport before constructing anything.
        if (! $queueBag instanceof SubscriberQueueBag) {
            throw new \Exception('Unsupported QueueBag');
        }

        $queue = new Queue($this->getConnectionManager(), $queueBag, $this->logger);
        $queue->setCallback(new SubscriberQueueCallback($queueService));

        return $queue;
    }

    /**
     * @return ConnectionManager The connection manager given to the constructor.
     */
    public function getConnectionManager()
    {
        return $this->connectionManager;
    }
}
54

Transport/Rpc/RpcServerBuilder.php 1 location

@@ 12-50 (lines=39) @@
9
use Cmobi\RabbitmqBundle\Queue\QueueServiceInterface;
10
use Psr\Log\LoggerInterface;
11
12
/**
 * Builds Queue instances whose callback is an RpcQueueCallback.
 *
 * Only RpcQueueBag instances are accepted as queue configuration.
 */
class RpcServerBuilder implements QueueBuilderInterface
{
    /**
     * @var ConnectionManager
     */
    private $connectionManager;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Set to null by the constructor and not read anywhere in this class.
     *
     * Declared explicitly because assigning to an undeclared property creates a
     * dynamic property, which is deprecated as of PHP 8.2.
     *
     * @var mixed|null
     */
    private $channel;

    /**
     * @param ConnectionManager $connManager Connection manager handed to every built Queue.
     * @param LoggerInterface $logger Logger handed to every built Queue.
     */
    public function __construct(ConnectionManager $connManager, LoggerInterface $logger)
    {
        $this->connectionManager = $connManager;
        $this->logger = $logger;
        $this->channel = null;
    }

    /**
     * Creates a Queue for the given bag and attaches an RpcQueueCallback
     * wrapping the given service.
     *
     * @param mixed $queueName Not used by this builder.
     * @param QueueServiceInterface $queueService Service passed to the RpcQueueCallback.
     * @param QueueBagInterface $queueBag Must be an RpcQueueBag.
     * @return Queue
     * @throws \Exception When $queueBag is not an RpcQueueBag.
     */
    public function buildQueue($queueName, QueueServiceInterface $queueService, QueueBagInterface $queueBag)
    {
        // Reject bags of any other transport before constructing anything.
        if (! $queueBag instanceof RpcQueueBag) {
            throw new \Exception('Unsupported QueueBag');
        }

        $queue = new Queue($this->getConnectionManager(), $queueBag, $this->logger);
        $queue->setCallback(new RpcQueueCallback($queueService));

        return $queue;
    }

    /**
     * @return ConnectionManager The connection manager given to the constructor.
     */
    public function getConnectionManager()
    {
        return $this->connectionManager;
    }
}
51

Transport/Worker/WorkerBuilder.php 1 location

@@ 13-51 (lines=39) @@
10
use Cmobi\RabbitmqBundle\Queue\QueueServiceInterface;
11
use Psr\Log\LoggerInterface;
12
13
/**
 * Builds Queue instances whose callback is a WorkerQueueCallback.
 *
 * Only WorkerQueueBag instances are accepted as queue configuration.
 */
class WorkerBuilder implements QueueBuilderInterface
{
    /**
     * @var ConnectionManager
     */
    private $connectionManager;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Set to null by the constructor and not read anywhere in this class.
     *
     * Declared explicitly because assigning to an undeclared property creates a
     * dynamic property, which is deprecated as of PHP 8.2.
     *
     * @var mixed|null
     */
    private $channel;

    /**
     * @param ConnectionManager $connManager Connection manager handed to every built Queue.
     * @param LoggerInterface $logger Logger handed to every built Queue.
     */
    public function __construct(ConnectionManager $connManager, LoggerInterface $logger)
    {
        $this->connectionManager = $connManager;
        $this->logger = $logger;
        $this->channel = null;
    }

    /**
     * Creates a Queue for the given bag and attaches a WorkerQueueCallback
     * wrapping the given service.
     *
     * @param mixed $queueName Not used by this builder.
     * @param QueueServiceInterface $queueService Service passed to the WorkerQueueCallback.
     * @param QueueBagInterface $queueBag Must be a WorkerQueueBag.
     * @return Queue
     * @throws \Exception When $queueBag is not a WorkerQueueBag.
     */
    public function buildQueue($queueName, QueueServiceInterface $queueService, QueueBagInterface $queueBag)
    {
        // Reject bags of any other transport before constructing anything.
        if (! $queueBag instanceof WorkerQueueBag) {
            throw new \Exception('Unsupported QueueBag');
        }

        $queue = new Queue($this->getConnectionManager(), $queueBag, $this->logger);
        $queue->setCallback(new WorkerQueueCallback($queueService));

        return $queue;
    }

    /**
     * @return ConnectionManager The connection manager given to the constructor.
     */
    public function getConnectionManager()
    {
        return $this->connectionManager;
    }
}
52