这篇文章主要讲解了“基于Java NIO的即时聊天服务器模型怎么实现”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“基于Java NIO的即时聊天服务器模型怎么实现”吧!
废话不多说,关于NIO的SelectionKey、Selector、Channel网上的介绍例子都很多,直接上代码:
JsonParser
Json的解析类,随便封装了下,使用的最近比较火的fastjson
public class JsonParser { private static JSONObject mJson; public synchronized static String get(String json,String key) { mJson = JSON.parseObject(json); return mJson.getString(key); } }
Main
入口,不解释
public class Main { public static void main(String... args) { new SeekServer().start(); } }
Log
public class Log { public static void i(Object obj) { System.out.println(obj); } public static void e(Object e) { System.err.println(e); } }
SeekServer:
服务器端的入口,请求的封装和接收都在此类,端口暂时写死在了代码里,mSelector.select(TIME_OUT) > 0 目的是为了当服务器空闲的时候(没有任何读写甚至请求断开事件),循环时有个间隔时间,不然基本上相当于while(true){//nothing}了,你懂的。
public class SeekServer extends Thread{ private final int ACCPET_PORT = 55555; private final int TIME_OUT = 1000; private Selector mSelector = null; private ServerSocketChannel mSocketChannel = null; private ServerSocket mServerSocket = null; private InetSocketAddress mAddress = null; public SeekServer() { long sign = System.currentTimeMillis(); try { mSocketChannel = ServerSocketChannel.open(); if(mSocketChannel == null) { System.out.println("can't open server socket channel"); } mServerSocket = mSocketChannel.socket(); mAddress = new InetSocketAddress(ACCPET_PORT); mServerSocket.bind(mAddress); Log.i("server bind port is " + ACCPET_PORT); mSelector = Selector.open(); mSocketChannel.configureBlocking(false); SelectionKey key = mSocketChannel.register(mSelector, SelectionKey.OP_ACCEPT); key.attach(new Acceptor()); //检测Session状态 Looper.getInstance().loop(); //开始处理Session SessionProcessor.start(); Log.i("Seek server startup in " + (System.currentTimeMillis() - sign) + "ms!"); } catch (ClosedChannelException e) { Log.e(e.getMessage()); } catch (IOException e) { Log.e(e.getMessage()); } } public void run() { Log.i("server is listening..."); while(!Thread.interrupted()) { try { if(mSelector.select(TIME_OUT) > 0) { Set<SelectionKey> keys = mSelector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); SelectionKey key = null; while(iterator.hasNext()) { key = iterator.next(); Handler at = (Handler) key.attachment(); if(at != null) { at.exec(); } iterator.remove(); } } } catch (IOException e) { Log.e(e.getMessage()); } } } class Acceptor extends Handler{ public void exec(){ try { SocketChannel sc = mSocketChannel.accept(); new Session(sc, mSelector); } catch (ClosedChannelException e) { Log.e(e); } catch (IOException e) { Log.e(e); } } } }
Handler:
只有一个抽象方法exec,Session将会继承它。
public abstract class Handler { public abstract void exec(); }
Session:
封装了用户的请求和SelectionKey和SocketChannel,每次接收到新的请求时都重置它的最后活动时间,通过状态mState=READING or SENDING 去执行消息的接收与发送,当客户端异常断开时则从SessionManager清除该会话。
public class Session extends Handler{ private SocketChannel mChannel; private SelectionKey mKey; private ByteBuffer mRreceiveBuffer = ByteBuffer.allocate(10240); private Charset charset = Charset.forName("UTF-8"); private CharsetDecoder mDecoder = charset.newDecoder(); private CharsetEncoder mEncoder = charset.newEncoder(); private long lastPant;//最后活动时间 private final int TIME_OUT = 1000 * 60 * 5; //Session超时时间 private String key; private String sendData = ""; private String receiveData = null; public static final int READING = 0,SENDING = 1; int mState = READING; public Session(SocketChannel socket, Selector selector) throws IOException { this.mChannel = socket; mChannel = socket; mChannel.configureBlocking(false); mKey = mChannel.register(selector, 0); mKey.attach(this); mKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); lastPant = Calendar.getInstance().getTimeInMillis(); } public String getReceiveData() { return receiveData; } public void clear() { receiveData = null; } public void setSendData(String sendData) { mState = SENDING; mKey.interestOps(SelectionKey.OP_WRITE); this.sendData = sendData + "\n"; } public boolean isKeekAlive() { return lastPant + TIME_OUT > Calendar.getInstance().getTimeInMillis(); } public void setAlive() { lastPant = Calendar.getInstance().getTimeInMillis(); } /** * 注销当前Session */ public void distroy() { try { mChannel.close(); mKey.cancel(); } catch (IOException e) {} } @Override public synchronized void exec() { try { if(mState == READING) { read(); }else if(mState == SENDING) { write(); } } catch (IOException e) { SessionManager.remove(key); try { mChannel.close(); } catch (IOException e1) { Log.e(e1); } mKey.cancel(); } } public void read() throws IOException{ mRreceiveBuffer.clear(); int sign = mChannel.read(mRreceiveBuffer); if(sign == -1) { //客户端连接关闭 mChannel.close(); mKey.cancel(); } if(sign > 0) { mRreceiveBuffer.flip(); receiveData = mDecoder.decode(mRreceiveBuffer).toString(); setAlive(); setSign(); SessionManager.addSession(key, this); } } private void setSign() { //设置当前Session的Key key = JsonParser.get(receiveData,"imei"); //检测消息类型是否为心跳包 // String type = jo.getString("type"); // if(type.equals("HEART_BEAT")) { // setAlive(); // } } /** * 写消息 */ public void write() { try { mChannel.write(mEncoder.encode(CharBuffer.wrap(sendData))); sendData = null; mState = READING; mKey.interestOps(SelectionKey.OP_READ); } catch (CharacterCodingException e) { e.printStackTrace(); } catch (IOException e) { try { mChannel.close(); } catch (IOException e1) { Log.e(e1); } } } }
SessionManager:
将所有Session存放到ConcurrentHashMap,这里使用手机用户的imei做key,ConcurrentHashMap因为是线程安全的,所以能很大程度上避免自己去实现同步的过程,
封装了一些操作Session的方法例如get,remove等。
public class SessionManager { private static ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>(); public static void addSession(String key,Session session) { sessions.put(key, session); } public static Session getSession(String key) { return sessions.get(key); } public static Set<String> getSessionKeys() { return sessions.keySet(); } public static int getSessionCount() { return sessions.size(); } public static void remove(String[] keys) { for(String key:keys) { if(sessions.containsKey(key)) { sessions.get(key).distroy(); sessions.remove(key); } } } public static void remove(String key) { if(sessions.containsKey(key)) { sessions.get(key).distroy(); sessions.remove(key); } } }
SessionProcessor
里面使用了JDK自带的线程池,用来分发处理所有Session中当前需要处理的请求(线程池的初始化参数不是太熟,望有了解的童鞋能告诉我),内部类Process则是将Session再次封装成SocketRequest和SocketResponse(看到这里是不是有点熟悉的感觉,对没错,JavaWeb里到处都是request和response)。
public class SessionProcessor implements Runnable{ private static Runnable processor = new SessionProcessor(); private static ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 200, 500, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(10),new ThreadPoolExecutor.CallerRunsPolicy()); public static void start() { new Thread(processor).start(); } @Override public void run() { while(true) { Session tmp = null; for(String key:SessionManager.getSessionKeys()) { tmp = SessionManager.getSession(key); //处理Session未处理的请求 if(tmp.getReceiveData() != null) { pool.execute(new Process(tmp)); } } try { Thread.sleep(10); } catch (InterruptedException e) { Log.e(e); } } } class Process implements Runnable { private SocketRequest request; private SocketResponse response; public Process(Session session) { //将Session封装成Request和Response request = new SocketRequest(session); response = new SocketResponse(session); } @Override public void run() { new RequestTransform().transfer(request, response); } } }
RequestTransform里的transfer方法利用反射对请求参数中的请求类别和请求动作来调用不同类的不同方法(UserHandler和MessageHandler)
public class RequestTransform { public void transfer(SocketRequest request,SocketResponse response) { String action = request.getValue("action"); String handlerName = request.getValue("handler"); //根据Session的请求类型,让不同的类方法去处理 try { Class<?> c= Class.forName("com.seek.server.handler." + handlerName); Class<?>[] arg=new Class[]{SocketRequest.class,SocketResponse.class}; Method method=c.getMethod(action,arg); method.invoke(c.newInstance(), new Object[]{request,response}); } catch (Exception e) { e.printStackTrace(); } } }
SocketRequest和SocketResponse
public class SocketRequest { private Session mSession; private String mReceive; public SocketRequest(Session session) { mSession = session; mReceive = session.getReceiveData(); mSession.clear(); } public String getValue(String key) { return JsonParser.get(mReceive, key); } public String getQueryString() { return mReceive; } }
public class SocketResponse { private Session mSession; public SocketResponse(Session session) { mSession = session; } public void write(String msg) { mSession.setSendData(msg); } }
最后则是两个处理请求的Handler
public class UserHandler { public void login(SocketRequest request,SocketResponse response) { System.out.println(request.getQueryString()); //TODO: 处理用户登录 response.write("你肯定收到消息了"); } }
public class MessageHandler { public void send(SocketRequest request,SocketResponse response) { System.out.println(request.getQueryString()); //消息发送 String key = request.getValue("imei"); Session session = SessionManager.getSession(key); new SocketResponse(session).write(request.getValue("sms")); } }
还有个监测是否超时的类Looper,定期去删除Session
public class Looper extends Thread{ private static Looper looper = new Looper(); private static boolean isStart = false; private final int INTERVAL = 1000 * 60 * 5; private Looper(){} public static Looper getInstance() { return looper; } public void loop() { if(!isStart) { isStart = true; this.start(); } } public void run() { Task task = new Task(); while(true) { //Session过期检测 task.checkState(); //心跳包检测 //task.sendAck(); try { Thread.sleep(INTERVAL); } catch (InterruptedException e) { Log.e(e); } } } }
public class Task { public void checkState() { Set<String> keys = SessionManager.getSessionKeys(); if(keys.size() == 0) { return; } List<String> removes = new ArrayList<String>(); Iterator<String> iterator = keys.iterator(); String key = null; while(iterator.hasNext()) { key = iterator.next(); if(!SessionManager.getSession(key).isKeekAlive()) { removes.add(key); } } if(removes.size() > 0) { Log.i("sessions is time out,remove " + removes.size() + "session"); } SessionManager.remove(removes.toArray(new String[removes.size()])); } public void sendAck() { Set<String> keys = SessionManager.getSessionKeys(); if(keys.size() == 0) { return; } Iterator<String> iterator = keys.iterator(); while(iterator.hasNext()) { iterator.next(); //TODO 发送心跳包 } } }
注意,在Task和SessionProcessor类里都有对SessionManager的sessions做遍历,文中使用的方法并不是很好,主要是效率问题,推荐使用遍历Entry的方式来获取Key和Value,因为一直在JavaWeb上折腾,所以会的童鞋看到Request和Response会挺亲切,这个例子没有经过任何安全和性能测试,如果需要放到生产环境上得话请先自行做测试- -!
客户端请求时的数据内容例如{handler:"UserHandler",action:"login",imei:"2364656512636".......},这些约定就自己来定了。
感谢各位的阅读,以上就是“基于Java NIO的即时聊天服务器模型怎么实现”的内容了,经过本文的学习后,相信大家对基于Java NIO的即时聊天服务器模型怎么实现这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。