Переглянути джерело

Various bug fixes for the ack protocol

Fix return the sequence number before it has been incremented
Fix don't queue handshake messages or messages that have been transferred from the previous connection
Fix schedule the ack timer when a message has been received or an explicit request
Lennart Grahl 7 роки тому
батько
коміт
9cf5314b11
3 змінених файлів з 38 додано та 26 видалено
  1. 6 2
      src/protocol/cache.ts
  2. 5 3
      src/protocol/sequence_number.ts
  3. 27 21
      src/services/webclient.ts

+ 6 - 2
src/protocol/cache.ts

@@ -70,7 +70,9 @@ export class ChunkCache {
     public append(chunk: CachedChunk): void {
     public append(chunk: CachedChunk): void {
         // Update sequence number, update size & append chunk
         // Update sequence number, update size & append chunk
         this._sequenceNumber.increment();
         this._sequenceNumber.increment();
-        this._size += chunk.byteLength;
+        if (chunk !== null) {
+            this._size += chunk.byteLength;
+        }
         this.cache.push(chunk);
         this.cache.push(chunk);
     }
     }
 
 
@@ -95,6 +97,8 @@ export class ChunkCache {
 
 
         // Slice our cache & recalculate size
         // Slice our cache & recalculate size
         this.cache = endOffset === 0 ? [] : this.cache.slice(endOffset);
         this.cache = endOffset === 0 ? [] : this.cache.slice(endOffset);
-        this._size = this.cache.reduce((sum, chunk) => sum + chunk.byteLength, 0);
+        this._size = this.cache
+            .filter((chunk) => chunk !== null)
+            .reduce((sum, chunk) => sum + chunk.byteLength, 0);
     }
     }
 }
 }

+ 5 - 3
src/protocol/sequence_number.ts

@@ -59,13 +59,15 @@ export class SequenceNumber {
     }
     }
 
 
     /**
     /**
-     * Increment the sequence number and return the resulting number.
+     * Increment the sequence number and return the sequence number as it was
+     * before it has been incremented.
      */
      */
     public increment(by: number = 1): number {
     public increment(by: number = 1): number {
         if (by < 0) {
         if (by < 0) {
             throw new Error('Cannot decrement the sequence number');
             throw new Error('Cannot decrement the sequence number');
         }
         }
-        this.set(this.value + by);
-        return this.get();
+        const value = this.value;
+        this.set(value + by);
+        return value;
     }
     }
 }
 }

+ 27 - 21
src/services/webclient.ts

@@ -628,7 +628,7 @@ export class WebClientService {
 
 
         // Resend chunks
         // Resend chunks
         for (const chunk of this.currentChunkCache.chunks) {
         for (const chunk of this.currentChunkCache.chunks) {
-            this.sendChunk(chunk, true);
+            this.sendChunk(chunk, true, false);
         }
         }
 
 
         // Done, yay!
         // Done, yay!
@@ -636,19 +636,19 @@ export class WebClientService {
     }
     }
 
 
     /**
     /**
-     * Schedule (or reschedule) the connection ack to be sent.
+     * Schedule the connection ack to be sent.
      *
      *
      * By default, a connection ack message will be sent after 10 seconds
      * By default, a connection ack message will be sent after 10 seconds
      * (as defined by the protocol).
      * (as defined by the protocol).
      */
      */
     private scheduleConnectionAck(timeout: number = 10000): void {
     private scheduleConnectionAck(timeout: number = 10000): void {
-        if (this.ackTimer !== null) {
-            clearTimeout(this.ackTimer);
+        // Don't schedule if already running
+        if (this.ackTimer === null) {
+            this.ackTimer = self.setTimeout(() => {
+                this.ackTimer = null;
+                this._sendConnectionAck();
+            }, timeout);
         }
         }
-        this.ackTimer = self.setTimeout(() => {
-            // Send
-            this._sendConnectionAck();
-        }, timeout);
     }
     }
 
 
     /**
     /**
@@ -713,9 +713,6 @@ export class WebClientService {
         this.previousIncomingChunkSequenceNumber = null;
         this.previousIncomingChunkSequenceNumber = null;
         this.previousChunkCache = null;
         this.previousChunkCache = null;
 
 
-        // Schedule the periodic ack timer
-        this.scheduleConnectionAck();
-
         // Reset fields and request initial data if not resuming the session
         // Reset fields and request initial data if not resuming the session
         const requiredInitializationSteps = [];
         const requiredInitializationSteps = [];
         if (!resumeSession) {
         if (!resumeSession) {
@@ -1100,8 +1097,10 @@ export class WebClientService {
             sequenceNumber: this.currentIncomingChunkSequenceNumber.get(),
             sequenceNumber: this.currentIncomingChunkSequenceNumber.get(),
         });
         });
 
 
-        // Re-schedule sending a connection ack
-        this.scheduleConnectionAck();
+        // Clear pending ack timer (if any)
+        if (this.ackTimer !== null) {
+            clearTimeout(this.ackTimer);
+        }
     }
     }
 
 
     /**
     /**
@@ -3356,6 +3355,10 @@ export class WebClientService {
                 break;
                 break;
             case threema.ChosenTask.RelayedData:
             case threema.ChosenTask.RelayedData:
                 {
                 {
+                    // Don't queue handshake messages
+                    // TODO: Add this as a method argument
+                    const canQueue = message.subType !== WebClientService.SUB_TYPE_CONNECTION_INFO;
+
                     // Send bytes through e2e encrypted WebSocket
                     // Send bytes through e2e encrypted WebSocket
                     const bytes: Uint8Array = this.msgpackEncode(message);
                     const bytes: Uint8Array = this.msgpackEncode(message);
 
 
@@ -3364,7 +3367,7 @@ export class WebClientService {
                     const chunker = new chunkedDc.Chunker(messageSequenceNumber, bytes, WebClientService.CHUNK_SIZE);
                     const chunker = new chunkedDc.Chunker(messageSequenceNumber, bytes, WebClientService.CHUNK_SIZE);
                     for (const chunk of chunker) {
                     for (const chunk of chunker) {
                         // Send (and cache)
                         // Send (and cache)
-                        this.sendChunk(chunk, retransmit);
+                        this.sendChunk(chunk, retransmit, canQueue);
                     }
                     }
 
 
                     // Check if we need to request an acknowledgement
                     // Check if we need to request an acknowledgement
@@ -3384,17 +3387,17 @@ export class WebClientService {
     /**
     /**
      * Send a chunk via the underlying transport.
      * Send a chunk via the underlying transport.
      */
      */
