Pārlūkot izejas kodu

Refactor PushService for more coordinated retrying

The PushService can spawn a Push instance which...

- Retransmits the Push until an undefined goal has been achieved
- Applies a TTL to the push message
- Collapses push messages by applying a collapse key
Lennart Grahl 6 gadi atpakaļ
vecāks
revīzija
3d069a1caf
4 mainītis faili ar 335 papildinājumiem un 110 dzēšanām
  1. 18 0
      src/exceptions.ts
  2. 262 87
      src/services/push.ts
  3. 48 23
      src/services/webclient.ts
  4. 7 0
      src/threema.d.ts

+ 18 - 0
src/exceptions.ts

@@ -0,0 +1,18 @@
+/**
+ * This file is part of Threema Web.
+ *
+ * Threema Web is free software: you can redistribute it and/or modify it
+ * under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or (at
+ * your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with Threema Web. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+export class TimeoutError extends Error {}

+ 262 - 87
src/services/push.ts

@@ -15,123 +15,298 @@
  * along with Threema Web. If not, see <http://www.gnu.org/licenses/>.
  */
 
+import {TimeoutError} from '../exceptions';
+import {randomString, sleep} from '../helpers';
 import {sha256} from '../helpers/crypto';
 
+/**
+ * A push session will send pushes continuously until an undefined goal has
+ * been achieved which needs to call the `.done` method to stop pushes.
+ *
+ * The push session will stop and reject the returned promise in case the
+ * push relay determined a client error (e.g. an invalid push token). In any
+ * other case, it will continue sending pushes. Thus, it is crucial to call
+ * `.done` eventually!
+ *
+ * With default settings, the push session will send a push in the following
+ * intervals: 0s, 2s, 4s, 8s, 16s, 30s (maximum), 30s, ...
+ *
+ * The first push will use a TTL (time to live) of 0, the second push a TTL of
+ * 15s, and all subsequent pushes will use a TTL of 90s.
+ *
+ * The default settings intend to wake up the app immediately by the first push
+ * which uses a TTL of 0, indicating the push server to deliver *now or never*.
+ * The mid TTL tries to work around issues with FCM clients interpreting the
+ * TTL as *don't need to dispatch until expired*. And the TTL of 90s acts as a
+ * last resort mechanism to wake up the app eventually.
+ *
+ * Furthermore, the collapse key ensures that only one push per session will be
+ * stored on the push server.
+ */
+export class PushSession {
+    private readonly $log: ng.ILogService;
+    private readonly service: PushService;
+    private readonly session: Uint8Array;
+    private readonly config: threema.PushSessionConfig;
+    private readonly collapseKey: string = randomString(6);
+    private readonly doneFuture: Future<any> = new Future();
+    private logTag: string = '[Push]';
+    private running: boolean = false;
+    private retryTimeoutMs: number;
+    private tries: number = 0;
+
+    /**
+     * Return the default configuration.
+     */
+    public static get defaultConfig(): threema.PushSessionConfig {
+        return {
+            retryTimeoutInitMs: 2000,
+            retryTimeoutMaxMs: 30000,
+            triesMax: 3,
+            timeToLiveRange: [0, 15, 90],
+        };
+    }
+
+    /**
+     * Return the expected maximum period until the session will be forcibly
+     * rejected.
+     *
+     * Note: The actual maximum period will usually be larger since the HTTP
+     *       request itself can take an arbitrary amount of time.
+     */
+    public static expectedPeriodMaxMs(config?: threema.PushSessionConfig): number {
+        if (config === undefined) {
+            config = PushSession.defaultConfig;
+        }
+        if (config.triesMax === Number.POSITIVE_INFINITY) {
+            return Number.POSITIVE_INFINITY;
+        }
+        let retryTimeoutMs = config.retryTimeoutInitMs;
+        let sumMs = 0;
+        for (let i = 0; i < config.triesMax; ++i) {
+            sumMs += retryTimeoutMs;
+            retryTimeoutMs = Math.min(retryTimeoutMs * 2, config.retryTimeoutMaxMs);
+        }
+        return sumMs;
+    }
+
+    /**
+     * Create a push session.
+     *
+     * @param service The associated `PushService` instance.
+     * @param session Session identifier (public permanent key of the
+     *   initiator)
+     * @param config Push session configuration.
+     */
+    public constructor(service: PushService, session: Uint8Array, config?: threema.PushSessionConfig) {
+        this.$log = service.$log;
+        this.service = service;
+        this.session = session;
+        this.config = config !== undefined ? config : PushSession.defaultConfig;
+        this.retryTimeoutMs = this.config.retryTimeoutInitMs;
+
+        // Sanity checks
+        if (this.config.timeToLiveRange.length === 0) {
+            throw new Error('timeToLiveRange must not be an empty array');
+        }
+        if (this.config.triesMax < 1) {
+            throw new Error('triesMax must be >= 1');
+        }
+    }
+
+    /**
+     * The promise resolves once the session has been marked as *done*.
+     *
+     * It will reject in case the server indicated a bad request or the maximum
+     * amount of retransmissions have been reached.
+     *
+     * @throws TimeoutError in case the maximum amount of retries has been
+     *   reached.
+     * @throws Error in case of an unrecoverable error which prevents further
+     *   pushes.
+     */
+    public start(): Promise<void> {
+        // Start sending
+        if (!this.running) {
+            this.run().catch((error) => {
+                this.$log.error(this.logTag, 'Push runner failed:', error);
+                this.doneFuture.reject(error);
+            });
+            this.running = true;
+        }
+        return this.doneFuture;
+    }
+
+    /**
+     * Mark as done and stop sending push messages.
+     *
+     * This will resolve all pending promises.
+     */
+    public done(): void {
+        this.$log.info(this.logTag, 'Push done');
+        this.doneFuture.resolve();
+    }
+
+    private async run(): Promise<void> {
+        // Calculate session hash
+        const sessionHash = await sha256(this.session.buffer);
+        this.logTag = `[Push.${sessionHash}.${this.collapseKey}]`;
+
+        // Prepare data
+        const data = new URLSearchParams();
+        data.set(PushService.ARG_TYPE, this.service.pushType);
+        data.set(PushService.ARG_SESSION, sessionHash);
+        data.set(PushService.ARG_VERSION, `${this.service.version}`);
+        if (this.service.pushType === threema.PushTokenType.Apns) {
+            // APNS token format: "<hex-deviceid>;<endpoint>;<bundle-id>"
+            const parts = this.service.pushToken.split(';');
+            if (parts.length < 3) {
+                throw new Error(`APNS push token contains ${parts.length} parts, but at least 3 are required`);
+            }
+            data.set(PushService.ARG_TOKEN, parts[0]);
+            data.set(PushService.ARG_ENDPOINT, parts[1]);
+            data.set(PushService.ARG_BUNDLE_ID, parts[2]);
+        } else if (this.service.pushType === threema.PushTokenType.Gcm) {
+            data.set(PushService.ARG_TOKEN, this.service.pushToken);
+        } else {
+            throw new Error(`Invalid push type: ${this.service.pushType}`);
+        }
+
+        // Push until done or unrecoverable error
+        while (!this.doneFuture.done) {
+            // Determine TTL
+            let timeToLive = this.config.timeToLiveRange[this.tries];
+            if (timeToLive === undefined) {
+                timeToLive = this.config.timeToLiveRange[this.config.timeToLiveRange.length - 1];
+            }
+
+            // Set/Remove collapse key
+            if (timeToLive === 0) {
+                data.delete(PushService.ARG_COLLAPSE_KEY);
+            } else {
+                data.set(PushService.ARG_COLLAPSE_KEY, this.collapseKey);
+            }
+
+            // Modify data
+            data.set(PushService.ARG_TIME_TO_LIVE, `${timeToLive}`);
+            ++this.tries;
+
+            // Send push
+            this.$log.debug(this.logTag, `Sending push ${this.tries}/${this.config.triesMax} (ttl=${timeToLive})`);
+            if (this.service.config.DEBUG) {
+                this.$log.debug(this.logTag, 'Push data:', `${data}`);
+            }
+            try {
+                const response = await fetch(this.service.url, {
+                    method: 'POST',
+                    headers: {
+                        'Content-Type': 'application/x-www-form-urlencoded',
+                    },
+                    body: data,
+                });
+
+                // Check if successful
+                if (response.ok) {
+                    // Success: Retry
+                    this.$log.debug(this.logTag, 'Push sent successfully');
+                } else if (response.status >= 400 && response.status < 500) {
+                    // Client error: Don't retry
+                    const error = `Push rejected (client error), status: ${response.status}`;
+                    this.$log.warn(this.logTag, error);
+                    this.doneFuture.reject(new Error(error));
+                } else {
+                    // Server error: Retry
+                    this.$log.warn(this.logTag, `Push rejected (server error), status: ${response.status}`);
+                }
+            } catch (error) {
+                this.$log.warn(this.logTag, 'Sending push failed:', error);
+            }
+
+            // Retry after timeout
+            await sleep(this.retryTimeoutMs);
+
+            // Apply RTO backoff
+            this.retryTimeoutMs = Math.min(this.retryTimeoutMs * 2, this.config.retryTimeoutMaxMs);
+
+            // Maximum tries reached?
+            if (this.tries === this.config.triesMax) {
+                const error = `Push session timeout after ${this.tries} tries`;
+                this.$log.warn(this.logTag, error);
+                this.doneFuture.reject(new TimeoutError(error));
+            }
+        }
+    }
+}
+
 export class PushService {
-    private static ARG_TYPE = 'type';
-    private static ARG_TOKEN = 'token';
-    private static ARG_SESSION = 'session';
-    private static ARG_VERSION = 'version';
-    private static ARG_ENDPOINT = 'endpoint';
-    private static ARG_BUNDLE_ID = 'bundleid';
-
-    private logTag: string = '[PushService]';
-
-    private $http: ng.IHttpService;
-    private $log: ng.ILogService;
-    private $httpParamSerializerJQLike;
-
-    private url: string;
-    private pushToken: string = null;
-    private pushType = threema.PushTokenType.Gcm;
-    private version: number = null;
-
-    public static $inject = ['$http', '$log', '$httpParamSerializerJQLike', 'CONFIG', 'PROTOCOL_VERSION'];
-    constructor($http: ng.IHttpService, $log: ng.ILogService, $httpParamSerializerJQLike,
-                CONFIG: threema.Config, PROTOCOL_VERSION: number) {
-        this.$http = $http;
+    public static readonly $inject = ['$log', 'CONFIG', 'PROTOCOL_VERSION'];
+
+    public static readonly ARG_TYPE = 'type';
+    public static readonly ARG_TOKEN = 'token';
+    public static readonly ARG_SESSION = 'session';
+    public static readonly ARG_VERSION = 'version';
+    public static readonly ARG_ENDPOINT = 'endpoint';
+    public static readonly ARG_BUNDLE_ID = 'bundleid';
+    public static readonly ARG_TIME_TO_LIVE = 'ttl';
+    public static readonly ARG_COLLAPSE_KEY = 'collapse_key';
+
+    private readonly logTag: string = '[PushService]';
+    public readonly $log: ng.ILogService;
+    public readonly config: threema.Config;
+    public readonly url: string;
+    public readonly version: number = null;
+    private _pushToken: string = null;
+    private _pushType = threema.PushTokenType.Gcm;
+
+    constructor($log: ng.ILogService, CONFIG: threema.Config, PROTOCOL_VERSION: number) {
         this.$log = $log;
-        this.$httpParamSerializerJQLike = $httpParamSerializerJQLike;
+        this.config = CONFIG;
         this.url = CONFIG.PUSH_URL;
         this.version = PROTOCOL_VERSION;
     }
 
+    public get pushToken(): string {
+        return this._pushToken;
+    }
+
+    public get pushType(): string {
+        return this._pushType;
+    }
+
     /**
      * Initiate the push service with a push token.
      */
     public init(pushToken: string, pushTokenType: threema.PushTokenType): void {
         this.$log.info(this.logTag, 'Initialized with', pushTokenType, 'token');
-        this.pushToken = pushToken;
-        this.pushType = pushTokenType;
+        this._pushToken = pushToken;
+        this._pushType = pushTokenType;
     }
 
     /**
      * Reset the push service, remove stored push tokens.
      */
     public reset(): void {
-        this.pushToken = null;
+        this._pushToken = null;
     }
 
     /**
-     * Return true if service has been initialized with a push token.
+     * Return whether the service has been initialized with a push token.
      */
     public isAvailable(): boolean {
-        return this.pushToken != null;
+        return this._pushToken != null;
     }
 
     /**
-     * Send a push notification for the specified session (public permanent key
-     * of the initiator). The promise is always resolved to a boolean.
-     *
-     * If something goes wrong, the promise is rejected with an `Error` instance.
+     * Create a push session for a specific session (public permanent key of
+     * the initiator) which will repeatedly send push messages until the
+     * session is marked as established.
      */
-    public async sendPush(session: Uint8Array): Promise<boolean> {
+    public createSession(session: Uint8Array, config?: threema.PushSessionConfig): PushSession {
         if (!this.isAvailable()) {
-            return false;
+            throw new Error('Push service unavailable');
         }
 
-        // Calculate session hash
-        const sessionHash = await sha256(session.buffer);
-
-        // Prepare request
-        const data = {
-            [PushService.ARG_TYPE]: this.pushType,
-            [PushService.ARG_SESSION]: sessionHash,
-            [PushService.ARG_VERSION]: this.version,
-        };
-        if (this.pushType === threema.PushTokenType.Apns) {
-            // APNS token format: "<hex-deviceid>;<endpoint>;<bundle-id>"
-            const parts = this.pushToken.split(';');
-            if (parts.length < 3) {
-                this.$log.warn(this.logTag, 'APNS push token contains', parts.length, 'parts, at least 3 are required');
-                return false;
-            }
-            data[PushService.ARG_TOKEN] = parts[0];
-            data[PushService.ARG_ENDPOINT] = parts[1];
-            data[PushService.ARG_BUNDLE_ID] = parts[2];
-        } else if (this.pushType === threema.PushTokenType.Gcm) {
-            data[PushService.ARG_TOKEN] = this.pushToken;
-        } else {
-            this.$log.warn(this.logTag, 'Invalid push type');
-            return false;
-        }
-
-        const request = {
-            method: 'POST',
-            url: this.url,
-            headers: {
-                'Content-Type': 'application/x-www-form-urlencoded',
-            },
-            data: this.$httpParamSerializerJQLike(data),
-        };
-
-        // Send push
-        return new Promise((resolve) => {
-            this.$http(request).then(
-                (successResponse) => {
-                    if (successResponse.status === 204) {
-                        this.$log.debug(this.logTag, 'Sent push');
-                        resolve(true);
-                    } else {
-                        this.$log.warn(this.logTag, 'Sending push failed: HTTP ' + successResponse.status);
-                        resolve(false);
-                    }
-                },
-                (errorResponse) => {
-                    this.$log.warn(this.logTag, 'Sending push failed:', errorResponse);
-                    resolve(false);
-                },
-            );
-        }) as Promise<boolean>;
+        // Create push instance
+        return new PushSession(this, session, config);
     }
 }

+ 48 - 23
src/services/webclient.ts

@@ -37,7 +37,7 @@ import {MessageService} from './message';
 import {MimeService} from './mime';
 import {NotificationService} from './notification';
 import {PeerConnectionHelper} from './peerconnection';
-import {PushService} from './push';
+import {PushService, PushSession} from './push';
 import {QrCodeService} from './qrcode';
 import {ReceiverService} from './receiver';
 import {StateService} from './state';
@@ -45,6 +45,7 @@ import {TimeoutService} from './timeout';
 import {TitleService} from './title';
 import {VersionService} from './version';
 
+import {TimeoutError} from '../exceptions';
 import {ChunkCache} from '../protocol/cache';
 import {SequenceNumber} from '../protocol/sequence_number';
 
@@ -186,7 +187,6 @@ export class WebClientService {
     private pendingInitializationStepRoutines: Set<threema.InitializationStepRoutine> = new Set();
     private initialized: Set<threema.InitializationStep> = new Set();
     private stateService: StateService;
-    private lastPush: Date = null;
 
     // Session connection
     private saltyRtcHost: string = null;
@@ -215,8 +215,13 @@ export class WebClientService {
     public conversations: threema.Container.Conversations;
     public receivers: threema.Container.Receivers;
     public alerts: threema.Alert[] = [];
+
+    // Push
     private pushToken: string = null;
     private pushTokenType: threema.PushTokenType = null;
+    private pushSession: PushSession = null;
+    private pushPromise: Promise<any> = null;
+    private readonly pushExpectedPeriodMaxMs: number = PushSession.expectedPeriodMaxMs();
 
     // Timeouts
     private batteryStatusTimeout: ng.IPromise<void> = null;
@@ -484,6 +489,12 @@ export class WebClientService {
         // We want to know about new responders.
         this.salty.on('new-responder', () => {
             if (!this.startupDone) {
+                // Pushing complete
+                if (this.pushSession !== null) {
+                    this.pushSession.done();
+                    this.pushSession = null;
+                }
+
                 // Peer handshake
                 this.stateService.updateConnectionBuildupState('peer_handshake');
             }
@@ -1018,28 +1029,36 @@ export class WebClientService {
 
     /**
      * Send a push message to wake up the peer.
-     * The push message will only be sent if the last push is less than 2 seconds ago.
-     */
-    private sendPush(): void {
-        // Make sure not to flood the target device with pushes
-        const minPushInterval = 2000;
-        const now = new Date();
-        if (this.lastPush !== null && (now.getTime() - this.lastPush.getTime()) < minPushInterval) {
-            this.$log.debug(this.logTag,
-                'Skipping push, last push was requested less than ' + (minPushInterval / 1000) + 's ago');
-            return;
+     *
+     * Returns the maximum expected period until the promise will be resolved,
+     * and the promise itself.
+     */
+    public sendPush(): [number, Promise<void>] {
+        // Create new session
+        if (this.pushSession === null) {
+            this.pushSession = this.pushService.createSession(this.salty.permanentKeyBytes);
+
+            // Start and handle errors
+            this.pushPromise = this.pushSession.start().catch((error) => {
+                if (error instanceof TimeoutError) {
+                    // TODO: Show device unreachable dialog
+                    // TODO: If unreachable dialog is already shown, set .retrying to false (with root scope)
+                    // this.showDeviceUnreachableDialog();
+                } else {
+                    this.failSession();
+                }
+            });
+
+            // Update state
+            if (!this.$rootScope.$$phase) {
+                this.$rootScope.$apply(() => this.stateService.updateConnectionBuildupState('push'));
+            } else {
+                this.stateService.updateConnectionBuildupState('push');
+            }
         }
-        this.lastPush = now;
 
-        // Actually send the push notification
-        this.pushService.sendPush(this.salty.permanentKeyBytes)
-            .then(() => {
-                this.$log.debug(this.logTag, 'Requested app wakeup via', this.pushTokenType, 'push');
-                this.$rootScope.$apply(() => {
-                    this.stateService.updateConnectionBuildupState('push');
-                });
-            })
-            .catch((e: Error) => this.$log.error(this.logTag, 'Could not send wakeup push to app: ' + e.message));
+        // Retrieve the expected maximum period
+        return [this.pushExpectedPeriodMaxMs, this.pushPromise];
     }
 
     /**
@@ -1101,6 +1120,12 @@ export class WebClientService {
         let close = args.close !== false;
         let remove = false;
 
+        // Stop push session
+        if (this.pushSession !== null) {
+            this.pushSession.done();
+            this.pushSession = null;
+        }
+
         // Session deleted: Force close and delete
         if (args.reason === DisconnectReason.SessionDeleted) {
             close = true;
@@ -3121,7 +3146,7 @@ export class WebClientService {
                     this.$log.error(this.logTag, 'Invalid operating system in client info');
             }
         }
-        if (this.pushToken && this.pushTokenType) {
+        if (this.pushToken !== null && this.pushTokenType !== null) {
             this.pushService.init(this.pushToken, this.pushTokenType);
         }
 

+ 7 - 0
src/threema.d.ts

@@ -487,6 +487,13 @@ declare namespace threema {
         text: string;
     }
 
+    interface PushSessionConfig {
+        retryTimeoutInitMs: number;
+        retryTimeoutMaxMs: number;
+        triesMax: number;
+        timeToLiveRange: number[];
+    }
+
     const enum PushTokenType {
         Gcm = 'gcm',
         Apns = 'apns',