nsq的安装及简单使用 - Go语言中文社区

nsq的安装及简单使用


1、下载nsq
打开 https://nsq.io/deployment/installing.html 下载对应的nsq版本,我下载的是linux最新稳定版.

2、Linux上安装nsq

2.1 、将包上传至服务器后解压;
          tar xvf nsq-1.1.0.linux-amd64.go1.10.3.tar.gz
          
2.2、进入bin目录 cd nsq-1.1.0.linux-amd64.go1.10.3/bin

2.3、./nsqlookupd 
		加上./,否则有可能报错 can not find nsqlookupd  command
		
2.4、./nsqadmin --lookupd-http-address=127.0.0.1:4161 --broadcast-address=127.0.0.1
        加上 --broadcast-address=127.0.0.1,否则会报错误**UPSTREAM_ERROR: Failed to query any nsqd: Get http://RRU-Alarm:4151/stats?format=json&topic=Insert: dial tcp: lookup RRU-Alarm on 10.40.8.8:53: no such host**
2.5、./nsqadmin --lookupd-http-address=127.0.0.1:4161

启动之后就可以访问http://127.0.0.1:4171/,看到图形化界面了在这里插入图片描述
3、下载 github.com/nsqio/go-nsq
https://gopm.io/download
导入路径 github.com/nsqio/go-nsq ,立即下载即可

先创建一个主题,并且发布20条消息:

package main
 
import (
	"github.com/nsqio/go-nsq"
	"fmt"
)
 
var (
	//127.0.0.1 nsqd的服务器地址,使用了tcp监听的端口
	tcpNsqdAddrr = "127.0.0.1:4150"
)
 
func main() {
	//初始化配置
	config := nsq.NewConfig()
	for i := 0; i < 20; i++ {
		//创建20个生产者
		tPro, err := nsq.NewProducer(tcpNsqdAddrr, config)
		if err != nil {
			fmt.Println(err)
		}
		//主题
		topic := "Insert"
		//主题内容
		tCommand := "data"
		//发布消息
		err = tPro.Publish(topic, []byte(tCommand))
		if err != nil {
			fmt.Println(err)
		}
	}
}

结果如下:
在这里插入图片描述
可以看到Nsqd接收到了20条信息,20条信息都储存在内存中。

创建一个消费者去订阅我们的主题:

package main
 
import (
	"github.com/nsqio/go-nsq"
	"fmt"
	"time"
)
 
var (
	tcpNsqdAddrr = "127.0.0.1:4150"
)
 
//声明一个结构体,实现HandleMessage接口方法
type NsqHandler struct {
	//消息数
	msqCount int64
	//标识ID
	nsqHandlerID string
}
 
//实现HandleMessage方法
//message是接收到的消息
func (s *NsqHandler) HandleMessage(message *nsq.Message) error {
	//没收到一条消息+1
	s.msqCount++
	fmt.Println(s.msqCount,s.nsqHandlerID)
	fmt.Printf("msg.Timestamp=%v, msg.nsqaddress=%s,msg.body=%s n", time.Unix(0  , message.Timestamp).Format("2006-01-02 03:04:05") , message.NSQDAddress, string(message.Body))
	return nil
}
 
func main() {
	config := nsq.NewConfig()
	//创造消费者,参数一是订阅的主题,参数二是使用的通道
	com, err := nsq.NewConsumer("Insert", "channel", config)
	if err != nil {
		fmt.Println(err)
	}
	//添加处理回调
	com.AddHandler(&NsqHandler{nsqHandlerID:"One"})
	//连接对应的nsqd
	err = com.ConnectToNSQD(tcpNsqdAddrr)
	if err != nil {
		fmt.Println(err)
	}
}

可以看到,之前的20条信息,被我们的订阅者读取了。
在这里插入图片描述

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/u012189747/article/details/90610110
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2019-09-13 19:52:15
  • 阅读 ( 3960 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

推荐文章

猜你喜欢