社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
daemon的启动过程
1. Init
2. start
func (p *program) Start() error {
opts := nsqlookupd.NewOptions()
flagSet := nsqlookupdFlagSet(opts)
.......
.......
daemon := nsqlookupd.New(opts)
daemon.Main()
p.nsqlookupd = daemon
return nil
}
(1) 首先new一个options结构
type Options struct {
// 日志等级
LogLevel string `flag:"log-level"`
// 日志前缀
LogPrefix string `flag:"log-prefix"`
// true 使用LogLevel 来设定日志等级,false 不允许,默认是DEBUG
Verbose bool `flag:"verbose"` // for backwards compatibility
// 日志接口
Logger Logger
// 根据LogLevel 和 Verbose 来得到的真正的日志等级
logLevel lg.LogLevel // private, not really an option
// 监听的tcp地址
TCPAddress string `flag:"tcp-address"`
// 监听的http的地址
HTTPAddress string `flag:"http-address"`
// 广播地址默认是 主机名hostname
BroadcastAddress string `flag:"broadcast-address"`
// 从上一次ping之后 生产者驻留在nsqlookup的时长
InactiveProducerTimeout time.Duration `flag:"inactive-producer-timeout"`
// tombstone的驻留时长
TombstoneLifetime time.Duration `flag:"tombstone、、 tombstone的驻留时长-lifetime"`
}
func NewOptions() *Options {
hostname, err := os.Hostname()
if err != nil {
log.Fatal(err)
}
return &Options{
LogPrefix: "[nsqlookupd] ",
LogLevel: "info",
TCPAddress: "0.0.0.0:4160",
HTTPAddress: "0.0.0.0:4161",
BroadcastAddress: hostname,
InactiveProducerTimeout: 300 * time.Second,
TombstoneLifetime: 45 * time.Second,
}
(2) 读取启功参数,更新options
(3) 根据options new一个daemon
type NSQLookupd struct {
sync.RWMutex
// 启动参数
opts *Options
// tcp监听
tcpListener net.Listener
// http监听
httpListener net.Listener
// 用来进行安全退出等待的waitGroup
waitGroup util.WaitGroupWrapper
// 注册信息的保存点
DB *RegistrationDB
}
func New(opts *Options) *NSQLookupd {
if opts.Logger == nil {
opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
}
n := &NSQLookupd{
opts: opts,
DB: NewRegistrationDB(),
}
var err error
opts.logLevel, err = lg.ParseLogLevel(opts.LogLevel, opts.Verbose)
if err != nil {
n.logf(LOG_FATAL, "%s", err)
os.Exit(1)
}
n.logf(LOG_INFO, version.String("nsqlookupd"))
return n
}
(4) 调用该daemon的main函数
func (l *NSQLookupd) Main() {
// 把自己保存到context中
ctx := &Context{l}
// 根据配置的信息获取一个tcpListener
tcpListener, err := net.Listen("tcp", l.opts.TCPAddress)
if err != nil {
l.logf(LOG_FATAL, "listen (%s) failed - %s", l.opts.TCPAddress, err)
os.Exit(1)
}
l.Lock()
l.tcpListener = tcpListener
l.Unlock()
// 新建一个tcpserver
tcpServer := &tcpServer{ctx: ctx}
// 将这个server放到nsqlook的Wrap 中执行 (为了安全的退出,)
l.waitGroup.Wrap(func() {
protocol.TCPServer(tcpListener, tcpServer, l.logf)
})
// httpListener
httpListener, err := net.Listen("tcp", l.opts.HTTPAddress)
if err != nil {
l.logf(LOG_FATAL, "listen (%s) failed - %s", l.opts.HTTPAddress, err)
os.Exit(1)
}
l.Lock()
l.httpListener = httpListener
l.Unlock()
// 新建一个httpServer
httpServer := newHTTPServer(ctx)
// 将这个server放到nsqlook的Wrap 中执行 (为了安全的退出,)
l.waitGroup.Wrap(func() {
http_api.Serve(httpListener, httpServer, "HTTP", l.logf)
})
}
(5) TCPServer 函数内部
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) {
logf(lg.INFO, "TCP: listening on %s", listener.Addr())
for {
clientConn, err := listener.Accept()
if err != nil {
if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
logf(lg.WARN, "temporary Accept() failure - %s", err)
runtime.Gosched()
continue
}
// theres no direct way to detect this error because it is not exposed
if !strings.Contains(err.Error(), "use of closed network connection") {
logf(lg.ERROR, "listener.Accept() - %s", err)
}
break
}
go handler.Handle(clientConn)
}
logf(lg.INFO, "TCP: closing %s", listener.Addr())
}
同大部分的tcp链接一样,这里等待着新的链接的到来,并进行相对应的handle处理
(6) http Serve 函数内部
func Serve(listener net.Listener, handler http.Handler, proto string, logf lg.AppLogFunc) {
logf(lg.INFO, "%s: listening on %s", proto, listener.Addr())
// new 一个 server 同时将handler 路由处理函数与之绑定
server := &http.Server{
Handler: handler,
ErrorLog: log.New(logWriter{logf}, "", 0),
}
// 开始监听 ,等待http的链接
err := server.Serve(listener)
// theres no direct way to detect this error because it is not exposed
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
logf(lg.ERROR, "http.Serve() - %s", err)
}
logf(lg.INFO, "%s: closing %s", proto, listener.Addr())
}
到目前为止nsqloop算是启动起来了,分别tcp等待链接,http等待链接
之后我们的apps主程序 使用os.Signal 来等待退出信号的到来。
signalChan := make(chan os.Signal, 1)
signalNotify(signalChan, ws.signals...)
<-signalChan
err = ws.i.Stop()
当等到退出信号之后,通知nsqdlookup调用 exit退出函数
func (p *program) Stop() error {
if p.nsqlookupd != nil {
p.nsqlookupd.Exit()
}
return nil
}
exit 函数内部就是通知listener关闭
func (l *NSQLookupd) Exit() {
if l.tcpListener != nil {
l.tcpListener.Close()
}
if l.httpListener != nil {
l.httpListener.Close()
}
l.waitGroup.Wait()
}
这样在TCPServer函数内部,listener.Accept() 就会出错( err = accept tcp [::]:4160: use of closed network connection),执行break推出循环,安全退出
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) {
logf(lg.INFO, "TCP: listening on %s", listener.Addr())
for {
clientConn, err := listener.Accept()
if err != nil {
if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
logf(lg.WARN, "temporary Accept() failure - %s", err)
runtime.Gosched()
continue
}
// theres no direct way to detect this error because it is not exposed
if !strings.Contains(err.Error(), "use of closed network connection") {
logf(lg.ERROR, "listener.Accept() - %s", err)
}
break
}
go handler.Handle(clientConn)
}
logf(lg.INFO, "TCP: closing %s", listener.Addr())
}
在http的Serve 函数内部Serve 函数就会出错
func Serve(listener net.Listener, handler http.Handler, proto string, logf lg.AppLogFunc) {
logf(lg.INFO, "%s: listening on %s", proto, listener.Addr())
server := &http.Server{
Handler: handler,
ErrorLog: log.New(logWriter{logf}, "", 0),
}
err := server.Serve(listener)
// theres no direct way to detect this error because it is not exposed
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
logf(lg.ERROR, "http.Serve() - %s", err)
}
logf(lg.INFO, "%s: closing %s", proto, listener.Addr())
}
整个的启动结束流程打印的日志信息如图下
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!