Skip to content

Update to match rdkafka version 6.0.4 #50

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
}
],
"require": {
"ext-rdkafka": ">=4.0"
"ext-rdkafka": ">=6.0"
},
"require-dev": {
"phpunit/phpunit": "^8.2.4"
Expand Down
64 changes: 51 additions & 13 deletions stubs/RdKafka.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use RdKafka\Metadata;
use RdKafka\Topic;
use RdKafka\TopicConf;
use RdKafka\Queue;
use RdKafka\TopicPartition;

abstract class RdKafka
{
Expand All @@ -13,7 +13,7 @@ abstract class RdKafka
*
* @return int
*/
public function addBrokers($broker_list)
public function addBrokers(string $broker_list): int
{
}

Expand All @@ -25,14 +25,14 @@ public function addBrokers($broker_list)
* @throws Exception
* @return Metadata
*/
public function getMetadata($all_topics, $only_topic = null, $timeout_ms)
public function getMetadata(bool $all_topics, Topic $only_topic = null, int $timeout_ms): Metadata
{
}

/**
* @return int
*/
public function getOutQLen()
public function getOutQLen(): int
{
}

Expand All @@ -42,7 +42,7 @@ public function getOutQLen()
*
* @return Topic
*/
public function newTopic($topic_name, TopicConf $topic_conf = null)
public function newTopic(string $topic_name, TopicConf $topic_conf = null): Topic
{
}

Expand All @@ -51,7 +51,7 @@ public function newTopic($topic_name, TopicConf $topic_conf = null)
*
* @return void
*/
public function poll($timeout_ms)
public function poll(int $timeout_ms)
{
}

Expand All @@ -62,7 +62,14 @@ public function poll($timeout_ms)
*
* @return void
*/
public function setLogLevel($level)
public function setLogLevel(int $level)
{
}

/**
* @deprecated
*/
public function setLogger(int $logger_id)
{
}

Expand All @@ -73,32 +80,63 @@ public function setLogLevel($level)
* @param int $high
* @param int $timeout_ms
*/
public function queryWatermarkOffsets($topic, $partition, &$low, &$high, $timeout_ms)
public function queryWatermarkOffsets(string $topic, int $partition, int &$low, int &$high, int $timeout_ms)
{
}

/**
* @param array $topicPartitions
* @param TopicPartition[] $topicPartitions
* @param int $timeout_ms
* @return array
* @return TopicPartition[]
*/
public function offsetsForTimes ($topicPartitions , $timeout_ms)
public function offsetsForTimes (array $topicPartitions, int $timeout_ms): array
{
}

/**
* @param int $purge_flags
* @return int
*/
public function purge($purge_flags)
public function purge(int $purge_flags): int
{
}

/**
* @param int $timeout_ms
* @return int
*/
public function flush($timeout_ms)
public function flush(int $timeout_ms): int
{
}

/**
* @param string $token_value
* @param int|float|string $lifetime_ms
* @param string $principal_name
* @param array $extensions
*
* @return void
*/
public function oauthbearerSetToken(string $token_value, $lifetime_ms, string $principal_name, array $extensions = [])
{
}

/**
* @param string $error
*
* @return void
*/
public function oauthbearerSetTokenFailure(string $error)
{
}

/**
* @param int $timeout_ms
*
* @return int
*/
public function getControllerId(int $timeout_ms): int
{

}
}
28 changes: 19 additions & 9 deletions stubs/RdKafka/Conf.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class Conf
/**
* @return array<string, string>
*/
public function dump()
public function dump(): array
{
}

Expand All @@ -20,7 +20,7 @@ public function dump()
*
* @return void
*/
public function set($name, $value)
public function set(string $name, string $value)
{
}

Expand All @@ -36,7 +36,7 @@ public function setDefaultTopicConf(TopicConf $topic_conf)
}

/**
* @param callable $callback
* @param callable $callback (RdKafka\Kafka $kafka, RdKafka\Message $message)
*
* @return void
*/
Expand All @@ -45,7 +45,7 @@ public function setDrMsgCb(callable $callback)
}

