读猿码系列——4. 从filebeat和go-stash深入日志收集及处理(go-stash篇) - Go语言中文社区

读猿码系列——4. 从filebeat和go-stash深入日志收集及处理(go-stash篇)


go-stash是一个高效的从Kafka获取,根据配置的规则进行处理,然后发送到ElasticSearch集群的工具。它属于go-zero生态的一个组件,是logstash 的 Go 语言替代版,它相比于原先的 logstash 节省了2/3的服务器资源。

项目地址:https://github.com/kevwan/go-stash

先从yaml配置中看整体系统设计(stash/etc/config.yaml)其中kafka作为数据输入端,ElasticSearch作为数据输出端,filter 抽象了数据处理过程。

Clusters:
- Input:
    Kafka:
      Name: go-stash
      Log:
        Mode: file
      Brokers:
      - "172.16.48.41:9092"
      - "172.16.48.42:9092"
      - "172.16.48.43:9092"
      Topic: ngapplog
      Group: stash
      Conns: 3
      Consumers: 10
      Processors: 60
      MinBytes: 1048576
      MaxBytes: 10485760
      Offset: first
  Filters:
  - Action: drop
    Conditions:
      - Key: status
        Value: 503
        Type: contains
      - Key: type
        Value: "app"
        Type: match
        Op: and
  - Action: remove_field
    Fields:
    - message
    - source
    - beat
    - fields
    - input_type
    - offset
    - "@version"
    - _score
    - _type
    - clientip
    - http_host
    - request_time
  Output:
    ElasticSearch:
      Hosts:
      - "http://172.16.188.73:9200"
      - "http://172.16.188.74:9200"
      - "http://172.16.188.75:9200"
      Index: "go-stash-{{yyyy.MM.dd}}"
      MaxChunkBytes: 5242880
      GracePeriod: 10s
      Compress: false
      TimeZone: UTC

input:

Conn表示kafka的连接数,一般<=CPU核数;

Consumers表示每个连接数打开的线程数,Conns * Consumers不建议超过topic分片数;

Processors为处理数据的线程数量;

MinBytes和MaxBytes表示每次从kafka获取数据块的区间大小;

Offset参数可选last和false,默认为last,表示从头从kafka开始读取数据。

Filters:

- Action: drop为删除标识,表示在处理时将被移除,不进入es。Conditions下放删除条件,可以指定key字段及Value的值,Type字段可选contains(包含)或match(匹配),Op是附加条件可以写and或者or;

- Action: remove_field为移除字段标识,在Fields下列出要移除的字段;

- Action: transfer为转移字段标识:例如可以将message字段,重新定义为data字段。

Output:

ElasticSearch下的Index表示索引名称;

MaxChunkBytes为每次往ES提交的bulk大小;

GracePeriod默认为10s,在程序关闭后,在10s内用于处理余下的消费和数据,优雅退出;

Compress指数据压缩,压缩会减少传输的数据量,但会增加一定的处理性能,默认为false;

TimeZone默认值为UTC,世界标准时间。

我们从主函数入口开始了解整个数据流程,入口函数stash/stash.go:

func main() {
    // 解析命令行参数,启动优雅退出
    flag.Parse()

    var c config.Config
    conf.MustLoad(*configFile, &c)
    proc.SetTimeToForceQuit(c.GracePeriod)
    // service 组合模式
    group := service.NewServiceGroup()
    defer group.Stop()

    for _, processor := range c.Clusters {
        // 连接es
        client, err := elastic.NewClient(
            elastic.SetSniff(false),
            elastic.SetURL(processor.Output.ElasticSearch.Hosts...),
            elastic.SetBasicAuth(processor.Output.ElasticSearch.Username,processor.Output.ElasticSearch.Password),
        )
        logx.Must(err)
        // filter processors 构建
        filters := filter.CreateFilters(processor)
        writer, err := es.NewWriter(processor.Output.ElasticSearch)
        logx.Must(err)

        var loc *time.Location
        if len(processor.Output.ElasticSearch.TimeZone) > 0 {
            loc, err = time.LoadLocation(processor.Output.ElasticSearch.TimeZone)
            logx.Must(err)
        } else {
            loc = time.Local
        }
        // 准备es的写入操作 {写入的index, 写入器writer}
        indexer := es.NewIndex(client, processor.Output.ElasticSearch.Index, loc)
        handle := handler.NewHandler(writer, indexer)
        handle.AddFilters(filters...)
        handle.AddFilters(filter.AddUriFieldFilter("url", "uri"))
        // 按照配置启动kafka,并将消费操作传入,同时加入组合器
        for _, k := range toKqConf(processor.Input.Kafka) {
            group.Add(kq.MustNewQueue(k, handle))
        }
    }
    // 启动这个组合器
    group.Start()
}

