ActiveMQ 简单实例

Reading time ~3 minutes

一、环境

  • OS:Cent OS 7
  • Java:JDK 1.7
  • Zookeeper:3.3.3
  • ActiveMQ:5.14.0

ActiveMQ的安装配置请参考上一篇文章《ActiveMQ 高可用集群安装、配置》

二、简单使用

1.添加Maven依赖

在pom.xml文件中添加如下依赖

<!-- activemq start -->
<dependency>  
    <groupId>org.apache.activemq</groupId>  
    <artifactId>activemq-all</artifactId>  
    <version>5.14.1</version>  
</dependency>  

<dependency>  
    <groupId>org.apache.activemq</groupId>  
    <artifactId>activemq-pool</artifactId>  
    <version>5.14.1</version>  
</dependency>
<!-- activemq end -->

2.配置文件

建立配置文件: applicationContext-activemq-simple.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">


    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://10.10.5.223:61617" />
		<!-- 将该值开启官方说法是可以取得更高的发送速度(5倍)。 -->
		<property name="useAsyncSend" value="true" />
		<!-- 对于一个connection如果只有一个session,该值有效,否则该值无效,默认这个参数的值为true。 -->
		<property name="alwaysSessionAsync" value="true" />
		<property name="useDedicatedTaskRunner" value="true" />
	</bean>

	<!-- 点对点的队列 -->
	<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
		<!-- 设置消息队列的名字 ,consumer.prefetchSize则代表我们在此使用“消费者”预分配协议,在消费者内在足够时可以使这个值更大以获得更好的吞吐性能。 -->
		<constructor-arg value="ymk.queue?consumer.prefetchSize=100" />
	</bean>

	<!-- 发布/订阅 -->
	<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
		<!-- 设置消息队列的名字 ,consumer.prefetchSize则代表我们在此使用“消费者”预分配协议,在消费者内在足够时可以使这个值更大以获得更好的吞吐性能。 -->
		<constructor-arg value="ymk.topic?consumer.prefetchSize=100" />
	</bean>

	<!-- 设置事务型消息的重发机制,对于destination这个队列的重发机制为间隔100毫秒重发一次 -->
	<amq:redeliveryPolicy id="activeMQRedeliveryPolicy" destination="#destinationQueue" redeliveryDelay="100" maximumRedeliveries="1" />

</beans>  

3.点对点

(1) Queue发送端代码

package me.lingfeng.activemq.test;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 *
 * 点对点发送端
 *
 * @author Administrator
 *
 */
public class QueueSenderTest {
	public static void sendWithAuto() {
		ActiveMQConnectionFactory factory = null;
		Connection conn = null;
		Destination destination = null;
		Session session = null;
		MessageProducer producer = null;
		try {

			ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/spring/applicationContext-activemq-simple.xml");
			// 获取队列
			destination = (Destination) context.getBean("destinationQueue");
			// 获取连接工厂
			factory = (ActiveMQConnectionFactory) context.getBean("targetConnectionFactory");
			conn = factory.createConnection();
			// 获取操作连接 ,true为事物型消息 false为简单消息
			session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
			producer = session.createProducer(destination);
			// 发送的消息
			Message message = session.createTextMessage("Hello JMS Queue!");
			producer.send(message);

			session.commit();
		} catch (Exception e) {
			try {
				session.rollback();
			} catch (JMSException e1) {
				e1.printStackTrace();
			}
			e.printStackTrace();
		} finally {
			try {
				producer.close();
				producer = null;

				session.close();
				session = null;

				conn.stop();

				conn.close();
			} catch (Exception e) {
				e.printStackTrace();
			}

		}

	}

	public static void main(String[] args) {
		sendWithAuto();
	}
}

通过监控界面查看队列里的消息 Alt text

(2) Queue接收端代码

package me.lingfeng.activemq.test;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 *
 * 点对点接收监听
 *
 * @author Administrator
 *
 */
public class QueueConsumerListenerTest extends Thread implements MessageListener {
	private Destination destination = null;
	private Session session = null;

	public void run() {
		receive();
	}

