PHP使用RabbitMQ - Go语言中文社区

PHP使用RabbitMQ


新建dockerfile文件,内容如下。安装amqp扩展需要librabbitmq依赖,所以需要先安装librabbitmq再安装zmqp扩展

# PHP官方镜像
FROM php:7.2-fpm

# amqp 扩展
RUN apt-get update && apt-get install -y librabbitmq-dev
RUN pecl install amqp-1.10.2 && docker-php-ext-enable amqp

创建images镜像,不要忽略后面的点

docker build -t mqphp:7.2-fpm .

镜像创建完成,查看images镜像

docker images

创建PHP和nginx容器,参考:https://blog.csdn.net/qq_18361349/article/details/106175211
创建完成并启动容器后访问:http://127.0.0.1/index.php
搜索amqp,可以搜索到说明amqp扩展已经成功安装。
image.png
安装RabbitMQ
RabbitMQ中文相关文档:http://rabbitmq.mr-ping.com/

# 拉取rabbitmq镜像,management是rabbitmq的web管理界面。不使用标签拉取是不带管理界面的
docker pull rabbitmq:management

# 启动rabbitmq容器,15672是web管理界面端口5672是rabbitmq端口
docker run --name rabbitmq -d -p 15672:15672 -p 5672:5672 rabbitmq:management

启动容器后,浏览器中访问http://127.0.0.1:15672来查看控制台信息
RabbitMQ默认的用户名:guest,密码:guest
image.png
查看RabbitMQ容器IP

docker inspect --format '{{ .NetworkSettings.IPAddress }}' RabbitMQ容器名称

创建生产者服务端Server.php文件

<?php

/**
 * Class Server
 */
class Server
{
    // 主机地址 rabbitmq容器IP:172.17.0.4 或者宿主机内网IP如:192.168.1.7
    private static $host = '172.17.0.4';
    // 端口
    private static $port = '5672';
    // login 用户
    private static $login = 'guest';
    // 密码
    private static $password = 'guest';
    // virtual hosts
    private static $v_host = '/';
    // connect object
    private static $connect;

    /**
     * 获取连接
     * @return void
     */
    public static function connect()
    {
        try {
            // 创建连接
            if (empty(self::$connect)) {
                self::$connect = new AMQPConnection(array('host' => self::$host, 'port' => self::$port, 'login' => self::$login,
                    'password' => self::$password, 'vhost' => self::$v_host));
            }
            self::$connect->connect();
        } catch (Exception $exception) {
            var_dump($exception->getMessage());
            exit();
        }
    }

    /**
     * 推送消息
     * @param string $exchange_name 交换机名称
     * @param array $message 消息
     * @param string $routing_key routing_key
     * @return bool
     */
    public static function publish($exchange_name = 'exchange', $message = array(), $routing_key = '')
    {
        $result = false;
        try {
            self::connect();
            // 创建channel
            $channel = new AMQPChannel(self::$connect);
            // 创建交换机
            $ex = new AMQPExchange($channel);
            // 创建或选择交换机,不存在则添加一个
            $ex->setName($exchange_name);
            // 交换机类型
            // direct类型交换机 (直连交换机) // define(‘AMQP_EX_TYPE_DIRECT‘, ‘direct‘);
            // fanout类型交换机 (扇型交换机)// define(‘AMQP_EX_TYPE_FANOUT‘, ‘fanout‘);
            // topic类型交换机  (主题交换机)// define(‘AMQP_EX_TYPE_TOPIC‘, ‘topic‘);
            // header类型交换机 (头交换机)// define(‘AMQP_EX_TYPE_HEADERS‘, ‘headers‘);
            $ex->setType(AMQP_EX_TYPE_DIRECT);
            // 持久化交换机和队列,当代理重启动后依然存在,并包括它们中的完整数据    // define(‘AMQP_DURABLE‘, 2);
            $ex->setFlags(AMQP_DURABLE);
            $ex->declareExchange();
            // 推送消息
            $result = $ex->publish(json_encode($message), $routing_key);
            self::disconnect();
        } catch (Exception $exception) {
            var_dump($exception->getMessage());
            exit();
        }
        return $result;
    }

    /**
     *
     * 断开连接
     * @return void
     */
    public static function disconnect()
    {
        self::$connect->disconnect();
    }
}

var_dump(Server::publish('exchange_sanguo_weiguo', array('司马懿', 'zhangfei', '男', 30), 'weiguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_weiguo', array('司马昭', 'zhangfei', '男', 31), 'weiguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_weiguo', array('曹操', 'zhangfei', '男', 32), 'weiguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_weiguo', array('夏侯惇', 'zhangfei', '男', 33), 'weiguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_weiguo', array('许褚', 'zhangfei', '男', 34), 'weiguo'), '<br />');

