一个简单的非阻塞通讯DEMO。。。
nio的Channel + Selector实现,1服务端+n客户端
内容是客户端向服务端发送信息,服务端组播给所有客户端,并没有做客户端异常退出时的处理 - -。
写了一晚上终于。。囧无比……
由于设置interestOps时直接覆盖了原来的标记,没有做或操作,导致只能读一次,还查了半天 TAT。
具体流程是:
服务端 bind->关注OP_ACCEPT-->accept---->关注OP_READ------------------>READ信息,关注WRITE--------------->组WRITE
客户端 ---->关注OP_CONNECT-->connect-->关注OP_READ|OP_WRITE-->WRITE信息,取消关注OP_WRITE-->READ信息
乍看之下很Observer,但是实际上是Reactor(不同之处在于前者只与单个事件源关联,后者则是将事件分派到各自的处理程序中,与多个事件源关联)。
这样循环。。。客户端除了第一次connect关注了WRITE之外其他都是直接调用channel.write直接从通道输出。
(第一次connect时也可以,这里是为了演示如何发送OP_WRITE的……)
要注意OP_WRITE事件发生后要记得取消(register & ~OP_WRITE),因为OP_WRITE总是准备好的。。很容易卡住。
Server:服务端核心,因为只有一个线程,所以用了个HashSet来存客户端集合。
-
package org.yoyo.nio.server;
-
-
import java.io.IOException;
-
import java.net.InetAddress;
-
import java.net.InetSocketAddress;
-
import java.nio.ByteBuffer;
-
import java.nio.CharBuffer;
-
import java.nio.channels.SelectionKey;
-
import java.nio.channels.Selector;
-
import java.nio.channels.ServerSocketChannel;
-
import java.nio.channels.SocketChannel;
-
import java.nio.charset.Charset;
-
import java.nio.charset.CharsetDecoder;
-
import java.nio.charset.CharsetEncoder;
-
import java.util.HashSet;
-
import java.util.Iterator;
-
import java.util.Set;
-
-
/**
-
* nio Server
-
*
-
* @author YOYO
-
*
-
*/
-
-
/** Server Channel */
-
private ServerSocketChannel server = null;
-
-
/** Server Selector */
-
private Selector selector = null;
-
-
/** Buffer for saving Receive Message */
-
private ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
-
-
/** Charset Decoder */
-
private static CharsetDecoder decoder = Charset.forName("GBK").newDecoder();
-
-
/** Charset Encoder */
-
private static CharsetEncoder encoder = Charset.forName("GBK").newEncoder();
-
-
/** Client HashSet */
-
private Set<SocketChannel> clients = new HashSet<SocketChannel>();
-
-
/**
-
* Start Server
-
*
-
* @param port
-
*/
-
public Server(int port) {
-
try {
-
server = ServerSocketChannel.open();
-
server.configureBlocking(false);
-
-
.getLocalHost(), port);
-
server.socket().bind(address);
-
-
selector = Selector.open();
-
server.register(selector, SelectionKey.OP_ACCEPT);
-
e.printStackTrace();
-
}
-
}
-
-
/**
-
* run Thread
-
*/
-
public void run() {
-
while (true) {
-
try {
-
selector.select();
-
Iterator<SelectionKey> iter = selector.selectedKeys()
-
.iterator();
-
-
while (iter.hasNext()) {
-
SelectionKey key = iter.next();
-
iter.remove();
-
-
doProcess(key);
-
}
-
e.printStackTrace();
-
}
-
}
-
}
-
-
/**
-
* do Process
-
*
-
* @param key
-
* @throws IOException
-
*/
-
if (key.isAcceptable()) {
-
doAccept(key);
-
return;
-
}
-
-
if (!key.isValid()) {
-
clients.remove(key.channel());
-
key.cancel();
-
return;
-
}
-
-
if (key.isReadable()) {
-
doRead(key);
-
return;
-
}
-
-
if (key.isWritable()) {
-
doWrite(key);
-
return;
-
}
-
}
-
-
/**
-
* do Accept
-
*
-
* @param server
-
* @throws IOException
-
*/
-
SocketChannel client = (((ServerSocketChannel) key.channel()).accept());
-
client.configureBlocking(false);
-
-
// 注册读事件
-
client.register(selector, SelectionKey.OP_READ);
-
-
if (!clients.contains(client)) {
-
clients.add(client);
-
}
-
}
-
-
/**
-
* do Read
-
*
-
* @param key
-
* @throws IOException
-
*/
-
SocketChannel client = (SocketChannel) key.channel();
-
-
if (client.read(byteBuffer) >= 0) {
-
byteBuffer.flip();
-
-
CharBuffer charBuffer = decoder.decode(byteBuffer);
-
byteBuffer.clear();
-
-
-
// 注册写事件
-
client.register(selector, key.interestOps() | SelectionKey.OP_WRITE).attach(msg);
-
} else {
-
client.close();
-
key.cancel();
-
}
-
}
-
-
/**
-
* do Write
-
*
-
* @param key
-
* @throws IOException
-
*/
-
// 注销写事件
-
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-
if (key.attachment() == null) return;
-
-
for (SocketChannel client: clients) {
-
client.write(encoder.encode(CharBuffer.wrap(msg)));
-
}
-
}
-
-
}
Client:客户端核心
-
package org.yoyo.nio.client;
-
-
import java.io.IOException;
-
import java.net.InetSocketAddress;
-
import java.nio.ByteBuffer;
-
import java.nio.CharBuffer;
-
import java.nio.channels.SelectionKey;
-
import java.nio.channels.Selector;
-
import java.nio.channels.SocketChannel;
-
import java.nio.charset.Charset;
-
import java.nio.charset.CharsetDecoder;
-
import java.nio.charset.CharsetEncoder;
-
import java.util.Iterator;
-
-
/**
-
* nio Client
-
*
-
* @author YOYO
-
*
-
*/
-
-
/** Client Channel */
-
private SocketChannel client = null;
-
-
/** Client Selector */
-
private Selector selector = null;
-
-
/** Buffer for saving Receive Message */
-
private ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
-
-
/** Charset Decoder */
-
private static CharsetDecoder decoder = Charset.forName("GBK").newDecoder();
-
-
/** Charset Encoder */
-
private static CharsetEncoder encoder = Charset.forName("GBK").newEncoder();
-
-
/** Client Name */
-
-
/**
-
* Start Client
-
*
-
* @param host
-
* @param port
-
* @param name
-
*/
-
try {
-
client = SocketChannel.open();
-
client.configureBlocking(false);
-
-
selector = Selector.open();
-
-
client.register(selector, SelectionKey.OP_CONNECT);
-
-
this.name = name;
-
-
InetSocketAddress address = new InetSocketAddress(host, port);
-
client.connect(address);
-
e.printStackTrace();
-
}
-
}
-
-
/**
-
* run Thread
-
*/
-
public void run() {
-
CLIENT_THREAD: while (true) {
-
try {
-
selector.select();
-
Iterator<SelectionKey> iter = selector.selectedKeys()
-
.iterator();
-
-
while (iter.hasNext()) {
-
SelectionKey key = iter.next();
-
iter.remove();
-
-
if (!doProcess(key)) {
-
break CLIENT_THREAD;
-
}
-
}
-
e.printStackTrace();
-
}
-
}
-
-
try {
-
client.close();
-
e.printStackTrace();
-
}
-
}
-
-
/**
-
* do Process
-
*
-
* @param key
-
* @return
-
* @throws IOException
-
*/
-
if (key.isConnectable()) {
-
return doConnect(key);
-
}
-
-
if (!key.isValid()) {
-
key.cancel();
-
return false;
-
}
-
-
if (key.isReadable()) {
-
return doRead(key);
-
}
-
-
if (key.isWritable()) {
-
return doWrite(key);
-
}
-
-
return true;
-
}
-
-
/**
-
* do Connect
-
*
-
* @param channel
-
* @return
-
* @throws IOException
-
*/
-
SocketChannel server = (SocketChannel) key.channel();
-
-
if (server.isConnectionPending()) {
-
server.finishConnect();
-
}
-
-
server.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE)
-
.attach(name + "进入房间。");
-
return true;
-
}
-
-
/**
-
* do Read
-
*
-
* @param channel
-
* @return
-
* @throws IOException
-
*/
-
if (((SocketChannel) key.channel()).read(byteBuffer) >= 0) {
-
byteBuffer.flip();
-
-
CharBuffer charBuffer = decoder.decode(byteBuffer);
-
byteBuffer.clear();
-
-
return true;
-
} else {
-
return false;
-
}
-
}
-
-
/**
-
* do Write
-
*
-
* @param channel
-
* @return
-
* @throws IOException
-
*/
-
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-
((SocketChannel) key.channel()).write(encoder.encode(CharBuffer
-
return true;
-
}
-
-
/**
-
* Send Message
-
*
-
* @param message
-
*/
-
try {
-
client.write(encoder.encode(CharBuffer.wrap(name + ": " + msg)));
-
e.printStackTrace();
-
}
-
}
-
-
}
TestServer:启动服务端用。
TestClient:客户端连接后,直接在控制台上发送语句即可 = =。
-
package org.yoyo.nio.test;
-
-
import java.net.InetAddress;
-
import java.net.UnknownHostException;
-
import java.util.Scanner;
-
-
import org.yoyo.nio.client.Client;
-
-
public class TestClient {
-
-
/**
-
* @param args
-
*/
-
try {
-
.getHostAddress(), 8889, "YOYO");
-
-
while (scanner.hasNext()) {
-
client.sendMessage(scanner.next());
-
}
-
e.printStackTrace();
-
}
-
}
-
-
}