Issues (48)

src/Socket/NativeSocket.php (4 issues)

1
<?php
2
3
namespace Pheanstalk\Socket;
4
5
use Pheanstalk\Exception;
6
use Pheanstalk\Socket;
7
8
/**
 * A Socket implementation around a fsockopen() stream.
 *
 * @author  Paul Annesley
 * @package Pheanstalk
 * @license http://www.opensource.org/licenses/mit-license.php
 */
class NativeSocket implements Socket
{
    /**
     * The default timeout for a blocking read on the socket.
     */
    const SOCKET_TIMEOUT = 1;

    /**
     * Number of retries for attempted writes which return zero length.
     */
    const WRITE_RETRIES = 8;

    /**
     * Handle returned by the stream wrapper's fsockopen()/pfsockopen() call.
     */
    private $_socket;

    /**
     * Opens the connection and applies the default read timeout.
     *
     * @param string $host
     * @param int    $port
     * @param int    $connectTimeout
     * @param bool   $connectPersistent Use pfsockopen() instead of fsockopen() when true.
     *
     * @throws Exception\ConnectionException When the wrapper returns a falsy socket.
     */
    public function __construct($host, $port, $connectTimeout, $connectPersistent)
    {
        // Defined up front so they are always set when building the exception,
        // even if the wrapper (e.g. a test double) does not fill them in.
        $errno = 0;
        $errstr = '';

        if ($connectPersistent) {
            $this->_socket = $this->_wrapper()
                ->pfsockopen($host, $port, $errno, $errstr, $connectTimeout);
        } else {
            $this->_socket = $this->_wrapper()
                ->fsockopen($host, $port, $errno, $errstr, $connectTimeout);
        }

        if (!$this->_socket) {
            throw new Exception\ConnectionException($errno, $errstr." (connecting to $host:$port)");
        }

        $this->_wrapper()
            ->stream_set_timeout($this->_socket, self::SOCKET_TIMEOUT);
    }

    /**
     * Writes the whole of $data to the socket, retrying partial writes.
     *
     * @see Socket::write()
     *
     * @param string $data
     *
     * @throws Exception\SocketException When the write history reports that
     *                                   the last WRITE_RETRIES attempts wrote nothing.
     */
    public function write($data)
    {
        $history = new WriteHistory(self::WRITE_RETRIES);
        $length = strlen($data);

        for ($written = 0; $written < $length; $written += (int) $fwrite) {
            // Each attempt sends only the part that has not been written yet.
            $fwrite = $this->_wrapper()
                ->fwrite($this->_socket, substr($data, $written));

            $history->log($fwrite);

            if ($history->isFullWithNoWrites()) {
                throw new Exception\SocketException(sprintf(
                    'fwrite() failed to write data after %u tries',
                    self::WRITE_RETRIES
                ));
            }
        }
    }

    /**
     * Reads from the socket until the accumulated data is a valid XML string.
     *
     * Note: this switches libxml to internal error handling and leaves it
     * enabled after returning.
     *
     * @param int|null $length Optional length passed through to fgets().
     *
     * @return string The accumulated data with trailing whitespace removed.
     * @throws Exception\SocketException When the data is still not valid XML
     *                                   after the timeout, or when it produces
     *                                   no libxml errors but still cannot be
     *                                   parsed (e.g. nothing was read).
     */
    public function getLine($length = null)
    {
        $timeout = 5;
        $timer   = microtime(true);
        $data    = '';
        libxml_use_internal_errors(true);

        do {
            libxml_clear_errors();
            $data .= isset($length)
                ? $this->_wrapper()->fgets($this->_socket, $length)
                : $this->_wrapper()->fgets($this->_socket);

            // Parse attempt whose only purpose is to populate the libxml error list.
            simplexml_load_string($data);
            $hasErrors = !empty(libxml_get_errors());

            if ($hasErrors && microtime(true) - $timer > $timeout) {
                $this->disconnect();
                throw new Exception\SocketException('Socket timed out!');
            }

            if (!$hasErrors) {
                try {
                    // No libxml errors does not guarantee a document: the
                    // constructor throws when the string cannot be parsed.
                    new \SimpleXMLElement($data);
                } catch (\Exception $e) {
                    throw new Exception\SocketException('Socket closed by server!');
                }
            }
        } while ($hasErrors);

        libxml_clear_errors();

        return rtrim($data);
    }

    /**
     * Closes the underlying socket through the stream wrapper.
     */
    public function disconnect()
    {
        $this->_wrapper()->fclose($this->_socket);
    }

    // ----------------------------------------

    /**
     * Wrapper class for all stream functions.
     * Facilitates mocking/stubbing stream operations in unit tests.
     */
    private function _wrapper()
    {
        return StreamFunctions::instance();
    }
}
127