浏览代码

Update to new SaltyRTC WebRTC Task API

This update allows us to control data channels ourselves and thus apply
a sender-side flow control which prevents data channels from becoming
closed (#594).

This is also preliminary work for #7.

Further changes:

- Bump SaltyRTC client and WebRTC task dependencies
- Use chunking API v2
- Add helper classes that ease the use of the data channel flow control
- Allow to send files > 15 MiB towards Android
Lennart Grahl 6 年之前
父节点
当前提交
04d215f4ce
共有 7 个文件被更改,包括 350 次插入77 次删除
  1. 7 7
      package-lock.json
  2. 1 1
      package.json
  3. 2 0
      src/config.ts
  4. 140 0
      src/helpers/data_channel.ts
  5. 79 9
      src/services/peerconnection.ts
  6. 120 60
      src/services/webclient.ts
  7. 1 0
      src/threema.d.ts

+ 7 - 7
package-lock.json

@@ -1142,9 +1142,9 @@
       }
     },
     "@saltyrtc/chunked-dc": {
-      "version": "1.1.1",
-      "resolved": "https://registry.npmjs.org/@saltyrtc/chunked-dc/-/chunked-dc-1.1.1.tgz",
-      "integrity": "sha512-im7GXKhUsNKTbppZOA0Jqx0Yku+3FILe/CENMlX5PT1tP95Dfu1VadIaBgNevxstnCadrYPTtxVeXc2MwxO3jw=="
+      "version": "2.0.1",
+      "resolved": "https://registry.npmjs.org/@saltyrtc/chunked-dc/-/chunked-dc-2.0.1.tgz",
+      "integrity": "sha512-0LhnDVsiI3trhba90Ih8QZrgmN4qy27YAAiuNq06Ql7FNzocNS1BA80Y0Vd4nbWAbOE+LcPkrDyPCyFGc7ku/g=="
     },
     "@saltyrtc/client": {
       "version": "0.14.4",
@@ -1157,11 +1157,11 @@
       "integrity": "sha512-TgucXvVHKKS40nMk+xoLdo4rqDP4seby0iE19gUj+oYytuwf58rc00DBRguwf7KLlf1IUVoJIXEt4TAvUe97lA=="
     },
     "@saltyrtc/task-webrtc": {
-      "version": "0.13.0",
-      "resolved": "https://registry.npmjs.org/@saltyrtc/task-webrtc/-/task-webrtc-0.13.0.tgz",
-      "integrity": "sha512-FAnsCjPt3/ksap741V9BT5AE6uvqfltgavk9QHTGlMrY/Qp5VMfCDdOJc2/tf3CVglprfe8XN6maZwErHUsr5A==",
+      "version": "0.14.1",
+      "resolved": "https://registry.npmjs.org/@saltyrtc/task-webrtc/-/task-webrtc-0.14.1.tgz",
+      "integrity": "sha512-361TV2Wv5qk+Ipx+GNDQM6T3zUtN53imTH6JYpC84EinNouZS0WiCkmTsD8Fmc3lUyTzw8Y9NHEhJPC2H56Vcg==",
       "requires": {
-        "@saltyrtc/chunked-dc": "^1.1.1"
+        "@saltyrtc/chunked-dc": "^2.0.1"
       }
     },
     "@sentry/core": {

+ 1 - 1
package.json

@@ -41,7 +41,7 @@
     "@babel/runtime": "^7.4.5",
     "@saltyrtc/client": "^0.14.4",
     "@saltyrtc/task-relayed-data": "^0.3.1",
-    "@saltyrtc/task-webrtc": "^0.13.0",
+    "@saltyrtc/task-webrtc": "^0.14.1",
     "@threema/compose-area": "^0.3.3",
     "@types/angular": "^1.6.54",
     "@types/angular-material": "^1.1.68",

+ 2 - 0
src/config.ts

@@ -71,5 +71,7 @@ export default {
     // outgoing protocol messages.
     // Note: Affects performance and contains sensitive information.
     MSGPACK_LOG_TRACE: false,
+    // Transport log level
+    TRANSPORT_LOG_LEVEL: 'warn',
 
 } as threema.Config;

+ 140 - 0
src/helpers/data_channel.ts

@@ -0,0 +1,140 @@
+/**
+ * This file is part of Threema Web.
+ *
+ * Threema Web is free software: you can redistribute it and/or modify it
+ * under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or (at
+ * your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with Threema Web. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+import {Logger} from 'ts-log';
+
+import {LogService} from '../services/log';
+
+/**
+ * A flow-controlled (sender side) data channel.
+ */
+export class FlowControlledDataChannel {
+    // Logging
+    private readonly log: Logger = null;
+
+    // Data channel
+    public readonly dc: RTCDataChannel;
+    public readonly highWaterMark: number;
+
+    // Flow control mechanism
+    private _ready: Future<void> = new Future();
+
+    /**
+     * Create a flow-controlled (sender side) data channel.
+     *
+     * @param dc The data channel to be flow-controlled.
+     * @param logService The logging service.
+     * @param logLevel The desired log level.
+     * @param lowWaterMark The low water mark unpauses the data channel
+     *   once the buffered amount of bytes becomes less or equal to it.
+     *   Defaults to 256 KiB.
+     * @param highWaterMark The high water mark pauses the data channel
+     *   once the buffered amount of bytes becomes greater or equal to it.
+     *   Defaults to 1 MiB.
+     */
+    public constructor(
+        dc: RTCDataChannel,
+        logService: LogService,
+        logLevel: threema.LogLevel,
+        lowWaterMark: number = 256 * 1024,
+        highWaterMark: number = 1024 * 1024,
+    ) {
+        this.log = logService.getLogger(
+            `FlowControlledDataChannel.${dc.id}`, 'color: #fff; background-color: #009688', logLevel);
+        this.dc = dc;
+        this.highWaterMark = highWaterMark;
+
+        // Allow writing
+        this._ready.resolve();
+
+        // Unpause once low water mark has been reached
+        this.dc.bufferedAmountLowThreshold = lowWaterMark;
+        this.dc.onbufferedamountlow = () => {
+            if (!this._ready.done) {
+                this.log.debug(`${this.dc.label} resumed (buffered=${this.dc.bufferedAmount})`);
+                this._ready.resolve();
+            }
+        };
+    }
+
+    /**
+     * A future whether the data channel is ready to be written on.
+     */
+    public get ready(): Future<void> {
+        return this._ready;
+    }
+
+    /**
+     * Write a message to the data channel's internal buffer for delivery to
+     * the remote side.
+     *
+     * Important: Before calling this, the `ready` Promise must be awaited.
+     *
+     * @param message The message to be sent.
+     * @throws Error in case the data channel is currently paused.
+     */
+    public write(message: Uint8Array): void {
+        // Throw if paused
+        if (!this._ready.done) {
+            throw new Error('Unable to write, data channel is paused!');
+        }
+
+        // Try sending
+        // Note: Technically we should be able to catch a TypeError in case the
+        //       underlying buffer is full. However, there are other reasons
+        //       that can result in a TypeError and no browser has implemented
+        //       this properly so far. Thus, we use a well-tested high water
+        //       mark instead and try to never fill the buffer completely.
+        this.dc.send(message);
+
+        // Pause once high water mark has been reached
+        if (this.dc.bufferedAmount >= this.highWaterMark) {
+            this._ready = new Future();
+            this.log.debug(`${this.dc.label} paused (buffered=${this.dc.bufferedAmount})`);
+        }
+    }
+}
+
+/**
+ * A flow-controlled (sender side) data channel that allows to queue an
+ * infinite amount of messages.
+ *
+ * While this cancels the effect of the flow control, it prevents the data
+ * channel's underlying buffer from becoming saturated by queueing all messages
+ * in application space.
+ */
+export class UnboundedFlowControlledDataChannel extends FlowControlledDataChannel {
+    private queue: Promise<void> = this.ready;
+
+    /**
+     * Write a message to the data channel's internal or application buffer for
+     * delivery to the remote side.
+     *
+     * @param message The message to be sent.
+     */
+    public write(message: Uint8Array) {
+        // Wait until ready, then write
+        // Note: This very simple technique allows for ordered message
+        //       queueing by using the event loop.
+        this.queue = this.queue.then(() => this.writeWhenReady(message));
+    }
+
+    private async writeWhenReady(message: Uint8Array): Promise<void> {
+        await this.ready;
+        super.write(message);
+    }
+}

+ 79 - 9
src/services/peerconnection.ts

@@ -19,6 +19,7 @@ import TaskConnectionState = threema.TaskConnectionState;
 import {Logger} from 'ts-log';
 
 import {ConfidentialIceCandidate} from '../helpers/confidential';
+import {UnboundedFlowControlledDataChannel} from '../helpers/data_channel';
 import {LogService} from './log';
 import {TimeoutService} from './timeout';
 
@@ -34,25 +35,38 @@ export class PeerConnectionHelper {
     private readonly $rootScope: ng.IRootScopeService;
 
     // Custom services
+    private readonly config: threema.Config;
+    private readonly logService: LogService;
     private readonly timeoutService: TimeoutService;
 
-        // WebRTC
-    private readonly pc: RTCPeerConnection;
+    // WebRTC
+    public readonly pc: RTCPeerConnection;
     private readonly webrtcTask: saltyrtc.tasks.webrtc.WebRTCTask;
     private connectionFailedTimer: ng.IPromise<void> | null = null;
 
+    // Handed over signalling channel
+    private sdc: UnboundedFlowControlledDataChannel | null = null;
+
     // Calculated connection state
     public connectionState: TaskConnectionState = TaskConnectionState.New;
     public onConnectionStateChange: (state: TaskConnectionState) => void = null;
 
-    constructor($q: ng.IQService, $rootScope: ng.IRootScopeService,
-                logService: LogService, timeoutService: TimeoutService,
-                webrtcTask: saltyrtc.tasks.webrtc.WebRTCTask, iceServers: RTCIceServer[]) {
+    constructor(
+        $q: ng.IQService,
+        $rootScope: ng.IRootScopeService,
+        config: threema.Config,
+        logService: LogService,
+        timeoutService: TimeoutService,
+        webrtcTask: saltyrtc.tasks.webrtc.WebRTCTask,
+        iceServers: RTCIceServer[],
+    ) {
         this.log = logService.getLogger('PeerConnection', 'color: #fff; background-color: #3333ff');
         this.log.info('Initialize WebRTC PeerConnection');
         this.log.debug('ICE servers used:', [].concat(...iceServers.map((c) => c.urls)));
         this.$q = $q;
         this.$rootScope = $rootScope;
+        this.config = config;
+        this.logService = logService;
 
         this.timeoutService = timeoutService;
         this.webrtcTask = webrtcTask;
@@ -191,12 +205,68 @@ export class PeerConnectionHelper {
     }
 
     /**
-     * Create a new secure data channel.
+     * Initiate the handover process.
      */
-    public createSecureDataChannel(label: string): saltyrtc.tasks.webrtc.SecureDataChannel {
-        const dc: RTCDataChannel = this.pc.createDataChannel(label);
+    public handover(): void {
+        if (this.sdc !== null) {
+            throw new Error('Handover already inintiated');
+        }
+
+        // Get transport link
+        const link: saltyrtc.tasks.webrtc.SignalingTransportLink = this.webrtcTask.getTransportLink();
+
+        // Create data channel
+        const dc = this.pc.createDataChannel(link.label, {
+            id: link.id,
+            negotiated: true,
+            ordered: true,
+            protocol: link.protocol,
+        });
         dc.binaryType = 'arraybuffer';
-        return this.webrtcTask.wrapDataChannel(dc);
+
+        // Wrap as an unbounded, flow-controlled data channel
+        this.sdc = new UnboundedFlowControlledDataChannel(dc, this.logService, this.config.TRANSPORT_LOG_LEVEL);
+
+        // Create transport handler
+        const self = this;
+        const handler = {
+            get maxMessageSize(): number {
+                return self.pc.sctp.maxMessageSize;
+            },
+            close(): void {
+                self.log.debug(`Signalling data channel close request`);
+                dc.close();
+            },
+            send(message: Uint8Array): void {
+                self.log.debug(`Signalling data channel outgoing signaling message of ` +
+                    `length ${message.byteLength}`);
+                self.sdc.write(message);
+            },
+        };
+
+        // Bind events
+        dc.onopen = () => {
+            this.log.info(`Signalling data channel open`);
+
+            // Rebind close event
+            dc.onclose = () => {
+                this.log.info(`Signalling data channel closed`);
+                link.closed();
+            };
+
+            // Initiate handover
+            this.webrtcTask.handover(handler);
+        };
+        dc.onclose = () => {
+            this.log.error(`Signalling data channel closed`);
+        };
+        dc.onerror = (event) => {
+            this.log.error(`Signalling data channel error:`, event);
+        };
+        dc.onmessage = (event) => {
+            this.log.debug(`Signalling data channel incoming message of length ${event.data.byteLength}`);
+            link.receive(new Uint8Array(event.data));
+        };
     }
 
     /**

+ 120 - 60
src/services/webclient.ts

@@ -24,12 +24,23 @@ import {Logger} from 'ts-log';
 
 import * as msgpack from 'msgpack-lite';
 import {
-    arraysAreEqual, base64ToU8a, bufferToUrl, copyDeepOrReference, hasFeature, hasValue, hexToU8a,
-    msgpackVisualizer, randomString, stringToUtf8a, u8aToHex,
+    arraysAreEqual,
+    base64ToU8a,
+    bufferToUrl,
+    copyDeepOrReference,
+    hasFeature,
+    hasValue,
+    hexToU8a,
+    msgpackVisualizer,
+    randomString,
+    stringToUtf8a,
+    u8aToHex,
 } from '../helpers';
 import {
-    isContactReceiver, isDistributionListReceiver,
-    isGroupReceiver, isValidReceiverType,
+    isContactReceiver,
+    isDistributionListReceiver,
+    isGroupReceiver,
+    isValidReceiverType,
 } from '../typeguards';
 import {BatteryStatusService} from './battery';
 import {BrowserService} from './browser';
@@ -49,6 +60,7 @@ import {VersionService} from './version';
 
 import {TimeoutError} from '../exceptions';
 import {ConfidentialWireMessage} from '../helpers/confidential';
+import {UnboundedFlowControlledDataChannel} from '../helpers/data_channel';
 import {DeviceUnreachableController} from '../partials/messenger';
 import {ChunkCache} from '../protocol/cache';
 import {SequenceNumber} from '../protocol/sequence_number';
@@ -82,13 +94,14 @@ const fakeConnectionId = Uint8Array.from([
  */
 export class WebClientService {
     public static readonly MAX_CONNECT_ATTEMPTS = 3;
-    private static CHUNK_SIZE = 64 * 1024;
+    private static DATA_CHANNEL_MAX_CHUNK_SIZE = 256 * 1024;
+    private static RELAYED_DATA_CHUNK_SIZE = 64 * 1024;
     private static SEQUENCE_NUMBER_MIN = 0;
     private static SEQUENCE_NUMBER_MAX = (2 ** 32) - 1;
     private static CHUNK_CACHE_SIZE_MAX = 2 * 1024 * 1024;
     private static AVATAR_LOW_MAX_SIZE = 48;
     private static MAX_TEXT_LENGTH = 3500;
-    private static MAX_FILE_SIZE_WEBRTC = 15 * 1024 * 1024;
+    private static MAX_FILE_SIZE_WEBRTC_TASK_V0 = 15 * 1024 * 1024;
     private static CONNECTION_ID_NONCE = stringToUtf8a('connectionidconnectionid');
 
     private static TYPE_REQUEST = 'request';
@@ -203,9 +216,11 @@ export class WebClientService {
     private saltyRtcHost: string = null;
     public salty: saltyrtc.SaltyRTC = null;
     private connectionInfoFuture: Future<ConnectionInfo> = null;
-    private webrtcTask: saltyrtc.tasks.webrtc.WebRTCTask = null;
     private relayedDataTask: saltyrtc.tasks.relayed_data.RelayedDataTask = null;
-    private secureDataChannel: saltyrtc.tasks.webrtc.SecureDataChannel = null;
+    private secureDataChannel: UnboundedFlowControlledDataChannel = null;
+    private secureDataChannelCrypto: saltyrtc.tasks.webrtc.DataChannelCryptoContext = null;
+    private secureDataChannelChunkLength: number = null;
+    private secureDataChannelMessageId: number = 0;
     public chosenTask: threema.ChosenTask = threema.ChosenTask.None;
     private outgoingMessageSequenceNumber: SequenceNumber;
     private previousConnectionId: Uint8Array = null;
@@ -219,7 +234,7 @@ export class WebClientService {
     private pendingAckRequest: number | null = null;
 
     // Message chunking
-    private unchunker: chunkedDc.Unchunker = null;
+    private unchunker: chunkedDc.UnreliableUnorderedUnchunker = null;
 
     // Messenger data
     public messages: threema.Container.Messages;
@@ -481,12 +496,28 @@ export class WebClientService {
         // Create new handshake future
         this.connectionInfoFuture = new Future();
 
-        // Create WebRTC task instance
-        const maxPacketSize = this.browserService.getBrowser().isFirefox(false) ? 16384 : 65536;
-        this.webrtcTask = new saltyrtcTaskWebrtc.WebRTCTask(true, maxPacketSize, this.config.SALTYRTC_LOG_LEVEL);
+        // Create tasks
+        const tasks: saltyrtc.Task[] = [];
+
+        // Create WebRTC task instance (if supported)
+        if (this.browserService.supportsWebrtcTask()) {
+            // TODO: Remove legacy v0 after a transitional period
+            tasks.push(new saltyrtcTaskWebrtc.WebRTCTaskBuilder()
+                .withLoggingLevel(this.config.SALTYRTC_LOG_LEVEL)
+                .withVersion('v1')
+                .withHandover(true)
+                .build());
+            tasks.push(new saltyrtcTaskWebrtc.WebRTCTaskBuilder()
+                .withLoggingLevel(this.config.SALTYRTC_LOG_LEVEL)
+                .withVersion('v0')
+                .withHandover(true)
+                .withMaxChunkLength(this.browserService.getBrowser().isFirefox(false) ? 16384 : 65536)
+                .build());
+        }
 
         // Create Relayed Data task instance
         this.relayedDataTask = new saltyrtcTaskRelayedData.RelayedDataTask(this.config.SALTYRTC_LOG_LEVEL === 'debug');
+        tasks.push(this.relayedDataTask);
 
         // Create new keystore if necessary
         if (!keyStore) {
@@ -504,14 +535,6 @@ export class WebClientService {
                 + this.config.SALTYRTC_HOST_SUFFIX;
         }
 
-        // Determine SaltyRTC tasks
-        let tasks;
-        if (this.browserService.supportsWebrtcTask()) {
-            tasks = [this.webrtcTask, this.relayedDataTask];
-        } else {
-            tasks = [this.relayedDataTask];
-        }
-
         // Create SaltyRTC client
         let builder = new saltyrtcClient.SaltyRTCBuilder()
             .connectTo(this.saltyRtcHost, this.config.SALTYRTC_PORT)
@@ -570,11 +593,8 @@ export class WebClientService {
 
         // Wait for handover to be finished
         this.salty.on('handover', () => {
-            // Ignore handovers requested by non-WebRTC tasks
-            if (this.chosenTask === threema.ChosenTask.WebRTC) {
-                this.arpLog.debug('Handover done');
-                this.onHandover(resumeSession);
-            }
+            this.arpLog.debug('Handover done');
+            this.onHandover(resumeSession);
         });
 
         // Handle SaltyRTC errors
@@ -768,7 +788,7 @@ export class WebClientService {
             this.outgoingMessageSequenceNumber = new SequenceNumber(
                 0, WebClientService.SEQUENCE_NUMBER_MIN, WebClientService.SEQUENCE_NUMBER_MAX);
         }
-        this.unchunker = new chunkedDc.Unchunker();
+        this.unchunker = new chunkedDc.UnreliableUnorderedUnchunker();
         this.unchunker.onMessage = this.handleIncomingMessageBytes.bind(this);
 
         // Discard previous connection instances
@@ -871,18 +891,19 @@ export class WebClientService {
                 this.skipIceDs();
             }
 
+            // Create peer connection
             this.pcHelper = new PeerConnectionHelper(
                 this.$q, this.$rootScope,
-                this.logService, this.timeoutService,
-                this.webrtcTask, this.config.ICE_SERVERS);
+                this.config, this.logService, this.timeoutService,
+                task as saltyrtc.tasks.webrtc.WebRTCTask, this.config.ICE_SERVERS);
 
             // On state changes in the PeerConnectionHelper class, let state service know about it
             this.pcHelper.onConnectionStateChange = (state: threema.TaskConnectionState) => {
                 this.stateService.updateTaskConnectionState(state);
             };
 
-            // Initiate handover
-            this.webrtcTask.handover(this.pcHelper.peerConnection);
+            // Initiate handover process
+            this.pcHelper.handover();
 
         // Otherwise, no handover is necessary.
         } else {
@@ -1031,35 +1052,66 @@ export class WebClientService {
 
         // Derive connection ID
         // Note: We need to make sure this is done before any ARP messages can be received
-        const box = this.salty.encryptForPeer(new Uint8Array(0), WebClientService.CONNECTION_ID_NONCE);
+        const connectionIdBox = this.salty.encryptForPeer(new Uint8Array(0), WebClientService.CONNECTION_ID_NONCE);
         // Note: We explicitly copy the data here to be able to use the underlying buffer directly
-        this.currentConnectionId = new Uint8Array(box.data);
+        this.currentConnectionId = new Uint8Array(connectionIdBox.data);
 
         // If the WebRTC task was chosen, initialize the data channel
         if (this.chosenTask === threema.ChosenTask.WebRTC) {
-            // Create secure data channel
-            this.arpLog.debug('Create SecureDataChannel "' + WebClientService.DC_LABEL + '"...');
-            this.secureDataChannel = this.pcHelper.createSecureDataChannel(WebClientService.DC_LABEL);
-            this.secureDataChannel.onopen = () => {
-                this.arpLog.debug('SecureDataChannel open');
+            const task = this.salty.getTask() as saltyrtc.tasks.webrtc.WebRTCTask;
+
+            // Create data channel
+            this.arpLog.debug(`Creating data channel ${WebClientService.DC_LABEL}`);
+            const dc = this.pcHelper.pc.createDataChannel(WebClientService.DC_LABEL);
+            dc.binaryType = 'arraybuffer';
+
+            // Wrap as unbounded, flow-controlled data channel
+            this.secureDataChannel = new UnboundedFlowControlledDataChannel(
+                dc, this.logService, this.config.TRANSPORT_LOG_LEVEL);
+
+            // Create crypto context
+            // Note: We need to apply encrypt-then-chunk for backwards
+            //       compatibility reasons.
+            this.secureDataChannelCrypto = task.createCryptoContext(dc.id);
+
+            // Create unchunker
+            // Note: We need to use an unreliable unordered unchunker for backwards
+            //       compatibility reasons.
+            const unchunker = new chunkedDc.UnreliableUnorderedUnchunker();
+
+            // Bind events
+            dc.onopen = () => {
+                this.arpLog.info(`Data channel ${dc.label} open`);
+
+                // Determine chunk length
+                this.secureDataChannelChunkLength = Math.min(
+                    WebClientService.DATA_CHANNEL_MAX_CHUNK_SIZE, this.pcHelper.pc.sctp.maxMessageSize);
+                this.arpLog.debug(`Using chunk length: ${this.secureDataChannelChunkLength} for data channel` +
+                    dc.label);
+
+                // Connection established
                 this.onConnectionEstablished(resumeSession).catch((error) => {
                     this.arpLog.error('Error during handshake:', error);
                 });
             };
-
-            // Handle incoming messages
-            this.secureDataChannel.onmessage = (ev: MessageEvent) => {
-                const bytes = new Uint8Array(ev.data);
-                this.handleIncomingMessageBytes(bytes);
+            dc.onclose = () => {
+                this.arpLog.error(`Data channel ${dc.label} closed prematurely`);
+                this.failSession();
             };
-            this.secureDataChannel.onbufferedamountlow = (ev: Event) => {
-                this.arpLog.debug('Secure data channel: Buffered amount low');
+            dc.onerror = (event) => {
+                this.arpLog.error(`Data channel ${dc.label} error:`, event);
+                this.failSession();
             };
-            this.secureDataChannel.onerror = (e: ErrorEvent) => {
-                this.arpLog.warn('Secure data channel: Error:', e.message);
+            dc.onmessage = (event) => {
+                this.arpLogV.debug(`Data channel ${dc.label} incoming chunk of length ${event.data.byteLength}`);
+                unchunker.add(new Uint8Array(event.data));
             };
-            this.secureDataChannel.onclose = (ev: Event) => {
-                this.arpLog.warn('Secure data channel: Closed');
+            // noinspection JSUndefinedPropertyAssignment
+            unchunker.onMessage = (array) => {
+                const box = saltyrtcClient.Box.fromUint8Array(
+                    array, saltyrtcTaskWebrtc.DataChannelCryptoContext.NONCE_LENGTH);
+                const message = this.secureDataChannelCrypto.decrypt(box);
+                this.handleIncomingMessageBytes(message);
             };
 
             // Mark as handed over
@@ -1324,13 +1376,13 @@ export class WebClientService {
 
         // Close data channel
         if (this.secureDataChannel !== null) {
-            this.arpLog.debug('Closing secure datachannel');
-            this.secureDataChannel.onopen = null;
-            this.secureDataChannel.onmessage = null;
-            this.secureDataChannel.onbufferedamountlow = null;
-            this.secureDataChannel.onerror = null;
-            this.secureDataChannel.onclose = null;
-            this.secureDataChannel.close();
+            this.arpLog.debug('Closing data channel');
+            this.secureDataChannel.dc.onopen = null;
+            this.secureDataChannel.dc.onmessage = null;
+            this.secureDataChannel.dc.onbufferedamountlow = null;
+            this.secureDataChannel.dc.onerror = null;
+            this.secureDataChannel.dc.onclose = null;
+            this.secureDataChannel.dc.close();
         }
 
         // Close SaltyRTC connection
@@ -1743,7 +1795,8 @@ export class WebClientService {
 
                 // Validate max file size
                 if (this.chosenTask === threema.ChosenTask.WebRTC) {
-                    if (fileData.size > WebClientService.MAX_FILE_SIZE_WEBRTC) {
+                    const task = this.salty.getTask() as saltyrtc.tasks.webrtc.WebRTCTask;
+                    if (task.version === 'v0' && fileData.size > WebClientService.MAX_FILE_SIZE_WEBRTC_TASK_V0) {
                         throw this.$translate.instant('error.FILE_TOO_LARGE_WEB');
                     }
                 } else {
@@ -3954,7 +4007,14 @@ export class WebClientService {
                     if (this.config.MSGPACK_LOG_TRACE) {
                         this.msgpackLog.debug('Outgoing message payload: ' + msgpackVisualizer(bytes));
                     }
-                    this.secureDataChannel.send(bytes);
+                    const box = this.secureDataChannelCrypto.encrypt(bytes);
+                    const chunker = new chunkedDc.UnreliableUnorderedChunker(
+                        this.secureDataChannelMessageId++, box.toUint8Array(), this.secureDataChannelChunkLength);
+                    for (const chunk of chunker) {
+                        this.arpLogV.debug(`Data channel ${this.secureDataChannel.dc.label} outgoing ` +
+                            `chunk of length ${chunk.byteLength}`);
+                        this.secureDataChannel.write(chunk);
+                    }
                 }
                 break;
             case threema.ChosenTask.RelayedData:
@@ -3971,7 +4031,8 @@ export class WebClientService {
 
                     // Increment the outgoing message sequence number
                     const messageSequenceNumber = this.outgoingMessageSequenceNumber.increment();
-                    const chunker = new chunkedDc.Chunker(messageSequenceNumber, bytes, WebClientService.CHUNK_SIZE);
+                    const chunker = new chunkedDc.UnreliableUnorderedChunker(
+                        messageSequenceNumber, bytes, WebClientService.RELAYED_DATA_CHUNK_SIZE);
                     for (const chunk of chunker) {
                         // Send (and cache)
                         this.sendChunk(chunk, retransmit, canQueue, true);
@@ -4075,7 +4136,7 @@ export class WebClientService {
         // Warning: Nothing should be called after the unchunker has processed
         //          the chunk since the message event is synchronous and can
         //          result in a call to .stop!
-        this.unchunker.add(chunk.buffer);
+        this.unchunker.add(chunk);
     }
 
     /**
@@ -4089,7 +4150,6 @@ export class WebClientService {
 
         // Decode bytes
         const message: threema.WireMessage = this.msgpackDecode(bytes);
-
         return this.handleIncomingMessage(message);
     }
 

+ 1 - 0
src/threema.d.ts

@@ -687,6 +687,7 @@ declare namespace threema {
         ARP_LOG_LEVEL: LogLevel;
         ARP_LOG_TRACE: boolean;
         MSGPACK_LOG_TRACE: boolean;
+        TRANSPORT_LOG_LEVEL: LogLevel;
     }
 
     interface InitialConversationData {