nsq源码阅读(二)nsqlookup-1 - Go语言中文社区

nsq源码阅读(二)nsqlookup-1


nsqlookupd 源码阅读(1)

daemon的启动过程
1. Init
2. start

func (p *program) Start() error {
    opts := nsqlookupd.NewOptions()
    flagSet := nsqlookupdFlagSet(opts)
    .......
    .......
    daemon := nsqlookupd.New(opts)
    daemon.Main()
    p.nsqlookupd = daemon
    return nil
}

(1) 首先new一个options结构

type Options struct {
    // 日志等级
    LogLevel  string `flag:"log-level"`
    // 日志前缀
    LogPrefix string `flag:"log-prefix"`
    // true 使用LogLevel 来设定日志等级,false 不允许,默认是DEBUG
    Verbose   bool   `flag:"verbose"` // for backwards compatibility
    // 日志接口
    Logger    Logger
    // 根据LogLevel 和 Verbose 来得到的真正的日志等级
    logLevel  lg.LogLevel // private, not really an option
    // 监听的tcp地址
    TCPAddress       string `flag:"tcp-address"`
    // 监听的http的地址
    HTTPAddress      string `flag:"http-address"`
    // 广播地址默认是 主机名hostname
    BroadcastAddress string `flag:"broadcast-address"`

    // 从上一次ping之后 生产者驻留在nsqlookup的时长
    InactiveProducerTimeout time.Duration `flag:"inactive-producer-timeout"`
    // tombstone的驻留时长
    TombstoneLifetime       time.Duration `flag:"tombstone、、 tombstone的驻留时长-lifetime"`
}
 func NewOptions() *Options {
    hostname, err := os.Hostname()
    if err != nil {
        log.Fatal(err)
    }

    return &Options{
        LogPrefix:        "[nsqlookupd] ",
        LogLevel:         "info",
        TCPAddress:       "0.0.0.0:4160",
        HTTPAddress:      "0.0.0.0:4161",
        BroadcastAddress: hostname,

        InactiveProducerTimeout: 300 * time.Second,
        TombstoneLifetime:       45 * time.Second,
    }

(2) 读取启功参数,更新options

(3) 根据options new一个daemon

type NSQLookupd struct {
    sync.RWMutex
    // 启动参数
    opts         *Options
    // tcp监听
    tcpListener  net.Listener
    // http监听
    httpListener net.Listener
    // 用来进行安全退出等待的waitGroup
    waitGroup    util.WaitGroupWrapper
    // 注册信息的保存点
    DB           *RegistrationDB
}

func New(opts *Options) *NSQLookupd {
    if opts.Logger == nil {
        opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
    }
    n := &NSQLookupd{
        opts: opts,
        DB:   NewRegistrationDB(),
    }

    var err error
    opts.logLevel, err = lg.ParseLogLevel(opts.LogLevel, opts.Verbose)
    if err != nil {
        n.logf(LOG_FATAL, "%s", err)
        os.Exit(1)
    }

    n.logf(LOG_INFO, version.String("nsqlookupd"))
    return n
}

(4) 调用该daemon的main函数

func (l *NSQLookupd) Main() {
    // 把自己保存到context中 
    ctx := &Context{l}
    // 根据配置的信息获取一个tcpListener
    tcpListener, err := net.Listen("tcp", l.opts.TCPAddress)
    if err != nil {
        l.logf(LOG_FATAL, "listen (%s) failed - %s", l.opts.TCPAddress, err)
        os.Exit(1)
    }
    l.Lock()
    l.tcpListener = tcpListener
    l.Unlock()
    // 新建一个tcpserver
    tcpServer := &tcpServer{ctx: ctx}
    // 将这个server放到nsqlook的Wrap 中执行 (为了安全的退出,)
    l.waitGroup.Wrap(func() {
        protocol.TCPServer(tcpListener, tcpServer, l.logf)
    })
    // httpListener
    httpListener, err := net.Listen("tcp", l.opts.HTTPAddress)
    if err != nil {
        l.logf(LOG_FATAL, "listen (%s) failed - %s", l.opts.HTTPAddress, err)
        os.Exit(1)
    }
    l.Lock()
    l.httpListener = httpListener
    l.Unlock()
    // 新建一个httpServer
    httpServer := newHTTPServer(ctx)
    // 将这个server放到nsqlook的Wrap 中执行 (为了安全的退出,)
    l.waitGroup.Wrap(func() {
        http_api.Serve(httpListener, httpServer, "HTTP", l.logf)
    })
}

(5) TCPServer 函数内部


func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) {
    logf(lg.INFO, "TCP: listening on %s", listener.Addr())

    for {
        clientConn, err := listener.Accept()
        if err != nil {
            if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
                logf(lg.WARN, "temporary Accept() failure - %s", err)
                runtime.Gosched()
                continue
            }

            // theres no direct way to detect this error because it is not exposed
            if !strings.Contains(err.Error(), "use of closed network connection") {
                logf(lg.ERROR, "listener.Accept() - %s", err)
            }
            break
        }
        go handler.Handle(clientConn)
    }

    logf(lg.INFO, "TCP: closing %s", listener.Addr())
}

