社区微信群开通啦,扫一扫抢先加入社区官方微信群!
社区微信群
<!-- NSQ - JavaNSQClient: supplies the com.github.brainlag.nsq classes
     (NSQProducer, NSQConsumer, NSQLookup, NSQMessage) imported by the Java code in this article -->
<dependency>
<groupId>com.github.brainlag</groupId>
<artifactId>nsq-client</artifactId>
<version>1.0.0.RC4</version>
</dependency>
# NSQ
# Host/port that NsqProduce passes to NSQProducer.addAddress(...)
nsq.produce.host=192.168.3.17
nsq.produce.port=4150
# Host/port that NsqConsume passes to NSQLookup.addLookupAddress(...)
nsq.lookup.host=192.168.3.17
nsq.lookup.port=4161
# Topic used by both NsqProduce (publishing) and NsqConsume (consuming)
nsq.topic=own_test
# Channel name NsqConsume passes to the NSQConsumer constructor
nsq.channel1=channel1
import com.github.brainlag.nsq.NSQProducer;
import com.github.brainlag.nsq.exceptions.NSQException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeoutException;
/**
 * Spring component that publishes text messages to the configured NSQ topic.
 *
 * <p>One {@link NSQProducer} is created lazily on first use and then reused, so a new
 * producer is not created and started for every published message. Call
 * {@link #shutdown()} to release it when the component is no longer needed.
 */
@Component
public class NsqProduce {

    /** Host handed to {@code NSQProducer.addAddress}; read from {@code nsq.produce.host}. */
    @Value("${nsq.produce.host}")
    private String nsqHost;

    /** Port handed to {@code NSQProducer.addAddress}; read from {@code nsq.produce.port}. */
    @Value("${nsq.produce.port}")
    private Integer nsqPort;

    /** Topic every message is published to; read from {@code nsq.topic}. */
    @Value("${nsq.topic}")
    private String nsqTopic;

    /** Shared producer, created on first call to {@link #getProducer()}; guarded by {@code this}. */
    private NSQProducer producer;

    /**
     * Returns the shared, already started producer, creating it on first call.
     *
     * <p>Synchronized so concurrent first callers cannot each create their own producer.
     *
     * @return the started producer bound to the configured host and port
     */
    public synchronized NSQProducer getProducer() {
        if (producer == null) {
            NSQProducer created = new NSQProducer();
            created.addAddress(nsqHost, nsqPort).start();
            // Publish the reference only after start() succeeded, so a failed start is retried.
            producer = created;
        }
        return producer;
    }

    /**
     * Publishes {@code msg} to the configured topic, encoded as UTF-8.
     *
     * @param msg message text; must not be {@code null}
     * @throws NSQException     propagated from {@code NSQProducer.produce}
     * @throws TimeoutException propagated from {@code NSQProducer.produce}
     */
    public void produce(String msg) throws NSQException, TimeoutException {
        // Explicit charset: the no-arg getBytes() depends on the platform default encoding.
        byte[] payload = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        getProducer().produce(nsqTopic, payload);
    }

    /**
     * Shuts down the shared producer if one was created. A later call to
     * {@link #getProducer()} creates a fresh one.
     */
    public synchronized void shutdown() {
        if (producer != null) {
            producer.shutdown();
            producer = null;
        }
    }
}
import com.github.brainlag.nsq.NSQConsumer;
import com.github.brainlag.nsq.lookup.DefaultNSQLookup;
import com.github.brainlag.nsq.lookup.NSQLookup;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
/**
 * Starts an NSQ consumer when the application runs.
 *
 * <p>Implements {@link ApplicationRunner}; {@link #run(ApplicationArguments)} builds a lookup
 * from the configured host and port and starts an {@link NSQConsumer} on the configured
 * topic and channel, delivering messages to {@link NsqMesgDeal}.
 */
@Component
public class NsqConsume implements ApplicationRunner {

    /** Lookup host, read from {@code nsq.lookup.host}. */
    @Value("${nsq.lookup.host}")
    private String nsqHost;

    /** Lookup port, read from {@code nsq.lookup.port}. */
    @Value("${nsq.lookup.port}")
    private Integer nsqLookupPort;

    /** Topic to consume, read from {@code nsq.topic}. */
    @Value("${nsq.topic}")
    private String nsqTopic;

    /** Channel to consume on, read from {@code nsq.channel1}. */
    @Value("${nsq.channel1}")
    private String channel1;

    /** Callback that receives each consumed message. */
    @Autowired
    private NsqMesgDeal nsqMesgDeal;

    /**
     * Creates the consumer for the configured topic/channel and starts it.
     *
     * @param args application arguments (unused)
     * @throws Exception declared by {@link ApplicationRunner#run}
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        new NSQConsumer(buildLookup(), nsqTopic, channel1, nsqMesgDeal).start();
    }

    /** Builds a lookup pointing at the configured lookup host and port. */
    private NSQLookup buildLookup() {
        final NSQLookup nsqLookup = new DefaultNSQLookup();
        nsqLookup.addLookupAddress(nsqHost, nsqLookupPort);
        return nsqLookup;
    }
}
Tips
解释
图解
import com.github.brainlag.nsq.NSQMessage;
import com.github.brainlag.nsq.callbacks.NSQMessageCallback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class NsqMesgDeal implements NSQMessageCallback {
@Override
public void message(NSQMessage nsqMessage) {
String msg = new String(nsqMessage.getMessage());
log.info(msg);
nsqMessage.finished();
}
}
/**
 * Spring Boot test that publishes a sample message through {@link NsqProduce}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestApplicationTests {

    @Autowired
    private NsqProduce nsqProduce;

    /**
     * Publishes one test message.
     *
     * <p>Exceptions are propagated rather than caught and printed, so a failed publish
     * fails the test instead of passing silently.
     *
     * @throws NSQException     if publishing fails
     * @throws TimeoutException if publishing times out
     */
    @Test
    public void nsqProduce() throws NSQException, TimeoutException {
        nsqProduce.produce("-*-*-*-*-*-*-*-*-* own test nsq message -*-*-*-*-*-*-*-*");
    }
}
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!