Collection::__construct()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 1
1
<?php
2
3
/*
4
 * This file is part of the Tidal/WampWatch package.
5
 *   (c) 2016 Timo Michna <timomichna/yahoo.de>
6
 *
7
 *  For the full copyright and license information, please view the LICENSE
8
 *  file that was distributed with this source code.
9
 *
10
 */
11
12
namespace Tidal\WampWatch\Subscription;
13
14
use Thruway\Message\SubscribedMessage;
15
use Tidal\WampWatch\Async\PromiseInterface;
16
use Tidal\WampWatch\ClientSessionInterface as ClientSession;
17
use Tidal\WampWatch\Util;
18
use Tidal\WampWatch\Behavior\Async\MakesPromisesTrait;
19
use Tidal\WampWatch\Behavior\Async\MakesDeferredPromisesTrait;
20
use Tidal\WampWatch\Async\DeferredInterface;
21
22
class Collection
23
{
24
    use MakesPromisesTrait,
25
        MakesDeferredPromisesTrait;
26
27
    /**
28
     * The collection's WAMP client session.
29
     *
30
     * @var ClientSession
31
     */
32
    private $session;
33
34
    /**
35
     * @var array list of subscriptions with topic as key and subscription-id as value
36
     */
37
    private $subscriptions = [];
38
39
    /**
40
     * @var array list of subscriptions callbacks with topic as key
41
     */
42
    private $subscriptionCallbacks = [];
43
44
    /**
45
     * @var bool if the collection is successfully subscribed to all topics
46
     */
47
    private $isSubscribed = false;
48
49
    /**
50
     * @var bool if the collection is currently trying to subscribe to all topics
51
     */
52
    private $isSubscribing = false;
53
54
    /**
55
     * @var DeferredInterface
56
     */
57
    private $subscriptionPromise;
58
59
    /**
60
     * Collection constructor.
61
     *
62
     * @param \Tidal\WampWatch\ClientSessionInterface $session
63
     */
64
    public function __construct(ClientSession $session)
65
    {
66
        $this->session = $session;
67
    }
68
69
    /**
70
     * @param string   $topic    the topic the subscription is for
71
     * @param callable $callback the callback for the topic
72
     */
73
    public function addSubscription($topic, callable $callback)
74
    {
75
        $this->subscriptions[$topic] = 0;
76
        $this->subscriptionCallbacks[$topic] = $callback;
77
    }
78
79
    /**
80
     * @return bool
81
     */
82
    public function hasSubscription()
83
    {
84
        return count($this->subscriptions) > 0;
85
    }
86
87
    /**
88
     * Subscribe to all topics added with 'addSubscription'.
89
     * Returns false if already subscribed or curretly subscribing.
90
     *
91
     * @return PromiseInterface
92
     */
93
    public function subscribe()
94
    {
95
        if (!$this->isSubscribed() && !$this->isSubscribing()) {
96
            $this->subscriptionPromise = $this->createDeferred();
97
            $this->isSubscribing = true;
98
            $this->doSubscribe();
99
        }
100
101
        return $this->subscriptionPromise->promise();
102
    }
103
104
    protected function doSubscribe()
105
    {
106
        \React\Promise\all($this->getSubscriptionPromises())->done(function () {
107
            $this->isSubscribed = true;
108
            $this->isSubscribing = false;
109
            $this->subscriptionPromise->resolve($this->subscriptions);
110
        });
111
    }
112
113
    /**
114
     * @return PromiseInterface[]
115
     */
116
    private function getSubscriptionPromises()
117
    {
118
        $promises = [];
119
120
        foreach (array_keys($this->subscriptions) as $topic) {
121
            $promises[] = $this->session->subscribe($topic, $this->subscriptionCallbacks[$topic])
122
                ->then(function (SubscribedMessage $msg) use ($topic) {
123
                    $this->subscriptions[$topic] = $msg->getSubscriptionId();
124
                    $this->subscriptionPromise->notify($topic);
125
126
                    return $topic;
127
                });
128
        }
129
130
        return $promises;
131
    }
132
133
    /**
134
     * @return PromiseInterface
135
     */
136
    public function unsubscribe()
137
    {
138
        $resolver = function (callable $resolve) {
139
            $resolve();
140
        };
141
        $promise = $this->createPromise($resolver);
142
143
        if ($this->isSubscribed()) {
144
            foreach ($this->subscriptions as $subId) {
145
                Util::unsubscribe($this->session, $subId);
146
            }
147
148
            $this->isSubscribed = false;
149
        }
150
151
        return $promise;
152
    }
153
154
    /**
155
     * @return bool
156
     */
157
    public function isSubscribed()
158
    {
159
        return $this->isSubscribed;
160
    }
161
162
    /**
163
     * @return bool
164
     */
165
    public function isSubscribing()
166
    {
167
        return $this->isSubscribing;
168
    }
169
}
170