亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何理解Redis的Pub/Sub模式

發布時間:2021-11-11 09:58:23 來源:億速云 閱讀:233 作者:柒染 欄目:編程語言

如何理解Redis的Pub/Sub模式,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。

Redis同樣支持消息的發布/訂閱(Pub/Sub)模式,這和中間件activemq有些類似。訂閱者(Subscriber)可以訂閱自己感興趣的頻道(Channel),發布者(Publisher)可以將消息發往指定的頻道(Channel),正是通過這種方式,可以將消息的發送者和接收者解耦。另外,由于可以動態的Subscribe和Unsubscribe,也可以提高系統的靈活性和可擴展性。

這里假設有一個可用的Redis環境(單節點和集群均可)。

在redis-cli中使用Pub/Sub

普通channel的Pub/Sub

先用一個客戶端來訂閱頻道:

如何理解Redis的Pub/Sub模式

上圖中先使用redis-cli作為客戶端連接了Redis,之后使用了SUBSCRIBE命令,后面的參數表示訂閱了china和hongkong兩個channel。可以看到"SUBSCRIBE china hongkong"這條命令的輸出是6行(可以分為2組,每一組是一個Message)。因為訂閱、取消訂閱的操作跟發布的消息都是通過消息(Message)的方式發送的,消息的第一個元素就是消息類型,它可以是以下幾種類型:

subscribe: means that we successfully subscribed to the channel given as the second element in the reply. The third argument represents the number of channels we are currently subscribed to.

unsubscribe: means that we successfully unsubscribed from the channel given as second element in the reply. The third argument represents the number of channels we are currently subscribed to. When the last argument is zero, we are no longer subscribed to any channel, and the client can issue any kind of Redis command as we are outside the Pub/Sub state.

message: it is a message received as result of a PUBLISH command issued by another client. The second element is the name of the originating channel, and the third argument is the actual message payload.

--from http://redis.io/topics/pubsub

上圖的訂閱命令將使得發往這兩個channel的消息會被這個客戶端接收到。需要注意的是,redis-cli客戶端在進入subscribe模式以后,將不能再響應其他的任何命令:

A client subscribed to one or more channels should not issue commands, although it can subscribe and unsubscribe to and from other channels.

The commands that are allowed in the context of a subscribed client are SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, PING and QUIT

--from http://redis.io/topics/pubsub

官網說客戶端在subscribe下除了可以使用以上命令外,不能使用其他命令了。但是本人在Subscribe狀態下使用上述幾個命令,根本沒反應。也就是說,使用redis-cli訂閱channel后,該客戶端將不能響應任何命令。除非按下(ctrl+c),但該操作不是取消訂閱,而是退出redis-cli,此時將回到shell命令行下。

關于這個情況,我在官網上沒有找到對這種情況的解釋,也有不少的人在網上問,找來找去,本人覺得還算合理的解釋是:

On this page: http://redis.io/commands/subscribe applies only to those clients.

The redis-cli is among those clients. So, the comment is not an instruction for users of redis-cli.

Instead, redis-cli blocks waiting for messages on the bus (only to be unsubcribed via a ctrl+c).

--from http://stackoverflow.com/questions/17621371/redis-unsubscribe

就是說,官網中說明的client,并不包含這里使用的redis-cli,于是它可以和其他的client有不同表現。(先不糾結這個問題,稍后再用jedis來測試一下。)

接下來再用一個客戶端來發布消息:

如何理解Redis的Pub/Sub模式

可以看到,新的一個客戶端使用PUBLISH命令往china頻道發布了一條叫"China News"的消息,接下來再看看訂閱端:

如何理解Redis的Pub/Sub模式

可以看見,這條消息已經被接收到了。可以看到,收到的消息中第一個參數是類型"message",第二個參數是channel名字"china",第三個參數是消息內容"China News",這和開始說的message類型的結構一致。

通配符的Pub/Sub

Redis還支持通配符的訂閱和發布。客戶端可以訂閱滿足一個或多個規則的channel消息,相應的命令是PSUBSCRIBE和PUNSUBSCRIBE。接下來我們再用另一個redis-cli客戶端來訂閱"chi*"的channel,如圖:

如何理解Redis的Pub/Sub模式

和subscribe/unsubscribe的輸出類似,可以看到第一部分是消息類型“psubscribe”,第二部分是訂閱的規則“chi*”,第三部分則是該客戶端目前訂閱的所有規則個數。

接下來再發布一條消息到china這個channel中,此時,兩個訂閱者應該都能收到該消息:

如何理解Redis的Pub/Sub模式

實際測試結果跟預期相同。需要注意的是,訂閱者2通過通配符訂閱的,收到的消息類型是“pmessage”:

pmessage: it is a message received as result of a PUBLISH command issued by another client, matching a pattern-matching subscription. The second element is the original pattern matched, the third element is the name of the originating channel, and the last element the actual message payload.

