FileProvider::receive()   B
last analyzed

Complexity

Conditions 6
Paths 10

Size

Total Lines 38
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 24
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 38
rs 8.439
c 0
b 0
f 0
ccs 24
cts 24
cp 1
cc 6
eloc 27
nc 10
nop 1
crap 6
1
<?php
2
namespace Uecode\Bundle\QPushBundle\Provider;
3
4
use Doctrine\Common\Cache\Cache;
5
use Monolog\Logger;
6
use Symfony\Component\Filesystem\Filesystem;
7
use Symfony\Component\Finder\Finder;
8
use Symfony\Component\Finder\SplFileInfo;
9
use Uecode\Bundle\QPushBundle\Event\MessageEvent;
10
use Uecode\Bundle\QPushBundle\Message\Message;
11
12
class FileProvider extends AbstractProvider
13
{
14
    protected $filePointerList = [];
15
    protected $queuePath;
16
17 10
    /**
     * Builds a file-backed queue provider.
     *
     * The queue directory is `$options['path']` joined with the MD5 hex digest
     * of the queue name, so arbitrary queue names map to a file-system safe
     * directory name.
     *
     * @param string $name    Queue name
     * @param array  $options Provider options; `path` is read here as the base directory
     * @param mixed  $client  Not used by this constructor
     * @param Cache  $cache   Cache instance stored on the provider
     * @param Logger $logger  Logger instance stored on the provider
     */
    public function __construct($name, array $options, $client, Cache $cache, Logger $logger)
    {
        $this->name    = $name;
        $this->options = $options;
        $this->cache   = $cache;
        $this->logger  = $logger;

        // An MD5 hex digest is 32 characters from [0-9a-f], so it can be used
        // as a directory name without any further filtering.
        $this->queuePath = $options['path'] . DIRECTORY_SEPARATOR . hash('md5', $name);
    }
25
26 1
    /**
     * Returns the identifier of this provider type.
     *
     * @return string Always 'File'
     */
    public function getProvider()
    {
        $providerType = 'File';

        return $providerType;
    }
30
31 8
    /**
     * Ensures the queue directory exists, creating it when necessary.
     *
     * @return bool True when the queue directory exists after the call
     */
    public function create()
    {
        $filesystem = new Filesystem();

        // Nothing to do when the directory is already there
        if ($filesystem->exists($this->queuePath)) {
            return true;
        }

        $filesystem->mkdir($this->queuePath);

        // Report whether the directory is actually present now
        return $filesystem->exists($this->queuePath);
    }
40
41 5
    /**
     * Writes a message to the queue directory as a JSON file.
     *
     * The message id is the current `microtime()` string with the space
     * removed. Files are sharded into sub-directories named after the first
     * three characters of the MD5 hex digest of the id; delete() derives the
     * same sub-directory from the id.
     *
     * @param array $message Message body; must be JSON-encodable
     * @param array $options Not used by this provider
     *
     * @return string The id of the published message
     *
     * @throws \InvalidArgumentException When the message cannot be encoded as JSON
     */
    public function publish(array $message, array $options = [])
    {
        // Encode before touching the file system: json_encode() returns false
        // on failure, and writing that would leave an empty message file behind.
        $payload = json_encode($message);
        if ($payload === false) {
            throw new \InvalidArgumentException(
                sprintf('The message could not be encoded as JSON (json_last_error: %d).', json_last_error())
            );
        }

        $fileName = str_replace(' ', '', microtime(false));
        $directory = $this->queuePath.DIRECTORY_SEPARATOR.substr(hash('md5', $fileName), 0, 3);

        $fs = new Filesystem();
        if (!$fs->exists($directory)) {
            $fs->mkdir($directory);
        }

        $fs->dumpFile($directory.DIRECTORY_SEPARATOR.$fileName.'.json', $payload);

        return $fileName;
    }
58
59
    /**
     * Collects pending messages from the queue directory and locks them.
     *
     * Only `*.json` files dated within the last `message_expiration` seconds
     * are considered; when `message_delay` is positive, files dated within the
     * last `message_delay` seconds are skipped as well.
     *
     * Every returned message keeps an exclusive, non-blocking `flock()` on its
     * file. The open handle is stored in $filePointerList under the message id
     * until delete() closes it, so files already held by this instance or
     * locked elsewhere are skipped.
     *
     * @param array $options Not used by this provider
     *
     * @return Message[]
     */
    public function receive(array $options = [])
    {
        $finder = new Finder();
        $finder
            ->files()
            ->ignoreDotFiles(true)
            ->ignoreUnreadableDirs(true)
            ->ignoreVCS(true)
            ->name('*.json')
            ->in($this->queuePath)
        ;
        if ($this->options['message_delay'] > 0) {
            $finder->date(
                sprintf('< %d seconds ago', $this->options['message_delay'])
            );
        }
        $finder
            ->date(
                sprintf('> %d seconds ago', $this->options['message_expiration'])
            )
        ;

        $messages = [];
        /** @var SplFileInfo $file */
        foreach ($finder as $file) {
            // The message id is the file name without its ".json" suffix
            $id = substr($file->getFilename(), 0, -5);

            // Check the tracking list first so no second handle is opened on a
            // file this instance already holds.
            if (!isset($this->filePointerList[$id])) {
                // The file may have been removed since the finder listed it;
                // in that case there is nothing to open or lock.
                $realPath = $file->getRealPath();
                $filePointer = $realPath === false ? false : fopen($realPath, 'r+');

                if ($filePointer !== false) {
                    if (flock($filePointer, LOCK_EX | LOCK_NB)) {
                        $this->filePointerList[$id] = $filePointer;
                        $messages[] = new Message($id, json_decode($file->getContents(), true), []);
                    } else {
                        // Locked by someone else: release our handle and move on
                        fclose($filePointer);
                    }
                }
            }

            if (count($messages) === (int) $this->options['messages_to_receive']) {
                break;
            }
        }

        return $messages;
    }
101
102 2
    /**
     * Removes a message file that this instance currently holds a handle for.
     *
     * The file is removed first, then its handle is closed and dropped from
     * $filePointerList. Independently of the outcome, roughly one call in ten
     * also triggers cleanUp().
     *
     * @param string $id Id of a message previously returned by receive()
     *
     * @return bool True when the id was tracked by this instance and its file was removed
     */
    public function delete($id)
    {
        $deleted = isset($this->filePointerList[$id]);

        if ($deleted) {
            // Same sharding rule as publish(): first three hex characters of the id's MD5
            $shard = substr(hash('md5', (string) $id), 0, 3);
            $target = implode(DIRECTORY_SEPARATOR, [$this->queuePath, $shard, $id . '.json']);

            $filesystem = new Filesystem();
            $filesystem->remove($target);

            fclose($this->filePointerList[$id]);
            unset($this->filePointerList[$id]);
        }

        // Occasionally sweep expired files
        if (rand(1, 10) === 5) {
            $this->cleanUp();
        }

        return $deleted;
    }
121
122 2
    /**
     * Removes expired message files from the queue directory.
     *
     * A file is considered expired when it is dated more than
     * `message_expiration` seconds ago. Removal is best-effort: a file that
     * cannot be removed is logged and skipped so that one failure does not
     * abort the rest of the sweep.
     *
     * @return void
     */
    public function cleanUp()
    {
        $finder = new Finder();
        $finder
            ->files()
            ->in($this->queuePath)
            ->ignoreDotFiles(true)
            ->ignoreUnreadableDirs(true)
            ->ignoreVCS(true)
            ->depth('< 2')
            ->name('*.json')
        ;
        $finder->date(
            sprintf('< %d seconds ago', $this->options['message_expiration'])
        );

        /** @var SplFileInfo $file */
        foreach ($finder as $file) {
            $realPath = $file->getRealPath();
            if ($realPath === false) {
                // The file disappeared after the finder listed it; nothing left to remove
                continue;
            }

            // The PHP warning stays suppressed on purpose (the file may vanish
            // between listing and unlinking), but the result is checked so a
            // file that is still present after a failed unlink gets reported.
            if (@unlink($realPath) === false && file_exists($realPath)) {
                $this->logger->warning(
                    sprintf('Could not remove expired queue file "%s".', $realPath)
                );
            }
        }
    }
142
143 1
    /**
     * Removes the whole queue directory and forgets every tracked file handle.
     *
     * @return bool True when the queue directory no longer exists
     */
    public function destroy()
    {
        (new Filesystem())->remove($this->queuePath);

        // Drop all handles that were tracked for received messages
        $this->filePointerList = [];

        $directoryStillThere = is_dir($this->queuePath);

        return !$directoryStillThere;
    }
150
151
    /**
     * Deletes the received message from the queue, then stops event propagation.
     *
     * Meant to run after the other listeners of the event: if an earlier
     * listener has erred or stopped propagation, this method does not fire,
     * the message is not deleted here, and it should become visible in the
     * queue again.
     *
     * @param MessageEvent $event The message event carrying the received message
     *
     * @return void
     */
    public function onMessageReceived(MessageEvent $event)
    {
        $this->delete($event->getMessage()->getId());

        $event->stopPropagation();
    }
168
}
169