1. Avro RPC简介

1.1. RPC

  • RPC逻辑上分为二层,一是传输层,负责网络通信;二是协议层,将数据按照一定协议格式打包和解包
  • 从序列化方式来看,Apache Thrift 和Google的Protocol Buffers和Avro应该是属于同一个级别的框架,都能跨语言,性能优秀,数据精简,但是Avro的动态模式(不用生成代码,而且性能很好)这个特点让人非常喜欢,比较适合RPC的数据交换。

1.2. Avro RPC的主要特点

Avro RPC 是一个支持跨语言实现的RPC服务框架。非常轻量级,实现简洁,使用方便,同时支持使用者进行二次开发,逻辑上该框架分为两层:

  • 网络传输层使用Netty的Nio实现。
  • 协议层可扩展,目前支持的数据序列化方式有Avro,Protocol Buffers ,Json, Hessian,Java序列化。 使用者可以注册自己的协议格式及序列化方式。

Avro RPC主要特点:

  • 客户端传输层与应用层逻辑分离,传输层主要职责包括创建连接,连接查找与复用,传输数据,接收服务端回复后回调应用层;
  • 客户端支持同步调用和异步调用。服务异步化能很好的提高系统吞吐量,建议使用异步调用。为防止异步发送请求过快,客户端增加了“请求流量限制”功能;
  • 服务端有一个协议注册工厂和序列化注册工厂。这样方便针对不同的应用场景来定制服务方式。RPC应该只是服务方式的一种。在分布式的系统架构中,分布式节点之间的通信会存在多种方式,比如MQ的TOP消息,一个消息可以有多个订阅者。因此avro-rpc不仅仅是一个RPC服务框架,还是一个分布式通信的一个基础骨架,提供了很好的扩展性;
  • Avro序列化框架是Hadoop下的一个子项目,其特点是数据序列化不带标签,因此序列化后的数据非常小。支持动态解析, 不像Thrift 与 Protocol Buffers必须根据IDL来生成代码,这样侵入性有点强。性能很好,基本上和 Protocol Buffers差不多;

2. Avro RPC开发

2.1 Maven依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>learn</groupId><artifactId>learn.avro</artifactId><version>1.0-SNAPSHOT</version><dependencies><!--avro core--><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId><version>1.7.7</version></dependency><!--avro rpc support--><dependency><groupId>org.apache.avro</groupId><artifactId>avro-ipc</artifactId><version>1.7.7</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>1.7.7</version><executions><execution><phase>generate-sources</phase><goals><!--Maven goal that helps for code generation--><goal>schema</goal><!--For RPC used--><goal>protocol</goal><goal>idl-protocol</goal></goals><configuration><sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory><outputDirectory>${project.basedir}/src/main/java/</outputDirectory></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.7</source><target>1.7</target></configuration></plugin></plugins></build>
</project>

2.2 定义协议schema文件(在src/main/avro/mail.avpr)

{"namespace": "examples.avro.rpc","protocol": "Mail","types": [{"name": "Message", "type": "record","fields": [{"name": "to",   "type": "string"},{"name": "from", "type": "string"},{"name": "body", "type": "string"}]}],"messages": {"send": {"request": [{"name": "message", "type": "Message"}],"response": "string"}}
}

2.3 生成代码:

在Intellij Idea的Maven视图中,learn avro->Plugins->avro->avro:protocol,右击avro:protocol,执行Run Maven Build,生成protocol schema对应的Java实体类

2.3.1 Mail接口

/*** Autogenerated by Avro** DO NOT EDIT DIRECTLY*/
package examples.avro.rpc;@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public interface Mail {public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"Mail\",\"namespace\":\"example.proto\",\"types\":[{\"type\":\"record\",\"name\":\"Message\",\"fields\":[{\"name\":\"to\",\"type\":\"string\"},{\"name\":\"from\",\"type\":\"string\"},{\"name\":\"body\",\"type\":\"string\"}]}],\"messages\":{\"send\":{\"request\":[{\"name\":\"message\",\"type\":\"Message\"}],\"response\":\"string\"}}}");///Mail接口有1个方法send,参数是Message,Message是一个Avro类,可以序列化和反序列化java.lang.CharSequence send(Message message) throws org.apache.avro.AvroRemoteException;@SuppressWarnings("all")public interface Callback extends Mail {public static final org.apache.avro.Protocol PROTOCOL = Mail.PROTOCOL;void send(Message message, org.apache.avro.ipc.Callback<CharSequence> callback) throws java.io.IOException;}
}

