RabbitMQ、RabbitMQ+SpringBoot笔记整理 - Go语言中文社区

RabbitMQ、RabbitMQ+SpringBoot笔记整理



参考

官网
轻松搞定RabbitMQ
RabbitMQ的应用场景以及基本原理介绍
springboot+rabbitmq整合示例程

简介

MQMessage Queue,消息队列)是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据。是通过读写出入队列的消息来通信(RPC则是通过直接调用彼此来通信的)。

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

AMQP协议,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
在这里插入图片描述

Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息的载体,每个消息都会被投到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
Producer:消息生产者,就是投递消息的程序。
Consumer:消息消费者,就是接受消息的程序。
Channel:消息通道,在客户端的每个连接里,可建立多个channel。

这里写图片描述
在这里插入图片描述

启动服务

需要预先安装erlang
这里写图片描述

http://localhost:15672
这里写图片描述

概念记录

生产者和消费者都能可以创建队列,在关注队列的时候需要有一个明确的可监听队列。
生产者和消费者关注的都是队列,都是在队列上定义的操作。
循环分发
消息确认
消息持久化
公平分发
The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue.
MQ消息模型中的核心思想是生产者从不直接发送消息到队列中。

Java 实例

源码:https://download.csdn.net/download/peng_hong_fu/10423174

1.HelloWorld

这里写图片描述

Producer sends messages to the “hello” queue. The consumer receives messages from that queue.

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.5</version>
</dependency>

Producer.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产者
 * Created by PengHongfu 2018-05-14 11:24
 */
public class Producer {

    private final static String QUEUE_NAME = "test_queue";//队列名称

    public static void main(String args[]) throws IOException, TimeoutException {
        // 1.创建连接连接到RabbitMQ
        ConnectionFactory factory = new ConnectionFactory();
        // 2.设置地址、端口、账号、密码
        factory.setHost("localhost");
        // 3.获取连接
        Connection conn = factory.newConnection();
        // 4.获取通道
        Channel channel = conn.createChannel();
        // 5.创建队列 1-队列名称	2-队列是否持久化	3-队列是否是独占	4-使用完之后是否删除此队列	5-其他属性
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 6.发送消息
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [Producer] Sent '" + message + "'");
        // 7.关闭资源
        channel.close();
        conn.close();
    }
}

Consumer.java

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 消费者
 * Created by PengHongfu 2018-05-14 11:50
 */
public class Consumer {
    private final static String QUEUE_NAME = "helloWorld_queue";//队列名称

    public static void main(String args[]) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection conn = factory.newConnection();

        Channel channel = conn.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        System.out.println(" [消费者] Waiting for messages. To exit press CTRL+C");
        //消费消息
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [消费者] Received '" + message + "'");
            }
        };
        /*
         * 监听队列
         * 参数1:队列名称
         * 参数2:是否发送ack包,不发送ack消息会持续在服务端保存,直到收到ack。 可以通过channel.basicAck手动回复ack
         * 参数3:消费者
         */
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

依次启动消费者和生产者
这里写图片描述
这里写图片描述
这里写图片描述

2.Work Queues

工作队列

这里写图片描述
Producer.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * work模式
 * Created by PengHongfu 2018-05-14 14:27
 */
public class Producer {
    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 10; i++) {
            // 消息内容
            String message = "" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [生产者] Sent '" + message + "'");
            //发送的消息间隔越来越长
            Thread.sleep(i * 10);
        }
        channel.close();
        connection.close();
    }
}

消费者处理消息时,利用休眠时间长短来模拟工作任务的轻重

Consumer1.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * 消费者1
 * Created by PengHongfu 2018-05-14 14:29
 */
public class Consumer1 {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一时刻服务器只会发一条消息给消费者(能者多劳模式),空闲多的消费者,消费更多的消息
        //channel.basicQos(1);
     
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, false, consumer);
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [消费者1] Received '" + message + "'");
            //休眠
            Thread.sleep(10);
            // 手动返回ack包确认状态
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            //channel.basicReject(); channel.basicNack(); //可以通过这两个函数拒绝消息,可以指定消息在服务器删除还是继续投递给其他消费者
        }
    }
}

