micro微服务 基础组件的组织方式 - Go语言中文社区

micro微服务 基础组件的组织方式


micro微服务 基础组件的组织方式

简介

micro是go语言实现的一个微服务框架,该框架自身实现了为服务常见的几大要素,网关,代理,注册中心,消息传递,也支持可插拔扩展。本本通过micro中的一个核心对象展开去探讨这个项目是如何实现这些组件并将其组织在一起工作的。

Notice: go代码有时候比较繁琐,截取源码的时候会删除部分不影响思想的代码会标记为...

核心服务

micro通过micro.NewService创建一个服务实例,所有的微服务实例(包括网关,代理等等)需要通过这个实例来与其他服务打交道。

    type service struct {
        opts Options

        once sync.Once
    }


代码很少,但是micro中所有微服务要素最后都会汇聚在这个类型上了,因为option包罗万象(^^).

    type Options struct {
        Broker    broker.Broker
        Cmd       cmd.Cmd
        Client    client.Client
        Server    server.Server
        Registry  registry.Registry
        Transport transport.Transport

        // Before and After funcs
        BeforeStart []func() error
        BeforeStop  []func() error
        AfterStart  []func() error
        AfterStop   []func() error

        // Other options for implementations of the interface
        // can be stored in a context
        Context context.Context
    }


这里可以看到微服务的常见组件了,当micro.Server初始化的时候会给这个对象设置上对应的组件,组件的设置方式包括默认值,cli指定,env读取。

func (s *service) Init(opts ...Option) {
	// process options
	for _, o := range opts {
		o(&s.opts)
	}

	s.once.Do(func() {
		// Initialise the command flags, overriding new service
		_ = s.opts.Cmd.Init(
			cmd.Broker(&s.opts.Broker),
			cmd.Registry(&s.opts.Registry),
			cmd.Transport(&s.opts.Transport),
			cmd.Client(&s.opts.Client),
			cmd.Server(&s.opts.Server),
		)
	})
}


Server是最终的行为实体,监听端口并提供业务服务,注册本地服务到注册中心。 Broker异步消息,mq等方法可以替换这个类型 Client服务调用客户端 Registry 注册中心 Cmd cli客户端 Transport 类似与socket,消息同步通信,服务监听等

服务类型

目前支持的服务类型有rpc,grpc。服务中如果存在两种不同的rpc协议,消息投递的时候会进行协议转换。这里详细说下默认的rpc服务。 rpc服务基于HTTP POST协议,服务启动的时候会尝试连接Broker,然后注册本服务到注册中心,最后监听服务端口.简单提一句这里是如何做到协议转换,如果http过来的消息要投递到一个grpc协议服务上,需要在Content-Type设置对应目的服务协议类型application/grpc,SerConn这里读取在Content-Type在获取对应的Codec进行协议转换,最后投递到对应服务,服务的返回内容也同样要处理下再返回。

    type Service interface {
        Init(...Option)
        Options() Options
        Client() client.Client
        Server() server.Server
        Run() error
        String() string
    }


rpc server

func (s *rpcServer) Start() error {
...
	ts, err := config.Transport.Listen(config.Address)
	// swap address
...
	// connect to the broker
	if err := config.Broker.Connect(); err != nil {
		return err
	}
...
	// use RegisterCheck func before register
	if err = s.opts.RegisterCheck(s.opts.Context); err != nil {
		log.Logf("Server %s-%s register check error: %s", config.Name, config.Id, err)
	} else {
		// announce self to the world
		if err = s.Register(); err != nil {
			log.Logf("Server %s-%s register error: %s", config.Name, config.Id, err)
		}
	}
...
	go func() {
		for {
			// listen for connections
			err := ts.Accept(s.ServeConn)
...
		}
	}()
..
	return nil
}


协议转换

    func (s *rpcServer) ServeConn(sock transport.Socket) {
...
        for {
            var msg transport.Message
            if err := sock.Recv(&msg); err != nil {
                return
            }
...
            // we use this Content-Type header to identify the codec needed
            ct := msg.Header["Content-Type"]

            // strip our headers
            hdr := make(map[string]string)
            for k, v := range msg.Header {
                hdr[k] = v
            }

            // set local/remote ips
            hdr["Local"] = sock.Local()
            hdr["Remote"] = sock.Remote()
...
            // TODO: needs better error handling
            var err error
            if cf, err = s.newCodec(ct); err != nil {  //请求协议转换器
...返回错误
                return
            }
...
            rcodec := newRpcCodec(&msg, sock, cf)   //返回转换器

            // internal request
            request := &rpcRequest{
                service:     getHeader("Micro-Service", msg.Header),
                method:      getHeader("Micro-Method", msg.Header),
                endpoint:    getHeader("Micro-Endpoint", msg.Header),
                contentType: ct,
                codec:       rcodec,
                header:      msg.Header,
                body:        msg.Body,
                socket:      sock,
                stream:      true,
            }

            // internal response
            response := &rpcResponse{
                header: make(map[string]string),
                socket: sock,
                codec:  rcodec,
            }

            // set router
            r := Router(s.router)
...
            // serve the actual request using the request router
            if err := r.ServeRequest(ctx, request, response); err != nil {
                // write an error response
                err = rcodec.Write(&codec.Message{
                    Header: msg.Header,
                    Error:  err.Error(),
                    Type:   codec.Error,
                }, nil)
                // could not write the error response
                if err != nil {
                    log.Logf("rpc: unable to write error response: %v", err)
                }
                if s.wg != nil {
                    s.wg.Done()
                }
                return
            }
...
        }
    }


