亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

thinkphp6使用think-queue怎么實現普通隊列和延遲隊列

發布時間:2022-04-20 13:37:22 來源:億速云 閱讀:697 作者:zzz 欄目:編程語言

本文小編為大家詳細介紹“thinkphp6使用think-queue怎么實現普通隊列和延遲隊列”,內容詳細,步驟清晰,細節處理妥當,希望這篇“thinkphp6使用think-queue怎么實現普通隊列和延遲隊列”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學習新知識吧。

###TP6 隊列

TP6 中使用 think-queue 可以實現普通隊列和延遲隊列。

think-queue 是thinkphp 官方提供的一個消息隊列服務,它支持消息隊列的一些基本特性:

  • 消息的發布,獲取,執行,刪除,重發,失敗處理,延遲執行,超時控制等

  • 隊列的多隊列, 內存限制 ,啟動,停止,守護等

  • 消息隊列可降級為同步執行

消息隊列實現過程

1、通過生產者推送消息到消息隊列服務中

2、消息隊列服務將收到的消息存入redis隊列中(zset)

3、消費者進行監聽隊列,當監聽到隊列有新的消息時,獲取隊列第一條

4、處理獲取下來的消息調用業務類進行處理相關業務

5、業務處理后,需要從隊列中刪除消息

composer 安裝 think-queue

composer require topthink/think-queue

配置文件

安裝完 think-queue 后會在 config 目錄中生成 queue.php,這個文件是隊列的配置文件。

tp6中提供了多種消息隊列的實現方式,默認使用sync,我這里選擇使用Redis。

return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => env('redis.host', '127.0.0.1'),
            'port'       => env('redis.port', '6379'),
            'password'   => env('redis.password','123456'),
            'select'     => 0,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];

創建目錄及隊列消費類文件

在 app 目錄下創建 queue 目錄,然后在該目錄下新建一個抽象類 Queue.php 文件,作為基礎類

<?phpnamespace app\queue;use think\facade\Cache;use think\queue\Job;use think\facade\Log;/**
 * Class Queue 隊列消費基礎類
 * @package app\queue
 */abstract class Queue{
    /**
     * @describe:fire是消息隊列默認調用的方法
     * @param \think\queue\Job $job
     * @param $message
     */
    public function fire(Job $job, $data)
    {
        if (empty($data)) {
            Log::error(sprintf('[%s][%s] 隊列無消息', __CLASS__, __FUNCTION__));
            return ;
        }

        $jobId = $job->getJobId(); // 隊列的數據庫id或者redis key
        // $jobClassName = $job->getName(); // 隊列對象類
        // $queueName = $job->getQueue(); // 隊列名稱

        // 如果已經執行中或者執行完成就不再執行了
        if (!$this->checkJob($jobId, $data)) {
            $job->delete();
            Cache::store('redis')->delete($jobId);
            return ;
        }

        // 執行業務處理
        if ($this->execute($data)) {
            Log::record(sprintf('[%s][%s] 隊列執行成功', __CLASS__, __FUNCTION__));
            $job->delete(); // 任務執行成功后刪除
            Cache::store('redis')->delete($jobId); // 刪除redis中的緩存
        } else {
            // 檢查任務重試次數
            if ($job->attempts() > 3) {
                Log::error(sprintf('[%s][%s] 隊列執行重試次數超過3次,執行失敗', __CLASS__, __FUNCTION__));
                 // 第1種處理方式:重新發布任務,該任務延遲10秒后再執行;也可以不指定秒數立即執行
                //$job->release(10); 
                // 第2種處理方式:原任務的基礎上1分鐘執行一次并增加嘗試次數
                //$job->failed();   
                // 第3種處理方式:刪除任務
                $job->delete(); // 任務執行后刪除
                Cache::store('redis')->delete($jobId); // 刪除redis中的緩存
            }
        }
    }

    /**
     * 消息在到達消費者時可能已經不需要執行了
     * @param  string  $jobId
     * @param $message
     * @return bool 任務執行的結果
     * @throws \Psr\SimpleCache\InvalidArgumentException
     */
    protected function checkJob(string $jobId, $message): bool
    {
        // 查詢redis
        $data = Cache::store('redis')->get($jobId);
        if (!empty($data)) {
            return false;
        }
        Cache::store('redis')->set($jobId, $message);
        return true;
    }

    /**
     * @describe: 根據消息中的數據進行實際的業務處理
     * @param $data 數據
     * @return bool 返回結果
     */
    abstract protected function execute($data): bool;}

