为什么80%的码农都做不了架构师?>>>
使用JAVA NIO简单实现Socket Server
package com.flyer.cn.javaIO;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class EchoServer {
public static SelectorLoop connectionBell;
public static SelectorLoop readBell;
public boolean isReadBellRunning=false;
private ExecutorService thdPool=Executors.newCachedThreadPool();
public static void main(String[] args) throws IOException {
new EchoServer().startServer();
}
// 启动服务器
public void startServer() throws IOException {
// 准备好一个闹钟.当有链接进来的时候响.
connectionBell = new SelectorLoop();
// 准备好一个闹装,当有read事件进来的时候响.
readBell = new SelectorLoop();
// 开启一个server channel来监听
ServerSocketChannel ssc = ServerSocketChannel.open();
// 开启非阻塞模式
ssc.configureBlocking(false);
ServerSocket socket = ssc.socket();
socket.bind(new InetSocketAddress("localhost",7878));
// 给闹钟规定好要监听报告的事件,这个闹钟只监听新连接事件.
ssc.register(connectionBell.getSelector(), SelectionKey.OP_ACCEPT);
new Thread(connectionBell,"connectionBell").start();
}
// Selector轮询线程类
public class SelectorLoop implements Runnable {
private Selector selector;
private ByteBuffer temp = ByteBuffer.allocate(1024);
public SelectorLoop() throws IOException {
this.selector = Selector.open();
}
public Selector getSelector() {
return this.selector;
}
@Override
public void run() {
while(true) {
try {
// 阻塞,只有当至少一个注册的事件发生的时候才会继续.
this.selector.select();
Set<SelectionKey> selectKeys = this.selector.selectedKeys();
Iterator<SelectionKey> it = selectKeys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
if (key.isAcceptable()) {
// 这是一个connection accept事件, 并且这个事件是注册在serversocketchannel上的.
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
// 接受一个连接.
SocketChannel sc = ssc.accept();
// 对新的连接的channel注册read事件. 使用readBell闹钟.
sc.configureBlocking(false);
sc.register(readBell.getSelector(), SelectionKey.OP_READ);
System.out.println(" from client address:" + sc.getRemoteAddress());
// 如果读取线程还没有启动,那就启动一个读取线程.
synchronized(EchoServer.this) {
if (!EchoServer.this.isReadBellRunning) {
EchoServer.this.isReadBellRunning = true;
new Thread(readBell,"readBell").start();
}
}
}
else if (key.isReadable()){
int IntLength=4;
int ObjLength; //有效数据长度
int readObj;//从NIO信道中读出的数据长度
ByteBuffer bbInt = ByteBuffer.allocate(4); //读取INT头信息的缓存池
ByteBuffer bbObj = ByteBuffer.allocate(1024); //读取OBJ有效数据的缓存池
// 这是一个read事件,并且这个事件是注册在socketchannel上的.
SocketChannel channel = (SocketChannel) key.channel();
//读出INT数据头
channel.read(bbInt);
//获取INT头中标示的有效数据长度信息并清空INT缓存池
ObjLength = bbInt.getInt(0);
bbInt.clear();
//清空有效数据缓存池设置有效缓存池的大小
bbObj.clear();
bbObj.limit(ObjLength);
//循环读满缓存池以保证数据完整性
readObj = channel.read(bbObj);
while (readObj != ObjLength) {
readObj += channel.read(bbObj);
}
//
// // 写数据到buffer
// int count = sc.read(temp);
// if (count < 0) {
// // 客户端已经断开连接.
// key.cancel();
// sc.close();
// return;
// }
// // 切换buffer到读状态,内部指针归位.
bbObj.flip();
String msg = Charset.forName("UTF-8").decode(bbObj).toString();
// System.out.println(ObjLength+":"+readObj+" "+new Date().toLocaleString()+"Server received ["+msg+"] from client address:" + channel.getRemoteAddress());
if(ObjLength!=readObj){
System.out.println(ObjLength+":"+readObj+" "+new Date().toLocaleString()+"Server received ["+msg+"] from client address:" + channel.getRemoteAddress());
}
// 清空buffer
temp.clear();
thdPool.submit(new Dispatch(channel,msg));
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class Dispatch implements Runnable{
private SocketChannel sc;
private String msg;
public Dispatch(SocketChannel _sc,String _msg){
this.sc=_sc;
this.msg=_msg;
}
public void run() {
try{
// Thread.sleep(500);
msg=msg+" "+new Date().toLocaleString();
// echo back.
sc.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8"))));
}
catch(Exception ex){
ex.printStackTrace();
}
}
}
}
}
客户端
package com.flyer.cn.javaIO;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
public class Client implements Runnable {
// 空闲计数器,如果空闲超过10次,将检测server是否中断连接.
private String clientName;
private static int idleCounter = 0;
private Selector selector;
private SocketChannel socketChannel;
private ByteBuffer temp = ByteBuffer.allocate(1024);
public static void main(String[] args) throws IOException {
for(int i=0;i<100;i++){
Client client= new Client("client"+i);
new Thread(client).start();
//client.sendFirstMsg();
}
}
public Client(String name) {
try{
this.clientName=name;
// 同样的,注册闹钟.
this.selector = Selector.open();
// 连接远程server
socketChannel = SocketChannel.open();
// 如果快速的建立了连接,返回true.如果没有建立,则返回false,并在连接后出发Connect事件.
Boolean isConnected = socketChannel.connect(new InetSocketAddress("localhost", 7878));
socketChannel.configureBlocking(false);
SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
if (isConnected) {
this.sendFirstMsg(socketChannel,"Hello NIO.From "+this.clientName);
} else {
// 如果连接还在尝试中,则注册connect事件的监听. connect成功以后会出发connect事件.
key.interestOps(SelectionKey.OP_CONNECT);
}
}
catch(Exception ex){
ex.printStackTrace();
}
}
public void sendFirstMsg(SocketChannel socketChannel,String msg) throws IOException {
int IntLength=4;
ByteBuffer bb = ByteBuffer.allocate(1024);
//构造发送数据:整型数据头+有效数据段
byte[] arr = msg.getBytes(Charset.forName("UTF-8"));
final int ObjLength = arr.length; //获取有效数据段长度
bb.clear();
bb.limit(IntLength + ObjLength); //调整缓存池大小
bb.putInt(ObjLength);
bb.put(arr);
bb.position(0);
socketChannel.write(bb);
}
@Override
public void run() {
while (true) {
try {
// 阻塞,等待事件发生,或者1秒超时. num为发生事件的数量.
int num = this.selector.select(1000);
if (num ==0) {
idleCounter ++;
if(idleCounter >10) {
// 如果server断开了连接,发送消息将失败.
try {
this.sendFirstMsg(socketChannel,"Hello NIO.From "+this.clientName);
} catch(ClosedChannelException e) {
e.printStackTrace();
this.socketChannel.close();
return;
}
}
continue;
} else {
idleCounter = 0;
}
Set<SelectionKey> keys = this.selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
if (key.isConnectable()) {
// socket connected
SocketChannel sc = (SocketChannel)key.channel();
if (sc.isConnectionPending()) {
sc.finishConnect();
}
// send first message;
this.sendFirstMsg(socketChannel,"Hello NIO.From "+this.clientName);
}
if (key.isReadable()) {
// msg received.
SocketChannel sc = (SocketChannel)key.channel();
this.temp = ByteBuffer.allocate(1024);
int count = sc.read(temp);
if (count<0) {
sc.close();
continue;
}
// 切换buffer到读状态,内部指针归位.
temp.flip();
String msg = Charset.forName("UTF-8").decode(temp).toString();
System.out.println("Client received ["+msg+"] from server address:" + sc.getRemoteAddress()+new Date().toLocaleString());
Thread.sleep(1000);
sendFirstMsg(sc,"Hello NIO.From "+this.clientName);
// echo back.
// sc.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8"))));
// 清空buffer
temp.clear();
}
}
} catch (Exception e) {
System.out.println("网络连接异常");
}
}
}
}