java grpc 流式_gRPC的通信方式-客户端流式、服务端流式、双向流式在Java的调用示例... - Go语言中文社区

java grpc 流式_gRPC的通信方式-客户端流式、服务端流式、双向流式在Java的调用示例...


场景

gPRC简介以及Java中使用gPRC实现客户端与服务端通信(附代码下载):

在上面的博客中介绍了gRPC以及使用最基本的rpc通信方式即一个请求对象返回一个响应的方式进行通信。

除此之外gRPC还有以下三种方式。

服务端流式

一个请求对象,服务端返回多个结果对象

proto示例语法

rpc GetStudentsByAge(StudentRequest) returns (stream StudentResponse) {}

客户端流式

客户端传入多个请求对象,服务端返回一个响应结果。

proto示例语法

rpc GetStudentsWrapperByAges(stream StudentRequest) returns (StudentResponseList) {}

双向流式

传入多个对象可以返回多个响应对象

注:

实现

服务端流式实现

在上面博客的基础上,打开Person.proto文件

message StudentRequest {

int32 age= 1;

}

message StudentResponse {string name = 1;

int32 age= 2;string city = 3;

}

添加两个message作为请求和响应对象。

因为gRPC的请求和响应对象必须在message中定义,不能直接使用string或者int32这种作为参数。

然后在新建接口方法

service PersonService {

rpc GetStudentsByAge(StudentRequest) returns (stream StudentResponse) {}

}

此方法是要请求参数为一个age,然后返回多个学生对象。

然后调用插件生成代码。

然后来到PersonServiceImpl中对接口方法进行实现

@Overridepublic void getStudentsByAge(StudentRequest request, StreamObserverresponseObserver) {

System.out.println("接收到的客户端消息为:"+request.getAge());

responseObserver.onNext(StudentResponse.newBuilder().setName("1公众号:霸道的程序猿")

.setAge(30)

.setCity("北京")

.build());

responseObserver.onNext(StudentResponse.newBuilder().setName("2公众号:霸道的程序猿")

.setAge(40)

.setCity("上海")

.build());

responseObserver.onNext(StudentResponse.newBuilder().setName("3公众号:霸道的程序猿")

.setAge(50)

.setCity("广州")

.build());

responseObserver.onCompleted();

}

然后来到客户端中

ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost",8899)

.usePlaintext().build();

PersonServiceGrpc.PersonServiceBlockingStub blockingStub=PersonServiceGrpc.newBlockingStub(managedChannel);

System.out.println("请求-流式响应,调用getRealNameByUsername");

Iterator iter = blockingStub.getStudentsByAge(StudentRequest.newBuilder().setAge(20).build());while(iter.hasNext())

{

StudentResponse studentResponse=iter.next();

System.out.println(studentResponse.getName());

System.out.println(studentResponse.getAge());

System.out.println(studentResponse.getCity());

}

然后运行服务端后再运行客户端

此时服务端

13723374.html

客户端流式实现

打开proto文件

message StudentRequest {

int32 age= 1;

}

message StudentResponse {string name = 1;

int32 age= 2;string city = 3;

}

message StudentResponseList {

repeated StudentResponse studentResponse= 1;

}

添加响应的list,要实现客户端发动流式多个请求参数(年龄),服务端返回单个list对象,其中每个List的数据是学生对象。

添加接口方法

service PersonService {

rpc GetStudentsWrapperByAges(stream StudentRequest) returns (StudentResponseList) {}

}

然后调用插件生成代码。打开PersonServiceImpl进行方法的实现

@Overridepublic StreamObserver getStudentsWrapperByAges(final StreamObserverresponseObserver) {return new StreamObserver() {public voidonNext(StudentRequest studentRequest) {

System.out.println("onNext:"+studentRequest.getAge());

}public voidonError(Throwable throwable) {

System.out.println(throwable.getMessage());

}public voidonCompleted() {

StudentResponse studentResponse=StudentResponse.newBuilder()

.setName("公众号:霸道的程序猿")

.setAge(20)

.setCity("北京").build();

StudentResponse studentResponse1=StudentResponse.newBuilder()

.setName("1公众号:霸道的程序猿")

.setAge(30)

.setCity("上海").build();

StudentResponseList studentResponseList=StudentResponseList.newBuilder()

.addStudentResponse(studentResponse).addStudentResponse(studentResponse1).build();

responseObserver.onNext(studentResponseList);

responseObserver.onCompleted();

}

};

}

