Code Duplication    Length = 49-49 lines in 3 locations

Transport/PubSub/SubscriberBuilder.php 1 location

@@ 12-60 (lines=49) @@
9
use Cmobi\RabbitmqBundle\Queue\QueueServiceInterface;
10
use Psr\Log\LoggerInterface;
11
12
class SubscriberBuilder implements QueueBuilderInterface
13
{
14
    private $queue;
15
    private $connectionManager;
16
    private $logger;
17
18
    public function __construct(ConnectionManager $connManager, LoggerInterface $logger)
19
    {
20
        $this->connectionManager = $connManager;
21
        $this->logger = $logger;
22
        $this->channel = null;
23
    }
24
25
    /**
26
     * @param $queueName
27
     * @param QueueServiceInterface $queueService
28
     * @param QueueBagInterface $queueBag
29
     * @return Queue
30
     * @throws \Exception
31
     */
32
    public function buildQueue($queueName, QueueServiceInterface $queueService, QueueBagInterface $queueBag)
33
    {
34
        if (! $queueBag instanceof SubscriberQueueBag) {
35
            throw new \Exception('Unsupported QueueBag');
36
        }
37
        $queue = new Queue($this->getConnectionManager(), $queueBag, $this->logger);
38
        $queueCallback = new SubscriberQueueCallback($queueService);
39
        $queue->setCallback($queueCallback);
40
        $this->queue = $queue;
41
42
        return $queue;
43
    }
44
45
    /**
46
     * @return ConnectionManager
47
     */
48
    public function getConnectionManager()
49
    {
50
        return $this->connectionManager;
51
    }
52
53
    /**
54
     * @return Queue
55
     */
56
    public function getQueue()
57
    {
58
        return $this->queue;
59
    }
60
}
61

Transport/Rpc/RpcServerBuilder.php 1 location

@@ 12-60 (lines=49) @@
9
use Cmobi\RabbitmqBundle\Queue\QueueServiceInterface;
10
use Psr\Log\LoggerInterface;
11
12
class RpcServerBuilder implements QueueBuilderInterface
13
{
14
    private $queue;
15
    private $connectionManager;
16
    private $logger;
17
18
    public function __construct(ConnectionManager $connManager, LoggerInterface $logger)
19
    {
20
        $this->connectionManager = $connManager;
21
        $this->logger = $logger;
22
        $this->channel = null;
23
    }
24
25
    /**
26
     * @param $queueName
27
     * @param QueueServiceInterface $queueService
28
     * @param QueueBagInterface $queueBag
29
     * @return Queue
30
     * @throws \Exception
31
     */
32
    public function buildQueue($queueName, QueueServiceInterface $queueService, QueueBagInterface $queueBag)
33
    {
34
        if (! $queueBag instanceof RpcQueueBag) {
35
            throw new \Exception('Unsupported QueueBag');
36
        }
37
        $queue = new Queue($this->getConnectionManager(), $queueBag, $this->logger);
38
        $queueCallback = new RpcQueueCallback($queueService);
39
        $queue->setCallback($queueCallback);
40
        $this->queue = $queue;
41
42
        return $queue;
43
    }
44
45
    /**
46
     * @return ConnectionManager
47
     */
48
    public function getConnectionManager()
49
    {
50
        return $this->connectionManager;
51
    }
52
53
    /**
54
     * @return Queue
55
     */
56
    public function getQueue()
57
    {
58
        return $this->queue;
59
    }
60
}
61

Transport/Worker/WorkerBuilder.php 1 location

@@ 13-61 (lines=49) @@
10
use Cmobi\RabbitmqBundle\Queue\QueueServiceInterface;
11
use Psr\Log\LoggerInterface;
12
13
class WorkerBuilder implements QueueBuilderInterface
14
{
15
    private $queue;
16
    private $connectionManager;
17
    private $logger;
18
19
    public function __construct(ConnectionManager $connManager, LoggerInterface $logger)
20
    {
21
        $this->connectionManager = $connManager;
22
        $this->logger = $logger;
23
        $this->channel = null;
24
    }
25
26
    /**
27
     * @param $queueName
28
     * @param QueueServiceInterface $queueService
29
     * @param QueueBagInterface $queueBag
30
     * @return Queue
31
     * @throws \Exception
32
     */
33
    public function buildQueue($queueName, QueueServiceInterface $queueService, QueueBagInterface $queueBag)
34
    {
35
        if (! $queueBag instanceof WorkerQueueBag) {
36
            throw new \Exception('Unsupported QueueBag');
37
        }
38
        $queue = new Queue($this->getConnectionManager(), $queueBag, $this->logger);
39
        $queueCallback = new WorkerQueueCallback($queueService);
40
        $queue->setCallback($queueCallback);
41
        $this->queue = $queue;
42
43
        return $queue;
44
    }
45
46
    /**
47
     * @return ConnectionManager
48
     */
49
    public function getConnectionManager()
50
    {
51
        return $this->connectionManager;
52
    }
53
54
    /**
55
     * @return Queue
56
     */
57
    public function getQueue()
58
    {
59
        return $this->queue;
60
    }
61
}
62