Completed
Push — master ( 8bfd95...2626ba )
by Dan Michael O.
12:11
created

OaiPmhHarvest   A

Complexity

Total Complexity 22

Size/Duplication

Total Lines 198
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 12

Importance

Changes 0
Metric Value
wmc 22
c 0
b 0
f 0
lcom 1
cbo 12
dl 0
loc 198
rs 10

4 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 13 1
B fromDump() 0 22 5
C fromNetwork() 0 91 11
B handle() 0 21 5
1
<?php
2
3
namespace Colligator\Jobs;
4
5
use Colligator\Collection;
6
use Colligator\Events\OaiPmhHarvestComplete;
7
use Colligator\Events\OaiPmhHarvestStatus;
8
use Event;
9
use Illuminate\Foundation\Bus\DispatchesJobs;
10
use Log;
11
use Scriptotek\OaiPmh\BadRequestError;
12
use Scriptotek\OaiPmh\Client as OaiPmhClient;
13
use Scriptotek\OaiPmh\ListRecordsResponse;
14
use Storage;
15
16
class OaiPmhHarvest extends Job
{
    use DispatchesJobs;

    /** @var string Harvest name; used as the collection name and as the dump directory name. */
    public $name;

    /** @var string OAI-PMH base URL (config key "url"). */
    public $url;

    /** @var string Metadata schema passed to the OAI-PMH client (config key "schema"). */
    public $schema;

    /** @var string|null OAI-PMH set (config key "set"), or null if not configured. */
    public $set;

    /** @var string|null Start date passed to the record request (optional). */
    public $start;

    /** @var string|null End date passed to the record request (optional). */
    public $until;

    /** @var string|null Current resumption token; updated while harvesting. */
    public $resume;

    /** @var bool Whether to import from the local dump instead of the network. */
    public $fromDump;

    /** @var int Passed to the OAI-PMH client as "max-retries". */
    public $maxRetries;

    /** @var int Passed to the OAI-PMH client as "sleep-time-on-error". */
    public $sleepTimeOnError;

    /**
     * @var Collection
     */
    public $collection;

    /**
     * Number of records retrieved between each emitted OaiPmhHarvestStatus event.
     * A too small number will cause CPU overhead.
     *
     * @var int
     */
    protected $statusUpdateEvery = 50;

    /**
     * Maximum number of attempts at advancing to the next record when the
     * server responds with a BadRequestError. The error is rethrown after
     * the last failed attempt.
     *
     * @var int
     */
    protected $badRequestMaxAttempts = 100;

    /**
     * Seconds to sleep between attempts after a BadRequestError.
     *
     * @var int
     */
    protected $badRequestSleepTime = 10;

    /**
     * Create a new job instance.
     *
     * @param string      $name     Harvest name from config
     * @param array       $config   Harvest config array (url, schema, and optionally set,
     *                              max-retries, sleep-time-on-error)
     * @param string|null $start    Start date (optional)
     * @param string|null $until    End date (optional)
     * @param string|null $resume   Resumption token for continuing an aborted harvest (optional)
     * @param bool        $fromDump Import from local dump
     */
    public function __construct($name, $config, $start = null, $until = null, $resume = null, $fromDump = false)
    {
        $this->name = $name;
        $this->url = $config['url'];
        $this->schema = $config['schema'];
        // "set" is optional in OAI-PMH, so tolerate its absence from the config.
        $this->set = array_get($config, 'set');
        $this->start = $start;
        $this->until = $until;
        $this->resume = $resume;
        $this->fromDump = $fromDump;
        $this->maxRetries = array_get($config, 'max-retries', 1000);
        $this->sleepTimeOnError = array_get($config, 'sleep-time-on-error', 60);
    }

    /**
     * Directory (on the "local" disk) holding this harvest's stored
     * responses and resumption token. Has no trailing slash.
     *
     * @return string
     */
    private function harvestDir()
    {
        return 'harvests/' . $this->name;
    }

    /**
     * Import local XML dump rather than talking to the OAI-PMH server.
     *
     * Every "*.xml" file in the harvest directory is parsed as a ListRecords
     * response and each of its records is dispatched as an ImportRecord job.
     *
     * @return int Number of records dispatched
     */
    public function fromDump()
    {
        $disk = Storage::disk('local');
        $files = $disk->files($this->harvestDir());

        // fromNetwork() stores responses as response_<zero-padded index>.xml;
        // sorting makes the import order follow the harvest order instead of
        // whatever order the filesystem happens to list the files in.
        sort($files);

        $recordsHarvested = 0;
        foreach ($files as $filename) {
            // Skip non-XML files such as the stored resumption token.
            // The dot is escaped so only a literal ".xml" suffix matches.
            if (!preg_match('/\.xml$/', $filename)) {
                continue;
            }

            $response = new ListRecordsResponse($disk->get($filename));
            foreach ($response->records as $record) {
                $this->dispatch(new ImportRecord($this->collection, $record->data));
                ++$recordsHarvested;
                if ($recordsHarvested % $this->statusUpdateEvery === 0) {
                    Event::fire(new OaiPmhHarvestStatus($recordsHarvested, $recordsHarvested));
                }
            }
        }

        return $recordsHarvested;
    }

