编码实现MQ连接池实现JMS消息发送连接管理
编程技术  /  houtizong 发布于 3年前   177
package com.mq.service;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.pool.PooledConnectionFactory;/** * 链接工厂管理类 * 自己工厂定义成了单例模式,连接池是静态块进行初始化,具体实现自己看着办 */public class MQPooledConnectionFactory {private static ActiveMQConnectionFactory connectionFactory;/** * 获得自己创建的链接工厂,这个工厂只初始化一次 */public static ActiveMQConnectionFactory getMyActiveMQConnectionFactory() {if (null == connectionFactory) {connectionFactory = new ActiveMQConnectionFactory("system","manage", "tcp://127.0.0.1:61616");}return connectionFactory;}private static PooledConnectionFactory pooledConnectionFactory;static {try {// 需要创建一个链接工厂然后设置到连接池中ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();activeMQConnectionFactory.setUserName("system");activeMQConnectionFactory.setPassword("manage");activeMQConnectionFactory.setBrokerURL("tcp://127.0.0.1:61616");// 如果将消息工厂作为属性设置则会有类型不匹配的错误,虽然Spring配置文件中是这么配置的,这里必须在初始化的时候设置进去pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);// 链接最大活跃数,为了在测试中区别我们使用的到底是不是一个对象和看是否能控制连接数(实际上是会话数),我们在这里设置为1int maximumActive = 1;pooledConnectionFactory.setMaximumActive(maximumActive);} catch (Exception e) {e.printStackTrace();}}/** * 获得链接池工厂 */public static PooledConnectionFactory getPooledConnectionFactory() {return pooledConnectionFactory;}/** * 对象回收销毁时停止链接 */@Overrideprotected void finalize() throws Throwable {pooledConnectionFactory.stop();super.finalize();}}这个类会提供连接池工厂和MQ链接工厂的创建和获得方式。特别要注意到的是,在初始化连接池时要设置一个链接工厂进去,而且只能在初始化时作为参数传递进去,如果你看过spring的配置你会以为要做为属性传递,而且他确实有这个属性。
package com.mq.service;import java.util.Map;import java.util.Set;import javax.jms.Connection;import javax.jms.MapMessage;import javax.jms.MessageProducer;import javax.jms.Queue;import javax.jms.Session;import org.apache.activemq.ActiveMQConnectionFactory;/** * 数据发送类,用于发送数据 * 如果获得链接,请查看被注释的代码 */public class MQ_Service {// 发送消息@SuppressWarnings("static-access")public void send(Map<String, String> map) throws Exception {ActiveMQConnectionFactory connectionFactory = null;Connection connection = null;Session session = null;Queue queue = null;MessageProducer producer = null;MapMessage messagep = null;// ====================================try {// 获得我们自己初始化的链接工厂然后创建链接connectionFactory = MQPooledConnectionFactory.getMyActiveMQConnectionFactory();connection = connectionFactory.createConnection();// 链接直接从链接池工厂进行获得//connection = MQPooledConnectionFactory.getPooledConnectionFactory().createConnection();session = connection.createSession(false, session.AUTO_ACKNOWLEDGE);queue = session.createQueue("CUI_JMS");producer = session.createProducer(queue);// 链接开始,如果我们使用的是连接池,那么即使你不开始,也是没有问题的connection.start();} catch (Exception e) {e.printStackTrace();}// ====================================messagep = session.createMapMessage();Set<String> keySet = map.keySet();for (String key : keySet) {String value = map.get(key);messagep.setBytes(key, value.getBytes("UTF-8"));System.out.println(key + "-->" + value);}producer.send(messagep);messagep.clearBody();messagep.clearProperties();// ===================================// 通过打印会话的内存地址和链接的客户端编号就可以知道我们使用的是不是同一个会话和链接System.out.println(session.toString());System.out.println(connection.getClientID());// 无论使用的自己的工厂还是连接池的,都要将会话关闭// 如果不关闭,在使用连接池的时可以看到效果,发送两次时只能发送一次,造成堵塞session.close();// 使用自己的工厂和连接池的区别是,运行后自己工厂链接调用关闭程序结束// 而调用连接池链接进行关闭实际上没有关闭,因为连接池要维护这个链接connection.close();messagep = null;}private MQ_Service() {}// 发送对象每次创建一个,用以区别我们使用的对象public static MQ_Service getInstance() {return new MQ_Service();}// 对外开发发送消息方法@SuppressWarnings("unchecked")public synchronized void sendMessage(Map map)throws Exception {this.send(map);}}取消注释内容,并注释掉获得我们自己初始化的链接工厂然后创建链接下面的两行代码,我们的链接就是从连接池获得了。
package com.mq.service;import java.util.HashMap;import java.util.Map;/** * 调用发送 */public class MQ_Sender {public static void main(String[] args) throws Exception {// 循环调用,这里定义调用两次for (int i = 0; i < 2; i++) {MQ_Service sender = MQ_Service.getInstance();Map<String, String> map = new HashMap<String, String>();map.put("MESS_NUM", "112110119");map.put("MESS_DEPT", "本部");sender.sendMessage(map);System.out.println("数据已经发送完毕!");}}}通过上面的例子,你就可以在不使用配置的情况下,在代码中也使用MQ连接池。并且更明了的俩接MQ连接池和数据库连接池的不同之处。
请您到ITEYE看我的原创:http://cuisuqiang.iteye.com
或支持我的个人博客,地址:http://www.javacui.com
请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!
技术博客集 - 网站简介:
前后端技术:
后端基于Hyperf2.1框架开发,前端使用Bootstrap可视化布局系统生成
网站主要作用:
1.编程技术分享及讨论交流,内置聊天系统;
2.测试交流框架问题,比如:Hyperf、Laravel、TP、beego;
3.本站数据是基于大数据采集等爬虫技术为基础助力分享知识,如有侵权请发邮件到站长邮箱,站长会尽快处理;
4.站长邮箱:[email protected];
文章归档
文章标签
友情链接