对GRPC的通用封装 - Go语言中文社区

对GRPC的通用封装


写在前面

简介

通过封装将grpc的技术使用难度降低到普通Java程序的开发。
cn.com.yd.commons.grpc提供了4种形式的服务接口定义以适应不同的应用场景。
cn.com.yd.commons.grpc将请求参数和响应参数定义为bytes以统一因业务不同而导致的差异性;使用cglib进行动态代理,在grpc的方法中执行具体的业务处理。
在具体应用中应将此工程使用Maven打包后作为依赖引入目标工程中。

环境

JDK:1.8
GRPC:1.6.1
Protobuf:3.3.0
cglib:3.2.5
IDE:开发工具IDEA

Maven工程结构图

grpc通用封装工程组织结构.png

特色

  • 本地无需搭建GRPC环境,只需导入依赖即可
  • 开发平台不限于STS、IDEA
  • GRPC功能通用化
  • 使用GRPC像开发普通程序一样
  • 只需简单的少量配置

详细说明

开发环境准备

IDEA自带插件支持grpc开发,所以开发工具选择IDEA。在正式开始之前需要先安装Protobuf Support插件。
依次点击Intellij中的“File”-->"Settings"-->"Plugins"-->"Browse repositories",如图


安装Protobuf Support插件.png

输入Protobuf,如下所示


Protobuf support.png

安装完后,重启IDEA,插件安装完毕。

proto定义

在main目录下新建一个名为proto的文件夹,请确保文件夹的所处位置以及名称都正确,否则将不能进行编译,而且不报任何错误。

//指定正在使用proto3语法,如果你没有指定这个,编译器会使用proto2。这个指定语法行必须是文件的非空非注释的第一个行。
syntax = "proto3";

option java_package = "cn.com.yd.commons.grpc";
option java_outer_classname = "GrpcService";
option java_multiple_files = true;

// 定义通用的 Grpc 服务
service CommonService {
    // 处理请求
    rpc handle (Request) returns (Response) {
    }

    // 处理请求,服务端流式
    rpc serverStreamingHandle (Request) returns (stream Response) {
    }

    // 处理请求,客户端流式
    rpc clientStreamingHandle (stream Request) returns (Response) {
    }

    // 处理请求,双向流式
    rpc bidirectionalStreamingHandle (stream Request) returns (stream Response) {
    }
}

// 定义通用的 Grpc 请求体
message Request {
    bytes request = 1;
}

// 定义通用的 Grpc 响应体
message Response {
    bytes reponse = 1;
}

pom.xml中的配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.com.yd.commons</groupId>
    <artifactId>grpc</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <name>grpc</name>
    <description>grpc 基础类</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <grpc.version>1.11.0</grpc.version>
        <protobuf.version>3.3.0</protobuf.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-netty</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
            <version>${grpc.version}</version>
        </dependency>

        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-stub</artifactId>
            <version>${grpc.version}</version>
        </dependency>

        <dependency>
            <groupId>io.protostuff</groupId>
            <artifactId>protostuff-core</artifactId>
            <version>1.6.0</version>
        </dependency>

        <dependency>
            <groupId>io.protostuff</groupId>
            <artifactId>protostuff-runtime</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.40</version>
        </dependency>
        <dependency>
            <groupId>cglib</groupId>
            <artifactId>cglib</artifactId>
            <version>3.2.5</version>
        </dependency>
    </dependencies>

    <build>
        <extensions>
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.5.0.Final</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.5.1</version>
                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

编译

proto文件编写完成后进行编译以生成对应的class文件,编译后的效果大致如图


grpc 编译结果图.png

辅助工具类

GrpcHelper

其中包含了Request、Response与JSONObject之间的转换等,主要是对应用的辅助。

package cn.com.yd.commons.grpc.util;

