golang中redis对redigo的发布订阅机制的使用 - Go语言中文社区

golang中redis对redigo的发布订阅机制的使用



golang中redis对redigo的发布订阅机制的使用

redigo 对redis的订阅机制放在pubsub.go里面,

订阅主题后通过Receive()函数接收发布到订阅主题的消息。

// Receive returns a pushed message as a Subscription, Message, Pong or error.
// The return value is intended to be used directly in a type switch as
// illustrated in the PubSubConn example.
func (c PubSubConn) Receive() interface{} {
	return c.receiveInternal(c.Conn.Receive())
}

返回的是一个空接口类型 interface{},由于空接口没有方法,因此所有类型都实现了空接口,也就是说可以返回任意类型。

具体会返回哪些类型在receiveInternal()里面可以看到,
目前返回的三种Message、Subscription、Pong都定义在了pubsub.go 里面。

func (c PubSubConn) receiveInternal(replyArg interface{}, errArg error) interface{} {
	reply, err := Values(replyArg, errArg)
	if err != nil {
		return err
	}

	var kind string
	reply, err = Scan(reply, &kind)
	if err != nil {
		return err
	}

	switch kind {
	case "message":
		var m Message
		if _, err := Scan(reply, &m.Channel, &m.Data); err != nil {
			return err
		}
		return m
	case "pmessage":
		var m Message
		if _, err := Scan(reply, &m.Pattern, &m.Channel, &m.Data); err != nil {
			return err
		}
		return m
	case "subscribe", "psubscribe", "unsubscribe", "punsubscribe":
		s := Subscription{Kind: kind}
		if _, err := Scan(reply, &s.Channel, &s.Count); err != nil {
			return err
		}
		return s
	case "pong":
		var p Pong
		if _, err := Scan(reply, &p.Data); err != nil {
			return err
		}
		return p
	}
	r

订阅示例

因为平时用的cpp_redis,习惯了那种订阅一个主题,收到消息后通过注册的回调函数处理的方式,所以示例代码多了长map存放回调函数。

package main

import(
	//"github.com/go-redis/redis"
  "fmt"
  "time"
  //"reflect"
  "unsafe"
	"github.com/gomodule/redigo/redis"
	log "github.com/astaxie/beego/logs"
)

type SubscribeCallback func (channel, message string)

type Subscriber struct {
  client redis.PubSubConn
  cbMap map[string]SubscribeCallback
}

func (c *Subscriber) Connect(ip string, port uint16) {
  conn, err := redis.Dial("tcp", "127.0.0.1:6379")
  if err != nil {
      log.Critical("redis dial failed.")
  }

  c.client = redis.PubSubConn{conn}
  c.cbMap = make(map[string]SubscribeCallback)

  go func() {
    for {
        log.Debug("wait...")
        switch res := c.client.Receive().(type) {
          case redis.Message:
              channel := (*string)(unsafe.Pointer(&res.Channel))
              message := (*string)(unsafe.Pointer(&res.Data))
              c.cbMap[*channel](*channel, *message)
          case redis.Subscription:
              fmt.Printf("%s: %s %dn", res.Channel, res.Kind, res.Count)
          case error:
            log.Error("error handle...")
            continue
        }
    }
  }()

}

func (c *Subscriber) Close() {
  err := c.client.Close()
  if err != nil{
    log.Error("redis close error.")
  }
}

func (c *Subscriber) Subscribe(channel interface{}, cb SubscribeCallback) {
  err := c.client.Subscribe(channel)
  if err != nil{
    log.Critical("redis Subscribe error.")
  }

  c.cbMap[channel.(string)] = cb
}

func TestCallback1(chann, msg string){
  log.Debug("TestCallback1 channel : ", chann, " message : ", msg)
}

func TestCallback2(chann, msg string){
  log.Debug("TestCallback2 channel : ", chann, " message : ", msg)
}

func TestCallback3(chann, msg string){
  log.Debug("TestCallback3 channel : ", chann, " message : ", msg)
}

func main() {

  log.Info("===========main start============")

  var sub Subscriber
  sub.Connect("127.0.0.1", 6397)
  sub.Subscribe("test_chan1", TestCallback1)
  sub.Subscribe("test_chan2", TestCallback2)
  sub.Subscribe("test_chan3", TestCallback3)

  for{
   time.Sleep(1 * time.Second)
  }
}

Alt text

发布示例

发布直接使用默认的Conn来Send “Publish“ 命令即可.
redigo的管道的使用方法设计到三个函数,Do函数也是下面这三个函数的合并:

1、c.Send()
2、c.Flush()
3、c.Receive()

send()方法把命令写到输出缓冲区,Flush()把缓冲区的命令刷新到redis服务器,Receive()函数接收redis给予的响应,三个操作共同完成一套命令流程。

package main

import(
  //"github.com/go-redis/redis"
  "github.com/gomodule/redigo/redis"
  log "github.com/astaxie/beego/logs"
)

func main() {

  client, err := redis.Dial("tcp", "127.0.0.1:6379")
  if err != nil {
      log.Critical("redis dial failed.")
  }
  defer client.Close()

  _, err = client.Do("Publish", "test_chan1", "hello")
  if err != nil {
    log.Critical("redis Publish failed.")
  }

  _, err = client.Do("Publish", "test_chan2", "hello")
  if err != nil {
    log.Critical("redis Publish failed.")
  }

  _, err = client.Do("Publish", "test_chan3", "hello")
  if err != nil {
    log.Critical("redis Publish failed.")
  }

}

Alt text

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/qq_17308321/article/details/89417493
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