    /**
     * Harvest records from the OAI-PMH server.
     *
     * Each raw response is stored under the harvest directory as
     * response_<index>.xml, and the current resumption token is kept in a
     * "resumption_token" file until the harvest completes.
     *
     * @throws BadRequestError if advancing the iterator keeps failing
     *
     * @return int Number of records dispatched
     */
    public function fromNetwork()
    {
        $disk = Storage::disk('local');
        $harvestDir = $this->harvestDir();
        $latestPath = $harvestDir . '/latest.xml';
        $tokenPath = $harvestDir . '/resumption_token';

        // Start from an empty harvest directory.
        $disk->deleteDir($harvestDir);

        $client = new OaiPmhClient($this->url, [
            'schema'              => $this->schema,
            'user-agent'          => 'Colligator/0.1',
            'max-retries'         => $this->maxRetries,
            'sleep-time-on-error' => $this->sleepTimeOnError,
        ]);

        $client->on('request.error', function ($msg) {
            $this->error($msg);
        });

        // Store each response to disk just in case. It is moved to a stable
        // file name below when the next record is processed.
        $client->on('request.complete', function ($verb, $args, $body) use ($disk, $latestPath) {
            $disk->put($latestPath, $body);
        });

        $recordsHarvested = 0;

        // Loop over all records using an iterator that pulls in more data when
        // the buffer is exhausted.
        $records = $client->records($this->start, $this->until, $this->set, $this->resume);
        while ($records->valid()) {
            $record = $records->current();
            ++$recordsHarvested;

            // In case of a crash, it can be useful to have the resumption token,
            // but delete it when the harvest is complete. The OAI-PMH spec sends
            // an empty token with the last batch, so '' is treated as "no token".
            $token = $records->getResumptionToken();
            if ($token === '') {
                $token = null;
            }
            if ($token !== $this->resume) {
                $this->resume = $token;
                if (!is_null($token)) {
                    $disk->put($tokenPath, $token);
                } elseif ($disk->exists($tokenPath)) {
                    // Guarded: e.g. when resuming straight into the final batch
                    // the file was never written (deleteDir above removed it).
                    $disk->delete($tokenPath);
                }
            }

            // Note that Bibsys doesn't start counting on 0, as given in the spec,
            // but it doesn't really matter since we're only interested in a
            // fixed order.
            $currentIndex = $records->key();

            // If a response has been written since the last record, move it to
            // a stable location.
            if ($disk->exists($latestPath)) {
                $disk->move($latestPath, sprintf('%s/response_%08d.xml', $harvestDir, $currentIndex));
            }

            $this->dispatch(new ImportRecord($this->collection, $record->data));

            if ($recordsHarvested % $this->statusUpdateEvery === 0) {
                Event::fire(new OaiPmhHarvestStatus(
                    $recordsHarvested,
                    is_null($this->start) ? $recordsHarvested : $currentIndex
                ));
            }

            $this->nextRecord($records);
        }

        return $recordsHarvested;
    }

    /**
     * Advance the record iterator, retrying on BadRequestError.
     *
     * Makes at most $badRequestMaxAttempts attempts, sleeping
     * $badRequestSleepTime seconds between them.
     *
     * @param mixed $records Iterator returned by OaiPmhClient::records()
     *
     * @throws BadRequestError when the final allowed attempt also fails
     */
    private function nextRecord($records)
    {
        for ($attempt = 1; ; ++$attempt) {
            try {
                $records->next();

                return;
            } catch (BadRequestError $e) {
                $this->error($e->getMessage());
                $this->error($e->getCode());
                if ($attempt >= $this->badRequestMaxAttempts) {
                    $this->error('Bad request. Attempt ' . $attempt . ' of ' . $this->badRequestMaxAttempts . '. Giving up.');
                    throw $e;
                }
                $this->error('Bad request. Attempt ' . $attempt . ' of ' . $this->badRequestMaxAttempts . '. Sleeping ' . $this->badRequestSleepTime . ' secs.');
                sleep($this->badRequestSleepTime);
            }
        }
    }

    /**
     * Execute the job.
     *
     * Looks up the collection named $this->name and imports records into it,
     * either from the local dump or from the network. It fires
     * OaiPmhHarvestComplete when done. If the collection does not exist, it
     * reports an error and returns without harvesting.
     *
     * @throws BadRequestError
     * @throws \Exception
     */
    public function handle()
    {
        Log::info('[OaiPmhHarvestJob] Starting job. Requesting records from ' . ($this->start ?: '(no limit)') . ' until ' . ($this->until ?: '(no limit)') . '.');

        $this->collection = Collection::where('name', '=', $this->name)->first();
        if (is_null($this->collection)) {
            $this->error("Collection '$this->name' not found in DB");

            return;
        }

        $recordsHarvested = $this->fromDump ? $this->fromDump() : $this->fromNetwork();

        Log::info('[OaiPmhHarvestJob] Complete, got ' . $recordsHarvested . ' records.');

        Event::fire(new OaiPmhHarvestComplete($recordsHarvested));
    }
}
214