亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RabbitMQ發布確認高級問題的示例分析

發布時間:2022-01-04 12:59:15 來源:億速云 閱讀:114 作者:柒染 欄目:開發技術

RabbitMQ發布確認高級問題的示例分析,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。

1、發布確認高級

1. 存在的問題

再生產環境中由于一些不明原因導致rabbitmq重啟,在RabbitMQ重啟期間生產者消息投遞失敗,會導致消息丟失。

1.1、發布確認SpringBoot版本

1.1.1、確認機制方案

RabbitMQ發布確認高級問題的示例分析

當消息不能正常被接收的時候,我們需要將消息存放在緩存中。

1.1.2、代碼架構圖

RabbitMQ發布確認高級問題的示例分析

1.1.3、配置文件
spring.rabbitmq.host=192.168.123.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-confirm-type=correlated
  • NONE:禁用發布確認模式,是默認值。

  • CORRELATED:發布消息成功到交換機會觸發回調方方法。

  • CORRELATED:就是發布一個就確認一個。

1.1.4、配置類
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmConfig {

    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";

    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";

    public static final String CONFIRM_ROUTING_KEY = "key1";

    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange,
                                        @Qualifier("confirmQueue") Queue confirmQueue){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }
}
1.1.5、回調接口
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * 回調接口
 */

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 交換機接受失敗后進行回調
     * 1. 保存消息的ID及相關消息
     * 2. 是否接收成功
     * 3. 接受失敗的原因
     * @param correlationData
     * @param b
     * @param s
     */

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : "";
        if(b == true){
            log.info("交換機已經收到id為:{}的消息",id);
        }else{
            log.info("交換機還未收到id為:{}消息,由于原因:{}",id,s);
        }
    }
}
1.1.6、生產者
import com.xiao.springbootrabbitmq.utils.MyCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;

@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";

    @Autowired
    RabbitTemplate rabbitTemplate;



    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        CorrelationData correlationData1 = new CorrelationData("1");
        String routingKey1 = "key1";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey1,message + routingKey1,correlationData1);

        CorrelationData correlationData2 = new CorrelationData("2");
        String routingKey2 = "key2";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey2,message + routingKey2,correlationData2);
        log.info("發送得內容是:{}",message);
    }
}
1.1.7、消費者
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ConfirmConsumer {
    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";

    @RabbitListener(queues = CONFIRM_QUEUE_NAME)
    public void receiveMessage(Message message){
        String msg = new String(message.getBody());
        log.info("接收到隊列" + CONFIRM_QUEUE_NAME + "消息:{}",msg);
    }
}
1.1.8、測試結果

1. 第一種情況

RabbitMQ發布確認高級問題的示例分析

ID1的消息正常送達,ID2的消息由于RoutingKey的錯誤,導致不能正常被消費,但是交換機還是正常收到了消息,所以此時由于交換機正常接收之后的原因丟失的消息不能正常被接收

2. 第二種情況

RabbitMQ發布確認高級問題的示例分析

我們再上一種情況下修改了ID1的消息的交換機的名稱,所以此時回調函數會進行回答由于什么原因導致交換機無法接收成功消息

1.2、回退消息

1.2.1、Mandatory參數
  • 在僅開啟了生產者確認機制的情況下,交換機接收到消息后,會直接給消息生產者發送確認消息,如果發現該消息不可路由(就是消息被交換機成功接收后,無法到達隊列),那么消息會直接被丟棄,此時生產者是不知道消息被丟棄這個事件的

  • 通過設置該參數可以在消息傳遞過程中不可達目的地時將消息返回給生產者。

1.2.2、配置文件
spring.rabbitmq.publisher-returns=true

需要在配置文件種開啟返回回調

1.2.3、生產者代碼
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;

