Spring + ActiveMQ 整合实现发布/订阅(publish-subscribe)消息发送案例

news/2024/9/17 9:30:21 标签: java-activemq, spring, activemq, java, 后端

本节演示发布/订阅(publish-subscribe)模式的消息发送的 Spring + ActiveMQ 代码。

1、生产者代码
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">


  <!-- 创建一个ConnectionFactory,为了提升性能用了连接池 -->
  <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
    destroy-method="stop">
    <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL">
          <value>tcp://localhost:61616</value>
        </property>
      </bean>
    </property>
    <property name="maxConnections" value="50" />
  </bean>


  <!-- 创建消息目的地,constructor-arg是目的地的名称,此处为spring-topic -->
  <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg index="0" value="spring-topic" />
  </bean>


  <!-- 构建JmsTemplate -->
  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination" ref="destinationTopic" />
    <property name="messageConverter">
      <bean
        class="org.springframework.jms.support.converter.SimpleMessageConverter" />
    </property>
  </bean>


</beans>

生产者关键代码:SpringTopicSender

package producer;


import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;


import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;


/**
 * 发布/订阅(publish-subscribe)消息发送,spring整合
 * 
 * @author JPM
 */
public class SpringTopicSender {
  public static void main(String[] args) {


    ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
        "classpath:spring/springContext-activemq.xml");
    JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate");
    jmsTemplate.send(new MessageCreator() {


      public Message createMessage(Session session) throws JMSException {
        TextMessage message = session.createTextMessage();
        message.setText("hello,spring-topic!");
        return message;
      }
    });
    context.close();
  }
}

运行 SpringTopicSender 类,查看 ActiveMQ 管理界面

4ae52088b1bf959bf391d1eee24a83f3.png

说明已经发布了一个主题消息。

2、消费者代码(receive 方法获取消息)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">


  <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
    destroy-method="stop">
    <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL">
          <value>tcp://localhost:61616</value>
        </property>
      </bean>
    </property>
    <property name="maxConnections" value="50" />
  </bean>


  <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg index="0" value="spring-topic" />
  </bean>


  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination" ref="destinationTopic" />
    <property name="messageConverter">
      <bean
        class="org.springframework.jms.support.converter.SimpleMessageConverter" />
    </property>
  </bean>


</beans>

消费者关键代码:SpringTopicReceiver1 和 SpringTopicReceiver2

package consumer;


import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;


/**
 * 发布/订阅(publish-subscribe)消息接收1,spring整合
 * 
 * @author JPM
 */
public class SpringTopicReceiver1 {


  public static void main(String[] args) {
    ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
        "classpath:spring/springContext-activemq.xml");
    JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate");
    String message = (String) jmsTemplate.receiveAndConvert();
    System.out.println("SpringTopicReceiver1--->" + message);
    context.close();
  }
}
package consumer;


import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;


/**
 * 发布/订阅(publish-subscribe)消息接收2,spring整合
 * 
 * @author JPM
 */
public class SpringTopicReceiver2 {


  public static void main(String[] args) {
    ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
        "classpath:spring/springContext-activemq.xml");
    JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate");
    String message = (String) jmsTemplate.receiveAndConvert();
    System.out.println("SpringTopicReceiver2--->" + message);
    context.close();
  }
}

运行 SpringTopicReceiver1 和 SpringTopicReceiver2 类,查看控制台和 ActiveMQ 管理界面

5061894619e6ce3b6c3aa0b9be742487.png

75fb47617af2245076ea21ea084d86c5.png

6dc0d3c2331892b589cb464fb2462484.png

604acc6ded3e59867453322909e81393.png

说明2个消费者已经启动,但是看控制台输出都是空的,说明没有消费到消息,原因是对于 topic 消息来说,消费者必须先启动,订阅了主题,然后才能收到主题发来的消息。我们刚才是先发送的主题消息,后启动的消费者,因此2个消费者都没有输出。

下面我们再通过刚才的生产者 SpringTopicSender 类发送一个主题消息,观察消费者的控制台和 ActiveMQ 管理界面的变化

04ce102bec35409d99c29560d5314a92.png

a97de2cfc78cfa5ef1d0ba713d3a3197.png

说明,2个消费者都获取到了主题消息。

3、消费者代码(使用消息监听器获取消息)

spring 关键代码 springContext-activemq1.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">


  <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
    destroy-method="stop">
    <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL">
          <value>tcp://localhost:61616</value>
        </property>
      </bean>
    </property>
    <property name="maxConnections" value="50" />
  </bean>


  <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg index="0" value="spring-topic" />
  </bean>


  <bean id="jmsContainer1"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="destinationTopic" />
    <property name="messageListener" ref="messageListener1" />
  </bean>


  <bean id="messageListener1" class="consumer.SpringTopicListener1" />


</beans>

springContext-activemq2.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">


  <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
    destroy-method="stop">
    <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL">
          <value>tcp://localhost:61616</value>
        </property>
      </bean>
    </property>
    <property name="maxConnections" value="50" />
  </bean>


  <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg index="0" value="spring-topic" />
  </bean>


  <bean id="jmsContainer2"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="destinationTopic" />
    <property name="messageListener" ref="messageListener2" />
  </bean>


  <bean id="messageListener2" class="consumer.SpringTopicListener2" />


</beans>

消费者关键代码 SpringTopicListener1

java">package consumer;


import java.io.IOException;


import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;


import org.springframework.context.support.ClassPathXmlApplicationContext;


