社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
新建dockerfile文件,内容如下。安装amqp扩展需要librabbitmq依赖,所以需要先安装librabbitmq再安装zmqp扩展
# PHP官方镜像
FROM php:7.2-fpm
# amqp 扩展
RUN apt-get update && apt-get install -y librabbitmq-dev
RUN pecl install amqp-1.10.2 && docker-php-ext-enable amqp
创建images镜像,不要忽略后面的点
docker build -t mqphp:7.2-fpm .
镜像创建完成,查看images镜像
docker images
创建PHP和nginx容器,参考:https://blog.csdn.net/qq_18361349/article/details/106175211
创建完成并启动容器后访问:http://127.0.0.1/index.php
搜索amqp,可以搜索到说明amqp扩展已经成功安装。
安装RabbitMQ
RabbitMQ中文相关文档:http://rabbitmq.mr-ping.com/
# 拉取rabbitmq镜像,management是rabbitmq的web管理界面。不使用标签拉取是不带管理界面的
docker pull rabbitmq:management
# 启动rabbitmq容器,15672是web管理界面端口5672是rabbitmq端口
docker run --name rabbitmq -d -p 15672:15672 -p 5672:5672 rabbitmq:management
启动容器后,浏览器中访问http://127.0.0.1:15672来查看控制台信息
RabbitMQ默认的用户名:guest,密码:guest
查看RabbitMQ容器IP
docker inspect --format '{{ .NetworkSettings.IPAddress }}' RabbitMQ容器名称
创建生产者服务端Server.php文件
<?php
/**
* Class Server
*/
class Server
{
// 主机地址 rabbitmq容器IP:172.17.0.4 或者宿主机内网IP如:192.168.1.7
private static $host = '172.17.0.4';
// 端口
private static $port = '5672';
// login 用户
private static $login = 'guest';
// 密码
private static $password = 'guest';
// virtual hosts
private static $v_host = '/';
// connect object
private static $connect;
/**
* 获取连接
* @return void
*/
public static function connect()
{
try {
// 创建连接
if (empty(self::$connect)) {
self::$connect = new AMQPConnection(array('host' => self::$host, 'port' => self::$port, 'login' => self::$login,
'password' => self::$password, 'vhost' => self::$v_host));
}
self::$connect->connect();
} catch (Exception $exception) {
var_dump($exception->getMessage());
exit();
}
}
/**
* 推送消息
* @param string $exchange_name 交换机名称
* @param array $message 消息
* @param string $routing_key routing_key
* @return bool
*/
public static function publish($exchange_name = 'exchange', $message = array(), $routing_key = '')
{
$result = false;
try {
self::connect();
// 创建channel
$channel = new AMQPChannel(self::$connect);
// 创建交换机
$ex = new AMQPExchange($channel);
// 创建或选择交换机,不存在则添加一个
$ex->setName($exchange_name);
// 交换机类型
// direct类型交换机 (直连交换机) // define(‘AMQP_EX_TYPE_DIRECT‘, ‘direct‘);
// fanout类型交换机 (扇型交换机)// define(‘AMQP_EX_TYPE_FANOUT‘, ‘fanout‘);
// topic类型交换机 (主题交换机)// define(‘AMQP_EX_TYPE_TOPIC‘, ‘topic‘);
// header类型交换机 (头交换机)// define(‘AMQP_EX_TYPE_HEADERS‘, ‘headers‘);
$ex->setType(AMQP_EX_TYPE_DIRECT);
// 持久化交换机和队列,当代理重启动后依然存在,并包括它们中的完整数据 // define(‘AMQP_DURABLE‘, 2);
$ex->setFlags(AMQP_DURABLE);
$ex->declareExchange();
// 推送消息
$result = $ex->publish(json_encode($message), $routing_key);
self::disconnect();
} catch (Exception $exception) {
var_dump($exception->getMessage());
exit();
}
return $result;
}
/**
*
* 断开连接
* @return void
*/
public static function disconnect()
{
self::$connect->disconnect();
}
}
var_dump(Server::publish('exchange_sanguo_weiguo', array('司马懿', 'zhangfei', '男', 30), 'weiguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_weiguo', array('司马昭', 'zhangfei', '男', 31), 'weiguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_weiguo', array('曹操', 'zhangfei', '男', 32), 'weiguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_weiguo', array('夏侯惇', 'zhangfei', '男', 33), 'weiguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_weiguo', array('许褚', 'zhangfei', '男', 34), 'weiguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_shuguo', array('张飞', 'zhangfei', '男', 20), 'shuguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_shuguo', array('关羽', 'zhangfei', '男', 21), 'shuguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_shuguo', array('黄忠', 'zhangfei', '男', 22), 'shuguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_shuguo', array('赵云', 'zhangfei', '男', 23), 'shuguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_shuguo', array('马超', 'zhangfei', '男', 24), 'shuguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_wuguo', array('周瑜', 'zhangfei', '男', 25), 'wuguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_wuguo', array('孙权', 'zhangfei', '男', 26), 'wuguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_wuguo', array('孙策', 'zhangfei', '男', 27), 'wuguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_wuguo', array('陆逊', 'zhangfei', '男', 28), 'wuguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_wuguo', array('黄盖', 'zhangfei', '男', 29), 'wuguo'), '<br />');
创建消费者客户端Client.php文件
<?php
/**
* Class Client
*/
class Client
{
// 主机地址 rabbitmq容器IP:172.17.0.4 或者宿主机内网IP如:192.168.1.7
private static $host = '172.17.0.4';
// 端口
private static $port = '5672';
// login 用户
private static $login = 'guest';
// 密码
private static $password = 'guest';
// virtual hosts
private static $v_host = '/';
// connect object
private static $connect;
/**
* 获取连接
* @return void
*/
public static function connect()
{
try {
// 创建连接
if (empty(self::$connect)) {
self::$connect = new AMQPConnection(array('host' => self::$host, 'port' => self::$port, 'login' => self::$login,
'password' => self::$password, 'vhost' => self::$v_host));
}
self::$connect->connect();
} catch (Exception $exception) {
var_dump($exception->getMessage());
exit();
}
}
/**
* 获取消息
* @param string $exchange_name
* @param string $queue_name
* @param string $routing_key
* @return mixed
*/
public static function get($exchange_name = 'exchange', $queue_name = 'queue', $routing_key = '')
{
$message = array();
try {
self::connect();
// 创建channel
$channel = new AMQPChannel(self::$connect);
// 创建列队
$q = new AMQPQueue($channel);
// 设置列队名称,不存在则添加一个
$q->setName($queue_name);
// 持久化交换机和队列,当代理重启动后依然存在,并包括它们中的完整数据 // define(‘AMQP_DURABLE‘, 2);
$q->setFlags(AMQP_DURABLE);
// 声明一个新队列
$q->declare();
// 将给定的队列绑定到交换机上
$q->bind($exchange_name, $routing_key);
//消息获取 当在队列get方法中作为标志传递这个参数的时候,消息将在被服务器输出之前标志为acknowledged (已收到)
$messages = $q->get(AMQP_AUTOACK);
if ($messages){
$message = json_decode($messages->getBody(), true);
}
self::disconnect();
} catch (Exception $exception) {
var_dump($exception->getMessage());
exit();
}
return $message;
}
/**
*
* 断开连接
* @return void
*/
public static function disconnect()
{
self::$connect->disconnect();
}
}
var_dump(Client::get('exchange_sanguo_weiguo', 'queue_weiguo', 'weiguo'), '<br />');
var_dump(Client::get('exchange_sanguo_shuguo', 'queue_shuguo', 'shuguo'), '<br />');
var_dump(Client::get('exchange_sanguo_wuguo', 'queue_wuguo', 'wuguo'), '<br />');
访问Server.php文件
查看RabbitMQ管理界面,查看添加的交换机
访问Client.php文件,每访问一次都会从列队中取出一条消息。
第一次
第二次
查看RabbitMQ管理界面,查看队列
下面是amqp扩展中一些常量的定义解释
<?php
/**
* A direct exchange type.
* direct类型交换机
*/
// define(‘AMQP_EX_TYPE_DIRECT‘, ‘direct‘);
/**
* A fanout exchange type.
* fanout类型交换机
*/
// define(‘AMQP_EX_TYPE_FANOUT‘, ‘fanout‘);
/**
* A topic exchange type.
* topic类型交换机
*/
// define(‘AMQP_EX_TYPE_TOPIC‘, ‘topic‘);
/**
* A header exchange type.
* header类型交换机
*/
// define(‘AMQP_EX_TYPE_HEADERS‘, ‘headers‘);
/**
* Durable exchanges and queues will survive a broker restart, complete with all of their data.
* 持久化交换机和队列,当代理重启动后依然存在,并包括它们中的完整数据
*/
// define(‘AMQP_DURABLE‘, 2);
/**
* Passive exchanges and queues will not be redeclared, but the broker will throw an error if the exchange or queue does not exist.
* 被动模式的交换机和队列不能被重新定义,但是如果交换机和队列不存在,代理将扔出一个错误提示
*/
// define(‘AMQP_PASSIVE‘, 4);
/**
* Valid for queues only, this flag indicates that only one client can be listening to and consuming from this queue.
* 仅对队列有效,这个人标志定义队列仅允许一个客户端连接并且从其消费消息
*/
// define(‘AMQP_EXCLUSIVE‘, 8);
/**
* For exchanges, the auto delete flag indicates that the exchange will be deleted as soon as no more queues are bound
* to it. If no queues were ever bound the exchange, the exchange will never be deleted. For queues, the auto delete
* flag indicates that the queue will be deleted as soon as there are no more listeners subscribed to it. If no
* subscription has ever been active, the queue will never be deleted. Note: Exclusive queues will always be
* automatically deleted with the client disconnects.
* 对交换机而言,自动删除标志表示交换机将在没有队列绑定的情况下被自动删除,如果从没有队列和其绑定过,这个交换机将不会被删除.
* 对队列而言,自动删除标志表示如果没有消费者和你绑定的话将被自动删除,如果从没有消费者和其绑定,将不被删除,独占队列在客户断
* 开连接的时候将总是会被删除
*/
// define(‘AMQP_AUTODELETE‘, 16);
/**
* Clients are not allowed to make specific queue bindings to exchanges defined with this flag.
* 这个标志标识不允许自定义队列绑定到交换机上
*/
// define(‘AMQP_INTERNAL‘, 32);
/**
* When passed to the consume method for a clustered environment, do not consume from the local node.
* 在集群环境消费方法中传递这个参数,表示将不会从本地站点消费消息
*/
// define(‘AMQP_NOLOCAL‘, 64);
/**
* When passed to the {@link AMQPQueue::get()} and {@link AMQPQueue::get()} methods as a flag,
* the messages will be immediately marked as acknowledged by the server upon delivery.
* 当在队列get方法中作为标志传递这个参数的时候,消息将在被服务器输出之前标志为acknowledged (已收到)
*/
// define(‘AMQP_AUTOACK‘, 128);
/**
* Passed on queue creation, this flag indicates that the queue should be deleted if it becomes empty.
* 在队列建立时候传递这个参数,这个标志表示队列将在为空的时候被删除
*/
// define(‘AMQP_IFEMPTY‘, 256);
/**
* Passed on queue or exchange creation, this flag indicates that the queue or exchange should be
* deleted when no clients are connected to the given queue or exchange.
* 在交换机或者队列建立的时候传递这个参数,这个标志表示没有客户端连接的时候,交换机或者队列将被删除
*/
// define(‘AMQP_IFUNUSED‘, 512);
/**
* When publishing a message, the message must be routed to a valid queue. If it is not, an error will be returned.
* 当发布消息的时候,消息必须被正确路由到一个有效的队列,否则将返回一个错误
*/
// define(‘AMQP_MANDATORY‘, 1024);
/**
* When publishing a message, mark this message for immediate processing by the broker. (High priority message.)
* 当发布消息时候,这个消息将被立即处理.
*/
// define(‘AMQP_IMMEDIATE‘, 2048);
/**
* If set during a call to {@link AMQPQueue::ack()}, the delivery tag is treated as "up to and including", so that multiple
* messages can be acknowledged with a single method. If set to zero, the delivery tag refers to a single message.
* If the AMQP_MULTIPLE flag is set, and the delivery tag is zero, this indicates acknowledgement of all outstanding
* messages.
* 当在调用AMQPQueue::ack时候设置这个标志,传递标签将被视为最大包含数量,以便通过单个方法标示多个消息为已收到,如果设置为0
* 传递标签指向单个消息,如果设置了AMQP_MULTIPLE,并且传递标签是0,将所有未完成消息标示为已收到
*/
// define(‘AMQP_MULTIPLE‘, 4096);
/**
* If set during a call to {@link AMQPExchange::bind()}, the server will not respond to the method.The client should not wait
* for a reply method. If the server could not complete the method it will raise a channel or connection exception.
* 当在调用AMQPExchange::bind()方法的时候,服务器将不响应请求,客户端将不应该等待响应,如果服务器无法完成该方法,将会抛出一个异常
*/
// define(‘AMQP_NOWAIT‘, 8192);
/**
* If set during a call to {@link AMQPQueue::nack()}, the message will be placed back to the queue.
* 如果在调用AMQPQueue::nack方法时候设置,消息将会被传递回队列
*/
// define(‘AMQP_REQUEUE‘, 16384);
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!