Completed
Push — master ( 869006...91820d )
by Timo
22s
created

Collection::unsubscribe()   A

Complexity

Conditions 3
Paths 2

Size

Total Lines 17
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 1 Features 1
Metric Value
c 2
b 1
f 1
dl 0
loc 17
rs 9.4285
cc 3
eloc 9
nc 2
nop 0
1
<?php
2
3
/*
4
 * This file is part of the Tidal/WampWatch package.
5
 *   (c) 2016 Timo Michna <timomichna/yahoo.de>
6
 *
7
 *  For the full copyright and license information, please view the LICENSE
8
 *  file that was distributed with this source code.
9
 *
10
 */
11
12
namespace Tidal\WampWatch\Subscription;
13
14
use Thruway\Message\SubscribedMessage;
15
use React\Promise\Deferred;
16
use React\Promise\Promise;
17
use Tidal\WampWatch\ClientSessionInterface as ClientSession;
18
use Tidal\WampWatch\Util;
19
20
class Collection
21
{
22
    /**
23
     * The collection's WAMP client session.
24
     *
25
     * @var ClientSession
26
     */
27
    private $session;
28
29
    /**
30
     * @var array list of subscriptions with topic as key and subscription-id as value
31
     */
32
    private $subscriptions = [];
33
34
    /**
35
     * @var array list of subscriptions callbacks with topic as key
36
     */
37
    private $subscriptionCallbacks = [];
38
39
    /**
40
     * @var bool if the collection is successfully subscribed to all topics
41
     */
42
    private $isSubscribed = false;
43
44
    /**
45
     * @var bool if the collection is currently trying to subscribe to all topics
46
     */
47
    private $isSubscribing = false;
48
49
    /**
50
     * @var Deferred
51
     */
52
    private $subscriptionPromise;
53
54
    /**
55
     * Collection constructor.
56
     *
57
     * @param \Tidal\WampWatch\ClientSessionInterface $session
58
     */
59
    public function __construct(ClientSession $session)
60
    {
61
        $this->session = $session;
62
    }
63
64
    /**
65
     * @param string   $topic    the topic the subscription is for
66
     * @param callable $callback the callback for the topic
67
     */
68
    public function addSubscription($topic, callable $callback)
69
    {
70
        $this->subscriptions[$topic] = 0;
71
        $this->subscriptionCallbacks[$topic] = $callback;
72
    }
73
74
    /**
75
     * @return bool
76
     */
77
    public function hasSubscription()
78
    {
79
        return count($this->subscriptions) > 0;
80
    }
81
82
    /**
83
     * Subscribe to all topics added with 'addSubscription'.
84
     * Returns false if already subscribed or curretly subscribing.
85
     *
86
     * @return \React\Promise\Promise
87
     */
88
    public function subscribe()
89
    {
90
        if (!$this->isSubscribed() && !$this->isSubscribing()) {
91
            $this->subscriptionPromise = new Deferred();
92
            $this->isSubscribing = true;
93
            $this->doSubscribe();
94
        }
95
96
        return $this->subscriptionPromise->promise();
97
    }
98
99
    /**
100
     *
101
     */
102
    protected function doSubscribe()
103
    {
104
        \React\Promise\all($this->getSubscriptionPromises())->done(function () {
105
            $this->isSubscribed = true;
106
            $this->isSubscribing = false;
107
            $this->subscriptionPromise->resolve($this->subscriptions);
108
        });
109
    }
110
111
    /**
112
     * @return Promise[]
113
     */
114
    private function getSubscriptionPromises()
115
    {
116
        $promises = [];
117
118
        foreach (array_keys($this->subscriptions) as $topic) {
119
            $promises[] = $this->session->subscribe($topic, $this->subscriptionCallbacks[$topic])
120
                ->then(function (SubscribedMessage $msg) use ($topic) {
121
                    $this->subscriptions[$topic] = $msg->getSubscriptionId();
122
                    $this->subscriptionPromise->notify($topic);
123
124
                    return $topic;
125
                });
126
        }
127
128
        return $promises;
129
    }
130
131
    /**
132
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
133
     */
134
    public function unsubscribe()
135
    {
136
        $resolver = function (callable $resolve) {
137
            $resolve();
138
        };
139
        $promise = new  Promise($resolver);
140
141
        if ($this->isSubscribed()) {
142
            foreach ($this->subscriptions as $topic => $subId) {
143
                Util::unsubscribe($this->session, $subId);
144
            }
145
146
            $this->isSubscribed = false;
147
        }
148
149
        return $promise;
150
    }
151
152
    /**
153
     * @return bool
154
     */
155
    public function isSubscribed()
156
    {
157
        return $this->isSubscribed;
158
    }
159
160
    /**
161
     * @return bool
162
     */
163
    public function isSubscribing()
164
    {
165
        return $this->isSubscribing;
166
    }
167
}
168