--from http://redis.io/topics/pubsub

第二部分是匹配的模式“chi*”,第三部分是實際的channel名字“china”,第四部分是消息內容“China Daily”。

我們再發布一條消息到chinnna中,此時只有訂閱者2能接收到消息了:

如何理解Redis的Pub/Sub模式

同樣,在使用PSUBSCRIBE進入訂閱模式以后,該redis-cli也不能再監聽其他任何的命令,要退出該模式,只能使用ctrl+c。

使用Jedis實現Pub/Sub

Jedis是Redis客戶端的一種Java實現,在http://redis.io/clients#java中也能找到。

這里使用maven來管理包的依賴,由于使用了Log4j來輸出日志,因此會用到log4j的jar包:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.8.0</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>

Jedis中的JedisPubSub抽象類提供了訂閱和取消的功能。想處理訂閱和取消訂閱某些channel的相關事件,我們得擴展JedisPubSub類并實現相關的方法:

package com.demo.redis;

import org.apache.log4j.Logger;
import redis.clients.jedis.JedisPubSub;

public class Subscriber extends JedisPubSub {//注意這里繼承了抽象類JedisPubSub

    private static final Logger LOGGER = Logger.getLogger(Subscriber.class);

    @Override
    public void onMessage(String channel, String message) {
    	LOGGER.info(String.format("Message. Channel: %s, Msg: %s", channel, message));
    }

    @Override
    public void onPMessage(String pattern, String channel, String message) {
    	LOGGER.info(String.format("PMessage. Pattern: %s, Channel: %s, Msg: %s", 
    	    pattern, channel, message));
    }

    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
    	LOGGER.info("onSubscribe");
    }

    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
    	LOGGER.info("onUnsubscribe");
    }

    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
    	LOGGER.info("onPUnsubscribe");
    }

    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
    	LOGGER.info("onPSubscribe");
    }
}

有了訂閱者,我們還需要一個發布者:

package com.demo.redis;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;

public class Publisher {

    private static final Logger LOGGER = Logger.getLogger(Publisher.class);
    private final Jedis publisherJedis;
    private final String channel;

    public Publisher(Jedis publisherJedis, String channel) {
        this.publisherJedis = publisherJedis;
        this.channel = channel;
    }

    /**
     * 不停的讀取輸入,然后發布到channel上面,遇到quit則停止發布。
     */
    public void startPublish() {
    	LOGGER.info("Type your message (quit for terminate)");
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                String line = reader.readLine();
                if (!"quit".equals(line)) {
                    publisherJedis.publish(channel, line);
                } else {
                    break;
                }
            }
        } catch (IOException e) {
            LOGGER.error("IO failure while reading input", e);
        }
    }
}

為簡單起見,這個發布者接收控制臺的輸入,然后將輸入的消息發布到指定的channel上面,如果輸入quit,則停止發布消息。

接下來是主函數:

package com.demo.redis;

import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class Program {
    
    public static final String CHANNEL_NAME = "MyChannel";
    //我這里的Redis是一個集群,192.168.56.101和192.168.56.102都可以使用
    public static final String REDIS_HOST = "192.168.56.101";
    public static final int REDIS_PORT = 7000;
    
    private final static Logger LOGGER = Logger.getLogger(Program.class);
    private final static JedisPoolConfig POOL_CONFIG = new JedisPoolConfig();
    private final static JedisPool JEDIS_POOL = 
            new JedisPool(POOL_CONFIG, REDIS_HOST, REDIS_PORT, 0);
    
    public static void main(String[] args) throws Exception {
        final Jedis subscriberJedis = JEDIS_POOL.getResource();
        final Jedis publisherJedis = JEDIS_POOL.getResource();
        final Subscriber subscriber = new Subscriber();
        //訂閱線程:接收消息
        new Thread(new Runnable() {
            public void run() {
                try {
                    LOGGER.info("Subscribing to \"MyChannel\". This thread will be blocked.");
                    //使用subscriber訂閱CHANNEL_NAME上的消息,這一句之后,線程進入訂閱模式,阻塞。
                    subscriberJedis.subscribe(subscriber, CHANNEL_NAME);
                    
                    //當unsubscribe()方法被調用時,才執行以下代碼
                    LOGGER.info("Subscription ended.");
                } catch (Exception e) {
                    LOGGER.error("Subscribing failed.", e);
                }
            }
        }).start();
        
        //主線程:發布消息到CHANNEL_NAME頻道上
        new Publisher(publisherJedis, CHANNEL_NAME).startPublish();
        publisherJedis.close();
        
        //Unsubscribe
        subscriber.unsubscribe();
        subscriberJedis.close();
    }
}

