aimeos /
aimeos-base
| 1 | <?php |
||||
| 2 | |||||
| 3 | /** |
||||
| 4 | * @license LGPLv3, https://opensource.org/licenses/LGPL-3.0 |
||||
| 5 | * @copyright Aimeos (aimeos.org), 2017-2025 |
||||
| 6 | * @package Base |
||||
| 7 | * @subpackage Process |
||||
| 8 | */ |
||||
| 9 | |||||
| 10 | |||||
| 11 | namespace Aimeos\Base\Process; |
||||
| 12 | |||||
| 13 | |||||
/**
 * Posix process control for parallel processing classes
 *
 * Runs closures in forked child processes (at most $max at the same time)
 * and keeps track of the started children by their process ID.
 *
 * @package Base
 * @subpackage Process
 */
class Pcntl implements Iface
{
	/** Maximum number of child processes running at the same time */
	private int $max;

	/** Priority passed to pcntl_setpriority() in each child process */
	private int $prio;

	/** Running tasks as map of PID => [closure, parameter list, restart flag] */
	private array $list = [];


	/**
	 * Initializes the object and sets up the signal handler
	 *
	 * The SIGTERM handler is only installed if isAvailable() returns true.
	 *
	 * @param int $max Maximum number of tasks allowed to run in parallel
	 * @param int $prio Task priority from -20 (high) to 20 (low)
	 * @throws \Aimeos\Base\Process\Exception If setting up the signal handler failed
	 */
	public function __construct( int $max = 4, int $prio = 19 )
	{
		$this->max = $max;
		$this->prio = $prio;

		if( !$this->isAvailable() ) {
			return;
		}

		// NOTE(review): PHP only invokes signal handlers when signals are dispatched
		// (pcntl_async_signals( true ), declare( ticks=1 ) or pcntl_signal_dispatch()).
		// None of them is enabled in this class - confirm the application does so.
		$handler = function( $signo ) {

			// posix_kill() is part of the optional posix extension
			$canKill = function_exists( 'posix_kill' );

			foreach( $this->list as $pid => $entry )
			{
				if( $canKill ) {
					posix_kill( $pid, $signo ); // forward the signal to the child
				}

				$status = 0;
				pcntl_waitpid( $pid, $status ); // reap the child before exiting
			}

			exit( 0 );
		};

		if( pcntl_signal( SIGTERM, $handler ) === false ) {
			throw new Exception( 'Unable to install signal handler: ' . pcntl_strerror( pcntl_get_last_error() ) );
		}
	}


	/**
	 * Clears the cloned object
	 *
	 * The clone starts with an empty task list so it doesn't track the
	 * child processes of the object it was cloned from.
	 */
	public function __clone()
	{
		$this->list = [];
	}


	/**
	 * Checks if processing tasks in parallel is available
	 *
	 * Requires the CLI SAPI, a maximum number of tasks greater than zero
	 * and the pcntl functions used by this class.
	 *
	 * @return bool True if available, false if not
	 */
	public function isAvailable() : bool
	{
		return php_sapi_name() === 'cli' && $this->max > 0
			&& function_exists( 'pcntl_fork' ) && function_exists( 'pcntl_wait' )
			&& function_exists( 'pcntl_signal' ) && function_exists( 'pcntl_waitpid' )
			&& function_exists( 'pcntl_setpriority' );
	}


	/**
	 * Starts a new task by executing the given anonymous function
	 *
	 * Blocks until less than the maximum number of tasks are running, then
	 * forks a child process which executes the closure and exits afterwards.
	 *
	 * @param \Closure $fcn Anonymous function to execute
	 * @param array $data List of parameters that is passed to the closure function
	 * @param bool $restart True if the task should be restarted if it fails (only once)
	 * @return \Aimeos\Base\Process\Iface Self object for method chaining
	 * @throws \Aimeos\Base\Process\Exception If starting the new task failed
	 */
	public function start( \Closure $fcn, array $data, bool $restart = false ) : Iface
	{
		while( count( $this->list ) >= $this->max ) {
			$this->waitOne();
		}

		$data = $this->copy( $data );
		flush(); // flush all pending output so it's not printed in childs again

		if( ( $pid = pcntl_fork() ) === -1 ) {
			throw new Exception( 'Unable to fork new process: ' . pcntl_strerror( pcntl_get_last_error() ) );
		}

		if( $pid === 0 ) // child process
		{
			$this->list = []; // use own child process list

			// NOTE(review): an exception thrown by the closure isn't caught here and
			// propagates to the callers of start() inside the child process - confirm
			// that no caller catches it and continues working in the child.
			exit( $this->exec( $fcn, $data ) );
		}

		// parent process: remember the task so it can be waited for and restarted
		$this->list[$pid] = [$fcn, $data, $restart];

		return $this;
	}


	/**
	 * Waits for the running tasks until all have finished
	 *
	 * @return \Aimeos\Base\Process\Iface Self object for method chaining
	 * @throws \Aimeos\Base\Process\Exception If waiting for a task or restarting it failed
	 */
	public function wait() : Iface
	{
		while( !empty( $this->list ) ) {
			$this->waitOne();
		}

		return $this;
	}


	/**
	 * Clone all objects in the function parameter list
	 *
	 * Non-object values are passed through unchanged. If a cloned object
	 * implements __sleep(), the method is called and its result is discarded.
	 *
	 * @param array $data Function parameter list
	 * @return array Function parameter list with cloned objects
	 */
	protected function copy( array $data ) : array
	{
		foreach( $data as $key => $value )
		{
			if( !is_object( $value ) ) {
				continue;
			}

			$value = clone $value;

			// presumably lets the object release resources which must not be
			// shared with the child process - verify against the passed objects
			if( method_exists( $value, '__sleep' ) ) {
				$value->__sleep();
			}

			$data[$key] = $value;
		}

		return $data;
	}


	/**
	 * Executes the worker function
	 *
	 * Called by start() in the child process: Sets the process priority,
	 * discards the content of the active output buffer and runs the closure.
	 *
	 * @param \Closure $fcn Worker function
	 * @param array $data Function parameter list
	 * @return int Process error code, zero if the closure returned without throwing
	 */
	protected function exec( \Closure $fcn, array $data ) : int
	{
		pcntl_setpriority( $this->prio );

		if( ob_get_level() > 0 ) {
			ob_clean(); // avoid printing buffered messages of the parent again
		}

		call_user_func_array( $fcn, $data );
		return 0;
	}


	/**
	 * Waits for the next running tasks to finish
	 *
	 * A task that terminated with a non-zero status is started again once if
	 * its restart flag is set. Otherwise, the failure is not reported.
	 *
	 * @throws \Aimeos\Base\Process\Exception If waiting for a child process or restarting the task failed
	 */
	protected function waitOne()
	{
		$status = -1;

		if( ( $pid = pcntl_wait( $status ) ) === -1 ) {
			throw new Exception( 'Unable to wait for child process: ' . pcntl_strerror( pcntl_get_last_error() ) );
		}

		// pcntl_wait() returns any terminated child of this process, including
		// ones that haven't been started by this object. Those must be ignored,
		// otherwise a failed foreign child would be "restarted" with null values.
		if( !isset( $this->list[$pid] ) ) {
			return;
		}

		[$fcn, $data, $restart] = $this->list[$pid];
		unset( $this->list[$pid] );

		if( $status > 0 && $restart !== false ) {
			$this->start( $fcn, $data, false ); // restart only once
		}
	}
}
||||
| 207 |
In general, usage of exit should be done with care and only when running in a scripting context like a CLI script.