Issues (15)

src/Chips/Subscriber.php (4 issues)

1
<?php
2
/**
3
 * Subscribes API
4
 * User: moyo
5
 * Date: 2018/8/28
6
 * Time: 5:08 PM
7
 */
8
9
namespace Carno\Redis\Chips;
10
11
use Carno\Channel\Chan;
12
use Carno\Channel\Channel;
13
use Carno\Promise\Promise;
14
use Carno\Promise\Promised;
15
use Carno\Redis\Types\Message;
16
17
/**
 * Pub/Sub support for a Redis connection.
 *
 * The host class using this trait must provide `command()` (called here with a
 * command name and an argument list) and `closed()` (whose result is used as a
 * promise via `then()`); neither is defined in this trait.
 */
trait Subscriber
{
    /**
     * Channel that incoming messages are pushed into; created lazily by consuming().
     *
     * @var Chan|null
     */
    private $consumer = null;

    /**
     * Whether the last (un)subscribe reply left the connection in subscribed state.
     *
     * @var bool
     */
    private $subscribed = false;

    /**
     * Promise waiting for the server to confirm the last (p)subscribe request,
     * or null when no confirmation is pending.
     *
     * @var Promised|null
     */
    private $acknowledge = null;

    /**
     * Subscribe to one or more channels.
     *
     * This is a coroutine: it yields the acknowledgement promise and, once
     * resumed, returns the consumer channel.
     *
     * @param string ...$channels
     * @return \Generator yields Promised, returns Chan
     */
    public function subscribe(string ...$channels)
    {
        $this->command('subscribe', $channels);
        yield $this->acknowledge();
        return $this->consuming();
    }

    /**
     * Subscribe to one or more channel patterns.
     *
     * This is a coroutine: it yields the acknowledgement promise and, once
     * resumed, returns the consumer channel.
     *
     * @param string ...$patterns
     * @return \Generator yields Promised, returns Chan
     */
    public function pSubscribe(string ...$patterns)
    {
        $this->command('psubscribe', $patterns);
        yield $this->acknowledge();
        return $this->consuming();
    }

    /**
     * Return the consumer channel, creating it on first use.
     *
     * When a new channel is created, a callback is attached to the host's
     * closed() promise so the current consumer is closed along with it.
     *
     * @return Chan
     */
    private function consuming() : Chan
    {
        if ($this->consumer) {
            return $this->consumer;
        }

        /**
         * @var Promised $closed
         */
        $closed = $this->closed();

        $closed->then(function () {
            $this->consumer && $this->consumer->close();
        });

        return $this->consumer = new Channel;
    }

    /**
     * @return bool
     */
    private function subscribed() : bool
    {
        return $this->subscribed;
    }

    /**
     * Create the promise that messaging() resolves when the server confirms
     * a (p)subscribe / (p)unsubscribe request.
     *
     * @return Promised
     */
    private function acknowledge() : Promised
    {
        $pending = Promise::deferred();
        $this->acknowledge = $pending;

        $pending->then(function () use ($pending) {
            // Reset to null instead of unset(): unsetting removes the declared
            // property and makes later reads fail. Only clear the slot if it
            // still holds this promise, so a newer request is not discarded.
            if ($this->acknowledge === $pending) {
                $this->acknowledge = null;
            }
        });

        return $pending;
    }

    /**
     * Dispatch one pub/sub frame received from the server.
     *
     * 'message' / 'pmessage' frames are forwarded to the consumer channel as
     * Message objects; (p)subscribe / (p)unsubscribe frames update the
     * subscription state and resolve the pending acknowledgement, if any.
     * Any other frame type is ignored.
     *
     * @param array $recv frame as a list; element 0 is the frame type
     */
    private function messaging(array $recv) : void
    {
        // `?? null` keeps an empty frame from raising an undefined-offset notice.
        switch ($recv[0] ?? null) {
            case 'message':
                [1 => $channel, 2 => $payload] = $recv;
                break;
            case 'pmessage':
                // Index 1 is skipped here; only elements 2 and 3 are used.
                [2 => $channel, 3 => $payload] = $recv;
                break;
            case 'subscribe':
            case 'psubscribe':
                $this->subscribed = true;
                // Redis confirms every channel of a multi-channel request with
                // its own reply, so the promise may already be settled and
                // cleared when a later confirmation arrives.
                $pending = $this->acknowledge;
                if ($pending !== null) {
                    $pending->resolve();
                }
                return;
            case 'unsubscribe':
            case 'punsubscribe':
                // NOTE(review): Redis reports the number of remaining
                // subscriptions in this reply; this handler treats every
                // (p)unsubscribe as a full unsubscribe — confirm that is intended.
                $this->subscribed = false;
                $pending = $this->acknowledge;
                if ($pending !== null) {
                    $pending->resolve();
                }
                // Drop the consumer so the next consuming() call creates a new
                // one; assign null rather than unset() to keep the property declared.
                $this->consumer = null;
                return;
        }

        if (isset($channel) && isset($payload)) {
            $this->consuming()->send(new Message($channel, $payload));
        }
    }
}
127