社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
场景
gPRC简介以及Java中使用gPRC实现客户端与服务端通信(附代码下载):
在上面的博客中介绍了gRPC以及使用最基本的rpc通信方式即一个请求对象返回一个响应的方式进行通信。
除此之外gRPC还有以下三种方式。
服务端流式
一个请求对象,服务端返回多个结果对象
proto示例语法
rpc GetStudentsByAge(StudentRequest) returns (stream StudentResponse) {}
客户端流式
客户端传入多个请求对象,服务端返回一个响应结果。
proto示例语法
rpc GetStudentsWrapperByAges(stream StudentRequest) returns (StudentResponseList) {}
双向流式
传入多个对象可以返回多个响应对象
注:
实现
服务端流式实现
在上面博客的基础上,打开Person.proto文件
message StudentRequest {
int32 age= 1;
}
message StudentResponse {string name = 1;
int32 age= 2;string city = 3;
}
添加两个message作为请求和响应对象。
因为gRPC的请求和响应对象必须在message中定义,不能直接使用string或者int32这种作为参数。
然后在新建接口方法
service PersonService {
rpc GetStudentsByAge(StudentRequest) returns (stream StudentResponse) {}
}
此方法是要请求参数为一个age,然后返回多个学生对象。
然后调用插件生成代码。
然后来到PersonServiceImpl中对接口方法进行实现
@Overridepublic void getStudentsByAge(StudentRequest request, StreamObserverresponseObserver) {
System.out.println("接收到的客户端消息为:"+request.getAge());
responseObserver.onNext(StudentResponse.newBuilder().setName("1公众号:霸道的程序猿")
.setAge(30)
.setCity("北京")
.build());
responseObserver.onNext(StudentResponse.newBuilder().setName("2公众号:霸道的程序猿")
.setAge(40)
.setCity("上海")
.build());
responseObserver.onNext(StudentResponse.newBuilder().setName("3公众号:霸道的程序猿")
.setAge(50)
.setCity("广州")
.build());
responseObserver.onCompleted();
}
然后来到客户端中
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost",8899)
.usePlaintext().build();
PersonServiceGrpc.PersonServiceBlockingStub blockingStub=PersonServiceGrpc.newBlockingStub(managedChannel);
System.out.println("请求-流式响应,调用getRealNameByUsername");
Iterator iter = blockingStub.getStudentsByAge(StudentRequest.newBuilder().setAge(20).build());while(iter.hasNext())
{
StudentResponse studentResponse=iter.next();
System.out.println(studentResponse.getName());
System.out.println(studentResponse.getAge());
System.out.println(studentResponse.getCity());
}
然后运行服务端后再运行客户端
此时服务端
客户端流式实现
打开proto文件
message StudentRequest {
int32 age= 1;
}
message StudentResponse {string name = 1;
int32 age= 2;string city = 3;
}
message StudentResponseList {
repeated StudentResponse studentResponse= 1;
}
添加响应的list,要实现客户端发动流式多个请求参数(年龄),服务端返回单个list对象,其中每个List的数据是学生对象。
添加接口方法
service PersonService {
rpc GetStudentsWrapperByAges(stream StudentRequest) returns (StudentResponseList) {}
}
然后调用插件生成代码。打开PersonServiceImpl进行方法的实现
@Overridepublic StreamObserver getStudentsWrapperByAges(final StreamObserverresponseObserver) {return new StreamObserver() {public voidonNext(StudentRequest studentRequest) {
System.out.println("onNext:"+studentRequest.getAge());
}public voidonError(Throwable throwable) {
System.out.println(throwable.getMessage());
}public voidonCompleted() {
StudentResponse studentResponse=StudentResponse.newBuilder()
.setName("公众号:霸道的程序猿")
.setAge(20)
.setCity("北京").build();
StudentResponse studentResponse1=StudentResponse.newBuilder()
.setName("1公众号:霸道的程序猿")
.setAge(30)
.setCity("上海").build();
StudentResponseList studentResponseList=StudentResponseList.newBuilder()
.addStudentResponse(studentResponse).addStudentResponse(studentResponse1).build();
responseObserver.onNext(studentResponseList);
responseObserver.onCompleted();
}
};
}
与上面不同,客户端如果是流式请求的话,那么客户端必须使用异步的stub
PersonServiceGrpc.PersonServiceStub stub = PersonServiceGrpc.newStub(managedChannel);
客户端代码为
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost",8899)
.usePlaintext().build();
PersonServiceGrpc.PersonServiceStub stub=PersonServiceGrpc.newStub(managedChannel);
System.out.println("-----------------------------");
System.out.println("流式请求-响应,调用GetStudentsWrapperByAges");
StreamObserver studentResponseListStreamObserver = new StreamObserver() {public voidonNext(StudentResponseList studentResponseList) {
studentResponseList.getStudentResponseList().forEach(studengResponse->{
System.out.println(studengResponse.getName());
System.out.println(studengResponse.getAge());
System.out.println(studengResponse.getCity());
});
}public voidonError(Throwable throwable) {
System.out.println(throwable.getMessage());
}public voidonCompleted() {
System.out.println("completed");
}
};
StreamObserver studentRequestStreamObserver =stub.getStudentsWrapperByAges(studentResponseListStreamObserver);
studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(20).build());
studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(30).build());
studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(40).build());
studentRequestStreamObserver.onCompleted();try{
Thread.sleep(50000);
}catch(InterruptedException e) {
e.printStackTrace();
}
因为是异步的所以必须使进程进行休眠才能看到效果
运行服务端后运行客户端
此时服务端
双向流式实现
打开proto文件
message StreamRequest {string request_info = 1;
}
message StreamResponse {string response_info = 1;
}
新建流式请求与响应参数,然后新建接口方法
service PersonService {
rpc BiTalk(stream StreamRequest) returns (stream StreamResponse) {}
}
然后实现接口方法
@Overridepublic StreamObserver biTalk(StreamObserverresponseObserver) {return new StreamObserver() {
@Overridepublic voidonNext(StreamRequest streamRequest) {
System.out.println(streamRequest.getRequestInfo());
responseObserver.onNext(StreamResponse.newBuilder().setResponseInfo(UUID.randomUUID().toString()).build());
}
@Overridepublic voidonError(Throwable throwable) {
System.out.println(throwable.getMessage());
}
@Overridepublic voidonCompleted() {
responseObserver.onCompleted();
}
};
}
在客户端中
package com.badao.grpcjava;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.time.LocalDate;
import java.util.Iterator;public classGrpcClient {public static voidmain(String[] args) {
ManagedChannel managedChannel= ManagedChannelBuilder.forAddress("localhost",8899)
.usePlaintext().build();
PersonServiceGrpc.PersonServiceStub stub=PersonServiceGrpc.newStub(managedChannel);
System.out.println("-----------------------------");
System.out.println("流式请求-流式响应,调用BiTalk");
StreamObserver requestStreamObserver = stub.biTalk(new StreamObserver() {
@Overridepublic voidonNext(StreamResponse streamResponse) {
System.out.println(streamResponse.getResponseInfo());
}
@Overridepublic voidonError(Throwable throwable) {
System.out.println(throwable.getMessage());
}
@Overridepublic voidonCompleted() {
System.out.println("onComplated");
}
});for(int i =0;i<10;i++)
{
requestStreamObserver.onNext(StreamRequest.newBuilder().setRequestInfo(LocalDate.now().toString()).build());try{
Thread.sleep(1000);
}catch(InterruptedException e) {
e.printStackTrace();
}
}try{
Thread.sleep(50000);
}catch(InterruptedException e) {
e.printStackTrace();
}
}
}
运行服务端后运行客户端
示例代码下载
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!