NIO 之 选择就绪模式
编程技术  /  houtizong 发布于 3年前   172
package cn.chenkangxian.nioconcurrent;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.List;

/**
 * Echo server that hands each readable channel to a worker thread taken from a
 * small fixed-size pool, so many channels can be serviced concurrently.
 *
 * <p>The selector loop itself is inherited from {@link SelectSockets}; this class
 * only overrides how a readable key is handled.
 *
 * <p>Project: testNio<br>
 * Author: chenkangxian<br>
 * Date: 2011-7-5<br>
 * Copyright: 2011 chenkangxian, All rights reserved.
 */
public class SelectSocketsThreadPool extends SelectSockets {

    /** Number of worker threads created when the pool is built. */
    private static final int MAX_THREADS = 5;

    private final ThreadPool pool = new ThreadPool(MAX_THREADS);

    /**
     * Dispatches a readable key to an idle worker thread.
     *
     * <p>If no worker is idle the method returns without touching the key. Its
     * read interest is left enabled, so the selector will report it again on a
     * later pass.
     *
     * @param key the selection key whose channel has data to read
     * @throws Exception if handing the key to the worker fails
     */
    @Override
    protected void readDataFromSocket(SelectionKey key) throws Exception {
        WorkerThread worker = pool.getWorker();
        if (worker == null) {
            return;
        }
        try {
            worker.serviceChannel(key);
        } catch (RuntimeException e) {
            // e.g. CancelledKeyException: the worker never received the key,
            // so put it back instead of losing it from the pool.
            pool.returnWorker(worker);
            throw e;
        }
    }

    /**
     * Minimal pool of idle worker threads.
     *
     * <p>Project: concurrentnio<br>
     * Author: chenkangxian<br>
     * Date: 2011-7-20<br>
     * Copyright: 2011 chenkangxian, All rights reserved.
     */
    private static final class ThreadPool {

        /** Workers currently available; every access synchronizes on this list. */
        private final List<WorkerThread> idle = new LinkedList<WorkerThread>();

        /**
         * Creates and starts the workers and marks them all as idle.
         *
         * @param poolSize number of worker threads to create
         */
        ThreadPool(int poolSize) {
            for (int i = 0; i < poolSize; i++) {
                WorkerThread thread = new WorkerThread(this);
                thread.setName("Worker" + (i + 1));
                thread.start();
                synchronized (idle) {
                    idle.add(thread);
                }
            }
        }

        /**
         * Takes an idle worker out of the pool.
         *
         * @return an idle worker, or {@code null} when none is available
         */
        WorkerThread getWorker() {
            synchronized (idle) {
                if (idle.isEmpty()) {
                    return null;
                }
                return idle.remove(0);
            }
        }

        /**
         * Puts a worker back into the idle list.
         *
         * @param worker the worker that has finished its task
         */
        void returnWorker(WorkerThread worker) {
            synchronized (idle) {
                idle.add(worker);
            }
        }

        /**
         * Drops a worker that is terminating so it is never handed out again.
         *
         * @param worker the worker to forget
         */
        void removeWorker(WorkerThread worker) {
            synchronized (idle) {
                idle.remove(worker);
            }
        }
    }

    /**
     * Worker thread that sleeps until it is given a key, echoes everything that
     * can be read from the key's channel, and then returns itself to the pool.
     */
    private static final class WorkerThread extends Thread {

        private final ByteBuffer buffer = ByteBuffer.allocate(1024);
        private final ThreadPool pool;

        /** Key to service; read and written only while holding this thread's monitor. */
        private SelectionKey key;

        WorkerThread(ThreadPool pool) {
            this.pool = pool;
        }

        /**
         * Waits for a key, drains its channel, and repeats. The loop ends only
         * when the thread is interrupted while waiting.
         */
        @Override
        public synchronized void run() {
            System.out.println(this.getName() + " is ready");
            while (true) {
                try {
                    // Check the condition before waiting: serviceChannel() may have
                    // run before this thread first got here, in which case the
                    // notify is already gone and an unconditional wait() would
                    // block forever. The loop also absorbs spurious wakeups.
                    while (key == null) {
                        this.wait();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the flag and stop. Looping again would make every
                    // subsequent wait() throw immediately and spin the CPU.
                    Thread.currentThread().interrupt();
                    pool.removeWorker(this);
                    return;
                }

                System.out.println(this.getName() + " has been awakened");
                try {
                    drainChannel(key);
                } catch (Exception e) {
                    System.out.println("Caught '" + e + "' closing channel");
                    try {
                        key.channel().close();
                    } catch (IOException ex) {
                        ex.printStackTrace();
                    }
                    key.selector().wakeup();
                }
                key = null;
                this.pool.returnWorker(this);
            }
        }

        /**
         * Assigns a key to this worker and wakes it up.
         *
         * <p>Read interest is cleared first so the selector does not report the
         * same channel again while the worker is draining it. Doing this before
         * storing the key means a failure here leaves the worker unassigned.
         *
         * @param key the selection key to service
         */
        synchronized void serviceChannel(SelectionKey key) {
            key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
            this.key = key;
            this.notify();
        }

        /**
         * Reads from the key's channel until no more data is immediately
         * available, writing each chunk straight back to the same channel.
         *
         * <p>On end-of-stream the channel is closed. Otherwise read interest is
         * re-enabled and the selector is woken so the change takes effect.
         *
         * @param key the selection key whose channel is drained
         * @throws Exception if reading, writing or updating the key fails
         */
        void drainChannel(SelectionKey key) throws Exception {
            SocketChannel channel = (SocketChannel) key.channel();
            int count;
            buffer.clear();
            while ((count = channel.read(buffer)) > 0) {
                buffer.flip();
                while (buffer.hasRemaining()) {
                    channel.write(buffer);
                }
                buffer.clear();
            }
            if (count < 0) {
                // Peer closed the connection.
                channel.close();
                return;
            }
            // Resume watching this channel for incoming data.
            key.interestOps(key.interestOps() | SelectionKey.OP_READ);
            key.selector().wakeup();
        }
    }