-    private sendChunk(chunk: Uint8Array, retransmit: boolean): void {
+    private sendChunk(chunk: Uint8Array, retransmit: boolean, canQueue: boolean): void {
         // TODO: Support for sending in chunks via data channels will be added later
         // TODO: Support for sending in chunks via data channels will be added later
         if (this.chosenTask !== threema.ChosenTask.RelayedData) {
         if (this.chosenTask !== threema.ChosenTask.RelayedData) {
             throw new Error(`Cannot send chunk, not supported by task: ${this.chosenTask}`);
             throw new Error(`Cannot send chunk, not supported by task: ${this.chosenTask}`);
         }
         }
-        const ready = this.previousChunkCache === null;
+        const shouldQueue = canQueue && this.previousChunkCache !== null;
         let chunkCache;
         let chunkCache;
 
 
-        // Currently not ready? Enqueue in the chunk cache that is pending
-        // to be transferred and send a wakeup push.
-        if (!ready) {
+        // Enqueue in the chunk cache that is pending to be transferred and
+        // send a wakeup push.
+        if (shouldQueue) {
             chunkCache = this.previousChunkCache;
             chunkCache = this.previousChunkCache;
             this.$log.debug(this.logTag, 'Currently not connected, queueing chunk');
             this.$log.debug(this.logTag, 'Currently not connected, queueing chunk');
             if (this.pushService.isAvailable()) {
             if (this.pushService.isAvailable()) {
@@ -3408,7 +3411,7 @@ export class WebClientService {
 
 
         // Add to chunk cache
         // Add to chunk cache
         try {
         try {
-            chunkCache.append(retransmit ? null : chunk);
+            chunkCache.append(retransmit ? chunk : null);
         } catch (error) {
         } catch (error) {
             this.$log.error(this.logTag, error);
             this.$log.error(this.logTag, error);
             this.failSession();
             this.failSession();
@@ -3416,7 +3419,7 @@ export class WebClientService {
         }
         }
 
 
         // Send if ready
         // Send if ready
-        if (ready) {
+        if (!shouldQueue) {
             if (this.config.MSG_DEBUGGING) {
             if (this.config.MSG_DEBUGGING) {
                 this.$log.debug('[Chunk] Sending chunk:', chunk);
                 this.$log.debug('[Chunk] Sending chunk:', chunk);
             }
             }
@@ -3443,6 +3446,9 @@ export class WebClientService {
 
 
         // Process chunk
         // Process chunk
         this.unchunker.add(chunk.buffer);
         this.unchunker.add(chunk.buffer);
+
+        // Schedule the periodic ack timer
+        this.scheduleConnectionAck();
     }
     }
 
 
     /**
     /**