您的位置:首页 > 其它

grpc-服务端与客户端四种数据传递方式(2)

2017-07-23 18:25 507 查看
gpc服务端和客户端的数据发送有四种方式,客户端启动服务端的启动代码在上篇文章已经描述,这里将只列出关键实现的代码。

1.客户端发送一个对象,服务端返回一个对象

这种方式类似于传统的Http请求数据的方式,在上篇文章有一个简单的实现例子,在这里不再描述。

2.客户端发送一个对象,服务端返回一个Stream对象

Stream对象在传输过程中会被当做集合,用Iterator来遍历处理。来看一个实现例子:

proto文件:

rpc GetStudentsByAge(StudentRequest) returns (stream StudentResponse) {}


grpc中和thrift不同的是请求和返回的对象必须是message,将对象声明为Stream将会以流的方式传输。

服务端实现:

@Override
public void getStudentsByAge(StudentRequest request, StreamObserver<StudentResponse> responseObserver) {
System.out.println("接受客户端信息: " + request.getAge());
responseObserver.onNext(StudentResponse.newBuilder().setName("张三").setAge(20).setCity("北京").build());
responseObserver.onNext(StudentResponse.newBuilder().setName("李四").setAge(30).setCity("天津").build());
responseObserver.onNext(StudentResponse.newBuilder().setName("王五").setAge(40).setCity("武汉").build());
responseObserver.onNext(StudentResponse.newBuilder().setName("赵六").setAge(50).setCity("深圳").build());

responseObserver.onCompleted();
}


类似第一种方式,这种方式通过多次调用onNext来组装多个消息,从而最后返回一个Stream对象。

客户端实现:

Iterator<StudentResponse> iterable = blockingStub.getStudentsByAge(StudentRequest.newBuilder().setAge(20).build());
while (iterable.hasNext()) {
StudentResponse studentResponse = iterable.next();
System.out.println(studentResponse.getName() + "," + studentResponse.getAge() + "," + studentResponse.getCity());
}


3.客户端发送Stream对象,服务端返回一个简单对象

proto文件:

rpc GetStudentsWrapperByages(stream StudentRequest) returns (StudentResponseList) {}

服务端实现:

@Override
public StreamObserver<StudentRequest> getStudentsWrapperByages(StreamObserver<StudentResponseList> responseObserver) {
return new StreamObserver<StudentRequest>() {
@Override
public void onNext(StudentRequest value) {
System.out.println("onNext: " + value.getAge());
}

@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}

@Override
public void onCompleted() {
StudentResponse studentResponse = StudentResponse.newBuilder().setName("张三").setAge(20).setCity("西安").build();
StudentResponse studentResponse2 = StudentResponse.newBuilder().setName("李四").setAge(30).setCity("广州").build();

StudentResponseList studentResponseList = StudentResponseList.newBuilder().addStudentResponse(studentResponse).addStudentResponse(studentResponse2).build();

responseObserver.onNext(studentResponseList);
responseObserver.onCompleted();
}
};
}


客户端每个消息过来都会调用一次onNext方法,当客户端发送完毕后,会执行onCompleted来返回一个对象给客户端。

客户端实现:

StreamObserver<StudentResponseList> studentResponseListStreamObserver = new StreamObserver<StudentResponseList>() {
@Override
public void onNext(StudentResponseList value) {
value.getStudentResponseList().forEach(studentResponse -> {
System.out.println(studentResponse.getName());
System.out.println(studentResponse.getAge());
System.out.println(studentResponse.getCity());
System.out.println("***********");

});
}

@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}

@Override
public void onCompleted() {
System.out.println("completed!");
}
};

StreamObserver<StudentRequest> studentRequestStreamObserver = studentServiceStub.getStudentsWrapperByages(studentResponseListStreamObserver);

studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(20).build());
studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(30).build());
studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(40).build());
studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(50).build());
studentRequestStreamObserver.onCompleted();


studentResponseListStreamObserver这一部分是响应服务端返回的数据,studentRequestStreamObserver这一部分是用来向服务端发送一个Stream对象。当执行完并不会有我们预料的结果,有一个关键点需要注意:

客户端向服务端发送一个Stream, 不能跟之前一样使用阻塞的stu去发送请求,而应该使用异步的stb来处理:

StudentServiceGrpc.StudentServiceStub studentServiceStub = StudentServiceGrpc.newStub(managedChannel);


源码如下:

/**
* Creates a new async stub that supports all call types for the service
*/
public static StudentServiceStub newStub(io.grpc.Channel channel) {
return new StudentServiceStub(channel);
}


创建一个异步的stub去调用service,甚至会在onNext发送的消息还没有发送到服务端的时候整个程序就执行完了,因为是异步的并不会有阻塞的等待,可以在程序末尾添加一个thread.sleep(seconds),然后程序会正常执行,可以验证异步的这个问题。

4.客户端和服务端都传输的是Stream对象

proto文件:

rpc BiTalk(stream StreamRequest) returns (stream StreamResponse) {}

服务端实现:

@Override
public StreamObserver<StreamRequest> biTalk(StreamObserver<StreamResponse> responseObserver) {
return new StreamObserver<StreamRequest>() {
@Override
public void onNext(StreamRequest value) {
System.out.println("onNest: " + value.getRequestInfo());

responseObserver.onNext(StreamResponse.newBuilder().setResponseInfo(UUID.randomUUID().toString()).build());
}

@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}

@Overri
b76f
de
public void onCompleted() {
responseObserver.onCompleted();
}
};
}


客户端实现:

requestStreamObserver = studentServiceStub.biTalk(new StreamObserver() { hljs cs">            @Override
public void onNext(StreamResponse value) {
System.out.println(value.getResponseInfo());
}

@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}

@Override
public void onCompleted() {
System.out.println("onCompleted!");
}
});

for (int i = 0; i < 10;i ++) {
requestStreamObserver.onNext(StreamRequest.newBuilder().setRequestInfo(LocalDateTime.now().toString()).build());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}


源代码地址:

https://github.com/huiGod/netty_lecture

main.java.com.huiGod.grpc
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  stream grpc