diff --git a/RabbitMq/BaseAmqp.php b/RabbitMq/BaseAmqp.php index 8f40bcfb..b3d62eaa 100644 --- a/RabbitMq/BaseAmqp.php +++ b/RabbitMq/BaseAmqp.php @@ -5,7 +5,6 @@ use OldSound\RabbitMqBundle\Event\AMQPEvent; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AbstractConnection; -use PhpAmqpLib\Connection\AMQPLazyConnection; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use Symfony\Component\EventDispatcher\EventDispatcherInterface; @@ -21,6 +20,19 @@ abstract class BaseAmqp protected $autoSetupFabric = true; protected $basicProperties = array('content_type' => 'text/plain', 'delivery_mode' => 2); + /** + * Initialize confirmation mechanism for channel if enabled. + * See RabbitMQ {@link https://www.rabbitmq.com/confirms.html documentation} + * + * @var bool + */ + protected $enableConfirmation = false; + + /** + * @var int + */ + private $waitConfirmationTimeout = 1; + /** * @var LoggerInterface */ @@ -99,10 +111,13 @@ public function close() public function reconnect() { - if (!$this->conn->isConnected()) { - return; - } - + /** + * TODO: Reconnect do not restore channels. This issue persists in 2.6.3 and 2.7.0-rc1. Following PRs doesn't help: + * https://github.com/php-amqplib/php-amqplib/commit/2ccc97ca5b1229f9b12ea47fbab6c16fad26df41 + * https://github.com/php-amqplib/php-amqplib/commit/c87469ecbbf38fdc18688d3216c1e253d640ba32 + */ + $this->conn->reconnect(); + $this->closeChannel(); $this->conn->reconnect(); } @@ -112,7 +127,7 @@ public function reconnect() public function getChannel() { if (empty($this->ch) || null === $this->ch->getChannelId()) { - $this->ch = $this->conn->channel(); + $this->setChannel($this->conn->channel()); } return $this->ch; @@ -126,6 +141,7 @@ public function getChannel() public function setChannel(AMQPChannel $ch) { $this->ch = $ch; + $this->initChannel(); } /** @@ -283,4 +299,86 @@ public function getEventDispatcher() { return $this->eventDispatcher; } + + /** + * Close assigned channel + * + * @return void + */ + protected function closeChannel() + { + if (!$this->ch) { + return; + } + $this->ch = null; + } + + /** + * Wait for channel confirms that message is delivered after publish + * + * @return void + */ + public function waitConfirmation() + { + $this->getChannel()->wait_for_pending_acks($this->waitConfirmationTimeout); + } + + /** + * Set publish confirmation timeout + * + * @param int $timeout in seconds or 0 to wait forever + * + * @return void + * @throws \InvalidArgumentException if provided timeout isn't a integer or less than zero + */ + public function setWaitConfirmationTimeout($timeout) + { + if (!is_int($timeout) || $timeout < 0) { + throw new \InvalidArgumentException('Confirmation timeout must be an integer and greater or equal to zero'); + } + $this->waitConfirmationTimeout = $timeout; + } + + /** + * Return timeout in seconds + * + * @return int + */ + public function getWaitConfirmationTimeout() + { + return $this->waitConfirmationTimeout; + } + + /** + * Enable channel confirmation + * + * @return void + */ + public function enableConfirmation() + { + if ($this->enableConfirmation) { + // already enabled so we are sure that channel already properly initialized + return; + } + + $this->enableConfirmation = true; + + // If channel already created need to reinitialize it + if ($this->ch) { + $this->initChannel(); + } + } + + /** + * Initialize channel setting(e.g. confirmation) + * + * @return void + */ + protected function initChannel() + { + if ($this->enableConfirmation) { + $this->ch->confirm_select(); + } + } + } diff --git a/Tests/RabbitMq/BaseAmqpTest.php b/Tests/RabbitMq/BaseAmqpTest.php index 21d5044b..bc6cce79 100644 --- a/Tests/RabbitMq/BaseAmqpTest.php +++ b/Tests/RabbitMq/BaseAmqpTest.php @@ -30,13 +30,17 @@ public function testNotLazyConnection() $connection = $this->getMockBuilder('PhpAmqpLib\Connection\AbstractConnection') ->disableOriginalConstructor() ->getMock(); + $channel = $this->getMockBuilder('PhpAmqpLib\Channel\AMQPChannel') + ->disableOriginalConstructor() + ->getMock(); $connection ->method('connectOnConstruct') ->willReturn(true); $connection ->expects(static::once()) - ->method('channel'); + ->method('channel') + ->willReturn($channel); new Consumer($connection, null); } diff --git a/Tests/RabbitMq/ProducerTest.php b/Tests/RabbitMq/ProducerTest.php new file mode 100644 index 00000000..abea24a4 --- /dev/null +++ b/Tests/RabbitMq/ProducerTest.php @@ -0,0 +1,107 @@ +prophesize('\PhpAmqpLib\Connection\AMQPLazyConnection'); + $channel = $this->prophesize('\PhpAmqpLib\Channel\AMQPChannel'); + $producer = new Producer($connection->reveal()); + + $connection->isConnected()->willReturn(true); + $connection->reconnect()->shouldBeCalled(); + $connection->close()->shouldBeCalled(); + $connection->channel()->willReturn($channel->reveal()); + + $producer->reconnect(); + $producer->getChannel(); + } + + public function testEnableConfirmationWhenChannelIsNotSet() + { + $connection = $this->prophesize('\PhpAmqpLib\Connection\AMQPLazyConnection'); + $channel = $this->prophesize('\PhpAmqpLib\Channel\AMQPChannel'); + $producer = new Producer($connection->reveal()); + + $channel->close()->shouldBeCalled(); + $channel->confirm_select()->shouldBeCalled(); + + $producer->enableConfirmation(); + $producer->setChannel($channel->reveal()); + } + + public function testEnableConfirmationWhenChannelIsSet() + { + $connection = $this->prophesize('\PhpAmqpLib\Connection\AMQPLazyConnection'); + $channel = $this->prophesize('\PhpAmqpLib\Channel\AMQPChannel'); + $producer = new Producer($connection->reveal()); + + $channel->close()->shouldBeCalled(); + $channel->confirm_select()->shouldBeCalled(); + + $producer->setChannel($channel->reveal()); + $producer->enableConfirmation(); + } + + public function testWaitConfirmation() + { + $connection = $this->prophesize('\PhpAmqpLib\Connection\AMQPLazyConnection'); + $channel = $this->prophesize('\PhpAmqpLib\Channel\AMQPChannel'); + + $producer = new Producer($connection->reveal()); + + $channel->close()->shouldBeCalled(); + $channel->confirm_select()->shouldBeCalled(); + $channel->getChannelId()->willReturn('channel_id'); + $channel->wait_for_pending_acks($producer->getWaitConfirmationTimeout())->shouldBeCalled(); + + $producer->setChannel($channel->reveal()); + $producer->enableConfirmation(); + $producer->waitConfirmation(); + } + + /** + * @dataProvider provideTestSetWaitConfirmationTimeout + */ + public function testSetWaitConfirmationTimeout($timeout, $expectedException) + { + $connection = $this->prophesize('\PhpAmqpLib\Connection\AMQPLazyConnection'); + + $producer = new Producer($connection->reveal()); + + try { + $producer->setWaitConfirmationTimeout($timeout); + $this->assertEquals($timeout, $producer->getWaitConfirmationTimeout()); + } catch (\InvalidArgumentException $e) { + if ($expectedException) { + $this->addToAssertionCount(1); + } + } + } + + public function provideTestSetWaitConfirmationTimeout() + { + return array( + 'correct timeout' => array( + 'timeout' => 5, + 'expectedException' => false, + ), + 'timeout zero' => array( + 'timeout' => 0, + 'expectedException' => false, + ), + 'timeout less then zero' => array( + 'timeout' => -1, + 'expectedException' => '\InvalidArgumentException', + ), + 'timeout not integer' => array( + 'timeout' => '5', + 'expectedException' => '\InvalidArgumentException', + ), + ); + } +} diff --git a/composer.json b/composer.json index d2d07242..39c1f570 100644 --- a/composer.json +++ b/composer.json @@ -15,7 +15,7 @@ "symfony/config": "~2.3 || ~3.0", "symfony/yaml": "~2.3 || ~3.0", "symfony/console": "~2.3 || ~3.0", - "php-amqplib/php-amqplib": "~2.6", + "php-amqplib/php-amqplib": "^2.6.3", "psr/log": "~1.0" }, "require-dev": {