[Avro, Part 2] The Avro RPC Framework

Programming  /  Posted by houtizong 3 years ago

1. Introduction to Avro RPC

1.1. RPC

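  • Logically, RPC consists of two layers: a transport layer, responsible for network communication, and a protocol layer, which packs and unpacks data according to a given protocol format.
  • In terms of serialization, Apache Thrift, Google's Protocol Buffers, and Avro are frameworks of the same class: all are cross-language, perform well, and produce compact data. Avro's dynamic schema support (no code generation required, with good performance) is particularly appealing and makes it a good fit for RPC data exchange.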
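The two-layer split described above can be illustrated with a small JDK-only sketch. Everything in it is hypothetical and invented for illustration (the `Codec` interface, the line-based format, the in-memory `LoopbackTransport`); it is not how Avro itself frames its messages. It only shows the protocol layer packing and unpacking a call while the transport layer moves opaque bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/** Protocol layer (hypothetical): packs a call into bytes and unpacks it again. */
interface Codec {
    byte[] pack(String method, String argument);
    String[] unpack(byte[] frame); // returns {method, argument}
}

/** A toy line-based format, chosen only for illustration. */
class LineCodec implements Codec {
    public byte[] pack(String method, String argument) {
        return (method + "\n" + argument).getBytes(StandardCharsets.UTF_8);
    }

    public String[] unpack(byte[] frame) {
        return new String(frame, StandardCharsets.UTF_8).split("\n", 2);
    }
}

/** Transport layer (hypothetical): moves opaque bytes; an in-memory stand-in for a socket. */
class LoopbackTransport {
    private final Function<byte[], byte[]> server;

    LoopbackTransport(Function<byte[], byte[]> server) {
        this.server = server;
    }

    byte[] roundTrip(byte[] request) {
        return server.apply(request);
    }
}

class TwoLayerRpcDemo {
    static String call(String method, String argument) {
        Codec codec = new LineCodec();
        // Server side: unpack the request, run the handler, pack the reply.
        LoopbackTransport transport = new LoopbackTransport(frame -> {
            String[] request = codec.unpack(frame);
            String result = "echo".equals(request[0]) ? request[1] : "unknown method";
            return codec.pack("reply", result);
        });
        // Client side: pack the call, send it, unpack the reply.
        byte[] reply = transport.roundTrip(codec.pack(method, argument));
        return codec.unpack(reply)[1];
    }

    public static void main(String[] args) {
        System.out.println(call("echo", "hello"));
    }
}
```

Because the transport only ever sees `byte[]`, either layer could be swapped out without touching the other, which is the point of the separation.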

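1.2. Main features of Avro RPC

Avro RPC is an RPC service framework with cross-language support. It is very lightweight, simple in implementation, and easy to use, and it allows users to build on top of it. Logically, the framework has two layers: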

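  • The network transport layer is implemented with Netty's NIO.
  • The protocol layer is extensible. The serialization formats currently supported are Avro, Protocol Buffers, JSON, Hessian, and Java serialization. Users can register their own protocol formats and serialization methods.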
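The idea of registering serialization methods by name can be sketched as follows. This is a minimal illustration using only the JDK; the `Serializer` interface, `SerializerRegistry` class, and the two sample serializers are assumptions made up for this example, not the framework's actual API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical serialization contract; a real framework would handle arbitrary objects. */
interface Serializer {
    byte[] serialize(String value);
    String deserialize(byte[] bytes);
}

class Utf8Serializer implements Serializer {
    public byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    public String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

class Utf16Serializer implements Serializer {
    public byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_16BE);
    }

    public String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_16BE);
    }
}

/** Hypothetical registry: maps a format name to the serializer that handles it. */
class SerializerRegistry {
    private final Map<String, Serializer> byName = new ConcurrentHashMap<>();

    void register(String name, Serializer serializer) {
        byName.put(name, serializer);
    }

    Serializer lookup(String name) {
        Serializer serializer = byName.get(name);
        if (serializer == null) {
            throw new IllegalArgumentException("No serializer registered for: " + name);
        }
        return serializer;
    }
}

class SerializerRegistryDemo {
    /** Builds a registry with one "built-in" format and one user-registered format. */
    static SerializerRegistry defaults() {
        SerializerRegistry registry = new SerializerRegistry();
        registry.register("utf8", new Utf8Serializer());
        registry.register("utf16", new Utf16Serializer()); // user-supplied format
        return registry;
    }

    public static void main(String[] args) {
        Serializer s = defaults().lookup("utf16");
        System.out.println(s.deserialize(s.serialize("How are you")));
    }
}
```

With a registry like this, the format name could travel in the message header so that the receiver picks the matching serializer.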

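Main features of Avro RPC:

  • On the client, the transport layer is separated from the application-layer logic. The transport layer is mainly responsible for creating connections, looking up and reusing connections, transmitting data, and calling back into the application layer once the server's reply arrives;
  • The client supports both synchronous and asynchronous calls. Making services asynchronous can noticeably improve system throughput, so asynchronous calls are recommended. To keep asynchronous requests from being sent too quickly, the client adds a "request flow limit" feature;
  • The server has a protocol registration factory and a serialization registration factory, which makes it easy to tailor the service style to different scenarios. RPC should be only one such style. In a distributed architecture, nodes communicate in several ways, for example topic messages in an MQ, where one message can have multiple subscribers. avro-rpc is therefore not just an RPC service framework but also a basic skeleton for distributed communication, with good extensibility;
  • The Avro serialization framework began as a subproject of Hadoop. Its serialized data carries no field tags, so the output is very small. It supports dynamic parsing, unlike Thrift and Protocol Buffers, which require code generated from an IDL and are therefore somewhat more intrusive. Performance is good, roughly on par with Protocol Buffers;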
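The synchronous and asynchronous call styles and the "request flow limit" from the list above can be sketched with the JDK alone. This is an illustration under stated assumptions, not the framework's implementation: `ThrottledAsyncClient` is a hypothetical class, the `remote` function stands in for a real network round trip, and the limit is modeled as a semaphore that caps the number of in-flight requests.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

/** Hypothetical client that caps the number of outstanding asynchronous requests. */
class ThrottledAsyncClient implements AutoCloseable {
    private final Semaphore permits;
    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    private final Function<String, String> remote; // stand-in for the network round trip

    ThrottledAsyncClient(int maxInFlight, Function<String, String> remote) {
        this.permits = new Semaphore(maxInFlight);
        this.remote = remote;
    }

    /** Asynchronous call: returns at once unless maxInFlight requests are already pending. */
    CompletableFuture<String> callAsync(String request) {
        permits.acquireUninterruptibly(); // the flow limit: blocks a sender that is too fast
        try {
            return CompletableFuture
                    .supplyAsync(() -> remote.apply(request), pool)
                    .whenComplete((reply, error) -> permits.release());
        } catch (RuntimeException e) {
            permits.release(); // the task was never scheduled, so give the permit back
            throw e;
        }
    }

    /** Synchronous call: the asynchronous call plus a blocking wait for the reply. */
    String callSync(String request) {
        return callAsync(request).join();
    }

    int availablePermits() {
        return permits.availablePermits();
    }

    @Override
    public void close() {
        pool.shutdown();
    }
}

class ThrottledAsyncClientDemo {
    public static void main(String[] args) {
        try (ThrottledAsyncClient client = new ThrottledAsyncClient(2, s -> "ack:" + s)) {
            client.callAsync("first").thenAccept(System.out::println).join();
            System.out.println(client.callSync("second"));
        }
    }
}
```

The permit is released in `whenComplete`, so a request counts against the limit until its reply (or failure) has arrived, not merely until it has been sent.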

2. Developing with Avro RPC

2.1 Maven dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>learn</groupId>
    <artifactId>learn.avro</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!--avro core-->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.7.7</version>
        </dependency>
        <!--avro rpc support-->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-ipc</artifactId>
            <version>1.7.7</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.7.7</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <!--Maven goal that helps for code generation-->
                            <goal>schema</goal>
                            <!--For RPC used-->
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

 

2.2 Define the protocol schema file (src/main/avro/mail.avpr)

{
  "namespace": "examples.avro.rpc",
  "protocol": "Mail",
  "types": [
    {
      "name": "Message",
      "type": "record",
      "fields": [
        {"name": "to",   "type": "string"},
        {"name": "from", "type": "string"},
        {"name": "body", "type": "string"}
      ]
    }
  ],
  "messages": {
    "send": {
      "request": [{"name": "message", "type": "Message"}],
      "response": "string"
    }
  }
}
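To make the "no field tags" property concrete, here is a hand-written sketch of how a Message record with these three string fields would be laid out under Avro's binary encoding as I understand the specification: the fields are written back to back in schema order, and each string is a zig-zag varint length followed by its UTF-8 bytes. The class `TaglessRecordSketch` and its methods are hypothetical and exist only for illustration; real code should use Avro's own encoders. The sketch also covers only the record payload, not the handshake and call metadata that an RPC exchange adds.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Illustrative re-implementation of the record payload layout; not Avro's own code. */
class TaglessRecordSketch {

    /** Writes a long as a zig-zag encoded, variable-length integer. */
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);       // zig-zag: small magnitudes become small values
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // low 7 bits, continuation bit set
            z >>>= 7;
        }
        out.write((int) z);                  // final byte, continuation bit clear
    }

    /** Writes a string as its UTF-8 byte length followed by the bytes themselves. */
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    /** Encodes the fields in schema order: to, from, body. No names or tags are written. */
    static byte[] encodeMessage(String to, String from, String body) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, to);
        writeString(out, from);
        writeString(out, body);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] bytes = encodeMessage("Tom", "Jack", "How are you");
        System.out.println("payload bytes: " + bytes.length);
    }
}
```

Since nothing in the payload identifies a field, the reader needs the writer's schema to decode it, which is why an Avro RPC client and server exchange protocol information before calls are made.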

 

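2.3 Generating the code:

In the Maven view of IntelliJ IDEA, go to learn avro -> Plugins -> avro -> avro:protocol, right-click avro:protocol, and choose Run Maven Build to generate the Java classes that correspond to the protocol schema.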

 

2.3.1 The Mail interface

/**
 * Autogenerated by Avro
 *
 * DO NOT EDIT DIRECTLY
 */
package examples.avro.rpc;

@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public interface Mail {
    public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"Mail\",\"namespace\":\"examples.avro.rpc\",\"types\":[{\"type\":\"record\",\"name\":\"Message\",\"fields\":[{\"name\":\"to\",\"type\":\"string\"},{\"name\":\"from\",\"type\":\"string\"},{\"name\":\"body\",\"type\":\"string\"}]}],\"messages\":{\"send\":{\"request\":[{\"name\":\"message\",\"type\":\"Message\"}],\"response\":\"string\"}}}");

    // The Mail interface has a single method, send, which takes a Message.
    // Message is an Avro class that can be serialized and deserialized.
    java.lang.CharSequence send(Message message) throws org.apache.avro.AvroRemoteException;

    @SuppressWarnings("all")
    public interface Callback extends Mail {
        public static final org.apache.avro.Protocol PROTOCOL = Mail.PROTOCOL;

        void send(Message message, org.apache.avro.ipc.Callback<CharSequence> callback) throws java.io.IOException;
    }
}

 

2.3.2 The Message class (generated from the schema file)

/**
 * Autogenerated by Avro
 *
 * DO NOT EDIT DIRECTLY
 */
package examples.avro.rpc;

@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Message extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Message\",\"namespace\":\"examples.avro.rpc\",\"fields\":[{\"name\":\"to\",\"type\":\"string\"},{\"name\":\"from\",\"type\":\"string\"},{\"name\":\"body\",\"type\":\"string\"}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
  @Deprecated public java.lang.CharSequence to;
  @Deprecated public java.lang.CharSequence from;
  @Deprecated public java.lang.CharSequence body;

  /**
   * Default constructor.  Note that this does not initialize fields
   * to their default values from the schema.  If that is desired then
   * one should use <code>newBuilder()</code>.
   */
  public Message() {}

  /**
   * All-args constructor.
   */
  public Message(java.lang.CharSequence to, java.lang.CharSequence from, java.lang.CharSequence body) {
    this.to = to;
    this.from = from;
    this.body = body;
  }

  public org.apache.avro.Schema getSchema() { return SCHEMA$; }

  // Used by DatumWriter.  Applications should not call.
  public java.lang.Object get(int field$) {
    switch (field$) {
    case 0: return to;
    case 1: return from;
    case 2: return body;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }

  // Used by DatumReader.  Applications should not call.
  @SuppressWarnings(value="unchecked")
  public void put(int field$, java.lang.Object value$) {
    switch (field$) {
    case 0: to = (java.lang.CharSequence)value$; break;
    case 1: from = (java.lang.CharSequence)value$; break;
    case 2: body = (java.lang.CharSequence)value$; break;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }

  /**
   * Gets the value of the 'to' field.
   */
  public java.lang.CharSequence getTo() {
    return to;
  }

  /**
   * Sets the value of the 'to' field.
   * @param value the value to set.
   */
  public void setTo(java.lang.CharSequence value) {
    this.to = value;
  }

  /**
   * Gets the value of the 'from' field.
   */
  public java.lang.CharSequence getFrom() {
    return from;
  }

  /**
   * Sets the value of the 'from' field.
   * @param value the value to set.
   */
  public void setFrom(java.lang.CharSequence value) {
    this.from = value;
  }

  /**
   * Gets the value of the 'body' field.
   */
  public java.lang.CharSequence getBody() {
    return body;
  }

  /**
   * Sets the value of the 'body' field.
   * @param value the value to set.
   */
  public void setBody(java.lang.CharSequence value) {
    this.body = value;
  }

  /** Creates a new Message RecordBuilder */
  public static Message.Builder newBuilder() {
    return new Message.Builder();
  }

  /** Creates a new Message RecordBuilder by copying an existing Builder */
  public static Message.Builder newBuilder(Message.Builder other) {
    return new Message.Builder(other);
  }

  /** Creates a new Message RecordBuilder by copying an existing Message instance */
  public static Message.Builder newBuilder(Message other) {
    return new Message.Builder(other);
  }

  /**
   * RecordBuilder for Message instances.
   */
  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Message>
    implements org.apache.avro.data.RecordBuilder<Message> {

    private java.lang.CharSequence to;
    private java.lang.CharSequence from;
    private java.lang.CharSequence body;

    /** Creates a new Builder */
    private Builder() {
      super(Message.SCHEMA$);
    }

    /** Creates a Builder by copying an existing Builder */
    private Builder(Message.Builder other) {
      super(other);
      if (isValidValue(fields()[0], other.to)) {
        this.to = data().deepCopy(fields()[0].schema(), other.to);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.from)) {
        this.from = data().deepCopy(fields()[1].schema(), other.from);
        fieldSetFlags()[1] = true;
      }
      if (isValidValue(fields()[2], other.body)) {
        this.body = data().deepCopy(fields()[2].schema(), other.body);
        fieldSetFlags()[2] = true;
      }
    }

    /** Creates a Builder by copying an existing Message instance */
    private Builder(Message other) {
      super(Message.SCHEMA$);
      if (isValidValue(fields()[0], other.to)) {
        this.to = data().deepCopy(fields()[0].schema(), other.to);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.from)) {
        this.from = data().deepCopy(fields()[1].schema(), other.from);
        fieldSetFlags()[1] = true;
      }
      if (isValidValue(fields()[2], other.body)) {
        this.body = data().deepCopy(fields()[2].schema(), other.body);
        fieldSetFlags()[2] = true;
      }
    }

    /** Gets the value of the 'to' field */
    public java.lang.CharSequence getTo() {
      return to;
    }

    /** Sets the value of the 'to' field */
    public Message.Builder setTo(java.lang.CharSequence value) {
      validate(fields()[0], value);
      this.to = value;
      fieldSetFlags()[0] = true;
      return this;
    }

    /** Checks whether the 'to' field has been set */
    public boolean hasTo() {
      return fieldSetFlags()[0];
    }

    /** Clears the value of the 'to' field */
    public Message.Builder clearTo() {
      to = null;
      fieldSetFlags()[0] = false;
      return this;
    }

    /** Gets the value of the 'from' field */
    public java.lang.CharSequence getFrom() {
      return from;
    }

    /** Sets the value of the 'from' field */
    public Message.Builder setFrom(java.lang.CharSequence value) {
      validate(fields()[1], value);
      this.from = value;
      fieldSetFlags()[1] = true;
      return this;
    }

    /** Checks whether the 'from' field has been set */
    public boolean hasFrom() {
      return fieldSetFlags()[1];
    }

    /** Clears the value of the 'from' field */
    public Message.Builder clearFrom() {
      from = null;
      fieldSetFlags()[1] = false;
      return this;
    }

    /** Gets the value of the 'body' field */
    public java.lang.CharSequence getBody() {
      return body;
    }

    /** Sets the value of the 'body' field */
    public Message.Builder setBody(java.lang.CharSequence value) {
      validate(fields()[2], value);
      this.body = value;
      fieldSetFlags()[2] = true;
      return this;
    }

    /** Checks whether the 'body' field has been set */
    public boolean hasBody() {
      return fieldSetFlags()[2];
    }

    /** Clears the value of the 'body' field */
    public Message.Builder clearBody() {
      body = null;
      fieldSetFlags()[2] = false;
      return this;
    }

    @Override
    public Message build() {
      try {
        Message record = new Message();
        record.to = fieldSetFlags()[0] ? this.to : (java.lang.CharSequence) defaultValue(fields()[0]);
        record.from = fieldSetFlags()[1] ? this.from : (java.lang.CharSequence) defaultValue(fields()[1]);
        record.body = fieldSetFlags()[2] ? this.body : (java.lang.CharSequence) defaultValue(fields()[2]);
        return record;
      } catch (Exception e) {
        throw new org.apache.avro.AvroRuntimeException(e);
      }
    }
  }
}

 

2.3.3 The AvroServer class

package examples.avro.rpc;

import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.avro.util.Utf8;

import java.io.IOException;
import java.net.InetSocketAddress;

// Server-side implementation of the Mail service
class MailImpl implements Mail {
    public Utf8 send(Message message) {
        System.out.println("Message Received:" + message);
        return new Utf8("Received your message: " + message.getFrom().toString()
                + " with body " + message.getBody().toString());
    }
}

public class AvroServer {
    private static Server server;

    public static void main(String[] args) throws Exception {
        System.out.println("Starting server");
        startServer();
        Thread.sleep(1000);
        System.out.println("Server started");
        // Keep the server up for one minute, then shut it down
        Thread.sleep(60 * 1000);
        server.close();
    }

    private static void startServer() throws IOException {
        // Serve the Mail protocol over Netty on port 65111
        server = new NettyServer(new SpecificResponder(Mail.class, new MailImpl()), new InetSocketAddress(65111));
    }
}

 

 

2.3.4 The AvroClient class

package examples.avro.rpc;

import org.apache.avro.ipc.NettyTransceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.avro.util.Utf8;

import java.net.InetSocketAddress;

public class AvroClient {
    public static void main(String[] args) throws Exception {
        NettyTransceiver client = new NettyTransceiver(new InetSocketAddress(65111));

        // Obtain a proxy implementation of the Mail interface
        Mail proxy = SpecificRequestor.getClient(Mail.class, client);
        System.out.println("Client of Mail Proxy is built");

        // fill in the Message record and send it
        args = new String[]{"to:Tom", "from:Jack", "body:How are you"};
        Message message = new Message();
        message.setTo(new Utf8(args[0]));
        message.setFrom(new Utf8(args[1]));
        message.setBody(new Utf8(args[2]));
        System.out.println("RPC call with message:  " + message.toString());

        // Under the hood, this sends the send method call to the server
        System.out.println("Result: " + proxy.send(message));

        // cleanup
        client.close();
    }
}

 

This article is only a brief first look at Avro RPC; the Avro client shown here uses synchronous communication.

 

 

 

 
