本篇文章为大家展示了RabbitMQ怎么在php中使用,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
什么是队列
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回。消息使用者再从MQ中取消息进行逻辑处理。对于消耗较大的请求,可以立马返回处理结果。减少服务器压力。为各个子系统之间解耦和异步处理。
rabbitmq的整体结构
rmq简单来说就是一个(生产/消费)的模型结构。消息生产者把数据丢到队列中,消息消费者从队列中取出数据进行逻辑处理。
那么如何确保,生产者添加的数据,能够到达指定的队列中呢?
rmq(消息队列)主要提供了三个概念(中间件?)来确保消息的分发。Exchange(交换机)、RoutingKey(路由)、Queue(队列)。从上面的图也可以看出来。 处理消息的接收、分发,主要在Broker模块中。
Exchange 所有生产消息的入口都是到交换机这里。exchange通过进来的路由(RoutingKey),去和已binding的规则进行匹配,找到指定的队列。
RoutingKey 我的理解,这里相当于一把钥匙。而binding的操作相当于一把锁头。
Queue 消息的存放区域,等到消费者来取。
Binding Exchange和Queue之间的一个绑定。
从这些概念来看,影响规则的主要是依赖Exchange。那rmq提供了哪些类型,都有什么特点呢?
exchange类型
RabbitMQ提供了四种Exchange类型
direct
fanout
topic
header
header类型在实际使用中较少,所以在这里就不进行说明。
Direct Exchange
direct 的规则比较简单。在发布消息前,需要把exchange和queue做一个绑定。 如果发布消息的时候,RoutingKey 和绑定的值(key)一致。则将消息投递到该队列中。如果不存在对应的队列,则消息会被丢弃。 (这时候访问rmq管理web时。可以看到消息进来,但是队列中没有值)
Fanout Exchange
fanout 类型则更简单一些。 只要exchange和队列做了绑定。发布的消息都会到队列中去。
Topic Exchange
相对来说 topic类型要复杂一些。 和direct类型相比。topic相当于模糊匹配,而direct为全等。类似mysql中 ‘like’关键词。
针对direct 类型写一个实例
实例分两部分 生产者、消费者(回调函数)
因为我的代码,对mq的部分做了封装,懒得拆分出来。 所以我只贴业务代码和封装的核心方法。
生产者代码
$mqModel = new Rabbitmq(); // 初始化(rmq连接操作) $newResult = ['tom','bill','jack']; if ($mqModel) { $mqRoute = 'push_data_to_crm_routing'; // 路由 $mqExchange = 'push_data_to_crm_exchange'; // 交换机 $mqQuery = 'push_data_to_crm_queue'; // 队列 // 建立连接,设置交换机,设置队列 $mqModel->setChannel()->setExchange($mqExchange,AMQP_EX_TYPE_DIRECT,AMQP_DURABLE)->setQueue($mqQuery,AMQP_DURABLE,$mqExchange,$mqRoute); foreach ($newResult as $k => $v){ $push_data = $v; $mqModel->publishMessage($push_data,$mqRoute); // 消息推送 } }
消费者代码
$mqModel = new Rabbitmq(); // $mqRoute = 'push_data_to_crm_routing'; 消费者用不上路由,因为不需要指定。 只要想取队列,消费即可。 $mqExchange = 'push_data_to_crm_exchange'; $mqQuery = 'push_data_to_crm_queue'; $mqModel->setChannel()->setExchange($mqExchange,'', AMQP_PASSIVE)->setQueue($mqQuery, AMQP_PASSIVE); $zmq->consume(function($msg){ var_dump($msg); return true; });
封装类中的核心方法
//设置交换机 public function setExchange($changeName = '', $changeType = '', $flags = false) { $errorMsg = ''; try{ if(!$this->channel){ throw new \AMQPQueueException("Error channel on method setExchange", 1); } $this->exchange = new \AMQPExchange($this->channel); if($changeName){ $this->changeName = $changeName; // 交换机名称 $this->exchange->setName($changeName); // 设置名称 $changeType = $changeType ? $changeType : AMQP_EX_TYPE_DIRECT; // 交换机类型 }else{ $this->changeName = ''; } if($changeType){ $this->changeType = $changeType; $this->exchange->settype($changeType); // 设置交换机类型 }else{ $this->changeType = ''; } if($flags){ $this->exchange->setFlags($flags); //交换机标志 } if($changeType || $flags){ $this->exchange->declareExchange(); // 创建 } } catch(AMQPQueueException $ex) { $errorMsg = "AMQPQueueException error exchange: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } catch(Exception $ex) { $errorMsg = "Exception error exchange: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } if($errorMsg){ throw new Exception($errorMsg, 1); } return $this; } // 设置队列 public function setQueue($queueName = '', $flags = '', $exchange_name = '', $routing_key = '', $arguments=[] ){ $errorMsg = ''; try{ if(!$this->channel){ throw new \AMQPQueueException("Error channel on method setQueue", 1); } $this->queue = new \AMQPQueue($this->channel); if(!$queueName){ return false; } $this->queueName = $queueName; // 队列名称 $this->queue->setName($queueName); if($flags){ $this->queue->setFlags($flags); // 队列标志。与消息持久化有关。 这篇文字不涉及这一块的说明 } if(is_array($arguments) && !empty($arguments)){ $this->queue->setArguments($arguments); // 参数配置 } $this->queue->declareQueue(); // 创建一个队列 $exchange_name = $exchange_name === false ? '' : ($exchange_name === true || !$exchange_name ? $this->changeName : $exchange_name); $routing_key = $routing_key ? $routing_key : $this->queueName; if($exchange_name && $routing_key ){ $this->queue->bind($exchange_name, $routing_key); // 交换机和队列的绑定操作 } } catch(AMQPQueueException $ex) { $errorMsg = "AMQPQueueException error queue: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } catch(Exception $ex) { $errorMsg = "Exception error queue: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } if($errorMsg){ throw new Exception($errorMsg, 1); } return $this; } // 发布消息 public function publishMessage($message = '', $routing_key = '', $flags = AMQP_NOPARAM, $attributes = []){ if(!$message){ return false; } $routing_key = $routing_key ? $routing_key : $this->queueName; // 发布消息,带有路由key。如果需要,则会用于关联。 $this->exchange->publish($message, $routing_key, $flags, $attributes); return true; } // 消费 public function consume($callback = null, $qos = 0, $isAct = true){ if($qos){ $this->channel->qos(0, $qos); } $errorMsg = ''; try{ if(!$this->queue){ throw new \AMQPQueueException("Error queue on method consume", 1); } $this->callBackFnc = $callback; $this->isAct = $isAct; $callback = function($envelope, $queue){ if(is_callable($this->callBackFnc)){ call_user_func($this->callBackFnc, $envelope->getBody()); if($this->isAct){ $queue->ack($envelope->getDeliveryTag()); }else{ $queue->nack($envelope->getDeliveryTag()); } } }; $this->queue->consume($callback); } catch(AMQPQueueException $ex) { $errorMsg = "AMQPQueueException error queue: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } catch(Exception $ex) { $errorMsg = "Exception error queue: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } if($errorMsg){ throw new \Exception($errorMsg, 1); } }
因为封装代码里写了很多 try catch 所以看起来特别乱。 还有部分兼容的逻辑。 看起来不舒服,就先删掉再看吧。
执行结果
先跑一遍生产者代码,这里可以用浏览器直接访问。 执行完了之后。 到rabbitmq 的web管理页面中查看。 发现消息已经正常添加到队列中。(web管理页面可查询别的文章开启)
这时候再执行消费者代码。 消费者代码需要在cli下执行。因为消费者为轮询等待,是死循环,无法在浏览器下执行。
上述内容就是RabbitMQ怎么在php中使用,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。