Code Duplication    Length = 49-49 lines in 3 locations

Transport/Rpc/RpcServerBuilder.php 1 location

@@ 11-59 (lines=49) @@
8
use Cmobi\RabbitmqBundle\Queue\QueueBuilderInterface;
9
use Cmobi\RabbitmqBundle\Queue\QueueServiceInterface;
10
11
class RpcServerBuilder implements QueueBuilderInterface
12
{
13
    private $queue;
14
    private $connectionManager;
15
    private $connectionName;
16
17
    public function __construct(ConnectionManager $connManager, $connectionName = 'default')
18
    {
19
        $this->connectionManager = $connManager;
20
        $this->connectionName = $connectionName;
21
        $this->channel = null;
22
    }
23
24
    /**
25
     * @param $queueName
26
     * @param QueueServiceInterface $queueService
27
     * @param QueueBagInterface $queueBag
28
     * @return Queue
29
     * @throws \Exception
30
     */
31
    public function buildQueue($queueName, QueueServiceInterface $queueService, QueueBagInterface $queueBag)
32
    {
33
        if (! $queueBag instanceof RpcQueueBag) {
34
            throw new \Exception('Unsupported QueueBag');
35
        }
36
        $queue = new Queue($this->getConnectionManager(), $queueBag, $this->connectionName);
37
        $queueCallback = new RpcQueueCallback($queueService);
38
        $queue->setCallback($queueCallback);
39
        $this->queue = $queue;
40
41
        return $queue;
42
    }
43
44
    /**
45
     * @return ConnectionManager
46
     */
47
    public function getConnectionManager()
48
    {
49
        return $this->connectionManager;
50
    }
51
52
    /**
53
     * @return Queue
54
     */
55
    public function getQueue()
56
    {
57
        return $this->queue;
58
    }
59
}
60

Transport/Subscriber/SubscriberBuilder.php 1 location

@@ 11-59 (lines=49) @@
8
use Cmobi\RabbitmqBundle\Queue\QueueBuilderInterface;
9
use Cmobi\RabbitmqBundle\Queue\QueueServiceInterface;
10
11
class SubscriberBuilder implements QueueBuilderInterface
12
{
13
    private $queue;
14
    private $connectionManager;
15
    private $connectionName;
16
17
    public function __construct(ConnectionManager $connManager, $connectionName = 'default')
18
    {
19
        $this->connectionManager = $connManager;
20
        $this->connectionName = $connectionName;
21
        $this->channel = null;
22
    }
23
24
    /**
25
     * @param $queueName
26
     * @param QueueServiceInterface $queueService
27
     * @param QueueBagInterface $queueBag
28
     * @return Queue
29
     * @throws \Exception
30
     */
31
    public function buildQueue($queueName, QueueServiceInterface $queueService, QueueBagInterface $queueBag)
32
    {
33
        if (! $queueBag instanceof SubscriberQueueBag) {
34
            throw new \Exception('Unsupported QueueBag');
35
        }
36
        $queue = new Queue($this->getConnectionManager(), $queueBag, $this->connectionName);
37
        $queueCallback = new SubscriberQueueCallback($queueService);
38
        $queue->setCallback($queueCallback);
39
        $this->queue = $queue;
40
41
        return $queue;
42
    }
43
44
    /**
45
     * @return ConnectionManager
46
     */
47
    public function getConnectionManager()
48
    {
49
        return $this->connectionManager;
50
    }
51
52
    /**
53
     * @return Queue
54
     */
55
    public function getQueue()
56
    {
57
        return $this->queue;
58
    }
59
}
60

Transport/Worker/WorkerBuilder.php 1 location

@@ 11-59 (lines=49) @@
8
use Cmobi\RabbitmqBundle\Queue\QueueBuilderInterface;
9
use Cmobi\RabbitmqBundle\Queue\QueueServiceInterface;
10
11
class WorkerBuilder implements QueueBuilderInterface
12
{
13
    private $queue;
14
    private $connectionManager;
15
    private $connectionName;
16
17
    public function __construct(ConnectionManager $connManager, $connectionName = 'default')
18
    {
19
        $this->connectionManager = $connManager;
20
        $this->connectionName = $connectionName;
21
        $this->channel = null;
22
    }
23
24
    /**
25
     * @param $queueName
26
     * @param QueueServiceInterface $queueService
27
     * @param QueueBagInterface $queueBag
28
     * @return Queue
29
     * @throws \Exception
30
     */
31
    public function buildQueue($queueName, QueueServiceInterface $queueService, QueueBagInterface $queueBag)
32
    {
33
        if (! $queueBag instanceof WorkerQueueBag) {
34
            throw new \Exception('Unsupported QueueBag');
35
        }
36
        $queue = new Queue($this->getConnectionManager(), $queueBag, $this->connectionName);
37
        $queueCallback = new WorkerQueueCallback($queueService);
38
        $queue->setCallback($queueCallback);
39
        $this->queue = $queue;
40
41
        return $queue;
42
    }
43
44
    /**
45
     * @return ConnectionManager
46
     */
47
    public function getConnectionManager()
48
    {
49
        return $this->connectionManager;
50
    }
51
52
    /**
53
     * @return Queue
54
     */
55
    public function getQueue()
56
    {
57
        return $this->queue;
58
    }
59
}
60