主類Program中定義了channel名字、連接redis的地址和端口,并使用JedisPool來獲取Jedis實例。由于訂閱者(subscriber)在進入訂閱狀態后會阻塞線程,因此新起一個線程(new Thread())作為訂閱線程,并是用主線程來發布消息。待發布者(類中的new Publisher)停止發布消息(控制臺中輸入quit即可)時,解除訂閱者的訂閱(subscriber.unsubscribe()方法)。此時訂閱線程解除阻塞,打印結束的日志并退出。

運行程序之前,還需要一個簡單的log4j配置以觀察輸出:

log4j.rootLogger=INFO,stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %m%n

運行Program,以下是執行結果:

如何理解Redis的Pub/Sub模式

從結果看,當訂閱者訂閱后,訂閱線程阻塞,主線程中的Publisher接收輸入后,發布消息到MyChannel中,此時訂閱該channel的訂閱者收到消息并打印。

Jedis源碼簡要分析

關于使用UNSUBSCRIBE

開始使用redis-cli時,在subscriber進入監聽狀態后,并不能使用UNSUBSCRIBE和PUNSUBSCRIBE命令,現在在Jedis中,在訂閱線程阻塞時,通過在main線程中調用改subscriber的unsubscribe()方法來解除阻塞。查看Jedis源碼,其實該方法也就是給redis發送了一個UNSUBSCRIBE命令而已:

如何理解Redis的Pub/Sub模式

因此這里是支持在“客戶端”使用UNSUBSCRIBE命令的。

關于訂閱者接收消息

在接收消息前,需要訂閱channel,訂閱完成之后,會執行一個循環,這個循環會一直阻塞,直到該Client沒有訂閱數為止,如下圖:

如何理解Redis的Pub/Sub模式

中間省略的其他行,主要是用于解析收到的Redis響應,這段代碼也是根據響應的第一部分確定響應的消息類型,然后挨個解析響應的后續內容,最后根據解析到消息類型,并使用后續解析到的內容作為參數來回調相應的方法,省略的內容如下:

final byte[] resp = (byte[]) firstObj;
if (Arrays.equals(SUBSCRIBE.raw, resp)) {
  subscribedChannels = ((Long) reply.get(2)).intValue();
  final byte[] bchannel = (byte[]) reply.get(1);
  final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
  //調用onSubscribe方法,該方法在我們的Subscriber類中實現
  onSubscribe(strchannel, subscribedChannels);
} else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) {
  subscribedChannels = ((Long) reply.get(2)).intValue();
  final byte[] bchannel = (byte[]) reply.get(1);
  final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
  //調用onUnsubscribe方法,該方法在我們的Subscriber類中實現
  onUnsubscribe(strchannel, subscribedChannels);
} else if (Arrays.equals(MESSAGE.raw, resp)) {
  final byte[] bchannel = (byte[]) reply.get(1);
  final byte[] bmesg = (byte[]) reply.get(2);
  final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
  final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
  //調用onMessage方法,該方法在我們的Subscriber類中實現
  onMessage(strchannel, strmesg);
} else if (Arrays.equals(PMESSAGE.raw, resp)) {
  final byte[] bpattern = (byte[]) reply.get(1);
  final byte[] bchannel = (byte[]) reply.get(2);
  final byte[] bmesg = (byte[]) reply.get(3);
  final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
  final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
  final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
  //調用onPMessage方法,該方法在我們的Subscriber類中實現
  onPMessage(strpattern, strchannel, strmesg);
} else if (Arrays.equals(PSUBSCRIBE.raw, resp)) {
  subscribedChannels = ((Long) reply.get(2)).intValue();
  final byte[] bpattern = (byte[]) reply.get(1);
  final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
  onPSubscribe(strpattern, subscribedChannels);
} else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) {
  subscribedChannels = ((Long) reply.get(2)).intValue();
  final byte[] bpattern = (byte[]) reply.get(1);
  final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
  //調用onPUnsubscribe方法,該方法在我們的Subscriber類中實現
  onPUnsubscribe(strpattern, subscribedChannels);
} else {
  //對于其他Redis沒有定義的返回消息類型,則直接報錯
  throw new JedisException("Unknown message type: " + firstObj);
}

以上就是為什么我們需要在Subscriber中實現這幾個方法的原因了(這些方法并不是抽象的,可以選擇實現使用到的方法)。

關于如何理解Redis的Pub/Sub模式問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業資訊頻道了解更多相關知識。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

罗山县| 隆林| 新乡县| 河西区| 宾川县| 隆昌县| 漠河县| 波密县| 东台市| 昌都县| 尚志市| 永胜县| 马公市| 乌兰浩特市| 三门县| 呈贡县| 平泉县| 漳浦县| 济南市| 师宗县| 北流市| 紫云| 安多县| 黄龙县| 普格县| 石门县| 陆川县| 南昌市| 曲周县| 西宁市| 广州市| 淅川县| 昭苏县| 揭西县| 阜城县| 萨迦县| 土默特右旗| 陕西省| 龙口市| 阿克苏市| 枣庄市|