ZmqServer::commandGenerate()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 11
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 11
ccs 6
cts 6
cp 1
rs 9.4285
cc 2
eloc 7
nc 2
nop 0
crap 2
1
<?php
2
3
/**
4
 * ZeroMQ interface for cruftflake.
5
 * 
6
 * @author @davegardnerisme
7
 */
8
9
namespace Gendoria\CruftFlake\Zmq;
10
11
use Exception;
12
use Gendoria\CruftFlake\Generator\Generator;
13
use Gendoria\CruftFlake\ServerInterface;
14
use Psr\Log\LoggerAwareInterface;
15
use Psr\Log\LoggerInterface;
16
use Psr\Log\NullLogger;
17
use ZMQ;
18
use ZMQContext;
19
use ZMQSocket;
20
21
class ZmqServer implements ServerInterface, LoggerAwareInterface
{
    /**
     * Cruft flake generator.
     *
     * @var Generator
     */
    private $generator;

    /**
     * DSN the ZMQ socket is bound to.
     *
     * @var string
     */
    private $dsn;

    /**
     * Logger. Defaults to a NullLogger until setLogger() is called.
     *
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Debug mode. When true, run() leaves its loop after a single iteration.
     *
     * @var bool
     */
    private $debugMode = false;

    /**
     * Constructor.
     *
     * @param Generator $generator Generator used to produce IDs, status and heartbeats.
     * @param string    $dsn       Where socket should be bound. Default 'tcp://*:5599'
     * @param bool      $debugMode Debug mode. If set to true, server will only listen for one command, before exiting.
     */
    public function __construct(Generator $generator, $dsn = 'tcp://*:5599', $debugMode = false)
    {
        $this->generator = $generator;
        $this->dsn = $dsn;
        $this->logger = new NullLogger();
        $this->debugMode = $debugMode;
    }

    /**
     * Run ZMQ interface for generator.
     *
     * Req-rep pattern; msgs are commands:
     *
     * GEN    = Generate ID
     * STATUS = Get status string
     *
     * Every received command is answered with a JSON document holding the
     * keys 'code' and 'message'. The generator heartbeat is triggered on each
     * loop iteration, whether or not a command was received. In debug mode
     * the loop is left after the first iteration.
     */
    public function run()
    {
        $receiver = $this->getZmqSocket($this->dsn);
        while (true) {
            $msg = $receiver->recv();
            // A false result means no command arrived (presumably the receive
            // timeout configured in getZmqSocket() elapsed), so no reply is sent.
            if ($msg !== false) {
                $this->logger->debug('ZMQ server received command: '.$msg);
                $response = $this->runCommand($msg);
                $receiver->send($this->encodeResponse($response));
            }
            $this->generator->heartbeat();
            if ($this->debugMode) {
                break;
            }
        }
    }

    /**
     * Dispatch a received command to its handler.
     *
     * @param string $msg Command received from the socket.
     *
     * @return array Response with 'code' and 'message' keys; code 404 for an unknown command.
     */
    private function runCommand($msg)
    {
        switch ($msg) {
            case 'GEN':
                return $this->commandGenerate();
            case 'STATUS':
                return $this->commandStatus();
            default:
                $this->logger->debug('Unknown command received: '.$msg);

                return $this->createResponse('UNKNOWN COMMAND', 404);
        }
    }

    /**
     * Create generate command response.
     *
     * Any exception thrown by the generator is logged and converted into
     * an 'ERROR' response with code 500.
     *
     * @return array
     */
    private function commandGenerate()
    {
        try {
            $response = $this->createResponse($this->generator->generate());
        } catch (Exception $e) {
            // PSR-3 requires an exception passed in the log context to live
            // under the 'exception' key.
            $this->logger->error(
                'Generator error: '.$e->getMessage(),
                array('exception' => $e, 'server' => $this)
            );
            $response = $this->createResponse('ERROR', 500);
        }

        return $response;
    }

    /**
     * Create status command response.
     *
     * @return array
     */
    private function commandStatus()
    {
        return $this->createResponse($this->generator->status());
    }

    /**
     * Prepare response.
     *
     * @param mixed $message Return message. Anything, which is JSON serializable.
     * @param int   $code    Response code.
     *
     * @return array
     */
    private function createResponse($message, $code = 200)
    {
        return array(
            'code' => $code,
            'message' => $message,
        );
    }

    /**
     * Encode a response as JSON for sending over the socket.
     *
     * If the response cannot be encoded, the failure is logged and a generic
     * 'ERROR' response with code 500 is encoded instead, so the client never
     * receives an empty reply.
     *
     * @param array $response Response created by createResponse().
     *
     * @return string JSON document.
     */
    private function encodeResponse(array $response)
    {
        $payload = json_encode($response);
        if ($payload === false) {
            $this->logger->error('Unable to JSON-encode response, JSON error code: '.json_last_error());
            $payload = json_encode($this->createResponse('ERROR', 500));
        }

        return $payload;
    }

    /**
     * Get ZMQ socket.
     *
     * Creates a REP socket with a receive timeout and binds it to the DSN.
     *
     * @param string $dsn DSN, on which ZMQ connection should listen.
     *
     * @return \ZMQSocket
     */
    protected function getZmqSocket($dsn)
    {
        $context = new ZMQContext();
        $receiver = new ZMQSocket($context, ZMQ::SOCKET_REP);
        //Receive timeout is set to 5s, to allow heartbeat operations to take place.
        $receiver->setSockOpt(ZMQ::SOCKOPT_RCVTIMEO, 5000);
        $this->logger->debug("Binding to {$dsn}");
        $receiver->bind($dsn);

        return $receiver;
    }

    /**
     * Set the logger used for the server's debug and error messages.
     *
     * @param LoggerInterface $logger Logger replacing the current one.
     */
    public function setLogger(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }
}
166