Completed
Push — master ( f12d43...45cb1a )
by Dan Michael O.
17:51 queued 03:12
created

OaiPmhHarvest::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 13
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 13
rs 9.4285
cc 1
eloc 11
nc 1
nop 6
1
<?php
2
3
namespace Colligator\Jobs;
4
5
use Colligator\Collection;
6
use Illuminate\Foundation\Bus\DispatchesJobs;
7
use Log;
8
use Scriptotek\OaiPmh\BadRequestError;
9
use Scriptotek\OaiPmh\Client as OaiPmhClient;
10
use Scriptotek\OaiPmh\ListRecordsResponse;
11
use Storage;
12
13
class OaiPmhHarvest extends Job
{
    use DispatchesJobs;

    /**
     * Harvest name. Used to look up the Collection by name and as the
     * sub-directory (under "harvests/") where raw responses are stored.
     *
     * @var string
     */
    public $name;

    /**
     * OAI-PMH endpoint URL, taken from the harvest config.
     *
     * @var string
     */
    public $url;

    /**
     * Metadata schema passed to the OAI-PMH client as its 'schema' option.
     *
     * @var string
     */
    public $schema;

    /**
     * OAI-PMH set to harvest, taken from the harvest config.
     *
     * @var string
     */
    public $set;

    /**
     * Start date (optional).
     *
     * @var string|null
     */
    public $start;

    /**
     * End date (optional).
     *
     * @var string|null
     */
    public $until;

    /**
     * Resumption token for continuing an aborted harvest (optional). Updated
     * while harvesting as the server hands out new tokens.
     *
     * @var string|null
     */
    public $resume;

    /**
     * Whether handle() imports the local dump instead of harvesting over the network.
     *
     * @var bool
     */
    public $fromDump;

    /**
     * Passed to the OAI-PMH client as 'max-retries' (config key 'max-retries', default 1000).
     *
     * @var int
     */
    public $maxRetries;

    /**
     * Passed to the OAI-PMH client as 'sleep-time-on-error' (config key
     * 'sleep-time-on-error', default 60; presumably seconds — confirm with the client library).
     *
     * @var int
     */
    public $sleepTimeOnError;

    /**
     * Start time for the full harvest.
     *
     * @var float
     */
    protected $startTime;

    /**
     * Start time for the current batch.
     *
     * @var float
     */
    protected $batchTime;

    /**
     * Harvest position.
     *
     * @var int
     */
    protected $batchPos = 0;

    /**
     * @var Collection
     */
    public $collection;

    /**
     * Number of records retrieved between each emitted OaiPmhHarvestStatus event.
     * A too small number will cause CPU overhead.
     *
     * @var int
     */
    protected $statusUpdateEvery = 50;

    /**
     * Maximum number of attempts at advancing the record iterator when the
     * server keeps answering with a BadRequestError.
     *
     * @var int
     */
    protected $badRequestMaxAttempts = 100;

    /**
     * Seconds to sleep between attempts after a BadRequestError.
     *
     * @var int
     */
    protected $badRequestSleepTime = 10;

    /**
     * Create a new job instance.
     *
     * @param string $name     Harvest name from config
     * @param array  $config   Harvest config array (url, set, schema, and optionally
     *                         max-retries, sleep-time-on-error)
     * @param string $start    Start date (optional)
     * @param string $until    End date (optional)
     * @param string $resume   Resumption token for continuing an aborted harvest (optional)
     * @param bool   $fromDump Import from local dump
     */
    public function __construct($name, $config, $start = null, $until = null, $resume = null, $fromDump = false)
    {
        $this->name = $name;
        $this->url = $config['url'];
        $this->schema = $config['schema'];
        $this->set = $config['set'];
        $this->start = $start;
        $this->until = $until;
        $this->resume = $resume;
        $this->fromDump = $fromDump;
        $this->maxRetries = array_get($config, 'max-retries', 1000);
        $this->sleepTimeOnError = array_get($config, 'sleep-time-on-error', 60);
    }

    /**
     * Local-disk directory holding the raw responses (and the resumption
     * token file) for this harvest.
     *
     * @return string Path relative to the 'local' disk, without trailing slash.
     */
    protected function harvestStoragePath()
    {
        return 'harvests/' . $this->name;
    }

    /**
     * Import local XML dump rather than talking to the OAI-PMH server.
     *
     * Reads every *.xml file in the harvest directory on the 'local' disk and
     * dispatches an ImportRecord job for each record in it.
     *
     * @return int Number of records dispatched for import.
     */
    public function fromDump()
    {
        $disk = Storage::disk('local');

        // fromNetwork() names response files with a zero-padded record index,
        // so a lexical sort replays them in harvest order.
        $files = $disk->files($this->harvestStoragePath());
        sort($files, SORT_STRING);

        $recordsHarvested = 0;
        foreach ($files as $filename) {
            // Escaped dot: only names that really end in ".xml".
            if (!preg_match('/\.xml$/', $filename)) {
                continue;
            }

            $response = new ListRecordsResponse($disk->get($filename));
            foreach ($response->records as $record) {
                $this->dispatch(new ImportRecord($this->collection, $record->data->asXML()));
                ++$recordsHarvested;
                if ($recordsHarvested % $this->statusUpdateEvery == 0) {
                    $this->status($recordsHarvested, $recordsHarvested);
                }
            }
        }

        return $recordsHarvested;
    }

