Programing In K8s 1:Client-go 实现分析与二次开发 - Go语言中文社区

Programing In K8s 1:Client-go 实现分析与二次开发


本文篇幅较长,预计需要2-3小时的阅读时间

Programing In K8s :Client-go 实现分析与二次开发

1. 简介

K8s具有标准的CS结构,API Server 作为唯一与内部存储ETCD进行通信的组件,充当了集群中唯一一个服务端的角色;其他组件,例如kubelet、Kube-Proxy、Kubectl、Kube-Schedule以及各种资源的controller,都可以看作是某种客户端,承担自身职责的同时,需要同API Server保持通信,以实现K8s整体的功能。

Client-go就是所有广义K8s客户端的基础库,一方面,K8s各个组件或多或少都用到它的功能,另一方面,它的代码逻辑和组件自身的逻辑深度解耦,如果想要阅读、学习K8s的源码,client go很适合作为入门组件。

Client-go作为一个活跃的开源项目,应对一些场景时,它采用的解决方案已经被大家在实践中广泛验证过,作为一名运维或者后端开发,熟悉、了解这些解决方案,对自己去解决工作中的一些问题,想必也有很大的助力。

本文涉及的内容、源码分析、撰写的demo,全部依据目前client-go最新的代码版本 V14.0。

github地址: https://github.com/kubernetes/client-go

2. Client-go 结构

在这里插入图片描述

RESTClient是所有客户端的父类,底层调用了Go语言nethttp库,访问API Server的RESTful接口。

RESTClient的操作相对原始,使用样例如下:

// 构建config对象,通常会存放在~/.kube/config的路径;如果运行在集群中,会有所不同
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
// 封装error判断
mustSuccess(err)
config.APIPath = "api"
config.GroupVersion = &corev1.SchemeGroupVersion
config.NegotiatedSerializer = scheme.Codecs
restClient, err := rest.RESTClientFor(config)
mustSuccess(err)
result := &corev1.PodList{}
// 实际是在Do方法里调用了底层的net/http库向api-server发送request,最后将结果解析出放入result中
err = restClient.Get().Namespace("sandbox").Resource("pods").
VersionedParams(&metav1.ListOptions{Limit: 40}, scheme.ParameterCodec).
Do(context.TODO()).Into(result)
mustSuccess(err)
for _, d := range result.Items {
    fmt.Printf("NameSpace: %v t Name: %v t Status: %+v n", d.Namespace, d.Name, d.Status.Phase)
}

ClientSet是使用最多的客户端,它继承自RESTClient,使用K8s的代码生成机制(client-gen机制),在编译过程中,会根据目前K8s内置的资源信息,自动生成他们的客户端代码(前提是需要添加适当的注解),使用者可以通过builder pattern进行初始化,得到自己在意的目标资源类型的客户端。ClientSet如同它的名字一样,代表的是一组内置资源的客户端。

例如:

clientset, err := kubernetes.NewForConfig(config) // 根据config对象创建clientSet对象
mustSuccess(err)
podClient := clientset.CoreV1().Pods("development") // 根据Pod资源的Group、Version、Recource Name创建资源定制客户端,传入的字符串表示资源所在的ns;podClient对象具有ListUpdateDeletePatchGet等curd接口

DynamiClient动态客户端,可以根据传入的GVR(group version resource)生成一个可以操作特定资源的客户端。但是不是内存安全的客户端,返回的结果通常是非结构化的。需要额外经过一次类型转换才能变为目标资源类型的对象,这一步存在内存安全的风险。相比ClientSet,动态客户端不局限于K8s的内置资源,可以用于处理CRD(custome resource define)自定义资源,但是缺点在于安全性不高。DynamicClient使用的样例代码如下:

结构化的类型通常属于k8s runtime object的子类型;非结构化的对象通常是map[string]interface{}的形式,通过一个字典存储对象的属性;K8s所有的内置资源都可以通过代码生成机制,拥有默认的资源转换方法

dynamicClient, err := dynamic.NewForConfig(config)
mustSuccess(err)
gvr := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
// 返回非结构化的对象
unstructObj, err := dynamicClient.Resource(gvr).Namespace("sandbox").List(context.TODO(), metav1.ListOptions{Limit: 40})
mustSuccess(err)
podList := &corev1.PodList{}
// 额外做一次类型转换,如果这里传错类型,就会有类型安全的风险
err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj.UnstructuredContent(), podList)
mustSuccess(err)
for _, po := range podList.Items {
	fmt.Printf("NAMESPACE: %v t NAME: %v t STATUS: %v n", po.Namespace, po.Name, po.Status)
}

