OaiPmhHarvest::fromNetwork()   A
last analyzed

Complexity

Conditions 3
Paths 3

Size

Total Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 21
rs 9.584
c 0
b 0
f 0
cc 3
nc 3
nop 0
1
<?php
2
3
namespace Colligator\Jobs;
4
5
use Colligator\Collection;
6
use DateTime;
7
use Illuminate\Foundation\Bus\DispatchesJobs;
8
use Log;
9
use Phpoaipmh\Client;
10
use Phpoaipmh\Endpoint;
11
use Phpoaipmh\Exception\OaipmhException;
12
use Phpoaipmh\Granularity;
13
use Storage;
14
15
class OaiPmhHarvest extends Job
{
    use DispatchesJobs;

    /**
     * Harvest name. handle() uses it to look up the Collection by its `name` column.
     *
     * @var string
     */
    public $name;

    /**
     * URL passed to the OAI-PMH client.
     *
     * @var string
     */
    public $url;

    /**
     * Value of the 'schema' config key, passed as first argument to listRecords().
     *
     * @var string
     */
    public $schema;

    /**
     * Value of the 'set' config key, passed to listRecords().
     *
     * @var string
     */
    public $set;

    /**
     * Start date limit for the harvest, or null for no limit.
     *
     * @var DateTime|null
     */
    public $start;

    /**
     * End date limit for the harvest, or null for no limit.
     *
     * @var DateTime|null
     */
    public $until;

    /**
     * Resumption token for continuing an aborted harvest, or null.
     *
     * @var string|null
     */
    public $resume;

    /**
     * Value of the 'max-retries' config key (default 1000).
     * NOTE(review): not referenced by any method in this class.
     *
     * @var int
     */
    public $maxRetries;

    /**
     * Value of the 'sleep-time-on-error' config key (default 60).
     * NOTE(review): presumably seconds; not referenced by any method in this class.
     *
     * @var int
     */
    public $sleepTimeOnError;

    /**
     * Start time for the full harvest.
     *
     * @var float
     */
    protected $startTime;

    /**
     * Start time for the current batch.
     *
     * @var float
     */
    protected $batchTime;

    /**
     * Harvest position.
     *
     * @var int
     */
    protected $batchPos = 0;

    /**
     * Collection the harvested records are imported into; set by handle().
     *
     * @var Collection
     */
    public $collection;

    /**
     * Number of records retrieved between each emitted OaiPmhHarvestStatus event.
     * A too small number will cause CPU overhead.
     *
     * @var int
     */
    protected $statusUpdateEvery = 50;

    /**
     * Create a new job instance.
     *
     * @param string   $name   Harvest name from config
     * @param array    $config Harvest config array (url, set, schema; optionally
     *                         max-retries and sleep-time-on-error)
     * @param DateTime $start  Start date (optional)
     * @param DateTime $until  End date (optional)
     * @param string   $resume Resumption token for continuing an aborted harvest (optional)
     */
    public function __construct($name, $config, DateTime $start = null, DateTime $until = null, $resume = null)
    {
        $this->name = $name;
        $this->url = $config['url'];
        $this->schema = $config['schema'];
        $this->set = $config['set'];
        $this->start = $start;
        $this->until = $until;
        $this->resume = $resume;
        $this->maxRetries = array_get($config, 'max-retries', 1000);
        $this->sleepTimeOnError = array_get($config, 'sleep-time-on-error', 60);
    }

    /**
     * Harvest records from the OAI-PMH endpoint and dispatch one ImportRecord
     * job per record.
     *
     * A status line is logged every $statusUpdateEvery records.
     *
     * @return int Number of records harvested
     */
    public function fromNetwork()
    {
        $client = new Client($this->url);
        $endpoint = new Endpoint($client, Granularity::DATE);

        $recordsHarvested = 0;

        // Loop over all records using an iterator that pulls in more data when
        // the buffer is exhausted.
        $records = $endpoint->listRecords($this->schema, $this->start, $this->until, $this->set, $this->resume);
        foreach ($records as $record) {
            ++$recordsHarvested;

            $this->dispatch(new ImportRecord($this->collection, $record->asXML()));

            if ($recordsHarvested % $this->statusUpdateEvery === 0) {
                $this->status($recordsHarvested, $recordsHarvested);
            }
        }

        return $recordsHarvested;
    }

    /**
     * Execute the job.
     *
     * Looks up the target collection by name, runs the harvest and logs the
     * outcome. An OaipmhException stops the harvest and is logged as a warning.
     */
    public function handle()
    {
        // DateTime has no __toString(), so the limits must be formatted
        // explicitly before being concatenated into the message.
        Log::info(
            '[OaiPmhHarvest] Starting job. Requesting records from ' . $this->formatLimit($this->start)
            . ' until ' . $this->formatLimit($this->until) . '.'
        );

        // For timing. Offset by one second; presumably so the first speed
        // calculation never sees a zero-length interval.
        $this->startTime = $this->batchTime = microtime(true) - 1;

        $this->collection = Collection::where('name', '=', $this->name)->first();
        if (is_null($this->collection)) {
            // Report through the Log facade, consistent with the other
            // messages emitted by this class.
            Log::error("[OaiPmhHarvest] Collection '{$this->name}' not found in DB");

            return;
        }

        try {
            $recordsHarvested = $this->fromNetwork();
            Log::info('[OaiPmhHarvest] Harvest complete, got ' . $recordsHarvested . ' records.');
        } catch (OaipmhException $e) {
            Log::warning('[OaiPmhHarvest] Harvest stopped with error ' . $e->getCode() . ': ' . $e->getMessage());
        }
    }

    /**
     * Output a status message.
     *
     * Logs the record count, the speed since the previous status call, the
     * average speed since the harvest started and the current memory usage,
     * then starts a new batch measurement.
     *
     * @param int $fetched Number of records fetched so far (used for the speed figures)
     * @param int $current Record count shown in the message
     */
    public function status($fetched, $current)
    {
        $now = microtime(true);

        // Clamp the intervals so a coarse clock cannot cause a division by zero.
        $minInterval = 0.000001;
        $totalTime = max($now - $this->startTime, $minInterval);
        $batchTime = max($now - $this->batchTime, $minInterval);

        // Memory usage in MB, rounded to one decimal.
        $mem = round(memory_get_usage() / 1024 / 102.4) / 10;

        $currentSpeed = ($fetched - $this->batchPos) / $batchTime;
        $avgSpeed = $fetched / $totalTime;

        $this->batchTime = $now;
        $this->batchPos = $fetched;

        Log::debug(sprintf(
            '[OaiPmhHarvest] Got %d records so far - Recs/sec: %.1f (current), %.1f (avg) - Mem: %.1f MB.',
            $current,
            $currentSpeed,
            $avgSpeed,
            $mem
        ));
    }

    /**
     * Render a harvest date limit for use in a log message.
     *
     * @param mixed $limit DateTime instance, a preformatted string, or null
     *
     * @return string The date as Y-m-d (the endpoint is created with date
     *                granularity), the value itself if it is a non-empty
     *                scalar, or '(no limit)' when empty
     */
    private function formatLimit($limit)
    {
        if ($limit instanceof \DateTimeInterface) {
            return $limit->format('Y-m-d');
        }

        return $limit ? (string) $limit : '(no limit)';
    }
}
159