Completed
Pull Request — master (#37)
by Peter
23:57
created

ImportJob::retry()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 13
ccs 0
cts 11
cp 0
rs 9.8333
c 0
b 0
f 0
cc 2
nc 2
nop 0
crap 6
1
<?php
2
3
namespace TreeHouse\IoBundle\Import;
4
5
use Psr\Log\LoggerAwareInterface;
6
use Psr\Log\LoggerAwareTrait;
7
use Psr\Log\NullLogger;
8
use TreeHouse\Feeder\Feed;
9
use TreeHouse\IoBundle\Entity\ImportPart;
10
use TreeHouse\IoBundle\Entity\ImportRepository;
11
use TreeHouse\IoBundle\Import\Event\ImporterEvent;
12
use TreeHouse\IoBundle\Import\Event\ImportEvent;
13
use TreeHouse\IoBundle\Import\Importer\Importer;
14
use TreeHouse\IoBundle\Import\Processor\ProcessorInterface;
15
16
class ImportJob implements LoggerAwareInterface
17
{
18
    use LoggerAwareTrait;
19
20
    /**
21
     * @var ImportPart
22
     */
23
    protected $part;
24
25
    /**
26
     * @var Feed
27
     */
28
    protected $feed;
29
30
    /**
31
     * @var ProcessorInterface
32
     */
33
    protected $processor;
34
35
    /**
36
     * @var Importer
37
     */
38
    protected $importer;
39
40
    /**
41
     * @var ImportRepository
42
     */
43
    protected $repository;
44
45
    /**
46
     * @param ImportPart         $part
47
     * @param Feed               $feed
48
     * @param ProcessorInterface $processor
49
     * @param Importer           $importer
50
     * @param ImportRepository   $repository
51
     *
52
     * @throws \RuntimeException When the import or the part has already finished
53
     */
54 2
    public function __construct(
55
        ImportPart $part,
56
        Feed $feed,
57
        ProcessorInterface $processor,
58
        Importer $importer,
59
        ImportRepository $repository
60
    ) {
61 2
        $this->feed = $feed;
62 2
        $this->part = $part;
63 2
        $this->processor = $processor;
64 2
        $this->importer = $importer;
65 2
        $this->repository = $repository;
66 2
        $this->logger = new NullLogger();
67
68 2
        $import = $part->getImport();
69
70
        // check if import has already finished
71 2
        if ($import->isFinished()) {
72
            throw new \RuntimeException(sprintf('Import %d has already finished', $import->getId()));
73
        }
74
75
        // check if this part has already finished
76 2
        if ($part->isFinished()) {
77
            throw new \RuntimeException(
78
                sprintf('Part %d of import %d has already finished', $part->getPosition(), $import->getId())
79
            );
80
        }
81 2
    }
82
83
    /**
84
     * @return ImportResult
85
     */
86 2
    public function run()
87
    {
88 2
        $import = $this->part->getImport();
89
90
        // part is not finished, maybe it's already started/running
91 2
        if ($this->part->isStarted()) {
92
            $this->processor->checkProcessing($this->part);
93
94
            $this->logger->warning(
95
                sprintf(
96
                    'Part %d of import %d has already started, but the process (%s) is no longer running. ' .
97
                    'Resuming the part now.',
98
                    $this->part->getPosition(),
99
                    $import->getId(),
100
                    $this->part->getProcess()
101
                )
102
            );
103
        }
104
105
        // start import if necessary
106 2
        if (!$import->isStarted()) {
107 2
            $this->importer->dispatchEvent(ImportEvents::IMPORT_START, new ImportEvent($import));
108 2
            $this->repository->startImport($import);
109 2
        }
110
111
        try {
112 2
            $this->start();
113 2
            $this->importer->run($this->feed);
114 2
        } catch (\Exception $e) {
115
            // log the error
116
            $this->logger->error($e->getMessage());
117
            $this->logger->debug($e->getTraceAsString());
118
119
            // check if we have any retries left
120
            if ($this->part->getRetries() > 0) {
121
                // mark as unstarted and bail out
122
                $this->retry();
123
124
                return null;
125
            } else {
126
                $this->fail($e->getMessage());
127
            }
128
        }
129
130 2
        $this->repository->addResult($import, $this->importer->getResult());
131
132 2
        $this->finish();
133
134 2
        return $this->importer->getResult();
135
    }
136
137
    /**
138
     * Starts the import part.
139
     */
140 2
    protected function start()
141
    {
142 2
        $this->processor->markProcessing($this->part);
143 2
        $this->repository->startImportPart($this->part);
144
145 2
        $this->importer->dispatchEvent(ImportEvents::PART_START, new ImporterEvent($this->part, $this->importer));
146 2
    }
147
148
    /**
149
     * @throws \RuntimeException When the part has not started yet
150
     */
151 2
    protected function finish()
152
    {
153 2
        $this->logger->debug(
154 2
            sprintf(
155 2
                'Finishing part %d on position %d for import %d',
156 2
                $this->part->getId(),
157 2
                $this->part->getPosition(),
158 2
                $this->part->getImport()->getId()
159 2
            )
160 2
        );
161
162 2
        $this->repository->finishImportPart($this->part);
163
164 2
        $this->importer->dispatchEvent(ImportEvents::PART_FINISH, new ImporterEvent($this->part, $this->importer));
165
166
        // if this is the last part, end the import also
167 2
        $import = $this->part->getImport();
168
169 2
        if (!$this->repository->importHasUnfinishedParts($import)) {
170 2
            $this->repository->finishImport($import);
171 2
            $this->importer->dispatchEvent(ImportEvents::IMPORT_FINISH, new ImportEvent($import));
172 2
        }
173 2
    }
174
175
    /**
176
     * @throws \RuntimeException
177
     */
178
    protected function retry()
179
    {
180
        $retriesLeft = $this->part->getRetries();
181
        if ($retriesLeft < 1) {
182
            throw new \RuntimeException('No more retries left!');
183
        }
184
185
        $this->part->setRetries(--$retriesLeft);
186
        $this->part->setDatetimeEnded(null);
187
        $this->part->setProcess(null);
188
189
        $this->repository->savePart($this->part);
190
    }
191
192
    /**
193
     * @param string $message
194
     */
195
    protected function fail($message)
196
    {
197
        // log the error
198
        $this->part->setError($message);
199
200
        $this->logger->error(
201
            sprintf(
202
                'Importing part %d of import %d failed with message: %s',
203
                $this->part->getPosition(),
204
                $this->part->getImport()->getId(),
205
                $message
206
            )
207
        );
208
    }
209
}
210