Java实现消息队列服务 - Go语言中文社区

Java实现消息队列服务


使用 JAVA 语言自己动手来写一个MQ (类似ActiveMQ,RabbitMQ)
在这里插入图片描述
主要角色

首先我们必须需要搞明白 MQ (消息队列) 中的三个基本角色

ProducerBrokerConsumer

整体架构如下所示
在这里插入图片描述
自定义协议

首先从上一篇中介绍了协议的相关信息,具体厂商的 MQ(消息队列) 需要遵循某种协议或者自定义协议 , 消息的 生产者和消费者需要遵循其协议(约定)才能后成功地生产消息和生产消息 ,所以在这里我们自定义一个协议如下.

消息处理中心 : 如果接收到的信息包含"SEND"字符串,即视为生产者发送的消息,消息处理中心需要将此信息存储等待消费者消费

消息处理中心 : 如果接受到的信息为CONSUME,既视为消费者发送消费请求,需要将存储的消息队列头部的信息转发给消费者,然后将此消息从队列中移除

消息处理中心 : 如果消息处理中心存储的消息满3条仍然没有消费者进行消费,则不再接受生产者的生产请求

消息生产者:需要遵循协议将生产的消息头部增加"SEND:" 表示生产消息

消息消费者:需要遵循协议向消息处理中心发送"CONSUME"字符串表示消费消息

流程顺序

项目构建流程

下面将整个MQ的构建流程过一遍

新建一个 Broker 类,内部维护一个 ArrayBlockingQueue 队列,提供生产消息和消费消息的方法, 仅仅具备存储服务功能

新建一个 BrokerServer 类,将 Broker 发布为服务到本地9999端口,监听本地9999端口的 Socket 链接,在接受的信息中进行我们的协议校验, 这里 仅仅具备接受消息,校验协议,转发消息功能;

新建一个 MqClient 类,此类提供与本地端口9999的Socket链接 , 仅仅具备生产消息和消费消息的方法

测试:新建两个 MyClient 类对象,分别执行其生产方法和消费方法

具体使用流程

生产消息:客户端执行生产消息方法,传入需要生产的信息,该信息需要遵循我们自定义的协议,消息处理中心服务在接受到消息会根据自定义的协议校验该消息是否合法,如果合法如果合法就会将该消息存储到Broker内部维护的 ArrayBlockingQueue 队列中.如果 ArrayBlockingQueue 队列没有达到我们协议中的最大长度将将消息添加到队列中,否则输出生产消息失败.

消息消息:客户端执行消费消息方法, Broker服务 会校验请求的信息的信息是否等于 CONSUME ,如果验证成功则从Broker内部维护的 ArrayBlockingQueue 队列的 Poll 出一个消息返回给客户端

代码演示

消息处理中心 Broker

/**
 * 
 * 消息处理中心
 * 
 */
public class Broker {
	// 队列存储消息的最大数量
	private final static int MAX_SIZE = 3;
	// 保存消息数据的容器
	private static ArrayBlockingQueue messageQueue = new ArrayBlockingQueue(MAX_SIZE);

	// 生产消息
	public static void produce(String msg) {
		if (messageQueue.offer(msg)) {
			System.out.println("成功向消息处理中心投递消息:" + msg + ",当前暂存的消息数量是:" + messageQueue.size());
		} else {
			System.out.println("消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");
		}
		System.out.println("=======================");
	}// 消费消息

	public static String consume() { 
		String msg = messageQueue.poll();
		if(msg !=null) {// 消费条件满足情况,从消息容器中取出一条消息
			System.out.println("已经消费消息:"+ msg +",当前暂存的消息数量是:"+ messageQueue.size());   
			}else{            System.out.println("消息处理中心内没有消息可供消费!");        }   
		System.out.println("=======================");
		returnmsg; 
		}
}}

消息处理中心服务 BrokerServer

客户端 MqClient


/**
 * 
 * 用于启动消息处理中心
 * 
 */
public class BrokerServer implements Runnable {
	public static int SERVICE_PORT = 9999;
	private final Socket socket;

	public BrokerServer(Socket socket) {
		this.socket = socket;
	}

	@Override
	public void run() {
		try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
				PrintWriter out = new PrintWriter(socket.getOutputStream())) {
			while (true) {
				String str = in.readLine();
				if (str == null) {
					continue;
				}
				System.out.println("接收到原始数据:" + str);
				if (str.equals("CONSUME")) {// CONSUME 表示要消费一条消息//从消息队列中消费一条消息
					String message = Broker.consume();
					out.println(message);
					out.flush();
				} else if (str.contains("SEND:")) {// 接受到的请求包含SEND:字符串 表示生产消息放到消息队列中
					Broker.produce(str);
				} else {
					System.out.println("原始数据:" + str + "没有遵循协议,不提供相关服务");
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) throws Exception {
		ServerSocket server = new ServerSocket(SERVICE_PORT);
		while (true) {
			BrokerServer brokerServer = new BrokerServer(server.accept());
			new Thread(brokerServer).start();
		}
	}
}

测试MQ

public class ProduceClient {
	public static void main(String[] args) throws Exception {
		MqClient client = newMqClient();
		client.produce("SEND:Hello World");
	}
}

public class ConsumeClient {
	public static void main(String[] args) throws Exception {
		MqClient client = newMqClient();
		String message = client.consume();
		System.out.println("获取的消息为:" + message);
	}
}

我们多执行几次客户端的生产方法和消费方法就可以看到一个完整的MQ的通讯过程,下面是我执行了几次的一些日志

接收到原始数据:SEND:Hello World成功向消息处理中心投递消息:SEND:Hello World,当前暂存的消息数量是:

1=======================接收到原始数据:SEND:Hello World成功向消息处理中心投递消息:SEND:Hello World,当前暂存的消息数量是:
2=======================接收到原始数据:SEND:Hello World成功向消息处理中心投递消息:SEND:Hello World,当前暂存的消息数量是:
3=======================接收到原始数据:SEND:Hello World消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!
=======================接收到原始数据:Hello World原始数据:Hello World没有遵循协议,不提供相关服务接收到原始数据:CONSUME已经消费消息:SEND:Hello World,当前暂存的消息数量是:
2=======================接收到原始数据:CONSUME已经消费消息:SEND:Hello World,当前暂存的消息数量是:
1=======================接收到原始数据:CONSUME已经消费消息:SEND:Hello World,当前暂存的消息数量是:
0=======================接收到原始数据:CONSUME消息处理中心内没有消息可供消费!=======================

小结

本章示例代码主要源自分布式消息中间件实践一书 , 这里我们自己使用Java语言写了一个MQ消息队列 , 通过这个消息队列我们对MQ中的几个角色 “生产者,消费者,消费处理中心,协议” 有了更深的理解 ; 那么下一章节我们就来一块学习具体厂商的MQ RabbitMQ

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/likunpeng6656201/article/details/100161133
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2021-05-16 19:34:29
  • 阅读 ( 862 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