NIO 之 选择就绪模式

编程技术  /  houtizong 发布于 3年前   172

    [size=small;]? ? ?[/size][size=small;][size=medium;]Java NIO非堵塞应用通常适用用在I/O读写等方面,我们知道,系统运行的性能瓶颈通常在I/O读写,包括对端口和文件的操作上,过去,在打开一个I/O通道后,read()将一直等待在端口一边读取字节内容,如果没有内容进来,read()也是傻傻的等,这会影响我们程序继续做其他事情,那么改进做法就是开设线程,让线程去等待,但是这样做也是相当耗费资源的。[/size][/size]

[size=small;][size=medium;]? ? ? ?Java NIO非堵塞技术实际是采取Reactor模式,或者说是Observer模式为我们监察I/O端口,如果有内容进来,会自动通知我们,这样,我们就不必开启多个线程死等,从外界看,实现了流畅的I/O读写,不堵塞了。[/size][/size]

[size=small;][size=medium;]? ? ? Java NIO出现不只是一个技术性能的提高,你会发现网络上到处在介绍它,因为它具有里程碑意义,从JDK1.4开始,Java开始提高性能相关的功能,从而使得Java在底层或者并行分布式计算等操作上已经可以和C或Perl等语言并驾齐驱。[/size][/size]

?


<p style="text-align: center;">? ? ? ? ? ? ? ? ? ? ?[size=small;]图 1 ? [size=10.5pt;" lang="EN-US]<span><span style="font: 7.0pt ;">? ?[/size][/size]</span><span>类结构图</span></span>
?

?

