Browse Source

Implement the connection ack procedure & various bug fixes

Add a SequenceNumber class
Use the SequenceNumber class in the ChunkCache class
Add scheduling of the connection ack message to be sent periodically once the connection has been established
Add respond to connection ack requests
Add an incoming chunk sequence number and increment it on each incoming chunk
Fix don't recreate the unchunker instance on session resume
Fix actually don't resume the session if it has not been requested
Fix re-establish the message sequence number since that needs to be persistent across sessions
Fix send the correct sequence number in the connection info message (d'oh!)
Remove obsolete outgoingMessageQueue
Lennart Grahl 7 years ago
parent
commit
b9e37461f8
3 changed files with 284 additions and 90 deletions
  1. 12 14
      src/protocol/cache.ts
  2. 71 0
      src/protocol/sequence_number.ts
  3. 201 76
      src/services/webclient.ts

+ 12 - 14
src/protocol/cache.ts

@@ -15,25 +15,26 @@
  * along with Threema Web. If not, see <http://www.gnu.org/licenses/>.
  */
 
+import {SequenceNumber} from './sequence_number';
+
 export type CachedChunk = Uint8Array | null;
 
 /**
- * Contains messages that have not yet been acknowledged,
+ * Contains chunks that have not yet been acknowledged.
  */
 export class ChunkCache {
-    private readonly sequenceNumberMax: number;
-    private _sequenceNumber = 0;
+    private _sequenceNumber: SequenceNumber;
     private _size = 0;
     private cache: CachedChunk[] = [];
 
-    constructor(sequenceNumberMax: number) {
-        this.sequenceNumberMax = sequenceNumberMax;
+    constructor(sequenceNumber: SequenceNumber) {
+        this._sequenceNumber = sequenceNumber;
     }
 
     /**
      * Get the current sequence number (e.g. of the **next** chunk to be added).
      */
-    public get sequenceNumber(): number {
+    public get sequenceNumber(): SequenceNumber {
         return this._sequenceNumber;
     }
 
@@ -67,13 +68,8 @@ export class ChunkCache {
      * Append a chunk to the chunk cache.
      */
     public append(chunk: CachedChunk): void {
-        // Check if the sequence number would overflow
-        if (this._sequenceNumber >= this.sequenceNumberMax) {
-            throw Error('Sequence number overflow');
-        }
-
         // Update sequence number, update size & append chunk
-        ++this._sequenceNumber;
+        this._sequenceNumber.increment();
         this._size += chunk.byteLength;
         this.cache.push(chunk);
     }
@@ -82,13 +78,15 @@ export class ChunkCache {
      * Acknowledge cached chunks and remove those from the cache.
      */
     public acknowledge(theirSequenceNumber: number): void {
-        if (theirSequenceNumber < 0 || theirSequenceNumber > this.sequenceNumberMax) {
+        try {
+            this._sequenceNumber.validate(theirSequenceNumber);
+        } catch (error) {
             throw new Error(`Remote sent us an invalid sequence number: ${theirSequenceNumber}`);
         }
 
         // Calculate the slice start index for the chunk cache
         // Important: Our sequence number is one chunk ahead!
-        const endOffset = theirSequenceNumber + 1 - this._sequenceNumber;
+        const endOffset = theirSequenceNumber + 1 - this._sequenceNumber.get();
         if (endOffset > 0) {
             throw new Error('Remote travelled through time and acknowledged a chunk which is in the future');
         } else if (-endOffset > this.cache.length) {

+ 71 - 0
src/protocol/sequence_number.ts

@@ -0,0 +1,71 @@
+/**
+ * This file is part of Threema Web.
+ *
+ * Threema Web is free software: you can redistribute it and/or modify it
+ * under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or (at
+ * your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with Threema Web. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/**
+ * A generic sequence number with specific boundaries.
+ *
+ * Does not allow for wrapping.
+ */
+export class SequenceNumber {
+    private readonly minValue: number;
+    private readonly maxValue: number;
+    private value: number;
+
+    constructor(initialValue: number = 0, minValue: number, maxValue: number) {
+        this.minValue = minValue;
+        this.maxValue = maxValue;
+        this.value = initialValue;
+    }
+
+    /**
+     * Validate a specific sequence number.
+     */
+    public validate(other: number) {
+        if (other < this.minValue) {
+            throw new Error(`Invalid sequence number: ${other} < 0`);
+        }
+        if (other > this.maxValue) {
+            throw new Error(`Invalid sequence number: ${other} > ${this.maxValue}`);
+        }
+    }
+
+    /**
+     * Get the current value of the sequence number.
+     */
+    public get(): number {
+        return this.value;
+    }
+
+    /**
+     * Set the new value of the sequence number.
+     */
+    public set(value: number): void {
+        this.validate(value);
+        this.value = value;
+    }
+
+    /**
+     * Increment the sequence number and return the resulting number.
+     */
+    public increment(by: number = 1): number {
+        if (by < 0) {
+            throw new Error('Cannot decrement the sequence number');
+        }
+        this.set(this.value + by);
+        return this.get();
+    }
+}

+ 201 - 76
src/services/webclient.ts

@@ -39,6 +39,7 @@ import {TitleService} from './title';
 import {VersionService} from './version';
 
 import {ChunkCache} from '../protocol/cache';
+import {SequenceNumber} from '../protocol/sequence_number';
 
 // Aliases
 import InitializationStep = threema.InitializationStep;
@@ -50,6 +51,7 @@ import DisconnectReason = threema.DisconnectReason;
  */
 export class WebClientService {
     private static CHUNK_SIZE = 64 * 1024;
+    private static SEQUENCE_NUMBER_MIN = 0;
     private static SEQUENCE_NUMBER_MAX = (2 ** 32) - 1;
     private static CHUNK_CACHE_SIZE_MAX = 2 * 1024 * 1024;
     private static AVATAR_LOW_MAX_SIZE = 48;
@@ -168,11 +170,15 @@ export class WebClientService {
     private relayedDataTask: saltyrtc.tasks.relayed_data.RelayedDataTask = null;
     private secureDataChannel: saltyrtc.tasks.webrtc.SecureDataChannel = null;
     public chosenTask: threema.ChosenTask = threema.ChosenTask.None;
-    private pendingAckRequest: number | null = null;
+    private outgoingMessageSequenceNumber: SequenceNumber;
     private previousConnectionId: Uint8Array = null;
     private currentConnectionId: Uint8Array = null;
+    private previousIncomingChunkSequenceNumber: SequenceNumber = null;
+    private currentIncomingChunkSequenceNumber: SequenceNumber;
     private previousChunkCache: ChunkCache = null;
     private currentChunkCache: ChunkCache = null;
+    private ackTimer: number | null = null;
+    private pendingAckRequest: number | null = null;
 
     // Message chunking
     private unchunker: chunkedDc.Unchunker = null;
@@ -185,9 +191,6 @@ export class WebClientService {
     private pushToken: string = null;
     private pushTokenType: threema.PushTokenType = null;
 
-    // Pending messages when waiting for a responder to connect
-    private outgoingMessageQueue: threema.WireMessage[] = [];
-
     // Other
     private config: threema.Config;
     private container: threema.Container.Factory;
@@ -330,21 +333,50 @@ export class WebClientService {
 
     /**
      * Initialize the webclient service.
+     *
+     * Warning: Do not call this with `resumeSession` set to `false` in case
+     *          messages can be queued by the user.
      */
     public init(keyStore?: saltyrtc.KeyStore, peerTrustedKey?: Uint8Array, resumeSession = true): void {
         // Reset state
         this.stateService.reset();
 
-        // Move instances that we need to re-establish a previous session
-        // Note: Only move the previous connection's instances if the previous
-        //       connection was successful.
-        if (!this.previousChunkCache && this.currentChunkCache) {
+        // Only move the previous connection's instances if the previous
+        // connection was successful (and if there was one at all).
+        if (resumeSession &&
+            this.outgoingMessageSequenceNumber && this.unchunker &&
+            this.previousChunkCache === this.currentChunkCache) {
+            // Move instances that we need to re-establish a previous session
             this.previousConnectionId = this.currentConnectionId;
-            this.currentConnectionId = null;
+            this.previousIncomingChunkSequenceNumber = this.currentIncomingChunkSequenceNumber;
             this.previousChunkCache = this.currentChunkCache;
-            this.currentChunkCache = null;
+        } else {
+            // Reset the outgoing message sequence number and the unchunker
+            this.outgoingMessageSequenceNumber = new SequenceNumber(
+                0, WebClientService.SEQUENCE_NUMBER_MIN, WebClientService.SEQUENCE_NUMBER_MAX);
+            this.unchunker = new chunkedDc.Unchunker();
+            this.unchunker.onMessage = this.handleIncomingMessageBytes.bind(this);
+
+            // Discard previous connection instances
+            this.previousConnectionId = null;
+            this.previousIncomingChunkSequenceNumber = null;
+            this.previousChunkCache = null;
+
+            // Not resuming
+            resumeSession = false;
         }
 
+        // Initialise connection cashes
+        this.currentConnectionId = null;
+        this.currentIncomingChunkSequenceNumber = new SequenceNumber(
+            0, WebClientService.SEQUENCE_NUMBER_MIN, WebClientService.SEQUENCE_NUMBER_MAX);
+        const outgoingChunkSequenceNumber = new SequenceNumber(
+            0, WebClientService.SEQUENCE_NUMBER_MIN, WebClientService.SEQUENCE_NUMBER_MAX);
+        this.currentChunkCache = new ChunkCache(outgoingChunkSequenceNumber);
+
+        // Reset pending ack request
+        this.pendingAckRequest = null;
+
         // Create new handshake future
         this.connectionInfoFuture = new Future();
 
@@ -395,16 +427,6 @@ export class WebClientService {
             this.$log.debug('Auth token:', this.salty.authTokenHex);
         }
 
-        // Reset ack request
-        this.pendingAckRequest = null;
-
-        // Create unchunker
-        this.unchunker = new chunkedDc.Unchunker();
-        this.unchunker.onMessage = this.handleIncomingMessageBytes.bind(this);
-
-        // Create chunk cache
-        this.currentChunkCache = new ChunkCache(WebClientService.SEQUENCE_NUMBER_MAX);
-
         // We want to know about new responders.
         this.salty.on('new-responder', () => {
             if (!this.startupDone) {
@@ -606,26 +628,45 @@ export class WebClientService {
 
         // Resend chunks
         for (const chunk of this.currentChunkCache.chunks) {
-            this.sendChunk(chunk);
+            this.sendChunk(chunk, true);
         }
 
         // Done, yay!
         this.$log.debug(this.logTag, 'Session resumed');
     }
 
+    /**
+     * Schedule (or reschedule) the connection ack to be sent.
+     *
+     * By default, a connection ack message will be sent after 10 seconds
+     * (as defined by the protocol).
+     */
+    private scheduleConnectionAck(timeout: number = 10000): void {
+        if (this.ackTimer !== null) {
+            clearTimeout(this.ackTimer);
+        }
+        this.ackTimer = self.setTimeout(() => {
+            // Send
+            this._sendConnectionAck();
+        }, timeout);
+    }
+
     /**
      * For the WebRTC task, this is called when the DataChannel is open.
      * For the relayed data task, this is called once the connection is established.
      */
     private async onConnectionEstablished(resumeSession: boolean) {
         // Send connection info
-        resumeSession = resumeSession && this.previousConnectionId !== null && this.previousChunkCache !== null;
+        resumeSession = resumeSession &&
+            this.previousConnectionId !== null &&
+            this.previousIncomingChunkSequenceNumber !== null &&
+            this.previousChunkCache !== null;
         this.$log.debug(this.logTag, 'Sending connection info');
         if (resumeSession) {
             this._sendConnectionInfo(
                 this.currentConnectionId.buffer,
                 this.previousConnectionId.buffer,
-                this.previousChunkCache.sequenceNumber
+                this.previousIncomingChunkSequenceNumber.get(),
             );
         } else {
             this._sendConnectionInfo(this.currentConnectionId.buffer);
@@ -665,9 +706,16 @@ export class WebClientService {
         }
 
         // Invalidate the previous connection cache & id
+        // Note: This MUST be done immediately after the session has been
+        //       resumed to prevent re-establishing a session of a connection
+        //       where the handshake has been started but not been completed.
         this.previousConnectionId = null;
+        this.previousIncomingChunkSequenceNumber = null;
         this.previousChunkCache = null;
 
+        // Schedule the periodic ack timer
+        this.scheduleConnectionAck();
+
         // Reset fields and request initial data if not resuming the session
         const requiredInitializationSteps = [];
         if (!resumeSession) {
@@ -752,17 +800,9 @@ export class WebClientService {
                 this.$log.warn('Secure data channel: Closed');
             };
         } else if (this.chosenTask === threema.ChosenTask.RelayedData) {
-            // Note: This ensures that the 'data' event cannot leak into
-            //       unchunker instances of subsequent connections.
-            const unchunker = this.unchunker;
-
             // Handle messages directly
             this.relayedDataTask.on('data', (ev: saltyrtc.SaltyRTCEvent) => {
-                const chunk = new Uint8Array(ev.data);
-                if (this.config.MSG_DEBUGGING && this.config.DEBUG) {
-                    this.$log.debug('[Chunk] Received chunk:', chunk);
-                }
-                unchunker.add(chunk.buffer);
+                this.receiveChunk(new Uint8Array(ev.data));
             });
 
             // The communication channel is now open! Fetch initial data
@@ -895,6 +935,13 @@ export class WebClientService {
             this._sendUpdate(WebClientService.SUB_TYPE_CONNECTION_DISCONNECT, false, undefined, {reason: reason});
         }
 
+        // Stop ack timer
+        if (this.ackTimer !== null) {
+            clearTimeout(this.ackTimer);
+            this.ackTimer = null;
+            this.$log.debug(this.logTag, 'Timer stopped');
+        }
+
         // Reset states
         this.stateService.reset();
 
@@ -910,11 +957,15 @@ export class WebClientService {
         if (close) {
             this.previousConnectionId = null;
             this.currentConnectionId = null;
+            this.previousIncomingChunkSequenceNumber = null;
+            this.currentIncomingChunkSequenceNumber = new SequenceNumber(
+                0, WebClientService.SEQUENCE_NUMBER_MIN, WebClientService.SEQUENCE_NUMBER_MAX);
             this.previousChunkCache = null;
             this.currentChunkCache = null;
             this.pushService.reset();
             this.$log.debug(this.logTag, 'Session closed (cannot be resumed)');
         } else {
+            this.previousChunkCache = this.currentChunkCache;
             this.$log.debug(this.logTag, 'Session remains open (can be resumed)');
         }
 
@@ -1040,6 +1091,19 @@ export class WebClientService {
         this._sendRequest(WebClientService.SUB_TYPE_CONNECTION_ACK, false);
     }
 
+    /**
+     * Send a connection ack update.
+     */
+    private _sendConnectionAck(): void {
+        // Send the current incoming sequence number for chunks
+        this._sendUpdate(WebClientService.SUB_TYPE_CONNECTION_ACK, false, undefined, {
+            sequenceNumber: this.currentIncomingChunkSequenceNumber.get(),
+        });
+
+        // Re-schedule sending a connection ack
+        this.scheduleConnectionAck();
+    }
+
     /**
      * Send a client info request.
      */
@@ -1883,9 +1947,16 @@ export class WebClientService {
     }
 
     /**
-     * A connectionAck message arrived.
+     * A connectionAck request arrived.
      */
-    private _receiveConnectionAck(message: threema.WireMessage) {
+    private _receiveRequestConnectionAck(message: threema.WireMessage) {
+        this._sendConnectionAck();
+    }
+
+    /**
+     * A connectionAck update arrived.
+     */
+    private _receiveUpdateConnectionAck(message: threema.WireMessage) {
         if (!hasValue(message.data)) {
             this.$log.warn(this.logTag, 'Invalid connectionAck message: data missing');
             return;
@@ -2961,7 +3032,11 @@ export class WebClientService {
         this.send(message, retransmit);
     }
 
-    private _sendPromiseMessage(message: threema.WireMessage, retransmit: boolean, timeout: number = null): Promise<any> {
+    private _sendPromiseMessage(
+        message: threema.WireMessage,
+        retransmit: boolean,
+        timeout: number = null,
+    ): Promise<any> {
         // Create arguments on wired message
         if (message.args === undefined || message.args === null) {
             message.args = {};
@@ -3009,7 +3084,13 @@ export class WebClientService {
         return this._sendPromiseMessage(message, retransmit, timeout);
     }
 
-    private _sendCreatePromise(type, retransmit: boolean, args = null, data: any = null, timeout: number = null): Promise<any> {
+    private _sendCreatePromise(
+        type,
+        retransmit: boolean,
+        args = null,
+        data: any = null,
+        timeout: number = null,
+    ): Promise<any> {
         const message: threema.WireMessage = {
             type: WebClientService.TYPE_CREATE,
             subType: type,
@@ -3019,7 +3100,13 @@ export class WebClientService {
         return this._sendPromiseMessage(message, retransmit, timeout);
     }
 
-    private _sendUpdatePromise(type, retransmit: boolean, args = null, data: any = null, timeout: number = null): Promise<any> {
+    private _sendUpdatePromise(
+        type,
+        retransmit: boolean,
+        args = null,
+        data: any = null,
+        timeout: number = null,
+    ): Promise<any> {
         const message: threema.WireMessage = {
             type: WebClientService.TYPE_UPDATE,
             subType: type,
@@ -3067,7 +3154,14 @@ export class WebClientService {
     }
 
     private _receiveRequest(type, message): void {
-        this.$log.warn('Ignored request with type:', type);
+        switch (type) {
+            case WebClientService.SUB_TYPE_CONNECTION_ACK:
+                this._receiveRequestConnectionAck(message);
+                break;
+            default:
+                this.$log.warn('Ignored update with type:', type);
+                break;
+        }
     }
 
     private _receivePromise(message: any, receiveResult: threema.PromiseRequestResult<any>) {
@@ -3178,7 +3272,7 @@ export class WebClientService {
                 this._receiveAlert(message);
                 break;
             case WebClientService.SUB_TYPE_CONNECTION_ACK:
-                this._receiveConnectionAck(message);
+                this._receiveUpdateConnectionAck(message);
                 break;
             case WebClientService.SUB_TYPE_CONNECTION_DISCONNECT:
                 this._receiveConnectionDisconnect(message);
@@ -3263,41 +3357,22 @@ export class WebClientService {
             case threema.ChosenTask.RelayedData:
                 {
                     // Send bytes through e2e encrypted WebSocket
-                    if (this.salty.state !== 'task') {
-                        this.$log.debug(this.logTag, 'Currently not connected (state='
-                            + this.salty.state + '), putting outgoing message in queue');
-                        this.outgoingMessageQueue.push(message);
-                        if (this.pushService.isAvailable()) {
-                            this.sendPush(threema.WakeupType.Wakeup);
-                        } else {
-                            this.$log.warn(this.logTag, 'Push service not available, cannot wake up peer!');
-                        }
-                    } else {
-                        const bytes: Uint8Array = this.msgpackEncode(message);
-                        // Note: We use the sequence number of the chunk cache here to avoid having another counter
-                        const chunker = new chunkedDc.Chunker(
-                            this.currentChunkCache.sequenceNumber, bytes, WebClientService.CHUNK_SIZE);
-                        for (const chunk of chunker) {
-                            // Add to chunk cache
-                            try {
-                                this.currentChunkCache.append(retransmit ? null : chunk);
-                            } catch (error) {
-                                this.$log.error(this.logTag, error);
-                                this.failSession();
-                                return;
-                            }
+                    const bytes: Uint8Array = this.msgpackEncode(message);
 
-                            // Send
-                            this.sendChunk(chunk);
-                        }
+                    // Increment the outgoing message sequence number
+                    const messageSequenceNumber = this.outgoingMessageSequenceNumber.increment();
+                    const chunker = new chunkedDc.Chunker(messageSequenceNumber, bytes, WebClientService.CHUNK_SIZE);
+                    for (const chunk of chunker) {
+                        // Send (and cache)
+                        this.sendChunk(chunk, retransmit);
+                    }
 
-                        // Check if we need to request an acknowledgement
-                        // Note: We only request if none is pending
-                        if (this.pendingAckRequest === null &&
-                            this.currentChunkCache.size > WebClientService.CHUNK_CACHE_SIZE_MAX) {
-                            this._requestConnectionAck();
-                            this.pendingAckRequest = this.currentChunkCache.sequenceNumber;
-                        }
+                    // Check if we need to request an acknowledgement
+                    // Note: We only request if none is pending
+                    if (this.pendingAckRequest === null &&
+                        this.currentChunkCache.size > WebClientService.CHUNK_CACHE_SIZE_MAX) {
+                        this._requestConnectionAck();
+                        this.pendingAckRequest = this.currentChunkCache.sequenceNumber.get();
                     }
                 }
                 break;
@@ -3309,15 +3384,65 @@ export class WebClientService {
     /**
      * Send a chunk via the underlying transport.
      */
-    private sendChunk(chunk: Uint8Array): void {
+    private sendChunk(chunk: Uint8Array, retransmit: boolean): void {
         // TODO: Support for sending in chunks via data channels will be added later
         if (this.chosenTask !== threema.ChosenTask.RelayedData) {
             throw new Error(`Cannot send chunk, not supported by task: ${this.chosenTask}`);
         }
-        if (this.config.MSG_DEBUGGING) {
-            this.$log.debug('[Chunk] Sending chunk:', chunk);
+        const ready = this.previousChunkCache === null;
+        let chunkCache;
+
+        // Currently not ready? Enqueue in the chunk cache that is pending
+        // to be transferred and send a wakeup push.
+        if (!ready) {
+            chunkCache = this.previousChunkCache;
+            this.$log.debug(this.logTag, 'Currently not connected, queueing chunk');
+            if (this.pushService.isAvailable()) {
+                this.sendPush(threema.WakeupType.Wakeup);
+            } else {
+                this.$log.warn(this.logTag, 'Push service not available, cannot wake up peer!');
+            }
+        } else {
+            chunkCache = this.currentChunkCache;
+        }
+
+        // Add to chunk cache
+        try {
+            chunkCache.append(retransmit ? null : chunk);
+        } catch (error) {
+            this.$log.error(this.logTag, error);
+            this.failSession();
+            return;
+        }
+
+        // Send if ready
+        if (ready) {
+            if (this.config.MSG_DEBUGGING) {
+                this.$log.debug('[Chunk] Sending chunk:', chunk);
+            }
+            this.relayedDataTask.sendMessage(chunk.buffer);
         }
-        this.relayedDataTask.sendMessage(chunk.buffer);
+    }
+
+    /**
+     * Handle an incoming chunk from the underlying transport.
+     */
+    private receiveChunk(chunk: Uint8Array): void {
+        if (this.config.MSG_DEBUGGING && this.config.DEBUG) {
+            this.$log.debug('[Chunk] Received chunk:', chunk);
+        }
+
+        // Update incoming sequence number
+        try {
+            this.currentIncomingChunkSequenceNumber.increment();
+        } catch (error) {
+            this.$log.error(this.logTag, `Unable to continue session: ${error}`);
+            this.failSession();
+            return;
+        }
+
+        // Process chunk
+        this.unchunker.add(chunk.buffer);
     }
 
     /**