2.3.2 Message类(根据schema文件生成)

/*** Autogenerated by Avro* * DO NOT EDIT DIRECTLY*/
package examples.avro.rpc;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Message extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Message\",\"namespace\":\"example.proto\",\"fields\":[{\"name\":\"to\",\"type\":\"string\"},{\"name\":\"from\",\"type\":\"string\"},{\"name\":\"body\",\"type\":\"string\"}]}");public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }@Deprecated public java.lang.CharSequence to;@Deprecated public java.lang.CharSequence from;@Deprecated public java.lang.CharSequence body;/*** Default constructor.  Note that this does not initialize fields* to their default values from the schema.  If that is desired then* one should use <code>newBuilder()</code>. */public Message() {}/*** All-args constructor.*/public Message(java.lang.CharSequence to, java.lang.CharSequence from, java.lang.CharSequence body) {this.to = to;this.from = from;this.body = body;}public org.apache.avro.Schema getSchema() { return SCHEMA$; }// Used by DatumWriter.  Applications should not call. public java.lang.Object get(int field$) {switch (field$) {case 0: return to;case 1: return from;case 2: return body;default: throw new org.apache.avro.AvroRuntimeException("Bad index");}}// Used by DatumReader.  Applications should not call. @SuppressWarnings(value="unchecked")public void put(int field$, java.lang.Object value$) {switch (field$) {case 0: to = (java.lang.CharSequence)value$; break;case 1: from = (java.lang.CharSequence)value$; break;case 2: body = (java.lang.CharSequence)value$; break;default: throw new org.apache.avro.AvroRuntimeException("Bad index");}}/*** Gets the value of the 'to' field.*/public java.lang.CharSequence getTo() {return to;}/*** Sets the value of the 'to' field.* @param value the value to set.*/public void setTo(java.lang.CharSequence value) {this.to = value;}/*** Gets the value of the 'from' field.*/public java.lang.CharSequence getFrom() {return from;}/*** Sets the value of the 'from' field.* @param value the value to set.*/public void setFrom(java.lang.CharSequence value) {this.from = value;}/*** Gets the value of the 'body' field.*/public java.lang.CharSequence getBody() {return body;}/*** Sets the value of the 'body' field.* @param value the value to set.*/public void setBody(java.lang.CharSequence value) {this.body = value;}/** Creates a new Message RecordBuilder */public static Message.Builder newBuilder() {return new Message.Builder();}/** Creates a new Message RecordBuilder by copying an existing Builder */public static Message.Builder newBuilder(Message.Builder other) {return new Message.Builder(other);}/** Creates a new Message RecordBuilder by copying an existing Message instance */public static Message.Builder newBuilder(Message other) {return new Message.Builder(other);}/*** RecordBuilder for Message instances.*/public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Message>implements org.apache.avro.data.RecordBuilder<Message> {private java.lang.CharSequence to;private java.lang.CharSequence from;private java.lang.CharSequence body;/** Creates a new Builder */private Builder() {super(Message.SCHEMA$);}/** Creates a Builder by copying an existing Builder */private Builder(Message.Builder other) {super(other);if (isValidValue(fields()[0], other.to)) {this.to = data().deepCopy(fields()[0].schema(), other.to);fieldSetFlags()[0] = true;}if (isValidValue(fields()[1], other.from)) {this.from = data().deepCopy(fields()[1].schema(), other.from);fieldSetFlags()[1] = true;}if (isValidValue(fields()[2], other.body)) {this.body = data().deepCopy(fields()[2].schema(), other.body);fieldSetFlags()[2] = true;}}/** Creates a Builder by copying an existing Message instance */private Builder(Message other) {super(Message.SCHEMA$);if (isValidValue(fields()[0], other.to)) {this.to = data().deepCopy(fields()[0].schema(), other.to);fieldSetFlags()[0] = true;}if (isValidValue(fields()[1], other.from)) {this.from = data().deepCopy(fields()[1].schema(), other.from);fieldSetFlags()[1] = true;}if (isValidValue(fields()[2], other.body)) {this.body = data().deepCopy(fields()[2].schema(), other.body);fieldSetFlags()[2] = true;}}/** Gets the value of the 'to' field */public java.lang.CharSequence getTo() {return to;}/** Sets the value of the 'to' field */public Message.Builder setTo(java.lang.CharSequence value) {validate(fields()[0], value);this.to = value;fieldSetFlags()[0] = true;return this; }/** Checks whether the 'to' field has been set */public boolean hasTo() {return fieldSetFlags()[0];}/** Clears the value of the 'to' field */public Message.Builder clearTo() {to = null;fieldSetFlags()[0] = false;return this;}/** Gets the value of the 'from' field */public java.lang.CharSequence getFrom() {return from;}/** Sets the value of the 'from' field */public Message.Builder setFrom(java.lang.CharSequence value) {validate(fields()[1], value);this.from = value;fieldSetFlags()[1] = true;return this; }/** Checks whether the 'from' field has been set */public boolean hasFrom() {return fieldSetFlags()[1];}/** Clears the value of the 'from' field */public Message.Builder clearFrom() {from = null;fieldSetFlags()[1] = false;return this;}/** Gets the value of the 'body' field */public java.lang.CharSequence getBody() {return body;}/** Sets the value of the 'body' field */public Message.Builder setBody(java.lang.CharSequence value) {validate(fields()[2], value);this.body = value;fieldSetFlags()[2] = true;return this; }/** Checks whether the 'body' field has been set */public boolean hasBody() {return fieldSetFlags()[2];}/** Clears the value of the 'body' field */public Message.Builder clearBody() {body = null;fieldSetFlags()[2] = false;return this;}@Overridepublic Message build() {try {Message record = new Message();record.to = fieldSetFlags()[0] ? this.to : (java.lang.CharSequence) defaultValue(fields()[0]);record.from = fieldSetFlags()[1] ? this.from : (java.lang.CharSequence) defaultValue(fields()[1]);record.body = fieldSetFlags()[2] ? this.body : (java.lang.CharSequence) defaultValue(fields()[2]);return record;} catch (Exception e) {throw new org.apache.avro.AvroRuntimeException(e);}}}
}

