|
@@ -45,7 +45,6 @@ import {SequenceNumber} from '../protocol/sequence_number';
|
|
|
import InitializationStep = threema.InitializationStep;
|
|
|
import ContactReceiverFeature = threema.ContactReceiverFeature;
|
|
|
import DisconnectReason = threema.DisconnectReason;
|
|
|
-import ConnectionBuildupState = threema.ConnectionBuildupState;
|
|
|
|
|
|
/**
|
|
|
* Payload of a connectionInfo message.
|
|
@@ -235,7 +234,7 @@ export class WebClientService {
|
|
|
};
|
|
|
|
|
|
// pending rtc promises
|
|
|
- private requestPromises: Map<string, threema.PromiseCallbacks> = new Map();
|
|
|
+ private requestPromises: Map<string, Future<any>> = new Map();
|
|
|
|
|
|
public static $inject = [
|
|
|
'$log', '$rootScope', '$q', '$state', '$window', '$translate', '$filter', '$timeout', '$mdDialog',
|
|
@@ -340,13 +339,24 @@ export class WebClientService {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Return whether there are chunks cached from a previous connection that
|
|
|
+ * Return the amount of chunks cached from a previous connection that
|
|
|
* require immediate sending.
|
|
|
*/
|
|
|
- get immediateChunksPending(): boolean {
|
|
|
+ get immediateChunksPending(): number {
|
|
|
// TODO: Apply the chunk **push** blacklist instead of the chunk cache
|
|
|
// blacklist!
|
|
|
- return this.previousChunkCache !== null && this.previousChunkCache.chunks.length > 0;
|
|
|
+ if (this.previousChunkCache === null) {
|
|
|
+ return 0;
|
|
|
+ } else {
|
|
|
+ return this.previousChunkCache.chunks.length;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Return the amount of pending requests.
|
|
|
+ */
|
|
|
+ get pendingRequests(): number {
|
|
|
+ return this.requestPromises.size;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -374,10 +384,14 @@ export class WebClientService {
|
|
|
}): void {
|
|
|
let keyStore = flags.keyStore;
|
|
|
let resume = flags.resume;
|
|
|
+ this.$log.info(`Initializing (keyStore=${keyStore !== undefined ? 'yes' : 'no'}, peerTrustedKey=` +
|
|
|
+ `${flags.peerTrustedKey !== undefined ? 'yes' : 'no'}, resume=${resume})`);
|
|
|
|
|
|
- // Reset fields in case the session should explicitly not be resumed
|
|
|
+ // Reset fields, blob cache & pending requests in case the session
|
|
|
+ // should explicitly not be resumed
|
|
|
if (!resume) {
|
|
|
- this._resetFields();
|
|
|
+ this.clearCache();
|
|
|
+ this.requestPromises.clear();
|
|
|
}
|
|
|
|
|
|
// Only move the previous connection's instances if the previous
|
|
@@ -570,16 +584,19 @@ export class WebClientService {
|
|
|
this.$log.error('Connection error:', ev);
|
|
|
});
|
|
|
this.salty.on('connection-closed', (ev) => {
|
|
|
- this.$log.warn('Connection closed:', ev);
|
|
|
+ this.$log.info('Connection closed:', ev);
|
|
|
});
|
|
|
this.salty.on('no-shared-task', (ev) => {
|
|
|
this.$log.warn('No shared task found:', ev.data);
|
|
|
const offeredWebrtc = ev.data.offered.filter((t) => t.endsWith('webrtc.tasks.saltyrtc.org')).length > 0;
|
|
|
- if (!this.browserService.supportsWebrtcTask() && offeredWebrtc) {
|
|
|
- this.showWebrtcAndroidWarning();
|
|
|
- } else {
|
|
|
- this.failSession();
|
|
|
- }
|
|
|
+ this.$rootScope.$apply(() => {
|
|
|
+ if (!this.browserService.supportsWebrtcTask() && offeredWebrtc) {
|
|
|
+ this.failSession(false);
|
|
|
+ this.showWebrtcAndroidWarning();
|
|
|
+ } else {
|
|
|
+ this.failSession();
|
|
|
+ }
|
|
|
+ });
|
|
|
});
|
|
|
}
|
|
|
|
|
@@ -620,17 +637,30 @@ export class WebClientService {
|
|
|
* A dialog will be displayed to let the user know a protocol error
|
|
|
* happened.
|
|
|
*/
|
|
|
- private failSession() {
|
|
|
+ private failSession(showAlert = true) {
|
|
|
// Stop session
|
|
|
- this.stop({
|
|
|
- reason: DisconnectReason.SessionError,
|
|
|
- send: true,
|
|
|
- close: true,
|
|
|
- redirect: true,
|
|
|
- });
|
|
|
+ const stop = () => {
|
|
|
+ this.stop({
|
|
|
+ reason: DisconnectReason.SessionError,
|
|
|
+ send: true,
|
|
|
+ // TODO: Use welcome.error once we have it
|
|
|
+ close: 'welcome',
|
|
|
+ connectionBuildupState: 'closed',
|
|
|
+ });
|
|
|
+ if (showAlert) {
|
|
|
+ this.showAlert('connection.SESSION_ERROR');
|
|
|
+ }
|
|
|
+ };
|
|
|
|
|
|
- // Show an alert
|
|
|
- this.showAlert('connection.SESSION_ERROR');
|
|
|
+ // Note: Although this is considered an anti-pattern, we simply don't
|
|
|
+ // want a digest cycle in most of the network event functionality.
|
|
|
+ // Thus, it would be pointless 99% of the time to apply a digest
|
|
|
+ // cycle somewhere higher in the call stack.
|
|
|
+ if (!this.$rootScope.$$phase) {
|
|
|
+ this.$rootScope.$apply(() => stop());
|
|
|
+ } else {
|
|
|
+ stop();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -682,6 +712,9 @@ export class WebClientService {
|
|
|
}
|
|
|
this.$log.debug(`Chunk cache pruned, acknowledged: ${result.acknowledged}, left: ${result.left}, size: ` +
|
|
|
`${size} -> ${this.previousChunkCache.byteLength}`);
|
|
|
+ if (this.config.MSG_DEBUGGING) {
|
|
|
+ this.$log.debug(`Chunks that require acknowledgement: ${this.immediateChunksPending}`);
|
|
|
+ }
|
|
|
|
|
|
// Transfer the cache (filters chunks which should not be retransmitted)
|
|
|
const transferred = this.currentChunkCache.transfer(this.previousChunkCache.chunks);
|
|
@@ -699,7 +732,7 @@ export class WebClientService {
|
|
|
const chunks = this.currentChunkCache.chunks;
|
|
|
this.$log.debug(this.logTag, `Sending cached chunks: ${chunks.length}`);
|
|
|
for (const chunk of chunks) {
|
|
|
- this.sendChunk(chunk, true, false);
|
|
|
+ this.sendChunk(chunk, true, false, false);
|
|
|
}
|
|
|
|
|
|
// Resumed!
|
|
@@ -793,7 +826,7 @@ export class WebClientService {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- // Not resuming?
|
|
|
+ // If we could not resume for whatever reason
|
|
|
const requiredInitializationSteps = [];
|
|
|
if (!resumeSession || !sessionWasResumed) {
|
|
|
// Note: We cannot reset the message sequence number here any more since
|
|
@@ -801,6 +834,9 @@ export class WebClientService {
|
|
|
this.discardSession({ resetMessageSequenceNumber: false });
|
|
|
this.$log.debug(this.logTag, 'Session discarded');
|
|
|
|
|
|
+ // Remove all pending promises
|
|
|
+ this.requestPromises.clear();
|
|
|
+
|
|
|
// Set required initialisation steps
|
|
|
requiredInitializationSteps.push(
|
|
|
InitializationStep.ClientInfo,
|
|
@@ -812,8 +848,10 @@ export class WebClientService {
|
|
|
this.$log.debug(this.logTag, 'Session resumed');
|
|
|
}
|
|
|
|
|
|
- // Redirect to the conversation overview in case resuming was enabled
|
|
|
- // but the session could not be resumed
|
|
|
+ // In case...
|
|
|
+ // - we wanted to resume, but
|
|
|
+ // - we could not resume, and
|
|
|
+ // - we had a previous connection
|
|
|
if (resumeSession && !sessionWasResumed && this.clientInfo !== null) {
|
|
|
this.$rootScope.$apply(() => {
|
|
|
// TODO: Remove this conditional once we have session
|
|
@@ -821,6 +859,8 @@ export class WebClientService {
|
|
|
if (this.chosenTask !== threema.ChosenTask.RelayedData) {
|
|
|
return;
|
|
|
}
|
|
|
+
|
|
|
+ // Redirect to the conversation overview
|
|
|
if (this.$state.includes('messenger')) {
|
|
|
this.$state.go('messenger.home');
|
|
|
}
|
|
@@ -838,8 +878,9 @@ export class WebClientService {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- // Request initial data if not resuming the session
|
|
|
+ // If we could not resume for whatever reason
|
|
|
if (!resumeSession || !sessionWasResumed) {
|
|
|
+ // Request initial data
|
|
|
this._requestInitialData();
|
|
|
}
|
|
|
|
|
@@ -1003,23 +1044,15 @@ export class WebClientService {
|
|
|
* @connectionBuildupState: The connection buildup state the state service
|
|
|
* will be reset to.
|
|
|
*/
|
|
|
- public stop(
|
|
|
- args: {
|
|
|
- reason: DisconnectReason,
|
|
|
- send: boolean,
|
|
|
- close: boolean,
|
|
|
- redirect: boolean,
|
|
|
- connectionBuildupState?: ConnectionBuildupState,
|
|
|
- },
|
|
|
- ): void {
|
|
|
- this.$log.info(this.logTag, 'Stopping');
|
|
|
- let close = args.close;
|
|
|
- let remove = false;
|
|
|
-
|
|
|
- // A redirect to the welcome page always implies a close
|
|
|
- if (args.redirect) {
|
|
|
- close = true;
|
|
|
+ public stop(args: threema.WebClientServiceStopArguments): void {
|
|
|
+ if (args.close === true) {
|
|
|
+ throw new Error('args.close has been set to "true" but requires a redirect state instead');
|
|
|
}
|
|
|
+ this.$log.info(this.logTag, `Stopping (reason=${args.reason}, send=${args.send}, close=${args.close}, ` +
|
|
|
+ 'connectionBuildupState=' +
|
|
|
+ `${args.connectionBuildupState !== undefined ? args.connectionBuildupState : 'n/a'})`);
|
|
|
+ let close = args.close !== false;
|
|
|
+ let remove = false;
|
|
|
|
|
|
// Session deleted: Force close and delete
|
|
|
if (args.reason === DisconnectReason.SessionDeleted) {
|
|
@@ -1039,7 +1072,7 @@ export class WebClientService {
|
|
|
|
|
|
// Stop ack timer
|
|
|
if (this.ackTimer !== null) {
|
|
|
- clearTimeout(this.ackTimer);
|
|
|
+ self.clearTimeout(this.ackTimer);
|
|
|
this.ackTimer = null;
|
|
|
this.$log.debug(this.logTag, 'Timer stopped');
|
|
|
}
|
|
@@ -1057,7 +1090,7 @@ export class WebClientService {
|
|
|
|
|
|
// Invalidate and clear caches
|
|
|
if (close) {
|
|
|
- this.clientInfo = null;
|
|
|
+ // Clear connection ids & caches
|
|
|
this.previousConnectionId = null;
|
|
|
this.currentConnectionId = null;
|
|
|
this.previousIncomingChunkSequenceNumber = null;
|
|
@@ -1065,7 +1098,20 @@ export class WebClientService {
|
|
|
0, WebClientService.SEQUENCE_NUMBER_MIN, WebClientService.SEQUENCE_NUMBER_MAX);
|
|
|
this.previousChunkCache = null;
|
|
|
this.currentChunkCache = null;
|
|
|
+
|
|
|
+ // Reset general client information
|
|
|
+ this.clientInfo = null;
|
|
|
+
|
|
|
+ // Clear fetched messages and the blob cache
|
|
|
+ this.clearCache();
|
|
|
+
|
|
|
+ // Remove all pending promises
|
|
|
+ this.requestPromises.clear();
|
|
|
+
|
|
|
+ // Reset the push service
|
|
|
this.pushService.reset();
|
|
|
+
|
|
|
+ // Closed!
|
|
|
this.$log.debug(this.logTag, 'Session closed (cannot be resumed)');
|
|
|
} else {
|
|
|
this.previousChunkCache = this.currentChunkCache;
|
|
@@ -1103,11 +1149,11 @@ export class WebClientService {
|
|
|
this.$log.debug(this.logTag, 'Peer connection was null');
|
|
|
}
|
|
|
|
|
|
- // Done, redirect now if requested
|
|
|
- if (args.redirect) {
|
|
|
- this.$timeout(() => {
|
|
|
- this.$state.go('welcome');
|
|
|
- }, 0);
|
|
|
+ // Done, redirect now if session closed
|
|
|
+ if (close) {
|
|
|
+ // Translate close flag
|
|
|
+ const state = args.close !== false ? args.close : 'welcome';
|
|
|
+ this.$state.go(state);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1383,9 +1429,7 @@ export class WebClientService {
|
|
|
*/
|
|
|
public requestBlob(msgId: string, receiver: threema.Receiver): Promise<threema.BlobInfo> {
|
|
|
const cached = this.blobCache.get(msgId + receiver.type);
|
|
|
-
|
|
|
if (cached !== undefined) {
|
|
|
-
|
|
|
this.$log.debug(this.logTag, 'Use cached blob');
|
|
|
return new Promise((resolve) => {
|
|
|
resolve(cached);
|
|
@@ -2097,6 +2141,9 @@ export class WebClientService {
|
|
|
}
|
|
|
this.$log.debug(`Chunk cache pruned, acknowledged: ${result.acknowledged}, left: ${result.left}, size: ` +
|
|
|
`${size} -> ${this.currentChunkCache.byteLength}`);
|
|
|
+ if (this.config.MSG_DEBUGGING) {
|
|
|
+ this.$log.debug(`Chunks that require acknowledgement: ${this.immediateChunksPending}`);
|
|
|
+ }
|
|
|
|
|
|
// Clear pending ack requests
|
|
|
if (this.pendingAckRequest !== null && sequenceNumber >= this.pendingAckRequest) {
|
|
@@ -2145,8 +2192,9 @@ export class WebClientService {
|
|
|
this.stop({
|
|
|
reason: reason,
|
|
|
send: false,
|
|
|
- close: true,
|
|
|
- redirect: true,
|
|
|
+ // TODO: Use welcome.{reason} once we have it
|
|
|
+ close: 'welcome',
|
|
|
+ connectionBuildupState: 'closed',
|
|
|
});
|
|
|
this.showAlert(alertMessage);
|
|
|
}
|
|
@@ -3171,24 +3219,21 @@ export class WebClientService {
|
|
|
message.args[WebClientService.ARGUMENT_TEMPORARY_ID] = promiseId;
|
|
|
}
|
|
|
|
|
|
- return new Promise(
|
|
|
- (resolve, reject) => {
|
|
|
- const p = {
|
|
|
- resolve: resolve,
|
|
|
- reject: reject,
|
|
|
- } as threema.PromiseCallbacks;
|
|
|
- this.requestPromises.set(promiseId, p);
|
|
|
-
|
|
|
- if (timeout !== null && timeout > 0) {
|
|
|
- this.$timeout(() => {
|
|
|
- p.reject('timeout');
|
|
|
- this.requestPromises.delete(promiseId);
|
|
|
- }, timeout);
|
|
|
- }
|
|
|
+ // Store promise
|
|
|
+ const promise = new Future();
|
|
|
+ this.requestPromises.set(promiseId, promise);
|
|
|
|
|
|
- this.send(message, retransmit);
|
|
|
- },
|
|
|
- );
|
|
|
+ // Schedule rejection (if timeout set)
|
|
|
+ if (timeout !== null && timeout > 0) {
|
|
|
+ this.$timeout(() => {
|
|
|
+ promise.reject('timeout');
|
|
|
+ this.requestPromises.delete(promiseId);
|
|
|
+ }, timeout);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Send request & return promise
|
|
|
+ this.send(message, retransmit);
|
|
|
+ return promise;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -3497,7 +3542,7 @@ export class WebClientService {
|
|
|
const chunker = new chunkedDc.Chunker(messageSequenceNumber, bytes, WebClientService.CHUNK_SIZE);
|
|
|
for (const chunk of chunker) {
|
|
|
// Send (and cache)
|
|
|
- this.sendChunk(chunk, retransmit, canQueue);
|
|
|
+ this.sendChunk(chunk, retransmit, canQueue, true);
|
|
|
}
|
|
|
|
|
|
// Check if we need to request an acknowledgement
|
|
@@ -3520,7 +3565,7 @@ export class WebClientService {
|
|
|
/**
|
|
|
* Send a chunk via the underlying transport.
|
|
|
*/
|
|
|
- private sendChunk(chunk: Uint8Array, retransmit: boolean, canQueue: boolean): void {
|
|
|
+ private sendChunk(chunk: Uint8Array, retransmit: boolean, canQueue: boolean, cache: boolean): void {
|
|
|
// TODO: Support for sending in chunks via data channels will be added later
|
|
|
if (this.chosenTask !== threema.ChosenTask.RelayedData) {
|
|
|
throw new Error(`Cannot send chunk, not supported by task: ${this.chosenTask}`);
|
|
@@ -3533,33 +3578,37 @@ export class WebClientService {
|
|
|
if (shouldQueue) {
|
|
|
chunkCache = this.previousChunkCache;
|
|
|
this.$log.debug(this.logTag, 'Currently not connected, queueing chunk');
|
|
|
- if (retransmit && this.pushService.isAvailable()) {
|
|
|
+ if (!this.pushService.isAvailable()) {
|
|
|
+ this.$log.warn(this.logTag, 'Push service not available, cannot wake up peer!');
|
|
|
+ retransmit = false;
|
|
|
+ }
|
|
|
+ if (retransmit) {
|
|
|
// TODO: Apply the chunk **push** blacklist instead of the
|
|
|
// retransmit flag!
|
|
|
this.sendPush();
|
|
|
- } else {
|
|
|
- this.$log.warn(this.logTag, 'Push service not available, cannot wake up peer!');
|
|
|
}
|
|
|
} else {
|
|
|
chunkCache = this.currentChunkCache;
|
|
|
}
|
|
|
|
|
|
// Add to chunk cache
|
|
|
- if (this.config.MSG_DEBUGGING) {
|
|
|
- this.$log.debug(`[Chunk] Caching chunk (retransmit=${retransmit}:`, chunk);
|
|
|
- }
|
|
|
- try {
|
|
|
- chunkCache.append(retransmit ? chunk : null);
|
|
|
- } catch (error) {
|
|
|
- this.$log.error(this.logTag, error);
|
|
|
- this.failSession();
|
|
|
- return;
|
|
|
+ if (cache) {
|
|
|
+ if (this.config.MSG_DEBUGGING) {
|
|
|
+ this.$log.debug(`[Chunk] Caching chunk (retransmit/push=${retransmit}:`, chunk);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ chunkCache.append(retransmit ? chunk : null);
|
|
|
+ } catch (error) {
|
|
|
+ this.$log.error(this.logTag, error);
|
|
|
+ this.failSession();
|
|
|
+ return;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// Send if ready
|
|
|
if (!shouldQueue) {
|
|
|
if (this.config.MSG_DEBUGGING) {
|
|
|
- this.$log.debug('[Chunk] Sending chunk:', chunk);
|
|
|
+ this.$log.debug(`[Chunk] Sending chunk (retransmit/push=${retransmit}:`, chunk);
|
|
|
}
|
|
|
this.relayedDataTask.sendMessage(chunk.buffer);
|
|
|
}
|
|
@@ -3582,11 +3631,14 @@ export class WebClientService {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- // Process chunk
|
|
|
- this.unchunker.add(chunk.buffer);
|
|
|
-
|
|
|
// Schedule the periodic ack timer
|
|
|
this.scheduleConnectionAck();
|
|
|
+
|
|
|
+ // Process chunk
|
|
|
+ // Warning: Nothing should be called after the unchunker has processed
|
|
|
+ // the chunk since the message event is synchronous and can
|
|
|
+ // result in a call to .stop!
|
|
|
+ this.unchunker.add(chunk.buffer);
|
|
|
}
|
|
|
|
|
|
/**
|