Passed
Push — master ( 4cc39f...d5b9c5 )
by Timo
03:08
created

MonitorTrait::retrieveCallData()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 20
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 20
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 11
nc 2
nop 3
1
<?php
2
3
/*
4
 * This file is part of the Tidal/WampWatch package.
5
 *   (c) 2016 Timo Michna <timomichna/yahoo.de>
6
 *
7
 *  For the full copyright and license information, please view the LICENSE
8
 *  file that was distributed with this source code.
9
 *
10
 */
11
12
namespace Tidal\WampWatch;
13
14
use Evenement\EventEmitterTrait;
15
use React\Promise\Promise;
16
use Tidal\WampWatch\ClientSessionInterface as ClientSession;
17
use Tidal\WampWatch\Subscription\Collection as SubscriptionCollection;
18
use React\Promise\Deferred;
19
use Tidal\WampWatch\Adapter\React\PromiseAdapter;
20
use Tidal\WampWatch\Adapter\React\DeferredAdapter;
21
22
/**
 * Shared behaviour for WAMP monitors.
 *
 * Holds the client session and the meta subscription collection, tracks the
 * running state, performs an optional initial procedure call on start and
 * emits 'start', 'stop' and 'error' events through the event emitter.
 *
 * @author Timo
 */
trait MonitorTrait
{
    use EventEmitterTrait;

    /**
     * The monitor's WAMP client session.
     *
     * @var ClientSession
     */
    protected $session;

    /**
     * Whether the monitor is running.
     *
     * @var bool
     */
    protected $isRunning = false;

    /**
     * @var SubscriptionCollection collection for meta subscriptions
     */
    protected $metaSubscriptionCollection;

    /**
     * URI of the procedure called once when the monitor starts.
     *
     * @var string
     */
    protected $initialCallProcedure;

    /**
     * Callback receiving the result of the initial procedure call.
     *
     * @var callable
     */
    protected $initialCallCallback;

    /**
     * Whether the initial procedure call has completed (or was skipped).
     *
     * @var bool
     */
    protected $initialCallDone = false;

    /**
     * Set the WAMP client session used by the monitor.
     *
     * @param ClientSession $session
     */
    protected function setClientSession(ClientSession $session)
    {
        $this->session = $session;
    }

    /**
     * Start the monitor.
     *
     * Subscribes the meta subscription collection and runs the initial
     * procedure call. Once both promises have resolved, the monitor is flagged
     * as running and a 'start' event carrying the result of getList() is
     * emitted. The method itself returns immediately.
     *
     * @return bool always true
     */
    public function start()
    {
        $promises = [
            $this->getMetaSubscriptionCollection()->subscribe(),
            $this->callInitialProcedure(),
        ];

        \React\Promise\all($promises)->done(function () {
            $this->isRunning = true;
            $this->emit('start', [$this->getList()]);
        });

        return true;
    }

    /**
     * Stop the monitor.
     * Returns boolean if the monitor could be stopped.
     *
     * Unsubscribes the meta subscription collection, clears the running flag
     * and emits a 'stop' event carrying the monitor itself.
     *
     * @return bool false when the monitor was not running, true otherwise
     */
    public function stop()
    {
        if (!$this->isRunning()) {
            return false;
        }

        $this->getMetaSubscriptionCollection()->unsubscribe();

        $this->isRunning = false;
        $this->emit('stop', [$this]);

        return true;
    }

    /**
     * Get the list passed along with the 'start' event.
     *
     * This default implementation returns an empty list.
     *
     * @return array
     */
    protected function getList()
    {
        return [];
    }

    /**
     * Get the monitor's WAMP client session.
     *
     * @return ClientSession
     */
    public function getServerSession()
    {
        return $this->session;
    }

    /**
     * Get the monitor's running state.
     *
     * @return bool
     */
    public function isRunning()
    {
        return $this->isRunning;
    }

    /**
     * Set the collection holding the monitor's meta subscriptions.
     *
     * @param \Tidal\WampWatch\Subscription\Collection $collection
     */
    public function setMetaSubscriptionCollection(SubscriptionCollection $collection)
    {
        $this->metaSubscriptionCollection = $collection;
    }

