社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
注:文中涉及工作环境相关的网址和IP已经被替换
方案特点:
etcd 是用 golang 实现的一种 K-V 分布式存储系统,内部用raft协议做一致性校验,对外提供http的访问接口,最新版中提供了grpc的访问接口。
etcd主要用于:
与etcd类似的还有zookeeper
这里 有一篇文章简单介绍了etcd和zookeeper的优缺点以及etcd的工作原理
前面介绍了etcd特别适合用于做集群服务的配置管理,kubernets 是用于docker容器编排的,也是用golang实现的,所以自然而然就采用etcd作为服务配置的存储方式了。这里 有一篇kubernets的架构介绍。
etcd在kubernetes中的最大作用是保存容器节点(pod)信息,包括:容器的服务名、状态、IP、版本以及其他信息
通过类似如下的命令可以获取到pod的信息
curl http://10.20.30.40:2379/v2/keys/registry/pods/default
etcd中保存的容器节点信息格式如下:
{
"action": "get",
"node": {
"key": "/registry/pods/default",
"dir": true,
"nodes": [
{
"key": "/registry/pods/default/hello-web-29a74e26ea3c2138e1727f35a111f4c6-dknwh",
"value": "{"kind":"Pod","apiVersion":"v1","metadata":{"name":"hello-web-29a74e26ea3c2138e1727f35a111f4c6-dknwh","generateName":"hello-web-29a74e26ea3c2138e1727f35a111f4c6-","namespace":"default","selfLink":"/api/v1/namespaces/default/pods/hello-web-29a74e26ea3c2138e1727f35a111f4c6-dknwh","uid":"09c45029-3fa0-11e7-a46c-00163e327954","creationTimestamp":"2017-05-23T10:10:24Z","labels":{"app":"hello","deployment":"bb6de7bfc7f357818a8c07faf3987d40","tier":"frontend"},"annotations":{"kubernetes.io/created-by":"{\"kind\":\"SerializedReference\",\"apiVersion\":\"v1\",\"reference\":{\"kind\":\"ReplicationController\",\"namespace\":\"default\",\"name\":\"hello-web-29a74e26ea3c2138e1727f35a111f4c6\",\"uid\":\"e42ce61a-3f9f-11e7-a46c-00163e327954\",\"apiVersion\":\"v1\",\"resourceVersion\":\"4361319\"}}\n"},"ownerReferences":[{"apiVersion":"v1","kind":"ReplicationController","name":"hello-web","uid":"32559b88-3fa0-11e7-a46c-00163e327954","controller":true}]},"spec":{"containers":[{"name":"hello-web","image":"docker.helloword.com/hello-web:f022d25","ports":[{"containerPort":8087,"protocol":"TCP"}],"env":[{"name":"SERVER","valueFrom":{"configMapKeyRef":{"name":"cluster-config","key":"external.ip"}}},{"name":"SERVER_PORT","valueFrom":{"configMapKeyRef":{"name":"hello-config","key":"hello.api.port"}}}],"resources":{"limits":{"cpu":"1","memory":"1Gi"},"requests":{"cpu":"100m","memory":"512Mi"}},"terminationMessagePath":"/dev/termination-log","imagePullPolicy":"IfNotPresent"}],"restartPolicy":"Always","terminationGracePeriodSeconds":30,"dnsPolicy":"ClusterFirst","nodeName":"10.30.58.179","securityContext":{},"imagePullSecrets":[{"name":"cn-registry"}]},"status":{"phase":"Running","conditions":[{"type":"Initialized","status":"True","lastProbeTime":null,"lastTransitionTime":"2017-05-23T10:10:24Z"},{"type":"Ready","status":"True","lastProbeTime":null,"lastTransitionTime":"2017-05-23T10:10:29Z"},{"type":"PodScheduled","status":"True","lastProbeTime":null,"lastTransitionTime":"2017-05-23T10:10:24Z"}],"hostIP":"10.30.58.179","podIP":"172.80.13.4","startTime":"2017-05-23T10:10:24Z","containerStatuses":[{"name":"hello-web","state":{"running":{"startedAt":"2017-05-23T10:10:29Z"}},"lastState":{},"ready":true,"restartCount":0,"image":"docker.helloword.com/hello-web:f022d25","imageID":"docker-pullable://docker.helloword.com/hello-web@sha256:f8e0460983b0d3f87733453b588469d8e225afbfc764da2ae55238cd524ef70a","containerID":"docker://78cd912de942f744a36bd51907562c5e670fb300ddc85267e3ec72572fdb5617"}]}}n",
"modifiedIndex": 4361528,
"createdIndex": 4361320
}
]
}
}
其中value部分的json数据格式化后如下:
{
"kind": "Pod",
"apiVersion": "v1",
"metadata": {
"name": "hello-web-29a74e26ea3c2138e1727f35a111f4c6-dknwh",
"generateName": "hello-web-29a74e26ea3c2138e1727f35a111f4c6-",
"namespace": "default",
"selfLink": "/api/v1/namespaces/default/pods/hello-web-29a74e26ea3c2138e1727f35a111f4c6-dknwh",
"uid": "09c45029-3fa0-11e7-a46c-00163e327954",
"creationTimestamp": "2017-05-23T10:10:24Z",
"labels": {
"app": "hello",
"deployment": "bb6de7bfc7f357818a8c07faf3987d40",
"tier": "frontend"
},
"annotations": {
"kubernetes.io/created-by": "{"kind":"SerializedReference","apiVersion":"v1","reference":{"kind":"ReplicationController","namespace":"default","name":"hello-web-29a74e26ea3c2138e1727f35a111f4c6","uid":"e42ce61a-3f9f-11e7-a46c-00163e327954","apiVersion":"v1","resourceVersion":"4361319"}}n"
},
"ownerReferences": [
{
"apiVersion": "v1",
"kind": "ReplicationController",
"name": "hello-web",
"uid": "32559b88-3fa0-11e7-a46c-00163e327954",
"controller": true
}
]
},
"spec": {
"containers": [
{
"name": "hello-web",
"image": "docker.helloword.com/hello-web:f022d25",
"ports": [
{
"containerPort": 8087,
"protocol": "TCP"
}
],
"env": [
{
"name": "SERVER",
"valueFrom": {
"configMapKeyRef": {
"name": "cluster-config",
"key": "external.ip"
}
}
},
{
"name": "SERVER_PORT",
"valueFrom": {
"configMapKeyRef": {
"name": "hello-config",
"key": "hello.api.port"
}
}
}
],
"resources": {
"limits": {
"cpu": "1",
"memory": "1Gi"
},
"requests": {
"cpu": "100m",
"memory": "512Mi"
}
},
"terminationMessagePath": "/dev/termination-log",
"imagePullPolicy": "IfNotPresent"
}
],
"restartPolicy": "Always",
"terminationGracePeriodSeconds": 30,
"dnsPolicy": "ClusterFirst",
"nodeName": "10.30.58.179",
"securityContext": {},
"imagePullSecrets": [
{
"name": "cn-registry"
}
]
},
"status": {
"phase": "Running",
"conditions": [
{
"type": "Initialized",
"status": "True",
"lastProbeTime": null,
"lastTransitionTime": "2017-05-23T10:10:24Z"
},
{
"type": "Ready",
"status": "True",
"lastProbeTime": null,
"lastTransitionTime": "2017-05-23T10:10:29Z"
},
{
"type": "PodScheduled",
"status": "True",
"lastProbeTime": null,
"lastTransitionTime": "2017-05-23T10:10:24Z"
}
],
"hostIP": "10.30.58.179",
"podIP": "172.80.13.4",
"startTime": "2017-05-23T10:10:24Z",
"containerStatuses": [
{
"name": "hello-web",
"state": {
"running": {
"startedAt": "2017-05-23T10:10:29Z"
}
},
"lastState": {},
"ready": true,
"restartCount": 0,
"image": "docker.helloword.com/hello-web:f022d25",
"imageID": "docker-pullable://docker.helloword.com/hello-web@sha256:f8e0460983b0d3f87733453b588469d8e225afbfc764da2ae55238cd524ef70a",
"containerID": "docker://78cd912de942f744a36bd51907562c5e670fb300ddc85267e3ec72572fdb5617"
}
]
}
}
grpc是google实现的一种基于protobuf的远程服务调用框架,数据采用二进制传输,其传输协议是基于http2.0。
相比于其他各种rpc框架,grpc由于基于protobuf和http2.0,具有以下优点:
首先,去https://github.com/google/protobuf/releases/tag/v3.3.0 这个页面下载对应的protobuf编译器安装文件并安装好protoc
go get -u github.com/golang/protobuf
cd $GOPATH/src/github.com/golang/protobuf
# 如果有安装makefile,直接执行make install,如果没有则执行以下命令
go install ./proto ./jsonpb ./ptypes
go install ./protoc-gen-go
#安装grpc依赖库
go get -u google.golang.org/grpc
#安装grpc-go插件,用于将proto文件编译成grpc的golang代码
go get -u github.com/grpc/grpc-go
cd $GOPATH/src
mv github.com/grpc/grpc-go google.golang.org/grpc/grpc-go
遇到go get无法下载的包,也可以通过 http://gopm.io/ 或者 http://golangtc.com/download/package 进行下载
###5. 定义proto文件
syntax = "proto3"; //使用proto3版本
//用于java等语言的package配置
option java_multiple_files = true;
option java_package = "io.grpc.examples.hellorpc";
option java_outer_classname = "hellorpcProto";
//用于golang等语言的package配置
package hellorpc;
//定义服务接口,其中rpc关键字表示 rpc 接口,用于生成grpc接口代码
service Sync {
rpc Get (SyncRequest) returns(SyncResponse) {}
rpc Set (SyncRequest) returns(SyncResponse) {}
rpc GetAll(SyncRequest)returns(SyncResponse) {}
}
//定义请求数据类型, repeated最终会转换成golang中的数组/切片
message SyncRequest {
repeated SyncData data= 1;
}
//定义返回的数据类型
message SyncResponse {
repeated SyncData data= 1;
}
//定义实体数据类型,用type字段表示请求的数据类型,用data字段保存请求的数据或者返回的数据
//map<string, string>最终会转换成golang中的map[string]string类型
message SyncData {
int32 type = 1;
map<string, string> data = 2;
}
编译proto文件
protoc --go_out=plugins=grpc:./hellorpc hellorpc.proto
其中–go_out用于指定go的proto编译插件以及插件参数
编译成功后,会在 hellorpc目录中生成 hellorpc.pb.go 文件,可以在其他go文件中通过 import “hello-api/hellorpc” 来使用文件中定义的接口
前面提到的 service Sync 部分会编译成如下两部分
type SyncClient interface {
Get(ctx context.Context, in *SyncRequest, opts ...grpc.CallOption) (*SyncResponse, error)
Set(ctx context.Context, in *SyncRequest, opts ...grpc.CallOption) (*SyncResponse, error)
GetAll(ctx context.Context, in *SyncRequest, opts ...grpc.CallOption) (*SyncResponse, error)
}
type SyncServer interface {
Get(context.Context, *SyncRequest) (*SyncResponse, error)
Set(context.Context, *SyncRequest) (*SyncResponse, error)
GetAll(context.Context, *SyncRequest) (*SyncResponse, error)
}
其中 SyncClient 的接口 在 hellorpc.pb.go 里面已经实现好了接口,直接调用即可,但SyncServer定义的接口是需要我们自己实现
//先定义server类型,并实现好SyncServer定义的接口
type server struct {}
const (
HELLO_SYNC_REST_CLUSTER_INFO = iota
)
func (s *server)Get(ctx context.Context, in *hellorpc.SyncRequest) (*hellorpc.SyncResponse, error){
var response = hellorpc.SyncResponse{Data: make([]*hellorpc.SyncData, 0, 10)}
for i := 0; i < len(in.Data); i++{
request := in.Data[i]
switch request.Type {
case hello_SYNC_REST_CLUSTER_INFO:
// get something from local cache and set to response
break
}
}
return &response, nil
}
func (s *server)Set(ctx context.Context, in *hellorpc.SyncRequest) (*hellorpc.SyncResponse, error){
var response = hellorpc.SyncResponse{Data: make([]*hellorpc.SyncData, 0, 10)}
for i := 0; i < len(in.Data); i++{
request := in.Data[i]
switch request.Type {
case HELLO_SYNC_REST_CLUSTER_INFO:
// set something to local cache, and set the result to response
break
}
}
return &response, nil
}
func (s *server)GetAll(ctx context.Context, in *hellorpc.SyncRequest) (*hellorpc.SyncResponse, error){
var response = hellorpc.SyncResponse{Data: make([]*hellorpc.SyncData, 0, 10)}
for i := 0; i < len(in.Data); i++{
request := in.Data[i]
switch request.Type {
case HELLO_SYNC_REST_CLUSTER_INFO:
// get all data from local cache, and set the result to response
break
}
}
return &response, nil
}
实现好接口后,我们需要将服务注册到grpc,这里我们实现一个名为StartSyncServer的函数来做这些事情
func StartSyncServer(address string) error{
lis, err := net.Listen("tcp", address)
if err != nil {
beego.Debug("start sync server error: %v", err)
return err
}
s := grpc.NewServer()
hellorpc.RegisterSyncServer(s, &server{})
//由于s.Serve方法是会一直阻塞住,所以我们需要起一个go routine来执行,在其停止后输出错误信息
go func(){
err := s.Serve(lis)
beego.Debug("sync server stopped with error: %v", err)
}()
return nil
}
将StartSyncServer函数添加到模块的 init 函数中执行,我们服务端的代码就基本完成了
//先定义好客户端类型syncClient,这里我们利用继承的方式将hellorpc.SyncClient实现的方法继承过来
type syncClient struct{
hellorpc.SyncClient
conn *grpc.ClientConn
address string
}
func OpenSyncClient(address string)(syncClient, error) {
s := syncClient{}
//grpc.WithInsecure用于关闭安全验证,因为我们是在docker内部环境里使用,不暴露在外网,就没有加安全认证了
conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
if err != nil {
fmt.Println("----open client error %v, conn: %v", err, conn)
return s, err
}
s.conn = conn
s.address = address
s.client = hellorpc.NewSyncClient(conn)
return s, nil
}
func CloseSyncClient(s *syncClient) {
if s.conn != nil {
s.conn.Close()
s.conn = nil
s.client = nil
}
}
这样我们只需要编写 c, err := OpenSyncClient(address),既可通过 response, err := c.Get(context.Background(), request) 的方式调用hellorpc.SyncClient定义的方法
根据etcd的返回值数据结构,我们定义一下两种类型的数据
//用于保存etcd的返回的数据
type EtcdData struct{
Key string
Dir bool
Value interface{}
CreatedIndex int32
ModifiedIndex int32
Nodes []EtcdData
}
//用于保存pod相关的数据
type PodData struct {
Name string
PodIP string
HostIP string
Status string
UpdateTime string
Timestamp int64
}
func newEtcdData() EtcdData{
return EtcdData{Dir: false, Value: "", Key: "", Nodes: make([]EtcdData, 0, 100)}
}
接下来我们实现EtcdClient
//先定义好EtcdClient的数据结构
type EtcdClient struct{}
//用于解析etcd返回的数据
func parseEtcdData(dataIn map[string]interface{}, dataOut *EtcdData) error {
if key, ok := dataIn["key"]; ok {
dataOut.Key = key.(string)
}
if isDir, ok := dataIn["dir"]; ok {
dataOut.Dir = isDir.(bool)
}
if value, ok := dataIn["value"]; ok {
dataOut.Value = value
}
if createdIndex, ok := dataIn["createdIndex"]; ok {
dataOut.CreatedIndex = int32(createdIndex.(float64))
}
if modifiedIndex, ok := dataIn["modifiedIndex"]; ok {
dataOut.ModifiedIndex = int32(modifiedIndex.(float64))
}
if nodes, ok := dataIn["nodes"]; ok {
var subnodes = nodes.([]interface{})
for i := 0; i < len(subnodes); i++{
node := subnodes[i].(map[string]interface{})
var nodeData = newEtcdData()
parseEtcdData(node, &nodeData)
dataOut.Nodes = append(dataOut.Nodes,nodeData)
}
}
return nil
}
//实现Get方法用于获取某个key的值
func (c *EtcdClient)Get(baseUrl, key string)(EtcdData, error){
var url = baseUrl + key
var res = newEtcdData()
var result = make(map[string]interface{})
resp, err := http.Get(url)
if err == nil{
out, err1 := ioutil.ReadAll(resp.Body)
if err1 == nil{
err2 := json.Unmarshal([]byte(out), &result)
if err2 != nil{
return res, err2
}
node := result["node"].(map[string]interface{})
err = parseEtcdData(node, &res)
}else{
return res, err1
}
}
return res, err
}
由于我们的服务是跑在docker里,由kubernetes进行服务编排,所以我们需要解析kubernetes在etcd中保存的数据
//用于解析pod的状态信息
func parsePodStatus(podStatus interface{}, podData *PodData){
pod_status := podStatus.(map[string]interface{}
版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/letian0805/article/details/72862354
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!