import cn.com.yd.commons.grpc.Request;
import cn.com.yd.commons.grpc.Response;
import com.alibaba.fastjson.JSONObject;
import com.google.protobuf.ByteString;
import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.runtime.RuntimeSchema;
import net.sf.cglib.proxy.Enhancer;
import net.sf.cglib.reflect.FastClass;
import net.sf.cglib.reflect.FastMethod;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class GrpcHelper {

    /**
     * 转换成GRPC方法需要的参数
     *
     * @param bean   Spring管理的bean实例名称
     * @param method 要执行的业务方法
     * @param args   业务方法的参数,可以为null
     * @return
     */
    public static Request toRequest(String bean, String method, Object[] args) {
        JSONObject json = new JSONObject();
        json.put("bean", bean);
        json.put("method", method);
        if (null != args) {
            json.put("args", args);
        }
        byte[] bytes = ProtobufUtils.serialize(json);
        return Request.newBuilder().setRequest(ByteString.copyFrom(bytes)).build();
    }

    /**
     * 将GRPC方法的请求参数转换成JSONObject
     *
     * @param request GRPC方法的请求参数
     * @return
     */
    public static JSONObject toJSONObject(Request request) {
        return ProtobufUtils.deserialize(request.getRequest().toByteArray(), JSONObject.class);
    }

    /**
     * 将响应结果转换成JSONObject
     * @param response 响应结果
     * @return
     */
    public static JSONObject toJSONObject(Response response) {
        return ProtobufUtils.deserialize(response.getReponse().toByteArray(), JSONObject.class);
    }


    public static JSONObject success() {
        JSONObject json = new JSONObject();
        json.put("state", true);
        json.put("msg", "成功");
        json.put("obj", null);
        return json;
    }

    /**
     * 执行具体的业务方法并返回执行结果
     *
     * @param bean   Spring管理的bean实例
     * @param method 要执行的业务方法
     * @param args   业务方法的参数,可以为null
     * @return
     */
    public static Object execute(Object bean, String method, Object[] args) {
        Object result = null;
        try {
            Enhancer enhancer = new Enhancer();
            enhancer.setSuperclass(bean.getClass());
            FastClass serviceFastClass = FastClass.create(bean.getClass());
            Class<?>[] argTypes = null;
            if (args != null) {
                argTypes = new Class[args.length];
                for (int i = 0; i < args.length; i++) {
                    Class<?> type = args[i].getClass();
                    argTypes[i] = type;
                }
            }
            FastMethod serviceFastMethod = serviceFastClass.getMethod(method, argTypes);
            result = serviceFastMethod.invoke(bean, args);
        } catch (Exception e) {
            result = e;
            e.printStackTrace();
        }
        return result;
    }

    /**
     * 将执行结果转换成Response
     * @param result 执行结果
     * @return
     */
    public static Response toResponse(Object result) {
        JSONObject json = null;
        if (result instanceof JSONObject) {
            json = (JSONObject) result;
        } else {
            json = new JSONObject();
            if (result instanceof Throwable) {
                json.put("state", false);
                json.put("msg", "失败");
                json.put("obj", ((Throwable) result).getMessage());
            } else {
                json.put("state", true);
                json.put("msg", "成功");
                json.put("obj", result);
            }
        }
        return Response.newBuilder().setReponse(ByteString.copyFrom(ProtobufUtils.serialize(json))).build();
    }
}

class ProtobufUtils {

    // 缓存 schema 对象的 map
    private static Map<Class<?>, RuntimeSchema<?>> cachedSchema = new ConcurrentHashMap<>();

    /**
     * 根据获取相应类型的schema方法
     */
    @SuppressWarnings({"unchecked", "unused"})
    private static <T> RuntimeSchema<T> getSchema(Class<T> clazz) {
        RuntimeSchema<T> schema = (RuntimeSchema<T>) cachedSchema.get(clazz);
        if (schema == null) {
            schema = RuntimeSchema.createFrom(clazz);
            cachedSchema.put(clazz, schema);
        }
        return schema;
    }

    /**
     * 序列化方法,将对象序列化为字节数组(对象 ---> 字节数组)
     */
    @SuppressWarnings("unchecked")
    public static <T> byte[] serialize(T obj) {
        Class<T> clazz = (Class<T>) obj.getClass();
        RuntimeSchema<T> schema = getSchema(clazz);
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
    }

    /**
     * 反序列化方法,将字节数组反序列化为对象(字节数组 ---> 对象)
     */
    public static <T> T deserialize(byte[] data, Class<T> clazz) {
        RuntimeSchema<T> schema = RuntimeSchema.createFrom(clazz);
        T message = schema.newMessage();
        ProtostuffIOUtil.mergeFrom(data, message, schema);
        return message;
    }
}

GrpcServer

package cn.com.yd.commons.grpc.util;

import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;

public class GrpcServer {
    private final Server server;//服务器

    public GrpcServer(int port, BindableService... services) throws IOException {
        //初始化Server参数
        ServerBuilder builder = ServerBuilder.forPort(port);
        for(BindableService bs:services){
            builder.addService(bs);
        }
        server = builder.build();
    }

    /**
     * 启动服务
     */
    public void start() throws IOException {
        server.start();
        System.out.println("Server started, listening on " + server.getPort());
        //程序退出时关闭资源
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.err.println("*** shutting down gRPC server since JVM is shutting down");
            GrpcServer.this.stop();
            System.err.println("*** server shut down");
        }));
    }

    /**
     * 关闭服务
     */
    public void stop() {
        if (server != null) {
            server.shutdown();
        }
    }

    /**
     * 使得server一直处于运行状态
     */
    public void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }
}

GrpcClient

import cn.com.yd.commons.grpc.CommonServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.TimeUnit;
public class GrpcClient {
    private static ManagedChannel channel;//grpc信道,需要指定端口和地址
    private static CommonServiceGrpc.CommonServiceBlockingStub blockingStub;//阻塞/同步存根
    private static CommonServiceGrpc.CommonServiceStub asyncStub;//非阻塞,异步存根
    public GrpcClient(String server, int port) {
        System.out.println("server:"+server+",port:"+port);
        //创建信道
        channel = ManagedChannelBuilder.forAddress(server, port)
                .usePlaintext()
                .build();
        //创建存根
        blockingStub = CommonServiceGrpc.newBlockingStub(channel);
        asyncStub = CommonServiceGrpc.newStub(channel);
    }

    /**
     * 关闭方法
     */
    public void shutdown() throws InterruptedException {
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }

    public CommonServiceGrpc.CommonServiceBlockingStub getBlockingStub() {
        return blockingStub;
    }

    public CommonServiceGrpc.CommonServiceStub getAsyncStub() {
        return asyncStub;
    }
}

工程源码

暂不公开。

版权声明:本文来源简书,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://www.jianshu.com/p/0e8c72c88315
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-02-02 14:52:53
  • 阅读 ( 1344 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