var_dump(Server::publish('exchange_sanguo_shuguo', array('张飞', 'zhangfei', '男', 20), 'shuguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_shuguo', array('关羽', 'zhangfei', '男', 21), 'shuguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_shuguo', array('黄忠', 'zhangfei', '男', 22), 'shuguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_shuguo', array('赵云', 'zhangfei', '男', 23), 'shuguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_shuguo', array('马超', 'zhangfei', '男', 24), 'shuguo'), '<br />');

var_dump(Server::publish('exchange_sanguo_wuguo', array('周瑜', 'zhangfei', '男', 25), 'wuguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_wuguo', array('孙权', 'zhangfei', '男', 26), 'wuguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_wuguo', array('孙策', 'zhangfei', '男', 27), 'wuguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_wuguo', array('陆逊', 'zhangfei', '男', 28), 'wuguo'), '<br />');
var_dump(Server::publish('exchange_sanguo_wuguo', array('黄盖', 'zhangfei', '男', 29), 'wuguo'), '<br />');

创建消费者客户端Client.php文件

<?php

/**
 * Class Client
 */
class Client
{
    // 主机地址 rabbitmq容器IP:172.17.0.4 或者宿主机内网IP如:192.168.1.7
    private static $host = '172.17.0.4';
    // 端口
    private static $port = '5672';
    // login 用户
    private static $login = 'guest';
    // 密码
    private static $password = 'guest';
    // virtual hosts
    private static $v_host = '/';
    // connect object
    private static $connect;

    /**
     * 获取连接
     * @return void
     */
    public static function connect()
    {
        try {
            // 创建连接
            if (empty(self::$connect)) {
                self::$connect = new AMQPConnection(array('host' => self::$host, 'port' => self::$port, 'login' => self::$login,
                    'password' => self::$password, 'vhost' => self::$v_host));
            }
            self::$connect->connect();
        } catch (Exception $exception) {
            var_dump($exception->getMessage());
            exit();
        }
    }

    /**
     * 获取消息
     * @param string $exchange_name
     * @param string $queue_name
     * @param string $routing_key
     * @return mixed
     */
    public static function get($exchange_name = 'exchange', $queue_name = 'queue', $routing_key = '')
    {
        $message = array();
        try {
            self::connect();
            // 创建channel
            $channel = new AMQPChannel(self::$connect);
            // 创建列队
            $q = new AMQPQueue($channel);
            // 设置列队名称,不存在则添加一个
            $q->setName($queue_name);
            // 持久化交换机和队列,当代理重启动后依然存在,并包括它们中的完整数据    // define(‘AMQP_DURABLE‘, 2);
            $q->setFlags(AMQP_DURABLE);
            // 声明一个新队列
            $q->declare();
            // 将给定的队列绑定到交换机上
            $q->bind($exchange_name, $routing_key);
            //消息获取 当在队列get方法中作为标志传递这个参数的时候,消息将在被服务器输出之前标志为acknowledged (已收到)
            $messages = $q->get(AMQP_AUTOACK);
            if ($messages){
                $message = json_decode($messages->getBody(), true);
            }
            self::disconnect();
        } catch (Exception $exception) {
            var_dump($exception->getMessage());
            exit();
        }
        return $message;
    }

    /**
     *
     * 断开连接
     * @return void
     */
    public static function disconnect()
    {
        self::$connect->disconnect();
    }
}

var_dump(Client::get('exchange_sanguo_weiguo', 'queue_weiguo', 'weiguo'), '<br />');
var_dump(Client::get('exchange_sanguo_shuguo', 'queue_shuguo', 'shuguo'), '<br />');
var_dump(Client::get('exchange_sanguo_wuguo', 'queue_wuguo', 'wuguo'), '<br />');

访问Server.php文件
image.png
查看RabbitMQ管理界面,查看添加的交换机
image.png
访问Client.php文件,每访问一次都会从列队中取出一条消息。
第一次
image.png
第二次
image.png
查看RabbitMQ管理界面,查看队列
image.png
下面是amqp扩展中一些常量的定义解释

<?php

/**
 * A direct exchange type.
 * direct类型交换机
 */
// define(‘AMQP_EX_TYPE_DIRECT‘, ‘direct‘);

/**
 * A fanout exchange type.
 * fanout类型交换机
 */
// define(‘AMQP_EX_TYPE_FANOUT‘, ‘fanout‘);

/**
 * A topic exchange type.
 * topic类型交换机
 */
// define(‘AMQP_EX_TYPE_TOPIC‘, ‘topic‘);

/**
 * A header exchange type.
 * header类型交换机
 */
// define(‘AMQP_EX_TYPE_HEADERS‘, ‘headers‘);

/**
 * Durable exchanges and queues will survive a broker restart, complete with all of their data.
 * 持久化交换机和队列,当代理重启动后依然存在,并包括它们中的完整数据
 */
// define(‘AMQP_DURABLE‘, 2);

