Explorar o código

Implement chunk caching and explicit acknowledgement requests

Add explicit marking of messages that should be retransmitted after connection loss
Add explicit ack requests in case the chunk cache exceeds 2 MiB
Add processing of ack updates
Add a size method to the chunk cache
Remove messageSerial (we now simply use the current chunk sequence number)
Remove sending of the outgoingMessageQueue (WIP: needs to be removed entirely)
Lennart Grahl %!s(int64=7) %!d(string=hai) anos
pai
achega
6104b285b1
Modificáronse 3 ficheiros con 159 adicións e 80 borrados
  1. 8 2
      src/partials/messenger.ts
  2. 13 3
      src/protocol/cache.ts
  3. 138 75
      src/services/webclient.ts

+ 8 - 2
src/partials/messenger.ts

@@ -585,6 +585,9 @@ class ConversationController {
                         `,
                         // tslint:enable:max-line-length
                     }).then((data) => {
+                        // TODO: This should probably be moved into the
+                        //       WebClientService as a specific method for the
+                        //       type.
                         const caption = data.caption;
                         const sendAsFile = data.sendAsFile;
                         contents.forEach((msg: threema.FileMessageData, index: number) => {
@@ -592,7 +595,7 @@ class ConversationController {
                                 msg.caption = caption;
                             }
                             msg.sendAsFile = sendAsFile;
-                            this.webClientService.sendMessage(this.$stateParams, type, msg)
+                            this.webClientService.sendMessage(this.$stateParams, type, true, msg)
                                 .then(() => {
                                     nextCallback(index);
                                 })
@@ -612,7 +615,10 @@ class ConversationController {
                         // remove quote
                         this.webClientService.setQuote(this.receiver);
                         // send message
-                        this.webClientService.sendMessage(this.$stateParams, type, msg)
+                        // TODO: This should probably be moved into the
+                        //       WebClientService as a specific method for the
+                        //       type.
+                        this.webClientService.sendMessage(this.$stateParams, type, true, msg)
                             .then(() => {
                                 nextCallback(index);
                             })

+ 13 - 3
src/protocol/cache.ts

@@ -23,6 +23,7 @@ export type CachedChunk = Uint8Array | null;
 export class ChunkCache {
     private readonly sequenceNumberMax: number;
     private _sequenceNumber = 0;
+    private _size = 0;
     private cache: CachedChunk[] = [];
 
     constructor(sequenceNumberMax: number) {
@@ -36,6 +37,13 @@ export class ChunkCache {
         return this._sequenceNumber;
     }
 
+    /**
+     * Get the size of currently stored chunks.
+     */
+    public get size(): number {
+        return this._size;
+    }
+
     /**
      * Get the currently cached chunks.
      */
@@ -47,7 +55,7 @@ export class ChunkCache {
      * Transfer an array of cached chunks to this cache instance.
      */
     public transfer(cache: CachedChunk[]): void {
-        // Add chunks but remove all which are blacklisted
+        // Add chunks but remove all which should not be retransmitted
         for (const chunk of cache) {
             if (chunk !== null) {
                 this.append(chunk);
@@ -64,8 +72,9 @@ export class ChunkCache {
             throw Error('Sequence number overflow');
         }
 
-        // Update sequence number & append chunk
+        // Update sequence number, update size & append chunk
         ++this._sequenceNumber;
+        this._size += chunk.byteLength;
         this.cache.push(chunk);
     }
 
@@ -86,7 +95,8 @@ export class ChunkCache {
             throw new Error('Remote travelled back in time and acknowledged a chunk it has already acknowledged');
         }
 
-        // Slice our cache
+        // Slice our cache & recalculate size
         this.cache = endOffset === 0 ? [] : this.cache.slice(endOffset);
+        this._size = this.cache.reduce((sum, chunk) => sum + chunk.byteLength, 0);
     }
 }

+ 138 - 75
src/services/webclient.ts

@@ -51,6 +51,7 @@ import DisconnectReason = threema.DisconnectReason;
 export class WebClientService {
     private static CHUNK_SIZE = 64 * 1024;
     private static SEQUENCE_NUMBER_MAX = (2 ** 32) - 1;
+    private static CHUNK_CACHE_SIZE_MAX = 2 * 1024 * 1024;
     private static AVATAR_LOW_MAX_SIZE = 48;
     private static MAX_TEXT_LENGTH = 3500;
     private static MAX_FILE_SIZE_WEBRTC = 15 * 1024 * 1024;
@@ -167,13 +168,13 @@ export class WebClientService {
     private relayedDataTask: saltyrtc.tasks.relayed_data.RelayedDataTask = null;
     private secureDataChannel: saltyrtc.tasks.webrtc.SecureDataChannel = null;
     public chosenTask: threema.ChosenTask = threema.ChosenTask.None;
+    private pendingAckRequest: number | null = null;
     private previousConnectionId: Uint8Array = null;
     private currentConnectionId: Uint8Array = null;
     private previousChunkCache: ChunkCache = null;
     private currentChunkCache: ChunkCache = null;
 
     // Message chunking
-    private messageSerial = 0;
     private unchunker: chunkedDc.Unchunker = null;
 
     // Messenger data
@@ -394,6 +395,13 @@ export class WebClientService {
             this.$log.debug('Auth token:', this.salty.authTokenHex);
         }
 
+        // Reset ack request
+        this.pendingAckRequest = null;
+
+        // Create unchunker
+        this.unchunker = new chunkedDc.Unchunker();
+        this.unchunker.onMessage = this.handleIncomingMessageBytes.bind(this);
+
         // Create chunk cache
         this.currentChunkCache = new ChunkCache(WebClientService.SEQUENCE_NUMBER_MAX);
 
@@ -593,7 +601,7 @@ export class WebClientService {
             return;
         }
 
-        // Transfer the cache (filters blacklisted chunks)
+        // Transfer the cache (filters chunks which should not be retransmitted)
         this.currentChunkCache.transfer(this.previousChunkCache.chunks);
 
         // Resend chunks
@@ -695,19 +703,6 @@ export class WebClientService {
 
         // Notify state service about data loading
         this.stateService.updateConnectionBuildupState('loading');
-
-        // Process pending messages
-        if (this.outgoingMessageQueue.length > 0) {
-            this.$log.debug(this.logTag, 'Sending', this.outgoingMessageQueue.length, 'pending messages');
-            while (true) {
-                const msg = this.outgoingMessageQueue.shift();
-                if (msg === undefined) {
-                    break;
-                } else {
-                    this.send(msg);
-                }
-            }
-        }
     }
 
     /**
@@ -757,13 +752,17 @@ export class WebClientService {
                 this.$log.warn('Secure data channel: Closed');
             };
         } else if (this.chosenTask === threema.ChosenTask.RelayedData) {
+            // Note: This ensures that the 'data' event cannot leak into
+            //       unchunker instances of subsequent connections.
+            const unchunker = this.unchunker;
+
             // Handle messages directly
             this.relayedDataTask.on('data', (ev: saltyrtc.SaltyRTCEvent) => {
                 const chunk = new Uint8Array(ev.data);
                 if (this.config.MSG_DEBUGGING && this.config.DEBUG) {
                     this.$log.debug('[Chunk] Received chunk:', chunk);
                 }
-                this.unchunker.add(chunk.buffer);
+                unchunker.add(chunk.buffer);
             });
 
             // The communication channel is now open! Fetch initial data
@@ -893,7 +892,7 @@ export class WebClientService {
 
         // Send disconnect reason to the remote peer if requested
         if (send && this.stateService.state === threema.GlobalConnectionState.Ok) {
-            this._sendUpdate(WebClientService.SUB_TYPE_CONNECTION_DISCONNECT, undefined, {reason: reason});
+            this._sendUpdate(WebClientService.SUB_TYPE_CONNECTION_DISCONNECT, false, undefined, {reason: reason});
         }
 
         // Reset states
@@ -1031,7 +1030,14 @@ export class WebClientService {
                 sequenceNumber: sequenceNumber,
             };
         }
-        this._sendUpdate(WebClientService.SUB_TYPE_CONNECTION_INFO, args, data);
+        this._sendUpdate(WebClientService.SUB_TYPE_CONNECTION_INFO, false, args, data);
+    }
+
+    /**
+     * Request a connection ack update.
+     */
+    private _requestConnectionAck(): void {
+        this._sendRequest(WebClientService.SUB_TYPE_CONNECTION_ACK, false);
     }
 
     /**
@@ -1049,7 +1055,7 @@ export class WebClientService {
         if (browser.version) {
             data[WebClientService.ARGUMENT_BROWSER_VERSION] = browser.version;
         }
-        this._sendRequest(WebClientService.SUB_TYPE_CLIENT_INFO, undefined, data);
+        this._sendRequest(WebClientService.SUB_TYPE_CLIENT_INFO, true, undefined, data);
     }
 
     /**
@@ -1057,7 +1063,7 @@ export class WebClientService {
      */
     public requestReceivers(): void {
         this.$log.debug('Sending receivers request');
-        this._sendRequest(WebClientService.SUB_TYPE_RECEIVERS);
+        this._sendRequest(WebClientService.SUB_TYPE_RECEIVERS, true);
     }
 
     /**
@@ -1065,7 +1071,7 @@ export class WebClientService {
      */
     public requestConversations(): void {
         this.$log.debug('Sending conversation request');
-        this._sendRequest(WebClientService.SUB_TYPE_CONVERSATIONS, {
+        this._sendRequest(WebClientService.SUB_TYPE_CONVERSATIONS, true, {
             [WebClientService.ARGUMENT_MAX_SIZE]: WebClientService.AVATAR_LOW_MAX_SIZE,
         });
     }
@@ -1075,7 +1081,7 @@ export class WebClientService {
      */
     public requestBatteryStatus(): void {
         this.$log.debug('Sending battery status request');
-        this._sendRequest(WebClientService.SUB_TYPE_BATTERY_STATUS);
+        this._sendRequest(WebClientService.SUB_TYPE_BATTERY_STATUS, true);
     }
 
     /**
@@ -1083,7 +1089,7 @@ export class WebClientService {
      */
     public requestProfile(): void {
         this.$log.debug('Sending profile request');
-        this._sendRequest(WebClientService.SUB_TYPE_PROFILE);
+        this._sendRequest(WebClientService.SUB_TYPE_PROFILE, true);
     }
 
     /**
@@ -1132,7 +1138,7 @@ export class WebClientService {
         this.$log.debug('Sending message request for', receiver.type, receiver.id,
             'with message id', msgId);
 
-        this._sendRequest(WebClientService.SUB_TYPE_MESSAGES, args);
+        this._sendRequest(WebClientService.SUB_TYPE_MESSAGES, true, args);
 
         return refMsgId;
     }
@@ -1167,7 +1173,7 @@ export class WebClientService {
         }
 
         this.$log.debug('Sending', resolution, 'res avatar request for', receiver.type, receiver.id);
-        return this._sendRequestPromise(WebClientService.SUB_TYPE_AVATAR, args, 10000);
+        return this._sendRequestPromise(WebClientService.SUB_TYPE_AVATAR, true, args, 10000);
     }
 
     /**
@@ -1189,7 +1195,7 @@ export class WebClientService {
         };
 
         this.$log.debug('Sending', 'thumbnail request for', receiver.type, message.id);
-        return this._sendRequestPromise(WebClientService.SUB_TYPE_THUMBNAIL, args, 10000);
+        return this._sendRequestPromise(WebClientService.SUB_TYPE_THUMBNAIL, true, args, 10000);
     }
 
     /**
@@ -1211,7 +1217,7 @@ export class WebClientService {
             [WebClientService.ARGUMENT_MESSAGE_ID]: msgId,
         };
         this.$log.debug('Sending blob request for message', msgId);
-        return this._sendRequestPromise(WebClientService.SUB_TYPE_BLOB, args);
+        return this._sendRequestPromise(WebClientService.SUB_TYPE_BLOB, true, args);
     }
 
     /**
@@ -1235,11 +1241,11 @@ export class WebClientService {
             [WebClientService.ARGUMENT_MESSAGE_ID]: newestMessage.id.toString(),
         };
         this.$log.debug('Sending read request for', receiver.type, receiver.id, '(msg ' + newestMessage.id + ')');
-        this._sendRequest(WebClientService.SUB_TYPE_READ, args);
+        this._sendRequest(WebClientService.SUB_TYPE_READ, true, args);
     }
 
     public requestContactDetail(contactReceiver: threema.ContactReceiver): Promise<any> {
-        return this._sendRequestPromise(WebClientService.SUB_TYPE_CONTACT_DETAIL, {
+        return this._sendRequestPromise(WebClientService.SUB_TYPE_CONTACT_DETAIL, true, {
             [WebClientService.ARGUMENT_IDENTITY]: contactReceiver.id,
         });
     }
@@ -1247,9 +1253,12 @@ export class WebClientService {
     /**
      * Send a message to the specified receiver.
      */
-    public sendMessage(receiver,
-                       type: threema.MessageContentType,
-                       message: threema.MessageData): Promise<Promise<any>> {
+    public sendMessage(
+        receiver,
+        type: threema.MessageContentType,
+        retransmit: boolean,
+        message: threema.MessageData,
+    ): Promise<Promise<any>> {
         return new Promise<any> (
             (resolve, reject) => {
                 // Try to load receiver object
@@ -1372,7 +1381,7 @@ export class WebClientService {
                 };
 
                 // Send message and handling error promise
-                this._sendCreatePromise(subType, args, message).catch((error) => {
+                this._sendCreatePromise(subType, retransmit, args, message).catch((error) => {
                     this.$log.error('Error sending message:', error);
 
                     // Remove temporary message
@@ -1420,7 +1429,7 @@ export class WebClientService {
             [WebClientService.ARGUMENT_MESSAGE_ID]: message.id.toString(),
             [WebClientService.ARGUMENT_MESSAGE_ACKNOWLEDGED]: acknowledged,
         };
-        this._sendRequest(WebClientService.SUB_TYPE_ACK, args);
+        this._sendRequest(WebClientService.SUB_TYPE_ACK, true, args);
     }
 
     /**
@@ -1437,17 +1446,17 @@ export class WebClientService {
             [WebClientService.ARGUMENT_RECEIVER_ID]: receiver.id,
             [WebClientService.ARGUMENT_MESSAGE_ID]: message.id.toString(),
         };
-        this._sendDeletePromise(WebClientService.SUB_TYPE_MESSAGE, args);
+        this._sendDeletePromise(WebClientService.SUB_TYPE_MESSAGE, true, args);
     }
 
     public sendMeIsTyping(receiver: threema.ContactReceiver, isTyping: boolean): void {
         const args = {[WebClientService.ARGUMENT_RECEIVER_ID]: receiver.id};
         const data = {[WebClientService.ARGUMENT_IS_TYPING]: isTyping};
-        this._sendUpdate(WebClientService.SUB_TYPE_TYPING, args, data);
+        this._sendUpdate(WebClientService.SUB_TYPE_TYPING, false, args, data);
     }
 
     public sendKeyPersisted(): void {
-        this._sendRequest(WebClientService.SUB_TYPE_KEY_PERSISTED);
+        this._sendRequest(WebClientService.SUB_TYPE_KEY_PERSISTED, true);
     }
 
     /**
@@ -1458,7 +1467,7 @@ export class WebClientService {
         const data = {
             [WebClientService.ARGUMENT_IDENTITY]: threemaId,
         };
-        return this._sendCreatePromise(WebClientService.SUB_TYPE_CONTACT, args, data);
+        return this._sendCreatePromise(WebClientService.SUB_TYPE_CONTACT, true, args, data);
     }
 
     /**
@@ -1493,7 +1502,7 @@ export class WebClientService {
         const args = {
             [WebClientService.ARGUMENT_IDENTITY]: threemaId,
         };
-        const promise = this._sendUpdatePromise(WebClientService.SUB_TYPE_CONTACT, args, data);
+        const promise = this._sendUpdatePromise(WebClientService.SUB_TYPE_CONTACT, true, args, data);
 
         // If necessary, force an avatar reload
         if (avatar !== undefined) {
@@ -1522,7 +1531,7 @@ export class WebClientService {
             data[WebClientService.ARGUMENT_AVATAR] = avatar;
         }
 
-        return this._sendCreatePromise(WebClientService.SUB_TYPE_GROUP, args, data);
+        return this._sendCreatePromise(WebClientService.SUB_TYPE_GROUP, true, args, data);
     }
 
     /**
@@ -1547,7 +1556,7 @@ export class WebClientService {
         const args = {
             [WebClientService.ARGUMENT_RECEIVER_ID]: id,
         };
-        const promise = this._sendUpdatePromise(WebClientService.SUB_TYPE_GROUP, args, data);
+        const promise = this._sendUpdatePromise(WebClientService.SUB_TYPE_GROUP, true, args, data);
 
         // If necessary, reset avatar to force a avatar reload
         if (avatar !== undefined) {
@@ -1566,7 +1575,7 @@ export class WebClientService {
             [WebClientService.ARGUMENT_DELETE_TYPE]: WebClientService.DELETE_GROUP_TYPE_LEAVE,
         };
 
-        return this._sendDeletePromise(WebClientService.SUB_TYPE_GROUP, args);
+        return this._sendDeletePromise(WebClientService.SUB_TYPE_GROUP, true, args);
     }
 
     public deleteGroup(group: threema.GroupReceiver): Promise<any> {
@@ -1582,7 +1591,7 @@ export class WebClientService {
             [WebClientService.ARGUMENT_DELETE_TYPE]: WebClientService.DELETE_GROUP_TYPE_DELETE,
         };
 
-        return this._sendDeletePromise(WebClientService.SUB_TYPE_GROUP, args);
+        return this._sendDeletePromise(WebClientService.SUB_TYPE_GROUP, true, args);
     }
 
     /**
@@ -1597,7 +1606,7 @@ export class WebClientService {
             [WebClientService.ARGUMENT_RECEIVER_ID]: group.id,
         };
 
-        return this._sendRequestPromise(WebClientService.SUB_TYPE_GROUP_SYNC, args, 10000);
+        return this._sendRequestPromise(WebClientService.SUB_TYPE_GROUP_SYNC, true, args, 10000);
     }
 
     /**
@@ -1613,7 +1622,7 @@ export class WebClientService {
             [WebClientService.ARGUMENT_NAME]: name,
         };
 
-        return this._sendCreatePromise(WebClientService.SUB_TYPE_DISTRIBUTION_LIST, args, data);
+        return this._sendCreatePromise(WebClientService.SUB_TYPE_DISTRIBUTION_LIST, true, args, data);
     }
 
     public modifyDistributionList(id: string,
@@ -1624,7 +1633,7 @@ export class WebClientService {
             [WebClientService.ARGUMENT_NAME]: name,
         } as any;
 
-        return this._sendUpdatePromise(WebClientService.SUB_TYPE_DISTRIBUTION_LIST, {
+        return this._sendUpdatePromise(WebClientService.SUB_TYPE_DISTRIBUTION_LIST, true, {
             [WebClientService.ARGUMENT_RECEIVER_ID]: id,
         }, data);
     }
@@ -1638,7 +1647,7 @@ export class WebClientService {
             [WebClientService.ARGUMENT_RECEIVER_ID]: distributionList.id,
         };
 
-        return this._sendDeletePromise(WebClientService.SUB_TYPE_DISTRIBUTION_LIST, args);
+        return this._sendDeletePromise(WebClientService.SUB_TYPE_DISTRIBUTION_LIST, true, args);
     }
 
     /**
@@ -1656,7 +1665,7 @@ export class WebClientService {
             [WebClientService.ARGUMENT_RECEIVER_ID]: receiver.id,
         };
 
-        return this._sendDeletePromise(WebClientService.SUB_TYPE_CLEAN_RECEIVER_CONVERSATION, args);
+        return this._sendDeletePromise(WebClientService.SUB_TYPE_CLEAN_RECEIVER_CONVERSATION, true, args);
     }
 
     /**
@@ -1681,7 +1690,7 @@ export class WebClientService {
         }
 
         // Send update, get back promise
-        return this._sendUpdatePromise(WebClientService.SUB_TYPE_PROFILE, null, data);
+        return this._sendUpdatePromise(WebClientService.SUB_TYPE_PROFILE, true, null, data);
     }
 
     /**
@@ -1758,10 +1767,6 @@ export class WebClientService {
         // Reset initialization data
         this._resetInitializationSteps();
 
-        // Initialize unchunker
-        this.unchunker = new chunkedDc.Unchunker();
-        this.unchunker.onMessage = this.handleIncomingMessageBytes.bind(this);
-
         // Create container instances
         this.receivers = this.container.createReceivers();
         this.conversations = this.container.createConversations();
@@ -1877,6 +1882,37 @@ export class WebClientService {
 
     }
 
+    /**
+     * A connectionAck message arrived.
+     */
+    private _receiveConnectionAck(message: threema.WireMessage) {
+        if (!hasValue(message.data)) {
+            this.$log.warn(this.logTag, 'Invalid connectionAck message: data missing');
+            return;
+        }
+        if (!hasValue(message.data.sequenceNumber)) {
+            this.$log.warn(this.logTag, 'Invalid connectionAck message: sequenceNumber missing');
+            return;
+        }
+        const sequenceNumber = message.data.sequenceNumber;
+
+        // Acknowledge chunks
+        const size = this.currentChunkCache.size;
+        try {
+            this.currentChunkCache.acknowledge(sequenceNumber);
+        } catch (error) {
+            this.$log.error(this.logTag, error);
+            this.failSession();
+            return;
+        }
+        this.$log.debug(`Chunk cache size ${size} -> ${this.currentChunkCache.size}`);
+
+        // Clear pending ack requests
+        if (this.pendingAckRequest !== null && sequenceNumber >= this.pendingAckRequest) {
+            this.pendingAckRequest = null;
+        }
+    }
+
     /**
      * A connectionDisconnect message arrived.
      */
@@ -2897,7 +2933,7 @@ export class WebClientService {
         this.pushTokenType = tokenType;
     }
 
-    private _sendRequest(type, args?: object, data?: object): void {
+    private _sendRequest(type, retransmit: boolean, args?: object, data?: object): void {
         const message: threema.WireMessage = {
             type: WebClientService.TYPE_REQUEST,
             subType: type,
@@ -2908,10 +2944,10 @@ export class WebClientService {
         if (data !== undefined) {
             message.data = data;
         }
-        this.send(message);
+        this.send(message, retransmit);
     }
 
-    private _sendUpdate(type, args?: object, data?: object): void {
+    private _sendUpdate(type, retransmit: boolean, args?: object, data?: object): void {
         const message: threema.WireMessage = {
             type: WebClientService.TYPE_UPDATE,
             subType: type,
@@ -2922,10 +2958,10 @@ export class WebClientService {
         if (data !== undefined) {
             message.data = data;
         }
-        this.send(message);
+        this.send(message, retransmit);
     }
 
-    private _sendPromiseMessage(message: threema.WireMessage, timeout: number = null): Promise<any> {
+    private _sendPromiseMessage(message: threema.WireMessage, retransmit: boolean, timeout: number = null): Promise<any> {
         // Create arguments on wired message
         if (message.args === undefined || message.args === null) {
             message.args = {};
@@ -2952,7 +2988,7 @@ export class WebClientService {
                     }, timeout);
                 }
 
-                this.send(message);
+                this.send(message, retransmit);
             },
         );
     }
@@ -2964,36 +3000,36 @@ export class WebClientService {
      *
      * @param timeout Optional request timeout in ms
      */
-    private _sendRequestPromise(type, args = null, timeout: number = null): Promise<any> {
+    private _sendRequestPromise(type, retransmit: boolean, args = null, timeout: number = null): Promise<any> {
         const message: threema.WireMessage = {
             type: WebClientService.TYPE_REQUEST,
             subType: type,
             args: args,
         };
-        return this._sendPromiseMessage(message, timeout);
+        return this._sendPromiseMessage(message, retransmit, timeout);
     }
 
-    private _sendCreatePromise(type, args = null, data: any = null, timeout: number = null): Promise<any> {
+    private _sendCreatePromise(type, retransmit: boolean, args = null, data: any = null, timeout: number = null): Promise<any> {
         const message: threema.WireMessage = {
             type: WebClientService.TYPE_CREATE,
             subType: type,
             args: args,
             data: data,
         };
-        return this._sendPromiseMessage(message, timeout);
+        return this._sendPromiseMessage(message, retransmit, timeout);
     }
 
-    private _sendUpdatePromise(type, args = null, data: any = null, timeout: number = null): Promise<any> {
+    private _sendUpdatePromise(type, retransmit: boolean, args = null, data: any = null, timeout: number = null): Promise<any> {
         const message: threema.WireMessage = {
             type: WebClientService.TYPE_UPDATE,
             subType: type,
             data: data,
             args: args,
         };
-        return this._sendPromiseMessage(message, timeout);
+        return this._sendPromiseMessage(message, retransmit, timeout);
     }
 
-    private _sendCreate(type, data, args = null): void {
+    private _sendCreate(type, retransmit: boolean, data, args = null): void {
         const message: threema.WireMessage = {
             type: WebClientService.TYPE_CREATE,
             subType: type,
@@ -3002,27 +3038,32 @@ export class WebClientService {
         if (args) {
             message.args = args;
         }
-        this.send(message);
+        this.send(message, retransmit);
     }
 
-    private _sendDelete(type, args, data = null): void {
+    private _sendDelete(type, retransmit: boolean, args, data = null): void {
         const message: threema.WireMessage = {
             type: WebClientService.TYPE_DELETE,
             subType: type,
             data: data,
             args: args,
         };
-        this.send(message);
+        this.send(message, retransmit);
     }
 
-    private _sendDeletePromise(type, args, data: any = null, timeout: number = null): Promise<any> {
+    private _sendDeletePromise(
+        type, retransmit: boolean,
+        args,
+        data: any = null,
+        timeout: number = null,
+    ): Promise<any> {
         const message: threema.WireMessage = {
             type: WebClientService.TYPE_DELETE,
             subType: type,
             data: data,
             args: args,
         };
-        return this._sendPromiseMessage(message, timeout);
+        return this._sendPromiseMessage(message, retransmit, timeout);
     }
 
     private _receiveRequest(type, message): void {
@@ -3136,6 +3177,9 @@ export class WebClientService {
             case WebClientService.SUB_TYPE_ALERT:
                 this._receiveAlert(message);
                 break;
+            case WebClientService.SUB_TYPE_CONNECTION_ACK:
+                this._receiveConnectionAck(message);
+                break;
             case WebClientService.SUB_TYPE_CONNECTION_DISCONNECT:
                 this._receiveConnectionDisconnect(message);
                 break;
@@ -3202,11 +3246,12 @@ export class WebClientService {
     /**
      * Send a message via the underlying transport.
      */
-    private send(message: threema.WireMessage): void {
+    private send(message: threema.WireMessage, retransmit: boolean): void {
         this.$log.debug('Sending', message.type + '/' + message.subType, 'message');
         if (this.config.MSG_DEBUGGING) {
             this.$log.debug('[Message] Outgoing:', message.type, '/', message.subType, message);
         }
+
         switch (this.chosenTask) {
             case threema.ChosenTask.WebRTC:
                 {
@@ -3229,12 +3274,30 @@ export class WebClientService {
                         }
                     } else {
                         const bytes: Uint8Array = this.msgpackEncode(message);
-                        const chunker = new chunkedDc.Chunker(this.messageSerial, bytes, WebClientService.CHUNK_SIZE);
+                        // Note: We use the sequence number of the chunk cache here to avoid having another counter
+                        const chunker = new chunkedDc.Chunker(
+                            this.currentChunkCache.sequenceNumber, bytes, WebClientService.CHUNK_SIZE);
                         for (const chunk of chunker) {
-                            // TODO: Add to chunk cache!
+                            // Add to chunk cache
+                            try {
+                                this.currentChunkCache.append(retransmit ? null : chunk);
+                            } catch (error) {
+                                this.$log.error(this.logTag, error);
+                                this.failSession();
+                                return;
+                            }
+
+                            // Send
                             this.sendChunk(chunk);
                         }
-                        this.messageSerial += 1;
+
+                        // Check if we need to request an acknowledgement
+                        // Note: We only request if none is pending
+                        if (this.pendingAckRequest === null &&
+                            this.currentChunkCache.size > WebClientService.CHUNK_CACHE_SIZE_MAX) {
+                            this._requestConnectionAck();
+                            this.pendingAckRequest = this.currentChunkCache.sequenceNumber;
+                        }
                     }
                 }
                 break;