From d0b646fd65e5b068d364e2d2f3c1d5004d85e15d Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Sat, 4 Dec 2021 23:49:57 +0100 Subject: [PATCH 1/7] Fix job marked as failed when in a queue with max attempts --- src/CloudTasksJob.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/CloudTasksJob.php b/src/CloudTasksJob.php index d3d690a..d856fc7 100644 --- a/src/CloudTasksJob.php +++ b/src/CloudTasksJob.php @@ -41,7 +41,7 @@ public function setAttempts($attempts) public function setMaxTries($maxTries) { if ((int) $maxTries === -1) { - $maxTries = null; + $maxTries = 0; } $this->maxTries = $maxTries; From b28d5e95952ec276ed57908eadcb04b1a1366b39 Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Sun, 5 Dec 2021 17:21:42 +0100 Subject: [PATCH 2/7] Add support for MAX_RETRY_DURATION --- src/CloudTasksJob.php | 33 ++++++++++++++++++++- src/CloudTasksQueue.php | 14 +++++++++ src/TaskHandler.php | 65 ++++++++++++++++++++++++++++++++++++----- 3 files changed, 103 insertions(+), 9 deletions(-) diff --git a/src/CloudTasksJob.php b/src/CloudTasksJob.php index d856fc7..ba000a8 100644 --- a/src/CloudTasksJob.php +++ b/src/CloudTasksJob.php @@ -2,6 +2,7 @@ namespace Stackkit\LaravelGoogleCloudTasksQueue; +use Google\Cloud\Tasks\V2\CloudTasksClient; use Illuminate\Container\Container; use Illuminate\Queue\Jobs\Job as LaravelJob; use Illuminate\Contracts\Queue\Job as JobContract; @@ -11,11 +12,18 @@ class CloudTasksJob extends LaravelJob implements JobContract private $job; private $attempts; private $maxTries; + public $retryUntil = null; - public function __construct($job) + /** + * @var CloudTasksQueue + */ + private $cloudTasksQueue; + + public function __construct($job, CloudTasksQueue $cloudTasksQueue) { $this->job = $job; $this->container = Container::getInstance(); + $this->cloudTasksQueue = $cloudTasksQueue; } public function getJobId() @@ -56,4 +64,27 @@ public function setQueue($queue) { $this->queue = $queue; } + + public function setRetryUntil($retryUntil) + { + $this->retryUntil = $retryUntil; + } + + public function retryUntil() + { + return $this->retryUntil; + } + + // timeoutAt was renamed to retryUntil in 8.x but we still support this. + public function timeoutAt() + { + return $this->retryUntil; + } + + public function delete() + { + parent::delete(); + + $this->cloudTasksQueue->delete($this); + } } diff --git a/src/CloudTasksQueue.php b/src/CloudTasksQueue.php index 73b7764..20530eb 100644 --- a/src/CloudTasksQueue.php +++ b/src/CloudTasksQueue.php @@ -94,6 +94,20 @@ private function createHttpRequest() return app(HttpRequest::class); } + public function delete(CloudTasksJob $job) + { + $config = $this->config; + + $taskName = $this->client->taskName( + $config['project'], + $config['location'], + $job->getQueue(), + request()->header('X-Cloudtasks-Taskname') + ); + + $this->client->deleteTask($taskName); + } + /** * @return Task */ diff --git a/src/TaskHandler.php b/src/TaskHandler.php index a8b2b5f..47d75ac 100644 --- a/src/TaskHandler.php +++ b/src/TaskHandler.php @@ -2,7 +2,9 @@ namespace Stackkit\LaravelGoogleCloudTasksQueue; +use Google\Cloud\Tasks\V2\Attempt; use Google\Cloud\Tasks\V2\CloudTasksClient; +use Google\Cloud\Tasks\V2\RetryConfig; use Illuminate\Http\Request; use Illuminate\Queue\Events\JobFailed; use Illuminate\Queue\Worker; @@ -14,6 +16,16 @@ class TaskHandler private $publicKey; private $config; + /** + * @var CloudTasksQueue + */ + private $queue; + + /** + * @var RetryConfig + */ + private $retryConfig = null; + public function __construct(CloudTasksClient $client, Request $request, OpenIdVerificator $publicKey) { $this->client = $client; @@ -31,6 +43,8 @@ public function handle($task = null) $this->loadQueueConnectionConfiguration($task); + $this->setQueue(); + $this->authorizeRequest(); $this->listenForEvents(); @@ -48,6 +62,11 @@ private function loadQueueConnectionConfiguration($task) ); } + private function setQueue() + { + $this->queue = new CloudTasksQueue($this->config, $this->client); + } + /** * @throws CloudTasksException */ @@ -122,29 +141,59 @@ private function listenForEvents() */ private function handleTask($task) { - $job = new CloudTasksJob($task); + $job = new CloudTasksJob($task, $this->queue); + + $this->loadQueueRetryConfig(); $job->setAttempts(request()->header('X-CloudTasks-TaskRetryCount') + 1); $job->setQueue(request()->header('X-Cloudtasks-Queuename')); - $job->setMaxTries($this->getQueueMaxTries($job)); + $job->setMaxTries($this->retryConfig->getMaxAttempts()); + + // If the job is being attempted again we also check if a + // max retry duration has been set. If that duration + // has passed, it should stop trying altogether. + if ($job->attempts() >= 1) { + $job->setRetryUntil($this->getRetryUntilTimestamp($job)); + } $worker = $this->getQueueWorker(); $worker->process($this->config['connection'], $job, new WorkerOptions()); } - private function getQueueMaxTries(CloudTasksJob $job) + private function loadQueueRetryConfig() { $queueName = $this->client->queueName( $this->config['project'], $this->config['location'], - $job->getQueue() + request()->header('X-Cloudtasks-Queuename') + ); + + $this->retryConfig = $this->client->getQueue($queueName)->getRetryConfig(); + } + + private function getRetryUntilTimestamp(CloudTasksJob $job) + { + $task = $this->client->getTask( + $this->client->taskName( + $this->config['project'], + $this->config['location'], + $job->getQueue(), + request()->header('X-Cloudtasks-Taskname') + ) ); - return $this->client - ->getQueue($queueName) - ->getRetryConfig() - ->getMaxAttempts(); + $attempt = $task->getFirstAttempt(); + + if (!$attempt instanceof Attempt) { + return null; + } + + $maxDurationInSeconds = $this->retryConfig->getMaxRetryDuration()->getSeconds(); + + $firstAttemptTimestamp = $attempt->getDispatchTime()->toDateTime()->getTimestamp(); + + return $firstAttemptTimestamp + $maxDurationInSeconds; } /** From 6e6e6cb61155f64f281e5839809ca4c3642480ee Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Sun, 12 Dec 2021 13:12:23 +0100 Subject: [PATCH 3/7] Fix incorrect check --- src/TaskHandler.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/TaskHandler.php b/src/TaskHandler.php index 47d75ac..3c0d00d 100644 --- a/src/TaskHandler.php +++ b/src/TaskHandler.php @@ -152,7 +152,7 @@ private function handleTask($task) // If the job is being attempted again we also check if a // max retry duration has been set. If that duration // has passed, it should stop trying altogether. - if ($job->attempts() >= 1) { + if ($job->attempts() > 1) { $job->setRetryUntil($this->getRetryUntilTimestamp($job)); } From fed1c4d4220e256c1f9e078fcbbb58aa0a233c47 Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Sun, 12 Dec 2021 14:25:43 +0100 Subject: [PATCH 4/7] Add tests --- tests/TaskHandlerTest.php | 76 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 75 insertions(+), 1 deletion(-) diff --git a/tests/TaskHandlerTest.php b/tests/TaskHandlerTest.php index c0d2f3e..54eae32 100644 --- a/tests/TaskHandlerTest.php +++ b/tests/TaskHandlerTest.php @@ -4,7 +4,10 @@ use Firebase\JWT\JWT; use Firebase\JWT\SignatureInvalidException; +use Google\Cloud\Tasks\V2\Attempt; use Google\Cloud\Tasks\V2\CloudTasksClient; +use Google\Cloud\Tasks\V2\Task; +use Google\Protobuf\Timestamp; use Illuminate\Cache\Events\CacheHit; use Illuminate\Cache\Events\KeyWritten; use Illuminate\Support\Facades\DB; @@ -27,6 +30,8 @@ class TaskHandlerTest extends TestCase private $request; + private $cloudTasksClient; + protected function setUp(): void { parent::setUp(); @@ -46,7 +51,8 @@ protected function setUp(): void $googlePublicKey->shouldReceive('getPublicKey')->andReturnNull(); $googlePublicKey->shouldReceive('getKidFromOpenIdToken')->andReturnNull(); - $cloudTasksClient = Mockery::mock(new CloudTasksClient()); + $cloudTasksClient = Mockery::mock(new CloudTasksClient())->byDefault(); + $this->cloudTasksClient = $cloudTasksClient; // Ensure we don't fetch the Queue name and attempts each test... $cloudTasksClient->shouldReceive('queueName')->andReturn('my-queue'); @@ -56,9 +62,25 @@ public function getRetryConfig() { public function getMaxAttempts() { return 3; } + + public function getMaxRetryDuration() { + return new class { + public function getSeconds() { + return 30; + } + }; + } }; } }); + $cloudTasksClient->shouldReceive('taskName')->andReturn('FakeTaskName'); + $cloudTasksClient->shouldReceive('getTask')->byDefault()->andReturn(new class { + public function getFirstAttempt() { + return null; + } + }); + + $cloudTasksClient->shouldReceive('deleteTask')->andReturnNull(); $this->handler = new TaskHandler( $cloudTasksClient, @@ -171,6 +193,58 @@ public function after_max_attempts_it_will_log_to_failed_table() ]); } + /** @test */ + public function after_max_attempts_it_will_delete_the_task() + { + $this->request->headers->add(['X-CloudTasks-TaskRetryCount' => 2]); + + rescue(function () { + $this->handler->handle($this->failingJob()); + }); + + $this->cloudTasksClient->shouldHaveReceived('deleteTask')->once(); + } + + /** @test */ + public function after_max_retry_until_it_will_delete_the_task() + { + $this->request->headers->add(['X-CloudTasks-TaskRetryCount' => 1]); + + $this->cloudTasksClient + ->shouldReceive('getTask') + ->byDefault() + ->andReturn(new class { + public function getFirstAttempt() { + return (new Attempt()) + ->setDispatchTime(new Timestamp([ + 'seconds' => time() - 29, + ])); + } + }); + + rescue(function () { + $this->handler->handle($this->failingJob()); + }); + + $this->cloudTasksClient->shouldNotHaveReceived('deleteTask'); + + $this->cloudTasksClient->shouldReceive('getTask') + ->andReturn(new class { + public function getFirstAttempt() { + return (new Attempt()) + ->setDispatchTime(new Timestamp([ + 'seconds' => time() - 30, + ])); + } + }); + + rescue(function () { + $this->handler->handle($this->failingJob()); + }); + + $this->cloudTasksClient->shouldHaveReceived('deleteTask')->once(); + } + private function simpleJob() { return json_decode(file_get_contents(__DIR__ . '/Support/test-job-payload.json'), true); From 1d5f9f8d63b3621e1b90157b1c90fca95f0d0673 Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Sun, 12 Dec 2021 14:35:32 +0100 Subject: [PATCH 5/7] Add test --- src/TaskHandler.php | 4 +++ tests/TaskHandlerTest.php | 71 +++++++++++++++++++++++++++++---------- 2 files changed, 58 insertions(+), 17 deletions(-) diff --git a/src/TaskHandler.php b/src/TaskHandler.php index 3c0d00d..7b0c42f 100644 --- a/src/TaskHandler.php +++ b/src/TaskHandler.php @@ -189,6 +189,10 @@ private function getRetryUntilTimestamp(CloudTasksJob $job) return null; } + if (! $this->retryConfig->hasMaxRetryDuration()) { + return null; + } + $maxDurationInSeconds = $this->retryConfig->getMaxRetryDuration()->getSeconds(); $firstAttemptTimestamp = $attempt->getDispatchTime()->toDateTime()->getTimestamp(); diff --git a/tests/TaskHandlerTest.php b/tests/TaskHandlerTest.php index 54eae32..4e1544a 100644 --- a/tests/TaskHandlerTest.php +++ b/tests/TaskHandlerTest.php @@ -56,23 +56,29 @@ protected function setUp(): void // Ensure we don't fetch the Queue name and attempts each test... $cloudTasksClient->shouldReceive('queueName')->andReturn('my-queue'); - $cloudTasksClient->shouldReceive('getQueue')->andReturn(new class { - public function getRetryConfig() { - return new class { - public function getMaxAttempts() { - return 3; - } - - public function getMaxRetryDuration() { - return new class { - public function getSeconds() { - return 30; - } - }; - } - }; - } - }); + $cloudTasksClient->shouldReceive('getQueue') + ->byDefault() + ->andReturn(new class { + public function getRetryConfig() { + return new class { + public function getMaxAttempts() { + return 3; + } + + public function hasMaxRetryDuration() { + return true; + } + + public function getMaxRetryDuration() { + return new class { + public function getSeconds() { + return 30; + } + }; + } + }; + } + }); $cloudTasksClient->shouldReceive('taskName')->andReturn('FakeTaskName'); $cloudTasksClient->shouldReceive('getTask')->byDefault()->andReturn(new class { public function getFirstAttempt() { @@ -245,6 +251,37 @@ public function getFirstAttempt() { $this->cloudTasksClient->shouldHaveReceived('deleteTask')->once(); } + /** @test */ + public function test_unlimited_max_attempts() + { + $this->cloudTasksClient->shouldReceive('getQueue') + ->byDefault() + ->andReturn(new class { + public function getRetryConfig() { + return new class { + public function getMaxAttempts() { + return -1; + } + + public function hasMaxRetryDuration() { + return false; + } + }; + } + }); + + for ($i = 0; $i < 50; $i++) { + $this->request->headers->add(['X-CloudTasks-TaskRetryCount' => $i]); + + rescue(function () { + $this->handler->handle($this->failingJob()); + }); + + $this->cloudTasksClient->shouldNotHaveReceived('deleteTask'); + } + + } + private function simpleJob() { return json_decode(file_get_contents(__DIR__ . '/Support/test-job-payload.json'), true); From d165daf84f22330e14fb4b3b048bc14166902416 Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Sun, 12 Dec 2021 15:12:21 +0100 Subject: [PATCH 6/7] Add test --- tests/TaskHandlerTest.php | 64 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/tests/TaskHandlerTest.php b/tests/TaskHandlerTest.php index 4e1544a..ce3c051 100644 --- a/tests/TaskHandlerTest.php +++ b/tests/TaskHandlerTest.php @@ -2,6 +2,7 @@ namespace Tests; +use Carbon\Carbon; use Firebase\JWT\JWT; use Firebase\JWT\SignatureInvalidException; use Google\Cloud\Tasks\V2\Attempt; @@ -279,7 +280,70 @@ public function hasMaxRetryDuration() { $this->cloudTasksClient->shouldNotHaveReceived('deleteTask'); } + } + + /** + * @test + * @testWith [{"retryCount": 1, "shouldHaveFailed": false}] + * [{"retryCount": 2, "shouldHaveFailed": false}] + * [{"retryCount": 2, "travelSeconds": 29, "shouldHaveFailed": false}] + * [{"retryCount": 2, "travelSeconds": 31, "shouldHaveFailed": true}] + */ + public function job_max_attempts_is_ignored_if_has_retry_until($example) + { + // Arrange + $this->request->headers->add(['X-CloudTasks-TaskRetryCount' => $example['retryCount']]); + + $now = Carbon::now()->getTimestamp(); + if (array_key_exists('travelSeconds', $example)) { + Carbon::setTestNow(Carbon::now()->addSeconds($example['travelSeconds'])); + } + + $this->cloudTasksClient->shouldReceive('getQueue') + ->byDefault() + ->andReturn(new class() { + public function getRetryConfig() { + return new class { + public function getMaxAttempts() { + return 3; + } + + public function hasMaxRetryDuration() { + return true; + } + public function getMaxRetryDuration() { + return new class { + public function getSeconds() { + return 30; + } + }; + } + }; + } + }); + + $this->cloudTasksClient + ->shouldReceive('getTask') + ->byDefault() + ->andReturn(new class { + public function getFirstAttempt() { + return (new Attempt()) + ->setDispatchTime(new Timestamp([ + 'seconds' => time(), + ])); + } + }); + + rescue(function () { + $this->handler->handle($this->failingJob()); + }); + + if ($example['shouldHaveFailed']) { + $this->cloudTasksClient->shouldHaveReceived('deleteTask'); + } else { + $this->cloudTasksClient->shouldNotHaveReceived('deleteTask'); + } } private function simpleJob() From 9fd1a36503f24775824a746facf97a3c08e1196b Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Sun, 12 Dec 2021 16:19:01 +0100 Subject: [PATCH 7/7] Update test to take into account updated 8.x behavior --- tests/TaskHandlerTest.php | 105 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 100 insertions(+), 5 deletions(-) diff --git a/tests/TaskHandlerTest.php b/tests/TaskHandlerTest.php index ce3c051..c0c6262 100644 --- a/tests/TaskHandlerTest.php +++ b/tests/TaskHandlerTest.php @@ -284,17 +284,13 @@ public function hasMaxRetryDuration() { /** * @test - * @testWith [{"retryCount": 1, "shouldHaveFailed": false}] - * [{"retryCount": 2, "shouldHaveFailed": false}] - * [{"retryCount": 2, "travelSeconds": 29, "shouldHaveFailed": false}] - * [{"retryCount": 2, "travelSeconds": 31, "shouldHaveFailed": true}] + * @dataProvider whenIsJobFailingProvider */ public function job_max_attempts_is_ignored_if_has_retry_until($example) { // Arrange $this->request->headers->add(['X-CloudTasks-TaskRetryCount' => $example['retryCount']]); - $now = Carbon::now()->getTimestamp(); if (array_key_exists('travelSeconds', $example)) { Carbon::setTestNow(Carbon::now()->addSeconds($example['travelSeconds'])); } @@ -346,6 +342,105 @@ public function getFirstAttempt() { } } + public function whenIsJobFailingProvider() + { + $this->createApplication(); + + // 8.x behavior: if retryUntil, only check that. + // 6.x behavior: if retryUntil, check that, otherwise check maxAttempts + + // max retry count is 3 + // max retryUntil is 30 seconds + + if (version_compare(app()->version(), '8.0.0', '>=')) { + return [ + [ + [ + 'retryCount' => 1, + 'shouldHaveFailed' => false, + ], + ], + [ + [ + 'retryCount' => 2, + 'shouldHaveFailed' => false, + ], + ], + [ + [ + 'retryCount' => 1, + 'travelSeconds' => 29, + 'shouldHaveFailed' => false, + ], + ], + [ + [ + 'retryCount' => 1, + 'travelSeconds' => 31, + 'shouldHaveFailed' => true, + ], + ], + [ + [ + 'retryCount' => 1, + 'travelSeconds' => 32, + 'shouldHaveFailed' => true, + ], + ], + [ + [ + 'retryCount' => 1, + 'travelSeconds' => 31, + 'shouldHaveFailed' => true, + ], + ], + ]; + } + + return [ + [ + [ + 'retryCount' => 1, + 'shouldHaveFailed' => false, + ], + ], + [ + [ + 'retryCount' => 2, + 'shouldHaveFailed' => true, + ], + ], + [ + [ + 'retryCount' => 1, + 'travelSeconds' => 29, + 'shouldHaveFailed' => false, + ], + ], + [ + [ + 'retryCount' => 1, + 'travelSeconds' => 31, + 'shouldHaveFailed' => true, + ], + ], + [ + [ + 'retryCount' => 1, + 'travelSeconds' => 32, + 'shouldHaveFailed' => true, + ], + ], + [ + [ + 'retryCount' => 1, + 'travelSeconds' => 32, + 'shouldHaveFailed' => true, + ], + ], + ]; + } + private function simpleJob() { return json_decode(file_get_contents(__DIR__ . '/Support/test-job-payload.json'), true);