Liny_@NotePad

沉迷ACG中

一个简单的非阻塞通讯DEMO。。。

YOYO posted @ 2009年11月11日 06:48 in 【Java SE】 with tags im nio 网络编程 , 3335 阅读

nio的Channel + Selector实现,1服务端+n客户端
内容是客户端向服务端发送信息,服务端组播给所有客户端,并没有做客户端异常退出时的处理 - -。
写了一晚上终于。。囧无比……
由于设置interestOps时直接覆盖了原来的标记,没有做或操作,导致只能读一次,还查了半天 TAT。

具体流程是:
服务端 bind->关注OP_ACCEPT-->accept---->关注OP_READ------------------>READ信息,关注WRITE--------------->组WRITE
客户端 ---->关注OP_CONNECT-->connect-->关注OP_READ|OP_WRITE-->WRITE信息,取消关注OP_WRITE-->READ信息

乍看之下很Observer,但是实际上是Reactor(不同之处在于前者只与单个事件源关联,后者则是将事件分派到各自的处理程序中,与多个事件源关联)。

这样循环。。。客户端除了第一次connect关注了WRITE之外其他都是直接调用channel.write直接从通道输出。
(第一次connect时也可以,这里是为了演示如何发送OP_WRITE的……)
要注意OP_WRITE事件发生后要记得取消(register & ~OP_WRITE),因为OP_WRITE总是准备好的。。很容易卡住。

Server:服务端核心,因为只有一个线程,所以用了个HashSet来存客户端集合。

  1. package org.yoyo.nio.server;
  2.  
  3. import java.io.IOException;
  4. import java.net.InetAddress;
  5. import java.net.InetSocketAddress;
  6. import java.nio.ByteBuffer;
  7. import java.nio.CharBuffer;
  8. import java.nio.channels.SelectionKey;
  9. import java.nio.channels.Selector;
  10. import java.nio.channels.ServerSocketChannel;
  11. import java.nio.channels.SocketChannel;
  12. import java.nio.charset.Charset;
  13. import java.nio.charset.CharsetDecoder;
  14. import java.nio.charset.CharsetEncoder;
  15. import java.util.HashSet;
  16. import java.util.Iterator;
  17. import java.util.Set;
  18.  
  19. /**
  20. * nio Server
  21. *
  22. * @author YOYO
  23. *
  24. */
  25. public class Server implements Runnable {
  26.  
  27.         /** Server Channel */
  28.         private ServerSocketChannel server = null;
  29.  
  30.         /** Server Selector */
  31.         private Selector selector = null;
  32.  
  33.         /** Buffer for saving Receive Message */
  34.         private ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  35.  
  36.         /** Charset Decoder */
  37.         private static CharsetDecoder decoder = Charset.forName("GBK").newDecoder();
  38.  
  39.         /** Charset Encoder */
  40.         private static CharsetEncoder encoder = Charset.forName("GBK").newEncoder();
  41.  
  42.         /** Client HashSet */
  43.         private Set<SocketChannel> clients = new HashSet<SocketChannel>();
  44.  
  45.         /**
  46.          * Start Server
  47.          *
  48.          * @param port
  49.          */
  50.         public Server(int port) {
  51.                 try {
  52.                         server = ServerSocketChannel.open();
  53.                         server.configureBlocking(false);
  54.  
  55.                         InetSocketAddress address = new InetSocketAddress(InetAddress
  56.                                         .getLocalHost(), port);
  57.                         server.socket().bind(address);
  58.  
  59.                         selector = Selector.open();
  60.                         server.register(selector, SelectionKey.OP_ACCEPT);
  61.                 } catch (IOException e) {
  62.                         e.printStackTrace();
  63.                 }
  64.         }
  65.  
  66.         /**
  67.          * run Thread
  68.          */
  69.         public void run() {
  70.                 while (true) {
  71.                         try {
  72.                                 selector.select();
  73.                                 Iterator<SelectionKey> iter = selector.selectedKeys()
  74.                                                 .iterator();
  75.  
  76.                                 while (iter.hasNext()) {
  77.                                         SelectionKey key = iter.next();
  78.                                         iter.remove();
  79.  
  80.                                         doProcess(key);
  81.                                 }
  82.                         } catch (IOException e) {
  83.                                 e.printStackTrace();
  84.                         }
  85.                 }
  86.         }
  87.  
  88.         /**
  89.          * do Process
  90.          *
  91.          * @param key
  92.          * @throws IOException
  93.          */
  94.         public void doProcess(SelectionKey key) throws IOException {
  95.                 if (key.isAcceptable()) {
  96.                         doAccept(key);
  97.                         return;
  98.                 }
  99.                
  100.                 if (!key.isValid()) {
  101.                         clients.remove(key.channel());
  102.                         key.cancel();
  103.                         return;
  104.                 }
  105.  
  106.                 if (key.isReadable()) {
  107.                         doRead(key);
  108.                         return;
  109.                 }
  110.  
  111.                 if (key.isWritable()) {
  112.                         doWrite(key);
  113.                         return;
  114.                 }
  115.         }
  116.  
  117.         /**
  118.          * do Accept
  119.          *
  120.          * @param server
  121.          * @throws IOException
  122.          */
  123.         public void doAccept(SelectionKey key) throws IOException {
  124.                 SocketChannel client = (((ServerSocketChannel) key.channel()).accept());
  125.                 client.configureBlocking(false);
  126.  
  127.                 // 注册读事件
  128.                 client.register(selector, SelectionKey.OP_READ);
  129.  
  130.                 if (!clients.contains(client)) {
  131.                         clients.add(client);
  132.                 }
  133.         }
  134.  
  135.         /**
  136.          * do Read
  137.          *
  138.          * @param key
  139.          * @throws IOException
  140.          */
  141.         public void doRead(SelectionKey key) throws IOException {
  142.                 SocketChannel client = (SocketChannel) key.channel();
  143.  
  144.                 if (client.read(byteBuffer) >= 0) {
  145.                         byteBuffer.flip();
  146.  
  147.                         CharBuffer charBuffer = decoder.decode(byteBuffer);
  148.                         String msg = charBuffer.toString();
  149.                         byteBuffer.clear();
  150.                        
  151.                         System.out.println(msg);
  152.  
  153.                         // 注册写事件
  154.                         client.register(selector, key.interestOps() | SelectionKey.OP_WRITE).attach(msg);
  155.                 } else {
  156.                         client.close();
  157.                         key.cancel();
  158.                 }
  159.         }
  160.  
  161.         /**
  162.          * do Write
  163.          *
  164.          * @param key
  165.          * @throws IOException
  166.          */
  167.         public void doWrite(SelectionKey key) throws IOException {
  168.                 // 注销写事件
  169.                 key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
  170.                 if (key.attachment() == null) return;
  171.  
  172.                 String msg = (String) key.attachment();
  173.                 for (SocketChannel client: clients) {
  174.                         client.write(encoder.encode(CharBuffer.wrap(msg)));
  175.                 }
  176.         }
  177.  
  178. }

