Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like AbstractConnection often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
While breaking up the class, it is a good idea to analyze how other classes use AbstractConnection, and based on these observations, apply Extract Interface, too.
1 | <?php |
||
18 | class AbstractConnection extends AbstractChannel |
||
19 | { |
||
20 | /** |
||
21 | * @var array |
||
22 | * @internal |
||
23 | */ |
||
24 | public static $LIBRARY_PROPERTIES = array( |
||
25 | 'product' => array('S', 'AMQPLib'), |
||
26 | 'platform' => array('S', 'PHP'), |
||
27 | 'version' => array('S', '2.9'), |
||
28 | 'information' => array('S', ''), |
||
29 | 'copyright' => array('S', ''), |
||
30 | 'capabilities' => array( |
||
31 | 'F', |
||
32 | array( |
||
33 | 'authentication_failure_close' => array('t', true), |
||
34 | 'publisher_confirms' => array('t', true), |
||
35 | 'consumer_cancel_notify' => array('t', true), |
||
36 | 'exchange_exchange_bindings' => array('t', true), |
||
37 | 'basic.nack' => array('t', true), |
||
38 | 'connection.blocked' => array('t', true) |
||
39 | ) |
||
40 | ) |
||
41 | ); |
||
42 | |||
43 | /** |
||
44 | * @var AMQPChannel[] |
||
45 | * @internal |
||
46 | */ |
||
47 | public $channels = array(); |
||
48 | |||
49 | /** @var int */ |
||
50 | protected $version_major; |
||
51 | |||
52 | /** @var int */ |
||
53 | protected $version_minor; |
||
54 | |||
55 | /** @var array */ |
||
56 | protected $server_properties; |
||
57 | |||
58 | /** @var array */ |
||
59 | protected $mechanisms; |
||
60 | |||
61 | /** @var array */ |
||
62 | protected $locales; |
||
63 | |||
64 | /** @var bool */ |
||
65 | protected $wait_tune_ok; |
||
66 | |||
67 | /** @var string */ |
||
68 | protected $known_hosts; |
||
69 | |||
70 | /** @var AMQPReader */ |
||
71 | protected $input; |
||
72 | |||
73 | /** @var string */ |
||
74 | protected $vhost; |
||
75 | |||
76 | /** @var bool */ |
||
77 | protected $insist; |
||
78 | |||
79 | /** @var string */ |
||
80 | protected $login_method; |
||
81 | |||
82 | /** @var string */ |
||
83 | protected $login_response; |
||
84 | |||
85 | /** @var string */ |
||
86 | protected $locale; |
||
87 | |||
88 | /** @var int */ |
||
89 | protected $heartbeat; |
||
90 | |||
91 | /** @var float */ |
||
92 | protected $last_frame; |
||
93 | |||
94 | /** @var int */ |
||
95 | protected $channel_max = 65535; |
||
96 | |||
97 | /** @var int */ |
||
98 | protected $frame_max = 131072; |
||
99 | |||
100 | /** @var array Constructor parameters for clone */ |
||
101 | protected $construct_params; |
||
102 | |||
103 | /** @var bool Close the connection in destructor */ |
||
104 | protected $close_on_destruct = true; |
||
105 | |||
106 | /** @var bool Maintain connection status */ |
||
107 | protected $is_connected = false; |
||
108 | |||
109 | /** @var \PhpAmqpLib\Wire\IO\AbstractIO */ |
||
110 | protected $io; |
||
111 | |||
112 | /** @var \PhpAmqpLib\Wire\AMQPReader */ |
||
113 | protected $wait_frame_reader; |
||
114 | |||
115 | /** @var callable Handles connection blocking from the server */ |
||
116 | private $connection_block_handler; |
||
117 | |||
118 | /** @var callable Handles connection unblocking from the server */ |
||
119 | private $connection_unblock_handler; |
||
120 | |||
121 | /** @var int Connection timeout value*/ |
||
122 | protected $connection_timeout ; |
||
123 | |||
124 | /** |
||
125 | * Circular buffer to speed up prepare_content(). |
||
126 | * Max size limited by $prepare_content_cache_max_size. |
||
127 | * |
||
128 | * @var array |
||
129 | * @see prepare_content() |
||
130 | */ |
||
131 | private $prepare_content_cache = array(); |
||
132 | |||
133 | /** @var int Maximal size of $prepare_content_cache */ |
||
134 | private $prepare_content_cache_max_size = 100; |
||
135 | |||
136 | /** |
||
137 | * Maximum time to wait for channel operations, in seconds |
||
138 | * @var float $channel_rpc_timeout |
||
139 | */ |
||
140 | private $channel_rpc_timeout; |
||
141 | |||
142 | /** |
||
143 | * @param string $user |
||
144 | * @param string $password |
||
145 | * @param string $vhost |
||
146 | * @param bool $insist |
||
147 | * @param string $login_method |
||
148 | * @param null $login_response |
||
149 | * @param string $locale |
||
150 | * @param AbstractIO $io |
||
151 | * @param int $heartbeat |
||
152 | * @param int $connection_timeout |
||
153 | * @param float $channel_rpc_timeout |
||
154 | * @throws \Exception |
||
155 | */ |
||
156 | 186 | public function __construct( |
|
203 | |||
204 | /** |
||
205 | * Connects to the AMQP server |
||
206 | */ |
||
207 | 186 | protected function connect() |
|
264 | |||
265 | /** |
||
266 | * Reconnects using the original connection settings. |
||
267 | * This will not recreate any channels that were established previously |
||
268 | */ |
||
269 | 36 | public function reconnect() |
|
278 | |||
279 | /** |
||
280 | * Cloning will use the old properties to make a new connection to the same server |
||
281 | */ |
||
282 | public function __clone() |
||
286 | |||
287 | 12 | public function __destruct() |
|
293 | |||
294 | /** |
||
295 | * Attempts to close the connection safely |
||
296 | */ |
||
297 | 48 | protected function safeClose() |
|
307 | |||
308 | /** |
||
309 | * @param int $sec |
||
310 | * @param int $usec |
||
311 | * @return mixed |
||
312 | */ |
||
313 | View Code Duplication | public function select($sec, $usec = 0) |
|
325 | |||
326 | /** |
||
327 | * Allows to not close the connection |
||
328 | * it's useful after the fork when you don't want to close parent process connection |
||
329 | * |
||
330 | * @param bool $close |
||
331 | */ |
||
332 | public function set_close_on_destruct($close = true) |
||
336 | |||
337 | 150 | protected function close_input() |
|
346 | |||
347 | 150 | protected function close_socket() |
|
355 | |||
356 | /** |
||
357 | * @param string $data |
||
358 | */ |
||
359 | 180 | View Code Duplication | public function write($data) |
373 | |||
374 | 144 | protected function do_close() |
|
380 | |||
381 | /** |
||
382 | * @return int |
||
383 | * @throws \PhpAmqpLib\Exception\AMQPRuntimeException |
||
384 | */ |
||
385 | 174 | public function get_free_channel_id() |
|
395 | |||
396 | /** |
||
397 | * @param string $channel |
||
398 | * @param int $class_id |
||
399 | * @param int $weight |
||
400 | * @param int $body_size |
||
401 | * @param string $packed_properties |
||
402 | * @param string $body |
||
403 | * @param AMQPWriter $pkt |
||
404 | */ |
||
405 | 66 | public function send_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null) |
|
410 | |||
411 | /** |
||
412 | * Returns a new AMQPWriter or mutates the provided $pkt |
||
413 | * |
||
414 | * @param string $channel |
||
415 | * @param int $class_id |
||
416 | * @param int $weight |
||
417 | * @param int $body_size |
||
418 | * @param string $packed_properties |
||
419 | * @param string $body |
||
420 | * @param AMQPWriter $pkt |
||
421 | * @return AMQPWriter |
||
422 | */ |
||
423 | 66 | public function prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null) |
|
478 | |||
479 | /** |
||
480 | * @param string $channel |
||
481 | * @param array $method_sig |
||
482 | * @param AMQPWriter|string $args |
||
483 | * @param null $pkt |
||
484 | */ |
||
485 | 180 | protected function send_channel_method_frame($channel, $method_sig, $args = '', $pkt = null) |
|
491 | |||
492 | /** |
||
493 | * Returns a new AMQPWriter or mutates the provided $pkt |
||
494 | * |
||
495 | * @param string $channel |
||
496 | * @param array $method_sig |
||
497 | * @param AMQPWriter|string $args |
||
498 | * @param AMQPWriter $pkt |
||
499 | * @return AMQPWriter |
||
500 | */ |
||
501 | 180 | protected function prepare_channel_method_frame($channel, $method_sig, $args = '', $pkt = null) |
|
524 | |||
525 | /** |
||
526 | * Waits for a frame from the server |
||
527 | * |
||
528 | * @param int|float|null $timeout |
||
529 | * @return array |
||
530 | * @throws \Exception |
||
531 | * @throws \PhpAmqpLib\Exception\AMQPTimeoutException |
||
532 | * @throws \PhpAmqpLib\Exception\AMQPRuntimeException |
||
533 | */ |
||
534 | 180 | protected function wait_frame($timeout = 0) |
|
589 | |||
590 | /** |
||
591 | * Waits for a frame from the server destined for a particular channel. |
||
592 | * |
||
593 | * @param string $channel_id |
||
594 | * @param int|float|null $timeout |
||
595 | * @return array |
||
596 | */ |
||
597 | 180 | protected function wait_channel($channel_id, $timeout = 0) |
|
652 | |||
653 | /** |
||
654 | * Fetches a channel object identified by the numeric channel_id, or |
||
655 | * create that object if it doesn't already exist. |
||
656 | * |
||
657 | * @param int $channel_id |
||
658 | * @return AMQPChannel |
||
659 | */ |
||
660 | 174 | public function channel($channel_id = null) |
|
672 | |||
673 | /** |
||
674 | * Requests a connection close |
||
675 | * |
||
676 | * @param int $reply_code |
||
677 | * @param string $reply_text |
||
678 | * @param array $method_sig |
||
679 | * @return mixed|null |
||
680 | */ |
||
681 | 144 | public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0)) |
|
682 | { |
||
683 | 144 | $result = null; |
|
684 | 144 | $this->io->disableHeartbeat(); |
|
685 | 144 | if (empty($this->protocolWriter) || !$this->isConnected()) { |
|
686 | 18 | return $result; |
|
687 | } |
||
688 | |||
689 | try { |
||
690 | 144 | $this->closeChannels(); |
|
691 | 144 | list($class_id, $method_id, $args) = $this->protocolWriter->connectionClose( |
|
692 | 144 | $reply_code, |
|
693 | 120 | $reply_text, |
|
694 | 144 | $method_sig[0], |
|
695 | 144 | $method_sig[1] |
|
696 | 72 | ); |
|
697 | 144 | $this->send_method_frame(array($class_id, $method_id), $args); |
|
698 | 144 | $result = $this->wait( |
|
699 | 144 | array($this->waitHelper->get_wait('connection.close_ok')), |
|
700 | 144 | false, |
|
701 | 144 | $this->connection_timeout |
|
702 | 72 | ); |
|
703 | 72 | } catch (\Exception $exception) { |
|
704 | $this->do_close(); |
||
705 | throw $exception; |
||
706 | } |
||
707 | |||
708 | 144 | $this->setIsConnected(false); |
|
709 | |||
710 | 144 | return $result; |
|
711 | } |
||
712 | |||
713 | /** |
||
714 | * @param AMQPReader $reader |
||
715 | * @throws \PhpAmqpLib\Exception\AMQPProtocolConnectionException |
||
716 | */ |
||
717 | protected function connection_close(AMQPReader $reader) |
||
718 | { |
||
719 | $reply_code = $reader->read_short(); |
||
720 | $reply_text = $reader->read_shortstr(); |
||
721 | $class_id = $reader->read_short(); |
||
722 | $method_id = $reader->read_short(); |
||
723 | |||
724 | $this->x_close_ok(); |
||
725 | |||
726 | throw new AMQPProtocolConnectionException($reply_code, $reply_text, array($class_id, $method_id)); |
||
727 | } |
||
728 | |||
729 | /** |
||
730 | * Confirms a connection close |
||
731 | */ |
||
732 | protected function x_close_ok() |
||
733 | { |
||
734 | $this->send_method_frame( |
||
735 | explode(',', $this->waitHelper->get_wait('connection.close_ok')) |
||
736 | ); |
||
737 | $this->do_close(); |
||
738 | } |
||
739 | |||
740 | /** |
||
741 | * Confirm a connection close |
||
742 | * |
||
743 | * @param AMQPReader $args |
||
744 | */ |
||
745 | 144 | protected function connection_close_ok($args) |
|
746 | { |
||
747 | 144 | $this->do_close(); |
|
748 | 144 | } |
|
749 | |||
750 | /** |
||
751 | * @param string $virtual_host |
||
752 | * @param string $capabilities |
||
753 | * @param bool $insist |
||
754 | * @return mixed |
||
755 | */ |
||
756 | 180 | protected function x_open($virtual_host, $capabilities = '', $insist = false) |
|
757 | { |
||
758 | 180 | $args = new AMQPWriter(); |
|
759 | 180 | $args->write_shortstr($virtual_host); |
|
760 | 180 | $args->write_shortstr($capabilities); |
|
761 | 180 | $args->write_bits(array($insist)); |
|
762 | 180 | $this->send_method_frame(array(10, 40), $args); |
|
763 | |||
764 | $wait = array( |
||
765 | 180 | $this->waitHelper->get_wait('connection.open_ok') |
|
766 | 90 | ); |
|
767 | |||
768 | 180 | if ($this->protocolVersion == '0.8') { |
|
769 | $wait[] = $this->waitHelper->get_wait('connection.redirect'); |
||
770 | } |
||
771 | |||
772 | 180 | return $this->wait($wait); |
|
773 | } |
||
774 | |||
775 | /** |
||
776 | * Signals that the connection is ready |
||
777 | * |
||
778 | * @param AMQPReader $args |
||
779 | */ |
||
780 | 180 | protected function connection_open_ok($args) |
|
781 | { |
||
782 | 180 | $this->known_hosts = $args->read_shortstr(); |
|
783 | 180 | $this->debug->debug_msg('Open OK! known_hosts: ' . $this->known_hosts); |
|
784 | 180 | } |
|
785 | |||
786 | /** |
||
787 | * Asks the client to use a different server |
||
788 | * |
||
789 | * @param AMQPReader $args |
||
790 | * @return string |
||
791 | */ |
||
792 | protected function connection_redirect($args) |
||
793 | { |
||
794 | $host = $args->read_shortstr(); |
||
795 | $this->known_hosts = $args->read_shortstr(); |
||
796 | $this->debug->debug_msg(sprintf( |
||
797 | 'Redirected to [%s], known_hosts [%s]', |
||
798 | $host, |
||
799 | $this->known_hosts |
||
800 | )); |
||
801 | |||
802 | return $host; |
||
803 | } |
||
804 | |||
805 | /** |
||
806 | * Security mechanism challenge |
||
807 | * |
||
808 | * @param AMQPReader $args |
||
809 | */ |
||
810 | protected function connection_secure($args) |
||
811 | { |
||
812 | $challenge = $args->read_longstr(); |
||
813 | } |
||
814 | |||
815 | /** |
||
816 | * Security mechanism response |
||
817 | * |
||
818 | * @param string $response |
||
819 | */ |
||
820 | protected function x_secure_ok($response) |
||
821 | { |
||
822 | $args = new AMQPWriter(); |
||
823 | $args->write_longstr($response); |
||
824 | $this->send_method_frame(array(10, 21), $args); |
||
825 | } |
||
826 | |||
827 | /** |
||
828 | * Starts connection negotiation |
||
829 | * |
||
830 | * @param AMQPReader $args |
||
831 | */ |
||
832 | 180 | protected function connection_start($args) |
|
833 | { |
||
834 | 180 | $this->version_major = $args->read_octet(); |
|
835 | 180 | $this->version_minor = $args->read_octet(); |
|
836 | 180 | $this->server_properties = $args->read_table(); |
|
837 | 180 | $this->mechanisms = explode(' ', $args->read_longstr()); |
|
838 | 180 | $this->locales = explode(' ', $args->read_longstr()); |
|
839 | |||
840 | 180 | $this->debug->debug_connection_start( |
|
841 | 180 | $this->version_major, |
|
842 | 180 | $this->version_minor, |
|
843 | 180 | $this->server_properties, |
|
844 | 180 | $this->mechanisms, |
|
845 | 180 | $this->locales |
|
846 | 90 | ); |
|
847 | 180 | } |
|
848 | |||
849 | /** |
||
850 | * @param AMQPTable|array $clientProperties |
||
851 | * @param string $mechanism |
||
852 | * @param string $response |
||
853 | * @param string $locale |
||
854 | */ |
||
855 | 180 | protected function x_start_ok($clientProperties, $mechanism, $response, $locale) |
|
856 | { |
||
857 | 180 | $args = new AMQPWriter(); |
|
858 | 180 | $args->write_table($clientProperties); |
|
859 | 180 | $args->write_shortstr($mechanism); |
|
860 | 180 | $args->write_longstr($response); |
|
861 | 180 | $args->write_shortstr($locale); |
|
862 | 180 | $this->send_method_frame(array(10, 11), $args); |
|
863 | 180 | } |
|
864 | |||
865 | /** |
||
866 | * Proposes connection tuning parameters |
||
867 | * |
||
868 | * @param AMQPReader $args |
||
869 | */ |
||
870 | 180 | protected function connection_tune($args) |
|
871 | { |
||
872 | 180 | $v = $args->read_short(); |
|
873 | 180 | if ($v) { |
|
874 | $this->channel_max = $v; |
||
875 | } |
||
876 | |||
877 | 180 | $v = $args->read_long(); |
|
878 | 180 | if ($v) { |
|
879 | 180 | $this->frame_max = $v; |
|
880 | 90 | } |
|
881 | |||
882 | // use server proposed value if not set |
||
883 | 180 | if ($this->heartbeat === null) { |
|
884 | $this->heartbeat = $args->read_short(); |
||
885 | } |
||
886 | |||
887 | 180 | $this->x_tune_ok($this->channel_max, $this->frame_max, $this->heartbeat); |
|
888 | 180 | } |
|
889 | |||
890 | /** |
||
891 | * Negotiates connection tuning parameters |
||
892 | * |
||
893 | * @param int $channel_max |
||
894 | * @param int $frame_max |
||
895 | * @param int $heartbeat |
||
896 | */ |
||
897 | 180 | protected function x_tune_ok($channel_max, $frame_max, $heartbeat) |
|
898 | { |
||
899 | 180 | $args = new AMQPWriter(); |
|
900 | 180 | $args->write_short($channel_max); |
|
901 | 180 | $args->write_long($frame_max); |
|
902 | 180 | $args->write_short($heartbeat); |
|
903 | 180 | $this->send_method_frame(array(10, 31), $args); |
|
904 | 180 | $this->wait_tune_ok = false; |
|
905 | 180 | } |
|
906 | |||
907 | /** |
||
908 | * @return resource |
||
909 | * @deprecated No direct access to communication socket should be available. |
||
910 | */ |
||
911 | public function getSocket() |
||
915 | |||
916 | /** |
||
917 | * @return \PhpAmqpLib\Wire\IO\AbstractIO |
||
918 | * @deprecated |
||
919 | */ |
||
920 | public function getIO() |
||
921 | { |
||
922 | return $this->io; |
||
923 | } |
||
924 | |||
925 | /** |
||
926 | * Handles connection blocked notifications |
||
927 | * |
||
928 | * @param AMQPReader $args |
||
929 | */ |
||
930 | protected function connection_blocked(AMQPReader $args) |
||
935 | |||
936 | /** |
||
937 | * Handles connection unblocked notifications |
||
938 | * |
||
939 | * @param AMQPReader $args |
||
940 | */ |
||
941 | protected function connection_unblocked(AMQPReader $args) |
||
946 | |||
947 | /** |
||
948 | * Sets a handler which is called whenever a connection.block is sent from the server |
||
949 | * |
||
950 | * @param callable $callback |
||
951 | */ |
||
952 | public function set_connection_block_handler($callback) |
||
956 | |||
957 | /** |
||
958 | * Sets a handler which is called whenever a connection.block is sent from the server |
||
959 | * |
||
960 | * @param callable $callback |
||
961 | */ |
||
962 | public function set_connection_unblock_handler($callback) |
||
966 | |||
967 | /** |
||
968 | * Gets the connection status |
||
969 | * |
||
970 | * @return bool |
||
971 | */ |
||
972 | 186 | public function isConnected() |
|
976 | |||
977 | /** |
||
978 | * Set the connection status |
||
979 | * |
||
980 | * @param bool $is_connected |
||
981 | */ |
||
982 | 186 | protected function setIsConnected($is_connected) |
|
986 | |||
987 | /** |
||
988 | * Closes all available channels |
||
989 | */ |
||
990 | 150 | protected function closeChannels() |
|
1004 | |||
1005 | /** |
||
1006 | * Should the connection be attempted during construction? |
||
1007 | * |
||
1008 | * @return bool |
||
1009 | */ |
||
1010 | 162 | public function connectOnConstruct() |
|
1014 | |||
1015 | /** |
||
1016 | * @return array |
||
1017 | */ |
||
1018 | public function getServerProperties() |
||
1022 | |||
1023 | /** |
||
1024 | * Get the library properties for populating the client protocol information |
||
1025 | * |
||
1026 | * @return array |
||
1027 | */ |
||
1028 | 192 | public function getLibraryProperties() |
|
1032 | |||
1033 | public static function create_connection($hosts, $options = array()){ |
||
1051 | |||
1052 | public static function validate_host($host) { |
||
1066 | } |
||
1067 |
Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.
Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..