[Zookeeper学习笔记十]Zookeeper源代码分析之ClientCnxn数据序列化和反序列化
编程技术  /  houtizong 发布于 3年前   75
ClientCnxn是Zookeeper客户端和Zookeeper服务器端进行通信和事件通知处理的主要类,它内部包含两个类,1. SendThread 2. EventThread, SendThread负责客户端和服务器端的数据通信,也包括事件信息的传输,EventThread主要在客户端回调注册的Watchers进行通知处理
ClientCnxn构造方法
/** * Creates a connection object. The actual network connect doesn't get * established until needed. The start() instance method must be called * subsequent to construction. * * @param chrootPath - the chroot of this client. Should be removed from this Class in ZOOKEEPER-838 * @param hostProvider * the list of ZooKeeper servers to connect to * @param sessionTimeout * the timeout for connections. * @param zooKeeper * the zookeeper object that this connection is related to. * @param watcher watcher for this connection * @param clientCnxnSocket * the socket implementation used (e.g. NIO/Netty) * @param sessionId session id if re-establishing session * @param sessionPasswd session passwd if re-establishing session * @param canBeReadOnly * whether the connection is allowed to go to read-only * mode in case of partitioning * @throws IOException */ public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) { this.zooKeeper = zooKeeper; this.watcher = watcher; this.sessionId = sessionId; this.sessionPasswd = sessionPasswd; this.sessionTimeout = sessionTimeout; this.hostProvider = hostProvider; this.chrootPath = chrootPath; //如果zookeeper集群有1000台,那么会话超时时间岂不是要设置的很大?因此,zookeeper一般不会很大,3台或者5台足亦 connectTimeout = sessionTimeout / hostProvider.size();//链接超时时间是会话超时时间除以Zookeeper集群数 //读超时是会话超时的2/3 readTimeout = sessionTimeout * 2 / 3; readOnly = canBeReadOnly; //读线程,在ClientCnxnd的start方法中启动 sendThread = new SendThread(clientCnxnSocket); //会话线程,在ClientCnxnd的start方法中启动 eventThread = new EventThread(); }
对于SendThread数据传输线程包含两方面的内容,1是基于TCP/IP的Socket的数据传输,2.传输的数据内容。首先关注传输的数据内容,在TCP/IP传输的数据都是字节,因此,在SendThread发送数据之前,需要将要传输的数据结构进行序列化成字节流,服务器端会反序列化成相应的数据结构。当客户端收到服务器返回的字节流时,客户端将其反序列化为相应的数据结构。
首先看看数据的序列化和反序列化,接口定义:
package org.apache.jute //该借口不是Zookeeper原生提供的,是Apache的jute提供的import java.io.IOException;/** * Interface that is implemented by generated classes. * */public interface Record { public void serialize(OutputArchive archive, String tag) //序列化,tag在XmlInputArchive序列化器中,用作xml元素标签 throws IOException; public void deserialize(InputArchive archive, String tag)//反序列话 throws IOException;}
OutputArchive接口是数据结构序列化为字节流的字节流写入器,InputArchive接口是字节流反序列化为数据结构的字节流读取器。OutputArchive和InputArchive接口有三个成对使用的实现类
BinaryOutputArchive和BinaryInputArchive底层使用DataOutput和DataInput作为字节容器
XmlOutputArchive和XmlInputArchive底层使用PrintStream,XmlInputArchive使用Xml解析的方式得到相应的数据结构
CsvOutputArchive和CsvInputArchive底层使用PrintStream作为自己容器
Zookeeper客户端向服务器端发送请求,包含请求头和请求正文两部分,每个请求的请求头的类型都是一样的,而请求正文根据请求的不同,分为多种类型。
Zookeeper服务器端向客户端返回响应数据,包含响应头和响应正文两部分,每个响应的响应头的类型都是一样的,而响应正文根据请求的不同,分为多种类型。
请求头,各种请求正文,响应头和响应正文因为要在Socket上进行数据传输,所以它们应该都是可序列化和反序列话的,因此它们都是可序列化的
请求头:
public class RequestHeader implements Record { private int xid; //请求的事务id,具体的含义和功能接下来分析 private int type; //请求类型? public RequestHeader() { } public RequestHeader( int xid, int type) { this.xid=xid; this.type=type; } public int getXid() { return xid; } public void setXid(int m_) { xid=m_; } public int getType() { return type; } public void setType(int m_) { type=m_; } //序列化操作,将xid和type序列化到OutputArchive中, public void serialize(OutputArchive a_, String tag) throws java.io.IOException { a_.startRecord(this,tag); //对于最常使用的BinaryOutputArchive,此方法空实现 a_.writeInt(xid,"xid"); a_.writeInt(type,"type"); a_.endRecord(this,tag);//对于最常使用的BinaryOutputArchive,此方法空实现 } //序列化操作,将xid和type反序列化 public void deserialize(InputArchive a_, String tag) throws java.io.IOException { a_.startRecord(tag);//对于最常使用的BinaryInputArchive,此方法空实现 xid=a_.readInt("xid"); type=a_.readInt("type"); a_.endRecord(tag);//对于最常使用的BinaryInputArchive,此方法空实现} public String toString() { try { java.io.ByteArrayOutputStream s = new java.io.ByteArrayOutputStream(); CsvOutputArchive a_ = new CsvOutputArchive(s); a_.startRecord(this,"");//对于CsvOutputArchive,startRecord方法 a_.writeInt(xid,"xid"); a_.writeInt(type,"type"); a_.endRecord(this,""); return new String(s.toByteArray(), "UTF-8"); } catch (Throwable ex) { ex.printStackTrace(); } return "ERROR"; } public void write(java.io.DataOutput out) throws java.io.IOException { BinaryOutputArchive archive = new BinaryOutputArchive(out); serialize(archive, ""); } public void readFields(java.io.DataInput in) throws java.io.IOException { BinaryInputArchive archive = new BinaryInputArchive(in); deserialize(archive, ""); } public int compareTo (Object peer_) throws ClassCastException { if (!(peer_ instanceof RequestHeader)) { throw new ClassCastException("Comparing different types of records."); } RequestHeader peer = (RequestHeader) peer_; int ret = 0; ret = (xid == peer.xid)? 0 :((xid<peer.xid)?-1:1); if (ret != 0) return ret; ret = (type == peer.type)? 0 :((type<peer.type)?-1:1); if (ret != 0) return ret; return ret; } public boolean equals(Object peer_) { if (!(peer_ instanceof RequestHeader)) { return false; } if (peer_ == this) { return true; } RequestHeader peer = (RequestHeader) peer_; boolean ret = false; ret = (xid==peer.xid); if (!ret) return ret; ret = (type==peer.type); if (!ret) return ret; return ret; } public int hashCode() { int result = 17; int ret; ret = (int)xid; result = 37*result + ret; ret = (int)type; result = 37*result + ret; return result; } public static String signature() { return "LRequestHeader(ii)"; }}
请求正文有很多,比如
以CreateRequest为例进行分析
public class CreateRequest implements Record { private String path; //创建znode节点的path private byte[] data; //创建znode节点时的节点数据 private java.util.List<org.apache.zookeeper.data.ACL> acl; //创建znode节点时的ACL private int flags;//这个参数干啥的? public CreateRequest() { } public CreateRequest( String path, byte[] data, java.util.List<org.apache.zookeeper.data.ACL> acl, int flags) { this.path=path; this.data=data; this.acl=acl; this.flags=flags; } public String getPath() { return path; } public void setPath(String m_) { path=m_; } public byte[] getData() { return data; } public void setData(byte[] m_) { data=m_; } public java.util.List<org.apache.zookeeper.data.ACL> getAcl() { return acl; } public void setAcl(java.util.List<org.apache.zookeeper.data.ACL> m_) { acl=m_; } public int getFlags() { return flags; } public void setFlags(int m_) { flags=m_; } public void serialize(OutputArchive a_, String tag) throws java.io.IOException { a_.startRecord(this,tag); a_.writeString(path,"path");//写入path a_.writeBuffer(data,"data");//写入data字节数组 { a_.startVector(acl,"acl");//写入acl,acl是List类型 if (acl!= null) { int len1 = acl.size(); for(int vidx1 = 0; vidx1<len1; vidx1++) { org.apache.zookeeper.data.ACL e1 = (org.apache.zookeeper.data.ACL) acl.get(vidx1); a_.writeRecord(e1,"e1");//ACL也是一个Record } } a_.endVector(acl,"acl"); } a_.writeInt(flags,"flags");//写入flags a_.endRecord(this,tag); } public void deserialize(InputArchive a_, String tag) throws java.io.IOException { a_.startRecord(tag); path=a_.readString("path"); data=a_.readBuffer("data"); { Index vidx1 = a_.startVector("acl"); if (vidx1!= null) { acl=new java.util.ArrayList<org.apache.zookeeper.data.ACL>(); for (; !vidx1.done(); vidx1.incr()) { org.apache.zookeeper.data.ACL e1; e1= new org.apache.zookeeper.data.ACL(); a_.readRecord(e1,"e1"); acl.add(e1); } } a_.endVector("acl"); } flags=a_.readInt("flags"); a_.endRecord(tag);} public String toString() { try { java.io.ByteArrayOutputStream s = new java.io.ByteArrayOutputStream(); CsvOutputArchive a_ = new CsvOutputArchive(s); a_.startRecord(this,""); a_.writeString(path,"path"); a_.writeBuffer(data,"data"); { a_.startVector(acl,"acl"); if (acl!= null) { int len1 = acl.size(); for(int vidx1 = 0; vidx1<len1; vidx1++) { org.apache.zookeeper.data.ACL e1 = (org.apache.zookeeper.data.ACL) acl.get(vidx1); a_.writeRecord(e1,"e1"); } } a_.endVector(acl,"acl"); } a_.writeInt(flags,"flags"); a_.endRecord(this,""); return new String(s.toByteArray(), "UTF-8"); } catch (Throwable ex) { ex.printStackTrace(); } return "ERROR"; } public void write(java.io.DataOutput out) throws java.io.IOException { BinaryOutputArchive archive = new BinaryOutputArchive(out); serialize(archive, ""); } public void readFields(java.io.DataInput in) throws java.io.IOException { BinaryInputArchive archive = new BinaryInputArchive(in); deserialize(archive, ""); } public int compareTo (Object peer_) throws ClassCastException { throw new UnsupportedOperationException("comparing CreateRequest is unimplemented"); } public boolean equals(Object peer_) { if (!(peer_ instanceof CreateRequest)) { return false; } if (peer_ == this) { return true; } CreateRequest peer = (CreateRequest) peer_; boolean ret = false; ret = path.equals(peer.path); if (!ret) return ret; ret = org.apache.jute.Utils.bufEquals(data,peer.data); if (!ret) return ret; ret = acl.equals(peer.acl); if (!ret) return ret; ret = (flags==peer.flags); if (!ret) return ret; return ret; } public int hashCode() { int result = 17; int ret; ret = path.hashCode(); result = 37*result + ret; ret = java.util.Arrays.toString(data).hashCode(); result = 37*result + ret; ret = acl.hashCode(); result = 37*result + ret; ret = (int)flags; result = 37*result + ret; return result; } public static String signature() { return "LCreateRequest(sB[LACL(iLId(ss))]i)"; }}
ConnectRequest的请求数据:
private int protocolVersion;
private long lastZxidSeen; //客户端保存的Zxid最近时间,zxid有什么用呢?
private int timeOut;//会话超时时间
private long sessionId;
private byte[] passwd;
ClientCnxn的内部类Packet类封装了请求头,响应头,请求正文和响应征正文
static class Packet { RequestHeader requestHeader;//请求头 ReplyHeader replyHeader; //响应头 Record request;//请求正文 Record response; //响应正文 ByteBuffer bb;//上面四部分序列化的字节流 /** Client's view of the path (may differ due to chroot) **/ String clientPath; /** Servers's view of the path (may differ due to chroot) **/ String serverPath; boolean finished; AsyncCallback cb;//异步请求的响应Callback Object ctx; WatchRegistration watchRegistration; public boolean readOnly; /** Convenience ctor */ Packet(RequestHeader requestHeader, ReplyHeader replyHeader, Record request, Record response, WatchRegistration watchRegistration) { this(requestHeader, replyHeader, request, response, watchRegistration, false); } Packet(RequestHeader requestHeader, ReplyHeader replyHeader, Record request, Record response, WatchRegistration watchRegistration, boolean readOnly) { this.requestHeader = requestHeader; this.replyHeader = replyHeader; this.request = request; this.response = response; this.readOnly = readOnly; this.watchRegistration = watchRegistration; } public void createBB() { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);//序列化字节流容器 boa.writeInt(-1, "len"); // We'll fill this in later if (requestHeader != null) { requestHeader.serialize(boa, "header"); } if (request instanceof ConnectRequest) { request.serialize(boa, "connect"); // append "am-I-allowed-to-be-readonly" flag boa.writeBool(readOnly, "readOnly"); } else if (request != null) { request.serialize(boa, "request"); } baos.close(); this.bb = ByteBuffer.wrap(baos.toByteArray());//将字节流容器中的字节流复制给bb this.bb.putInt(this.bb.capacity() - 4); this.bb.rewind(); } catch (IOException e) { LOG.warn("Ignoring unexpected exception", e); } } @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("clientPath:" + clientPath); sb.append(" serverPath:" + serverPath); sb.append(" finished:" + finished); sb.append(" header:: " + requestHeader); sb.append(" replyHeader:: " + replyHeader); sb.append(" request:: " + request); sb.append(" response:: " + response); // jute toString is horrible, remove unnecessary newlines return sb.toString().replaceAll("\r*\n+", " "); } }
ClientCnxn类包含两个队列(LinkedList),队列中的元素都是Packet类型,pengdingQueue表示请求已经发送,等待响应结果;outgoingQueue表示等待发送请求的请求序列
/** * These are the packets that have been sent and are waiting for a response. */ private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>(); /** * These are the packets that need to be sent. */ private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();
ClientCnxn的Socket的数据传输,将另外一篇进行单独分析
请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!
技术博客集 - 网站简介:
前后端技术:
后端基于Hyperf2.1框架开发,前端使用Bootstrap可视化布局系统生成
网站主要作用:
1.编程技术分享及讨论交流,内置聊天系统;
2.测试交流框架问题,比如:Hyperf、Laravel、TP、beego;
3.本站数据是基于大数据采集等爬虫技术为基础助力分享知识,如有侵权请发邮件到站长邮箱,站长会尽快处理;
4.站长邮箱:[email protected];
文章归档
文章标签
友情链接