diff --git a/spec/Parse.Push.spec.js b/spec/Parse.Push.spec.js index a5771aef88..77f3fbc2cc 100644 --- a/spec/Parse.Push.spec.js +++ b/spec/Parse.Push.spec.js @@ -203,4 +203,261 @@ describe('Parse.Push', () => { done(); }); }); + + const successfulAny = function(body, installations) { + const promises = installations.map((device) => { + return Promise.resolve({ + transmitted: true, + device: device, + }) + }); + + return Promise.all(promises); + }; + + const provideInstallations = function(num) { + if(!num) { + num = 2; + } + + const installations = []; + while(installations.length !== num) { + // add Android installations + const installation = new Parse.Object("_Installation"); + installation.set("installationId", "installation_" + installations.length); + installation.set("deviceToken","device_token_" + installations.length); + installation.set("deviceType", "android"); + installations.push(installation); + } + + return installations; + }; + + const losingAdapter = { + send: function(body, installations) { + // simulate having lost an installation before this was called + // thus invalidating our 'count' in _PushStatus + installations.pop(); + + return successfulAny(body, installations); + }, + getValidPushTypes: function() { + return ["android"]; + } + }; + + /** + * Verifies that _PushStatus cannot get stuck in a 'running' state + * Simulates a simple push where 1 installation is removed between _PushStatus + * count being set and the pushes being sent + */ + it('does not get stuck with _PushStatus \'running\' on 1 installation lost', (done) => { + reconfigureServer({ + push: {adapter: losingAdapter} + }).then(() => { + return Parse.Object.saveAll(provideInstallations()); + }).then(() => { + return Parse.Push.send( + { + data: {alert: "We fixed our status!"}, + where: {deviceType: 'android'} + }, + { useMasterKey: true } + ); + }).then(() => { + // it is enqueued so it can take time + return new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, 1000); + }); + }).then(() => { + // query for push status + const query = new Parse.Query('_PushStatus'); + return query.find({useMasterKey: true}); + }).then((results) => { + // verify status is NOT broken + expect(results.length).toBe(1); + const result = results[0]; + expect(result.get('status')).toEqual('succeeded'); + expect(result.get('numSent')).toEqual(1); + expect(result.get('count')).toEqual(undefined); + done(); + }); + }); + + /** + * Verifies that _PushStatus cannot get stuck in a 'running' state + * Simulates a simple push where 1 installation is added between _PushStatus + * count being set and the pushes being sent + */ + it('does not get stuck with _PushStatus \'running\' on 1 installation added', (done) => { + const installations = provideInstallations(); + + // add 1 iOS installation which we will omit & add later on + const iOSInstallation = new Parse.Object("_Installation"); + iOSInstallation.set("installationId", "installation_" + installations.length); + iOSInstallation.set("deviceToken","device_token_" + installations.length); + iOSInstallation.set("deviceType", "ios"); + installations.push(iOSInstallation); + + reconfigureServer({ + push: { + adapter: { + send: function(body, installations) { + // simulate having added an installation before this was called + // thus invalidating our 'count' in _PushStatus + installations.push(iOSInstallation); + + return successfulAny(body, installations); + }, + getValidPushTypes: function() { + return ["android"]; + } + } + } + }).then(() => { + return Parse.Object.saveAll(installations); + }).then(() => { + return Parse.Push.send( + { + data: {alert: "We fixed our status!"}, + where: {deviceType: {'$ne' : 'random'}} + }, + { useMasterKey: true } + ); + }).then(() => { + // it is enqueued so it can take time + return new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, 1000); + }); + }).then(() => { + // query for push status + const query = new Parse.Query('_PushStatus'); + return query.find({useMasterKey: true}); + }).then((results) => { + // verify status is NOT broken + expect(results.length).toBe(1); + const result = results[0]; + expect(result.get('status')).toEqual('succeeded'); + expect(result.get('numSent')).toEqual(3); + expect(result.get('count')).toEqual(undefined); + done(); + }); + }); + + /** + * Verifies that _PushStatus cannot get stuck in a 'running' state + * Simulates an extended push, where some installations may be removed, + * resulting in a non-zero count + */ + it('does not get stuck with _PushStatus \'running\' on many installations removed', (done) => { + const devices = 1000; + const installations = provideInstallations(devices); + + reconfigureServer({ + push: {adapter: losingAdapter} + }).then(() => { + return Parse.Object.saveAll(installations); + }).then(() => { + return Parse.Push.send( + { + data: {alert: "We fixed our status!"}, + where: {deviceType: 'android'} + }, + { useMasterKey: true } + ); + }).then(() => { + // it is enqueued so it can take time + return new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, 1000); + }); + }).then(() => { + // query for push status + const query = new Parse.Query('_PushStatus'); + return query.find({useMasterKey: true}); + }).then((results) => { + // verify status is NOT broken + expect(results.length).toBe(1); + const result = results[0]; + expect(result.get('status')).toEqual('succeeded'); + // expect # less than # of batches used, assuming each batch is 100 pushes + expect(result.get('numSent')).toEqual(devices - (devices / 100)); + expect(result.get('count')).toEqual(undefined); + done(); + }); + }); + + /** + * Verifies that _PushStatus cannot get stuck in a 'running' state + * Simulates an extended push, where some installations may be added, + * resulting in a non-zero count + */ + it('does not get stuck with _PushStatus \'running\' on many installations added', (done) => { + const devices = 1000; + const installations = provideInstallations(devices); + + // add 1 iOS installation which we will omit & add later on + const iOSInstallations = []; + + while(iOSInstallations.length !== (devices / 100)) { + const iOSInstallation = new Parse.Object("_Installation"); + iOSInstallation.set("installationId", "installation_" + installations.length); + iOSInstallation.set("deviceToken", "device_token_" + installations.length); + iOSInstallation.set("deviceType", "ios"); + installations.push(iOSInstallation); + iOSInstallations.push(iOSInstallation); + } + + reconfigureServer({ + push: { + adapter: { + send: function(body, installations) { + // simulate having added an installation before this was called + // thus invalidating our 'count' in _PushStatus + installations.push(iOSInstallations.pop()); + + return successfulAny(body, installations); + }, + getValidPushTypes: function() { + return ["android"]; + } + } + } + }).then(() => { + return Parse.Object.saveAll(installations); + }).then(() => { + return Parse.Push.send( + { + data: {alert: "We fixed our status!"}, + where: {deviceType: {'$ne' : 'random'}} + }, + { useMasterKey: true } + ); + }).then(() => { + // it is enqueued so it can take time + return new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, 1000); + }); + }).then(() => { + // query for push status + const query = new Parse.Query('_PushStatus'); + return query.find({useMasterKey: true}); + }).then((results) => { + // verify status is NOT broken + expect(results.length).toBe(1); + const result = results[0]; + expect(result.get('status')).toEqual('succeeded'); + // expect # less than # of batches used, assuming each batch is 100 pushes + expect(result.get('numSent')).toEqual(devices + (devices / 100)); + expect(result.get('count')).toEqual(undefined); + done(); + }); + }); }); diff --git a/spec/PushWorker.spec.js b/spec/PushWorker.spec.js index 96317efcc6..2238027550 100644 --- a/spec/PushWorker.spec.js +++ b/spec/PushWorker.spec.js @@ -276,7 +276,7 @@ describe('PushWorker', () => { 'failedPerType.ios': { __op: 'Increment', amount: 1 }, [`sentPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 }, [`failedPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 }, - count: { __op: 'Increment', amount: -2 }, + count: { __op: 'Increment', amount: -1 } }); const query = new Parse.Query('_PushStatus'); return query.get(handler.objectId, { useMasterKey: true }); @@ -354,7 +354,7 @@ describe('PushWorker', () => { 'failedPerType.ios': { __op: 'Increment', amount: 1 }, [`sentPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 }, [`failedPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 }, - count: { __op: 'Increment', amount: -2 }, + count: { __op: 'Increment', amount: -1 } }); done(); }); diff --git a/src/Controllers/SchemaController.js b/src/Controllers/SchemaController.js index d1c15fe498..88de10d4c3 100644 --- a/src/Controllers/SchemaController.js +++ b/src/Controllers/SchemaController.js @@ -88,7 +88,7 @@ const defaultColumns = Object.freeze({ "failedPerType": {type:'Object'}, "sentPerUTCOffset": {type:'Object'}, "failedPerUTCOffset": {type:'Object'}, - "count": {type:'Number'} + "count": {type:'Number'} // tracks # of batches queued and pending }, _JobStatus: { "jobName": {type: 'String'}, diff --git a/src/Push/PushQueue.js b/src/Push/PushQueue.js index cc6fb16e84..095edfc16e 100644 --- a/src/Push/PushQueue.js +++ b/src/Push/PushQueue.js @@ -40,7 +40,7 @@ export class PushQueue { if (!results || count == 0) { return Promise.reject({error: 'PushController: no results in query'}) } - pushStatus.setRunning(count); + pushStatus.setRunning(Math.ceil(count / limit)); let skip = 0; while (skip < count) { const query = { where, diff --git a/src/StatusHandler.js b/src/StatusHandler.js index f77c0bbcce..601fbfe53f 100644 --- a/src/StatusHandler.js +++ b/src/StatusHandler.js @@ -190,10 +190,18 @@ export function pushStatusHandler(config, existingObjectId) { }); } - const setRunning = function(count) { - logger.verbose(`_PushStatus ${objectId}: sending push to %d installations`, count); - return handler.update({status:"pending", objectId: objectId}, - {status: "running", count }); + const setRunning = function(batches) { + logger.verbose(`_PushStatus ${objectId}: sending push to installations with %d batches`, batches); + return handler.update( + { + status:"pending", + objectId: objectId + }, + { + status: "running", + count: batches + } + ); } const trackSent = function(results, UTCOffset, cleanupInstallations = process.env.PARSE_SERVER_CLEANUP_INVALID_INSTALLATIONS) { @@ -235,7 +243,6 @@ export function pushStatusHandler(config, existingObjectId) { } return memo; }, update); - incrementOp(update, 'count', -results.length); } logger.verbose(`_PushStatus ${objectId}: sent push! %d success, %d failures`, update.numSent, update.numFailed); @@ -259,6 +266,9 @@ export function pushStatusHandler(config, existingObjectId) { }); } + // indicate this batch is complete + incrementOp(update, 'count', -1); + return handler.update({ objectId }, update).then((res) => { if (res && res.count === 0) { return this.complete();