MessageQueue.m 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. // _____ _
  2. // |_ _| |_ _ _ ___ ___ _ __ __ _
  3. // | | | ' \| '_/ -_) -_) ' \/ _` |_
  4. // |_| |_||_|_| \___\___|_|_|_\__,_(_)
  5. //
  6. // Threema iOS Client
  7. // Copyright (c) 2012-2020 Threema GmbH
  8. //
  9. // This program is free software: you can redistribute it and/or modify
  10. // it under the terms of the GNU Affero General Public License, version 3,
  11. // as published by the Free Software Foundation.
  12. //
  13. // This program is distributed in the hope that it will be useful,
  14. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. // GNU Affero General Public License for more details.
  17. //
  18. // You should have received a copy of the GNU Affero General Public License
  19. // along with this program. If not, see <https://www.gnu.org/licenses/>.
  20. #import "AbstractMessage.h"
  21. #import "BoxTextMessage.h"
  22. #import "GroupTextMessage.h"
  23. #import "MessageQueue.h"
  24. #import "BoxedMessage.h"
  25. #import "ServerConnector.h"
  26. #import "ProtocolDefines.h"
  27. #import "UserSettings.h"
  28. #import "MyIdentityStore.h"
  29. #import "ValidationLogger.h"
  30. #import "MessageSender.h"
  31. #import "DocumentManager.h"
  32. #import "BackgroundTaskManagerProxy.h"
  33. #import "NSString+Hex.h"
  34. #import "AppGroup.h"
  35. #import "GroupLeaveMessage.h"
  36. #import "GroupRequestSyncMessage.h"
  37. #ifdef DEBUG
  38. static const DDLogLevel ddLogLevel = DDLogLevelVerbose;
  39. #else
  40. static const DDLogLevel ddLogLevel = DDLogLevelWarning;
  41. #endif
  42. @implementation MessageQueue {
  43. NSMutableArray *queue;
  44. dispatch_queue_t dispatchQueue;
  45. }
  46. + (MessageQueue*)sharedMessageQueue {
  47. static MessageQueue *instance;
  48. @synchronized (self) {
  49. if (!instance)
  50. instance = [[MessageQueue alloc] init];
  51. }
  52. return instance;
  53. }
  54. - (id)init
  55. {
  56. self = [super init];
  57. if (self) {
  58. queue = [NSMutableArray array];
  59. dispatchQueue = dispatch_queue_create("ch.threema.MessageQueue", NULL);
  60. /* register for connection status updates from ServerConnector */
  61. [[ServerConnector sharedServerConnector] addObserver:self forKeyPath:@"connectionState" options:0 context:nil];
  62. /* read from file now */
  63. [self loadFromFile];
  64. }
  65. return self;
  66. }
  67. - (void)enqueue:(AbstractMessage *)message {
  68. dispatch_async(dispatchQueue, ^{
  69. [self _enqueue:message async:YES];
  70. });
  71. }
  72. - (void)enqueueWait:(AbstractMessage *)message {
  73. dispatch_sync(dispatchQueue, ^{
  74. [self _enqueue:message async:NO];
  75. });
  76. }
  77. - (void)enqueueWaitForQuickReply:(AbstractMessage *)message {
  78. [self _enqueue:message async:NO];
  79. }
  80. - (void)_enqueue:(AbstractMessage*)message async:(BOOL)async {
  81. DDLogVerbose(@"Enqueue message %@", message);
  82. if (message == nil)
  83. return;
  84. if ([message.toIdentity isEqualToString:[MyIdentityStore sharedMyIdentityStore].identity]) {
  85. DDLogWarn(@"Drop message to myself");
  86. return;
  87. }
  88. if (message.isGroup == true && [message isKindOfClass:[AbstractGroupMessage class]]) {
  89. AbstractGroupMessage *groupMsg = (AbstractGroupMessage *)message;
  90. if ([groupMsg.groupCreator hasPrefix:@"*"] && [groupMsg.groupCreator isEqualToString:message.toIdentity] && ![message isKindOfClass:[GroupLeaveMessage class]] && ![message isKindOfClass:[GroupRequestSyncMessage class]]) {
  91. GroupProxy *proxy = [GroupProxy groupProxyForMessage:(AbstractGroupMessage *) message];
  92. if (proxy != nil) {
  93. if (![proxy.conversation.groupName hasPrefix:@"☁"]) {
  94. DDLogWarn(@"Drop message to gateway id without store-incoming-message");
  95. [MessageSender markMessageAsSent:message.messageId];
  96. return;
  97. }
  98. }
  99. }
  100. }
  101. BoxedMessage *boxmsg = [message makeBox];
  102. if (boxmsg == nil)
  103. return;
  104. /* validation logging */
  105. if ([message isKindOfClass:[BoxTextMessage class]]) {
  106. [[ValidationLogger sharedValidationLogger] logBoxedMessage:boxmsg isIncoming:NO description:nil];
  107. } else if ([message isKindOfClass:[GroupTextMessage class]]) {
  108. [[ValidationLogger sharedValidationLogger] logBoxedMessage:boxmsg isIncoming:NO description:nil];
  109. } else {
  110. [[ValidationLogger sharedValidationLogger] logSimpleMessage:message isIncoming:NO description:nil];
  111. }
  112. if ([ServerConnector sharedServerConnector].connectionState == ConnectionStateLoggedIn) {
  113. DDLogVerbose(@"Currently connected - sending message now");
  114. /* Only add to queue if we want an ACK for this message */
  115. if (!(boxmsg.flags & MESSAGE_FLAG_NOACK)) {
  116. [queue addObject:boxmsg];
  117. // do not use BackgroundTaskManagerProxy for share extension, because it's not available
  118. if ([AppGroup getCurrentType] == AppGroupTypeShareExtension) {
  119. [[ServerConnector sharedServerConnector] sendMessage:boxmsg];
  120. } else {
  121. [BackgroundTaskManagerProxy newBackgroundTaskWithKey:kAppSendingBackgroundTask timeout:10 completionHandler:^{
  122. if (async == true) {
  123. dispatch_async(dispatchQueue, ^{
  124. [[ServerConnector sharedServerConnector] sendMessage:boxmsg];
  125. });
  126. } else {
  127. dispatch_sync(dispatchQueue, ^{
  128. [[ServerConnector sharedServerConnector] sendMessage:boxmsg];
  129. });
  130. }
  131. }];
  132. }
  133. } else {
  134. [[ServerConnector sharedServerConnector] sendMessage:boxmsg];
  135. }
  136. } else {
  137. if (boxmsg.flags & MESSAGE_FLAG_IMMEDIATE) {
  138. DDLogVerbose(@"Discarding immediate message because not connected");
  139. } else {
  140. [queue addObject:boxmsg];
  141. }
  142. }
  143. }
  144. - (void)processAck:(NSData*)messageId {
  145. /* check our queue for a message with this ID, and remove it */
  146. dispatch_async(dispatchQueue, ^{
  147. DDLogVerbose(@"Process ACK for message ID %@", messageId);
  148. for (BoxedMessage *message in queue) {
  149. if ([message.messageId isEqualToData:messageId]) {
  150. [queue removeObject:message];
  151. [MessageSender markMessageAsSent:messageId];
  152. if (queue.count == 0) {
  153. [BackgroundTaskManagerProxy cancelBackgroundTaskWithKey:kAppSendingBackgroundTask];
  154. }
  155. return;
  156. }
  157. }
  158. DDLogWarn(@"Message ID %@ not found in queue", messageId);
  159. });
  160. }
  161. - (void)processQueue {
  162. DDLogInfo(@"Processing queue");
  163. for (BoxedMessage *message in queue) {
  164. [[ServerConnector sharedServerConnector] sendMessage:message];
  165. }
  166. }
  167. - (void)flush {
  168. DDLogInfo(@"Flushing queue");
  169. dispatch_async(dispatchQueue, ^{
  170. [queue removeAllObjects];
  171. });
  172. }
  173. - (void)loadFromFile {
  174. dispatch_sync(dispatchQueue, ^{
  175. NSString *savePath = self.savePath;
  176. if ([[NSFileManager defaultManager] fileExistsAtPath:savePath]) {
  177. DDLogInfo(@"Loading message queue from file");
  178. @try {
  179. NSArray *readQueue = [NSKeyedUnarchiver unarchiveObjectWithFile:self.savePath];
  180. if (readQueue != nil) {
  181. NSString *myId = [MyIdentityStore sharedMyIdentityStore].identity;
  182. int nread = 0;
  183. for (BoxedMessage *msg in readQueue) {
  184. /* ensure this has the same from identity as we're currently using,
  185. and ignore VoIP messages as they're likely to be old/stale anyway */
  186. if ([myId isEqualToString:msg.fromIdentity] && !(msg.flags & MESSAGE_FLAG_VOIP)) {
  187. [queue addObject:msg];
  188. nread++;
  189. }
  190. }
  191. DDLogInfo(@"Read %d messages from queue file", nread);
  192. }
  193. }
  194. @catch (NSException *e) {
  195. /* file corrupted or whatever */
  196. DDLogError(@"Loading message queue failed: %@", e);
  197. }
  198. /* Delete queue file now. If something is bad with it that makes us crash, we
  199. won't get stuck in a loop and the user will be able to relaunch. */
  200. [[NSFileManager defaultManager] removeItemAtPath:savePath error:nil];
  201. [self processQueue];
  202. }
  203. });
  204. }
  205. - (void)save {
  206. dispatch_async(dispatchQueue, ^{
  207. NSString *savePath = self.savePath;
  208. [[NSFileManager defaultManager] removeItemAtPath:savePath error:nil];
  209. if (queue.count > 0) {
  210. DDLogInfo(@"Writing message queue to file (%lu entries)", (unsigned long)queue.count);
  211. [NSKeyedArchiver archiveRootObject:queue toFile:self.savePath];
  212. }
  213. });
  214. }
  215. - (NSString*)savePath {
  216. NSString *documentsDir = [DocumentManager documentsDirectory].path;
  217. return [documentsDir stringByAppendingPathComponent:@"MessageQueue"];
  218. }
  219. - (void)observeValueForKeyPath:(NSString *)keyPath ofObject:(id)object change:(NSDictionary *)change context:(void *)context {
  220. if (object == [ServerConnector sharedServerConnector] && [keyPath isEqualToString:@"connectionState"]) {
  221. if ([ServerConnector sharedServerConnector].connectionState == ConnectionStateLoggedIn) {
  222. /* connection is now up - process queue */
  223. dispatch_async(dispatchQueue, ^{
  224. [self processQueue];
  225. });
  226. }
  227. }
  228. }
  229. @end