PHP+RabbitMQ消息队列的配置和使用方法【MQ】
版权声明:
本文为博主原创文章,转载请声明原文链接...谢谢。o_0。
更新时间:
2019-09-20 19:45:21
温馨提示:
学无止境,技术类文章有它的时效性,请留意文章更新时间,如发现内容有误请留言指出,防止别人"踩坑",我会及时更新文章
配置环境
windows + nginx + php7.2.19
php扩展添加
根据自己的版本下载扩展http://pecl.php.net/package/amqp
下载完成后把rabbitmq.4.dll放php.exe所在目录,php_amqp.dll放php的ext目录然后在php.ini中添加 extension=php_amqp.dll重启后查看扩展信息。
如果是apache的话要在httpd.conf中添加
loadFile "你的路径/rabbitmq.4.dll";
使用方法
服务端(消费者)代码
<?php //配置信息 $conn_args = array( 'host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost' => '/', ); $e_name = 'e_test'; //交换机名 $q_name = 'q_test'; //队列名 $k_route = 'route_test'; //路由key //创建连接和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker!\n"); } //在连接内创建一个通道 $channel = new AMQPChannel($conn); //创建交换机 $ex = new AMQPExchange($channel); $ex->setName($e_name); //设置交换机类型 //AMQP_EX_TYPE_DIRECT:直连交换机 //AMQP_EX_TYPE_FANOUT:扇形交换机 //AMQP_EX_TYPE_HEADERS:头交换机 //AMQP_EX_TYPE_TOPIC:主题交换机 $ex->setType(AMQP_EX_TYPE_DIRECT); //设置交换机持久 $ex->setFlags(AMQP_DURABLE); //声明交换机并输出状态 echo "Exchange Status:" . $ex->declareExchange() . "\n"; //创建队列 $q = new AMQPQueue($channel); $q->setName($q_name); //设置队列持久 $q->setFlags(AMQP_DURABLE); //声明消息队列并输出状态 echo "Message Total:" . $q->declareQueue() . "\n"; //绑定交换机与队列,并指定路由键 echo 'Queue Bind: ' . $q->bind($e_name, $k_route) . "\n"; //阻塞模式接收消息 echo "waiting reeive Message...\n"; //接收消息并进行处理的回调方法 function processMessage($envelope, $queue) { //取消息内容 $msg = $envelope->getBody(); echo $msg . "\n"; //处理消息 //显式确认,队列收到消费者显式确认后,会删除该消息 $queue->ack($envelope->getDeliveryTag()); } //设置消息队列消费者回调方法,并进行阻塞 $q->consume('processMessage'); //下面是隐式确认,不推荐 //$q->consume('processMessage', AMQP_AUTOACK); $conn->disconnect();
客户端(生产者)代码
<?php //配置信息 $conn_args = array( 'host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost' => '/', ); $e_name = 'e_test'; //交换机名,和服务端一至 $k_route = 'route_test'; //路由key,和服务端一至 //创建连接和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker!\n"); } $channel = new AMQPChannel($conn); //创建交换机对象 $ex = new AMQPExchange($channel); $ex->setName($e_name); $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型 $ex->setFlags(AMQP_DURABLE); //持久化 $ex->declareExchange(); //发送消息 //$channel->startTransaction(); //开始事务 for ($i = 0; $i < 5; ++$i) { //sleep(1); //休眠1秒 //消息内容 $message = "TEST MESSAGE!" . date("h:i:sa"); echo "Send Message:" . $ex->publish($message, $k_route) . "\n"; } //$channel->commitTransaction(); //提交事务 $conn->disconnect();
使用说明
交换机,路由键,队列,如果细细了解的话要学习的东西就多啦去啦,项目中使用一般会有这样的场景,搭建一个消息队列中心来接收和处理多个项目发过来的消息,一个项目中可能会有多个消息类型,比如评论消息、秒杀下单消息、日志消息等。我们可以使用项目名字和消息类型来创建对应的交换机,路由和队列。
项目名字->交换机名字
消息类型->路由键名字
消息类型->队列名字
生产者生产消息的时候只用标识是哪个项目的哪个消息直接发出去就ok,下面是这种方案的一个实现,代码保存为 MessageQueue.php
消息队列管理
/** * 消息队列管理 * author:zhaokeli * blog:https://www.zhaokeli.com * link:https://www.zhaokeli.com/article/8583.html */ class MessageQueue { protected $host = '127.0.0.1'; protected $port = '5672'; protected $username = 'guest'; protected $password = 'guest'; protected $vhost = '/'; protected $exchangeName = ''; //交换机名字 protected $routeName = ''; //路由 protected $queueName = ''; //队列 protected $conn = null; protected $channel = null; protected $exchange = null; protected $queue = null; /** * 初始化配置 * @authname [权限名字] 0 * @Author mokuyu * @DateTime 2019-09-20 * @param array $config 连接配置信息 * @param string $exchangeName 交换机名字(项目名字) */ public function __construct($config = [], $exchangeName = 'default') { $this->setExchangeName($exchangeName); foreach ($config as $key => $value) { $this->$key = $value; } } /** * 初始化连接 * @authname [权限名字] 0 * @Author mokuyu * @DateTime 2019-09-20 * @return [type] */ private function initConn() { //配置信息 $conn_args = array( 'host' => $this->host, 'port' => $this->port, 'login' => $this->username, 'password' => $this->password, 'vhost' => $this->vhost, ); //创建连接和channel $this->conn = new \AMQPConnection($conn_args); if (!$this->conn->connect()) { die("Cannot connect to the broker!\n"); } //在连接内创建一个通道 $this->channel = new \AMQPChannel($this->conn); //创建交换机 $this->exchange = new \AMQPExchange($this->channel); $this->exchange->setName($this->exchangeName); //设置交换机类型 //AMQP_EX_TYPE_DIRECT:直连交换机 //AMQP_EX_TYPE_FANOUT:扇形交换机 //AMQP_EX_TYPE_HEADERS:头交换机 //AMQP_EX_TYPE_TOPIC:主题交换机 $this->exchange->setType(AMQP_EX_TYPE_DIRECT); //设置交换机持久 $this->exchange->setFlags(AMQP_DURABLE); //声明交换机并输出状态 $this->exchange->declareExchange(); // echo "Exchange Status:" . . "\n"; } /** * 发送消息 * @authname [权限名字] 0 * @Author mokuyu * @DateTime 2019-09-20 * @param [type] $msgType 消息类型 * @param [type] $msgData 消息数据 * @return [type] */ public function sendMessage($msgType, $msgData) { $this->setRouteName($msgType); $this->conn || $this->initConn(); if (!is_string($msgData)) { $msgData = json_encode($msgData); } return $this->exchange->publish($msgData, $this->routeName) ? true : false; } /** * 从指定队列中取出一条消息 * @authname [权限名字] 0 * @Author mokuyu * @DateTime 2019-09-20 * @param [type] $queueName [description] * @return [type] */ public function getMessage($queueName) { $this->queue || $this->initQueue($queueName); return $this->queue->get(AMQP_AUTOACK)->getBody(); } /** * 消费者阻塞接收消息 * @authname 0 * @Author mokuyu * @DateTime 2019-09-20 * @param [type] $queueName 消息队列名字 * @param [type] $receiveMessagefunc 消息回调函数 * @return [type] */ public function receiveMessage($queueName, $receiveMessagefunc) { $this->queue || $this->initQueue($queueName); //设置消息回调 $this->queue->consume($receiveMessagefunc); } /** * 删除一个队列 * @authname [权限名字] 0 * @Author mokuyu * @DateTime 2019-09-20 * @param [type] $queueName 队列名字 * @return [type] */ public function deleteQueue($queueName) { $this->queue || $this->initQueue($queueName); return $this->queue->delete(); } /** * 清空队列中的消息 * @authname [权限名字] 0 * @Author mokuyu * @DateTime 2019-09-20 * @param [type] $queueName [description] * @return [type] */ public function clearQueue($queueName) { $this->queue || $this->initQueue($queueName); return $this->queue->purge(); } /** * 初始化队列 * @authname [权限名字] 0 * @Author mokuyu * @DateTime 2019-09-20 * @param [type] $queueName [description] * @return [type] */ private function initQueue($queueName) { $this->setQueueName($queueName); $this->setRouteName($queueName); $this->conn || $this->initConn(); //创建队列 $this->queue = new \AMQPQueue($this->channel); $this->queue->setName($this->queueName); //设置队列持久 $this->queue->setFlags(AMQP_DURABLE); //声明消息队列并输出状态 $this->queue->declareQueue(); //绑定交换机与队列,并指定路由键 $this->queue->bind($this->exchangeName, $this->routeName); } public function setRouteName($name) { $this->routeName = 'route_' . $name; } public function setExchangeName($name) { $this->exchangeName = 'exc_' . $name; } public function setQueueName($name) { $this->queueName = 'queue_' . $name; } }
初始化队列
$conf = [ 'host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost' => '/', ]; $obj = new MessageQueue($conf, 'default');
发送消息
//发送消息到order队列 echo $obj->sendMessage('order', time());
取出一条消息
//从队列取出一个消息 $msg = $obj->getMessage('order'); echo $msg;
阻塞处理消息
// 阻塞回调消息处理 // //接收消息并进行处理的回调方法,回调示例 // public function processMessage($envelope, $queue) // { // //取消息内容 // $msg = $envelope->getBody(); // echo $msg . "\n"; //处理消息 // //显式确认,队列收到消费者显式确认后,会删除该消息 // $queue->ack($envelope->getDeliveryTag()); // } //这里直接传啦匿名函数 $obj->receiveMessage('order', function ($envelope, $queue) { //取消息内容 $msg = $envelope->getBody(); echo $msg . "\n"; //处理消息 //显式确认,队列收到消费者显式确认后,会删除该消息 $queue->ack($envelope->getDeliveryTag()); });
删除和清空
//删除队列 $obj->deleteQueue('order'); //清除队列中的消息 $obj->clearQueue('order');