Skip to content

Commit 3b3fdb7

Browse files
fon60skafandri
authored andcommitted
Add notify to rpc client. To enable progress functionality. (#351)
* Add notify to rpc client. To enable progress capabilities * Typo * Test exception throwing
1 parent bd96a63 commit 3b3fdb7

File tree

2 files changed

+47
-0
lines changed

2 files changed

+47
-0
lines changed

RabbitMq/RpcClient.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ class RpcClient extends BaseAmqp
1010
protected $replies = array();
1111
protected $expectSerializedResponse;
1212
protected $timeout = 0;
13+
protected $notifyCallback;
1314

1415
private $queueName;
1516
private $unserializer = 'unserialize';
@@ -80,6 +81,9 @@ public function processMessage(AMQPMessage $msg)
8081
if ($this->expectSerializedResponse) {
8182
$messageBody = call_user_func($this->unserializer, $messageBody);
8283
}
84+
if ($this->notifyCallback !== null) {
85+
call_user_func($this->notifyCallback, $messageBody);
86+
}
8387

8488
$this->replies[$msg->get('correlation_id')] = $messageBody;
8589
}
@@ -98,6 +102,15 @@ public function setUnserializer($unserializer)
98102
$this->unserializer = $unserializer;
99103
}
100104

105+
public function notify($callback)
106+
{
107+
if (is_callable($callback)) {
108+
$this->notifyCallback = $callback;
109+
} else {
110+
throw new \InvalidArgumentException('First parameter expects to be callable');
111+
}
112+
}
113+
101114
public function setDirectReplyTo($directReplyTo)
102115
{
103116
$this->directReplyTo = $directReplyTo;

Tests/RabbitMq/RpcClientTest.php

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,38 @@ public function testProcessMessageWithCustomUnserializer()
2222
});
2323
$client->processMessage($message);
2424
}
25+
26+
public function testProcessMessageWithNotifyMethod()
27+
{
28+
/** @var RpcClient $client */
29+
$client = $this->getMockBuilder('\OldSound\RabbitMqBundle\RabbitMq\RpcClient')
30+
->setMethods(array('sendReply', 'maybeStopConsumer'))
31+
->disableOriginalConstructor()
32+
->getMock();
33+
$expectedNotify = 'message';
34+
$message = $this->getMock('\PhpAmqpLib\Message\AMQPMessage', array('get'), array($expectedNotify));
35+
$notified = false;
36+
$client->notify(function ($message) use (&$notified) {
37+
$notified = $message;
38+
});
39+
40+
$client->initClient(false);
41+
$client->processMessage($message);
42+
43+
$this->assertSame($expectedNotify, $notified);
44+
}
45+
46+
/**
47+
* @expectedException \InvalidArgumentException
48+
*/
49+
public function testInvalidParameterOnNotify()
50+
{
51+
/** @var RpcClient $client */
52+
$client = $this->getMockBuilder('\OldSound\RabbitMqBundle\RabbitMq\RpcClient')
53+
->setMethods(array('sendReply', 'maybeStopConsumer'))
54+
->disableOriginalConstructor()
55+
->getMock();
56+
57+
$client->notify('not a callable');
58+
}
2559
}

0 commit comments

Comments
 (0)