package cn.chenkangxian.nioconcurrent;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.SocketChannel;import java.util.LinkedList;import java.util.List;/** * @Project: testNio *  * @Author: chenkangxian *  * @Annotation: 使用线程池来处理大量channel并发 *  * @Date:2011-7-5 *  * @Copyright: 2011 chenkangxian, All rights reserved. *  */public class SelectSocketsThreadPool extends SelectSockets {private static final int MAX_THREADS = 5;private ThreadPool pool = new ThreadPool(MAX_THREADS);/** * 从socket中读数据 */protected void readDataFromSocket(SelectionKey key) throws Exception {WorkerThread worker = pool.getWorker();if (worker == null) {return;}worker.serviceChannel(key);}/** *   * @Project: concurrentnio * * @Author: chenkangxian * * @Annotation:线程池 * * @Date:2011-7-20 * * @Copyright: 2011 chenkangxian, All rights reserved. * */private class ThreadPool {List idle = new LinkedList();/** * 线程池初始化 *  * @param poolSize 线程池大小 */ThreadPool(int poolSize) {for (int i = 0; i < poolSize; i++) {WorkerThread thread = new WorkerThread(this);thread.setName("Worker" + (i + 1));thread.start();idle.add(thread);}}/** * 获得工作线程 *  * Author: chenkangxian * * Last Modification Time: 2011-7-20 * * @return */WorkerThread getWorker() {WorkerThread worker = null;synchronized (idle) {if (idle.size() > 0) {worker = (WorkerThread) idle.remove(0);}}return (worker);}/** * 送回工作线程 *  * Author: chenkangxian * * Last Modification Time: 2011-7-20 * * @param worker */void returnWorker(WorkerThread worker) {synchronized (idle) {idle.add(worker);}}}private class WorkerThread extends Thread {private ByteBuffer buffer = ByteBuffer.allocate(1024);private ThreadPool pool;private SelectionKey key;WorkerThread(ThreadPool pool) {this.pool = pool;}public synchronized void run() {System.out.println(this.getName() + " is ready");while (true) {try {this.wait();//等待被notify} catch (InterruptedException e) {e.printStackTrace();this.interrupt();}if (key == null) {//直到有keycontinue;}System.out.println(this.getName() + " has been awakened");try {drainChannel(key);} catch (Exception e) {System.out.println("Caught '" + e + "' closing channel");try {key.channel().close();} catch (IOException ex) {ex.printStackTrace();}key.selector().wakeup();}key = null;this.pool.returnWorker(this);}}synchronized void serviceChannel(SelectionKey key) {this.key = key;//消除读的关注key.interestOps(key.interestOps() &amp; (~SelectionKey.OP_READ));this.notify();}void drainChannel(SelectionKey key) throws Exception {SocketChannel channel = (SocketChannel) key.channel();int count;buffer.clear(); while ((count = channel.read(buffer)) > 0) {buffer.flip();while (buffer.hasRemaining()) {channel.write(buffer);}buffer.clear();}if (count < 0) {channel.close();return;}//重新开始关注读事件key.interestOps(key.interestOps() | SelectionKey.OP_READ);key.selector().wakeup();}}public static void main(String[] args) throws Exception {new SelectSocketsThreadPool().go(args);}}

?
?

package cn.chenkangxian.nioconcurrent;import java.net.InetSocketAddress;import java.net.ServerSocket;import java.nio.ByteBuffer;import java.nio.channels.SelectableChannel;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;/** *  * @Project: concurrentnio * * @Author: chenkangxian * * @Annotation:  * * @Date:2011-7-11 * * @Copyright: 2011 chenkangxian, All rights reserved. * */public class SelectSockets {public static int PORT_NUMBER = 1234;private ByteBuffer buffer = ByteBuffer.allocate(1024);public static void main(String[] args) throws Exception {new SelectSockets().go(args);}public void go(String[] args) throws Exception{int port = PORT_NUMBER;//if(args.length > 0){//port = Integer.parseInt(args[0]);//}//System.out.println("Listening on port " + port);ServerSocketChannel serverChannel = ServerSocketChannel.open();ServerSocket serverSocket = serverChannel.socket();Selector selector = Selector.open();serverSocket.bind(new InetSocketAddress(port));serverChannel.configureBlocking(false);serverChannel.register(selector, SelectionKey.OP_ACCEPT);while(true){int n = selector.select(); //没有轮询,单个selectorif(n == 0){continue; }Iterator it = selector.selectedKeys().iterator();while(it.hasNext()){SelectionKey key = (SelectionKey)it.next();if(key.isAcceptable()){ServerSocketChannel server = (ServerSocketChannel)key.channel();SocketChannel channel = server.accept();registerChannel(selector,channel,SelectionKey.OP_READ);sayHello(channel);}if(key.isReadable()){readDataFromSocket(key);}it.remove();}}}/** * 在selector上注册channel,并设置interest *  * Author: chenkangxian * * Last Modification Time: 2011-7-11 * * @param selector 选择器 *  * @param channel 通道 *  * @param ops interest *  * @throws Exception */protected void registerChannel(Selector selector,SelectableChannel channel, int ops) throws Exception{if(channel == null){return ; }channel.configureBlocking(false);channel.register(selector, ops);}/** * 处理有可用数据的通道 *  * Author: chenkangxian * * Last Modification Time: 2011-7-11 * * @param key 可用通道对应的key *  * @throws Exception */protected void readDataFromSocket(SelectionKey key) throws Exception{SocketChannel socketChannel = (SocketChannel)key.channel();int count;buffer.clear(); //Empty bufferwhile((count = socketChannel.read(buffer)) > 0){buffer.flip(); while(buffer.hasRemaining()){socketChannel.write(buffer);}buffer.clear(); }if(count < 0){socketChannel.close();}}/** * 打招呼 *  * Author: chenkangxian * * Last Modification Time: 2011-7-11 * * @param channel 客户端channel *  * @throws Exception */private void sayHello(SocketChannel channel) throws Exception{buffer.clear();buffer.put("Hello 哈罗! \r\n".getBytes());buffer.flip();channel.write(buffer);}}

?
?

?

?
?

 
   
     
        <ul style="display:none;">
         
  • <img src='http://dl.iteye.com/upload/attachment/598672/428eea1f-27fb-3c51-91f0-3a112a20490b-thumb.jpg' class='magplus' title='点击查看原始大小图片' />

  •          
             
  • 大小: 28.7 KB

  •         </ul>
         
       
       
         
       
     

    请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!

    留言需要登陆哦

    技术博客集 - 网站简介:
    前后端技术:
    后端基于Hyperf2.1框架开发,前端使用Bootstrap可视化布局系统生成

    网站主要作用:
    1.编程技术分享及讨论交流,内置聊天系统;
    2.测试交流框架问题,比如:Hyperf、Laravel、TP、beego;
    3.本站数据是基于大数据采集等爬虫技术为基础助力分享知识,如有侵权请发邮件到站长邮箱,站长会尽快处理;
    4.站长邮箱:[email protected];

        订阅博客周刊 去订阅

    文章归档

    文章标签

    友情链接

    Auther ·HouTiZong
    侯体宗的博客
    © 2020 zongscan.com
    版权所有ICP证 : 粤ICP备20027696号
    PHP交流群 也可以扫右边的二维码
    侯体宗的博客