Work::run()   A
last analyzed

Complexity

Conditions 4
Paths 6

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
nc 6
nop 0
dl 0
loc 12
rs 9.8666
c 0
b 0
f 0
1
<?php
2
3
/**
 * Work
 *
 * Cooperative multitasking via coroutines.
 *
 * @package core
 * @author [email protected]
 * @reference http://nikic.github.io/2012/12/22/Cooperative-multitasking-using-coroutines-in-PHP.html
 * @copyright Caffeina srl - 2015 - http://caffeina.com
 */
13
14
/**
 * Static round-robin scheduler for generator-based tasks, plus a helper to
 * defer callbacks until script shutdown.
 *
 * Tasks are wrapped in TaskCoroutine objects and kept in an \SplQueue; run()
 * resumes each task in turn until every one of them has finished.
 */
class Work {
  use Module;

  /** @var \SplQueue|null Queue of pending tasks, created on first use. */
  protected static $pool = null;

  /** @var TaskCoroutine[] Registered tasks indexed by task ID. */
  protected static $workers = [];

  /** @var int Last automatically assigned task ID. */
  protected static $lastID = 0;

  /**
   * @var bool True once the shutdown hook has been installed.
   *
   * Declared here because after() and install_shutdown() access it through
   * static:: on this class.
   */
  protected static $inited_shutdown = false;

  /**
   * Register a new task and put it at the end of the queue.
   *
   * May be called as add($id, $job) or as add($job); in the second form the
   * task receives the next auto-incremented numeric ID.
   *
   * @param mixed $id  The task ID, or the job itself when $job is omitted.
   * @param callable|\Generator|null $job A generator, or a callable that is
   *                   invoked without arguments to obtain one.
   * @return TaskCoroutine The task wrapper that was queued.
   * @throws \InvalidArgumentException When no generator or callable job was given.
   */
  public static function add($id, $job=null){
    self::$pool or ( self::$pool = new \SplQueue() );

    // Single-argument form: the only argument is the job, not the ID.
    if ($job === null && ($id instanceof \Generator || is_callable($id))) {
      $job = $id;
      $id  = ++self::$lastID;
    }

    if ($job instanceof \Generator) {
      $coroutine = $job;
    } elseif (is_callable($job)) {
      $coroutine = $job();
    } else {
      throw new \InvalidArgumentException('Work::add expects a Generator or a callable job.');
    }

    $task = new TaskCoroutine($id, $coroutine);
    self::$workers[$id] = $task;
    self::$pool->enqueue($task);
    return $task;
  }

  /**
   * Store a value to be passed to a task the next time it is resumed.
   *
   * Unknown task IDs are ignored.
   *
   * @param mixed $id        The ID of the target task.
   * @param mixed $passValue The value handed to TaskCoroutine::pass().
   */
  public static function send($id,$passValue) {
    if (isset(self::$workers[$id])) {
      self::$workers[$id]->pass($passValue);
    }
  }

  /**
   * Run all queued tasks until the queue is empty.
   *
   * Each task is resumed once per turn; unfinished tasks go back to the end
   * of the queue, finished ones are removed from the worker registry.
   */
  public static function run(){
    self::$pool or ( self::$pool = new \SplQueue() );
    while (!self::$pool->isEmpty()) {
      $task = self::$pool->dequeue();
      $task->run();
      if ($task->complete()) {
        unset(self::$workers[$task->id()]);
      } else {
        self::$pool->enqueue($task);
      }
    }
  }

  /**
   * Defer callback execution after script execution
   * @param callable $callback The deferred callback
   */
  public static function after(callable $callback){
    static::$inited_shutdown || static::install_shutdown();
    Event::on('core.shutdown', $callback);
  }

  /**
   * Single shot defer handler install
   *
   * Registers a hook that triggers the 'core.shutdown' event, choosing the
   * registration function according to what the runtime provides.
   */
  protected static function install_shutdown(){
    if (static::$inited_shutdown) return;

    // Disable time limit
    set_time_limit(0);

    $trigger = function(){
      Event::trigger('core.shutdown');
    };

    if (function_exists('register_postsend_function')) {
      // HHVM support
      register_postsend_function($trigger);
    } elseif (function_exists('fastcgi_finish_request')) {
      register_shutdown_function(function() use ($trigger){
        // Finish the FastCGI request before running the deferred callbacks.
        fastcgi_finish_request();
        $trigger();
      });
    } else {
      register_shutdown_function($trigger);
    }

    static::$inited_shutdown = true;
  }
}
87
88
/**
 * Wraps a generator together with an ID and a pending value, so that a
 * scheduler can resume it one step at a time.
 */
class TaskCoroutine {

    /** @var mixed Identifier given at construction. */
    protected $id;

    /** @var \Generator The wrapped generator. */
    protected $coroutine;

    /** @var mixed Value sent into the generator on the next resume; null when none is pending. */
    protected $passValue = null;

    /** @var bool True until run() has been called for the first time. */
    protected $beforeFirstYield = true;

    /**
     * @var bool
     * NOTE(review): nothing in this class reads or writes this flag; it looks
     * like it belongs to Work. Kept so existing subclasses do not break.
     */
    protected static $inited_shutdown = false;

    /**
     * @param mixed      $id        Identifier for this task.
     * @param \Generator $coroutine The generator to drive.
     */
    public function __construct($id, \Generator $coroutine) {
        $this->id = $id;
        $this->coroutine = $coroutine;
    }

    /**
     * @return mixed The identifier given at construction.
     */
    public function id() {
        return $this->id;
    }

    /**
     * Store the value that the next non-initial run() sends into the generator.
     *
     * @param mixed $passValue The value to send.
     */
    public function pass($passValue) {
        $this->passValue = $passValue;
    }

    /**
     * Advance the task by one step.
     *
     * The first call only reads the generator's current value, so the first
     * yielded value is not skipped. Every later call sends the pending value
     * (null when none was passed) and clears it afterwards.
     *
     * @return mixed The value obtained from the generator for this step.
     */
    public function run() {
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;
            return $this->coroutine->current();
        }

        $retval = $this->coroutine->send($this->passValue);
        $this->passValue = null;
        return $retval;
    }

    /**
     * @return bool True when the generator is no longer valid.
     */
    public function complete() {
        return ! $this->coroutine->valid();
    }

}
125