/**
* @param callable $callback
* @param callable $callback (RdKafka\KafkaConsumer|RdKafka\Producer $kafka, int $err, string $reason)
*
* @return void
*/
Expand All @@ -54,7 +54,7 @@ public function setErrorCb(callable $callback)
}

/**
* @param callable $callback
* @param callable $callback (RdKafka\KafkaConsumer $kafka, int $err, array $partitions)
*
* @return void
*/
Expand All @@ -63,7 +63,7 @@ public function setRebalanceCb(callable $callback)
}

/**
* @param callable $callback
* @param callable $callback (object $kafka, string $json, int $json_len);
*
* @return void
*/
Expand All @@ -72,7 +72,7 @@ public function setStatsCb(callable $callback)
}

/**
* @param callable $callback
* @param callable $callback (RdKafka\Message $msg)
*
* @return void
*/
Expand All @@ -81,7 +81,7 @@ public function setConsumeCb(callable $callback)
}

/**
* @param callable $callback
* @param callable $callback (object $kafka, int $err, array $partitions);
*
* @return void
*/
Expand All @@ -90,11 +90,21 @@ public function setOffsetCommitCb(callable $callback)
}

/**
* @param callable $callback
* @param callable $callback (object $kafka, int $level, string $facility, string $message);
*
* @return void
*/
public function setLogCb(callable $callback)
{
}

/**
* @param callable $callback (RdKafka\Producer $producer)
*
* @return void
*/
public function setOauthbearerTokenRefreshCb(callable $callback)
{

}
}
10 changes: 5 additions & 5 deletions stubs/RdKafka/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,26 @@
class Consumer extends \RdKafka
{
/**
* @param Conf $conf
* @param Conf|null $conf
*/
public function __construct(Conf $conf = null)
{
}

/**
* @param string $topic_name
* @param TopicConf $topic_conf
* @param string $topic_name
* @param TopicConf|null $topic_conf
*
* @return ConsumerTopic
*/
public function newTopic($topic_name, TopicConf $topic_conf = null)
public function newTopic(string $topic_name, TopicConf $topic_conf = null): Topic
{
}

/**
* @return Queue
*/
public function newQueue()
public function newQueue(): Queue
{
}
}
16 changes: 8 additions & 8 deletions stubs/RdKafka/ConsumerTopic.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ private function __construct()
*
* @return Message|null
*/
public function consume($partition, $timeout_ms)
public function consume(int $partition, int $timeout_ms)
{
}

Expand All @@ -25,7 +25,7 @@ public function consume($partition, $timeout_ms)
*
* @return void
*/
public function consumeQueueStart($partition, $offset, Queue $queue)
public function consumeQueueStart(int $partition, int $offset, Queue $queue)
{
}

Expand All @@ -35,7 +35,7 @@ public function consumeQueueStart($partition, $offset, Queue $queue)
*
* @return void
*/
public function consumeStart($partition, $offset)
public function consumeStart(int $partition, int $offset)
{
}

Expand All @@ -44,7 +44,7 @@ public function consumeStart($partition, $offset)
*
* @return void
*/
public function consumeStop($partition)
public function consumeStop(int $partition)
{
}

Expand All @@ -54,7 +54,7 @@ public function consumeStop($partition)
*
* @return void
*/
public function offsetStore($partition, $offset)
public function offsetStore(int $partition, int $offset)
{
}

Expand All @@ -65,9 +65,9 @@ public function offsetStore($partition, $offset)
*
* @throws Exception
* @throws \InvalidArgumentException
* @return array
* @return Message[]|null
*/
public function consumeBatch($partition, $timeout_ms, $batch_size)
public function consumeBatch(int $partition, int $timeout_ms, int $batch_size)
{
}

Expand All @@ -78,7 +78,7 @@ public function consumeBatch($partition, $timeout_ms, $batch_size)
*
* @return void
*/
public function consumeCallback($partition, $timeout_ms, callable $callback)
public function consumeCallback(int $partition, int $timeout_ms, callable $callback)
{
}
}
Loading