diff --git a/CHANGELOG.md b/CHANGELOG.md
index 32ee90f..c092372 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
+## 2.1.0-beta1 - 2021-03-28
+
+**Added**
+
+- Handling of failed jobs
+
## 2.0.1 - 2020-12-06
**Fixed**
diff --git a/phpunit.xml b/phpunit.xml
index ea7a136..994539f 100644
--- a/phpunit.xml
+++ b/phpunit.xml
@@ -23,6 +23,7 @@
+
diff --git a/src/CloudTasksJob.php b/src/CloudTasksJob.php
index 6367179..d3d690a 100644
--- a/src/CloudTasksJob.php
+++ b/src/CloudTasksJob.php
@@ -10,11 +10,11 @@ class CloudTasksJob extends LaravelJob implements JobContract
{
private $job;
private $attempts;
+ private $maxTries;
- public function __construct($job, $attempts)
+ public function __construct($job)
{
$this->job = $job;
- $this->attempts = $attempts;
$this->container = Container::getInstance();
}
@@ -32,4 +32,28 @@ public function attempts()
{
return $this->attempts;
}
+
+ public function setAttempts($attempts)
+ {
+ $this->attempts = $attempts;
+ }
+
+ public function setMaxTries($maxTries)
+ {
+ if ((int) $maxTries === -1) {
+ $maxTries = null;
+ }
+
+ $this->maxTries = $maxTries;
+ }
+
+ public function maxTries()
+ {
+ return $this->maxTries;
+ }
+
+ public function setQueue($queue)
+ {
+ $this->queue = $queue;
+ }
}
diff --git a/src/TaskHandler.php b/src/TaskHandler.php
index a1eaa85..9f5496f 100644
--- a/src/TaskHandler.php
+++ b/src/TaskHandler.php
@@ -4,22 +4,19 @@
use Google\Cloud\Tasks\V2\CloudTasksClient;
use Illuminate\Http\Request;
+use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\Worker;
use Illuminate\Queue\WorkerOptions;
-use Firebase\JWT\JWT;
class TaskHandler
{
private $request;
- private $guzzle;
- private $jwt;
private $publicKey;
- public function __construct(CloudTasksClient $client, Request $request, JWT $jwt, OpenIdVerificator $publicKey)
+ public function __construct(CloudTasksClient $client, Request $request, OpenIdVerificator $publicKey)
{
$this->client = $client;
$this->request = $request;
- $this->jwt = $jwt;
$this->publicKey = $publicKey;
}
@@ -33,6 +30,8 @@ public function handle($task = null)
$task = $task ?: $this->captureTask();
+ $this->listenForEvents();
+
$this->handleTask($task);
}
@@ -81,7 +80,11 @@ private function captureTask()
{
$input = file_get_contents('php://input');
- if ($input === false) {
+ if (!$input) {
+ $input = request('input') ?: false;
+ }
+
+ if (!$input) {
throw new CloudTasksException('Could not read incoming task');
}
@@ -94,19 +97,47 @@ private function captureTask()
return $task;
}
+ private function listenForEvents()
+ {
+ app('events')->listen(JobFailed::class, function ($event) {
+ app('queue.failer')->log(
+ 'cloudtasks', $event->job->getQueue(),
+ $event->job->getRawBody(), $event->exception
+ );
+ });
+ }
+
/**
* @param $task
* @throws CloudTasksException
*/
private function handleTask($task)
{
- $job = new CloudTasksJob($task, request()->header('X-CloudTasks-TaskRetryCount'));
+ $job = new CloudTasksJob($task);
+
+ $job->setAttempts(request()->header('X-CloudTasks-TaskRetryCount') + 1);
+ $job->setQueue(request()->header('X-Cloudtasks-Queuename'));
+ $job->setMaxTries($this->getQueueMaxTries($job));
$worker = $this->getQueueWorker();
$worker->process('cloudtasks', $job, new WorkerOptions());
}
+ private function getQueueMaxTries(CloudTasksJob $job)
+ {
+ $queueName = $this->client->queueName(
+ Config::project(),
+ Config::location(),
+ $job->getQueue()
+ );
+
+ return $this->client
+ ->getQueue($queueName)
+ ->getRetryConfig()
+ ->getMaxAttempts();
+ }
+
/**
* @return Worker
*/
diff --git a/tests/GooglePublicKeyTest.php b/tests/GooglePublicKeyTest.php
index 07a8da6..f8e6b4a 100644
--- a/tests/GooglePublicKeyTest.php
+++ b/tests/GooglePublicKeyTest.php
@@ -8,7 +8,6 @@
use Illuminate\Cache\Events\CacheHit;
use Illuminate\Cache\Events\CacheMissed;
use Illuminate\Cache\Events\KeyWritten;
-use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Event;
use Mockery;
use phpseclib\Crypt\RSA;
diff --git a/tests/Support/FailingJob.php b/tests/Support/FailingJob.php
new file mode 100644
index 0000000..f8d9e87
--- /dev/null
+++ b/tests/Support/FailingJob.php
@@ -0,0 +1,34 @@
+shouldReceive('getPublicKey')->andReturnNull();
$googlePublicKey->shouldReceive('getKidFromOpenIdToken')->andReturnNull();
+ $cloudTasksClient = Mockery::mock(new CloudTasksClient());
+
+ // Ensure we don't fetch the Queue name and attempts each test...
+ $cloudTasksClient->shouldReceive('queueName')->andReturn('my-queue');
+ $cloudTasksClient->shouldReceive('getQueue')->andReturn(new class {
+ public function getRetryConfig() {
+ return new class {
+ public function getMaxAttempts() {
+ return 3;
+ }
+ };
+ }
+ });
+
$this->handler = new TaskHandler(
- new CloudTasksClient(),
+ $cloudTasksClient,
request(),
- $this->jwt,
$googlePublicKey
);
@@ -66,21 +80,6 @@ public function it_needs_an_authorization_header()
$this->handler->handle();
}
- /** @test */
- public function the_authorization_header_must_contain_a_valid_gcloud_token()
- {
- request()->headers->add([
- 'Authorization' => 'Bearer 123',
- ]);
-
- $this->expectException(CloudTasksException::class);
- $this->expectExceptionMessage('Could not decode incoming task');
-
- $this->handler->handle();
-
- // @todo - test with a valid token, not sure how to do that right now
- }
-
/** @test */
public function it_will_validate_the_token_iss()
{
@@ -144,8 +143,46 @@ public function it_runs_the_incoming_job()
Mail::assertSent(TestMailable::class);
}
+ /** @test */
+ public function after_max_attempts_it_will_log_to_failed_table()
+ {
+ $this->request->headers->add(['X-Cloudtasks-Queuename' => 'my-queue']);
+
+ $this->request->headers->add(['X-CloudTasks-TaskRetryCount' => 1]);
+ try {
+ $this->handler->handle($this->failingJob());
+ } catch (\Throwable $e) {
+ //
+ }
+
+ $this->assertCount(0, DB::table('failed_jobs')->get());
+
+ $this->request->headers->add(['X-CloudTasks-TaskRetryCount' => 2]);
+ try {
+ $this->handler->handle($this->failingJob());
+ } catch (\Throwable $e) {
+ //
+ }
+
+ $this->assertDatabaseHas('failed_jobs', [
+ 'connection' => 'cloudtasks',
+ 'queue' => 'my-queue',
+ 'payload' => rtrim($this->failingJobPayload()),
+ ]);
+ }
+
private function simpleJob()
{
return json_decode(file_get_contents(__DIR__ . '/Support/test-job-payload.json'), true);
}
+
+ private function failingJobPayload()
+ {
+ return file_get_contents(__DIR__ . '/Support/failing-job-payload.json');
+ }
+
+ private function failingJob()
+ {
+ return json_decode($this->failingJobPayload(), true);
+ }
}
diff --git a/tests/TestCase.php b/tests/TestCase.php
index 5cc8041..71d043d 100644
--- a/tests/TestCase.php
+++ b/tests/TestCase.php
@@ -2,10 +2,33 @@
namespace Tests;
-use Illuminate\Support\Facades\Artisan;
-
class TestCase extends \Orchestra\Testbench\TestCase
{
+ public static $migrated = false;
+
+ protected function setUp(): void
+ {
+ parent::setUp();
+
+ // There is probably a more sane way to do this
+ if (!static::$migrated) {
+ if (file_exists(database_path('database.sqlite'))) {
+ unlink(database_path('database.sqlite'));
+ }
+
+ touch(database_path('database.sqlite'));
+
+ foreach(glob(database_path('migrations/*.php')) as $file) {
+ unlink($file);
+ }
+
+ $this->artisan('queue:failed-table');
+ $this->artisan('migrate');
+
+ static::$migrated = true;
+ }
+ }
+
/**
* Get package providers. At a minimum this is the package being tested, but also
* would include packages upon which our package depends, e.g. Cartalyst/Sentry