diff --git a/CHANGELOG.md b/CHANGELOG.md index 32ee90f..c092372 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## 2.1.0-beta1 - 2021-03-28 + +**Added** + +- Handling of failed jobs + ## 2.0.1 - 2020-12-06 **Fixed** diff --git a/phpunit.xml b/phpunit.xml index ea7a136..994539f 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -23,6 +23,7 @@ + diff --git a/src/CloudTasksJob.php b/src/CloudTasksJob.php index 6367179..d3d690a 100644 --- a/src/CloudTasksJob.php +++ b/src/CloudTasksJob.php @@ -10,11 +10,11 @@ class CloudTasksJob extends LaravelJob implements JobContract { private $job; private $attempts; + private $maxTries; - public function __construct($job, $attempts) + public function __construct($job) { $this->job = $job; - $this->attempts = $attempts; $this->container = Container::getInstance(); } @@ -32,4 +32,28 @@ public function attempts() { return $this->attempts; } + + public function setAttempts($attempts) + { + $this->attempts = $attempts; + } + + public function setMaxTries($maxTries) + { + if ((int) $maxTries === -1) { + $maxTries = null; + } + + $this->maxTries = $maxTries; + } + + public function maxTries() + { + return $this->maxTries; + } + + public function setQueue($queue) + { + $this->queue = $queue; + } } diff --git a/src/TaskHandler.php b/src/TaskHandler.php index a1eaa85..9f5496f 100644 --- a/src/TaskHandler.php +++ b/src/TaskHandler.php @@ -4,22 +4,19 @@ use Google\Cloud\Tasks\V2\CloudTasksClient; use Illuminate\Http\Request; +use Illuminate\Queue\Events\JobFailed; use Illuminate\Queue\Worker; use Illuminate\Queue\WorkerOptions; -use Firebase\JWT\JWT; class TaskHandler { private $request; - private $guzzle; - private $jwt; private $publicKey; - public function __construct(CloudTasksClient $client, Request $request, JWT $jwt, OpenIdVerificator $publicKey) + public function __construct(CloudTasksClient $client, Request $request, OpenIdVerificator $publicKey) { $this->client = $client; $this->request = $request; - $this->jwt = $jwt; $this->publicKey = $publicKey; } @@ -33,6 +30,8 @@ public function handle($task = null) $task = $task ?: $this->captureTask(); + $this->listenForEvents(); + $this->handleTask($task); } @@ -81,7 +80,11 @@ private function captureTask() { $input = file_get_contents('php://input'); - if ($input === false) { + if (!$input) { + $input = request('input') ?: false; + } + + if (!$input) { throw new CloudTasksException('Could not read incoming task'); } @@ -94,19 +97,47 @@ private function captureTask() return $task; } + private function listenForEvents() + { + app('events')->listen(JobFailed::class, function ($event) { + app('queue.failer')->log( + 'cloudtasks', $event->job->getQueue(), + $event->job->getRawBody(), $event->exception + ); + }); + } + /** * @param $task * @throws CloudTasksException */ private function handleTask($task) { - $job = new CloudTasksJob($task, request()->header('X-CloudTasks-TaskRetryCount')); + $job = new CloudTasksJob($task); + + $job->setAttempts(request()->header('X-CloudTasks-TaskRetryCount') + 1); + $job->setQueue(request()->header('X-Cloudtasks-Queuename')); + $job->setMaxTries($this->getQueueMaxTries($job)); $worker = $this->getQueueWorker(); $worker->process('cloudtasks', $job, new WorkerOptions()); } + private function getQueueMaxTries(CloudTasksJob $job) + { + $queueName = $this->client->queueName( + Config::project(), + Config::location(), + $job->getQueue() + ); + + return $this->client + ->getQueue($queueName) + ->getRetryConfig() + ->getMaxAttempts(); + } + /** * @return Worker */ diff --git a/tests/GooglePublicKeyTest.php b/tests/GooglePublicKeyTest.php index 07a8da6..f8e6b4a 100644 --- a/tests/GooglePublicKeyTest.php +++ b/tests/GooglePublicKeyTest.php @@ -8,7 +8,6 @@ use Illuminate\Cache\Events\CacheHit; use Illuminate\Cache\Events\CacheMissed; use Illuminate\Cache\Events\KeyWritten; -use Illuminate\Support\Facades\Cache; use Illuminate\Support\Facades\Event; use Mockery; use phpseclib\Crypt\RSA; diff --git a/tests/Support/FailingJob.php b/tests/Support/FailingJob.php new file mode 100644 index 0000000..f8d9e87 --- /dev/null +++ b/tests/Support/FailingJob.php @@ -0,0 +1,34 @@ +shouldReceive('getPublicKey')->andReturnNull(); $googlePublicKey->shouldReceive('getKidFromOpenIdToken')->andReturnNull(); + $cloudTasksClient = Mockery::mock(new CloudTasksClient()); + + // Ensure we don't fetch the Queue name and attempts each test... + $cloudTasksClient->shouldReceive('queueName')->andReturn('my-queue'); + $cloudTasksClient->shouldReceive('getQueue')->andReturn(new class { + public function getRetryConfig() { + return new class { + public function getMaxAttempts() { + return 3; + } + }; + } + }); + $this->handler = new TaskHandler( - new CloudTasksClient(), + $cloudTasksClient, request(), - $this->jwt, $googlePublicKey ); @@ -66,21 +80,6 @@ public function it_needs_an_authorization_header() $this->handler->handle(); } - /** @test */ - public function the_authorization_header_must_contain_a_valid_gcloud_token() - { - request()->headers->add([ - 'Authorization' => 'Bearer 123', - ]); - - $this->expectException(CloudTasksException::class); - $this->expectExceptionMessage('Could not decode incoming task'); - - $this->handler->handle(); - - // @todo - test with a valid token, not sure how to do that right now - } - /** @test */ public function it_will_validate_the_token_iss() { @@ -144,8 +143,46 @@ public function it_runs_the_incoming_job() Mail::assertSent(TestMailable::class); } + /** @test */ + public function after_max_attempts_it_will_log_to_failed_table() + { + $this->request->headers->add(['X-Cloudtasks-Queuename' => 'my-queue']); + + $this->request->headers->add(['X-CloudTasks-TaskRetryCount' => 1]); + try { + $this->handler->handle($this->failingJob()); + } catch (\Throwable $e) { + // + } + + $this->assertCount(0, DB::table('failed_jobs')->get()); + + $this->request->headers->add(['X-CloudTasks-TaskRetryCount' => 2]); + try { + $this->handler->handle($this->failingJob()); + } catch (\Throwable $e) { + // + } + + $this->assertDatabaseHas('failed_jobs', [ + 'connection' => 'cloudtasks', + 'queue' => 'my-queue', + 'payload' => rtrim($this->failingJobPayload()), + ]); + } + private function simpleJob() { return json_decode(file_get_contents(__DIR__ . '/Support/test-job-payload.json'), true); } + + private function failingJobPayload() + { + return file_get_contents(__DIR__ . '/Support/failing-job-payload.json'); + } + + private function failingJob() + { + return json_decode($this->failingJobPayload(), true); + } } diff --git a/tests/TestCase.php b/tests/TestCase.php index 5cc8041..71d043d 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -2,10 +2,33 @@ namespace Tests; -use Illuminate\Support\Facades\Artisan; - class TestCase extends \Orchestra\Testbench\TestCase { + public static $migrated = false; + + protected function setUp(): void + { + parent::setUp(); + + // There is probably a more sane way to do this + if (!static::$migrated) { + if (file_exists(database_path('database.sqlite'))) { + unlink(database_path('database.sqlite')); + } + + touch(database_path('database.sqlite')); + + foreach(glob(database_path('migrations/*.php')) as $file) { + unlink($file); + } + + $this->artisan('queue:failed-table'); + $this->artisan('migrate'); + + static::$migrated = true; + } + } + /** * Get package providers. At a minimum this is the package being tested, but also * would include packages upon which our package depends, e.g. Cartalyst/Sentry