-
Notifications
You must be signed in to change notification settings - Fork 464
Recreate channel after reconnect. Message confirmation feature #418
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
36cafbc
4d5fbbb
aac0c2d
72e93bd
8c7ecc2
b3ad3f9
f66fc27
68d9f1f
e73698b
6842474
227e305
693ba5a
86c3937
9cde59c
7b94fd5
1a1b63c
6a97fde
70243ca
15af878
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,19 @@ abstract class BaseAmqp | |
protected $autoSetupFabric = true; | ||
protected $basicProperties = array('content_type' => 'text/plain', 'delivery_mode' => 2); | ||
|
||
/** | ||
* Initialize confirmation mechanism for channel if enabled. | ||
* See RabbitMQ {@link https://www.rabbitmq.com/confirms.html documentation} | ||
* | ||
* @var bool | ||
*/ | ||
protected $enableConfirmation = false; | ||
|
||
/** | ||
* @var int | ||
*/ | ||
private $waitConfirmationTimeout = 1; | ||
|
||
/** | ||
* @var LoggerInterface | ||
*/ | ||
|
@@ -103,6 +116,13 @@ public function reconnect() | |
return; | ||
} | ||
|
||
/** | ||
* TODO: must be done with one reconnect when php-amqplib will be updated till 2.6.3 | ||
* see https://github.com/php-amqplib/php-amqplib/commit/2ccc97ca5b1229f9b12ea47fbab6c16fad26df41 | ||
* see https://github.com/php-amqplib/php-amqplib/commit/c87469ecbbf38fdc18688d3216c1e253d640ba32 | ||
*/ | ||
$this->conn->reconnect(); | ||
$this->closeChannel(); | ||
$this->conn->reconnect(); | ||
} | ||
|
||
|
@@ -112,7 +132,7 @@ public function reconnect() | |
public function getChannel() | ||
{ | ||
if (empty($this->ch) || null === $this->ch->getChannelId()) { | ||
$this->ch = $this->conn->channel(); | ||
$this->setChannel($this->conn->channel()); | ||
} | ||
|
||
return $this->ch; | ||
|
@@ -126,6 +146,7 @@ public function getChannel() | |
public function setChannel(AMQPChannel $ch) | ||
{ | ||
$this->ch = $ch; | ||
$this->initChannel(); | ||
} | ||
|
||
/** | ||
|
@@ -283,4 +304,91 @@ public function getEventDispatcher() | |
{ | ||
return $this->eventDispatcher; | ||
} | ||
|
||
/** | ||
* Close assigned channel | ||
* | ||
* @return void | ||
*/ | ||
protected function closeChannel() | ||
{ | ||
if (!$this->ch) { | ||
return; | ||
} | ||
try { | ||
$this->ch = null; | ||
} catch (\Exception $e) { | ||
// ignore exception on Channel object destructor | ||
// TODO: this workaround can be removed after php-amqplib will be updated till 2.6.3 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You could adjust There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @stloyd removed |
||
} | ||
} | ||
|
||
/** | ||
* Wait for channel confirms that message is delivered after publish | ||
* | ||
* @return void | ||
*/ | ||
public function waitConfirmation() | ||
{ | ||
$this->getChannel()->wait_for_pending_acks($this->waitConfirmationTimeout); | ||
} | ||
|
||
/** | ||
* Set publish confirmation timeout | ||
* | ||
* @param int $timeout in seconds or 0 to wait forever | ||
* | ||
* @return void | ||
* @throws \InvalidArgumentException if provided timeout isn't a integer or less than zero | ||
*/ | ||
public function setWaitConfirmationTimeout($timeout) | ||
{ | ||
if (!is_int($timeout) || $timeout < 0) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if (!$timeout || !is_int($timeout)) { There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @stloyd not sure why this condition need to be changed. Please explain.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @stloyd ping |
||
throw new \InvalidArgumentException('Confirmation timeout must be an integer and greater or equal to zero'); | ||
} | ||
$this->waitConfirmationTimeout = $timeout; | ||
} | ||
|
||
/** | ||
* Return timeout in seconds | ||
* | ||
* @return int | ||
*/ | ||
public function getWaitConfirmationTimeout() | ||
{ | ||
return $this->waitConfirmationTimeout; | ||
} | ||
|
||
/** | ||
* Enable channel confirmation | ||
* | ||
* @return void | ||
*/ | ||
public function enableConfirmation() | ||
{ | ||
if ($this->enableConfirmation) { | ||
// already enabled so we are sure that channel already properly initialized | ||
return; | ||
} | ||
|
||
$this->enableConfirmation = true; | ||
|
||
// If channel already created need to reinitialize it | ||
if ($this->ch) { | ||
$this->initChannel(); | ||
} | ||
} | ||
|
||
/** | ||
* Initialize channel setting(e.g. confirmation) | ||
* | ||
* @return void | ||
*/ | ||
protected function initChannel() | ||
{ | ||
if ($this->enableConfirmation) { | ||
$this->ch->confirm_select(); | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
<?php | ||
/* | ||
* This file is part of the OpCart software. | ||
* | ||
* (c) 2015, OpticsPlanet, Inc | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please use proper header |
||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace OldSound\RabbitMqBundle\Tests\RabbitMq; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. old namespace There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
||
|
||
use OldSound\RabbitMqBundle\RabbitMq\Producer; | ||
use PhpAmqpLib\Channel\AMQPChannel; | ||
|
||
class ProducerTest extends \PHPUnit_Framework_TestCase | ||
{ | ||
public function testReconnect() | ||
{ | ||
$connection = $this->prophesize('\PhpAmqpLib\Connection\AMQPLazyConnection'); | ||
$channel = $this->prophesize('\PhpAmqpLib\Channel\AMQPChannel'); | ||
$producer = new Producer($connection->reveal()); | ||
|
||
$connection->isConnected()->willReturn(true); | ||
$connection->reconnect()->shouldBeCalled(); | ||
$connection->close()->shouldBeCalled(); | ||
$connection->channel()->willReturn($channel->reveal()); | ||
|
||
$producer->reconnect(); | ||
$producer->getChannel(); | ||
} | ||
|
||
public function testEnableConfirmationWhenChannelIsNotSet() | ||
{ | ||
$connection = $this->prophesize('\PhpAmqpLib\Connection\AMQPLazyConnection'); | ||
$channel = $this->prophesize('\PhpAmqpLib\Channel\AMQPChannel'); | ||
$producer = new Producer($connection->reveal()); | ||
|
||
$channel->close()->shouldBeCalled(); | ||
$channel->confirm_select()->shouldBeCalled(); | ||
|
||
$producer->enableConfirmation(); | ||
$producer->setChannel($channel->reveal()); | ||
} | ||
|
||
public function testEnableConfirmationWhenChannelIsSet() | ||
{ | ||
$connection = $this->prophesize('\PhpAmqpLib\Connection\AMQPLazyConnection'); | ||
$channel = $this->prophesize('\PhpAmqpLib\Channel\AMQPChannel'); | ||
$producer = new Producer($connection->reveal()); | ||
|
||
$channel->close()->shouldBeCalled(); | ||
$channel->confirm_select()->shouldBeCalled(); | ||
|
||
$producer->setChannel($channel->reveal()); | ||
$producer->enableConfirmation(); | ||
} | ||
|
||
public function testWaitConfirmation() | ||
{ | ||
$connection = $this->prophesize('\PhpAmqpLib\Connection\AMQPLazyConnection'); | ||
$channel = $this->prophesize('\PhpAmqpLib\Channel\AMQPChannel'); | ||
|
||
$producer = new Producer($connection->reveal()); | ||
|
||
$channel->close()->shouldBeCalled(); | ||
$channel->confirm_select()->shouldBeCalled(); | ||
$channel->wait_for_pending_acks($producer->getWaitConfirmationTimeout())->shouldBeCalled(); | ||
|
||
$producer->setChannel($channel->reveal()); | ||
$producer->enableConfirmation(); | ||
$producer->waitConfirmation(); | ||
} | ||
|
||
public function testSetWaitConfirmationTimeout() | ||
{ | ||
$connection = $this->prophesize('\PhpAmqpLib\Connection\AMQPLazyConnection'); | ||
|
||
$producer = new Producer($connection->reveal()); | ||
$producer->setWaitConfirmationTimeout(5); | ||
$this->assertEquals(5, $producer->getWaitConfirmationTimeout()); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,7 +31,8 @@ | |
}, | ||
"extra": { | ||
"branch-alias": { | ||
"dev-master": "1.10.x-dev" | ||
"dev-master": "1.10.x-dev", | ||
"dev-reconnect-fix-upstream": "1.13.x-dev" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is it required? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was done for our needs. Have to be removed. |
||
} | ||
}, | ||
"autoload": { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2.6.3 is released
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need time to recheck why it was done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry not fixed yet
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@oleg-andreyev 2.6.3 and even 2.7.0-rc1 doesn't help :(