DiscoveryClient发现客户端,主要用于处理向服务端请求当前集群支持的资源信息,例如命令kubectl api-resources使用的就是发现客户端,由于发现客户端获取的数据量比较大,并且集群的资源信息变更并不频繁,因此发现客户端会在本地建立文件缓存,默认十分钟之内的请求,使用本地缓存,超过十分钟之后则重新请求服务端。DiscoveryClient的使用样例代码如下:

discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
mustSuccess(err)

_, APIResourceList, err := discoveryClient.ServerGroupsAndResources()
mustSuccess(err)

for _, list := range APIResourceList {
    gv, err := schema.ParseGroupVersion(list.GroupVersion)
    mustSuccess(err)

    for _, resource := range list.APIResources {
        fmt.Printf("name: %v t group: %v t verison: %v n",
                   resource.Name, gv.Group, gv.Version)
    }
}

本地缓存路径:

在这里插入图片描述

本地存储了serverresources.json文件,感兴趣的可以打开看下,是json格式化后的资源信息。

参考代码文件pkg/kubectl/cmd/apiresources/apiresources.go,可以看到kubectl api-resources命令里确实使用了discoveryClient:

func (o *APIResourceOptions) RunAPIResources(cmd *cobra.Command, f cmdutil.Factory) error {
 ...
    // discoveryCilent
	discoveryclient, err := f.ToDiscoveryClient()
	if err != nil {
		return err
	}

    // 是否可以读本地缓存
	if !o.Cached {
		// Always request fresh data from the server
		discoveryclient.Invalidate()
	}

	errs := []error{}
    
	lists, err := discoveryclient.ServerPreferredResources()	
...
}

总结一下:

客户端名称源码目录简单描述
RESTClientclient-go/rest/基础客户端,对HTTP Request封装
ClientSetclient-go/kubernetes/在RESTClient基础上封装了对Resource和Version,也就是说我们使用ClientSet的话是必须要知道Resource和Version, 例如AppsV1().Deployments或者CoreV1.Pods,缺点是不能访问CRD自定义资源
DynamicClientclient-go/dynamic/包含一组动态的客户端,可以对任意的K8S API对象执行通用操作,包括CRD自定义资源
DiscoveryClientclient-go/discovery/ClientSet必须要知道Resource和Version, 但使用者通常很难记住所有的GVR信息,这个DiscoveryClient是提供一个发现客户端,发现API Server支持的资源组,资源版本和资源信息

3. Client-go 内部原理

官方的client-go架构图如下,可以看到Informer机制是里面的核心模块。Informer顾名思义就是消息通知器。是连接本地客户端与API Server的关键。

img

针对Informer中的组件,我们自下而上的分析。

3.1 Indexer

在Informer的结构图中,Local Storage就是Indexer,Indexer字面意思就是索引器,索引器+存储,有经验的开发,大概已经能理解这两者之间的关联了。Indexer通过某种方式构建资源对象的索引,来存储资源对象。相应的,使用者可以依据这种索引,快速检索到自己关注的资源对象。

Indexer是一个继承自Store的接口,Delta_FIFO也同样继承自Store,一个Indexer对象中,可以存在多种不同的索引。

首先看看indexer和Store的声明:

// 文件路径: k8s.io/client-go/tools/cache/index.go
type Indexer interface {
	Store // 继承接口Store
	// indexName是索引的类型名,obj是一个资源对象,该方法会计算obj在某一个indexer中的索引值,并返回该索引值下已存储的资源对象
	Index(indexName string, obj interface{}) ([]interface{}, error)
	// indexKey是indexName索引类中一个索引键,函数返回indexKey指定的所有对象键,这个对象键是Indexer内唯一的,在添加的时候会计算
	IndexKeys(indexName, indexedValue string) ([]string, error)
	// 获取indexName索引类中的所有索引键
	ListIndexFuncValues(indexName string) []string
	// 和IndexKeys方法类似,只是返回的是对象的list,而不是对象键的list
	ByIndex(indexName, indexedValue string) ([]interface{}, error)
	// 返回目前所有的indexers
	GetIndexers() Indexers
	// 添加索引分类
	AddIndexers(newIndexers Indexers) error
}
// Store声明 , 文件路径:k8s.io/client-go/tools/cache/store.go
// 接口含义类似一般的KV存储,不做额外解释
type Store interface {
	Add(obj interface{}) error
	Update(obj interface{}) error
	Delete(obj interface{}) error
	List() []interface{}
	ListKeys() []string
	Get(obj interface{}) (item interface{}, exists bool, err error)
	GetByKey(key string) (item interface{}, exists bool, err error)
	Replace([]interface{}, string) error
	Resync() error
}

可以看到indexer里面,索引的概念很关键,那么indexer是怎么实现索引的呢?