    public static void main(String[] args) throws Exception {
        new SelectSocketsThreadPool().go(args);
    }
}
package cn.chenkangxian.nioconcurrent;

import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

/**
 * Single-threaded echo server built on one {@link Selector}: it accepts
 * connections, greets each new client, and writes back whatever it reads.
 *
 * <p>Project: concurrentnio<br>
 * Author: chenkangxian<br>
 * Date: 2011-7-11<br>
 * Copyright: 2011 chenkangxian, All rights reserved.
 */
public class SelectSockets {

    /** Port the server binds to. */
    public static int PORT_NUMBER = 1234;

    /** Fixed encoding for the greeting so its bytes do not depend on the host's default charset. */
    private static final Charset GREETING_CHARSET = Charset.forName("UTF-8");

    /** Scratch buffer shared by the methods of this class. */
    private ByteBuffer buffer = ByteBuffer.allocate(1024);

    public static void main(String[] args) throws Exception {
        new SelectSockets().go(args);
    }

    /**
     * Binds the server socket to {@link #PORT_NUMBER} and runs the select loop.
     *
     * <p>The loop has no exit condition, so this method returns only by throwing.
     * When that happens the selector and the server channel are closed.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception if setting up or servicing the channels fails
     */
    public void go(String[] args) throws Exception {
        int port = PORT_NUMBER;

        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        Selector selector = null;
        try {
            ServerSocket serverSocket = serverChannel.socket();
            selector = Selector.open();

            serverSocket.bind(new InetSocketAddress(port));
            serverChannel.configureBlocking(false);
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                // Blocking select on a single selector; no polling.
                int n = selector.select();
                if (n == 0) {
                    continue;
                }

                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();

                    // The readiness tests throw CancelledKeyException on a cancelled
                    // key, so check validity before each one.
                    if (key.isValid() && key.isAcceptable()) {
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        SocketChannel channel = server.accept();
                        // A non-blocking accept() returns null when no connection is pending.
                        if (channel != null) {
                            registerChannel(selector, channel, SelectionKey.OP_READ);
                            sayHello(channel);
                        }
                    }

                    if (key.isValid() && key.isReadable()) {
                        readDataFromSocket(key);
                    }
                }
            }
        } finally {
            try {
                if (selector != null) {
                    selector.close();
                }
            } finally {
                serverChannel.close();
            }
        }
    }

    /**
     * Switches the channel to non-blocking mode and registers it with the
     * selector for the given operations. Does nothing when the channel is null.
     *
     * @param selector the selector to register with
     * @param channel the channel to register; may be {@code null}
     * @param ops the interest set for the new registration
     * @throws Exception if configuring or registering the channel fails
     */
    protected void registerChannel(Selector selector, SelectableChannel channel, int ops)
            throws Exception {
        if (channel == null) {
            return;
        }
        channel.configureBlocking(false);
        channel.register(selector, ops);
    }

    /**
     * Reads everything immediately available from the key's channel and echoes
     * it back; closes the channel on end-of-stream.
     *
     * @param key the selection key whose channel has data to read
     * @throws Exception if reading from or writing to the channel fails
     */
    protected void readDataFromSocket(SelectionKey key) throws Exception {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        int count;

        buffer.clear();
        while ((count = socketChannel.read(buffer)) > 0) {
            buffer.flip();
            while (buffer.hasRemaining()) {
                socketChannel.write(buffer);
            }
            buffer.clear();
        }

        if (count < 0) {
            // Peer closed the connection.
            socketChannel.close();
        }
    }

    /**
     * Sends the greeting line to a newly accepted client.
     *
     * @param channel the client channel to greet
     * @throws Exception if writing to the channel fails
     */
    private void sayHello(SocketChannel channel) throws Exception {
        buffer.clear();
        buffer.put("Hello 哈罗! \r\n".getBytes(GREETING_CHARSET));
        buffer.flip();
        // The channel is non-blocking, so one write() may not take the whole buffer.
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
    }
}
请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!
技术博客集 - 网站简介:
前后端技术:
后端基于Hyperf2.1框架开发,前端使用Bootstrap可视化布局系统生成
网站主要作用:
1.编程技术分享及讨论交流,内置聊天系统;
2.测试交流框架问题,比如:Hyperf、Laravel、TP、beego;
3.本站数据以大数据采集等爬虫技术为基础,旨在助力知识分享;如有侵权,请发邮件到站长邮箱,站长会尽快处理;
4.站长邮箱:[email protected];
文章归档
文章标签
友情链接