
<?php
$connect = new AMQPConnection();
$connect->connect();
$channel = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setName('logs');
$queue->declare();
$queue->bind('exchange', 'logs');
while (true) {
$queue->consume('callback');
}
$connection->close();
function callback($envelope, $queue) {
var_dump($envelope->getBody());
$queue->nack($envelope->getDeliveryTag());
}
<?php
$connect = new AMQPConnection();
$connect->connect();
$channel = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
$exchange->publish('direct type test','logs');
var_dump("Send Message OK");
$connect->disconnect();
创建receive_one.php和receive_two.php 并把send.php代码改成如下代码方便我们观看
receive_one.php 和 receive_two.php 代码相同 或者用dos运行多个接收端<?php
$connect = new AMQPConnection();
$connect->connect();
$channel = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setName('logs');
@$queue->declare();
$queue->bind('exchange', 'logs');
while (true) {
$queue->consume('callback');
}
$connection->close();
function callback($envelope, $queue) {
var_dump($envelope->getBody());
$queue->nack($envelope->getDeliveryTag());
}
<?php
$connect = new AMQPConnection();
$connect->connect();
$channel = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
for ($index = 1; $index < 5; $index++) {
$exchange->publish($index,'logs');
var_dump("Send:$index");
}
$exchange->delete();
$connect->disconnect();
for ($index = 1; $index < 50; $index++) {
$exchange->publish($index,'logs');
var_dump("Send:$index");
}function callback($envelope, $queue) {
var_dump($envelope->getBody());
sleep(3);
$queue->nack($envelope->getDeliveryTag());
}
$channel = new AMQPChannel($connect);
$channel = new AMQPChannel($connect); $channel->setPrefetchCount(1);
PHP怎么学习?PHP怎么入门?PHP在哪学?PHP怎么学才快?不用担心,这里为大家提供了PHP速学教程(入门到精通),有需要的小伙伴保存下载就能学习啦!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号