SimilarMoviesProcessor::addSimilarMovies()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 6
ccs 0
cts 4
cp 0
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
crap 2
1
<?php
2
3
namespace App\Movies\EventListener;
4
5
use App\Movies\Exception\TmdbMovieNotFoundException;
6
use App\Movies\Exception\TmdbRequestLimitException;
7
use App\Movies\Repository\MovieRepository;
8
use App\Movies\Service\TmdbNormalizerService;
9
use App\Movies\Service\TmdbSearchService;
10
use App\Movies\Service\TmdbSyncService;
11
use Enqueue\Client\Message;
12
use Enqueue\Client\MessagePriority;
13
use Enqueue\Client\ProducerInterface;
14
use Enqueue\Client\TopicSubscriberInterface;
15
use Interop\Queue\Context;
16
use Interop\Queue\Message as QMessage;
17
use Interop\Queue\Processor;
18
19
class SimilarMoviesProcessor implements Processor, TopicSubscriberInterface
20
{
21
    const LOAD_SIMILAR_MOVIES = 'LoadSimilarMoviesFromTMDB';
22
23
    private $searchService;
24
    private $movieRepository;
25
    private $normalizer;
26
    private $sync;
27
    private $producer;
28
29
    public function __construct(MovieRepository $movieRepository, TmdbSearchService $searchService, TmdbNormalizerService $normalizer, TmdbSyncService $sync, ProducerInterface $producer)
30
    {
31
        $this->movieRepository = $movieRepository;
32
        $this->searchService = $searchService;
33
        $this->normalizer = $normalizer;
34
        $this->sync = $sync;
35
        $this->producer = $producer;
36
    }
37
38
    public function process(QMessage $message, Context $session)
39
    {
40
        $movieId = $message->getBody();
41
        $movieId = json_decode($movieId, true);
42
43
        $movies = $this->movieRepository->findAllByIdsWithSimilarMovies([$movieId]);
44
45
        if (\count($movies) === 0) {
46
            return self::REJECT;
47
        }
48
49
        $allSimilarMoviesTable = [];
50
        $movie = reset($movies);
51
52
        if ($movie['sm_id'] !== null) {
53
            return self::ACK;
54
        }
55
56
        try {
57
            $similarMovies = $this->loadSimilarMoviesFromTMDB($movie['m_tmdb.id']);
58
        } catch (TmdbRequestLimitException $requestLimitException) {
59
            sleep(5);
60
61
            return self::REQUEUE;
62
        } catch (TmdbMovieNotFoundException $movieNotFoundException) {
63
            return self::ACK;
64
        }
65
66
        $allSimilarMoviesTable[$movie['m_id']] = array_map(function (array $newSimilarMovie) {
67
            return $newSimilarMovie['id'];
68
        }, $similarMovies);
69
70
        $this->sync->syncMovies($similarMovies);
71
72
        $this->addSimilarMovies($allSimilarMoviesTable);
73
74
        $message = $session = $moviesIds = $movies = $allSimilarMoviesTable = $totalSuccessfullyProcessedMovies = $requeueIds = $movie = null;
75
        unset($message, $session, $moviesIds, $movies, $allSimilarMoviesTable, $totalSuccessfullyProcessedMovies, $requeueIds, $movie);
76
77
        return self::ACK;
78
    }
79
80
    /**
81
     * @param int $tmdbId
82
     *
83
     * @throws TmdbRequestLimitException
84
     * @throws TmdbMovieNotFoundException
85
     * @throws \Psr\SimpleCache\InvalidArgumentException
86
     *
87
     * @return array
88
     */
89
    private function loadSimilarMoviesFromTMDB(int $tmdbId): array
90
    {
91
        $similarMoviesResponse = $this->searchService->findSimilarMoviesById($tmdbId);
92
        $movies = $similarMoviesResponse['results'] ?? [];
93
94
        // If we have a lot of movies then load it all
95
        if (isset($similarMoviesResponse['total_pages']) && $similarMoviesResponse['total_pages'] > 1) {
96
            // $i = 2 because $movies currently already has movies from page 1
97
            $similarMoviesResponse['total_pages'] = $similarMoviesResponse['total_pages'] > 5 ? 5 : $similarMoviesResponse['total_pages'];
98
            for ($i = 2; $i <= $similarMoviesResponse['total_pages']; ++$i) {
99
                $moviesOnPage = $this->searchService->findSimilarMoviesById($tmdbId, $i);
100
                $movies = array_merge($movies, $moviesOnPage['results']);
101
            }
102
        }
103
104
        return $movies;
105
    }
106
107
    private function addSimilarMovies(array $allSimilarMoviesTable)
108
    {
109
        $message = new Message(json_encode($allSimilarMoviesTable));
110
        // todo redis doesnt support priority $message->setPriority(MessagePriority::VERY_LOW);
111
        $this->producer->sendEvent(AddSimilarMoviesProcessor::ADD_SIMILAR_MOVIES, $message);
112
    }
113
114
    public static function getSubscribedTopics()
115
    {
116
        return [self::LOAD_SIMILAR_MOVIES];
117
    }
118
}
119