Consuming   A
last analyzed

Complexity

Total Complexity 7

Size/Duplication

Total Lines 83
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 19
dl 0
loc 83
rs 10
c 0
b 0
f 0
wmc 7

6 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
A channel() 0 3 1
A topic() 0 3 1
A invoking() 0 8 2
A concurrency() 0 3 1
A assigned() 0 5 1
1
<?php
2
/**
 * Messages consuming
 * User: moyo
 * Date: 27/02/2018
 * Time: 4:35 PM
 */
8
9
namespace Carno\NSQ\Types;
10
11
use Carno\NSQ\Connector\Nsqd;
12
use Throwable;
13
14
class Consuming
{
    /**
     * Topic name; null until assigned() stores one.
     *
     * @var string|null
     */
    private $topic = null;

    /**
     * Channel name; null until assigned() stores one.
     *
     * @var string|null
     */
    private $channel = null;

    /**
     * Callback that invoking() runs for each message; set by the constructor.
     *
     * @var callable
     */
    private $program = null;

    /**
     * Concurrency value handed to the constructor.
     *
     * @var int
     */
    private $concurrency = null;

    /**
     * Stores the message callback and the concurrency value.
     *
     * @param callable $program     callback invoked by invoking()
     * @param int      $concurrency value later returned by concurrency(); defaults to 1
     */
    public function __construct(callable $program, int $concurrency = 1)
    {
        $this->concurrency = $concurrency;
        $this->program = $program;
    }

    /**
     * Records the topic and channel on this instance.
     *
     * @param string $topic
     * @param string $channel
     * @return $this the same instance, so calls can be chained
     */
    public function assigned(string $topic, string $channel) : self
    {
        $this->channel = $channel;
        $this->topic = $topic;

        return $this;
    }

    /**
     * Returns the topic stored by assigned().
     *
     * NOTE: the property starts as null, so calling this before assigned()
     * violates the declared string return type.
     *
     * @return string
     */
    public function topic() : string
    {
        return $this->topic;
    }

    /**
     * Returns the channel stored by assigned().
     *
     * NOTE: the property starts as null, so calling this before assigned()
     * violates the declared string return type.
     *
     * @return string
     */
    public function channel() : string
    {
        return $this->channel;
    }

    /**
     * Returns the concurrency value given at construction.
     *
     * @return int
     */
    public function concurrency() : int
    {
        return $this->concurrency;
    }

    /**
     * Runs the stored callback with the result of $message->link($nsqd).
     *
     * Anything thrown while linking or inside the callback is caught here and
     * reported through logger('nsq') at notice level; it is not rethrown.
     *
     * @param Nsqd    $nsqd    connector passed to Message::link()
     * @param Message $message message handed to the callback after linking
     */
    public function invoking(Nsqd $nsqd, Message $message) : void
    {
        try {
            $handler = $this->program;
            $linked = $message->link($nsqd);

            call_user_func($handler, $linked);
        } catch (Throwable $failure) {
            $log = logger('nsq');

            // Logged as "<exception class>::<exception message>".
            $context = [
                'error' => sprintf('%s::%s', get_class($failure), $failure->getMessage()),
            ];

            $log->notice('Consuming invoker failure', $context);
        }
    }
}
99