同大部分的tcp链接一样,这里等待着新的链接的到来,并进行相对应的handle处理
(6) http Serve 函数内部

func Serve(listener net.Listener, handler http.Handler, proto string, logf lg.AppLogFunc) {
    logf(lg.INFO, "%s: listening on %s", proto, listener.Addr())
// new 一个 server 同时将handler  路由处理函数与之绑定  
    server := &http.Server{
        Handler:  handler,
        ErrorLog: log.New(logWriter{logf}, "", 0),
    }
    // 开始监听 ,等待http的链接
    err := server.Serve(listener)
    // theres no direct way to detect this error because it is not exposed
    if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
        logf(lg.ERROR, "http.Serve() - %s", err)
    }

    logf(lg.INFO, "%s: closing %s", proto, listener.Addr())
}

到目前为止nsqloop算是启动起来了,分别tcp等待链接,http等待链接

之后我们的apps主程序 使用os.Signal 来等待退出信号的到来。


    signalChan := make(chan os.Signal, 1)
    signalNotify(signalChan, ws.signals...)
    <-signalChan

    err = ws.i.Stop()

当等到退出信号之后,通知nsqdlookup调用 exit退出函数

func (p *program) Stop() error {
    if p.nsqlookupd != nil {
        p.nsqlookupd.Exit()
    }
    return nil
}

exit 函数内部就是通知listener关闭


func (l *NSQLookupd) Exit() {
    if l.tcpListener != nil {
        l.tcpListener.Close()
    }

    if l.httpListener != nil {
        l.httpListener.Close()
    }
    l.waitGroup.Wait()
}

这样在TCPServer函数内部,listener.Accept() 就会出错( err = accept tcp [::]:4160: use of closed network connection),执行break推出循环,安全退出

func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) {
    logf(lg.INFO, "TCP: listening on %s", listener.Addr())

    for {
        clientConn, err := listener.Accept()
        if err != nil {
            if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
                logf(lg.WARN, "temporary Accept() failure - %s", err)
                runtime.Gosched()
                continue
            }

            // theres no direct way to detect this error because it is not exposed
            if !strings.Contains(err.Error(), "use of closed network connection") {
                logf(lg.ERROR, "listener.Accept() - %s", err)
            }
            break
        }
        go handler.Handle(clientConn)
    }

    logf(lg.INFO, "TCP: closing %s", listener.Addr())
}

在http的Serve 函数内部Serve 函数就会出错

func Serve(listener net.Listener, handler http.Handler, proto string, logf lg.AppLogFunc) {
    logf(lg.INFO, "%s: listening on %s", proto, listener.Addr())

    server := &http.Server{
        Handler:  handler,
        ErrorLog: log.New(logWriter{logf}, "", 0),
    }
    err := server.Serve(listener)
    // theres no direct way to detect this error because it is not exposed
    if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
        logf(lg.ERROR, "http.Serve() - %s", err)
    }

    logf(lg.INFO, "%s: closing %s", proto, listener.Addr())
}

整个的启动结束流程打印的日志信息如图下
这里写图片描述

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/suiban7403/article/details/80265521
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2019-09-13 19:52:57
  • 阅读 ( 1237 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

推荐文章

猜你喜欢