Passed
Pull Request — master (#18)
by Harry
04:35 queued 23s
created

PoolLogger   A

Complexity

Total Complexity 20

Size/Duplication

Total Lines 180
Duplicated Lines 0 %

Test Coverage

Coverage 98.88%

Importance

Changes 0
Metric Value
wmc 20
eloc 73
dl 0
loc 180
rs 10
c 0
b 0
f 0
ccs 88
cts 89
cp 0.9888

11 Methods

Rating   Name   Duplication   Size   Complexity  
A monitor() 0 12 3
A __construct() 0 3 1
A onRunCompleted() 0 5 1
A onRunSuccessful() 0 5 1
A onPoolUpdated() 0 7 2
A onRunStarted() 0 5 1
A getTags() 0 8 3
A onPoolRunAdded() 0 13 1
A onRunFailed() 0 16 2
A getPoolTags() 0 16 3
A getRunTags() 0 13 2
1
<?php
2
/**
3
 * This file is part of graze/parallel-process.
4
 *
5
 * Copyright © 2018 Nature Delivered Ltd. <https://www.graze.com>
6
 *
7
 * For the full copyright and license information, please view the LICENSE
8
 * file that was distributed with this source code.
9
 *
10
 * @license https://github.com/graze/parallel-process/blob/master/LICENSE.md
11
 * @link    https://github.com/graze/parallel-process
12
 */
13
14
namespace Graze\ParallelProcess\Monitor;
15
16
use Exception;
17
use Graze\ParallelProcess\Event\PoolRunEvent;
18
use Graze\ParallelProcess\Event\RunEvent;
19
use Graze\ParallelProcess\PoolInterface;
20
use Graze\ParallelProcess\RunInterface;
21
use Psr\Log\LoggerInterface;
22
23
/**
 * Writes the activity of pools and runs to a PSR-3 logger.
 *
 * Pass a pool or a run to monitor(); this class then registers itself as a
 * listener on that item and writes one debug-level entry per event received,
 * together with a context array describing the item at that moment.
 */
class PoolLogger
{
    /** @var LoggerInterface Receives every entry written by this class */
    private $logger;

    /**
     * PoolLogger constructor.
     *
     * @param LoggerInterface $logger
     */
    public function __construct(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    /**
     * Monitor a Pool or Run, and log all activity
     *
     * The two instanceof checks are independent, not exclusive: an item that
     * implements both interfaces gets both sets of listeners. An item that
     * implements neither is silently ignored.
     *
     * @param PoolInterface|RunInterface $item
     */
    public function monitor($item)
    {
        if ($item instanceof PoolInterface) {
            $item->addListener(PoolRunEvent::POOL_RUN_ADDED, [$this, 'onPoolRunAdded']);
            $item->addListener(PoolRunEvent::UPDATED, [$this, 'onPoolUpdated']);

            // Attach to everything the pool currently holds; items added
            // later are picked up by onPoolRunAdded().
            foreach ($item->getAll() as $child) {
                $this->monitor($child);
            }
        }

        if ($item instanceof RunInterface) {
            $item->addListener(RunEvent::STARTED, [$this, 'onRunStarted']);
            $item->addListener(RunEvent::SUCCESSFUL, [$this, 'onRunSuccessful']);
            $item->addListener(RunEvent::FAILED, [$this, 'onRunFailed']);
            $item->addListener(RunEvent::COMPLETED, [$this, 'onRunCompleted']);
        }
    }

    /**
     * Log that a run was added to a pool, then start monitoring that run.
     *
     * @param PoolRunEvent $event
     */
    public function onPoolRunAdded(PoolRunEvent $event)
    {
        $pool = $event->getPool();
        $run = $event->getRun();

        // Both tag sets are merged by string key, so if the added run is
        // itself a pool its 'pool' entry replaces the parent pool's entry.
        $context = array_merge($this->getTags($pool), $this->getTags($run));

        $this->logger->debug(
            sprintf(
                'pool [%s:%s]: Run [%s:%s] has been added',
                get_class($pool),
                spl_object_hash($pool),
                get_class($run),
                spl_object_hash($run)
            ),
            $context
        );

        $this->monitor($run);
    }

    /**
     * Log that a pool has been updated.
     *
     * Nothing is logged when the event's run is not a PoolInterface.
     *
     * @param RunEvent $event
     */
    public function onPoolUpdated(RunEvent $event)
    {
        $pool = $event->getRun();
        if (!$pool instanceof PoolInterface) {
            return;
        }

        $this->logger->debug(
            sprintf('pool [%s:%s]: updated', get_class($pool), spl_object_hash($pool)),
            $this->getTags($pool)
        );
    }

    /**
     * Log that a run has started.
     *
     * @param RunEvent $event
     */
    public function onRunStarted(RunEvent $event)
    {
        $this->logRun($event->getRun(), 'has started');
    }

    /**
     * Log that a run finished successfully.
     *
     * @param RunEvent $event
     */
    public function onRunSuccessful(RunEvent $event)
    {
        $this->logRun($event->getRun(), 'successfully finished');
    }

    /**
     * Log that a run failed.
     *
     * The message carries the first exception message (or an empty string
     * when there are none); the full list is placed under the 'errors'
     * context key.
     *
     * @param RunEvent $event
     */
    public function onRunFailed(RunEvent $event)
    {
        $run = $event->getRun();

        // The closure is deliberately untyped: if getExceptions() ever holds a
        // Throwable that is not an Exception (e.g. \Error on PHP 7+), an
        // `Exception` hint here would raise a TypeError while logging.
        // NOTE(review): confirm the element type against RunInterface.
        $errors = array_map(
            function ($e) {
                return $e->getMessage();
            },
            $run->getExceptions()
        );
        $firstError = count($errors) > 0 ? reset($errors) : '';

        $this->logRun($run, 'failed - ' . $firstError, ['errors' => $errors]);
    }

    /**
     * Log that a run has finished running.
     *
     * @param RunEvent $event
     */
    public function onRunCompleted(RunEvent $event)
    {
        $this->logRun($event->getRun(), 'has finished running');
    }

    /**
     * Write a single "run [<class>:<object hash>]: <message>" debug entry.
     *
     * @param object $run     The run the entry is about
     * @param string $message Text placed after the run identifier; passed to
     *                        sprintf() as an argument, so any '%' it contains
     *                        is kept literally
     * @param array  $context Extra context; keys also produced by getTags()
     *                        are overwritten by the tag values
     */
    private function logRun($run, $message, array $context = [])
    {
        $this->logger->debug(
            sprintf('run [%s:%s]: %s', get_class($run), spl_object_hash($run), $message),
            array_merge($context, $this->getTags($run))
        );
    }

    /**
     * Build the log context for an item.
     *
     * The pool check comes first, so an item implementing both interfaces is
     * described by getPoolTags().
     *
     * @param PoolInterface|RunInterface $item
     *
     * @return array Pool tags, run tags, or an empty array for anything else
     */
    private function getTags($item)
    {
        if ($item instanceof PoolInterface) {
            return $this->getPoolTags($item);
        }
        if ($item instanceof RunInterface) {
            return $this->getRunTags($item);
        }

        return [];
    }

    /**
     * Build the log context for a pool.
     *
     * When the pool is also a RunInterface, the fields from getRunTags() are
     * merged into the same 'pool' entry (run fields win on key collisions).
     *
     * @param PoolInterface $pool
     *
     * @return array Single-key array: 'pool' => details
     */
    private function getPoolTags(PoolInterface $pool)
    {
        $runFields = [];
        if ($pool instanceof RunInterface) {
            $runFields = $this->getRunTags($pool)['run'];
        }

        $poolFields = [
            'type'         => get_class($pool),
            'id'           => spl_object_hash($pool),
            'num_waiting'  => count($pool->getWaiting()),
            'num_running'  => count($pool->getRunning()),
            'num_finished' => count($pool->getFinished()),
        ];

        return ['pool' => array_merge($poolFields, $runFields)];
    }

    /**
     * Build the log context for a run.
     *
     * @param RunInterface $run
     *
     * @return array Single-key array: 'run' => details
     */
    private function getRunTags(RunInterface $run)
    {
        // Read once so the emptiness check and the value come from the same result.
        $progress = $run->getProgress();

        return [
            'run' => [
                'type'         => get_class($run),
                'id'           => spl_object_hash($run),
                'tags'         => $run->getTags(),
                'hasStarted'   => $run->hasStarted(),
                'isRunning'    => $run->isRunning(),
                'isSuccessful' => $run->isSuccessful(),
                'duration'     => $run->getDuration(),
                'priority'     => $run->getPriority(),
                // First element of getProgress(), or null when it is empty/falsy
                'progress'     => $progress ? $progress[0] : null,
            ],
        ];
    }
}
207