您好,登錄后才能下訂單哦!
今天和大家分享springboot整合activeMq之topic(主題) - - 發布/訂閱模式,類似微信公眾號,我們關注公共就可以收到消息,topic需要消費者先訂閱才能收到消息,如果沒有消費者訂閱,生產者產生的消息就是廢消息(發布/訂閱模式,生產者生產了一個消息,可以由多個消費者進行消費)。本次實例支持websocket、消息重發、持久化…
版本信息:SpringBoot2.1.5 ActiveMQ 5.15.10?
pom文件?
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
yml文件配置?
server
port: 8085
spring:
activemq:
broker-url: tcp://localhost:61616
user: admin
password: admin
jms:
pub-sub-domain: true
#自己的主題名字
myTopic: boot_actviemq_topic
配置類?
package com.example.topic_customer.config;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import javax.jms.ConnectionFactory;
import javax.jms.Topic;
/**
* @Date 2019/11/13 10:22
* @Desc 消費者配置類
*/
@Configuration
public class BeanConfig {
@Value("${myTopic}")
private String myTopic;
/**
* websocket配置
*
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
@Bean
public Topic topic() {
return new ActiveMQTopic(myTopic);
}
public RedeliveryPolicy redeliveryPolicy() {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
//是否在每次嘗試重新發送失敗后,增長這個等待時間
redeliveryPolicy.setUseExponentialBackOff(true);
//重發次數,默認為6次,這里設置為10次,-1表示不限次數
redeliveryPolicy.setMaximumRedeliveries(-1);
//重發時間間隔,默認為1毫秒,設置為10000毫秒
redeliveryPolicy.setInitialRedeliveryDelay(10000);
//表示沒有拖延只有UseExponentialBackOff(true)為true時生效
//第一次失敗后重新發送之前等待10000毫秒,第二次失敗再等待10000 * 2毫秒
//第三次翻倍10000 * 2 * 2,以此類推
redeliveryPolicy.setBackOffMultiplier(2);
//是否避免消息碰撞
redeliveryPolicy.setUseCollisionAvoidance(true);
//設置重發最大拖延時間360000毫秒 表示沒有拖延只有UseExponentialBackOff(true)為true時生效
redeliveryPolicy.setMaximumRedeliveryDelay(360000);
return redeliveryPolicy;
}
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
//設置重發屬性
connectionFactory.setRedeliveryPolicy(redeliveryPolicy());
return connectionFactory;
}
/**
* JMS 隊列的監聽容器工廠
*/
@Bean(name = "jmsTopicListener")
public DefaultJmsListenerContainerFactory jmsTopicListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setPubSubDomain(true);
factory.setSessionTransacted(true);
factory.setAutoStartup(true);
//開啟持久化訂閱
factory.setSubscriptionDurable(true);
//重連間隔時間
factory.setRecoveryInterval(1000L);
factory.setClientId("topic_provider:zb1");
return factory;
}
}
設置消費者持久化主要有兩點:?
.TopicCustomer類?
package com.example.topic_customer.customer;
import lombok.Data;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* @Date 2019/11/13 13:31
* @Desc
*/
@Component
@ServerEndpoint("/websocket")
@Data
public class TopicCustomer {
/**
* 每個客戶端都會有相應的session,服務端可以發送相關消息
*/
private javax.websocket.Session session;
/**
* J.U.C包下線程安全的類,主要用來存放每個客戶端對應的webSocket連接
*/
private static CopyOnWriteArraySet<TopicCustomer> copyOnWriteArraySet = new CopyOnWriteArraySet<>();
@OnOpen
public void onOpen(javax.websocket.Session session) {
this.session = session;
copyOnWriteArraySet.add(this);
}
@OnClose
public void onClose() {
copyOnWriteArraySet.remove(this);
}
@OnMessage
public void onMessage(String message) {
}
@OnError
public void onError(javax.websocket.Session session, Throwable error) {
error.printStackTrace();
}
@JmsListener(destination = "${myTopic}", containerFactory = "jmsTopicListener")
public void receive(TextMessage textMessage, javax.jms.Session session) throws JMSException {
//遍歷客戶端
for (TopicCustomer webSocket : copyOnWriteArraySet) {
try {
//服務器主動推送
webSocket.session.getBasicRemote().sendText(textMessage.getText());
System.out.println("-- 接收到topic持久化消息 -- " + textMessage.getText());
} catch (Exception e) {
System.out.println("-----測試重發-----");
session.rollback();// 此不可省略 重發信息使用
}
}
}
}
啟動類
package com.example.topic_customer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class TopicCustomerApplication {
public static void main(String[] args) {
SpringApplication.run(TopicCustomerApplication.class, args);
}
}
消費者啟動成功后mq的截圖:?
yml配置文件?
server:
port: 8084
spring:
activemq:
broker-url: tcp://localhost:61616
user: admin
password: admin
jms:
pub-sub-domain: true
myTopic: boot_actviemq_topic
配置類?
package com.example.topicprovider.config;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.stereotype.Component;
import javax.jms.ConnectionFactory;
import javax.jms.Topic;
/**
* @Date 2019/11/13 10:22
* @Desc 生產者配置文件
*/
@Component
public class BeanConfig {
@Value("${myTopic}")
private String myTopic;
public RedeliveryPolicy redeliveryPolicy(){
RedeliveryPolicy redeliveryPolicy= new RedeliveryPolicy();
//是否在每次嘗試重新發送失敗后,增長這個等待時間
redeliveryPolicy.setUseExponentialBackOff(true);
//重發次數,默認為6次,這里設置為10次,-1表示不限次數
redeliveryPolicy.setMaximumRedeliveries(-1);
//重發時間間隔,默認為1毫秒,設置為10000毫秒
redeliveryPolicy.setInitialRedeliveryDelay(10000);
//表示沒有拖延只有UseExponentialBackOff(true)為true時生效
//第一次失敗后重新發送之前等待10000毫秒,第二次失敗再等待10000 * 2毫秒
//第三次翻倍10000 * 2 * 2,以此類推
redeliveryPolicy.setBackOffMultiplier(2);
//是否避免消息碰撞
redeliveryPolicy.setUseCollisionAvoidance(true);
//設置重發最大拖延時間360000毫秒 表示沒有拖延只有UseExponentialBackOff(true)為true時生效
redeliveryPolicy.setMaximumRedeliveryDelay(360000);
return redeliveryPolicy;
}
@Bean
public Topic topic() {
return new ActiveMQTopic(myTopic);
}
public ConnectionFactory connectionFactory(){
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
//設置重發屬性
connectionFactory.setRedeliveryPolicy(redeliveryPolicy());
return connectionFactory;
}
/**
* JMS 隊列的監聽容器工廠
*/
@Bean(name = "jmsTopicListener")
public DefaultJmsListenerContainerFactory jmsTopicListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setPubSubDomain(true);
factory.setSessionTransacted(true);
factory.setAutoStartup(true);
//開啟持久化訂閱
factory.setSubscriptionDurable(true);
//重連間隔時間
factory.setRecoveryInterval(1000L);
return factory;
}
}
TopicProvider類?
package com.example.topicprovider.topic_provider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.jms.Topic;
import java.util.UUID;
/**
* @Date 2019/11/13 10:25
* @Desc
*/
@Component
public class TopicProvider {
@Autowired
private Topic topic;
@Autowired
private JmsTemplate jmsTemplate;
@Scheduled(fixedDelay = 10000)
private void produceMsg() {
jmsTemplate.convertAndSend(topic, "主題生產者" + UUID.randomUUID().toString().substring(1, 7));
System.out.println( jmsTemplate.getDeliveryMode());
System.out.println("主題生產者1");
}
}
啟動類?
package com.example.topicprovider;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class TopicProviderApplication {
public static void main(String[] args) {
SpringApplication.run(TopicProviderApplication.class, args);
}
}
啟動成功后結果圖:
最后
喜歡的可以關注我的公眾號:java小瓜哥的分享平臺。謝謝支持!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。