1 | <?php |
||
2 | |||
3 | namespace Brouzie\Sphinxy; |
||
4 | |||
5 | use Brouzie\Sphinxy\Exception\ConnectionException; |
||
6 | use Brouzie\Sphinxy\Indexer\IndexerInterface; |
||
7 | use Symfony\Component\DependencyInjection\ContainerInterface; |
||
8 | |||
9 | class IndexManager |
||
10 | { |
||
11 | protected $conn; |
||
12 | |||
13 | protected $container; |
||
14 | |||
15 | protected $indexers = array(); |
||
16 | |||
17 | /** |
||
18 | * @param Registry $registry |
||
19 | * @param ContainerInterface $container |
||
20 | * @param array $indexers |
||
21 | */ |
||
22 | public function __construct(Registry $registry, ContainerInterface $container, array $indexers) |
||
23 | { |
||
24 | $this->conn = $registry->getConnection(); |
||
25 | $this->container = $container; |
||
26 | $this->indexers = $indexers; |
||
27 | } |
||
28 | |||
29 | public function reindex($index, $batchSize = 1000, callable $batchCallback = null, array $rangeCriterias = array()) |
||
30 | { |
||
31 | $logger = $this->conn->getLogger(); |
||
32 | $this->conn->setLogger(null); |
||
33 | |||
34 | $indexer = $this->getIndexer($index); |
||
35 | $range = array_replace($indexer->getRangeCriterias(), $rangeCriterias); |
||
36 | |||
37 | $reindexCallback = function ($data) use ($index, $indexer, $batchCallback, $range) { |
||
38 | if (null !== $batchCallback) { |
||
39 | $batchCallback( |
||
40 | array( |
||
41 | 'id_from' => $data['id_from'], |
||
42 | 'id_to' => $data['id_to'], |
||
43 | 'min_id' => $range['min'], |
||
44 | 'max_id' => $range['max'], |
||
45 | ) |
||
46 | ); |
||
47 | } |
||
48 | |||
49 | $items = $indexer->getItemsByInterval($data['id_from'], $data['id_to']); |
||
50 | $this->processItems($index, $indexer, $items); |
||
51 | }; |
||
52 | |||
53 | $idFrom = $range['min']; |
||
54 | do { |
||
55 | $idTo = $idFrom + $batchSize; |
||
56 | $this->safeExecute($reindexCallback, array(array('id_from' => $idFrom, 'id_to' => $idTo))); |
||
57 | $idFrom = $idTo; |
||
58 | } while ($idFrom <= $range['max']); |
||
59 | $this->conn->setLogger($logger); |
||
60 | } |
||
61 | |||
62 | public function reindexItems($index, $itemsIds, $batchSize = 100) |
||
63 | { |
||
64 | $indexer = $this->getIndexer($index); |
||
65 | |||
66 | $reindexItemsCallback = function ($itemsIdsToProcess) use ($index, $indexer) { |
||
67 | $items = $indexer->getItemsByIds($itemsIdsToProcess); |
||
68 | $this->processItems($index, $indexer, $items); |
||
69 | }; |
||
70 | |||
71 | do { |
||
72 | $itemsIdsToProcess = array_splice($itemsIds, 0, $batchSize); |
||
73 | $this->safeExecute($reindexItemsCallback, array($itemsIdsToProcess)); |
||
74 | } while ($itemsIdsToProcess); |
||
75 | } |
||
76 | |||
77 | public function removeItems($index, $itemsIds) |
||
78 | { |
||
79 | $removeItemsCallback = function () use ($index, $itemsIds) { |
||
80 | return $this->conn->createQueryBuilder() |
||
81 | ->delete($this->conn->getEscaper()->quoteIdentifier($index)) |
||
82 | ->where('id IN :ids') |
||
83 | ->setParameter('ids', $itemsIds) |
||
84 | ->execute(); |
||
85 | }; |
||
86 | |||
87 | return $this->safeExecute($removeItemsCallback); |
||
88 | } |
||
89 | |||
90 | public function getIndexRange($index) |
||
91 | { |
||
92 | $getIndexRangeCallback = function () use ($index) { |
||
93 | return $this->conn |
||
94 | ->createQueryBuilder() |
||
95 | ->select('MIN(id) AS `min`, MAX(id) AS `max`') |
||
96 | ->from($this->conn->getEscaper()->quoteIdentifier($index)) |
||
97 | ->getResult() |
||
98 | ->getSingleRow(array('min' => 0, 'max' => 0)); |
||
99 | }; |
||
100 | |||
101 | return $this->safeExecute($getIndexRangeCallback); |
||
102 | } |
||
103 | |||
104 | public function truncate($index) |
||
105 | { |
||
106 | $this->conn->executeUpdate(sprintf('TRUNCATE RTINDEX %s', $this->conn->getEscaper()->quoteIdentifier($index))); |
||
107 | } |
||
108 | |||
109 | /** |
||
110 | * @param $index |
||
111 | * |
||
112 | * @return IndexerInterface |
||
113 | * |
||
114 | * @throws \InvalidArgumentException When index not defined |
||
115 | */ |
||
116 | protected function getIndexer($index) |
||
117 | { |
||
118 | if (!isset($this->indexers[$index])) { |
||
119 | throw new \InvalidArgumentException('Unknown index'); |
||
120 | } |
||
121 | |||
122 | return $this->container->get($this->indexers[$index]); |
||
123 | } |
||
124 | |||
125 | /** |
||
126 | * @param $index |
||
127 | * @param IndexerInterface $indexer |
||
128 | * @param $items |
||
129 | */ |
||
130 | protected function processItems($index, IndexerInterface $indexer, $items) |
||
131 | { |
||
132 | $items = $indexer->processItems($items); |
||
133 | |||
134 | if (!count($items)) { |
||
135 | return; |
||
136 | } |
||
137 | |||
138 | $this->safeExecute(function () use ($index, $indexer, $items) { |
||
139 | $escaper = $this->conn->getEscaper(); |
||
140 | $insertQb = $this->conn |
||
141 | ->createQueryBuilder() |
||
142 | ->replace($escaper->quoteIdentifier($index)); |
||
143 | |||
144 | foreach ($items as $item) { |
||
145 | $insertQb->addValues($escaper->quoteSetArr($indexer->serializeItem($item))); |
||
146 | } |
||
147 | |||
148 | $insertQb->execute(); |
||
149 | }); |
||
150 | } |
||
151 | |||
152 | protected function safeExecute(callable $callable, array $args = array(), $retriesCount = 3, $sleep = 20) |
||
153 | { |
||
154 | for ($i = 0; $i < $retriesCount; $i++) { |
||
155 | try { |
||
156 | return call_user_func_array($callable, $args); |
||
157 | } catch (ConnectionException $e) { |
||
158 | sleep($sleep); |
||
159 | $this->conn->checkConnection(); |
||
160 | continue; |
||
161 | } |
||
162 | } |
||
163 | |||
164 | throw $e; |
||
165 | } |
||
166 | } |
||
167 |