网上铺天盖地的都是那个 echo server 的例子, 问题是没有相应的 client 代码, 坑爹得很. 研究了两天, 写了个大概.
所谓非阻塞, 基本上就是在一个线程里用 Selector 轮询的方式处理各个通道 (Channel) 上的事件. 所谓 SelectionKey, 就是某个通道在 Selector 上的注册凭证, 通过它可以知道该通道上有哪些事件已经就绪. 其它的就简单啦.
Server端:
public class Server {
private InetAddress serverAddress;
private int port;
private Selector selector;
private Map<SocketChannel,List<byte[]>> dataMap;
public Server(InetAddress a, int p) throws IOException {
serverAddress = a;
port = p;
dataMap = new HashMap<SocketChannel,List<byte[]>>();
startServer();
}
private void startServer() throws IOException {
selector = Selector.open();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
InetSocketAddress listenAddr = new InetSocketAddress(serverAddress, port);
serverChannel.socket().bind(listenAddr);
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
log(“Server ready”);
while (true) {
log(“selector select…”);
selector.select();
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = (SelectionKey) keys.next();
keys.remove();
if (! key.isValid()) {
continue;
}
if (key.isAcceptable()) {
accept(key);
}
else if (key.isReadable()) {
read(key);
}
else if (key.isWritable()) {
write(key);
}
}
}
}
private void accept(SelectionKey key) throws IOException {
log(“accept”);
ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();
SocketChannel channel = serverChannel.accept();
channel.configureBlocking(false);
Socket socket = channel.socket();
SocketAddress remoteAddr = socket.getRemoteSocketAddress();
log(“Connected to: ” + remoteAddr);
dataMap.put(channel, new ArrayList<byte[]>());
channel.register(selector, SelectionKey.OP_READ);
}
private void read(SelectionKey key) throws IOException {
log(“read”);
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(8192);
int numRead = -1;
try {
numRead = channel.read(buffer);
}
catch (IOException e) {
e.printStackTrace();
}
if (numRead == -1) {
dataMap.remove(channel);
Socket socket = channel.socket();
SocketAddress remoteAddr = socket.getRemoteSocketAddress();
log(“Connection closed by client: ” + remoteAddr);
channel.close();
key.cancel();
return;
}
byte[] data = new byte[numRead];
System.arraycopy(buffer.array(), 0, data, 0, numRead);
log(“Got: ” + new String(data, “US-ASCII”));
handleData(data);
}
private void write(SelectionKey key) throws IOException {
log(“write”);
SocketChannel channel = (SocketChannel) key.channel();
List<byte[]> pendingData = dataMap.get(channel);
Iterator<byte[]> items = pendingData.iterator();
while (items.hasNext()) {
byte[] item = items.next();
items.remove();
channel.write(ByteBuffer.wrap(item));
log(“Write: ” + new String(item, “US-ASCII”));
}
key.interestOps(SelectionKey.OP_READ);
}
}
客户端:
private class ClientThread extends Thread {
SocketChannel schannel = null;
private Selector selector;
private Vector<byte[]> dataList;
ByteCache cache = new ByteCache();
int commandLen = -1;
private boolean _run = true;
public ClientThread() {
dataList = new Vector<byte[]>();
}
public void run() {
try {
schannel = SocketChannel.open();
schannel.configureBlocking(false);
selector = Selector.open();
schannel.connect(new InetSocketAddress(“xxx.xxx.xxx.xxx”, 8989));
schannel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
catch (Exception e) {
connectFail();
}
try {
while (_run) {
int cnt = selector.select();
if (cnt < 1) {
continue;
}
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = (SelectionKey) keys.next();
keys.remove();
if (!key.isValid()) {
continue;
}
if (key.isConnectable()) {
connect(key);
}
else if (key.isReadable()) {
read(key);
}
else if (key.isWritable()) {
write(key);
}
}
}
}
catch(Exception ex) {
ex.printStackTrace();
}
}
private void connect(SelectionKey key) throws IOException {
SocketChannel sc = (SocketChannel)key.channel();
sc.finishConnect();
connected();
}
private void read(SelectionKey key) throws IOException {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(8192);
int len = -1;
try {
len = sc.read(buffer);
if (len > -1) {
byte[] data = new byte[len];
System.arraycopy(buffer.array(), 0, data, 0, len);
handleData(data);
}
}
else {
key.cancel();
connectionLost();
return;
}
}
catch (IOException e) {
e.printStackTrace();
}
key.interestOps(SelectionKey.OP_READ);
}
private void write(SelectionKey key) throws IOException {
SocketChannel sc = (SocketChannel)key.channel();
Iterator<byte[]> it = dataList.iterator();
while (it.hasNext()) {
byte[] outBytes = it.next();
it.remove();
byte[] pack = new byte[outBytes.length + 4];
byte[] lenBytes = ByteCache.intToBytes(outBytes.length);
System.arraycopy(lenBytes, 0, pack, 0, 4);
System.arraycopy(outBytes, 0, pack, 4, outBytes.length);
sc.write(ByteBuffer.wrap(pack));
key.interestOps(SelectionKey.OP_READ);
}
}
public void send(byte[] outBytes) {
dataList.add(outBytes);
SelectionKey key = schannel.keyFor(selector);
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
selector.wakeup();
}
public void cancel() {
try {
_run = false;
dataList.clear();
if (selector != null) {
selector.close();
}
if (schannel != null) {
schannel.close();
}
}
catch (Exception e) {
e.printStackTrace();
}
}
}