Client:客户端核心

  1. package org.yoyo.nio.client;
  2.  
  3. import java.io.IOException;
  4. import java.net.InetSocketAddress;
  5. import java.nio.ByteBuffer;
  6. import java.nio.CharBuffer;
  7. import java.nio.channels.SelectionKey;
  8. import java.nio.channels.Selector;
  9. import java.nio.channels.SocketChannel;
  10. import java.nio.charset.Charset;
  11. import java.nio.charset.CharsetDecoder;
  12. import java.nio.charset.CharsetEncoder;
  13. import java.util.Iterator;
  14.  
  15. /**
  16. * nio Client
  17. *
  18. * @author YOYO
  19. *
  20. */
  21. public class Client implements Runnable {
  22.  
  23.         /** Client Channel */
  24.         private SocketChannel client = null;
  25.  
  26.         /** Client Selector */
  27.         private Selector selector = null;
  28.  
  29.         /** Buffer for saving Receive Message */
  30.         private ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  31.  
  32.         /** Charset Decoder */
  33.         private static CharsetDecoder decoder = Charset.forName("GBK").newDecoder();
  34.  
  35.         /** Charset Encoder */
  36.         private static CharsetEncoder encoder = Charset.forName("GBK").newEncoder();
  37.  
  38.         /** Client Name */
  39.         private String name = null;
  40.  
  41.         /**
  42.          * Start Client
  43.          *
  44.          * @param host
  45.          * @param port
  46.          * @param name
  47.          */
  48.         public Client(String host, int port, String name) {
  49.                 try {
  50.                         client = SocketChannel.open();
  51.                         client.configureBlocking(false);
  52.  
  53.                         selector = Selector.open();
  54.  
  55.                         client.register(selector, SelectionKey.OP_CONNECT);
  56.  
  57.                         this.name = name;
  58.  
  59.                         InetSocketAddress address = new InetSocketAddress(host, port);
  60.                         client.connect(address);
  61.                 } catch (IOException e) {
  62.                         e.printStackTrace();
  63.                 }
  64.         }
  65.  
  66.         /**
  67.          * run Thread
  68.          */
  69.         public void run() {
  70.                 CLIENT_THREAD: while (true) {
  71.                         try {
  72.                                 selector.select();
  73.                                 Iterator<SelectionKey> iter = selector.selectedKeys()
  74.                                                 .iterator();
  75.  
  76.                                 while (iter.hasNext()) {
  77.                                         SelectionKey key = iter.next();
  78.                                         iter.remove();
  79.  
  80.                                         if (!doProcess(key)) {
  81.                                                 break CLIENT_THREAD;
  82.                                         }
  83.                                 }
  84.                         } catch (IOException e) {
  85.                                 e.printStackTrace();
  86.                         }
  87.                 }
  88.  
  89.                 try {
  90.                         client.close();
  91.                 } catch (IOException e) {
  92.                         e.printStackTrace();
  93.                 }
  94.         }
  95.  
  96.         /**
  97.          * do Process
  98.          *
  99.          * @param key
  100.          * @return
  101.          * @throws IOException
  102.          */
  103.         public boolean doProcess(SelectionKey key) throws IOException {
  104.                 if (key.isConnectable()) {
  105.                         return doConnect(key);
  106.                 }
  107.  
  108.                 if (!key.isValid()) {
  109.                         key.cancel();
  110.                         return false;
  111.                 }
  112.  
  113.                 if (key.isReadable()) {
  114.                         return doRead(key);
  115.                 }
  116.  
  117.                 if (key.isWritable()) {
  118.                         return doWrite(key);
  119.                 }
  120.  
  121.                 return true;
  122.         }
  123.  
  124.         /**
  125.          * do Connect
  126.          *
  127.          * @param channel
  128.          * @return
  129.          * @throws IOException
  130.          */
  131.         public boolean doConnect(SelectionKey key) throws IOException {
  132.                 SocketChannel server = (SocketChannel) key.channel();
  133.                
  134.                 if (server.isConnectionPending()) {
  135.                         server.finishConnect();
  136.                 }
  137.  
  138.                 server.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE)
  139.                                 .attach(name + "进入房间。");
  140.                 return true;
  141.         }
  142.  
  143.         /**
  144.          * do Read
  145.          *
  146.          * @param channel
  147.          * @return
  148.          * @throws IOException
  149.          */
  150.         public boolean doRead(SelectionKey key) throws IOException {
  151.                 if (((SocketChannel) key.channel()).read(byteBuffer) >= 0) {
  152.                         byteBuffer.flip();
  153.  
  154.                         CharBuffer charBuffer = decoder.decode(byteBuffer);
  155.                         byteBuffer.clear();
  156.  
  157.                         System.out.println(charBuffer);
  158.                         return true;
  159.                 } else {
  160.                         return false;
  161.                 }
  162.         }
  163.  
  164.         /**
  165.          * do Write
  166.          *
  167.          * @param channel
  168.          * @return
  169.          * @throws IOException
  170.          */
  171.         public boolean doWrite(SelectionKey key) throws IOException {
  172.                 key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
  173.                 ((SocketChannel) key.channel()).write(encoder.encode(CharBuffer
  174.                                 .wrap((String) key.attachment())));
  175.                 return true;
  176.         }
  177.  
  178.         /**
  179.          * Send Message
  180.          *
  181.          * @param message
  182.          */
  183.         public void sendMessage(String msg) {
  184.                 try {
  185.                         client.write(encoder.encode(CharBuffer.wrap(name + ": " + msg)));
  186.                 } catch (IOException e) {
  187.                         e.printStackTrace();
  188.                 }
  189.         }
  190.  
  191. }

