亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

redis怎么實現隊列阻塞、延時、發布和訂閱

發布時間:2022-06-09 14:49:24 來源:億速云 閱讀:166 作者:iii 欄目:開發技術

這篇“redis怎么實現隊列阻塞、延時、發布和訂閱”文章的知識點大部分人都不太理解,所以小編給大家總結了以下內容,內容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“redis怎么實現隊列阻塞、延時、發布和訂閱”文章吧。

Redis不僅可作為緩存服務器,還可以用作消息隊列。它的列表類型天生支持用作消息隊列。如下圖所示:

redis怎么實現隊列阻塞、延時、發布和訂閱

由于Redis的列表是使用雙向鏈表實現的,保存了頭節點和尾節點,所以在列表的頭部和尾部兩邊插入或獲取元素都是非常快的,時間復雜度為O(1)。

普通隊列

可以直接使用Redis的list數據類型實現消息隊列,只需簡單的兩個指令lpush和rpop或者rpush和lpop。

  • lpush+rpop:左進右出的隊列

  • rpush+lpop:左出右進的隊列

下面使用redis的命令來模擬普通隊列。
使用lpush命令生產消息:

>lpush queue:single 1
"1"
>lpush queue:single 2
"2"
>lpush queue:single 3
"3"

使用rpop命令消費消息:

>rpop queue:single
"1"
>rpop queue:single
"2"
>rpop queue:single
"3"

下面使用Java代碼來實現普通隊列。

生產者SingleProducer

package com.morris.redis.demo.queue.single;

import redis.clients.jedis.Jedis;

/**
 * 生產者
 */
public class SingleProducer {

    public static final String SINGLE_QUEUE_NAME = "queue:single";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        for (int i = 0; i < 100; i++) {
            jedis.lpush(SINGLE_QUEUE_NAME, "hello " + i);
        }
        jedis.close();
    }
}

消費者SingleConsumer:

package com.morris.redis.demo.queue.single;

import redis.clients.jedis.Jedis;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * 消費者
 */
public class SingleConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            String message = jedis.rpop(SingleProducer.SINGLE_QUEUE_NAME);
            if(Objects.nonNull(message)) {
                System.out.println(message);
            } else {
                TimeUnit.MILLISECONDS.sleep(500);
            }
        }
    }
}

上面的代碼已經基本實現了普通隊列的生產與消費,但是上述的例子中消息的消費者存在兩個問題:

  • 消費者需要不停的調用rpop方法查看redis的list中是否有待處理的數據(消息)。每調用一次都會發起一次連接,有可能list中沒有數據,造成大量的空輪詢,導致造成不必要的浪費。也許你可以使用Thread.sleep()等方法讓消費者線程隔一段時間再消費,如果睡眠時間過長,這樣不能處理一些時效性要求高的消息,睡眠時間過短,也會在連接上造成比較大的開銷。

  • 如果生產者速度大于消費者消費速度,消息隊列長度會一直增大,時間久了會占用大量內存空間。

阻塞隊列

消費者可以使用brpop指令從redis的list中獲取數據,這個指令只有在有元素時才返回,沒有則會阻塞直到超時返回null,于是消費端就不需要休眠后獲取數據了,這樣就相當于實現了一個阻塞隊列,

使用redis的brpop命令來模擬阻塞隊列。

>brpop queue:single 30

可以看到命令行阻塞在了brpop這里了,30s后沒數據就返回。

Java代碼實現如下:

生產者與普通隊列的生產者一致。

消費者BlockConsumer:

package com.morris.redis.demo.queue.block;

import redis.clients.jedis.Jedis;

import java.util.List;

/**
 * 消費者
 */
public class BlockConsumer {

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        while (true) {
            // 超時時間為1s
            List<String> messageList = jedis.brpop(1, BlockProducer.BLOCK_QUEUE_NAME);
            if (null != messageList && !messageList.isEmpty()) {
                System.out.println(messageList);
            }
        }
    }
}

缺點:無法實現一次生產多次消費。

發布訂閱模式

Redis除了對消息隊列提供支持外,還提供了一組命令用于支持發布/訂閱模式。利用Redis的pub/sub模式可以實現一次生產多次消費的隊列。

發布:PUBLISH指令可用于發布一條消息,格式:

PUBLISH channel message

返回值表示訂閱了該消息的數量。

訂閱:SUBSCRIBE指令用于接收一條消息,格式:

SUBSCRIBE channel

使用SUBSCRIBE指令后進入了訂閱模式,但是不會接收到訂閱之前publish發送的消息,這是因為只有在消息發出去前訂閱才會接收到。在這個模式下其他指令,只能看到回復。

回復分為三種類型:

  • 如果為subscribe,第二個值表示訂閱的頻道,第三個值表示是已訂閱的頻道的數量

  • 如果為message(消息),第二個值為產生該消息的頻道,第三個值為消息

  • 如果為unsubscribe,第二個值表示取消訂閱的頻道,第三個值表示當前客戶端的訂閱數量。

下面使用redis的命令來模擬發布訂閱模式。

生產者:

127.0.0.1:6379> publish queue hello
(integer) 1
127.0.0.1:6379> publish queue hi
(integer) 1

消費者:

127.0.0.1:6379> subscribe queue
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "queue"
3) (integer) 1
1) "message"
2) "queue"
3) "hello"
1) "message"
2) "queue"
3) "hi"

Java代碼實現如下:

生產者PubsubProducer:

package com.morris.redis.demo.queue.pubsub;

import redis.clients.jedis.Jedis;

/**
 * 生產者
 */
public class PubsubProducer {

