Completed
Branch master (8e085e)
by Timo
02:38
created

Collection::doSubscribe()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 12
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 1 Features 1
Metric Value
c 2
b 1
f 1
dl 0
loc 12
rs 9.4285
cc 2
eloc 7
nc 2
nop 0
1
<?php
2
3
/*
4
 * This file is part of the Tidal/WampWatch package.
5
 *   (c) 2016 Timo Michna <timomichna/yahoo.de>
6
 *
7
 *  For the full copyright and license information, please view the LICENSE
8
 *  file that was distributed with this source code.
9
 *
10
 */
11
12
namespace Tidal\WampWatch\Subscription;
13
14
use Thruway\Message\SubscribedMessage;
15
use React\Promise\Deferred;
16
use React\Promise\Promise;
17
use Tidal\WampWatch\ClientSessionInterface as ClientSession;
18
use Tidal\WampWatch\Util;
19
20
/**
 * Groups several topic subscriptions on one WAMP client session so they can
 * be subscribed and unsubscribed together.
 *
 * Topics are registered with addSubscription(); subscribe() then requests all
 * of them on the session and returns a promise that is resolved once every
 * topic has been confirmed with a subscription id.
 */
class Collection
{
    /**
     * The collection's WAMP client session.
     *
     * @var ClientSession
     */
    private $session;

    /**
     * Subscription ids keyed by topic. An id of 0 marks a topic whose
     * subscription has not been confirmed (yet).
     *
     * @var array
     */
    private $subscriptions = [];

    /**
     * @var array list of subscriptions callbacks with topic as key
     */
    private $subscriptionCallbacks = [];

    /**
     * @var bool if the collection is successfully subscribed to all topics
     */
    private $isSubscribed = false;

    /**
     * @var bool if the collection is currently trying to subscribe to all topics
     */
    private $isSubscribing = false;

    /**
     * Deferred behind the promise returned by subscribe().
     * Stays null until subscribe() has been called for the first time.
     *
     * @var Deferred|null
     */
    private $subscriptionPromise;

    /**
     * Collection constructor.
     *
     * @param \Tidal\WampWatch\ClientSessionInterface $session the session all topics are subscribed on
     */
    public function __construct(ClientSession $session)
    {
        $this->session = $session;
    }

    /**
     * Register a topic and its callback. The topic is stored as unconfirmed
     * (id 0); nothing is sent to the session until subscribe() is called.
     * Registering the same topic again replaces its callback.
     *
     * @param string   $topic    the topic the subscription is for
     * @param callable $callback the callback for the topic
     */
    public function addSubscription($topic, callable $callback)
    {
        $this->subscriptions[$topic] = 0;
        $this->subscriptionCallbacks[$topic] = $callback;
    }

    /**
     * Whether at least one topic has been added to the collection.
     *
     * @return bool
     */
    public function hasSubscription()
    {
        return count($this->subscriptions) > 0;
    }

    /**
     * Subscribe to all topics added with 'addSubscription'.
     *
     * A new subscription round is only started when the collection is neither
     * subscribed nor currently subscribing; otherwise the promise of the
     * round already started is returned again.
     *
     * @return \React\Promise\Promise resolved with the topic => subscription-id
     *                                map once every topic has been confirmed
     */
    public function subscribe()
    {
        if (!$this->isSubscribed() && !$this->isSubscribing()) {
            $this->isSubscribing = true;
            $this->subscriptionPromise = new Deferred();
            $this->doSubscribe();
        }

        return $this->subscriptionPromise->promise();
    }

    /**
     * Request a subscription on the session for every registered topic and
     * record the subscription id of each confirmation.
     */
    protected function doSubscribe()
    {
        if (!$this->hasSubscription()) {
            // No topic means no confirmation callback would ever run, which
            // used to leave the promise pending and the collection stuck in
            // the "subscribing" state. Finish the round right away instead.
            $this->checkSubscribed();

            return;
        }

        foreach (array_keys($this->subscriptions) as $topic) {
            $this->session->subscribe($topic, $this->subscriptionCallbacks[$topic])
                ->done(function (SubscribedMessage $msg) use ($topic) {
                    $this->subscriptions[$topic] = $msg->getSubscriptionId();
                    // Report per-topic progress before testing for completion.
                    $this->subscriptionPromise->notify($topic);

                    $this->checkSubscribed();
                });
        }
    }

    /**
     * Unsubscribe every topic of a fully subscribed collection.
     *
     * Does nothing when the collection is not subscribed. The returned
     * promise is created already resolved; it does not wait for the outcome
     * of the individual unsubscribe calls.
     *
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
     */
    public function unsubscribe()
    {
        $promise = new Promise(function (callable $resolve) {
            $resolve();
        });

        if ($this->isSubscribed()) {
            foreach ($this->subscriptions as $topic => $subId) {
                Util::unsubscribe($this->session, $subId);
                // Mark the topic unconfirmed again. Keeping the old id would
                // let a later subscribe() round look complete as soon as its
                // first topic is confirmed.
                $this->subscriptions[$topic] = 0;
            }

            $this->isSubscribed = false;
        }

        return $promise;
    }

    /**
     * @return bool true once all topics have been confirmed
     */
    public function isSubscribed()
    {
        return $this->isSubscribed;
    }

    /**
     * @return bool true while a subscription round is still waiting for confirmations
     */
    public function isSubscribing()
    {
        return $this->isSubscribing;
    }

    /**
     * Check if all subscriptions have been successfully confirmed.
     *
     * When no topic is left with the placeholder id 0, the collection is
     * flagged as subscribed and the subscription promise is resolved with
     * the topic => subscription-id map.
     *
     * @return bool false while at least one topic is unconfirmed, true otherwise
     */
    protected function checkSubscribed()
    {
        // Strict search: only the integer placeholder 0 counts as unconfirmed.
        if (in_array(0, $this->subscriptions, true)) {
            return false;
        }

        $this->isSubscribed = true;
        $this->isSubscribing = false;
        $this->subscriptionPromise->resolve($this->subscriptions);

        return true;
    }
}
169