GWatcher::nwProcess()   B
last analyzed

Complexity

Conditions 8
Paths 1

Size

Total Lines 34
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 19
dl 0
loc 34
rs 8.4444
c 0
b 0
f 0
cc 8
nc 1
nop 5
1
<?php
2
/**
3
 * Generic watcher (for service and kv-store)
4
 * User: moyo
5
 * Date: 2018/7/2
6
 * Time: 11:49 AM
7
 */
8
9
namespace Carno\Consul\Chips;
10
11
use Carno\Channel\Exception\ChannelClosingException;
12
use Carno\Consul\APIs\AbstractWatcher;
13
use Carno\Consul\Contracts\Defaults;
14
use function Carno\Coroutine\go;
15
use function Carno\Coroutine\msleep;
16
use Carno\HTTP\Exception\RequestCancelledException;
17
use Carno\Promise\Promise;
18
use Carno\Promise\Promised;
19
use Closure;
20
use Throwable;
21
22
trait GWatcher
{
    /**
     * Start a new watcher process.
     *
     * The work is handed to go() as a generator that loops forever:
     * it obtains a lister from $ig, runs $do against it, and on any
     * unexpected error logs a warning, waits a random delay taken from
     * Defaults::ERROR_RETRY_MIN..ERROR_RETRY_MAX and retries with a freshly
     * generated lister.
     *
     * The loop ends when $do raises ChannelClosingException or
     * RequestCancelledException, or when $cc has been resolved and a new
     * lister would otherwise have to be created.
     *
     * @param Promised $cc watch-canceller
     * @param Closure $ig instance-generator
     * @param Closure $do worker-do
     * @param string $em error-msg
     * @param array $ec error-context
     */
    protected function nwProcess(Promised $cc, Closure $ig, Closure $do, string $em, array $ec) : void
    {
        go(static function () use ($cc, $ig, $do, $em, $ec) {
            /**
             * Canceller handed to the current lister (only set for AbstractWatcher instances).
             * @var Promised|null $ex
             */
            $ex = null;

            // Remember the cancellation itself: the callback below runs only once,
            // so a cancel that lands before the first lister exists, or during the
            // error back-off, would otherwise be lost and the next lister could
            // never be cancelled.
            $cancelled = false;

            $cc->then(static function () use (&$ex, &$cancelled) {
                $cancelled = true;
                // forward the cancel to the lister that is currently in flight (if any)
                $ex && $ex->pended() && $ex->resolve();
            });

            for (;;) {
                if (!isset($lister)) {
                    if ($cancelled) {
                        // already cancelled: do not spin up a watcher nobody can stop
                        break;
                    }

                    $lister = $ig();
                    // every (re)created watcher gets its own fresh canceller
                    $lister instanceof AbstractWatcher && $lister->setCanceller($ex = Promise::deferred());
                }

                try {
                    yield $do($lister);
                } catch (ChannelClosingException | RequestCancelledException $e) {
                    // regular shutdown paths: leave the loop without logging
                    break;
                } catch (Throwable $e) {
                    // drop the lister so the next round builds a new one
                    unset($lister);

                    logger('consul')->warning(
                        $em,
                        array_merge($ec, ['error' => sprintf('%s::%s', get_class($e), $e->getMessage())])
                    );

                    // randomized back-off before retrying
                    yield msleep(rand(Defaults::ERROR_RETRY_MIN, Defaults::ERROR_RETRY_MAX));

                    continue;
                }
            }
        });
    }
}
71