    public static final String PUBSUB_QUEUE_NAME = "queue:pubsub";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        for (int i = 0; i < 100; i++) {
            jedis.publish(PUBSUB_QUEUE_NAME, "hello " + i);
        }
        jedis.close();
    }
}

消費者PubsubConsumer:

package com.morris.redis.demo.queue.pubsub;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

/**
 * 消費者
 */
public class PubsubConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();

        JedisPubSub jedisPubSub = new JedisPubSub() {

            @Override
            public void onMessage(String channel, String message) {
                System.out.println("receive message: " + message);
                if(message.indexOf("99") > -1) {
                    this.unsubscribe();
                }
            }

            @Override
            public void onSubscribe(String channel, int subscribedChannels) {
                System.out.println("subscribe channel: " + channel);
            }

            @Override
            public void onUnsubscribe(String channel, int subscribedChannels) {
                System.out.println("unsubscribe channel " + channel);
            }
        };

        jedis.subscribe(jedisPubSub, PubsubProducer.PUBSUB_QUEUE_NAME);
    }
}

消費者可以啟動多個,每個消費者都能收到所有的消息。

可以使用指令UNSUBSCRIBE退訂,如果不加參數,則會退訂所有由SUBSCRIBE指令訂閱的頻道。

Redis還支持基于通配符的消息訂閱,使用指令PSUBSCRIBE (pattern subscribe),例如:

psubscribe channel.*

用PSUBSCRIBE指令訂閱的頻道也要使用指令PUNSUBSCRIBE指令退訂,該指令無法退訂SUBSCRIBE訂閱的頻道,同理UNSUBSCRIBE也不能退訂PSUBSCRIBE指令訂閱的頻道。

同時PUNSUBSCRIBE指令通配符不會展開。例如:PUNSUBSCRIBE \*不會匹配到channel.\*,所以要取消訂閱channel.\*就要這樣寫PUBSUBSCRIBE channel.\*。

Redis的pub/sub也有其缺點,那就是如果消費者下線,生產者的消息會丟失。

延時隊列和優先級隊列

Redis中有個數據類型叫Zset,其本質就是在數據類型Set的基礎上加了個排序的功能而已,除了保存原始的數據value之外,還提供另一個屬性score,這一屬性在添加修改元素時候可以進行指定,每次指定后,Zset會自動重新按新的score值進行排序。

如果score字段設置為消息的優先級,優先級最高的消息排在第一位,這樣就能實現一個優先級隊列。

如果score字段代表的是消息想要執行時間的時間戳,將它插入Zset集合中,便會按照時間戳大小進行排序,也就是對執行時間先后進行排序,集合中最先要執行的消息就會排在第一位,這樣的話,只需要起一個死循環線程不斷獲取集合中的第一個元素,如果當前時間戳大于等于該元素的score就將它取出來進行消費刪除,就可以達到延時執行的目的,注意不需要遍歷整個Zset集合,以免造成性能浪費。

下面使用redis的zset來模擬延時隊列。

生產者:

127.0.0.1:6379> zadd queue:delay 1 order1 2 order2 3 order3
(integer) 0

消費者:

127.0.0.1:6379> zrange queue:delay 0 0 withscores
1) "order1"
2) "1"
127.0.0.1:6379> zrem queue:delay order1
(integer) 1

Java代碼如下:

生產者DelayProducer:

package com.morris.redis.demo.queue.delay;

import redis.clients.jedis.Jedis;

import java.util.Date;
import java.util.Random;

/**
 * 生產者
 */
public class DelayProducer {

    public static final String DELAY_QUEUE_NAME = "queue:delay";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        long now = new Date().getTime();
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            int second = random.nextInt(30); // 隨機訂單失效時間
            jedis.zadd(DELAY_QUEUE_NAME, now + second * 1000, "order"+i);
        }
        jedis.close();
    }
}

消費者:

package com.morris.redis.demo.queue.delay;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;

import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * 消費者
 */
public class DelayConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            long now = new Date().getTime();
            Set<Tuple> tupleSet = jedis.zrangeWithScores(DelayProducer.DELAY_QUEUE_NAME, 0, 0);
            if(tupleSet.isEmpty()) {
                TimeUnit.MILLISECONDS.sleep(500);
            } else {
                for (Tuple tuple : tupleSet) {
                    Double score = tuple.getScore();
                    long time = score.longValue();
                    if(time < now) {
                        jedis.zrem(DelayProducer.DELAY_QUEUE_NAME, tuple.getElement());
                        System.out.println("order[" + tuple.getElement() +"] is timeout at " + time);
                    } else {
                        TimeUnit.MILLISECONDS.sleep(500);
                    }
                    break;
                }
            }
        }
    }
}

應用場景

延時隊列可用于訂單超時失效的場景
二級緩存(local+redis)中,當有緩存需要更新時,可以使用發布訂閱模式通知其他服務器使得本地緩存失效。

以上就是關于“redis怎么實現隊列阻塞、延時、發布和訂閱”這篇文章的內容,相信大家都有了一定的了解,希望小編分享的內容對大家有幫助,若想了解更多相關的知識內容,請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

高州市| 宿松县| 彩票| 瓮安县| 万荣县| 顺平县| 梁山县| 周口市| 邹城市| 松潘县| 梁平县| 平乡县| 安多县| 卢龙县| 彭州市| 尖扎县| 临邑县| 泸定县| 宕昌县| 北票市| 莲花县| 新干县| 宝丰县| 都江堰市| 镇坪县| 栾城县| 衡水市| 本溪市| 长宁区| 永康市| 乌兰浩特市| 太仓市| 沧州市| 扎鲁特旗| 彭水| 红河县| 佛山市| 辽阳市| 庄浪县| 湘潭市| 中阳县|