JAVA NIO非阻塞服务器客户端示例

网上铺天盖地的都是那个echo server, 问题是没有相应的client代码, 坑爹得很, 研究了好两天, 写了个大概.

所谓非阻塞, 基本上就是在一个线程里用轮询的方式处理各通道事件. 所谓selectionkey也就是事件. 其它的就简单啦.

Server端:



public class Server {

private InetAddress serverAddress;

private int port;

private Selector selector;

private Map<SocketChannel,List<byte[]>> dataMap;


public Server(InetAddress a, int p) throws IOException {

serverAddress = a;

port = p;

dataMap = new HashMap<SocketChannel,List<byte[]>>();

startServer();


}

private void startServer() throws IOException {

selector = Selector.open();

ServerSocketChannel serverChannel = ServerSocketChannel.open();

serverChannel.configureBlocking(false);


InetSocketAddress listenAddr = new InetSocketAddress(serverAddress, port);

serverChannel.socket().bind(listenAddr);

serverChannel.register(selector, SelectionKey.OP_ACCEPT);

log(“Server ready”);

while (true) {


log(“selector select…”);

selector.select();

Iterator<SelectionKey> keys = selector.selectedKeys().iterator();

while (keys.hasNext()) {

SelectionKey key = (SelectionKey) keys.next();


keys.remove();

if (! key.isValid()) {

continue;

}

if (key.isAcceptable()) {


accept(key);

}

else if (key.isReadable()) {

read(key);

}


else if (key.isWritable()) {

write(key);

}

}

}


}

private void accept(SelectionKey key) throws IOException {

log(“accept”);

ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();

SocketChannel channel = serverChannel.accept();


channel.configureBlocking(false);

Socket socket = channel.socket();

SocketAddress remoteAddr = socket.getRemoteSocketAddress();

log(“Connected to: ” + remoteAddr);

dataMap.put(channel, new ArrayList<byte[]>());


channel.register(selector, SelectionKey.OP_READ);

}

private void read(SelectionKey key) throws IOException {

log(“read”);

SocketChannel channel = (SocketChannel) key.channel();


ByteBuffer buffer = ByteBuffer.allocate(8192);

int numRead = -1;

try {

numRead = channel.read(buffer);

}


catch (IOException e) {

e.printStackTrace();

}

if (numRead == -1) {

dataMap.remove(channel);


Socket socket = channel.socket();

SocketAddress remoteAddr = socket.getRemoteSocketAddress();

log(“Connection closed by client: ” + remoteAddr);

channel.close();

key.cancel();


return;

}

byte[] data = new byte[numRead];

System.arraycopy(buffer.array(), 0, data, 0, numRead);

log(“Got: ” + new String(data, “US-ASCII”));


handleData(data);

}

private void write(SelectionKey key) throws IOException {

log(“write”);

SocketChannel channel = (SocketChannel) key.channel();


List<byte[]> pendingData = dataMap.get(channel);

Iterator<byte[]> items = pendingData.iterator();

while (items.hasNext()) {

byte[] item = items.next();

items.remove();


channel.write(ByteBuffer.wrap(item));

log(“Write: ” + new String(item, “US-ASCII”));

}

key.interestOps(SelectionKey.OP_READ);

}


}

客户端:

private class ClientThread extends Thread {

SocketChannel schannel = null;

private Selector selector;


private Vector<byte[]> dataList;

ByteCache cache = new ByteCache();

int commandLen = -1;

private boolean _run = true;

public ClientThread() {


dataList = new Vector<byte[]>();

}

public void run() {

try {

schannel = SocketChannel.open();


schannel.configureBlocking(false);

selector = Selector.open();

schannel.connect(new InetSocketAddress(“xxx.xxx.xxx.xxx”, 8989));

schannel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE);

}


catch (Exception e) {

connectFail();

}

try {

while (_run) {


int cnt = selector.select();

if (cnt < 1) {

continue;

}

Iterator<SelectionKey> keys = selector.selectedKeys().iterator();


while (keys.hasNext()) {

SelectionKey key = (SelectionKey) keys.next();

keys.remove();

if (!key.isValid()) {

continue;


}

if (key.isConnectable()) {

connect(key);

}

else if (key.isReadable()) {


read(key);

}

else if (key.isWritable()) {

write(key);

}


}

}

}

catch(Exception ex) {

ex.printStackTrace();


}

}

private void connect(SelectionKey key) throws IOException {

SocketChannel sc = (SocketChannel)key.channel();

sc.finishConnect();


connected();

}

private void read(SelectionKey key) throws IOException {

SocketChannel sc = (SocketChannel) key.channel();

ByteBuffer buffer = ByteBuffer.allocate(8192);


int len = -1;

try {

len = sc.read(buffer);

if (len > -1) {

byte[] data = new byte[len];


System.arraycopy(buffer.array(), 0, data, 0, len);

handleData(data);

}

}

else {


key.cancel();

connectionLost();

return;

}

}


catch (IOException e) {

e.printStackTrace();

}

key.interestOps(SelectionKey.OP_READ);

}


private void write(SelectionKey key) throws IOException {

SocketChannel sc = (SocketChannel)key.channel();

Iterator<byte[]> it = dataList.iterator();

while (it.hasNext()) {

byte[] outBytes = it.next();


it.remove();

byte[] pack = new byte[outBytes.length + 4];

byte[] lenBytes = ByteCache.intToBytes(outBytes.length);

System.arraycopy(lenBytes, 0, pack, 0, 4);

System.arraycopy(outBytes, 0, pack, 4, outBytes.length);


sc.write(ByteBuffer.wrap(pack));

key.interestOps(SelectionKey.OP_READ);

}

}

public void send(byte[] outBytes) {


dataList.add(outBytes);

SelectionKey key = schannel.keyFor(selector);

key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);

selector.wakeup();

}


public void cancel() {

try {

_run = false;

dataList.clear();

if (selector != null) {


selector.close();

}

if (schannel != null) {

schannel.close();

}


}

catch (Exception e) {

e.printStackTrace();

}

}


}

登录后才可评论.