本篇内容介绍了“如何解决Java Socket通信技术收发线程互斥的问题”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
Java Socket通信技术在很长的时间里都在使用,在不少的程序员眼中都有很多高的评价。那么下面我们就看看如何才能掌握这门复杂的编程语言,希望大家在今后的Java Socket通信技术使用中有所收获。
下面就是Java Socket通信技术在解决收发线程互斥的代码介绍。
package com.bill99.svr; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; import java.net.SocketTimeoutException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Properties; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import org.apache.log4j.Logger; /** *<p>title: socket通信包装类</p> *<p>Description: </p> *<p>CopyRight: CopyRight (c) 2009</p> *<p>Company: 99bill.com</p> *<p>Create date: 2009-10-14</P> *author sunnylocus<A href="mailto:sunnylocus@163.com"> </A> * v0.10 2009-10-14 初类 * v0.11 2009-11-12 对命令收发逻辑及收发线程互斥机制进行了优化,处理命令速度由原来8~16个/秒提高到25~32个/秒 */ public class SocketConnection { private volatile Socket socket; private int timeout = 1000*10; //超时时间,初始值10秒 private boolean isLaunchHeartcheck = false;//是否已启动心跳检测 private boolean isNetworkConnect = false; //网络是否已连接 private static String host = ""; private static int port; static InputStream inStream = null; static OutputStream outStream = null; private static Logger log =Logger.getLogger(SocketConnection.class); private static SocketConnection socketConnection = null; private static java.util.Timer heartTimer=null; //private final Map<String, Object> recMsgMap= Collections.synchronizedMap(new HashMap<String, Object>()); private final ConcurrentHashMap<String, Object> recMsgMap = new ConcurrentHashMap<String, Object>(); private static Thread receiveThread = null; private final ReentrantLock lock = new ReentrantLock(); private SocketConnection(){ Properties conf = new Properties(); try { conf.load(SocketConnection.class.getResourceAsStream("test.conf")); this.timeout = Integer.valueOf(conf.getProperty("timeout")); init(conf.getProperty("ip"),Integer.valueOf(conf.getProperty("port"))); } catch(IOException e) { log.fatal("socket初始化异常!",e); throw new RuntimeException("socket初始化异常,请检查配置参数"); } } /** * 单态模式 */ public static SocketConnection getInstance() { if(socketConnection==null) { synchronized(SocketConnection.class) { if(socketConnection==null) { socketConnection = new SocketConnection(); return socketConnection; } } } return socketConnection; } private void init(String host,int port) throws IOException { InetSocketAddress addr = new InetSocketAddress(host,port); socket = new Socket(); synchronized (this) { log.info("【准备与"+addr+"建立连接】"); socket.connect(addr, timeout); log.info("【与"+addr+"连接已建立】"); inStream = socket.getInputStream(); outStream = socket.getOutputStream(); socket.setTcpNoDelay(true);//数据不作缓冲,立即发送 socket.setSoLinger(true, 0);//socket关闭时,立即释放资源 socket.setKeepAlive(true); socket.setTrafficClass(0x04|0x10);//高可靠性和最小延迟传输 isNetworkConnect=true; receiveThread = new Thread(new ReceiveWorker()); receiveThread.start(); SocketConnection.host=host; SocketConnection.port=port; if(!isLaunchHeartcheck) launchHeartcheck(); } } /** * 心跳包检测 */ private void launchHeartcheck() { if(socket == null) throw new IllegalStateException("socket is not established!"); heartTimer = new Timer(); isLaunchHeartcheck = true; heartTimer.schedule(new TimerTask() { public void run() { String msgStreamNo = StreamNoGenerator.getStreamNo("kq"); int mstType =9999;//999-心跳包请求 SimpleDateFormat dateformate = new SimpleDateFormat("yyyyMMddHHmmss"); String msgDateTime = dateformate.format(new Date()); int msgLength =38;//消息头长度 String commandstr = "00" +msgLength + mstType + msgStreamNo; log.info("心跳检测包 -> IVR "+commandstr); int reconnCounter = 1; while(true) { String responseMsg =null; try { responseMsg = readReqMsg(commandstr); } catch (IOException e) { log.error("IO流异常",e); reconnCounter ++; } if(responseMsg!=null) { log.info("心跳响应包 <- IVR "+responseMsg); reconnCounter = 1; break; } else { reconnCounter ++; } if(reconnCounter >3) {//重连次数已达三次,判定网络连接中断,重新建立连接。连接未被建立时不释放锁 reConnectToCTCC(); break; } } } },1000 * 60*1,1000*60*2); } /** * 重连与目标IP建立重连 */ private void reConnectToCTCC() { new Thread(new Runnable(){ public void run(){ log.info("重新建立与"+host+":"+port+"的连接"); //清理工作,中断计时器,中断接收线程,恢复初始变量 heartTimer.cancel(); isLaunchHeartcheck=false; isNetworkConnect = false; receiveThread.interrupt(); try { socket.close(); } catch (IOException e1) {log.error("重连时,关闭socket连接发生IO流异常",e1);} //---------------- synchronized(this){ for(; ;){ try { Thread.currentThread(); Thread.sleep(1000 * 1); init(host,port); this.notifyAll(); break ; } catch (IOException e) { log.error("重新建立连接未成功",e); } catch (InterruptedException e){ log.error("重连线程中断",e); } } } } }).start(); } /** * 发送命令并接受响应 * @param requestMsg * @return * @throws SocketTimeoutException * @throws IOException */ public String readReqMsg(String requestMsg) throws IOException { if(requestMsg ==null) { return null; } if(!isNetworkConnect) { synchronized(this){ try { this.wait(1000*5); //等待5秒,如果网络还没有恢复,抛出IO流异常 if(!isNetworkConnect) { throw new IOException("网络连接中断!"); } } catch (InterruptedException e) { log.error("发送线程中断",e); } } } String msgNo = requestMsg.substring(8, 8 + 24);//读取流水号 outStream = socket.getOutputStream(); outStream.write(requestMsg.getBytes()); outStream.flush(); Condition msglock = lock.newCondition(); //消息锁 //注册等待接收消息 recMsgMap.put(msgNo, msglock); try { lock.lock(); msglock.await(timeout,TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("发送线程中断",e); } finally { lock.unlock(); } Object respMsg = recMsgMap.remove(msgNo); //响应信息 if(respMsg!=null &&(respMsg != msglock)) { //已经接收到消息,注销等待,成功返回消息 return (String) respMsg; } else { log.error(msgNo+" 超时,未收到响应消息"); throw new SocketTimeoutException(msgNo+" 超时,未收到响应消息"); } } public void finalize() { if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } //消息接收线程 private class ReceiveWorker implements Runnable { String intStr= null; public void run() { while(!Thread.interrupted()){ try { byte[] headBytes = new byte[4]; if(inStream.read(headBytes)==-1){ log.warn("读到流未尾,对方已关闭流!"); reConnectToCTCC();//读到流未尾,对方已关闭流 return; } byte[] tmp =new byte[4]; tmp = headBytes; String tempStr = new String(tmp).trim(); if(tempStr==null || tempStr.equals("")) { log.error("received message is null"); continue; } intStr = new String(tmp); int totalLength =Integer.parseInt(intStr); //---------------- byte[] msgBytes = new byte[totalLength-4]; inStream.read(msgBytes); String resultMsg = new String(headBytes)+ new String(msgBytes); //抽出消息ID String msgNo = resultMsg.substring(8, 8 + 24); Condition msglock =(Condition) recMsgMap.get(msgNo); if(msglock ==null) { log.warn(msgNo+"序号可能已被注销!响应消息丢弃"); recMsgMap.remove(msgNo); continue; } recMsgMap.put(msgNo, resultMsg); try{ lock.lock(); msglock.signalAll(); }finally { lock.unlock(); } }catch(SocketException e){ log.error("服务端关闭socket",e); reConnectToCTCC(); } catch(IOException e) { log.error("接收线程读取响应数据时发生IO流异常",e); } catch(NumberFormatException e){ log.error("收到没良心包,String转int异常,异常字符:"+intStr); } } } } }
“如何解决Java Socket通信技术收发线程互斥的问题”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。