    /**
     * Harvest records from the OAI-PMH server.
     *
     * The harvest directory is wiped first. Each raw response is then kept on
     * the 'local' disk (so it can be re-imported with fromDump()), and the
     * current resumption token is persisted so an aborted harvest can be resumed.
     *
     * @return int Number of records dispatched for import.
     *
     * @throws BadRequestError if advancing the iterator fails on every allowed attempt.
     */
    public function fromNetwork()
    {
        $disk = Storage::disk('local');
        $dir = $this->harvestStoragePath();
        $latest = $dir . '/latest.xml';
        $tokenFile = $dir . '/resumption_token';

        $disk->deleteDir($dir);

        $client = new OaiPmhClient($this->url, [
            'schema'              => $this->schema,
            'user-agent'          => 'Colligator/0.1',
            'max-retries'         => $this->maxRetries,
            'sleep-time-on-error' => $this->sleepTimeOnError,
        ]);

        $client->on('request.error', function ($msg) {
            $this->error($msg);
        });

        // Store each response to disk just in case
        $client->on('request.complete', function ($verb, $args, $body) use ($disk, $latest) {
            $disk->put($latest, $body);
        });

        $recordsHarvested = 0;

        // Loop over all records using an iterator that pulls in more data when
        // the buffer is exhausted. It stops being valid when no more records
        // are available.
        $records = $client->records($this->start, $this->until, $this->set, $this->resume);
        while ($records->valid()) {
            $record = $records->current();
            ++$recordsHarvested;

            // In case of a crash, it can be useful to have the resumption token,
            // but delete it when the harvest is complete.
            $token = $records->getResumptionToken();
            if ($this->resume != $token) {
                $this->resume = $token;
                if (is_null($token)) {
                    $disk->delete($tokenFile);
                } else {
                    $disk->put($tokenFile, $token);
                }
            }

            // Note that Bibsys doesn't start counting on 0, as given in the spec,
            // but it doesn't really matter since we're only interested in a
            // fixed order.
            $currentIndex = $records->key();

            // Move the last response to a stable, order-preserving file name.
            // latest.xml only reappears after the client completes a new request.
            if ($disk->exists($latest)) {
                $disk->move($latest, sprintf('%s/response_%08d.xml', $dir, $currentIndex));
            }

            $this->dispatch(new ImportRecord($this->collection, $record->data->asXML()));

            if ($recordsHarvested % $this->statusUpdateEvery == 0) {
                // Without a start date report the local count, otherwise the
                // server-side record index.
                $this->status($recordsHarvested, is_null($this->start) ? $recordsHarvested : $currentIndex);
            }

            $this->nextRecordWithRetry($records);
        }

        return $recordsHarvested;
    }

    /**
     * Advance the record iterator, retrying when the server responds with a
     * BadRequestError.
     *
     * Makes at most $badRequestMaxAttempts attempts in total, sleeping
     * $badRequestSleepTime seconds between failed attempts.
     *
     * @param object $records The iterator-like object returned by OaiPmhClient::records()
     *
     * @throws BadRequestError when the last allowed attempt also fails.
     */
    protected function nextRecordWithRetry($records)
    {
        for ($attempt = 1; ; ++$attempt) {
            try {
                $records->next();

                return;
            } catch (BadRequestError $e) {
                $this->error($e->getMessage());
                $this->error($e->getCode());
                if ($attempt >= $this->badRequestMaxAttempts) {
                    $this->error('Bad request. Attempt ' . $attempt . ' of ' . $this->badRequestMaxAttempts . '. Giving up.');
                    throw $e;
                }
                $this->error('Bad request. Attempt ' . $attempt . ' of ' . $this->badRequestMaxAttempts . '. Sleeping ' . $this->badRequestSleepTime . ' secs.');
                sleep($this->badRequestSleepTime);
            }
        }
    }

    /**
     * Execute the job.
     *
     * Looks up the Collection named $this->name (logs an error and returns if
     * it is missing), then imports either from the local dump or from the
     * network depending on $this->fromDump.
     *
     * @throws BadRequestError
     * @throws \Exception
     */
    public function handle()
    {
        Log::info('[OaiPmhHarvest] Starting job. Requesting records from ' . ($this->start ?: '(no limit)') . ' until ' . ($this->until ?: '(no limit)') . '.');

        // For timing. Backdating by one second keeps the elapsed times used as
        // divisors in status() at or above one second for the first batch.
        $this->startTime = $this->batchTime = microtime(true) - 1;

        $this->collection = Collection::where('name', '=', $this->name)->first();
        if (is_null($this->collection)) {
            $this->error("Collection '$this->name' not found in DB");

            return;
        }

        if ($this->fromDump) {
            $recordsHarvested = $this->fromDump();
        } else {
            $recordsHarvested = $this->fromNetwork();
        }

        Log::info('[OaiPmhHarvest] Harvest complete, got ' . $recordsHarvested . ' records.');
    }

    /**
     * Output a status message.
     *
     * Logs (at debug level) the record count, the speed since the previous
     * call and the average speed since the start, plus current memory usage
     * in MB rounded to one decimal.
     *
     * @param int $fetched Number of records fetched so far (used for speed)
     * @param int $current Number shown as "records so far" in the message
     */
    public function status($fetched, $current)
    {
        $totalTime = microtime(true) - $this->startTime;
        $batchTime = microtime(true) - $this->batchTime;
        $mem = round(memory_get_usage() / 1024 / 102.4) / 10;

        $currentSpeed = ($fetched - $this->batchPos) / $batchTime;
        $avgSpeed = $fetched / $totalTime;

        // Start a new batch for the next "current" speed measurement.
        $this->batchTime = microtime(true);
        $this->batchPos = $fetched;

        Log::debug(sprintf(
            '[OaiPmhHarvest] Got %d records so far - Recs/sec: %.1f (current), %.1f (avg) - Mem: %.1f MB.',
            $current,
            $currentSpeed,
            $avgSpeed,
            $mem
        ));
    }
}
260