循环从配置的集群中取出每个processor来处理,首先建立es客户端连接,构建filter processor,其中filter.CreateFilters()方法如下:

func CreateFilters(p config.Cluster) []FilterFunc {
    var filters []FilterFunc

    for _, f := range p.Filters {
        switch f.Action {
        case filterDrop:
            filters = append(filters, DropFilter(f.Conditions))
        case filterRemoveFields:
            filters = append(filters, RemoveFieldFilter(f.Fields))
        case filterTransfer:
            filters = append(filters, TransferFilter(f.Field, f.Target))
        }
    }

    return filters
}

我们看到这里实现的方法对应了我们在yaml配置中约定好的Filters中的drop、remove_field、transfer字段下对应的约束,最终返回满足条件的filters过滤器列表。

es.NewWriter()创建写入器writer,用于es的写入操作,代码如下:

func NewWriter(c config.ElasticSearchConf) (*Writer, error) {
    client, err := elastic.NewClient(
        elastic.SetSniff(false),
        elastic.SetURL(c.Hosts...),
        elastic.SetGzip(c.Compress),
        elastic.SetBasicAuth(c.Username,c.Password),
    )
    if err != nil {
        return nil, err
    }

    writer := Writer{
        docType: c.DocType,
        client:  client,
    }
    writer.inserter = executors.NewChunkExecutor(writer.execute, executors.WithChunkBytes(c.MaxChunkBytes))
    return &writer, nil
}

es.NewIndex()创建写入的index,index结构体数据结构如下:

type Index struct {
    client       *elastic.Client
    indexFormat  IndexFormat
    indices      map[string]lang.PlaceholderType
    lock         sync.RWMutex
    singleFlight syncx.SingleFlight
}

然后用writer、index、filter创建MessageHandler,结构体如下:

type MessageHandler struct {
    writer  *es.Writer
    indexer *es.Index
    filters []filter.FilterFunc
}

func NewHandler(writer *es.Writer, indexer *es.Index) *MessageHandler {
    return &MessageHandler{
        writer:  writer,
        indexer: indexer,
    }
}

MessageHandler在结构上对接了下游es,负责数据处理到数据写入;对上接入kafka部分在接口设计上通过go-queue实现了ConsumeHandler接口,在消费过程中执行 handler 的操作,从而写入 es。

func (mh *MessageHandler) Consume(_, val string) error {
    var m map[string]interface{}
    // 反序列化从 kafka 中的消息
    if err := jsoniter.Unmarshal([]byte(val), &m); err != nil {
        return err
    }
    // es 写入index配置
    index := mh.indexer.GetIndex(m)
    // filter 链式处理(map进map出)
    for _, proc := range mh.filters {
        if m = proc(m); m == nil {
            return nil
        }
    }

    bs, err := jsoniter.Marshal(m)
    if err != nil {
        return err
    }
    // es 写入
    return mh.writer.Write(index, string(bs))
}

按照配置启动kafka,并将消费操作传入,同时加入组合器,启动组合器group.Start():

for _, k := range toKqConf(processor.Input.Kafka) {
    group.Add(kq.MustNewQueue(k, handle))
}

至此数据处理以及上下游的连接点已打通,开发者主动从kafka中拉数据拿到es中处理。加入 group 的 service 都是实现 Start()来启动,kafka启动逻辑如下。

即启动kafka消费程序——>从 kafka 拉取消息到 q.channel——>消费程序终止,收尾工作。

 

func (q *kafkaQueue) Start() {
    q.startConsumers()
    q.startProducers()

    q.producerRoutines.Wait()
    close(q.channel)
    q.consumerRoutines.Wait()
}

 

q.startConsumers()
  |- [q.consumeOne(key, value) for msg in q.channel]
    |- q.handler.Consume(key, value)

至此整个流程已经串起来了,这里放一张官方数据流程图:

参考:

https://github.com/kevwan/go-stash

https://mp.weixin.qq.com/s/UeeSZi_-ZiiHf3P4tmyszw

欢迎关注我的个人公众号才浅coding攻略, 即时收到更新推送~

我是一个热爱技术干货的程序媛,从事游戏及微服务后端开发,分享Go、微服务、云原生及Python、网络及算法等相关内容。日拱一卒。欢迎各位催更扯淡一条龙!

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/Snippers/article/details/125173275
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2022-12-30 21:04:24
  • 阅读 ( 182 )
  • 分类:Go深入理解

0 条评论

请先 登录 后评论

官方社群

GO教程

推荐文章

猜你喜欢