Consumer2.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
 * 消费者2
 * Created by PengHongfu 2018-05-14 14:29
 */
public class Consumer2 {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {
      
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
   
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
       
        // 同一时刻服务器只会发一条消息给消费者(能者多劳模式),空闲多的消费者,消费更多的消息
        //channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
      
        channel.basicConsume(QUEUE_NAME, false, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [消费者2] Received '" + message + "'");
            // 休眠1秒
            Thread.sleep(1000);
            //反馈消息的消费状态
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

依次启动Consumer1,Consumer2,Producer,结果如下:
这里写图片描述
这里写图片描述

平均每个消费者获得相同数量的消息。这是由于分发消息机制—Round-Robin(轮询),合理的方案是能者多劳,休眠多的消费者,工作任务重的消费者少分发消息任务。下面的代码正是处理这个问题的,使得分配公正。

channel.basicQos(1);

每个消费者在处理完一个消息之后,服务器才会发一条消息给消费者,这使得空闲的消费者,会分配到更多的消息。

这里写图片描述

这里写图片描述

3.Publish/Subscribe

发布/订阅
we'll deliver a message to multiple consumers.

这里写图片描述

There are a few exchange types available: direct, topic, headers and fanout

Direct Exchange:直接匹配,通过Exchange名称+RountingKey来发送与接收消息.
Fanout Exchange:广播订阅,向所有的消费者发布消息,但是只有消费者将队列绑定到该Exchange路由器才能收到消息,忽略Routing Key.
Topic Exchange:主题匹配订阅,这里的主题指的是RoutingKey,RoutingKey可以采用通配符,如:*或#,RoutingKey命名采用.来分隔多个词,只有消息这将队列绑定到该路由器且指定RoutingKey符合匹配规则时才能收到消息;
Headers Exchange:消息头订阅,消息发布前,为消息定义一个或多个键值对的消息头,然后消费者接收消息同时需要定义类似的键值对请求头:(如:x-mactch=all或者x_match=any),只有请求头与消息头匹配,才能接收消息,忽略RoutingKey.
默认的exchange:如果用空字符串去声明一个exchange,那么系统就会使用”amq.direct”这个exchange,我们创建一个queue时,默认的都会有一个和新建queue同名的routingKey绑定到这个默认的exchange上去

channel.BasicPublish("", "TaskQueue", properties, bytes);

这里写图片描述

Producer.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 订阅模式
 * Created by PengHongfu 2018-05-14 16:27
 */
public class Producer {
    //交换机的名称
    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {   
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        /*
         * 声明exchange(交换机)
         * 参数1:交换机名称
         * 参数2:交换机类型
         */
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        for (int i = 0; i < 5; i++) {
            String message = "订阅消息-"+i;
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [生产者] Sent '" + message + "'");
            Thread.sleep(100);
        }
        channel.close();
        connection.close();
    }
}

Consumer1.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.File;
import java.io.FileOutputStream;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Created by PengHongfu 2018-05-14 16:29
 */
public class Consumer1 {

    private final static String QUEUE_NAME = "test_queue_exchange_1";//可以是任意的队列名

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        /*
         * 绑定队列到交换机(这个交换机的名称一定要和上面的生产者交换机名称相同)
         * 参数1:队列的名称
         * 参数2:交换机的名称
         * 参数3:Routing Key
         *
         */
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,自动返回完成
        channel.basicConsume(QUEUE_NAME, true, consumer);
        System.out.println(" [消费者1] Waiting for messages. To exit press CTRL+C");
        // 获取消息
        while (true) {
            String dir = Consumer1.class.getClassLoader().getResource("").getPath();
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String fileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
            File file = new File(dir, fileName + ".log");
            FileOutputStream outputStream = new FileOutputStream(file, true);
            outputStream.write(((new SimpleDateFormat("HH:mm:ss").format(new Date())) + "-" + message + "rn").getBytes());
            outputStream.flush();
            outputStream.close();
        }
    }
}

Consumer2.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * Created by PengHongfu 2018-05-14 16:30
 */
public class Consumer2 {

    private final static String QUEUE_NAME = "test_queue_exchange_2";

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        c
                        
版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/Peng_Hong_fu/article/details/80323905
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