一个Socket连接管理器
编程技术  /  houtizong 发布于 3年前   193
为了方便开发人员使用,我在闲暇之余写了一个简单的Socket连接管理的处理程序,程序简单仅供参考!
其他更多精彩,请你访问我的博客:http://cuisuqiang.iteye.com/
处理机说明:
读取配置文件,根据配置文件信息保持一个连接配置变量集合。
根据连接变量集合初始化相应的连接。
启动一个线程,处理检查连接的有效性,处理方法是发送心跳包,如发现不能使用的连接,则设置该连接为占用,并启动一个线程去不断的初始化该连接。
用户使用接口根据名称去获得连接,如果是保持的连接则直接从连接集合获取,并检查有效性。如果是不用保持的,则去创建一个连接返回。
备注:如果返回了空的信息,则说明没有正常的连接可以返回,你可以尝试不断的获取。
下面来看看代码:
首先创建一个实体对象,对应配置信息
package com.socket;/** * @说明 连接的公共属性 * @author cuisuqiang * @version 1.0 * @since */public class SocketEntity {/** * 连接的名字,以名字作为Key */private String name;/** * 连接的IP */private String ip;/** * 连接的端口 */private int port;/** * 是否保持连接 */private boolean keepConn;public String getName() {return name;}public void setName(String name) {this.name = name;}public String getIp() {return ip;}public void setIp(String ip) {this.ip = ip;}public int getPort() {return port;}public void setPort(int port) {this.port = port;}public boolean isKeepConn() {return keepConn;}public void setKeepConn(boolean keepConn) {this.keepConn = keepConn;}}
创建配置文件socket.properties
socket1_isKeep属性相当于Spring中的单例还是每次创建
# @author cuisuqiangsocket1=socket1socket1_ip=127.0.0.1socket1_port=8001socket1_isKeep=1socket2=socket2socket2_ip=127.0.0.1socket2_port=8001socket2_isKeep=0# 连接的数量,一定要和实际配置的数量匹配socketConnCount=2# 公共的检测间隔 秒commonCheckTime=5
然后初始化配置信息和连接,同时该类中有静态变量来保持连接对象
package com.socket;import java.util.ArrayList;import java.util.LinkedHashMap;import java.util.List;import java.util.Map;import java.util.Properties;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;/** * @说明 变量保持 * @author cuisuqiang * @version 1.0 * @since */public class SocketKeep {private static Log logger = LogFactory.getLog(SocketKeep.class);/** * 配置信息加载 */public static List<SocketEntity> socketEntityList = new ArrayList<SocketEntity>();/** * 连接对象保持,只保持需要系统保持的连接 */public static Map<String, SocketCui> socketMap = new LinkedHashMap<String, SocketCui>();/** * 连接对象是否锁定 1:锁定,其他未锁定 */public static Map<String, String> socketIsLock = new LinkedHashMap<String, String>();/** * 共用连接检测间隔 */public static int commonCheckTime = 2;/** * 连接的数量,一定要和实际配置的数量匹配 */public static int socketConnCount = 0;public static ExecutorService executorService = null;// 线程池/** * 初始化所有连接信息 */public static void initSocketKeep() {Properties properties = null;try {properties = new Properties();properties.load(SocketKeep.class.getClassLoader().getResourceAsStream("socket.properties"));logger.warn("加载socket.properties文件成功!");} catch (Exception e) {logger.error("加载socket.properties文件失败!", e);properties = null;}if (null != properties) {try {commonCheckTime = Integer.parseInt(properties.getProperty("commonCheckTime"));socketConnCount = Integer.parseInt(properties.getProperty("socketConnCount"));executorService = Executors.newFixedThreadPool(socketConnCount + 1);} catch (Exception e) {executorService = Executors.newFixedThreadPool(1);logger.error("解析共用信息时错误!", e);// 系统忽略这两个属性的加载异常}SocketEntity socketEntity = null;for (int i = 1; i <= socketConnCount; i++) {String name = properties.getProperty("socket" + i);if(null != name){socketEntity = new SocketEntity();String ip = properties.getProperty("socket" + i + "_ip");String port = properties.getProperty("socket" + i + "_port");String isKeep = properties.getProperty("socket" + i + "_isKeep");socketEntity.setName(name);socketEntity.setIp(ip);socketEntity.setPort(Integer.parseInt(port));boolean keepConn = false;if(null != isKeep && "1".equals(isKeep)){keepConn = true;}socketEntity.setKeepConn(keepConn);socketEntityList.add(socketEntity);}}}logger.warn("加载Socket连接配置信息结束!");logger.warn("开始初始化Socket连接!");SocketCui socket = null;for(SocketEntity socketEntity : socketEntityList){if(null != socketEntity && socketEntity.isKeepConn()){try {socket = new SocketCui(socketEntity.getIp(),socketEntity.getPort());socket.setSoTimeout(0);socket.setKeepAlive(true);socket.setName(socketEntity.getName());} catch (Exception e) {logger.error("初始化某个连接时错误!错误的连接将放弃!资源名称:" + socketEntity.getName(), e);socket = null;}if(null != socket){socketMap.put(socketEntity.getName(), socket);}else{socketMap.put(socketEntity.getName(), new SocketCui());}socketIsLock.put(socketEntity.getName(), "0");}}// 开始执行检查executorService.execute(new CheckThread());logger.warn("初始化Socket连接结束!");}}
启动的线程是用于检查连接的
package com.socket;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;/** * @说明 轮询检测某个连接当前是否可用 * @author cuisuqiang * @version 1.0 * @since 当遇到一个错误的连接,将会启动重连,同时挂起该连接的使用 */public class CheckThread implements Runnable {private static Log logger = LogFactory.getLog(CheckThread.class);public void run() {while(true){SocketCui socket = null;for(SocketEntity socketEntity : SocketKeep.socketEntityList){if(null != socketEntity && socketEntity.isKeepConn()){String isLock = SocketKeep.socketIsLock.get(socketEntity.getName());// 如果当前未被使用if(!"1".equals(isLock)){// 锁定引用SocketKeep.socketIsLock.put(socketEntity.getName(), "1");socket = SocketKeep.socketMap.get(socketEntity.getName());try {// 发送一个心跳包socket.sendUrgentData(0xFF);// 释放资源SocketKeep.socketIsLock.put(socketEntity.getName(), "0");} catch (Exception e) {logger.error("检查连接时异常!启动重连!资源名称:" + socketEntity.getName(), e);// 如果异常,应该建立一个线程去初始化该连接InitSocket initS = new InitSocket(socketEntity.getName());new Thread(initS).start();}}}}// 执行间隔try {logger.error("本次检测结束!");Thread.sleep(SocketKeep.commonCheckTime * 1000);} catch (Exception e) {}}}}
当检查线程发现无效的连接时会启动新的线程初始化该连接
package com.socket;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;/** * @说明 负责初始化失效的连接 * @author cuisuqiang * @version 1.0 * @since */public class InitSocket implements Runnable{private static Log logger = LogFactory.getLog(InitSocket.class);/** * 是否有某个连接的配置信息,只有有配置信息才能建立连接 */private static boolean isHave = false;private SocketEntity socketEntity = null;private String name;public InitSocket(String name){this.name = name;// 检测是否有某个连接的配置信息for(SocketEntity socketEntity : SocketKeep.socketEntityList){if(null != socketEntity && socketEntity.isKeepConn()){if(socketEntity.getName().equals(name)){this.setSocketEntity(socketEntity);isHave = true;}}}}public void run() {boolean isError = true;SocketCui socket = null;if(isHave){while(isError){try {socket = new SocketCui(this.getSocketEntity().getIp(),this.getSocketEntity().getPort());socket.setSoTimeout(0);socket.setKeepAlive(true);socket.setName(this.name);// 发送一个心跳包socket.sendUrgentData(0xFF);} catch (Exception e) {logger.error("建立资源连接时错误!资源:" + this.name, e);socket = null;}if(null != socket){SocketKeep.socketMap.put(this.getSocketEntity().getName(), socket);// 设置连接当前可用SocketKeep.socketIsLock.put(this.getSocketEntity().getName(), "0");logger.warn("建立资源连接成功!资源名称:" + this.name);isError = false;}try {Thread.sleep(2 * 1000);} catch (Exception e) {}}}else{logger.error("没有发现指定资源的配置信息!资源名称:" + this.name);}logger.warn("初始化资源执行结束!资源名称:" + this.name);}public SocketEntity getSocketEntity() {return socketEntity;}public void setSocketEntity(SocketEntity socketEntity) {this.socketEntity = socketEntity;}}
同时注意,用户在使用连接后会调用关闭方法。我们是不能让连接关闭的,要保持常连接。所以如果用户指定的是保持这个连接,那么返回的连接对象就不是原来的Socket对象了,我们要重写这个对象
package com.socket;import java.io.IOException;import java.net.Socket;import java.net.UnknownHostException;/** * @说明 被重新定义的连接对象,增加了名字这个属性,重写了关闭的方法 * @author cuisuqiang * @version 1.0 * @since */public class SocketCui extends Socket{/** * 为对象增加名称属性 */private String name;public SocketCui() {}public SocketCui(String ip,int port) throws UnknownHostException, IOException{super(ip, port);}/** * 覆盖关闭的方法 */@Overridepublic synchronized void close() throws IOException {SocketKeep.socketIsLock.put(this.name, "0");}public String getName() {return name;}public void setName(String name) {this.name = name;}}
这样,关闭时只会解除其占用,而不会真正关闭该连接。
我们来写一个服务端,这个服务端一直接受连接,并检查连接的有效性,当失效时不处理
同时打印接收到的连接信息
package com.test;import java.net.*;import java.text.SimpleDateFormat;import java.util.Date;/** * @说明 服务端,始终接受连接 * @author cuisuqiang * @version 1.0 * @since */public class ServiceTest {public static void main(String[] args) {try {ServerSocket ss1 = new ServerSocket(8001);Runnable accumelatora1 = new Accumulatort(ss1);Thread threada = new Thread(accumelatora1, "ThreadA");threada.start();System.out.println("服务启动完毕!");} catch (Exception e) {e.printStackTrace();}}}class Accumulatort implements Runnable {ServerSocket ss = null;public Accumulatort(ServerSocket s) {this.ss = s;}@SuppressWarnings("unchecked")public void run() {try {SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");while (true) {Socket s = ss.accept();System.out.println(format.format(new Date()) + " " + "---------收到请求!");new Thread(new ServiceImpl(s)).start();}} catch (Exception e) {e.printStackTrace();}}}
服务端处理连接的实现类
package com.test;import java.net.Socket;/** * @说明 循环发送心跳包保持连接属性 * @author cuisuqiang * @version 1.0 * @since */public class ServiceImpl implements Runnable {Socket socket = null;public ServiceImpl(Socket s) {this.socket = s;}public void run() {boolean isKeep = true;try {while (isKeep) {socket.sendUrgentData(0xFF);Thread.sleep(1 * 1000);}} catch (Exception e) {isKeep = false;}}}
先启动服务端,然后我们再写一个测试端,这个测试类会不断去管理器中获取相应的连接,同时打印连接信息
通过打印的连接信息,我们可以知道获取的是不是同一个对象
同时如果你一直获取单例的对象,那么可能出现该连接被检查线程占用的情况
package com.test;import java.net.Socket;import java.text.SimpleDateFormat;import java.util.Date;import com.socket.CommonSocket;import com.socket.SocketKeep;/** * @说明 循环去请求获得相应的连接然后打印连接地址 * @author cuisuqiang * @version 1.0 * @since */public class GetSocketTest {public static void main(String[] args) {SocketKeep.initSocketKeep();while(true){SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");try {Socket socket1 = CommonSocket.getSocketByName("socket1");if(null != socket1){System.out.println(format.format(new Date()) + " " + socket1.toString());socket1.close();}} catch (Exception e) {e.printStackTrace();}try {Socket socket2 = CommonSocket.getSocketByName("socket2");if(null != socket2){System.out.println(format.format(new Date()) + " " + socket2.toString());socket2.close();}} catch (Exception e) {e.printStackTrace();}try {Thread.sleep(1000);} catch (Exception e) {}}}}
当然别忘了加日志的包!
我们来看一下服务端的打印信息
服务启动完毕!2012-04-18 16:59:48 ---------收到请求!2012-04-18 16:59:48 ---------收到请求!2012-04-18 16:59:49 ---------收到请求!2012-04-18 16:59:50 ---------收到请求!2012-04-18 16:59:51 ---------收到请求!2012-04-18 16:59:52 ---------收到请求!2012-04-18 16:59:53 ---------收到请求!2012-04-18 16:59:54 ---------收到请求!2012-04-18 16:59:55 ---------收到请求!2012-04-18 16:59:56 ---------收到请求!2012-04-18 16:59:57 ---------收到请求!2012-04-18 16:59:58 ---------收到请求!2012-04-18 16:59:59 ---------收到请求!
我们可以看到,除刚开始外,会一直收到连接,因为我们一直在获取一个非单例的连接对象
我们再看一下客户端打印信息
2012-04-18 16:59:48 Socket[addr=/127.0.0.1,port=8001,localport=4389]2012-04-18 16:59:48 Socket[addr=/127.0.0.1,port=8001,localport=4390]2012-04-18 16:59:49 Socket[addr=/127.0.0.1,port=8001,localport=4389]2012-04-18 16:59:49 Socket[addr=/127.0.0.1,port=8001,localport=4391]2012-04-18 16:59:50 Socket[addr=/127.0.0.1,port=8001,localport=4389]2012-04-18 16:59:50 Socket[addr=/127.0.0.1,port=8001,localport=4392]2012-04-18 16:59:51 Socket[addr=/127.0.0.1,port=8001,localport=4389]2012-04-18 16:59:51 Socket[addr=/127.0.0.1,port=8001,localport=4393]2012-04-18 16:59:52 Socket[addr=/127.0.0.1,port=8001,localport=4389]2012-04-18 16:59:52 Socket[addr=/127.0.0.1,port=8001,localport=4394]2012-04-18 16:59:53 Socket[addr=/127.0.0.1,port=8001,localport=4389]2012-04-18 16:59:53 Socket[addr=/127.0.0.1,port=8001,localport=4395]2012-04-18 16:59:54 Socket[addr=/127.0.0.1,port=8001,localport=4389]2012-04-18 16:59:54 Socket[addr=/127.0.0.1,port=8001,localport=4396]2012-04-18 16:59:55 Socket[addr=/127.0.0.1,port=8001,localport=4389]2012-04-18 16:59:55 Socket[addr=/127.0.0.1,port=8001,localport=4397]2012-04-18 16:59:56 Socket[addr=/127.0.0.1,port=8001,localport=4389]2012-04-18 16:59:56 Socket[addr=/127.0.0.1,port=8001,localport=4398]2012-04-18 16:59:57 Socket[addr=/127.0.0.1,port=8001,localport=4389]2012-04-18 16:59:57 Socket[addr=/127.0.0.1,port=8001,localport=4399]2012-04-18 16:59:58 Socket[addr=/127.0.0.1,port=8001,localport=4400]2012-04-18 16:59:59 Socket[addr=/127.0.0.1,port=8001,localport=4389]2012-04-18 16:59:59 Socket[addr=/127.0.0.1,port=8001,localport=4401]
可以看到,socket1是一直被保持的,而socket2每次创建一个
再看一下日志
2012-04-18 16:59:48 [com.socket.SocketKeep]-[WARN] 加载socket.properties文件成功!2012-04-18 16:59:48 [com.socket.SocketKeep]-[WARN] 加载Socket连接配置信息结束!2012-04-18 16:59:48 [com.socket.SocketKeep]-[WARN] 开始初始化Socket连接!2012-04-18 16:59:48 [com.socket.SocketKeep]-[WARN] 初始化Socket连接结束!2012-04-18 16:59:48 [com.socket.CommonSocket]-[WARN] 为用户建立请求连接!资源名称:socket22012-04-18 16:59:48 [com.socket.CheckThread]-[ERROR] 本次检测结束!2012-04-18 16:59:49 [com.socket.CommonSocket]-[WARN] 为用户建立请求连接!资源名称:socket22012-04-18 16:59:50 [com.socket.CommonSocket]-[WARN] 为用户建立请求连接!资源名称:socket22012-04-18 16:59:51 [com.socket.CommonSocket]-[WARN] 为用户建立请求连接!资源名称:socket22012-04-18 16:59:52 [com.socket.CommonSocket]-[WARN] 为用户建立请求连接!资源名称:socket22012-04-18 16:59:53 [com.socket.CheckThread]-[ERROR] 本次检测结束!2012-04-18 16:59:53 [com.socket.CommonSocket]-[WARN] 为用户建立请求连接!资源名称:socket22012-04-18 16:59:54 [com.socket.CommonSocket]-[WARN] 为用户建立请求连接!资源名称:socket22012-04-18 16:59:55 [com.socket.CommonSocket]-[WARN] 为用户建立请求连接!资源名称:socket22012-04-18 16:59:56 [com.socket.CommonSocket]-[WARN] 为用户建立请求连接!资源名称:socket22012-04-18 16:59:57 [com.socket.CommonSocket]-[WARN] 为用户建立请求连接!资源名称:socket22012-04-18 16:59:58 [com.socket.CommonSocket]-[WARN] 当前连接正被占用,请稍候尝试!资源名称:socket12012-04-18 16:59:58 [com.socket.CheckThread]-[ERROR] 本次检测结束!2012-04-18 16:59:58 [com.socket.CommonSocket]-[WARN] 为用户建立请求连接!资源名称:socket22012-04-18 16:59:59 [com.socket.CommonSocket]-[WARN] 为用户建立请求连接!资源名称:socket2
可以看到,socket2会一直被重新创建,而且socket1会发现被占用的情况。
你也可以中断服务端然后再重启服务端试试,看看效果。
源码下面可以下载,仅供参考!转载说明出处,谢谢合作!
请您到ITEYE看我的原创:http://cuisuqiang.iteye.com
或支持我的个人博客,地址:http://www.javacui.com
请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!
技术博客集 - 网站简介:
前后端技术:
后端基于Hyperf2.1框架开发,前端使用Bootstrap可视化布局系统生成
网站主要作用:
1.编程技术分享及讨论交流,内置聊天系统;
2.测试交流框架问题,比如:Hyperf、Laravel、TP、beego;
3.本站数据是基于大数据采集等爬虫技术为基础助力分享知识,如有侵权请发邮件到站长邮箱,站长会尽快处理;
4.站长邮箱:[email protected];
文章归档
文章标签
友情链接