您的位置:首页 > 其它

RPC框架系列——Avro

2013-08-07 13:48 302 查看

RPC框架系列——Avro

Posted on 2011/09/09
by
Jeoygin
in
网络,
计算机技术
浏览次数:5,067
1.下载与安装

  官方网站:http://avro.apache.org/

  下载地址:http://labs.renren.com/apache-mirror//avro/avro-1.5.1/avro-src-1.5.1.tar.gz

  安装之前确保已经装了maven

cd /usr/local/src
wget http://labs.renren.com/apache-mirror//avro/avro-1.5.1/avro-src-1.5.1.tar.gz tar zxvf avro-src-1.5.1.tar.gz
cd avro-src-1.5.1/lang/java
mvn clean install -DskipTests

  安装后,avro-1.5.1.jar位于avro-src-1.5.1/lang/java/avro/target

2.消息结构与服务接口

  Avro的模式主要由JSON对象来表示,Avro支持8种基本类型(Primitive Type)和6种复杂类型(Complex Type:records、enums、arrays、maps、unions 和fixed),基本类型可以由JSON字符串来表示。

  Avro支持两种序列化编码方式:二进制编码和JSON编码,使用二进制编码会高效序列化,并且序列化后得到的结果会比较小。

  基本类型:

null: no value
boolean: a binary value
int: 32-bit signed integer
long: 64-bit signed integer
float: single precision (32-bit) IEEE 754 floating-point number
double: double precision (64-bit) IEEE 754 floating-point number
bytes: sequence of 8-bit unsigned bytes
string: unicode character sequence

  首先编写一个message.avpr文件,定义一个消息结构。

{
"namespace": "avro",
"protocol": "messageProtocol",
"doc": "This is a message.",
"name": "Message",

"types": [
{"name":"message", "type":"record",
"fields":[
{"name":"name", "type":"string"},
{"name":"type", "type":"int"},
{"name":"price", "type":"double"},
{"name":"valid", "type":"boolean"},
{"name":"content", "type":"bytes"}
]}
],

"messages": {
"sendMessage":{
"doc" : "test",
"request" :[{"name":"message","type":"message" }],
"response" :"message"
}
}
}

  其中定义了1种类型叫做message,有5个成员name、type、price、valid、content。还定义了1个消息服务叫做sendMessage,输入有一个参数,类型是message,返回message。

3.序列化

  Avro有两种序列化编码:binary和JSON。

3.1.Binary Encoding

  基本类型:

    null:0字节

    boolean:1个字节——0(false)或1(true)

    int和long使用变长的zig-zag编码

    float:4个字节

    double:8个字节

    bytes:1个long,后边跟着字节序列

    string:1个long,后边跟着UTF-8编码的字符

3.2.records

  按字段声明的顺序编码值,如下面一个record schema:

{
"type": "record",
"name": "test",
"fields" : [
{"name": "a", "type": "long"},
{"name": "b", "type": "string"}]
}

  实例化这个record,a字段的值是27(编码为0×36),b字段的值是“foo”(编码为06 66 6f 6f),那么这个record编码结果是:

36 06 66 6f 6f

3.3.enums

  一个enum被编码为一个int,比如,考虑这个enum。

{"type": "enum", "name": "Foo", "symbols": ["A", "B", "C", "D"] }

  这将被编码为一个取值范围为[0,3]的int,0表示“A”,3表示“D”。

3.4.arrays

  arrays编码为block序列,每个block包含一个long的count值,紧跟着的是array items,一个block的count为0表示该block是array的结尾。

3.5.maps

  mapss编码为block序列,每个block包含一个long的count值,紧跟着的是key/value对,一个block的count为0表示该block是map的结尾。

3.6.union

  union编码以一个long值开始,表示后边的数据是union中的哪种数据类型。

3.7.fixed

  编码为指定数目的字节。

4.rpc通信实现

  Avro的RPC实现不需要定义服务接口,但需要从.avpr文件中解析协议,协议中定义了消息结构和消息服务。message.avpr中定义了一个类型叫message,定义了一个服务叫sendMessage。

  工具类Utils.java:

package
avro;

import java.io.File;
import java.io.IOException;
import java.net.URL;

import org.apache.avro.Protocol;

public class
Utils {
public static
Protocol getProtocol()
{
Protocol protocol =
null;
try {
URL url =
Utils.class.getClassLoader().getResource("message.avpr");
protocol =
Protocol.parse(new File(url.getPath()));
} catch
(IOException
e)
{
e.printStackTrace();
}
return protocol;
}
}

  服务端实现Server.java:

package
avro;

import org.apache.avro.Protocol;
import org.apache.avro.Protocol.Message;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.ipc.HttpServer;
import org.apache.avro.ipc.generic.GenericResponder;

public class
Server extends
GenericResponder
{
private Protocol
protocol =
null;
private int
port;

public Server(Protocol
protocol,
int port)
{
super(protocol);
this.protocol =
protocol;
this.port =
port;
}

public Object
respond(Message
message, Object
request)
throws Exception
{
GenericRecord req =
(GenericRecord)
request;
GenericRecord msg =
(GenericRecord)(req.get("message"));
// process the request

return msg;
}

public void
run()
{
try {
HttpServer server =
new HttpServer(this,
port);

server.start();
} catch
(Exception
e)
{
e.printStackTrace();
}
}

public static
void main(String[]
args)
{
if (args.length
!= 1)
{
System.out.println("Usage:
Server port");
System.exit(0);
}
int port =
Integer.parseInt(args[0]);
new Server(Utils.getProtocol(),
port).run();
}
}

  客户端实现Client.java:

package
avro;

import java.net.URL;
import java.nio.ByteBuffer;
import java.util.Arrays;

import org.apache.avro.util.Utf8;
import org.apache.avro.Protocol;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.ipc.HttpTransceiver;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.generic.GenericRequestor;

public class
Client {
private Protocol
protocol =
null;
private String
host = null;
private int
port = 0;
private int
size = 0;
private int
count = 0;

public Client(Protocol
protocol,
String host,
int port,
int size,
int count)
{
this.protocol =
protocol;
this.host =
host;
this.port =
port;
this.size =
size;
this.count =
count;
}

public long
sendMessage()
throws Exception
{
GenericRecord requestData =
new GenericData.Record(
protocol.getType("message"));
// initiate the request data


GenericRecord request =
new GenericData.Record(protocol.getMessages()
.get("sendMessage").getRequest());
request.put("message",
requestData);

Transceiver t =
new HttpTransceiver(new
URL("http://" +
host + ":"
+ port));
GenericRequestor requestor =
new GenericRequestor(protocol,
t);

long start =
System.currentTimeMillis();
for (int
i = 0;
i < count;
i++)
{
requestor.request("sendMessage",
request);
}
long end =
System.currentTimeMillis();
System.out.println(end
- start);
return end -
start;
}

public long
run()
{
long res =
0;
try {
res =
sendMessage();
} catch
(Exception
e)
{
e.printStackTrace();
}
return res;
}

public static
void main(String[]
args)
throws Exception
{
if (args.length
!= 4)
{
System.out.println("Usage:
Client host port dataSize count");
System.exit(0);
}

String host =
args[0];
int port =
Integer.parseInt(args[1]);
int size =
Integer.parseInt(args[2]);
int count =
Integer.parseInt(args[3]);
new Client(Utils.getProtocol(),
host, port,
size, count).run();
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: