Passed
Pull Request — master (#15)
by
unknown
03:40
created

ChunkExtension::eachChunk()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 4
c 0
b 0
f 0
ccs 3
cts 3
cp 1
rs 10
cc 1
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Rodenastyle\StreamParser\Extensions;
4
5
use Rodenastyle\StreamParser\Exceptions\UndefinedCallbackException;
6
use Rodenastyle\StreamParser\StreamParserInterface;
7
8
/**
9
 * Created by PhpStorm.
10
 * User: Loïc Gouttefangeas <[email protected]>
11
 * Date: 25/08/2018
12
 * Time: 20:59
13
 */
14
class ChunkExtension implements \Rodenastyle\StreamParser\StreamParserInterface
15
{
16
    const DEFAULT_CHUNK_SIZE = 5000;
17
18
    /**
19
     * @var \Rodenastyle\StreamParser\StreamParserInterface
20
     */
21
    private $parser;
22
    private $iterationCount = 0;
23
    private $chunkSize;
24
    /** @var  callable $onChunkCompleteCallback */
25
    private $onChunkCompleteCallback;
26
27 4
    public function __construct(\Rodenastyle\StreamParser\StreamParserInterface $parser, $chunkSize = self::DEFAULT_CHUNK_SIZE)
28
    {
29 4
        $this->parser = $parser;
30 4
        $this->chunkSize = $chunkSize;
31 4
    }
32
33
    public function from(String $source): \Rodenastyle\StreamParser\StreamParserInterface
34
    {
35
        return $this->parser->from($source);
36
    }
37
38 3
    public function eachChunk(callable $function): StreamParserInterface
39
    {
40 3
        $this->onChunkCompleteCallback = $function;
41 3
        return $this;
42
    }
43
44 4
    public function each(callable $function)
45
    {
46 4
        if ($this->onChunkCompleteCallback === null) {
47 1
            throw new UndefinedCallbackException("No callback is set on chunk complete");
48
        }
49
        // decorate function to apply both its behavior and the chunk process
50 3
        $function = function ($collection) use ($function) {
51 3
            $this->iterationCount++;
52 3
            $function($collection);
53
54 3
            if ($this->iterationCount % $this->chunkSize === 0) {
55 2
                $this->executeChunkCompleteCallback();
56
            }
57 3
        };
58 3
        $this->parser->each($function);
59
60
        // make sure no entry is left out
61 3
        $this->processLastEntries();
62 3
        return $this;
63
    }
64
65
    /**
66
     * @param $this
67
     */
68 3
    protected function executeChunkCompleteCallback()
69
    {
70 3
        $this->iterationCount = 0;
71 3
        ($this->onChunkCompleteCallback)();
72 3
    }
73
74 3
    protected function processLastEntries()
75
    {
76 3
        if ($this->iterationCount % $this->chunkSize !== 0) {
77 2
            $this->executeChunkCompleteCallback();
78
        }
79 3
    }
80
81
82
    /**
83
     * @param int $chunkSize
84
     * @return ChunkExtension
85
     */
86 2
    public function setChunkSize(int $chunkSize): ChunkExtension
87
    {
88 2
        $this->chunkSize = $chunkSize;
89 2
        return $this;
90
    }
91
92
    /**
93
     * @return \Rodenastyle\StreamParser\StreamParserInterface
94
     */
95
    public function getParser(): \Rodenastyle\StreamParser\StreamParserInterface
96
    {
97
        return $this->parser;
98
    }
99
}