@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        CorrelationData correlationData1 = new CorrelationData("1");
        String routingKey1 = "key1";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey1,message + routingKey1,correlationData1);
        log.info("發送得內容是:{}",message + routingKey1);

        CorrelationData correlationData2 = new CorrelationData("2");
        String routingKey2 = "key2";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey2,message + routingKey2,correlationData2);
        log.info("發送得內容是:{}",message + routingKey2);
    }
}
1.2.4、回調接口代碼
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * 回調接口
 */

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    /**
     * 交換機接受失敗后進行回調
     * 1. 保存消息的ID及相關消息
     * 2. 是否接收成功
     * 3. 接受失敗的原因
     * @param correlationData
     * @param b
     * @param s
     */

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : "";
        if(b == true){
            log.info("交換機已經收到id為:{}的消息",id);
        }else{
            log.info("交換機還未收到id為:{}消息,由于原因:{}",id,s);
        }
    }


    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        Message message = returnedMessage.getMessage();
        String exchange = returnedMessage.getExchange();
        String routingKey = returnedMessage.getRoutingKey();
        String replyText = returnedMessage.getReplyText();
        log.error("消息{},被交換機{}退回,回退原因:{},路由Key:{}",new String(message.getBody()),exchange,replyText,routingKey);
    }
}
1.2.5、測試結果

其他類的代碼與上一小節案例相同

RabbitMQ發布確認高級問題的示例分析

ID2的消息由于RoutingKey不可路由,但是還是被回調函數處理了。

1.3、備份交換機

1.3.1、代碼架構圖

RabbitMQ發布確認高級問題的示例分析

這里我們新增了備份交換機、備份隊列、報警隊列。它們綁定關系如圖所示。如果確認交換機成功接收的消息無法路由到相應的隊列,就會被確認交換機發送給備份交換機

1.3.2、配置類代碼
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmConfig {

    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";

    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";

    public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";

    public static final String BACKUP_QUEUE_NAME = "backup_queue";

    public static final String WARNING_QUEUE_NAME = "warning_queue";

    public static final String CONFIRM_ROUTING_KEY = "key1";

    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)
                .withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build();
    }

    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    @Bean("backupExchange")
    public FanoutExchange backupExchange(){
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    @Bean("backupQueue")
    public Queue backupQueue(){
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    @Bean("warningQueue")
    public Queue warningQueue(){
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }

    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange,
                                        @Qualifier("confirmQueue") Queue confirmQueue){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }

    @Bean
    public Binding queueBindingExchange1(@Qualifier("backupExchange") FanoutExchange backupExchange,
                                        @Qualifier("backupQueue") Queue backupQueue){
        return BindingBuilder.bind(backupQueue).to(backupExchange);
    }

    @Bean
    public Binding queueBindingExchange2(@Qualifier("backupExchange") FanoutExchange backupExchange,
                                         @Qualifier("warningQueue") Queue warningQueue){
        return BindingBuilder.bind(warningQueue).to(backupExchange);
    }
}
1.3.3、消費者代碼
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class WarningConsumer {
    public static final String WARNING_QUEUE_NAME = "warning_queue";

    @RabbitListener(queues = WARNING_QUEUE_NAME)
    public void receiveMessage(Message message){
        String msg = new String(message.getBody());
        log.info("報警發現不可路由的消息內容為:{}",msg);
    }
}
1.3.4、測試結果

RabbitMQ發布確認高級問題的示例分析

mandatory參數與備份交換機可以一起使用的時候,如果兩者同時開啟,備份交換機優先級高

看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

彰化市| 开江县| 鄄城县| 湘潭县| 武鸣县| 阿城市| 城步| 长沙市| 霍邱县| 隆林| 梁山县| 互助| 银川市| 通城县| 南京市| 西充县| 新宾| 友谊县| 石城县| 德阳市| 麟游县| 伊金霍洛旗| 汾西县| 阳山县| 合川市| 当阳市| 天全县| 和平县| 双峰县| 松江区| 陇南市| 夹江县| 青阳县| 武清区| 瓮安县| 新化县| 焉耆| 永登县| 永胜县| 阜南县| 洛川县|