扫一扫下方二维码,关注本站官方公众号
获取永久解锁本站全部文章的验证码
还能不定期领现金红包

nsq安装与配置-Go语言中文社区

nsq安装与配置


 

1.NSQ官网

http://nsq.io

2.相关概念


    nsqlookupd:管理nsqd节点拓扑信息并提供最终一致性的发现服务的守护进程
    nsqd:负责接收、排队、转发消息到客户端的守护进程,并且定时向nsqlookupd服务发送心跳
    nsqadmin:nsq的web统计界面,可实时查看集群的统计数据和执行一些管理任务
    utilities:常见基础功能、数据流处理工具,如nsq_stat、nsq_tail、nsq_to_file、nsq_to_http、nsq_to_nsq、to_nsq

3.二进制安装(无需Go环境)

 

下载地址:

https://nsq.io/deployment/installing.html

https://github.com/nsqio/nsq

      nsq官网下载二进制linux安装包:nsq-1.0.0-compat.linux-amd64.go1.8.tar.gz
      解压到家目录tar -xvzf nsq-1.0.0-compat.linux-amd64.go1.8.tar.gz
给bin目录下文件添加执行权限

cd ~/nsq-1.0.0-compat.linux-amd64.go1.8
sudo chmod -R u+x bin/

3.1.简单启动

进入bin目录

打开一个终端,启动nsqlookupd

nohup ./nsqlookupd &

打开另一个终端,启动nsqd

nohup ./nsqd --lookupd-tcp-address=127.0.0.1:4160 &

打开另一个终端,启动nsqadmin

nohup ./nsqadmin --lookupd-http-address=127.0.0.1:4161 &

访问nsqadmin监控

http://127.0.0.1:4171

打开另外一个终端,启动nsq_to_file,将消息写入/tmp文件的日志文件,文件名默认由主题topic+主机+日期时间戳组成

nohup ./nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161 &

使用curl命令,发布一条消息

curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test'

 

3.2本地多nsq启动
 

把~nsq/bin目录加入path

start.sh

#!/bin/sh
#服务启动
#lookupd:151 152
#更改 --data-path 所指定的数据存放路径,否则无法运行
echo '删除日志文件'
rm -rf ./log/*

echo '启动nsqlookupd服务'
nohup nsqlookupd >./log/nsqlookupd.log 2>&1 &


echo '启动nsqd服务'
nohup nsqd --lookupd-tcp-address=0.0.0.0:4160 -tcp-address="0.0.0.0:4153" --data-path=./data1  >./log/nsqd1.log 2>&1 &
nohup nsqd --lookupd-tcp-address=0.0.0.0:4160 -tcp-address="0.0.0.0:4154" --data-path=./data2 -http-address="0.0.0.0:4155" >./log/nsqd2.log 2>&1 &


echo '启动nsqdadmin服务'
nohup nsqadmin --lookupd-http-address=127.0.0.1:4161 >./log/nsqadmin.log 2>&1 &

stop.sh

#!/bin/sh
#服务停止
ps -ef | grep nsq| grep -v grep | awk '{print $2}' | xargs kill -2

 

3.2集群nsq启动

假设我们的服务器安装下面要求编排。

nsqlookup 集群列表

192.168.234.77
192.168.234.36
192.168.234.39

nsq 节点

192.168.234.117
192.168.234.118

nsqadmin 节点

192.168.234.119


构建环境变量

设置nsq的环境变量,假设nsq的bin路径在/usr/local/software/nsq下。

运行命令如下:

vi /etc/profile
export NSQ_HOME=/usr/local/software/nsq
export PATH=$PATH:$NSQ_HOME/bin

. /etc/profile

构建运行脚本

创建执行脚本,分别创建start-nsqlookup.sh, start-nsq.sh和 start-nsqadmin.

start-nsqlookup.sh

#nsqlookup
eval nsqlookupd --verbose > nsqlookup.log 2>&1 "&"

start-nsq.sh 

#nsq 
eval nsqd –data-path=/usr/local/software/nsq/data –lookupd-tcp-address=192.168.234.77:4160,192.168.234.36:4160,192.168.234.39:4160 > nsqd.log 2>&1 “&”

start-nsqadmin.sh 

#nsqadmin 
eval nsqadmin –lookupd-http-address=192.168.234.77:4161,192.168.234.36:4161,192.168.234.39:4161 > nsqadmin.log 2>&1 “&”

运行脚本

首先需要安装次序运行上面的脚本,必选先运行nsqlookup集群,在运行start-nsqadmin.sh或者start-nsq.sh.

在对应服务器上执行对应的脚本。

nsqlookup节点

#192.168.234.77
#192.168.234.36
#192.168.234.39
sh start-nsqlookup.sh

nsq节点上 

#192.168.234.117 
#192.168.234.118 
sh start-nsq.sh

nsqadmin节点上 

#192.168.234.119 
sh start-nsqadmin.sh

查看运行效果,可以直接访问nsqadmin节点

http://192.168.234.119:4171/。

 

创建Producer

package main

import (
    "log"
    "github.com/bitly/go-nsq"
    "io/ioutil"
    "strconv"
)
var nullLogger = log.New(ioutil.Discard, "", log.LstdFlags)

func sendMsg(message string){
    //init default config
    config := nsq.NewConfig()

    w, _ := nsq.NewProducer("192.168.2.117:4150", config)
    err := w.Ping()
    if err != nil {
        //192.168.2.117:4150,192.168.2.68:4150
        log.Fatalln("error ping 10.50.115.16:4150", err)
        // switch the second nsq. You can use nginx or HAProxy for HA.
        w, _ = nsq.NewProducer("192.168.2.68:4150", config)
    }
    w.SetLogger(nullLogger, nsq.LogLevelInfo)

    err2 := w.Publish("a-test", []byte(message))
    if err2 != nil {
        log.Panic("Could not connect nsq")
    }

    w.Stop()
}
func main() {
    for i := 0; i < 2; i ++ {
        sendMsg("msg index "+ strconv.Itoa(i + 10000))
    }

}

恭喜,你已经成功发送两个消息到nsq节点。创建Consumer

package main

import (
    "log"
    "github.com/bitly/go-nsq"
    "fmt"
)


func doSimpleConsumerTask(){

    config := nsq.NewConfig()
    q, _ := nsq.NewConsumer("a-test", "ch", config)
    q.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {

        log.Printf("message: %v", string(message.Body))
        message.Finish()
        return nil
    }))
    lookupAddr := []string {
        "192.168.234.77:4161",
        "192.168.234.36:4161",
        "192.168.234.39:4161",
    }
    err := q.ConnectToNSQLookupds(lookupAddr)
    if err != nil {
        log.Panic("Could not connect")
    }
    <-q.StopChan

    stats := q.Stats()
    fmt.Sprintf("message received %d, finished %d", stats.MessagesReceived, stats.MessagesFinished)
}
func main(){
    doSimpleConsumerTask()
}


程序运行输出:

2017/02/23 09:07:50 message: msg index 10000
2017/02/23 09:07:50 message: msg index 10001

消费者持续接受发送来的信息,运行程序后端。


参考:

https://blog.csdn.net/xie1xiao1jun/article/details/78685036

https://blog.csdn.net/john_f_lau/article/details/56666196 
https://www.jianshu.com/p/4257dd2d3946

版权声明1:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/yangyangye/article/details/88818308
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2019-09-13 19:53
  • 阅读 ( 1718 )

0 条评论

请先 登录 后评论

官方社群