
编程技术  /  houtizong 发布于 3年前   80

ClientCnxn是Zookeeper客户端和Zookeeper服务器端进行通信和事件通知处理的主要类,它内部包含两个类,1. SendThread 2. EventThread, SendThread负责客户端和服务器端的数据通信,也包括事件信息的传输,EventThread主要在客户端回调注册的Watchers进行通知处理





    /**     * Creates a connection object. The actual network connect doesn't get     * established until needed. The start() instance method must be called     * subsequent to construction.     *     * @param chrootPath - the chroot of this client. Should be removed from this Class in ZOOKEEPER-838     * @param hostProvider     *                the list of ZooKeeper servers to connect to     * @param sessionTimeout     *                the timeout for connections.     * @param zooKeeper     *                the zookeeper object that this connection is related to.     * @param watcher watcher for this connection     * @param clientCnxnSocket     *                the socket implementation used (e.g. NIO/Netty)     * @param sessionId session id if re-establishing session     * @param sessionPasswd session passwd if re-establishing session     * @param canBeReadOnly     *                whether the connection is allowed to go to read-only     *                mode in case of partitioning     * @throws IOException     */    public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {        this.zooKeeper = zooKeeper;        this.watcher = watcher;        this.sessionId = sessionId;        this.sessionPasswd = sessionPasswd;        this.sessionTimeout = sessionTimeout;        this.hostProvider = hostProvider;        this.chrootPath = chrootPath;        //如果zookeeper集群有1000台,那么会话超时时间岂不是要设置的很大?因此,zookeeper一般不会很大,3台或者5台足亦        connectTimeout = sessionTimeout / hostProvider.size();//链接超时时间是会话超时时间除以Zookeeper集群数        //读超时是会话超时的2/3        readTimeout = sessionTimeout * 2 / 3;        readOnly = canBeReadOnly;           //读线程,在ClientCnxnd的start方法中启动        sendThread = new SendThread(clientCnxnSocket);        //会话线程,在ClientCnxnd的start方法中启动        eventThread = new EventThread();    }




package org.apache.jute //该借口不是Zookeeper原生提供的,是Apache的jute提供的import java.io.IOException;/** * Interface that is implemented by generated classes. *  */public interface Record {    public void serialize(OutputArchive archive, String tag) //序列化,tag在XmlInputArchive序列化器中,用作xml元素标签        throws IOException;    public void deserialize(InputArchive archive, String tag)//反序列话        throws IOException;}














public class RequestHeader implements Record {  private int xid; //请求的事务id,具体的含义和功能接下来分析  private int type; //请求类型?  public RequestHeader() {  }  public RequestHeader(        int xid,        int type) {    this.xid=xid;    this.type=type;  }  public int getXid() {    return xid;  }  public void setXid(int m_) {    xid=m_;  }  public int getType() {    return type;  }  public void setType(int m_) {    type=m_;  }  //序列化操作,将xid和type序列化到OutputArchive中,  public void serialize(OutputArchive a_, String tag) throws java.io.IOException {    a_.startRecord(this,tag); //对于最常使用的BinaryOutputArchive,此方法空实现    a_.writeInt(xid,"xid");    a_.writeInt(type,"type");    a_.endRecord(this,tag);//对于最常使用的BinaryOutputArchive,此方法空实现  }  //序列化操作,将xid和type反序列化  public void deserialize(InputArchive a_, String tag) throws java.io.IOException {    a_.startRecord(tag);//对于最常使用的BinaryInputArchive,此方法空实现    xid=a_.readInt("xid");    type=a_.readInt("type");    a_.endRecord(tag);//对于最常使用的BinaryInputArchive,此方法空实现}  public String toString() {    try {      java.io.ByteArrayOutputStream s =        new java.io.ByteArrayOutputStream();      CsvOutputArchive a_ =         new CsvOutputArchive(s);      a_.startRecord(this,"");//对于CsvOutputArchive,startRecord方法    a_.writeInt(xid,"xid");    a_.writeInt(type,"type");      a_.endRecord(this,"");      return new String(s.toByteArray(), "UTF-8");    } catch (Throwable ex) {      ex.printStackTrace();    }    return "ERROR";  }  public void write(java.io.DataOutput out) throws java.io.IOException {    BinaryOutputArchive archive = new BinaryOutputArchive(out);    serialize(archive, "");  }  public void readFields(java.io.DataInput in) throws java.io.IOException {    BinaryInputArchive archive = new BinaryInputArchive(in);    deserialize(archive, "");  }  public int compareTo (Object peer_) throws ClassCastException {    if (!(peer_ instanceof RequestHeader)) {      throw new ClassCastException("Comparing different types of records.");    }    RequestHeader peer = (RequestHeader) peer_;    int ret = 0;    ret = (xid == peer.xid)? 0 :((xid<peer.xid)?-1:1);    if (ret != 0) return ret;    ret = (type == peer.type)? 0 :((type<peer.type)?-1:1);    if (ret != 0) return ret;     return ret;  }  public boolean equals(Object peer_) {    if (!(peer_ instanceof RequestHeader)) {      return false;    }    if (peer_ == this) {      return true;    }    RequestHeader peer = (RequestHeader) peer_;    boolean ret = false;    ret = (xid==peer.xid);    if (!ret) return ret;    ret = (type==peer.type);    if (!ret) return ret;     return ret;  }  public int hashCode() {    int result = 17;    int ret;    ret = (int)xid;    result = 37*result + ret;    ret = (int)type;    result = 37*result + ret;    return result;  }  public static String signature() {    return "LRequestHeader(ii)";  }}



  • 链接请求ConnectRequest
  • 创建znode请求CreateRequest
  • 节点是否存在请求ExistsRequest
  • 删除znode请求DeleteRequest
  • 获取child znodes请求GetChildrenRequest
  • 设置znode数据SetDataRequest
  • 事件WatcherEvent



public class CreateRequest implements Record {  private String path; //创建znode节点的path  private byte[] data; //创建znode节点时的节点数据  private java.util.List<org.apache.zookeeper.data.ACL> acl; //创建znode节点时的ACL  private int flags;//这个参数干啥的?  public CreateRequest() {  }  public CreateRequest(        String path,        byte[] data,        java.util.List<org.apache.zookeeper.data.ACL> acl,        int flags) {    this.path=path;    this.data=data;    this.acl=acl;    this.flags=flags;  }  public String getPath() {    return path;  }  public void setPath(String m_) {    path=m_;  }  public byte[] getData() {    return data;  }  public void setData(byte[] m_) {    data=m_;  }  public java.util.List<org.apache.zookeeper.data.ACL> getAcl() {    return acl;  }  public void setAcl(java.util.List<org.apache.zookeeper.data.ACL> m_) {    acl=m_;  }  public int getFlags() {    return flags;  }  public void setFlags(int m_) {    flags=m_;  }  public void serialize(OutputArchive a_, String tag) throws java.io.IOException {    a_.startRecord(this,tag);    a_.writeString(path,"path");//写入path    a_.writeBuffer(data,"data");//写入data字节数组    {      a_.startVector(acl,"acl");//写入acl,acl是List类型      if (acl!= null) {                  int len1 = acl.size();          for(int vidx1 = 0; vidx1<len1; vidx1++) {            org.apache.zookeeper.data.ACL e1 = (org.apache.zookeeper.data.ACL) acl.get(vidx1);            a_.writeRecord(e1,"e1");//ACL也是一个Record          }      }      a_.endVector(acl,"acl");    }    a_.writeInt(flags,"flags");//写入flags    a_.endRecord(this,tag);  }  public void deserialize(InputArchive a_, String tag) throws java.io.IOException {    a_.startRecord(tag);    path=a_.readString("path");    data=a_.readBuffer("data");    {      Index vidx1 = a_.startVector("acl");      if (vidx1!= null) {          acl=new java.util.ArrayList<org.apache.zookeeper.data.ACL>();          for (; !vidx1.done(); vidx1.incr()) {    org.apache.zookeeper.data.ACL e1;    e1= new org.apache.zookeeper.data.ACL();    a_.readRecord(e1,"e1");            acl.add(e1);          }      }    a_.endVector("acl");    }    flags=a_.readInt("flags");    a_.endRecord(tag);}  public String toString() {    try {      java.io.ByteArrayOutputStream s =        new java.io.ByteArrayOutputStream();      CsvOutputArchive a_ =         new CsvOutputArchive(s);      a_.startRecord(this,"");    a_.writeString(path,"path");    a_.writeBuffer(data,"data");    {      a_.startVector(acl,"acl");      if (acl!= null) {          int len1 = acl.size();          for(int vidx1 = 0; vidx1<len1; vidx1++) {            org.apache.zookeeper.data.ACL e1 = (org.apache.zookeeper.data.ACL) acl.get(vidx1);    a_.writeRecord(e1,"e1");          }      }      a_.endVector(acl,"acl");    }    a_.writeInt(flags,"flags");      a_.endRecord(this,"");      return new String(s.toByteArray(), "UTF-8");    } catch (Throwable ex) {      ex.printStackTrace();    }    return "ERROR";  }  public void write(java.io.DataOutput out) throws java.io.IOException {    BinaryOutputArchive archive = new BinaryOutputArchive(out);    serialize(archive, "");  }  public void readFields(java.io.DataInput in) throws java.io.IOException {    BinaryInputArchive archive = new BinaryInputArchive(in);    deserialize(archive, "");  }  public int compareTo (Object peer_) throws ClassCastException {    throw new UnsupportedOperationException("comparing CreateRequest is unimplemented");  }  public boolean equals(Object peer_) {    if (!(peer_ instanceof CreateRequest)) {      return false;    }    if (peer_ == this) {      return true;    }    CreateRequest peer = (CreateRequest) peer_;    boolean ret = false;    ret = path.equals(peer.path);    if (!ret) return ret;    ret = org.apache.jute.Utils.bufEquals(data,peer.data);    if (!ret) return ret;    ret = acl.equals(peer.acl);    if (!ret) return ret;    ret = (flags==peer.flags);    if (!ret) return ret;     return ret;  }  public int hashCode() {    int result = 17;    int ret;    ret = path.hashCode();    result = 37*result + ret;    ret = java.util.Arrays.toString(data).hashCode();    result = 37*result + ret;    ret = acl.hashCode();    result = 37*result + ret;    ret = (int)flags;    result = 37*result + ret;    return result;  }  public static String signature() {    return "LCreateRequest(sB[LACL(iLId(ss))]i)";  }}



  private int protocolVersion;
  private long lastZxidSeen; //客户端保存的Zxid最近时间,zxid有什么用呢?
  private int timeOut;//会话超时时间
  private long sessionId;
  private byte[] passwd;





   static class Packet {        RequestHeader requestHeader;//请求头        ReplyHeader replyHeader; //响应头        Record request;//请求正文        Record response; //响应正文        ByteBuffer bb;//上面四部分序列化的字节流        /** Client's view of the path (may differ due to chroot) **/        String clientPath;        /** Servers's view of the path (may differ due to chroot) **/        String serverPath;        boolean finished;        AsyncCallback cb;//异步请求的响应Callback        Object ctx;        WatchRegistration watchRegistration;        public boolean readOnly;        /** Convenience ctor */        Packet(RequestHeader requestHeader, ReplyHeader replyHeader,               Record request, Record response,               WatchRegistration watchRegistration) {            this(requestHeader, replyHeader, request, response,                 watchRegistration, false);        }        Packet(RequestHeader requestHeader, ReplyHeader replyHeader,               Record request, Record response,               WatchRegistration watchRegistration, boolean readOnly) {            this.requestHeader = requestHeader;            this.replyHeader = replyHeader;            this.request = request;            this.response = response;            this.readOnly = readOnly;            this.watchRegistration = watchRegistration;        }        public void createBB() {            try {                ByteArrayOutputStream baos = new ByteArrayOutputStream();                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);//序列化字节流容器                boa.writeInt(-1, "len"); // We'll fill this in later                if (requestHeader != null) {                    requestHeader.serialize(boa, "header");                }                if (request instanceof ConnectRequest) {                    request.serialize(boa, "connect");                    // append "am-I-allowed-to-be-readonly" flag                    boa.writeBool(readOnly, "readOnly");                } else if (request != null) {                    request.serialize(boa, "request");                }                baos.close();                this.bb = ByteBuffer.wrap(baos.toByteArray());//将字节流容器中的字节流复制给bb                this.bb.putInt(this.bb.capacity() - 4);                this.bb.rewind();            } catch (IOException e) {                LOG.warn("Ignoring unexpected exception", e);            }        }        @Override        public String toString() {            StringBuilder sb = new StringBuilder();            sb.append("clientPath:" + clientPath);            sb.append(" serverPath:" + serverPath);            sb.append(" finished:" + finished);            sb.append(" header:: " + requestHeader);            sb.append(" replyHeader:: " + replyHeader);            sb.append(" request:: " + request);            sb.append(" response:: " + response);            // jute toString is horrible, remove unnecessary newlines            return sb.toString().replaceAll("\r*\n+", " ");        }    }




/**     * These are the packets that have been sent and are waiting for a response.     */    private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();    /**     * These are the packets that need to be sent.     */    private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();





















技术博客集 - 网站简介:

4.站长邮箱:[email protected];

      订阅博客周刊 去订阅




Auther ·HouTiZong
© 2020 zongscan.com
版权所有ICP证 : 粤ICP备20027696号
PHP交流群 也可以扫右边的二维码