与上面不同,客户端如果是流式请求的话,那么客户端必须使用异步的stub

PersonServiceGrpc.PersonServiceStub stub = PersonServiceGrpc.newStub(managedChannel);

客户端代码为

ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost",8899)

.usePlaintext().build();

PersonServiceGrpc.PersonServiceStub stub=PersonServiceGrpc.newStub(managedChannel);

System.out.println("-----------------------------");

System.out.println("流式请求-响应,调用GetStudentsWrapperByAges");

StreamObserver studentResponseListStreamObserver = new StreamObserver() {public voidonNext(StudentResponseList studentResponseList) {

studentResponseList.getStudentResponseList().forEach(studengResponse->{

System.out.println(studengResponse.getName());

System.out.println(studengResponse.getAge());

System.out.println(studengResponse.getCity());

});

}public voidonError(Throwable throwable) {

System.out.println(throwable.getMessage());

}public voidonCompleted() {

System.out.println("completed");

}

};

StreamObserver studentRequestStreamObserver =stub.getStudentsWrapperByAges(studentResponseListStreamObserver);

studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(20).build());

studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(30).build());

studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(40).build());

studentRequestStreamObserver.onCompleted();try{

Thread.sleep(50000);

}catch(InterruptedException e) {

e.printStackTrace();

}

因为是异步的所以必须使进程进行休眠才能看到效果

运行服务端后运行客户端

此时服务端

13723374.html

双向流式实现

打开proto文件

message StreamRequest {string request_info = 1;

}

message StreamResponse {string response_info = 1;

}

新建流式请求与响应参数,然后新建接口方法

service PersonService {

rpc BiTalk(stream StreamRequest) returns (stream StreamResponse) {}

}

然后实现接口方法

@Overridepublic StreamObserver biTalk(StreamObserverresponseObserver) {return new StreamObserver() {

@Overridepublic voidonNext(StreamRequest streamRequest) {

System.out.println(streamRequest.getRequestInfo());

responseObserver.onNext(StreamResponse.newBuilder().setResponseInfo(UUID.randomUUID().toString()).build());

}

@Overridepublic voidonError(Throwable throwable) {

System.out.println(throwable.getMessage());

}

@Overridepublic voidonCompleted() {

responseObserver.onCompleted();

}

};

}

在客户端中

package com.badao.grpcjava;

import io.grpc.ManagedChannel;

import io.grpc.ManagedChannelBuilder;

import io.grpc.stub.StreamObserver;

import java.time.LocalDate;

import java.util.Iterator;public classGrpcClient {public static voidmain(String[] args) {

ManagedChannel managedChannel= ManagedChannelBuilder.forAddress("localhost",8899)

.usePlaintext().build();

PersonServiceGrpc.PersonServiceStub stub=PersonServiceGrpc.newStub(managedChannel);

System.out.println("-----------------------------");

System.out.println("流式请求-流式响应,调用BiTalk");

StreamObserver requestStreamObserver = stub.biTalk(new StreamObserver() {

@Overridepublic voidonNext(StreamResponse streamResponse) {

System.out.println(streamResponse.getResponseInfo());

}

@Overridepublic voidonError(Throwable throwable) {

System.out.println(throwable.getMessage());

}

@Overridepublic voidonCompleted() {

System.out.println("onComplated");

}

});for(int i =0;i<10;i++)

{

requestStreamObserver.onNext(StreamRequest.newBuilder().setRequestInfo(LocalDate.now().toString()).build());try{

Thread.sleep(1000);

}catch(InterruptedException e) {

e.printStackTrace();

}

}try{

Thread.sleep(50000);

}catch(InterruptedException e) {

e.printStackTrace();

}

}

}

运行服务端后运行客户端

13723374.html

50261be193418dcb37ebbdef258d1d6e.gif

示例代码下载

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/weixin_34886431/article/details/114619249
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2021-05-16 04:07:02
  • 阅读 ( 1142 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

推荐文章

猜你喜欢