From b94864b1e90e5f4178bc626c3e7628a3e8d63873 Mon Sep 17 00:00:00 2001 From: Benjamin Friedman Date: Sat, 4 Nov 2017 09:35:13 -0700 Subject: [PATCH 1/3] Fix for _PushStatus stuck 'running' if count is off --- spec/Parse.Push.spec.js | 261 ++++++++++++++++++++++++++++ spec/PushWorker.spec.js | 2 + src/Controllers/SchemaController.js | 3 +- src/Push/PushQueue.js | 2 +- src/StatusHandler.js | 25 ++- 5 files changed, 285 insertions(+), 8 deletions(-) diff --git a/spec/Parse.Push.spec.js b/spec/Parse.Push.spec.js index a5771aef88..f48d2b0e1f 100644 --- a/spec/Parse.Push.spec.js +++ b/spec/Parse.Push.spec.js @@ -203,4 +203,265 @@ describe('Parse.Push', () => { done(); }); }); + + const successfulAny = function(body, installations) { + const promises = installations.map((device) => { + return Promise.resolve({ + transmitted: true, + device: device, + }) + }); + + return Promise.all(promises); + }; + + const provideInstallations = function(num) { + if(!num) { + num = 2; + } + + const installations = []; + while(installations.length !== num) { + // add Android installations + const installation = new Parse.Object("_Installation"); + installation.set("installationId", "installation_" + installations.length); + installation.set("deviceToken","device_token_" + installations.length); + installation.set("deviceType", "android"); + installations.push(installation); + } + + return installations; + }; + + const losingAdapter = { + send: function(body, installations) { + // simulate having lost an installation before this was called + // thus invalidating our 'count' in _PushStatus + installations.pop(); + + return successfulAny(body, installations); + }, + getValidPushTypes: function() { + return ["android"]; + } + }; + + /** + * Verifies that _PushStatus cannot get stuck in a 'running' state + * Simulates a simple push where 1 installation is removed between _PushStatus + * count being set and the pushes being sent + */ + it('does not get stuck with _PushStatus \'running\' on 1 installation lost', (done) => { + reconfigureServer({ + push: {adapter: losingAdapter} + }).then(() => { + return Parse.Object.saveAll(provideInstallations()); + }).then(() => { + return Parse.Push.send( + { + data: {alert: "We fixed our status!"}, + where: {deviceType: 'android'} + }, + { useMasterKey: true } + ); + }).then(() => { + // it is enqueued so it can take time + return new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, 1000); + }); + }).then(() => { + // query for push status + const query = new Parse.Query('_PushStatus'); + return query.find({useMasterKey: true}); + }).then((results) => { + // verify status is NOT broken + expect(results.length).toBe(1); + const result = results[0]; + expect(result.get('status')).toEqual('succeeded'); + expect(result.get('numSent')).toEqual(1); + expect(result.get('count')).toEqual(undefined); + expect(result.get('batches')).toEqual(undefined); + done(); + }); + }); + + /** + * Verifies that _PushStatus cannot get stuck in a 'running' state + * Simulates a simple push where 1 installation is added between _PushStatus + * count being set and the pushes being sent + */ + it('does not get stuck with _PushStatus \'running\' on 1 installation added', (done) => { + const installations = provideInstallations(); + + // add 1 iOS installation which we will omit & add later on + const iOSInstallation = new Parse.Object("_Installation"); + iOSInstallation.set("installationId", "installation_" + installations.length); + iOSInstallation.set("deviceToken","device_token_" + installations.length); + iOSInstallation.set("deviceType", "ios"); + installations.push(iOSInstallation); + + reconfigureServer({ + push: { + adapter: { + send: function(body, installations) { + // simulate having added an installation before this was called + // thus invalidating our 'count' in _PushStatus + installations.push(iOSInstallation); + + return successfulAny(body, installations); + }, + getValidPushTypes: function() { + return ["android"]; + } + } + } + }).then(() => { + return Parse.Object.saveAll(installations); + }).then(() => { + return Parse.Push.send( + { + data: {alert: "We fixed our status!"}, + where: {deviceType: {'$ne' : 'random'}} + }, + { useMasterKey: true } + ); + }).then(() => { + // it is enqueued so it can take time + return new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, 1000); + }); + }).then(() => { + // query for push status + const query = new Parse.Query('_PushStatus'); + return query.find({useMasterKey: true}); + }).then((results) => { + // verify status is NOT broken + expect(results.length).toBe(1); + const result = results[0]; + expect(result.get('status')).toEqual('succeeded'); + expect(result.get('numSent')).toEqual(3); + expect(result.get('count')).toEqual(undefined); + expect(result.get('batches')).toEqual(undefined); + done(); + }); + }); + + /** + * Verifies that _PushStatus cannot get stuck in a 'running' state + * Simulates an extended push, where some installations may be removed, + * resulting in a non-zero count + */ + it('does not get stuck with _PushStatus \'running\' on many installations removed', (done) => { + const devices = 1000; + const installations = provideInstallations(devices); + + reconfigureServer({ + push: {adapter: losingAdapter} + }).then(() => { + return Parse.Object.saveAll(installations); + }).then(() => { + return Parse.Push.send( + { + data: {alert: "We fixed our status!"}, + where: {deviceType: 'android'} + }, + { useMasterKey: true } + ); + }).then(() => { + // it is enqueued so it can take time + return new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, 1000); + }); + }).then(() => { + // query for push status + const query = new Parse.Query('_PushStatus'); + return query.find({useMasterKey: true}); + }).then((results) => { + // verify status is NOT broken + expect(results.length).toBe(1); + const result = results[0]; + expect(result.get('status')).toEqual('succeeded'); + // expect # less than # of batches used, assuming each batch is 100 pushes + expect(result.get('numSent')).toEqual(devices - (devices / 100)); + expect(result.get('count')).toEqual(undefined); + expect(result.get('batches')).toEqual(undefined); + done(); + }); + }); + + /** + * Verifies that _PushStatus cannot get stuck in a 'running' state + * Simulates an extended push, where some installations may be added, + * resulting in a non-zero count + */ + it('does not get stuck with _PushStatus \'running\' on many installations added', (done) => { + const devices = 1000; + const installations = provideInstallations(devices); + + // add 1 iOS installation which we will omit & add later on + const iOSInstallations = []; + + while(iOSInstallations.length !== (devices / 100)) { + const iOSInstallation = new Parse.Object("_Installation"); + iOSInstallation.set("installationId", "installation_" + installations.length); + iOSInstallation.set("deviceToken", "device_token_" + installations.length); + iOSInstallation.set("deviceType", "ios"); + installations.push(iOSInstallation); + iOSInstallations.push(iOSInstallation); + } + + reconfigureServer({ + push: { + adapter: { + send: function(body, installations) { + // simulate having added an installation before this was called + // thus invalidating our 'count' in _PushStatus + installations.push(iOSInstallations.pop()); + + return successfulAny(body, installations); + }, + getValidPushTypes: function() { + return ["android"]; + } + } + } + }).then(() => { + return Parse.Object.saveAll(installations); + }).then(() => { + return Parse.Push.send( + { + data: {alert: "We fixed our status!"}, + where: {deviceType: {'$ne' : 'random'}} + }, + { useMasterKey: true } + ); + }).then(() => { + // it is enqueued so it can take time + return new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, 1000); + }); + }).then(() => { + // query for push status + const query = new Parse.Query('_PushStatus'); + return query.find({useMasterKey: true}); + }).then((results) => { + // verify status is NOT broken + expect(results.length).toBe(1); + const result = results[0]; + expect(result.get('status')).toEqual('succeeded'); + // expect # less than # of batches used, assuming each batch is 100 pushes + expect(result.get('numSent')).toEqual(devices + (devices / 100)); + expect(result.get('count')).toEqual(undefined); + expect(result.get('batches')).toEqual(undefined); + done(); + }); + }); }); diff --git a/spec/PushWorker.spec.js b/spec/PushWorker.spec.js index 96317efcc6..f2e3a38b2b 100644 --- a/spec/PushWorker.spec.js +++ b/spec/PushWorker.spec.js @@ -277,6 +277,7 @@ describe('PushWorker', () => { [`sentPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 }, [`failedPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 }, count: { __op: 'Increment', amount: -2 }, + batches: { __op: 'Increment', amount: -1 }, }); const query = new Parse.Query('_PushStatus'); return query.get(handler.objectId, { useMasterKey: true }); @@ -355,6 +356,7 @@ describe('PushWorker', () => { [`sentPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 }, [`failedPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 }, count: { __op: 'Increment', amount: -2 }, + batches: { __op: 'Increment', amount: -1 }, }); done(); }); diff --git a/src/Controllers/SchemaController.js b/src/Controllers/SchemaController.js index d1c15fe498..e13415055b 100644 --- a/src/Controllers/SchemaController.js +++ b/src/Controllers/SchemaController.js @@ -88,7 +88,8 @@ const defaultColumns = Object.freeze({ "failedPerType": {type:'Object'}, "sentPerUTCOffset": {type:'Object'}, "failedPerUTCOffset": {type:'Object'}, - "count": {type:'Number'} + "count": {type:'Number'}, // tracks estimated # of pushes pending + "batches": {type:'Number'} // tracks # of batches queued and pending }, _JobStatus: { "jobName": {type: 'String'}, diff --git a/src/Push/PushQueue.js b/src/Push/PushQueue.js index cc6fb16e84..0da4e0acb8 100644 --- a/src/Push/PushQueue.js +++ b/src/Push/PushQueue.js @@ -40,7 +40,7 @@ export class PushQueue { if (!results || count == 0) { return Promise.reject({error: 'PushController: no results in query'}) } - pushStatus.setRunning(count); + pushStatus.setRunning(count, Math.ceil(count / limit)); let skip = 0; while (skip < count) { const query = { where, diff --git a/src/StatusHandler.js b/src/StatusHandler.js index f77c0bbcce..750d429519 100644 --- a/src/StatusHandler.js +++ b/src/StatusHandler.js @@ -190,10 +190,19 @@ export function pushStatusHandler(config, existingObjectId) { }); } - const setRunning = function(count) { - logger.verbose(`_PushStatus ${objectId}: sending push to %d installations`, count); - return handler.update({status:"pending", objectId: objectId}, - {status: "running", count }); + const setRunning = function(count, batches) { + logger.verbose(`_PushStatus ${objectId}: sending push to %d installations with %d batches`, count, batches); + return handler.update( + { + status:"pending", + objectId: objectId + }, + { + status: "running", + count: count, + batches: batches + } + ); } const trackSent = function(results, UTCOffset, cleanupInstallations = process.env.PARSE_SERVER_CLEANUP_INVALID_INSTALLATIONS) { @@ -259,8 +268,11 @@ export function pushStatusHandler(config, existingObjectId) { }); } + // indicate this batch is complete + incrementOp(update, 'batches', -1); + return handler.update({ objectId }, update).then((res) => { - if (res && res.count === 0) { + if (res && res.batches === 0) { return this.complete(); } }) @@ -269,7 +281,8 @@ export function pushStatusHandler(config, existingObjectId) { const complete = function() { return handler.update({ objectId }, { status: 'succeeded', - count: {__op: 'Delete'} + count: {__op: 'Delete'}, + batches: {__op: 'Delete'} }); } From e892df94cec347be7a5d4b67a361d305e9fa7e73 Mon Sep 17 00:00:00 2001 From: Benjamin Friedman Date: Sat, 4 Nov 2017 10:34:44 -0700 Subject: [PATCH 2/3] use 'count' for batches --- spec/Parse.Push.spec.js | 4 ---- spec/PushWorker.spec.js | 6 ++---- src/Controllers/SchemaController.js | 3 +-- src/Push/PushQueue.js | 2 +- src/StatusHandler.js | 15 ++++++--------- 5 files changed, 10 insertions(+), 20 deletions(-) diff --git a/spec/Parse.Push.spec.js b/spec/Parse.Push.spec.js index f48d2b0e1f..77f3fbc2cc 100644 --- a/spec/Parse.Push.spec.js +++ b/spec/Parse.Push.spec.js @@ -282,7 +282,6 @@ describe('Parse.Push', () => { expect(result.get('status')).toEqual('succeeded'); expect(result.get('numSent')).toEqual(1); expect(result.get('count')).toEqual(undefined); - expect(result.get('batches')).toEqual(undefined); done(); }); }); @@ -345,7 +344,6 @@ describe('Parse.Push', () => { expect(result.get('status')).toEqual('succeeded'); expect(result.get('numSent')).toEqual(3); expect(result.get('count')).toEqual(undefined); - expect(result.get('batches')).toEqual(undefined); done(); }); }); @@ -390,7 +388,6 @@ describe('Parse.Push', () => { // expect # less than # of batches used, assuming each batch is 100 pushes expect(result.get('numSent')).toEqual(devices - (devices / 100)); expect(result.get('count')).toEqual(undefined); - expect(result.get('batches')).toEqual(undefined); done(); }); }); @@ -460,7 +457,6 @@ describe('Parse.Push', () => { // expect # less than # of batches used, assuming each batch is 100 pushes expect(result.get('numSent')).toEqual(devices + (devices / 100)); expect(result.get('count')).toEqual(undefined); - expect(result.get('batches')).toEqual(undefined); done(); }); }); diff --git a/spec/PushWorker.spec.js b/spec/PushWorker.spec.js index f2e3a38b2b..d1f3fe4832 100644 --- a/spec/PushWorker.spec.js +++ b/spec/PushWorker.spec.js @@ -276,8 +276,7 @@ describe('PushWorker', () => { 'failedPerType.ios': { __op: 'Increment', amount: 1 }, [`sentPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 }, [`failedPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 }, - count: { __op: 'Increment', amount: -2 }, - batches: { __op: 'Increment', amount: -1 }, + count: { __op: 'Increment', amount: -2 } }); const query = new Parse.Query('_PushStatus'); return query.get(handler.objectId, { useMasterKey: true }); @@ -355,8 +354,7 @@ describe('PushWorker', () => { 'failedPerType.ios': { __op: 'Increment', amount: 1 }, [`sentPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 }, [`failedPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 }, - count: { __op: 'Increment', amount: -2 }, - batches: { __op: 'Increment', amount: -1 }, + count: { __op: 'Increment', amount: -2 } }); done(); }); diff --git a/src/Controllers/SchemaController.js b/src/Controllers/SchemaController.js index e13415055b..88de10d4c3 100644 --- a/src/Controllers/SchemaController.js +++ b/src/Controllers/SchemaController.js @@ -88,8 +88,7 @@ const defaultColumns = Object.freeze({ "failedPerType": {type:'Object'}, "sentPerUTCOffset": {type:'Object'}, "failedPerUTCOffset": {type:'Object'}, - "count": {type:'Number'}, // tracks estimated # of pushes pending - "batches": {type:'Number'} // tracks # of batches queued and pending + "count": {type:'Number'} // tracks # of batches queued and pending }, _JobStatus: { "jobName": {type: 'String'}, diff --git a/src/Push/PushQueue.js b/src/Push/PushQueue.js index 0da4e0acb8..095edfc16e 100644 --- a/src/Push/PushQueue.js +++ b/src/Push/PushQueue.js @@ -40,7 +40,7 @@ export class PushQueue { if (!results || count == 0) { return Promise.reject({error: 'PushController: no results in query'}) } - pushStatus.setRunning(count, Math.ceil(count / limit)); + pushStatus.setRunning(Math.ceil(count / limit)); let skip = 0; while (skip < count) { const query = { where, diff --git a/src/StatusHandler.js b/src/StatusHandler.js index 750d429519..601fbfe53f 100644 --- a/src/StatusHandler.js +++ b/src/StatusHandler.js @@ -190,8 +190,8 @@ export function pushStatusHandler(config, existingObjectId) { }); } - const setRunning = function(count, batches) { - logger.verbose(`_PushStatus ${objectId}: sending push to %d installations with %d batches`, count, batches); + const setRunning = function(batches) { + logger.verbose(`_PushStatus ${objectId}: sending push to installations with %d batches`, batches); return handler.update( { status:"pending", @@ -199,8 +199,7 @@ export function pushStatusHandler(config, existingObjectId) { }, { status: "running", - count: count, - batches: batches + count: batches } ); } @@ -244,7 +243,6 @@ export function pushStatusHandler(config, existingObjectId) { } return memo; }, update); - incrementOp(update, 'count', -results.length); } logger.verbose(`_PushStatus ${objectId}: sent push! %d success, %d failures`, update.numSent, update.numFailed); @@ -269,10 +267,10 @@ export function pushStatusHandler(config, existingObjectId) { } // indicate this batch is complete - incrementOp(update, 'batches', -1); + incrementOp(update, 'count', -1); return handler.update({ objectId }, update).then((res) => { - if (res && res.batches === 0) { + if (res && res.count === 0) { return this.complete(); } }) @@ -281,8 +279,7 @@ export function pushStatusHandler(config, existingObjectId) { const complete = function() { return handler.update({ objectId }, { status: 'succeeded', - count: {__op: 'Delete'}, - batches: {__op: 'Delete'} + count: {__op: 'Delete'} }); } From 366ba8a78ac10b7b9fb8a4710e7e447ef4efb07e Mon Sep 17 00:00:00 2001 From: Benjamin Friedman Date: Sat, 4 Nov 2017 10:41:11 -0700 Subject: [PATCH 3/3] push worker test fix --- spec/PushWorker.spec.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/PushWorker.spec.js b/spec/PushWorker.spec.js index d1f3fe4832..2238027550 100644 --- a/spec/PushWorker.spec.js +++ b/spec/PushWorker.spec.js @@ -276,7 +276,7 @@ describe('PushWorker', () => { 'failedPerType.ios': { __op: 'Increment', amount: 1 }, [`sentPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 }, [`failedPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 }, - count: { __op: 'Increment', amount: -2 } + count: { __op: 'Increment', amount: -1 } }); const query = new Parse.Query('_PushStatus'); return query.get(handler.objectId, { useMasterKey: true }); @@ -354,7 +354,7 @@ describe('PushWorker', () => { 'failedPerType.ios': { __op: 'Increment', amount: 1 }, [`sentPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 }, [`failedPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 }, - count: { __op: 'Increment', amount: -2 } + count: { __op: 'Increment', amount: -1 } }); done(); });