Unchunker.swift 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. /**
  2. * Copyright (c) 2018 Threema GmbH / SaltyRTC Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0, <see LICENSE-APACHE file> or
  5. * the MIT license <see LICENSE-MIT file>, at your option. This file may not be
  6. * copied, modified, or distributed except according to those terms.
  7. */
  8. import Foundation
  /// Failure conditions reported by `Chunk`, `ChunkCollector` and `Unchunker`.
  enum UnchunkerError: Error {
      /// A merge was requested before every chunk of the message had arrived.
      case messageNotYetComplete

      /// A chunk was offered to a collector that already holds chunks
      /// carrying a different message id.
      case inconsistentMessageId

      /// The supplied bytes are shorter than the chunk header and therefore
      /// cannot be parsed into a chunk.
      case chunkTooSmall
  }
  /// Receiver for fully reassembled messages.
  ///
  /// The protocol is class-bound (`AnyObject`), so conforming instances can be
  /// held through a `weak` reference.
  protocol MessageCompleteDelegate: AnyObject {
      /// Called with the assembled message once all of its chunks have arrived.
      ///
      /// - Parameter message: The bytes of the reassembled message.
      func messageComplete(message: Data)
  }
  /// A single chunk of a chunked message.
  ///
  /// When parsed from bytes, a chunk consists of a header made of one options
  /// byte (lowest bit: end-of-message flag), a big-endian 32-bit message id and
  /// a big-endian 32-bit serial number, followed by the payload bytes.
  struct Chunk {
      /// Whether this is the final chunk of its message.
      let endOfMessage: Bool
      /// Identifier of the message this chunk belongs to.
      let id: UInt32
      /// Serial number of this chunk within its message.
      let serial: UInt32
      /// Payload bytes, i.e. everything after the header.
      let data: [UInt8]

      /// Offset of the first payload byte; equals the number of header bytes
      /// consumed by `init(bytes:)`.
      private static let payloadOffset = 9

      /// Create a new chunk from its individual fields.
      init(endOfMessage: Bool, id: UInt32, serial: UInt32, data: [UInt8]) {
          self.endOfMessage = endOfMessage
          self.id = id
          self.serial = serial
          self.data = data
      }

      /// Parse bytes into a chunk.
      ///
      /// - Parameter bytes: The serialized chunk (header followed by payload).
      ///   Slices of a larger `Data` are supported.
      /// - Throws: `UnchunkerError.chunkTooSmall` if `bytes` is shorter than
      ///   `Common.headerLength`.
      init(bytes: Data) throws {
          guard bytes.count >= Common.headerLength else {
              throw UnchunkerError.chunkTooSmall
          }
          // NOTE(review): the fixed offsets below assume that
          // `Common.headerLength` is at least 9 — confirm against `Common`.

          // A `Data` slice keeps the indices of the buffer it was cut from, so
          // every position is computed relative to `startIndex`, never from 0.
          let options: UInt8 = bytes[bytes.startIndex]
          self.endOfMessage = (options & 0x01) == 1
          self.id = Chunk.readUInt32(from: bytes, at: 1)
          self.serial = Chunk.readUInt32(from: bytes, at: 5)

          // Everything after the header is payload (possibly empty).
          self.data = [UInt8](bytes.dropFirst(Chunk.payloadOffset))
      }

      /// Serialize the chunk by handing its fields to `makeChunkBytes`.
      ///
      /// - Returns: The bytes produced by `makeChunkBytes` for this chunk.
      func serialize() -> [UInt8] {
          return makeChunkBytes(
              id: self.id,
              serial: self.serial,
              endOfMessage: self.endOfMessage,
              data: ArraySlice(self.data)
          )
      }

      /// Decode four bytes, starting `offset` bytes after the start of `bytes`,
      /// as a big-endian unsigned 32-bit integer.
      private static func readUInt32(from bytes: Data, at offset: Int) -> UInt32 {
          let start = bytes.startIndex + offset
          return bytes[start..<(start + 4)].reduce(UInt32(0)) { ($0 << 8) | UInt32($1) }
      }
  }
  extension Chunk: Comparable {
      /// Order chunks by message id first, then by serial number within the
      /// same message. Flag and payload do not take part in the ordering.
      static func < (lhs: Chunk, rhs: Chunk) -> Bool {
          return (lhs.id, lhs.serial) < (rhs.id, rhs.serial)
      }

      /// Two chunks are equal when the end-of-message flag, message id, serial
      /// number and payload all match.
      static func == (lhs: Chunk, rhs: Chunk) -> Bool {
          guard lhs.endOfMessage == rhs.endOfMessage else { return false }
          guard lhs.id == rhs.id, lhs.serial == rhs.serial else { return false }
          return lhs.data == rhs.data
      }
  }
  /// A chunk collector collects chunks belonging to the same message.
  ///
  /// This class is thread safe: every read and write of its state goes through
  /// a private serial dispatch queue.
  class ChunkCollector {
      private var endArrived: Bool = false
      /// Expected number of chunks; known once the end-of-message chunk arrived.
      private var messageLength: Int?
      private var chunks: [Chunk] = []
      private var lastUpdate = Date()
      private let serialQueue = DispatchQueue(label: "chunkCollector")

      /// Number of chunks collected so far.
      var count: Int {
          return self.serialQueue.sync { self.chunks.count }
      }

      /// Completeness check without synchronization.
      ///
      /// Must only be evaluated while already running on `serialQueue`;
      /// dispatching synchronously onto the queue again from there would
      /// deadlock.
      private var isCompleteUnsynchronized: Bool {
          return self.endArrived && self.chunks.count == self.messageLength
      }

      /// Register a new incoming chunk for this message.
      ///
      /// - Parameter chunk: The chunk to store.
      /// - Throws: `UnchunkerError.inconsistentMessageId` if the chunk's id
      ///   differs from the id of the chunks collected so far.
      func addChunk(chunk: Chunk) throws {
          try self.serialQueue.sync {
              // Make sure that the chunk belongs to the same message
              if let first = self.chunks.first, first.id != chunk.id {
                  throw UnchunkerError.inconsistentMessageId
              }

              // Store the chunk
              self.chunks.append(chunk)

              // Update internal state
              self.lastUpdate = Date()
              if chunk.endOfMessage {
                  self.endArrived = true
                  // The serial of the final chunk determines the chunk count.
                  self.messageLength = Int(chunk.serial) + 1
              }
          }
      }

      /// Return whether the collector contains the chunk with the specified serial.
      func contains(serial: UInt32) -> Bool {
          return self.serialQueue.sync {
              self.chunks.contains(where: { $0.serial == serial })
          }
      }

      /// Return whether the message is complete, meaning that the
      /// end-of-message chunk arrived and the number of collected chunks
      /// matches the expected message length.
      func isComplete() -> Bool {
          return self.serialQueue.sync { self.isCompleteUnsynchronized }
      }

      /// Return whether the last update is older than the specified interval.
      ///
      /// - Parameter interval: Maximum age in seconds.
      func isOlderThan(interval: TimeInterval) -> Bool {
          return self.serialQueue.sync {
              Date().timeIntervalSince(self.lastUpdate) > interval
          }
      }

      /// Merge the chunks into a complete message.
      ///
      /// The stored chunks are sorted in place as a side effect.
      ///
      /// - Returns: The assembled message `Data`.
      /// - Throws: `UnchunkerError.messageNotYetComplete` if not all chunks
      ///   have arrived yet.
      func merge() throws -> Data {
          return try self.serialQueue.sync {
              // Preconditions. The unsynchronized check is used because this
              // closure already runs on the serial queue.
              guard self.isCompleteUnsynchronized else {
                  throw UnchunkerError.messageNotYetComplete
              }

              // Sort chunks in-place
              self.chunks.sort()

              // Allocate a buffer that fits all payloads exactly
              let capacity = self.chunks.reduce(0) { $0 + $1.data.count }
              var data = Data(capacity: capacity)

              // Add chunks to buffer
              for chunk in self.chunks {
                  data.append(contentsOf: chunk.data)
              }
              return data
          }
      }

      /// Return list of serialized chunks.
      ///
      /// Note that the "last update" timestamps will not be serialized, only the raw chunks!
      func serialize() -> [[UInt8]] {
          return self.serialQueue.sync {
              self.chunks.map({ $0.serialize() })
          }
      }
  }
  /// An Unchunker instance merges multiple chunks into a single `Data`.
  ///
  /// Incoming chunks are grouped by message id. As soon as all chunks of a
  /// message are present, the assembled message is handed to the `delegate`
  /// and the chunks of that message are dropped.
  ///
  /// The internal state is guarded by a private serial dispatch queue. The
  /// delegate is invoked while that queue is held, so a delegate must not call
  /// back into the same `Unchunker` synchronously.
  class Unchunker {
      /// Receiver of assembled messages; held weakly.
      weak var delegate: MessageCompleteDelegate?
      /// Collectors of the messages that are not complete yet, keyed by message id.
      private var chunks: [UInt32: ChunkCollector] = [:]
      private let serialQueue = DispatchQueue(label: "unchunker")

      /// Add a chunk.
      ///
      /// - Parameter bytes: Data containing a chunk with its 9 byte header.
      /// - Throws: Any error thrown while parsing the chunk
      ///   (`Chunk.init(bytes:)`), storing it (`ChunkCollector.addChunk`) or
      ///   merging the message (`ChunkCollector.merge`).
      func addChunk(bytes: Data) throws {
          try self.serialQueue.sync {
              let chunk = try Chunk(bytes: bytes)

              // Ignore repeated chunks with the same serial. Collectors are
              // keyed by message id, so a direct lookup is sufficient.
              if let existing = self.chunks[chunk.id], existing.contains(serial: chunk.serial) {
                  return
              }

              // If this is the only chunk in the message, return it immediately.
              if chunk.endOfMessage && chunk.serial == 0 {
                  self.delegate?.messageComplete(message: Data(chunk.data))
                  self.chunks.removeValue(forKey: chunk.id)
                  return
              }

              // Otherwise, add the chunk to the collector of its message,
              // creating the collector on first use.
              let collector: ChunkCollector
              if let existing = self.chunks[chunk.id] {
                  collector = existing
              } else {
                  collector = ChunkCollector()
                  self.chunks[chunk.id] = collector
              }
              try collector.addChunk(chunk: chunk)

              // Check if message is complete
              if collector.isComplete() {
                  // Merge and notify delegate...
                  self.delegate?.messageComplete(message: try collector.merge())
                  // ...then delete the chunks.
                  self.chunks.removeValue(forKey: chunk.id)
              }
          }
      }

      /// Run garbage collection, remove incomplete messages that haven't been
      /// updated for more than the specified time interval.
      ///
      /// If you want to make sure that invalid chunks don't fill up memory, call
      /// this method regularly.
      ///
      /// - Parameter maxAge: Maximum age in seconds (`TimeInterval`). Incomplete
      ///   messages whose last update is older than this are removed.
      /// - Returns: The number of removed chunks.
      func gc(maxAge: TimeInterval) -> UInt {
          return self.serialQueue.sync {
              var removedItems: UInt = 0
              self.chunks = self.chunks.filter({ (_, collector) in
                  guard collector.isOlderThan(interval: maxAge) else {
                      return true
                  }
                  removedItems += UInt(collector.count)
                  return false
              })
              return removedItems
          }
      }

      /// Return list of serialized chunks of all incomplete messages.
      ///
      /// Note that the "last update" timestamps will not be serialized, only the raw chunks!
      /// The collectors are stored in a dictionary, so the order between
      /// different messages is unspecified.
      func serialize() -> [[UInt8]] {
          return self.serialQueue.sync {
              return self.chunks.values.flatMap({ $0.serialize() })
          }
      }
  }