1. 云栖社区>
  2. PHP教程>
  3. 正文

rabbitmq功能实现

作者:用户 来源:互联网 时间:2017-11-30 11:08:32

实现rabbitmq功能

rabbitmq功能实现 - 摘要: 本文讲的是rabbitmq功能实现, RabbitMQ 功能实现 引入php-amqplib类库,类库地址为https://github.com/php-amqplib/php-amqplib 简单的示例代码实现生产者(发送消息者) public f


RabbitMQ 功能实现


引入php-amqplib类库,类库地址为https://github.com/php-amqplib/php-amqplib



简单的示例代码实现
rabbitmq功能实现-

生产者(发送消息者)
    public function producer(){
//创建连接实例
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
//创建一个连接通道
$channel = $connection->channel();
//声明队列,如果该队列不存在会创建
$channel->queue_declare('hello', false, false, false, false);
//创建消息实例
$msg = new AMQPMessage('Hello World1!');
//通过通道,推送消息到队列中
$channel->basic_publish($msg, '', 'hello');
//关闭通道
$channel->close();
//关闭连接
$connection->close();
echo " [x] Sent 'Hello World!'/n";
}

消费者(获取消息者)
    public function consumer(){
//创建连接实例
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
//创建一个连接通道
$channel = $connection->channel();
//声明队列,如果该队列不存在会创建
$channel->queue_declare('hello', false, false, false, false);
//创建一个实例(这里用于回调)
$callback_model=new Callback();
//通过通道消费队列中的信息,并执行回调(这里为array($callback_model,'getQueueInfo'))
$channel->basic_consume('hello', '', false, false, false, false,array($callback_model,'getQueueInfo'));
//当存在回调时,这里将进入无限循环,每当队列中被推送新值,就会执行回调
while(count($channel->callbacks)) {
$channel->wait();
}
//关闭通道
$channel->close();
//关闭连接
$connection->close();
}

回调方法
class Callback extends Model{
//$channel->basic_consume 执行回调方法时,会传入$msg对象
public static function getQueueInfo($msg){
//我这里将$msg中的主体(队列中的消息值) 和 当前进程号 存入表中
$test_model=new Table();
$test_model->content=$msg->body;
$test_model->num=getmypid();
$test_model->save();
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
}

队列及消息的持久化设置
首先设置队列的持久化
//设置第三个参数durable 为true
//注意:如果这里hello队列已存在,RabbitMQ不允许重新定义现有队列,并且会返回错误,这里你可以声明一个新队列
$channel->queue_declare('hello', false, true, false, false);

设置消息的持久化
$msg = new AMQPMessage($data,
array('delivery_mode' => 2) //使消息持久化
);


虽然设置了队列和消息的持久化,但RabbitMQ可能有时只是存入缓存不是磁盘中,如果需要更强力的保障,请使用 publisher confirms



合理调度实现

如果你想让工人处理并确认了当前任务后再接受新任务,需在消耗信息时设置


//设置prefetch_count =1
$channel->basic_qos(null, 1, null); //参数为1 表示工人当前任务最多1个
$channel->basic_consume('hello', '', false, false, false, false);

消息确认机制
$channel->basic_consume('hello', '', false, false, false, false,array($callback_model,'getQueueInfo'));
//注意:$channel->basic_consume 的第四个参数为true时(即 no ack),则为关闭消息确认
//$channel->basic_consume 的第四个参数为false时,则为开启消息确认
//开启消息确认机制后,回调方法执行消息确认后,该信息才会被消耗. 当该工人或服务死后,未确认的信息会被再次放入到队列中
//回调方法中执行
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);



以上是云栖社区小编为您精心准备的的内容,在云栖社区的博客、问答、公众号、人物、课程等栏目也有的相关内容,欢迎继续使用右上角搜索按钮进行搜索实现 , rabbitmq 功能 ,以便于您获取更多的相关知识。