Completed
Push — master ( 2651dd...a129fb )
by Timo
23s
created

Collection::isSubscribing()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 4
rs 10
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
3
/*
4
 * This file is part of the Tidal/WampWatch package.
5
 *   (c) 2016 Timo Michna <timomichna/yahoo.de>
6
 *
7
 *  For the full copyright and license information, please view the LICENSE
8
 *  file that was distributed with this source code.
9
 *
10
 */
11
12
namespace Tidal\WampWatch\Subscription;
13
14
use Thruway\Message\SubscribedMessage;
15
use React\Promise\Deferred;
16
use React\Promise\Promise;
17
use Tidal\WampWatch\ClientSessionInterface as ClientSession;
18
use Tidal\WampWatch\Util;
19
20
class Collection
21
{
22
    /**
23
     * The collection's WAMP client session.
24
     *
25
     * @var ClientSession
26
     */
27
    private $session;
28
29
    /**
30
     * @var array list of subscriptions with topic as key and subscription-id as value
31
     */
32
    private $subscriptions = [];
33
34
    /**
35
     * @var array list of subscriptions callbacks with topic as key
36
     */
37
    private $subscriptionCallbacks = [];
38
39
    /**
40
     * @var bool if the collection is successfully subscribed to all topics
41
     */
42
    private $isSubscribed = false;
43
44
    /**
45
     * @var bool if the collection is currently trying to subscribe to all topics
46
     */
47
    private $isSubscribing = false;
48
49
    /**
50
     * @var Deferred
51
     */
52
    private $subscriptionPromise;
53
54
    /**
55
     * Collection constructor.
56
     *
57
     * @param \Tidal\WampWatch\ClientSessionInterface $session
58
     */
59
    public function __construct(ClientSession $session)
60
    {
61
        $this->session = $session;
62
    }
63
64
    /**
65
     * @param string   $topic    the topic the subscription is for
66
     * @param callable $callback the callback for the topic
67
     */
68
    public function addSubscription($topic, callable $callback)
69
    {
70
        $this->subscriptions[$topic] = 0;
71
        $this->subscriptionCallbacks[$topic] = $callback;
72
    }
73
74
    /**
75
     * Subscribe to all topics added with 'addSubscription'.
76
     * Returns false if already subscribed or curretly subscribing.
77
     *
78
     * @return \React\Promise\Promise
79
     */
80
    public function subscribe()
81
    {
82
        if (!$this->isSubscribed() && !$this->isSubscribing()) {
83
            $this->isSubscribing = true;
84
            $this->subscriptionPromise = new Deferred();
85
            $this->doSubscribe();
86
        }
87
88
        return $this->subscriptionPromise->promise();
89
    }
90
91
    /**
92
     *
93
     */
94
    protected function doSubscribe()
95
    {
96
        foreach (array_keys($this->subscriptions) as $topic) {
97
            $this->session->subscribe($topic, $this->subscriptionCallbacks[$topic])
98
                ->done(function (SubscribedMessage $msg) use ($topic) {
99
                    $this->subscriptions[$topic] = $msg->getSubscriptionId();
100
                    $this->subscriptionPromise->notify($topic);
101
102
                    $this->checkSubscribed();
103
                });
104
        }
105
    }
106
107
    /**
108
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
109
     */
110
    public function unsubscribe()
111
    {
112
        $resolver = function (callable $resolve) {
113
            $resolve();
114
        };
115
        $promise = new  Promise($resolver);
116
117
        if ($this->isSubscribed()) {
118
            foreach ($this->subscriptions as $topic => $subId) {
119
                Util::unsubscribe($this->session, $subId);
120
            }
121
122
            $this->isSubscribed = false;
123
        }
124
125
        return $promise;
126
    }
127
128
    /**
129
     * @return bool
130
     */
131
    public function isSubscribed()
132
    {
133
        return $this->isSubscribed;
134
    }
135
136
    /**
137
     * @return bool
138
     */
139
    public function isSubscribing()
140
    {
141
        return $this->isSubscribing;
142
    }
143
144
    /**
145
     * Check if all subscriptions have been successfully confirmed.
146
     */
147
    protected function checkSubscribed()
148
    {
149
        foreach ($this->subscriptions as $topic => $subId) {
150
            if ($subId === 0) {
151
                return false;
152
            }
153
        }
154
        $this->isSubscribed = true;
155
        $this->isSubscribing = false;
156
        $this->subscriptionPromise->resolve($this->subscriptions);
157
158
        return true;
159
    }
160
}
161