    /**
     * Get the collection holding the monitor's meta subscriptions.
     *
     * A collection bound to the current session is created on first access
     * when none has been set.
     *
     * @return \Tidal\WampWatch\Subscription\Collection
     */
    public function getMetaSubscriptionCollection()
    {
        if (!isset($this->metaSubscriptionCollection)) {
            $this->metaSubscriptionCollection = new SubscriptionCollection($this->session);
        }

        return $this->metaSubscriptionCollection;
    }

    /**
     * Register the procedure to call on start and the callback for its result.
     *
     * @param string   $procedure procedure URI (cast to string)
     * @param callable $callback  invoked with the call result
     */
    protected function setInitialCall($procedure, callable $callback)
    {
        $this->initialCallProcedure = (string) $procedure;
        $this->initialCallCallback = $callback;
    }

    /**
     * Run the initial procedure call, if one has been registered.
     *
     * Without a registered procedure and callback the call is treated as done
     * and an already resolved promise is returned. Otherwise the procedure is
     * called on the session and the registered callback receives the result.
     *
     * NOTE(review): the error callback returns the error value, so a failed
     * call appears to yield a fulfilled promise (after an 'error' event) and
     * start() would still emit 'start' — confirm this is intended.
     *
     * @return \React\Promise\Promise
     */
    protected function callInitialProcedure()
    {
        if (!isset($this->initialCallProcedure) || !isset($this->initialCallCallback)) {
            $this->initialCallDone = true;

            return new Promise(function (callable $resolve) {
                $resolve();
            });
        }

        return $this->session->call($this->initialCallProcedure, [])->then(
            function ($res) {
                $this->initialCallDone = true;
                $cb = $this->initialCallCallback;
                $cb($res);

                return $res;
            },
            $this->getErrorCallback()
        );
    }

    /**
     * Build a rejection handler that emits an 'error' event.
     *
     * @return callable handler taking the error, emitting it and returning it
     */
    private function getErrorCallback()
    {
        return function ($error) {
            $this->emit('error', [$error]);

            return $error;
        };
    }

    /**
     * Call a procedure on the session and expose the (filtered) result as a promise.
     *
     * On failure an 'error' event is emitted and the returned promise is
     * rejected with the error, so consumers are never left with a promise
     * that stays pending forever. An exception thrown by the filter likewise
     * rejects the returned promise.
     *
     * @param string        $procedure procedure URI to call
     * @param callable|null $filter    optional transformation applied to the result;
     *                                 the result is passed through unchanged when omitted
     * @param array         $arguments arguments passed to the procedure
     *
     * @return object the promise obtained from the DeferredAdapter
     */
    private function retrieveCallData($procedure, callable $filter = null, $arguments = [])
    {
        $deferred = new DeferredAdapter(
            new Deferred()
        );

        $filter = $filter ?: function ($res) {
            return $res;
        };

        $emitError = $this->getErrorCallback();

        $this->session->call($procedure, $arguments)
            ->then(
                function ($res) use ($deferred, $filter) {
                    try {
                        $filtered = $filter($res);
                    } catch (\Exception $exception) {
                        // The promise returned by then() is discarded, so a throwing
                        // filter must be forwarded explicitly to settle the deferred.
                        $deferred->reject($exception);

                        return;
                    }

                    $deferred->resolve($filtered);
                },
                function ($error) use ($deferred, $emitError) {
                    // Keep emitting the 'error' event, then settle the deferred so
                    // the promise handed to the caller does not hang.
                    $emitError($error);
                    $deferred->reject($error);
                }
            );

        return $deferred->promise();
    }

    /**
     * Wrap a resolver callback in a React promise behind a PromiseAdapter.
     *
     * @param callable $callback
     *
     * @return PromiseAdapter
     */
    private function createPromiseAdapter(callable $callback)
    {
        return new PromiseAdapter(
            new Promise(
                $callback
            )
        );
    }
}
230