From c0bfc897138c90279f40c2af59a5a280ae8f5587 Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Sun, 7 Mar 2021 16:11:09 +0100 Subject: [PATCH 01/10] Add initial draft of handling failed jobs --- src/CloudTasksJob.php | 24 ++++++++++++++++-- src/CloudTasksQueue.php | 9 +++++++ src/TaskHandler.php | 25 ++++++++++++++++-- tests/Support/FailingJob.php | 35 ++++++++++++++++++++++++++ tests/Support/failing-job-payload.json | 1 + 5 files changed, 90 insertions(+), 4 deletions(-) create mode 100644 tests/Support/FailingJob.php create mode 100644 tests/Support/failing-job-payload.json diff --git a/src/CloudTasksJob.php b/src/CloudTasksJob.php index 6367179..5ea0db3 100644 --- a/src/CloudTasksJob.php +++ b/src/CloudTasksJob.php @@ -10,11 +10,11 @@ class CloudTasksJob extends LaravelJob implements JobContract { private $job; private $attempts; + private $maxTries; - public function __construct($job, $attempts) + public function __construct($job) { $this->job = $job; - $this->attempts = $attempts; $this->container = Container::getInstance(); } @@ -32,4 +32,24 @@ public function attempts() { return $this->attempts; } + + public function setAttempts($attempts) + { + $this->attempts = $attempts; + } + + public function setMaxTries($maxTries) + { + $this->maxTries = $maxTries; + } + + public function maxTries() + { + return $this->maxTries; + } + + public function setQueue($queue) + { + $this->queue = $queue; + } } diff --git a/src/CloudTasksQueue.php b/src/CloudTasksQueue.php index 7c892a4..0e4cc0e 100644 --- a/src/CloudTasksQueue.php +++ b/src/CloudTasksQueue.php @@ -60,6 +60,10 @@ protected function pushToCloudTasks($queue, $payload, $delay = 0, $attempts = 0) $httpRequest->setHttpMethod(HttpMethod::POST); $httpRequest->setBody($payload); + $httpRequest->setHeaders([ + 'X-Stackkit-Max-Attempts' => $this->getMaxAttempts($queueName), + ]); + $task = $this->createTask(); $task->setHttpRequest($httpRequest); @@ -99,4 +103,9 @@ private function createTask() { return app(Task::class); } + + private function getMaxAttempts($queue) + { + return $this->client->getQueue($queue)->getRetryConfig()->getMaxAttempts(); + } } diff --git a/src/TaskHandler.php b/src/TaskHandler.php index a1eaa85..22d13af 100644 --- a/src/TaskHandler.php +++ b/src/TaskHandler.php @@ -4,6 +4,7 @@ use Google\Cloud\Tasks\V2\CloudTasksClient; use Illuminate\Http\Request; +use Illuminate\Queue\Events\JobFailed; use Illuminate\Queue\Worker; use Illuminate\Queue\WorkerOptions; use Firebase\JWT\JWT; @@ -33,6 +34,8 @@ public function handle($task = null) $task = $task ?: $this->captureTask(); + $this->listenForEvents(); + $this->handleTask($task); } @@ -81,7 +84,11 @@ private function captureTask() { $input = file_get_contents('php://input'); - if ($input === false) { + if (!$input) { + $input = request('input') ?: false; + } + + if (!$input) { throw new CloudTasksException('Could not read incoming task'); } @@ -94,13 +101,27 @@ private function captureTask() return $task; } + private function listenForEvents() + { + app('events')->listen(JobFailed::class, function ($event) { + app('queue.failer')->log( + 'cloudtasks', $event->job->getQueue(), + $event->job->getRawBody(), $event->exception + ); + }); + } + /** * @param $task * @throws CloudTasksException */ private function handleTask($task) { - $job = new CloudTasksJob($task, request()->header('X-CloudTasks-TaskRetryCount')); + $job = new CloudTasksJob($task); + + $job->setAttempts(request()->header('X-CloudTasks-TaskRetryCount') + 1); + $job->setQueue(request()->header('X-Cloudtasks-Queuename')); + $job->setMaxTries(request()->header('X-Stackkit-Max-Attempts')); $worker = $this->getQueueWorker(); diff --git a/tests/Support/FailingJob.php b/tests/Support/FailingJob.php new file mode 100644 index 0000000..6f25e51 --- /dev/null +++ b/tests/Support/FailingJob.php @@ -0,0 +1,35 @@ + Date: Fri, 19 Mar 2021 17:12:01 +0100 Subject: [PATCH 02/10] Move setting maxAttempt to when task is handled --- src/CloudTasksQueue.php | 9 --------- src/TaskHandler.php | 16 +++++++++++++++- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/src/CloudTasksQueue.php b/src/CloudTasksQueue.php index 0e4cc0e..7c892a4 100644 --- a/src/CloudTasksQueue.php +++ b/src/CloudTasksQueue.php @@ -60,10 +60,6 @@ protected function pushToCloudTasks($queue, $payload, $delay = 0, $attempts = 0) $httpRequest->setHttpMethod(HttpMethod::POST); $httpRequest->setBody($payload); - $httpRequest->setHeaders([ - 'X-Stackkit-Max-Attempts' => $this->getMaxAttempts($queueName), - ]); - $task = $this->createTask(); $task->setHttpRequest($httpRequest); @@ -103,9 +99,4 @@ private function createTask() { return app(Task::class); } - - private function getMaxAttempts($queue) - { - return $this->client->getQueue($queue)->getRetryConfig()->getMaxAttempts(); - } } diff --git a/src/TaskHandler.php b/src/TaskHandler.php index 22d13af..bb45073 100644 --- a/src/TaskHandler.php +++ b/src/TaskHandler.php @@ -121,13 +121,27 @@ private function handleTask($task) $job->setAttempts(request()->header('X-CloudTasks-TaskRetryCount') + 1); $job->setQueue(request()->header('X-Cloudtasks-Queuename')); - $job->setMaxTries(request()->header('X-Stackkit-Max-Attempts')); + $job->setMaxTries($this->getQueueMaxTries($job)); $worker = $this->getQueueWorker(); $worker->process('cloudtasks', $job, new WorkerOptions()); } + private function getQueueMaxTries(CloudTasksJob $job) + { + $queueName = $this->client->queueName( + Config::project(), + Config::location(), + $job->getQueue() + ); + + return $this->client + ->getQueue($queueName) + ->getRetryConfig() + ->getMaxAttempts(); + } + /** * @return Worker */ From f4c77f6e325ea31aa68c4855de594cf9bf91d9de Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Sun, 28 Mar 2021 15:45:42 +0200 Subject: [PATCH 03/10] Add test to assert queue failed driver works --- phpunit.xml | 1 + tests/TaskHandlerTest.php | 70 ++++++++++++++++++++++++++++++--------- tests/TestCase.php | 27 +++++++++++++-- 3 files changed, 80 insertions(+), 18 deletions(-) diff --git a/phpunit.xml b/phpunit.xml index ea7a136..994539f 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -23,6 +23,7 @@ + diff --git a/tests/TaskHandlerTest.php b/tests/TaskHandlerTest.php index 2b46931..9c7c445 100644 --- a/tests/TaskHandlerTest.php +++ b/tests/TaskHandlerTest.php @@ -7,6 +7,7 @@ use Google\Cloud\Tasks\V2\CloudTasksClient; use Illuminate\Cache\Events\CacheHit; use Illuminate\Cache\Events\KeyWritten; +use Illuminate\Support\Facades\DB; use Illuminate\Support\Facades\Event; use Illuminate\Support\Facades\Mail; use Mockery; @@ -45,8 +46,22 @@ protected function setUp(): void $googlePublicKey->shouldReceive('getPublicKey')->andReturnNull(); $googlePublicKey->shouldReceive('getKidFromOpenIdToken')->andReturnNull(); + $cloudTasksClient = Mockery::mock(new CloudTasksClient()); + + // Ensure we don't fetch the Queue name and attempts each test... + $cloudTasksClient->shouldReceive('queueName')->andReturn('my-queue'); + $cloudTasksClient->shouldReceive('getQueue')->andReturn(new class { + public function getRetryConfig() { + return new class { + public function getMaxAttempts() { + return 3; + } + }; + } + }); + $this->handler = new TaskHandler( - new CloudTasksClient(), + $cloudTasksClient, request(), $this->jwt, $googlePublicKey @@ -66,21 +81,6 @@ public function it_needs_an_authorization_header() $this->handler->handle(); } - /** @test */ - public function the_authorization_header_must_contain_a_valid_gcloud_token() - { - request()->headers->add([ - 'Authorization' => 'Bearer 123', - ]); - - $this->expectException(CloudTasksException::class); - $this->expectExceptionMessage('Could not decode incoming task'); - - $this->handler->handle(); - - // @todo - test with a valid token, not sure how to do that right now - } - /** @test */ public function it_will_validate_the_token_iss() { @@ -144,8 +144,46 @@ public function it_runs_the_incoming_job() Mail::assertSent(TestMailable::class); } + /** @test */ + public function after_max_attempts_it_will_log_to_failed_table() + { + $this->request->headers->add(['X-Cloudtasks-Queuename' => 'my-queue']); + + $this->request->headers->add(['X-CloudTasks-TaskRetryCount' => 1]); + try { + $this->handler->handle($this->failingJob()); + } catch (\Throwable $e) { + // + } + + $this->assertCount(0, DB::table('failed_jobs')->get()); + + $this->request->headers->add(['X-CloudTasks-TaskRetryCount' => 2]); + try { + $this->handler->handle($this->failingJob()); + } catch (\Throwable $e) { + // + } + + $this->assertDatabaseHas('failed_jobs', [ + 'connection' => 'cloudtasks', + 'queue' => 'my-queue', + 'payload' => rtrim($this->failingJobPayload()), + ]); + } + private function simpleJob() { return json_decode(file_get_contents(__DIR__ . '/Support/test-job-payload.json'), true); } + + private function failingJobPayload() + { + return file_get_contents(__DIR__ . '/Support/failing-job-payload.json'); + } + + private function failingJob() + { + return json_decode(file_get_contents(__DIR__ . '/Support/failing-job-payload.json'), true); + } } diff --git a/tests/TestCase.php b/tests/TestCase.php index 5cc8041..d0e6cba 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -2,10 +2,30 @@ namespace Tests; -use Illuminate\Support\Facades\Artisan; - class TestCase extends \Orchestra\Testbench\TestCase { + public static $migrated = false; + + protected function setUp() + { + parent::setUp(); + + // There is probably a more sane way to do this + if (!static::$migrated) { + unlink(database_path('database.sqlite')); + touch(database_path('database.sqlite')); + + foreach(glob(database_path('migrations/*.php')) as $file) { + unlink($file); + } + + $this->artisan('queue:failed-table'); + $this->artisan('migrate'); + + static::$migrated = true; + } + } + /** * Get package providers. At a minimum this is the package being tested, but also * would include packages upon which our package depends, e.g. Cartalyst/Sentry @@ -45,6 +65,9 @@ protected function getEnvironmentSetUp($app) 'handler' => 'https://localhost/my-handler', 'service_account_email' => 'info@stackkit.io', ]); + + $app['config']->set('database.default', 'sqlite'); + $app['config']->set('database.connections.sqlite.database', database_path('database.sqlite')); } protected function setConfigValue($key, $value) From 9c9455f129bde7db68d27a6d2e3abd2a6762064a Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Sun, 28 Mar 2021 15:57:01 +0200 Subject: [PATCH 04/10] Cleanup --- src/TaskHandler.php | 6 +----- tests/GooglePublicKeyTest.php | 1 - tests/Support/FailingJob.php | 1 - tests/TaskHandlerTest.php | 3 +-- 4 files changed, 2 insertions(+), 9 deletions(-) diff --git a/src/TaskHandler.php b/src/TaskHandler.php index bb45073..9f5496f 100644 --- a/src/TaskHandler.php +++ b/src/TaskHandler.php @@ -7,20 +7,16 @@ use Illuminate\Queue\Events\JobFailed; use Illuminate\Queue\Worker; use Illuminate\Queue\WorkerOptions; -use Firebase\JWT\JWT; class TaskHandler { private $request; - private $guzzle; - private $jwt; private $publicKey; - public function __construct(CloudTasksClient $client, Request $request, JWT $jwt, OpenIdVerificator $publicKey) + public function __construct(CloudTasksClient $client, Request $request, OpenIdVerificator $publicKey) { $this->client = $client; $this->request = $request; - $this->jwt = $jwt; $this->publicKey = $publicKey; } diff --git a/tests/GooglePublicKeyTest.php b/tests/GooglePublicKeyTest.php index 07a8da6..f8e6b4a 100644 --- a/tests/GooglePublicKeyTest.php +++ b/tests/GooglePublicKeyTest.php @@ -8,7 +8,6 @@ use Illuminate\Cache\Events\CacheHit; use Illuminate\Cache\Events\CacheMissed; use Illuminate\Cache\Events\KeyWritten; -use Illuminate\Support\Facades\Cache; use Illuminate\Support\Facades\Event; use Mockery; use phpseclib\Crypt\RSA; diff --git a/tests/Support/FailingJob.php b/tests/Support/FailingJob.php index 6f25e51..f8d9e87 100644 --- a/tests/Support/FailingJob.php +++ b/tests/Support/FailingJob.php @@ -7,7 +7,6 @@ use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; -use Illuminate\Support\Facades\Mail; class FailingJob implements ShouldQueue { diff --git a/tests/TaskHandlerTest.php b/tests/TaskHandlerTest.php index 9c7c445..24a5343 100644 --- a/tests/TaskHandlerTest.php +++ b/tests/TaskHandlerTest.php @@ -63,7 +63,6 @@ public function getMaxAttempts() { $this->handler = new TaskHandler( $cloudTasksClient, request(), - $this->jwt, $googlePublicKey ); @@ -184,6 +183,6 @@ private function failingJobPayload() private function failingJob() { - return json_decode(file_get_contents(__DIR__ . '/Support/failing-job-payload.json'), true); + return json_decode($this->failingJobPayload(), true); } } From de1987b1826ef2d91654a48706657323cadd92ba Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Sun, 28 Mar 2021 16:17:12 +0200 Subject: [PATCH 05/10] Cleanup --- tests/TestCase.php | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/TestCase.php b/tests/TestCase.php index d0e6cba..d920e06 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -65,9 +65,6 @@ protected function getEnvironmentSetUp($app) 'handler' => 'https://localhost/my-handler', 'service_account_email' => 'info@stackkit.io', ]); - - $app['config']->set('database.default', 'sqlite'); - $app['config']->set('database.connections.sqlite.database', database_path('database.sqlite')); } protected function setConfigValue($key, $value) From a3ee460e3713376068b923e9c35c88c7061dc3c1 Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Sun, 28 Mar 2021 16:18:29 +0200 Subject: [PATCH 06/10] Update CHANGELOG.md --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 32ee90f..c092372 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## 2.1.0-beta1 - 2021-03-28 + +**Added** + +- Handling of failed jobs + ## 2.0.1 - 2020-12-06 **Fixed** From 5a24202c08f5081c903b94dc01d4c12a87af6f45 Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Sun, 28 Mar 2021 16:38:02 +0200 Subject: [PATCH 07/10] Support unlimited job attempts --- src/CloudTasksJob.php | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/CloudTasksJob.php b/src/CloudTasksJob.php index 5ea0db3..d3d690a 100644 --- a/src/CloudTasksJob.php +++ b/src/CloudTasksJob.php @@ -40,6 +40,10 @@ public function setAttempts($attempts) public function setMaxTries($maxTries) { + if ((int) $maxTries === -1) { + $maxTries = null; + } + $this->maxTries = $maxTries; } From e15ae21059ac5fa8088b9abd2263b3351e06f3a4 Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Sun, 28 Mar 2021 16:44:21 +0200 Subject: [PATCH 08/10] Run tests From ba6a716d83e5a4851cd329231cfe79e0eba98afe Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Sun, 28 Mar 2021 16:55:43 +0200 Subject: [PATCH 09/10] Add missing return type --- tests/TestCase.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/TestCase.php b/tests/TestCase.php index d920e06..87d4a80 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -6,7 +6,7 @@ class TestCase extends \Orchestra\Testbench\TestCase { public static $migrated = false; - protected function setUp() + protected function setUp(): void { parent::setUp(); From 9cfc3ae581127054a96286e1d51404ad7f7f6d27 Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Sun, 28 Mar 2021 16:57:43 +0200 Subject: [PATCH 10/10] Fix unlink error if database does not exist yet --- tests/TestCase.php | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/TestCase.php b/tests/TestCase.php index 87d4a80..71d043d 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -12,7 +12,10 @@ protected function setUp(): void // There is probably a more sane way to do this if (!static::$migrated) { - unlink(database_path('database.sqlite')); + if (file_exists(database_path('database.sqlite'))) { + unlink(database_path('database.sqlite')); + } + touch(database_path('database.sqlite')); foreach(glob(database_path('migrations/*.php')) as $file) {