2.3.3 AvroServer类

package examples.avro.rpc;import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.avro.util.Utf8;import java.io.IOException;
import java.net.InetSocketAddress;//Server端的实现Mai服务
class MailImpl implements Mail {public Utf8 send(Message message) {System.out.println("Message Received:" + message);return new Utf8("Received your message: " + message.getFrom().toString()+ " with body " + message.getBody().toString());}
}public class AvroServer {private static Server server;public static void main(String[] args) throws Exception {System.out.println("Starting server");startServer();Thread.sleep(1000);System.out.println("Server started");Thread.sleep(60 * 1000);server.close();}private static void startServer() throws IOException {server = new NettyServer(new SpecificResponder(Mail.class, new MailImpl()), new InetSocketAddress(65111));}
}

2.3.3 AvroClient类

package examples.avro.rpc;import org.apache.avro.ipc.NettyTransceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.avro.util.Utf8;import java.net.InetSocketAddress;public class AvroClient {public static void main(String[] args) throws Exception {NettyTransceiver client = new NettyTransceiver(new InetSocketAddress(65111));///获取Mail接口的proxy实现Mail proxy = SpecificRequestor.getClient(Mail.class, client);System.out.println("Client of Mail Proxy is built");// fill in the Message record and send itargs = new String[]{"to:Tom", "from:Jack", "body:How are you"};Message message = new Message();message.setTo(new Utf8(args[0]));message.setFrom(new Utf8(args[1]));message.setBody(new Utf8(args[2]));System.out.println("RPC call with message:  " + message.toString());///底层给服务器发送send方法调用System.out.println("Result: " + proxy.send(message));// cleanupclient.close();}
}

本文支持对Avro RPC的粗浅尝试,Avro Client端用的同步通信方式

【Avro二】Avro RPC框架相关推荐

  1. Java实现简单的RPC框架

    一.RPC简介 RPC,全称为Remote Procedure Call,即远程过程调用,它是一个计算机通信协议.它允许像调用本地服务一样调用远程服务.它可以有不同的实现方式.如RMI(远程方法调用) ...

  2. RPC框架面试总结-RPC原理及实现

    一.什么是RPC RPC是远程调用过程的简写,是一个协议,处于网络通信协议的第五层:会话层,其下就是TCP/IP协议,在建立在其基础上的通信会话协议.RPC定义了交互的模式,而应用程序使用这些模式,来 ...

  3. 什么是RPC?RPC框架dubbo的核心流程

    一.REST 与 RPC: 1.什么是 REST 和 RPC 协议: 在单体应用中,各模块间的调用是通过编程语言级别的方法函数来实现,但分布式系统运行在多台机器上,一般来说,每个服务实例都是一个进程, ...

  4. 自己动手从0开始实现一个分布式RPC框架

    简介: 如果一个程序员能清楚的了解RPC框架所具备的要素,掌握RPC框架中涉及的服务注册发现.负载均衡.序列化协议.RPC通信协议.Socket通信.异步调用.熔断降级等技术,可以全方位的提升基本素质 ...

  5. Spark RPC框架源码分析(二)RPC运行时序

    前情提要: Spark RPC框架源码分析(一)简述 一. Spark RPC概述 上一篇我们已经说明了Spark RPC框架的一个简单例子,Spark RPC相关的两个编程模型,Actor模型和Re ...

  6. 带你手写基于 Spring 的可插拔式 RPC 框架(二)整体结构

    前言 上一篇文章中我们已经知道了什么是 RPC 框架和为什么要做一个 RPC 框架了,这一章我们来从宏观上分析,怎么来实现一个 RPC 框架,这个框架都有那些模块以及这些模块的作用. 总体设计 在我们 ...

  7. RPC框架原理及从零实现系列博客(二):11个类实现简单RPC框架

    项目1.0版本源码 https://github.com/wephone/Me... 在上一博文中 跟大家讲了RPC的实现思路 思路毕竟只是思路 那么这篇就带着源码给大家讲解下实现过程中的各个具体问题 ...

  8. Rpc框架dubbo-client(v2.6.3) 源码阅读(二)

    接上一篇 dubbo-server 之后,再来看一下 dubbo-client 是如何工作的. dubbo提供者服务示例, 其结构是这样的! dubbo://192.168.11.6:20880/co ...

  9. RPC框架(一)RPC简介

    一.概述 二.RPC 2.1.RPC定义 2.2.RPC主要组成部分 三.影响RPC框架性能的因素 四.工业界的 RPC 框架一览 4.1.国内 4.2.国外 五.如何选择RPC框架 一.概述 随着公 ...

  10. 支持多序列化的RPC框架avro-rpc

    为什么80%的码农都做不了架构师?>>>    avro-rpc http://code.google.com/p/avro-rpc/ 1.开发背景 公司的运营管理平台建立在J2EE ...

最新文章

  1. Nginx入门笔记之————配置文件结构
  2. LibreOffice 3.6.6 修复了 50 个 Bug
  3. java 输出 三角形_Java实现输出三角形
  4. 软件需求说明书文档格式
  5. IP通信基础 4月15日
  6. yumdownloader和 repotrack下载rpm包
  7. Ubuntu 16.04下使用gcc输出汇编的.0文件为可执行文件时出现:`_start'被多次定义
  8. 肉体之爱的解释圣经_可以解释的AI简介,以及我们为什么需要它
  9. shell脚本中if流程控制语句的应用
  10. sublime text 3中安装ctags支持函数跳转,安装convertToUtf8支持中文步骤[工具篇]
  11. LED恒流驱动IC汇总
  12. 旋转向量解法(罗德里格公式推导及理解)
  13. 武汉大学服务器项目投标函,武汉大学
  14. 港科报道 | 香港科技大学(广州)专题新闻发布会成功举办
  15. logo是啥_Logo什么意思,如何设计logo
  16. matlab mex命令,matlab 调用mex
  17. requests实际使用例子
  18. 浙江省计算机二级题库excel,浙江省计算机二级excel题目
  19. CAD—dxf dwg格式解析库
  20. TCP协议的长连接和短连接详解

热门文章

  1. 阿里天池大赛脱敏多标签文本分类初赛20名方案分享
  2. linux的虚拟内存是4G,而每个进程都有自己独立的4G内存空间,怎么理解?进程虚拟地址4G指拥有4G的寻址能力,需要页表转换为实际物理地址,每个进程用到的内核是直接映射,地址的进程地址-3G的关系
  3. 通知与服务——消息通知——通知推送Notification
  4. 计算机积分符号,积分符号积分区间怎么打出来
  5. Win11WSA无法启动的解决办法
  6. day fit into much one too_PGone Talking too much歌词
  7. Shopee本土店如何做,各国家站点市场分析及热销产品
  8. linux getfattr中文乱码,Linux下快速解析nf_conntrack
  9. GC日志的查看(日志意思)
  10. 数据提取-数据提取软件