亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

使用swoole_process怎么實現一個進程池

發布時間:2021-04-12 17:23:48 來源:億速云 閱讀:379 作者:Leah 欄目:開發技術

使用swoole_process怎么實現一個進程池?很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。

swoole —— 重新定義PHP

swoole 的進程之間有兩種通信方式,一種是消息隊列(queue),另一種是管道(pipe),對swoole_process 的研究在swoole中顯得尤為重要。

預備知識

IO多路復用

swoole 中的io多路復用表現為底層的 epoll進程模型,在C語言中表現為 epoll 函數。

  • epoll 模型下會持續監聽自己名下的素有socket 描述符 fd

  • 當觸發了 socket 監聽的事件時,epoll 函數才會響應,并返回所有監聽該時間的 socket 集合

  • epoll 的本質是阻塞IO,它的優點在于能同事處理大量socket連接

Event loop 事件循環

swoole 對 epoll 實現了一個Reactor線程模型封裝,設置了read事件和write事件的監聽回調函數。(詳見swoole_event_add)

  • Event loop 是一個Reactor線程,其中運行了一個epoll實例。

  • 通過swoole_event_add將socket描述符的一個事件添加到epoll監聽中,事件發生時將執行回調函數

  • 不可用于fpm環境下,因為fpm在任務結束時可能會關掉進程。

swoole_process

  • 基于C語言封裝的進程管理模塊,方便php來調用

  • 內置管道、消息隊列接口,方便實現進程間通信

我們在php-fpm.conf配置文件中發現,php-fpm中有兩種進程池管理設置。

  • 靜態模式 即初始化固定的進程數,當來了一個請求時,從中選取一個進程來處理。

  • 動態模式 指定最小、最大進程數,當請求量過大,進程數不超過最大限制時,新增線程去處理請求

接下來用swoole代碼來實現,這里只是為理解swoole_process、進程間通信、定時器等使用,實際情況使用封裝好的swoole_server來實現task任務隊列池會更方便。

假如有個定時投遞的任務隊列:

<?php

/**
 * 動態進程池,類似fpm
 * 動態新建進程
 * 有初始進程數,最小進程數,進程不夠處理時候新建進程,不超過最大進程數
 */

// 一個進程定時投遞任務

/**
 * 1. tick
 * 2. process及其管道通訊
 * 3. event loop 事件循環
 */
class processPool
{
  private $pool;

  /**
   * @var swoole_process[] 記錄所有worker的process對象
   */
  private $workers = [];

  /**
   * @var array 記錄worker工作狀態
   */
  private $used_workers = [];

  /**
   * @var int 最小進程數
   */
  private $min_woker_num = 5;

  /**
   * @var int 初始進程數
   */
  private $start_worker_num = 10;

  /**
   * @var int 最大進程數
   */
  private $max_woker_num = 20;

  /**
   * 進程閑置銷毀秒數
   * @var int
   */
  private $idle_seconds = 5;

  /**
   * @var int 當前進程數
   */
  private $curr_num;

  /**
   * 閑置進程時間戳
   * @var array
   */
  private $active_time = [];

  public function __construct()
  {
    $this->pool = new swoole_process(function () {
      // 循環建立worker進程
      for ($i = 0; $i < $this->start_worker_num; $i++) {
        $this->createWorker();
      }
      echo '初始化進程數:' . $this->curr_num . PHP_EOL;
      // 每秒定時往閑置的worker的管道中投遞任務
      swoole_timer_tick(1000, function ($timer_id) {
        static $count = 0;
        $count++;
        $need_create = true;
        foreach ($this->used_workers as $pid => $used) {
          if ($used == 0) {
            $need_create = false;
            $this->workers[$pid]->write($count . ' job');
            // 標記使用中
            $this->used_workers[$pid] = 1;
            $this->active_time[$pid] = time();
            break;
          }
        }
        foreach ($this->used_workers as $pid => $used)
          // 如果所有worker隊列都沒有閑置的,則新建一個worker來處理
          if ($need_create && $this->curr_num < $this->max_woker_num) {
            $new_pid = $this->createWorker();
            $this->workers[$new_pid]->write($count . ' job');
            $this->used_workers[$new_pid] = 1;
            $this->active_time[$new_pid] = time();
          }

        // 閑置超過一段時間則銷毀進程
        foreach ($this->active_time as $pid => $timestamp) {
          if ((time() - $timestamp) > $this->idle_seconds && $this->curr_num > $this->min_woker_num) {
            // 銷毀該進程
            if (isset($this->workers[$pid]) && $this->workers[$pid] instanceof swoole_process) {
              $this->workers[$pid]->write('exit');
              unset($this->workers[$pid]);
              $this->curr_num = count($this->workers);
              unset($this->used_workers[$pid]);
              unset($this->active_time[$pid]);
              echo "{$pid} destroyed\n";
              break;
            }
          }
        }

        echo "任務{$count}/{$this->curr_num}\n";

        if ($count == 20) {
          foreach ($this->workers as $pid => $worker) {
            $worker->write('exit');
          }
          // 關閉定時器
          swoole_timer_clear($timer_id);
          // 退出進程池
          $this->pool->exit(0);
          exit();
        }
      });

    });

    $master_pid = $this->pool->start();
    echo "Master $master_pid start\n";

    while ($ret = swoole_process::wait()) {
      $pid = $ret['pid'];
      echo "process {$pid} existed\n";
    }
  }

  /**
   * 創建一個新進程
   * @return int 新進程的pid
   */
  public function createWorker()
  {
    $worker_process = new swoole_process(function (swoole_process $worker) {
      // 給子進程管道綁定事件
      swoole_event_add($worker->pipe, function ($pipe) use ($worker) {
        $data = trim($worker->read());
        if ($data == 'exit') {
          $worker->exit(0);
          exit();
        }
        echo "{$worker->pid} 正在處理 {$data}\n";
        sleep(5);
        // 返回結果,表示空閑
        $worker->write("complete");
      });
    });

    $worker_pid = $worker_process->start();

    // 給父進程管道綁定事件
    swoole_event_add($worker_process->pipe, function ($pipe) use ($worker_process) {
      $data = trim($worker_process->read());
      if ($data == 'complete') {
        // 標記為空閑
//        echo "{$worker_process->pid} 空閑了\n";
        $this->used_workers[$worker_process->pid] = 0;
      }
    });

    // 保存process對象
    $this->workers[$worker_pid] = $worker_process;
    // 標記為空閑
    $this->used_workers[$worker_pid] = 0;
    $this->active_time[$worker_pid] = time();
    $this->curr_num = count($this->workers);
    return $worker_pid;
  }

}

new processPool();

看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

肥东县| 叙永县| 乌什县| 南汇区| 广安市| 赤水市| 济南市| 乡宁县| 弋阳县| 大港区| 西华县| 仁怀市| 萝北县| 永福县| 澎湖县| 长治市| 郓城县| 渝中区| 卢龙县| 乌拉特中旗| 山东省| 四会市| 上蔡县| 保康县| 潼南县| 汶上县| 永安市| 贵溪市| 甘南县| 体育| 广元市| 白玉县| 梅州市| 龙海市| 吴桥县| 罗城| 宁陵县| 沈丘县| 和平区| 龙门县| 涿鹿县|