client-go/tools/cache/index.go内还定义了以下的内容

// 文件路径: k8s.io/client-go/tools/cache/index.go
// 计算索引的函数类型,值得注意的是,这里返回的索引值是一个数组,也就是一个对象可以得到多个索引值
type IndexFunc func(obj interface{}) ([]string, error) 
// 计算索引的方法不止一个,通过给他们命名来加以区别,存储索引名与索引方法的映射
type Indexers map[string]IndexFunc   
// map a name to a index,和Indexers类似,存储的是索引名与索引的映射
type Indices map[string]Index  
// 索引键与值列表的映射
type Index map[string]sets.String                      

只看说明有一些绕(中文里索引一词,一会儿是动词,一会儿是名词),这里我画了两个图解释一下:

在这里插入图片描述

在这里插入图片描述

不难发现,其实可以类比MySql里面索引的实现,Items里面存储的是聚簇索引,Index里面存储的是没有数据信息的二级索引,即使在二级索引里找到了对象键,要想找到原始的object,还需要回Items里面查找。

Indexer的结构大致如上所述,但是细心的同学应该发现了,Indexers仅仅是一个接口,不是具体的实现,因为Informer中实际使用的,是类型cachecache的声明及代码分析如下:

// 文件路径: k8s.io/client-go/tools/cache/store.go

// `*cache` implements Indexer in terms of a ThreadSafeStore and an
// associated KeyFunc.
type cache struct {
	// cacheStorage 是一个ThreadSafeStore类型的对象,实际使用的是threadSafeMap类型
	cacheStorage ThreadSafeStore
	// keyFunc 是用来计算对象键的
	keyFunc KeyFunc
}

// 文件路径: k8s.io/client-go/tools/cache/thread_safe_store.go

// threadSafeMap implements ThreadSafeStore
// 这个结构很清晰了,items存储的是对象键与对象的映射,indexersindices则保存了索引记录、索引方法
type threadSafeMap struct {
	lock  sync.RWMutex
	items map[string]interface{}

	// indexers maps a name to an IndexFunc
	indexers Indexers
	// indices maps a name to an Index
	indices Indices
}

// ThreadSafeStore 实现了线程安全的存储接口
type ThreadSafeStore interface {
	Add(key string, obj interface{})
	Update(key string, obj interface{})
	Delete(key string)
	Get(key string) (item interface{}, exists bool)
	List() []interface{}
	ListKeys() []string
	Replace(map[string]interface{}, string)
	Index(indexName string, obj interface{}) ([]interface{}, error)
	IndexKeys(indexName, indexKey string) ([]string, error)
	ListIndexFuncValues(name string) []string
	ByIndex(indexName, indexKey string) ([]interface{}, error)
	GetIndexers() Indexers

	// AddIndexers adds more indexers to this store.  If you call this after you already have data
	// in the store, the results are undefined.
	AddIndexers(newIndexers Indexers) error
	// Resync is a no-op and is deprecated
	Resync() error
}

总结一下:

Indexer是Informer实现本地缓存的关键模块。作为Indexer的主要实现,cache是一个存储在内存中的缓存器,初始化时,会指定keyFunc,通常会根据对象的资源名与对象名组合成一个唯一的字符串作为对象键。此外,cache将缓存的维护工作委托给threadSafeMap来完成,threadSafeMap内部实现了一套类似MySql覆盖索引、二级索引的存储机制,用户可以自行添加具有特定索引生成方法的二级索引,方便自己的数据存取。

另外:

K8s内部,目前使用的默认对象键计算方法(也就是cache里面的keyfunc)是MetaNamespaceKeyFunc

// 文件路径: k8s.io/client-go/tools/cache/store.go

// 不解释,看注释就能懂
// MetaNamespaceKeyFunc is a convenient default KeyFunc which knows how to make
// keys for API objects which implement meta.Interface.
// The key uses the format <namespace>/<name> unless <namespace> is empty, then
// it's just <name>.
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
	if key, ok := obj.(ExplicitKey); ok {
		return string(key), nil
	}
	meta, err := meta.Accessor(obj)
	if err != nil {
		return "", fmt.Errorf("object has no meta: %v", err)
	}
	if len(meta.GetNamespace()) > 0 {
		return meta.GetNamespace() + "/" + meta.GetName(), nil
	}
	return meta.GetName(), nil
}

k8s内部目前使用的自定义的indexFuncPodPVCIndexFuncindexByPodNodeNameMetaNamespaceIndexFunc,选取indexByPodNodeName看一下:

// 文件路径: pkg/controller/daemon/daemon_controller.go 
// daemon controller需要监控pod所在的node name,这个需求也非常合理