所有真正的消費類繼承基礎抽象類

<?phpnamespace app\queue\test;use app\queue\Queue;class Test extends Queue{
    protected function execute($data): bool
    {
       // 具體消費業務邏輯
    }}

生產者邏輯

use think\facade\Queue;

// 普通隊列生成調用方式
Queue::push($job, $data, $queueName);
// 例:
Queue::push(Test::class, $data, $queueName);

// 延時隊列生成調用方式
Queue::later($delay, $job, $data, $queueName);
// 例如使用延時隊列 10 秒后執行:
Queue::later(10 , Test::class, $data, $queueName);

開啟進程監聽任務并執行

php think queue:listen
php think queue:work

命令模式介紹

命令模式
  • queue:work 命令

    work 命令: 該命令將啟動一個 work 進程來處理消息隊列。

    php think queue:work --queue TestQueue
  • queue:listen 命令

    listen 命令: 該命令將會創建一個 listen 父進程 ,然后由父進程通過 proc_open(‘php think queue:work’) 的方式來創建一個work 子 進程來處理消息隊列,且限制該work進程的執行時間。

    php think queue:listen --queue TestQueue
命令行參數
  • Work 模式

    php think queue:work \
    --daemon            //是否循環執行,如果不加該參數,則該命令處理完下一個消息就退出
    --queue  helloJobQueue  //要處理的隊列的名稱
    --delay  0 \        //如果本次任務執行拋出異常且任務未被刪除時,設置其下次執行前延遲多少秒,默認為0
    --force  \          //系統處于維護狀態時是否仍然處理任務,并未找到相關說明
    --memory 128 \      //該進程允許使用的內存上限,以 M 為單位
    --sleep  3 \        //如果隊列中無任務,則sleep多少秒后重新檢查(work+daemon模式)或者退出(listen或非daemon模式)
    --tries  2          //如果任務已經超過嘗試次數上限,則觸發‘任務嘗試次數超限’事件,默認為0
  • Listen 模式

    php think queue:listen \
    --queue  helloJobQueue \   //監聽的隊列的名稱
    --delay  0 \         //如果本次任務執行拋出異常且任務未被刪除時,設置其下次執行前延遲多少秒,默認為0
    --memory 128 \       //該進程允許使用的內存上限,以 M 為單位
    --sleep  3 \         //如果隊列中無任務,則多長時間后重新檢查,daemon模式下有效
    --tries  0 \         //如果任務已經超過重發次數上限,則進入失敗處理邏輯,默認為0
    --timeout 60         //創建的work子進程的允許執行的最長時間,以秒為單位

    可以看到 listen 模式下,不包含 --deamon 參數,原因下面會說明

  • 消息隊列的開始,停止與重啟

    • 開始一個消息隊列:

      php think queue:work
    • 停止所有的消息隊列:

      php think queue:restart
    • 重啟所有的消息隊列:

      php think queue:restart 
      php think queue:work

讀到這里,這篇“thinkphp6使用think-queue怎么實現普通隊列和延遲隊列”文章已經介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領會,如果想了解更多相關內容的文章,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

商南县| 常宁市| 新民市| 新竹市| 青冈县| 静安区| 怀安县| 乌兰县| 柳江县| 姜堰市| 淳化县| 南靖县| 商南县| 亳州市| 双桥区| 城市| 南陵县| 阳东县| 克东县| 监利县| 渝北区| 马关县| 岚皋县| 岢岚县| 江源县| 颍上县| 渑池县| 伊金霍洛旗| 扎兰屯市| 休宁县| 出国| 重庆市| 城步| 凤山县| 吴江市| 塘沽区| 观塘区| 项城市| 二手房| 台山市| 甘谷县|