TestServer:启动服务端用。

  1. package org.yoyo.nio.test;
  2.  
  3. import org.yoyo.nio.server.Server;
  4.  
  5. public class TestServer {
  6.  
  7.         /**
  8.          * @param args
  9.          */
  10.         public static void main(String[] args) {
  11.                 System.out.println("[SERVER]");
  12.                 new Thread(new Server(8889)).start();
  13.         }
  14.  
  15. }

TestClient:客户端连接后,直接在控制台上发送语句即可 = =。

  1. package org.yoyo.nio.test;
  2.  
  3. import java.net.InetAddress;
  4. import java.net.UnknownHostException;
  5. import java.util.Scanner;
  6.  
  7. import org.yoyo.nio.client.Client;
  8.  
  9. public class TestClient {
  10.  
  11.         /**
  12.          * @param args
  13.          */
  14.         public static void main(String[] args) {
  15.                 System.out.println("[CLIENT]");
  16.                 try {
  17.                         Client client = new Client(InetAddress.getLocalHost()
  18.                                         .getHostAddress(), 8889, "YOYO");
  19.                         new Thread(client).start();
  20.  
  21.                         Scanner scanner = new Scanner(System.in);
  22.                         while (scanner.hasNext()) {
  23.                                 client.sendMessage(scanner.next());
  24.                         }
  25.                 } catch (UnknownHostException e) {
  26.                         e.printStackTrace();
  27.                 }
  28.         }
  29.  
  30. }

 


登录 *


loading captcha image...
(输入验证码)
or Ctrl+Enter