您好,登錄后才能下訂單哦!
這篇文章主要講解了“基于Java NIO的即時聊天服務器模型怎么實現”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“基于Java NIO的即時聊天服務器模型怎么實現”吧!
廢話不多說,關于NIO的SelectionKey、Selector、Channel網上的介紹例子都很多,直接上代碼:
JsonParser
Json的解析類,隨便封裝了下,使用的最近比較火的fastjson
public class JsonParser { private static JSONObject mJson; public synchronized static String get(String json,String key) { mJson = JSON.parseObject(json); return mJson.getString(key); } }
Main
入口,不解釋
public class Main { public static void main(String... args) { new SeekServer().start(); } }
Log
public class Log { public static void i(Object obj) { System.out.println(obj); } public static void e(Object e) { System.err.println(e); } }
SeekServer:
服務器端的入口,請求的封裝和接收都在此類,端口暫時寫死在了代碼里,mSelector.select(TIME_OUT) > 0 目的是為了當服務器空閑的時候(沒有任何讀寫甚至請求斷開事件),循環時有個間隔時間,不然基本上相當于while(true){//nothing}了,你懂的。
public class SeekServer extends Thread{ private final int ACCPET_PORT = 55555; private final int TIME_OUT = 1000; private Selector mSelector = null; private ServerSocketChannel mSocketChannel = null; private ServerSocket mServerSocket = null; private InetSocketAddress mAddress = null; public SeekServer() { long sign = System.currentTimeMillis(); try { mSocketChannel = ServerSocketChannel.open(); if(mSocketChannel == null) { System.out.println("can't open server socket channel"); } mServerSocket = mSocketChannel.socket(); mAddress = new InetSocketAddress(ACCPET_PORT); mServerSocket.bind(mAddress); Log.i("server bind port is " + ACCPET_PORT); mSelector = Selector.open(); mSocketChannel.configureBlocking(false); SelectionKey key = mSocketChannel.register(mSelector, SelectionKey.OP_ACCEPT); key.attach(new Acceptor()); //檢測Session狀態 Looper.getInstance().loop(); //開始處理Session SessionProcessor.start(); Log.i("Seek server startup in " + (System.currentTimeMillis() - sign) + "ms!"); } catch (ClosedChannelException e) { Log.e(e.getMessage()); } catch (IOException e) { Log.e(e.getMessage()); } } public void run() { Log.i("server is listening..."); while(!Thread.interrupted()) { try { if(mSelector.select(TIME_OUT) > 0) { Set<SelectionKey> keys = mSelector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); SelectionKey key = null; while(iterator.hasNext()) { key = iterator.next(); Handler at = (Handler) key.attachment(); if(at != null) { at.exec(); } iterator.remove(); } } } catch (IOException e) { Log.e(e.getMessage()); } } } class Acceptor extends Handler{ public void exec(){ try { SocketChannel sc = mSocketChannel.accept(); new Session(sc, mSelector); } catch (ClosedChannelException e) { Log.e(e); } catch (IOException e) { Log.e(e); } } } }
Handler:
只有一個抽象方法exec,Session將會繼承它。
public abstract class Handler { public abstract void exec(); }
Session:
封裝了用戶的請求和SelectionKey和SocketChannel,每次接收到新的請求時都重置它的最后活動時間,通過狀態mState=READING or SENDING 去執行消息的接收與發送,當客戶端異常斷開時則從SessionManager清除該會話。
public class Session extends Handler{ private SocketChannel mChannel; private SelectionKey mKey; private ByteBuffer mRreceiveBuffer = ByteBuffer.allocate(10240); private Charset charset = Charset.forName("UTF-8"); private CharsetDecoder mDecoder = charset.newDecoder(); private CharsetEncoder mEncoder = charset.newEncoder(); private long lastPant;//最后活動時間 private final int TIME_OUT = 1000 * 60 * 5; //Session超時時間 private String key; private String sendData = ""; private String receiveData = null; public static final int READING = 0,SENDING = 1; int mState = READING; public Session(SocketChannel socket, Selector selector) throws IOException { this.mChannel = socket; mChannel = socket; mChannel.configureBlocking(false); mKey = mChannel.register(selector, 0); mKey.attach(this); mKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); lastPant = Calendar.getInstance().getTimeInMillis(); } public String getReceiveData() { return receiveData; } public void clear() { receiveData = null; } public void setSendData(String sendData) { mState = SENDING; mKey.interestOps(SelectionKey.OP_WRITE); this.sendData = sendData + "\n"; } public boolean isKeekAlive() { return lastPant + TIME_OUT > Calendar.getInstance().getTimeInMillis(); } public void setAlive() { lastPant = Calendar.getInstance().getTimeInMillis(); } /** * 注銷當前Session */ public void distroy() { try { mChannel.close(); mKey.cancel(); } catch (IOException e) {} } @Override public synchronized void exec() { try { if(mState == READING) { read(); }else if(mState == SENDING) { write(); } } catch (IOException e) { SessionManager.remove(key); try { mChannel.close(); } catch (IOException e1) { Log.e(e1); } mKey.cancel(); } } public void read() throws IOException{ mRreceiveBuffer.clear(); int sign = mChannel.read(mRreceiveBuffer); if(sign == -1) { //客戶端連接關閉 mChannel.close(); mKey.cancel(); } if(sign > 0) { mRreceiveBuffer.flip(); receiveData = mDecoder.decode(mRreceiveBuffer).toString(); setAlive(); setSign(); SessionManager.addSession(key, this); } } private void setSign() { //設置當前Session的Key key = JsonParser.get(receiveData,"imei"); //檢測消息類型是否為心跳包 // String type = jo.getString("type"); // if(type.equals("HEART_BEAT")) { // setAlive(); // } } /** * 寫消息 */ public void write() { try { mChannel.write(mEncoder.encode(CharBuffer.wrap(sendData))); sendData = null; mState = READING; mKey.interestOps(SelectionKey.OP_READ); } catch (CharacterCodingException e) { e.printStackTrace(); } catch (IOException e) { try { mChannel.close(); } catch (IOException e1) { Log.e(e1); } } } }
SessionManager:
將所有Session存放到ConcurrentHashMap,這里使用手機用戶的imei做key,ConcurrentHashMap因為是線程安全的,所以能很大程度上避免自己去實現同步的過程,
封裝了一些操作Session的方法例如get,remove等。
public class SessionManager { private static ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>(); public static void addSession(String key,Session session) { sessions.put(key, session); } public static Session getSession(String key) { return sessions.get(key); } public static Set<String> getSessionKeys() { return sessions.keySet(); } public static int getSessionCount() { return sessions.size(); } public static void remove(String[] keys) { for(String key:keys) { if(sessions.containsKey(key)) { sessions.get(key).distroy(); sessions.remove(key); } } } public static void remove(String key) { if(sessions.containsKey(key)) { sessions.get(key).distroy(); sessions.remove(key); } } }
SessionProcessor
里面使用了JDK自帶的線程池,用來分發處理所有Session中當前需要處理的請求(線程池的初始化參數不是太熟,望有了解的童鞋能告訴我),內部類Process則是將Session再次封裝成SocketRequest和SocketResponse(看到這里是不是有點熟悉的感覺,對沒錯,JavaWeb里到處都是request和response)。
public class SessionProcessor implements Runnable{ private static Runnable processor = new SessionProcessor(); private static ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 200, 500, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(10),new ThreadPoolExecutor.CallerRunsPolicy()); public static void start() { new Thread(processor).start(); } @Override public void run() { while(true) { Session tmp = null; for(String key:SessionManager.getSessionKeys()) { tmp = SessionManager.getSession(key); //處理Session未處理的請求 if(tmp.getReceiveData() != null) { pool.execute(new Process(tmp)); } } try { Thread.sleep(10); } catch (InterruptedException e) { Log.e(e); } } } class Process implements Runnable { private SocketRequest request; private SocketResponse response; public Process(Session session) { //將Session封裝成Request和Response request = new SocketRequest(session); response = new SocketResponse(session); } @Override public void run() { new RequestTransform().transfer(request, response); } } }
RequestTransform里的transfer方法利用反射對請求參數中的請求類別和請求動作來調用不同類的不同方法(UserHandler和MessageHandler)
public class RequestTransform { public void transfer(SocketRequest request,SocketResponse response) { String action = request.getValue("action"); String handlerName = request.getValue("handler"); //根據Session的請求類型,讓不同的類方法去處理 try { Class<?> c= Class.forName("com.seek.server.handler." + handlerName); Class<?>[] arg=new Class[]{SocketRequest.class,SocketResponse.class}; Method method=c.getMethod(action,arg); method.invoke(c.newInstance(), new Object[]{request,response}); } catch (Exception e) { e.printStackTrace(); } } }
SocketRequest和SocketResponse
public class SocketRequest { private Session mSession; private String mReceive; public SocketRequest(Session session) { mSession = session; mReceive = session.getReceiveData(); mSession.clear(); } public String getValue(String key) { return JsonParser.get(mReceive, key); } public String getQueryString() { return mReceive; } }
public class SocketResponse { private Session mSession; public SocketResponse(Session session) { mSession = session; } public void write(String msg) { mSession.setSendData(msg); } }
最后則是兩個處理請求的Handler
public class UserHandler { public void login(SocketRequest request,SocketResponse response) { System.out.println(request.getQueryString()); //TODO: 處理用戶登錄 response.write("你肯定收到消息了"); } }
public class MessageHandler { public void send(SocketRequest request,SocketResponse response) { System.out.println(request.getQueryString()); //消息發送 String key = request.getValue("imei"); Session session = SessionManager.getSession(key); new SocketResponse(session).write(request.getValue("sms")); } }
還有個監測是否超時的類Looper,定期去刪除Session
public class Looper extends Thread{ private static Looper looper = new Looper(); private static boolean isStart = false; private final int INTERVAL = 1000 * 60 * 5; private Looper(){} public static Looper getInstance() { return looper; } public void loop() { if(!isStart) { isStart = true; this.start(); } } public void run() { Task task = new Task(); while(true) { //Session過期檢測 task.checkState(); //心跳包檢測 //task.sendAck(); try { Thread.sleep(INTERVAL); } catch (InterruptedException e) { Log.e(e); } } } }
public class Task { public void checkState() { Set<String> keys = SessionManager.getSessionKeys(); if(keys.size() == 0) { return; } List<String> removes = new ArrayList<String>(); Iterator<String> iterator = keys.iterator(); String key = null; while(iterator.hasNext()) { key = iterator.next(); if(!SessionManager.getSession(key).isKeekAlive()) { removes.add(key); } } if(removes.size() > 0) { Log.i("sessions is time out,remove " + removes.size() + "session"); } SessionManager.remove(removes.toArray(new String[removes.size()])); } public void sendAck() { Set<String> keys = SessionManager.getSessionKeys(); if(keys.size() == 0) { return; } Iterator<String> iterator = keys.iterator(); while(iterator.hasNext()) { iterator.next(); //TODO 發送心跳包 } } }
注意,在Task和SessionProcessor類里都有對SessionManager的sessions做遍歷,文中使用的方法并不是很好,主要是效率問題,推薦使用遍歷Entry的方式來獲取Key和Value,因為一直在JavaWeb上折騰,所以會的童鞋看到Request和Response會挺親切,這個例子沒有經過任何安全和性能測試,如果需要放到生產環境上得話請先自行做測試- -!
客戶端請求時的數據內容例如{handler:"UserHandler",action:"login",imei:"2364656512636".......},這些約定就自己來定了。
感謝各位的閱讀,以上就是“基于Java NIO的即時聊天服務器模型怎么實現”的內容了,經過本文的學習后,相信大家對基于Java NIO的即時聊天服務器模型怎么實現這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。