JAVA NIO非阻塞服务器客户端示例

网上铺天盖地的都是那个echo server, 问题是没有相应的client代码, 坑爹得很, 研究了好两天, 写了个大概.

所谓非阻塞, 基本上就是在一个线程里用轮询的方式处理各通道事件. 所谓selectionkey也就是事件. 其它的就简单啦.

Server端:

public class Server {
private InetAddress serverAddress;
private int port;
private Selector selector;
private Map<SocketChannel,List<byte[]>> dataMap;

public Server(InetAddress a, int p) throws IOException {
serverAddress = a;
port = p;
dataMap = new HashMap<SocketChannel,List<byte[]>>();
startServer();
}

private void startServer() throws IOException {
selector = Selector.open();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);

InetSocketAddress listenAddr = new InetSocketAddress(serverAddress, port);
serverChannel.socket().bind(listenAddr);
serverChannel.register(selector, SelectionKey.OP_ACCEPT);

log(“Server ready”);

while (true) {
log(“selector select…”);
selector.select();

Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = (SelectionKey) keys.next();

keys.remove();

if (! key.isValid()) {
continue;
}

if (key.isAcceptable()) {
accept(key);
}
else if (key.isReadable()) {
read(key);
}
else if (key.isWritable()) {
write(key);
}
}
}
}

private void accept(SelectionKey key) throws IOException {
log(“accept”);

ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();
SocketChannel channel = serverChannel.accept();
channel.configureBlocking(false);

Socket socket = channel.socket();
SocketAddress remoteAddr = socket.getRemoteSocketAddress();
log(“Connected to: ” + remoteAddr);

dataMap.put(channel, new ArrayList<byte[]>());
channel.register(selector, SelectionKey.OP_READ);
}

private void read(SelectionKey key) throws IOException {
log(“read”);

SocketChannel channel = (SocketChannel) key.channel();

ByteBuffer buffer = ByteBuffer.allocate(8192);
int numRead = -1;
try {
numRead = channel.read(buffer);
}
catch (IOException e) {
e.printStackTrace();
}

if (numRead == -1) {
dataMap.remove(channel);
Socket socket = channel.socket();
SocketAddress remoteAddr = socket.getRemoteSocketAddress();
log(“Connection closed by client: ” + remoteAddr);
channel.close();
key.cancel();
return;
}

byte[] data = new byte[numRead];
System.arraycopy(buffer.array(), 0, data, 0, numRead);
log(“Got: ” + new String(data, “US-ASCII”));

handleData(data);
}

private void write(SelectionKey key) throws IOException {
log(“write”);

SocketChannel channel = (SocketChannel) key.channel();
List<byte[]> pendingData = dataMap.get(channel);
Iterator<byte[]> items = pendingData.iterator();
while (items.hasNext()) {
byte[] item = items.next();
items.remove();
channel.write(ByteBuffer.wrap(item));
log(“Write: ” + new String(item, “US-ASCII”));
}
key.interestOps(SelectionKey.OP_READ);
}
}

 

客户端:

private class ClientThread extends Thread {
SocketChannel schannel = null;
private Selector selector;
private Vector<byte[]> dataList;

ByteCache cache = new ByteCache();
int commandLen = -1;
private boolean _run = true;

public ClientThread() {
dataList = new Vector<byte[]>();
}

public void run() {
try {
schannel = SocketChannel.open();
schannel.configureBlocking(false);
selector = Selector.open();

schannel.connect(new InetSocketAddress(“xxx.xxx.xxx.xxx”, 8989));
schannel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
catch (Exception e) {
connectFail();
}

try {
while (_run) {
int cnt = selector.select();
if (cnt < 1) {
continue;
}

Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = (SelectionKey) keys.next();
keys.remove();

if (!key.isValid()) {
continue;
}

if (key.isConnectable()) {
connect(key);
}
else if (key.isReadable()) {
read(key);
}
else if (key.isWritable()) {
write(key);
}
}
}
}
catch(Exception ex) {
ex.printStackTrace();
}
}

private void connect(SelectionKey key) throws IOException {
SocketChannel sc = (SocketChannel)key.channel();
sc.finishConnect();
connected();
}

private void read(SelectionKey key) throws IOException {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(8192);

int len = -1;
try {
len = sc.read(buffer);
if (len > -1) {
byte[] data = new byte[len];
System.arraycopy(buffer.array(), 0, data, 0, len);
handleData(data);
}
}
else {
key.cancel();
connectionLost();
return;
}
}
catch (IOException e) {
e.printStackTrace();
}
key.interestOps(SelectionKey.OP_READ);
}

private void write(SelectionKey key) throws IOException {
SocketChannel sc = (SocketChannel)key.channel();
Iterator<byte[]> it = dataList.iterator();
while (it.hasNext()) {
byte[] outBytes = it.next();
it.remove();

byte[] pack = new byte[outBytes.length + 4];
byte[] lenBytes = ByteCache.intToBytes(outBytes.length);
System.arraycopy(lenBytes, 0, pack, 0, 4);
System.arraycopy(outBytes, 0, pack, 4, outBytes.length);

sc.write(ByteBuffer.wrap(pack));
key.interestOps(SelectionKey.OP_READ);
}
}

public void send(byte[] outBytes) {
dataList.add(outBytes);
SelectionKey key = schannel.keyFor(selector);
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
selector.wakeup();
}

public void cancel() {
try {
_run = false;
dataList.clear();

if (selector != null) {
selector.close();
}

if (schannel != null) {
schannel.close();
}
}
catch (Exception e) {
e.printStackTrace();
}
}
}

登录后才可评论.