/**
 * 发布/订阅(publish-subscribe)消息接收1,spring整合,使用Listener
 * 
 * @author JPM
 */
public class SpringTopicListener1 implements MessageListener {


  public void onMessage(Message message) {
    String msg = null;
    try {
      msg = ((TextMessage) message).getText();
    } catch (JMSException e) {
      e.printStackTrace();
    }
    System.out.println("SpringTopicListener1--->" + msg);
  }


  public static void main(String[] args) {
    ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
        "classpath:spring/springContext-activemq1  .xml");
    try {
      System.in.read();
    } catch (IOException e) {
      e.printStackTrace();
    }
    context.close();
  }


}

SpringTopicListener2

java">package consumer;


import java.io.IOException;


import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;


import org.springframework.context.support.ClassPathXmlApplicationContext;


/**
 * 发布/订阅(publish-subscribe)消息接收2,spring整合,使用Listener
 * 
 * @author JPM
 */
public class SpringTopicListener2 implements MessageListener {


  public void onMessage(Message message) {
    String msg = null;
    try {
      msg = ((TextMessage) message).getText();
    } catch (JMSException e) {
      e.printStackTrace();
    }
    System.out.println("SpringTopicListener2--->" + msg);
  }


  public static void main(String[] args) {
    ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
        "classpath:spring/springContext-activemq2.xml");
    try {
      System.in.read();
    } catch (IOException e) {
      e.printStackTrace();
    }
    context.close();
  }


}

运行 SpringTopicListener1 和 SpringTopicListener2 类,查看 ActiveMQ 管理界面

d98b29b240c9d8d37e2969562504593a.png

成功启动一个消费者

a18109abf8957112836c90b8a91c08fc.png

成功启动第二个消费者。

首先运行刚才的生产者 SpringTopicSender  类发送一条主题消息,查看 ActiveMQ 管理界面

1225b0618f4d697f3b79964f4e9dc3ed.png

说明消费了主题消息。

查看 SpringTopicListener1 和 SpringTopicListener2 的控制台

efb1c61ce4c95788cefba31ecbf20d6e.png

从控制台来看,两个订阅者都获取到了生产者发布的消息。


http://www.niftyadmin.cn/n/5646960.html

相关文章

【Linux】借命令行参数的引导,探索环境变量的奥秘

目录 1.命令行参数 1.1.概念&#xff1a; 1.2.利用命令行参数打造计算器&#xff1a; 2.环境变量 2.1.环境变量是什么&#xff1f; 2.2.有什么方法可以不用带路径&#xff0c;直接就可以运行自己的程序呢&#xff1f; 法一&#xff1a; 法二&#xff1a; 2.3.通过代码…

自定义类型:结构体(续)

目录 一. 结构体的内存对齐 1.1 为什么存在内存对齐&#xff1f; 1.2 修改默认对齐数 二. 结构体传参 三. 结构体实现位段 一. 结构体的内存对齐 在前面的文章里我们已经讲过一部分的内存对齐的知识&#xff0c;并举出了两个例子&#xff0c;我们再举出两个例子继续说明&…

Web安全之CSRF攻击详解与防护

在互联网应用中&#xff0c;安全性问题是开发者必须时刻关注的核心内容之一。跨站请求伪造&#xff08;Cross-Site Request Forgery, CSRF&#xff09;&#xff0c;是一种常见的Web安全漏洞。通过CSRF攻击&#xff0c;黑客可以冒用受害者的身份&#xff0c;发送恶意请求&#x…

Linux服务器Java启动脚本

Linux服务器Java启动脚本 1、初版2、优化版本3、常用脚本仓库 本文章介绍了如何在Linux服务器上执行Java并启动jar包&#xff0c; 通常我们会使用nohup直接启动&#xff0c;但是还是需要手动停止然后再次启动&#xff0c; 那如何更优雅的在服务器上启动jar包呢&#xff0c;让我…

使用acme.sh申请Let‘s Encrypt的SSL证书并安装的步骤

在CentOS 7上使用acme.sh申请Let’s Encrypt的SSL证书并安装在Nginx服务器的步骤&#xff0c;可以分为以下几个主要步骤&#xff1a; 一、安装acme.sh 安装必要的工具&#xff1a; 确保你的系统中已安装socat和curl&#xff08;如果尚未安装&#xff0c;可以使用yum install s…

【网络安全】Exif 数据储存型XSS

未经许可,不得转载。 文章目录 Exif步骤Exif EXIF(Exchangeable Image File Format)数据是一种存储在图像文件中的元数据格式,常用于数码照片和扫描图像。它包含了与图像相关的各种信息,比如拍摄日期和时间、相机品牌和型号、拍摄时的设置(如曝光时间、光圈、ISO等)、地…

LTM4622双路电源芯片的相关测试

最近需要使用LTM4622电源芯片&#xff0c;所以做了demo板进行测试&#xff1a; 1.原理图如下&#xff1a; 2.用AD画的电路板如下&#xff1a; 3.芯片相关知识&#xff1a; 芯片的封装&#xff1a; 典型应用&#xff1a; 管脚定义 VIN:电源的输入管脚。 RUN:相当于输出使能脚…

如何在Flask中处理错误

在Flask中处理错误是确保Web应用健壮性和用户体验的重要部分。错误处理不仅涉及捕获和响应服务器或客户端生成的错误&#xff0c;还包括为这些错误提供有意义的反馈&#xff0c;无论是向开发者报告&#xff08;如调试信息&#xff09;还是向最终用户展示&#xff08;如友好的错…