【DEMO】SpringBoot整合NSQ - Go语言中文社区

【DEMO】SpringBoot整合NSQ


仅仅是一个简单的Java使用NSQ的实现

pom引用
<!-- NSQ - JavaNSQClient -->
<dependency>
     <groupId>com.github.brainlag</groupId>
     <artifactId>nsq-client</artifactId>
     <version>1.0.0.RC4</version>
 </dependency>
配置文件
# NSQ
nsq.produce.host=192.168.3.17
nsq.produce.port=4150
nsq.lookup.host=192.168.3.17
nsq.lookup.port=4161
nsq.topic=own_test
nsq.channel1=channel1
生产者
import com.github.brainlag.nsq.NSQProducer;
import com.github.brainlag.nsq.exceptions.NSQException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeoutException;

@Component
public class NsqProduce {

    @Value("${nsq.produce.host}")
    private String nsqHost;

    @Value("${nsq.produce.port}")
    private Integer nsqPort;

    @Value("${nsq.topic}")
    private String nsqTopic;

    public NSQProducer getProducer() {
        NSQProducer producer = new NSQProducer();
        producer.addAddress(nsqHost, nsqPort).start();
        return producer;
    }

    public void produce(String msg) throws NSQException, TimeoutException {
        this.getProducer().produce(nsqTopic, msg.getBytes());
    }

}
消费者
  • 服务启动时,连接到NSQserver,监听指定topic下的指定channel,等待消息投递
import com.github.brainlag.nsq.NSQConsumer;
import com.github.brainlag.nsq.lookup.DefaultNSQLookup;
import com.github.brainlag.nsq.lookup.NSQLookup;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

@Component
public class NsqConsume implements ApplicationRunner {

    @Value("${nsq.lookup.host}")
    private String nsqHost;

    @Value("${nsq.lookup.port}")
    private Integer nsqLookupPort;

    @Value("${nsq.topic}")
    private String nsqTopic;

    @Value("${nsq.channel1}")
    private String channel1;

    @Autowired
    private NsqMesgDeal nsqMesgDeal;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        NSQLookup lookup = new DefaultNSQLookup();
        lookup.addLookupAddress(nsqHost, nsqLookupPort);
        NSQConsumer consumer = new NSQConsumer(lookup, nsqTopic, channel1, nsqMesgDeal);
        consumer.start();
    }
}

Tips

  • 生产者推送消息仅仅指定了topic,并没有指定channel
  • 消费者消费消息却要指定channel

解释

  • 发往topic的消息,nsqd向该topic下的所有channel都投递消息
  • 如果Channel下存在多个消费者,则随机选择一个消费者投递消息(达到负载均衡)

图解
在这里插入图片描述

消息处理
import com.github.brainlag.nsq.NSQMessage;
import com.github.brainlag.nsq.callbacks.NSQMessageCallback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class NsqMesgDeal implements NSQMessageCallback {
    @Override
    public void message(NSQMessage nsqMessage) {
        String msg = new String(nsqMessage.getMessage());
        log.info(msg);
        nsqMessage.finished();
    }
}
测试代码
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestApplicationTests {

    @Autowired
    private NsqProduce nsqProduce;

    @Test
    public void nsqProduce() {
        try {
            nsqProduce.produce("-*-*-*-*-*-*-*-*-* own test nsq message -*-*-*-*-*-*-*-*");
        } catch (NSQException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}
版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/wushengjun753/article/details/89382360
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