	public void receive() {
		ConnectionFactory factory = null;
		Connection conn = null;
		try {
			final ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/spring/applicationContext-activemq-simple.xml");
			factory = (ActiveMQConnectionFactory) context.getBean("targetConnectionFactory");
			conn = factory.createConnection();
			conn.start();
			session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
			destination = (Destination) context.getBean("destinationQueue");
			MessageConsumer consumer = session.createConsumer(destination);
			consumer.setMessageListener(this);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public void onMessage(Message message) {

		try {
			// 接收到的消息
			TextMessage tm = (TextMessage) message;
			System.out.println("QueueConsumerListenerTest Receive Message: " + tm.getText());

			session.commit();
		} catch (Exception e) {
			try {
				session.rollback();
			} catch (JMSException e1) {
				e1.printStackTrace();
			}
			e.printStackTrace();
		}

	}

	public static void main(String[] args) {
		QueueConsumerListenerTest tranConsumer = new QueueConsumerListenerTest();
		tranConsumer.start();
	}
}

通过监控界面查看接收者信息 Alt text

4.发布/订阅

(1) Topic发送端代码

package me.lingfeng.activemq.test;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 *
 * 发布/订阅发送端
 *
 * @author Administrator
 *
 */
public class TopicSenderTest {
	public static void sendWithAuto() {
		ActiveMQConnectionFactory factory = null;
		Connection conn = null;
		Destination destination = null;
		Session session = null;
		MessageProducer producer = null;
		try {

			ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/spring/applicationContext-activemq-simple.xml");
			// 获取队列
			destination = (Destination) context.getBean("destinationTopic");
			// 获取连接工厂
			factory = (ActiveMQConnectionFactory) context.getBean("targetConnectionFactory");
			conn = factory.createConnection();
			// 获取操作连接 ,true为事物型消息 false为简单消息
			session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
			producer = session.createProducer(destination);
			// 发送的消息
			Message message = session.createTextMessage("Hello JMS Topic!");
			producer.send(message);

			session.commit();
		} catch (Exception e) {
			try {
				session.rollback();
			} catch (JMSException e1) {
				e1.printStackTrace();
			}
			e.printStackTrace();
		} finally {
			try {
				producer.close();
				producer = null;

				session.close();
				session = null;

				conn.stop();

				conn.close();
			} catch (Exception e) {
				e.printStackTrace();
			}

		}

	}

	public static void main(String[] args) {
		sendWithAuto();
	}
}

(2) Topic接收端代码

package me.lingfeng.activemq.test;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 *
 * 发布/订阅接收监听
 *
 * @author Administrator
 *
 */
public class TopicConsumerListenerTest extends Thread implements MessageListener {
	private Destination destination = null;
	private Session session = null;

	public void run() {
		receive();
	}

	public void receive() {
		ConnectionFactory factory = null;
		Connection conn = null;
		try {
			final ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/spring/applicationContext-activemq-simple.xml");
			factory = (ActiveMQConnectionFactory) context.getBean("targetConnectionFactory");
			conn = factory.createConnection();
			conn.start();
			session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
			destination = (Destination) context.getBean("destinationTopic");
			MessageConsumer consumer = session.createConsumer(destination);
			consumer.setMessageListener(this);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public void onMessage(Message message) {

		try {
			// 接收到的消息
			TextMessage tm = (TextMessage) message;
			System.out.println("TopicConsumerListenerTest Receive Message: " + tm.getText());

			session.commit();
		} catch (Exception e) {
			try {
				session.rollback();
			} catch (JMSException e1) {
				e1.printStackTrace();
			}
			e.printStackTrace();
		}

	}

	public static void main(String[] args) {
		TopicConsumerListenerTest tranConsumer = new TopicConsumerListenerTest();
		tranConsumer.start();
	}
}

转载请注明出处:
文章地址:ActiveMQ 简单实例
文章作者:凌风
原始连接:https://lingfeng.me/blog/activemq/activemq-java/
许可协议:转载请注明原文链接及作者。

系列博文:ActiveMQ系列文章

HomeBrew 安装及常用命令

HomeBrew 是 Mac OSX 上的软件包管理工具,能在 Mac 中方便的安装软件或者卸载软件, 使用命令,非常方便。 Continue reading