// 提取active的pod的node name,然后返回
func indexByPodNodeName(obj interface{}) ([]string, error) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		return []string{}, nil
	}
	// We are only interested in active pods with nodeName set
	if len(pod.Spec.NodeName) == 0 || pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
		return []string{}, nil
	}
	return []string{pod.Spec.NodeName}, nil
}

3.2 DeltaFIFO

DeltaFIFO其实是两个词:Delta + FIFODelta代表变化,FIFO则是先入先出的队列。

在这里插入图片描述

DeltaFIFO将接受来的资源event,转化为特定的变化类型,存储在队列中,周期性的POP出去,分发到事件处理器,并更新Indexer中的本地缓存。

Client-go定义了以下几种变化类型:

// 文件路径: k8s.io/client-go/tools/cache/delta_fifo.go
// DeltaType 其实是字符串类型的别名,代表一种变化
type DeltaType string

// Change type definition
const (
	Added   DeltaType = "Added" // 增
	Updated DeltaType = "Updated" // 更新
	Deleted DeltaType = "Deleted"  // 删除
	Replaced DeltaType = "Replaced" // 替换,list出错时,会触发relist,此时会替换
	Sync DeltaType = "Sync"  // 周期性的同步,底层会当作一个update类型处理
)
// Delta由一个对象+类型组成
type Delta struct {
	Type   DeltaType
	Object interface{}
}

// Deltas是一组Delta
type Deltas []Delta

然后我们看一下Delta_FIFO的实现

// 文件路径: k8s.io/client-go/tools/cache/delta_fifo.go

type DeltaFIFO struct {
	// 读写锁与条件变量
	lock sync.RWMutex
	cond sync.Cond

	// items是一个字典,存储了对象键与Delats的映射关系
    // queue是一个FIFO队列,存储了先后进入队列的对象的对象键,queue里面的对象和items里的对象键是一一对应的
    // items里的对象,至少有一个Delta
	items map[string]Deltas
	queue []string

	// 通过Replace()接口将第一批对象放入队列,或者第一次调用增、删、改接口时标记为true
	populated bool
	// 通过Replace()接口将第一批对象放入队列的对象数量
	initialPopulationCount int

	// 用于计算对象键的方法
	keyFunc KeyFunc

	// 其实就是Indexer
	knownObjects KeyListerGetter

	// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
	// DeltaType when Replace() is called (to preserve backwards compat).
	emitDeltaTypeReplaced bool
}

可以用一张图简单描述下Delta_FIFO里面itemsqueue的关系:

在这里插入图片描述

采用这样的结构把对象与事件的存储分离,好处就是不会因为某个对象的事件太多,而导致其他对象的事件一直得不到消费。

Delta_FIFO的核心操作有两个:往队列里面添加元素、从队列中POP元素,可以看一下这两个方法的实现:

// 文件路径: k8s.io/client-go/tools/cache/delta_fifo.go

// queueActionLocked 用于向队列中添加delta,调用前必须加写锁	
// 传入delta类型、资源对象两个参数
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
    // 获取资源对象的对象键
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}

    // 向items中添加delta,并对操作进行去重,目前来看,只有连续两次操作都是删除操作的情况下,才可以合并,其他操作不会合并
	newDeltas := append(f.items[id], Delta{actionType, obj})
	newDeltas = dedupDeltas(newDeltas)

	if len(newDeltas) > 0 {
        // 向queue和items中添加元素
        // 添加以后,条件变量发出消息,通知可能正在阻塞的POP方法有事件进队列了
		if _, exists := f.items[id]; !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas
		f.cond.Broadcast()
	} else {
		// 冗余判断,其实是不会走到这个分支的,去重后的delta list长度怎么也不可能小于1
		delete(f.items, id)
	}
	return nil
}

// Pop方法
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
        // 如果队列是空的,利用条件变量阻塞住,直到有新的delta
        // 如果Close()被调用,则退出
        // 否则一直循环处理
		for len(f.queue) == 0 {
			if f.closed {
				return nil, ErrFIFOClosed
			}

			f.cond.Wait()
		}
        // 取队列第一个的所有deltas
		id := f.queue[0]
		f.queue = f.queue[1:]
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		item, ok := f.items[id]
		if !ok {
			// Item may have been deleted subsequently.
			continue
		}
		delete(f.items, id)
		err := process(item)
        // 如果处理失败了,调用addIfNotPresent,addIfNotPresent意为:如果queue中没有则添加
        // 本身刚刚从queue和items中取出对象,应该不会存在重复的对象,这里调用addIfNotPresent应该只是为了保险起见
		if e, ok := err.(Err
                        
版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/King_DJF/article/details/108307735
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