创建消息生产者
/** * 点对点消息生产者 * * @author Edward * */public class P2pProducer { public static void main(String[] args) { ConnectionFactory connectionFactory = null; Connection conn = null; Session session = null; Queue queue = null; MessageProducer messageProducer = null; try { // 创建工厂 // ActiveMQConnection.DEFAULT_USER 默认null // ActiveMQConnection.DEFAULT_PASSWORD 默认null // ActiveMQConnection.DEFAULT_BROKER_URL // 默认failover://tcp://localhost:61616 connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL); // 创建连接 conn = connectionFactory.createConnection(); // 启动连接 conn.start(); // 创建会话 createSession(true, Session.AUTO_ACKNOWLEDGE); true 表示开启事务 // Session.AUTO_ACKNOWLEDGE 消息模式 session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); // 创建队列 queue = session.createQueue("P2pQueue"); // 创建消息生产者 messageProducer = session.createProducer(queue); // 创建消息 TextMessage message = session.createTextMessage(); message.setText("我是P2pProducer生产的消息"); // 发送消息 messageProducer.send(message); // 提交事务 session.commit(); System.out.println("OK"); } catch (JMSException e) { e.printStackTrace(); } finally { try { session.close(); conn.close(); } catch (JMSException e) { e.printStackTrace(); } } }}
运行成功,查看控制台:
创建消息消费者
/** * 点对点消息消费者 * * @author Edward * */public class P2pConsumer { public static void main(String[] args) { ConnectionFactory connectionFactory = null; Connection conn = null; Session session = null; Queue queue = null; MessageConsumer messageConsumer = null; try { // 创建工厂 // ActiveMQConnection.DEFAULT_USER 默认null // ActiveMQConnection.DEFAULT_PASSWORD 默认null // ActiveMQConnection.DEFAULT_BROKER_URL // 默认failover://tcp://localhost:61616 connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL); // 创建连接 conn = connectionFactory.createConnection(); // 启动连接 conn.start(); // 创建会话 createSession(true, Session.AUTO_ACKNOWLEDGE); false 表示不开启事务 // Session.AUTO_ACKNOWLEDGE 消息模式 session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建队列 queue = session.createQueue("P2pQueue"); // 创建消息消费者 messageConsumer = session.createConsumer(queue); // 注册消费消息监听 messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { System.out.println("我收到的消息:" + ((TextMessage) message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } catch (JMSException e) { e.printStackTrace(); } }}
执行结果
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.我收到的消息:我是P2pProducer生产的消息