首先需要安装ApacheMQ,windows和centos安装方式
ActiveMQ默认启动到8161端口,启动完了后在浏览器地址栏输入: 要求输入用户名密码,默认用户名密码为admin、admin 这个用户名密码是在conf/users.properties中配置的 输入用户名密码后便可看到如下图的ActiveMQ控制台界面了
pom引入:
org.springframework.boot spring-boot-starter-parent 1.3.3.RELEASE org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-test org.springframework.boot spring-boot-starter-web org.apache.activemq activemq-core 5.7.0
**点对点(一对一的消息) 消息生产者(发送者)代码: **
public class apachemqproducer { private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; private static final String BROKERURL = "tcp://192.168.10.200:61616"; private static final String QUEUENAME = "muQueue"; public static void main(String[] args) throws JMSException { sendmsg(); } public static void sendmsg() throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(QUEUENAME); MessageProducer producer = session.createProducer(queue); for (int i = 1; i < 6; i++) { TextMessage message = session.createTextMessage("This is my first apache mq MESSAGE--"+i); producer.send(message); } System.out.println("消息发送完毕"); session.close(); connection.close(); }}
** 消息消费者(接收者)代码:**
public class apachemqconsumer { private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; private static final String BROKERURL = "tcp://192.168.10.200:61616"; private static final String QUEUENAME = "muQueue"; public static void main(String[] args) throws JMSException { receivemsg(); } public static void receivemsg() throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL); Connection connection = factory.createConnection(); connection.start(); //下面注释掉的是自动验收消息 //Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Queue queue = session.createQueue(QUEUENAME); MessageConsumer consumer = session.createConsumer(queue); while (true) { TextMessage receive = (TextMessage) consumer.receive(); if (receive != null) { System.out.println(receive.getText()); //手动实现验收消息 receive.acknowledge(); }else { break; } } session.commit(); connection.close(); }}
下面实现订阅式(一对多)代码: 生产者代码:
public class TopicProducer { private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; private static final String BROKERURL = "tcp://192.168.10.200:61616"; private static final String TOPICNAME = "myTopic"; public static void main(String[] args) throws JMSException { sendmsg(); } public static void sendmsg() throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(TOPICNAME); MessageProducer producer = session.createProducer(topic); for (int i = 1; i < 6; i++) { TextMessage message = session.createTextMessage("This is my first apache mq Topic Message--"+i); producer.send(message); } System.out.println("话题发送完毕"); session.close(); connection.close(); }}
消费者代码:
public class TopicConsumer { private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; private static final String BROKERURL = "tcp://192.168.10.200:61616"; private static final String TOPICNAME = "myTopic"; public static void main(String[] args) throws JMSException { receivemsg(); } public static void receivemsg() throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(TOPICNAME); MessageConsumer consumer = session.createConsumer(topic); while (true) { TextMessage receive = (TextMessage) consumer.receive(); if (receive != null) { System.out.println(receive.getText()); }else { break; } } session.commit(); connection.close(); }}
这样就可以实现MQ的实例了。我们在操作过程中可以先启动生产者,不启动消费者的情况下进行消息的发送,发送完毕之后再启动消费者去消费信息
这样的话就基本实现了功能了!