注册中心

注册中心包括服务发现和服务注册。micro中每个注册中心类型都要实现注册中心接口

    type Registry interface {
        Init(...Option) error
        Options() Options
        Register(*Service, ...RegisterOption) error
        Deregister(*Service) error
        GetService(string) ([]*Service, error)
        ListServices() ([]*Service, error)
        Watch(...WatchOption) (Watcher, error)
        String() string
    }


默认的注册中心是mdns,该程序会在本地监听一个组播地址接收网络中所有及其广播的信息,同时发送的信息也能被所有其他机器发现。每当程序启动时都会广播自己的服务信息,其他节点收到该信息后添加到自己的服务列表里面,服务关闭时会发出关闭信息。mdns自身不具备健康检查,熔断等功能,其出发点也仅是便于测试使用,因而不推荐在生产环境中使用。

Resolve

查找服务需要根据url或者content信息来获取服务名称,在通过服务名称到注册中心查找,获取到服务后,随机一个节点投递

    type Resolver interface {
        Resolve(r *http.Request) (*Endpoint, error)
        String() string
    }
    //默认的api resolve实例,除此之外还有host,path,grpc 三种resolve,可以根据需求在启动程序的时候指定或者设置在环境变量里面
    //
    func (r *Resolver) Resolve(req *http.Request) (*resolver.Endpoint, error) {
        var name, method string

        switch r.Options.Handler {
        // internal handlers
        case "meta", "api", "rpc", "micro":
        ///foo/bar/zool => go.micro.api.foo  方法:Bar.Zool
            ///foo/bar => go.micro.api.foo 方法:Foo.Bar
            name, method = apiRoute(req.URL.Path)
        default:
            //如果handler是web会走到这里   
            ///foo/bar/zool => go.micro.api.foo  method bar/zool 
            // 1/foo/bar/ => go.micro.api.1.foo  method bar 
            method = req.Method
            name = proxyRoute(req.URL.Path)
        }

        return &resolver.Endpoint{
            Name:   name,
            Method: method,
        }, nil
    }



插件

代码中定义了一个plugin,然而这个plugin并非是引入组件的作用,其作用在于在请求的管道中加一层,这样比较容易添加新的逻辑进去。然而真正需要着重提到的是micro引入新组件的方式,micro service的几个重要成员都有其对应的接口规约,只要正确实现了接口就可以轻松的接入新的组件。这里用一个引入kubernetes作为注册中心的例子。

kubernetes组件位于github.commicrogo-plugins中

    go get -u github.commicrogo-pluginskubernetes


由于在go中无法实现java/c#中包动态加载功能,语言本身不提供全局类型,函数扫描。往往只能通过一些曲折的办法来实现。而micro是通过在入口文件中导入包,利用init函数在启动时将需要的功能组件写入到一个map里面。 在import中加入插件

    _ "github.com/micro/go-plugins/registry/kubernetes"


   micro api --registry=kubernetes  --registry_address=yourAddress


工作原理: 在github.commicrogo-microconfigcmdcmd.go中有一个map用于保存所有的注册中心创建函数

	DefaultRegistries = map[string]func(...registry.Option) registry.Registry{
		"consul": consul.NewRegistry,
		"gossip": gossip.NewRegistry,
		"mdns":   mdns.NewRegistry,
		"memory": rmem.NewRegistry,
	}


kubernetes会在包的init中写入对应的创建函数

    func init() {
        cmd.DefaultRegistries["kubernetes"] = NewRegistry
    }


生效的位置

    if name := ctx.String("registry"); len(name) > 0 && (*c.opts.Registry).String() != name {
		r, ok := c.opts.Registries[name]
		if !ok {
			return fmt.Errorf("Registry %s not found", name)
		}

		*c.opts.Registry = r()
		serverOpts = append(serverOpts, server.Registry(*c.opts.Registry))
		clientOpts = append(clientOpts, client.Registry(*c.opts.Registry))

		if err := (*c.opts.Selector).Init(selector.Registry(*c.opts.Registry)); err != nil {
			log.Fatalf("Error configuring registry: %v", err)
		}

		clientOpts = append(clientOpts, client.Selector(*c.opts.Selector))

		if err := (*c.opts.Broker).Init(broker.Registry(*c.opts.Registry)); err != nil {
			log.Fatalf("Error configuring broker: %v", err)
		}
	}


最后附上一张图说明下这个核心的对象的引用关系,组件引用那一块只是画了注册中心的,Broker,Server也都是类似的原理。

转载于:https://my.oschina.net/hunjixin/blog/3076782

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/chuifanju1796/article/details/100875293
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-06-28 00:38:17
  • 阅读 ( 1797 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