/**
 * Passive exchanges and queues will not be redeclared, but the broker will throw an error if the exchange or queue does not exist.
 * 被动模式的交换机和队列不能被重新定义,但是如果交换机和队列不存在,代理将扔出一个错误提示
 */
// define(‘AMQP_PASSIVE‘, 4);

/**
 * Valid for queues only, this flag indicates that only one client can be listening to and consuming from this queue.
 * 仅对队列有效,这个人标志定义队列仅允许一个客户端连接并且从其消费消息
 */
// define(‘AMQP_EXCLUSIVE‘, 8);

/**
 * For exchanges, the auto delete flag indicates that the exchange will be deleted as soon as no more queues are bound
 * to it. If no queues were ever bound the exchange, the exchange will never be deleted. For queues, the auto delete
 * flag indicates that the queue will be deleted as soon as there are no more listeners subscribed to it. If no
 * subscription has ever been active, the queue will never be deleted. Note: Exclusive queues will always be
 * automatically deleted with the client disconnects.
 * 对交换机而言,自动删除标志表示交换机将在没有队列绑定的情况下被自动删除,如果从没有队列和其绑定过,这个交换机将不会被删除.
 * 对队列而言,自动删除标志表示如果没有消费者和你绑定的话将被自动删除,如果从没有消费者和其绑定,将不被删除,独占队列在客户断
 * 开连接的时候将总是会被删除
 */
// define(‘AMQP_AUTODELETE‘, 16);

/**
 * Clients are not allowed to make specific queue bindings to exchanges defined with this flag.
 * 这个标志标识不允许自定义队列绑定到交换机上
 */
// define(‘AMQP_INTERNAL‘, 32);

/**
 * When passed to the consume method for a clustered environment, do not consume from the local node.
 * 在集群环境消费方法中传递这个参数,表示将不会从本地站点消费消息
 */
// define(‘AMQP_NOLOCAL‘, 64);

/**
 * When passed to the {@link AMQPQueue::get()} and {@link AMQPQueue::get()} methods as a flag,
 * the messages will be immediately marked as acknowledged by the server upon delivery.
 * 当在队列get方法中作为标志传递这个参数的时候,消息将在被服务器输出之前标志为acknowledged (已收到)
 */
// define(‘AMQP_AUTOACK‘, 128);

/**
 * Passed on queue creation, this flag indicates that the queue should be deleted if it becomes empty.
 * 在队列建立时候传递这个参数,这个标志表示队列将在为空的时候被删除
 */
// define(‘AMQP_IFEMPTY‘, 256);

/**
 * Passed on queue or exchange creation, this flag indicates that the queue or exchange should be
 * deleted when no clients are connected to the given queue or exchange.
 * 在交换机或者队列建立的时候传递这个参数,这个标志表示没有客户端连接的时候,交换机或者队列将被删除
 */
// define(‘AMQP_IFUNUSED‘, 512);

/**
 * When publishing a message, the message must be routed to a valid queue. If it is not, an error will be returned.
 * 当发布消息的时候,消息必须被正确路由到一个有效的队列,否则将返回一个错误
 */
// define(‘AMQP_MANDATORY‘, 1024);

/**
 * When publishing a message, mark this message for immediate processing by the broker. (High priority message.)
 * 当发布消息时候,这个消息将被立即处理.
 */
// define(‘AMQP_IMMEDIATE‘, 2048);

/**
 * If set during a call to {@link AMQPQueue::ack()}, the delivery tag is treated as "up to and including", so that multiple
 * messages can be acknowledged with a single method. If set to zero, the delivery tag refers to a single message.
 * If the AMQP_MULTIPLE flag is set, and the delivery tag is zero, this indicates acknowledgement of all outstanding
 * messages.
 * 当在调用AMQPQueue::ack时候设置这个标志,传递标签将被视为最大包含数量,以便通过单个方法标示多个消息为已收到,如果设置为0
 * 传递标签指向单个消息,如果设置了AMQP_MULTIPLE,并且传递标签是0,将所有未完成消息标示为已收到
 */
// define(‘AMQP_MULTIPLE‘, 4096);

/**
 * If set during a call to {@link AMQPExchange::bind()}, the server will not respond to the method.The client should not wait
 * for a reply method. If the server could not complete the method it will raise a channel or connection exception.
 * 当在调用AMQPExchange::bind()方法的时候,服务器将不响应请求,客户端将不应该等待响应,如果服务器无法完成该方法,将会抛出一个异常
 */
// define(‘AMQP_NOWAIT‘, 8192);

/**
 * If set during a call to {@link AMQPQueue::nack()}, the message will be placed back to the queue.
 * 如果在调用AMQPQueue::nack方法时候设置,消息将会被传递回队列
 */
// define(‘AMQP_REQUEUE‘, 16384);

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/qq_18361349/article/details/106239921
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-06-27 20:26:56
  • 阅读 ( 1278 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