From 0464c5022d062bad8b4181b5d8b2162ccde51f01 Mon Sep 17 00:00:00 2001 From: Albert Aleksieiev Date: Thu, 12 Jul 2018 11:03:08 +0300 Subject: [PATCH 01/13] URLSessionTask: InputStream functionality and implement seekInputStream --- Foundation/URLSession/BodySource.swift | 33 ++++++++++++++++++++++ Foundation/URLSession/NativeProtocol.swift | 30 +++++++++++++++++--- Foundation/URLSession/URLSessionTask.swift | 2 ++ 3 files changed, 61 insertions(+), 4 deletions(-) diff --git a/Foundation/URLSession/BodySource.swift b/Foundation/URLSession/BodySource.swift index 221a92d136..357eac1e38 100644 --- a/Foundation/URLSession/BodySource.swift +++ b/Foundation/URLSession/BodySource.swift @@ -57,6 +57,39 @@ internal enum _BodySourceDataChunk { case error } +internal final class _BodyStreamSource { + let inputStream: InputStream + + init(inputStream: InputStream) { + self.inputStream = inputStream + } +} + +extension _BodyStreamSource : _BodySource { + func getNextChunk(withLength length: Int) -> _BodySourceDataChunk { + if inputStream.hasBytesAvailable { + let buffer = UnsafeMutableRawBufferPointer.allocate(count: length) + guard let pointer = buffer.baseAddress?.assumingMemoryBound(to: UInt8.self) else { + return .error + } + let readBytes = self.inputStream.read(pointer, maxLength: length) + if readBytes > 0 { + let dispatchData = DispatchData(bytes: UnsafeRawBufferPointer(buffer)) + return .data(dispatchData.subdata(in: 0 ..< readBytes)) + } + else if readBytes == 0 { + return .done + } + else { + return .error + } + } + else { + return .done + } + } +} + /// A body data source backed by `DispatchData`. internal final class _BodyDataSource { var data: DispatchData! diff --git a/Foundation/URLSession/NativeProtocol.swift b/Foundation/URLSession/NativeProtocol.swift index 05988d3b3c..4d0ec5c708 100644 --- a/Foundation/URLSession/NativeProtocol.swift +++ b/Foundation/URLSession/NativeProtocol.swift @@ -260,8 +260,29 @@ internal class _NativeProtocol: URLProtocol, _EasyHandleDelegate { } func seekInputStream(to position: UInt64) throws { - // We will reset the body source and seek forward. - NSUnimplemented() + // We will reset the body sourse and seek forward. + guard let session = task?.session as? URLSession else { fatalError() } + if let delegate = session.delegate as? URLSessionTaskDelegate { + delegate.urlSession(session, task: task!, needNewBodyStream: { [weak self] inputStream in + if let strongSelf = self, let url = strongSelf.request.url, let inputStream = inputStream { + switch strongSelf.internalState { + case .transferInProgress(let currentTransferState): + switch currentTransferState.requestBodySource { + case is _BodyStreamSource: + let drain = strongSelf.createTransferBodyDataDrain() + let source = _BodyStreamSource(inputStream: inputStream) + let transferState = _TransferState(url: url, bodyDataDrain: drain, bodySource: source) + strongSelf.internalState = .transferInProgress(transferState) + default: + NSUnimplemented() + } + default: + //TODO: it's possible? + break + } + } + }) + } } func updateProgressMeter(with propgress: _EasyHandle._Progress) { @@ -313,8 +334,9 @@ internal class _NativeProtocol: URLProtocol, _EasyHandleDelegate { self?.easyHandle.unpauseSend() }) return _TransferState(url: url, bodyDataDrain: drain,bodySource: source) - case .stream: - NSUnimplemented() + case .stream(let inputStream): + let source = _BodyStreamSource(inputStream: inputStream) + return _TransferState(url: url, bodyDataDrain: drain, bodySource: source) } } diff --git a/Foundation/URLSession/URLSessionTask.swift b/Foundation/URLSession/URLSessionTask.swift index 1f8b28ecd2..c2c8418377 100644 --- a/Foundation/URLSession/URLSessionTask.swift +++ b/Foundation/URLSession/URLSessionTask.swift @@ -57,6 +57,8 @@ open class URLSessionTask : NSObject, NSCopying { internal convenience init(session: URLSession, request: URLRequest, taskIdentifier: Int) { if let bodyData = request.httpBody { self.init(session: session, request: request, taskIdentifier: taskIdentifier, body: _Body.data(createDispatchData(bodyData))) + }else if let bodyStream = request.httpBodyStream { + self.init(session: session, request: request, taskIdentifier: taskIdentifier, body: _Body.stream(bodyStream)) } else { self.init(session: session, request: request, taskIdentifier: taskIdentifier, body: .none) } From 1729e55d6e83b2c1bc2a56361c0416a083018505 Mon Sep 17 00:00:00 2001 From: Albert Aleksieiev Date: Thu, 12 Jul 2018 11:03:40 +0300 Subject: [PATCH 02/13] URLSessionTask: InputStream tests --- TestFoundation/TestURLSession.swift | 96 +++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/TestFoundation/TestURLSession.swift b/TestFoundation/TestURLSession.swift index 8a531b7085..286ea56d98 100644 --- a/TestFoundation/TestURLSession.swift +++ b/TestFoundation/TestURLSession.swift @@ -15,6 +15,7 @@ class TestURLSession : LoopbackServerTest { ("test_dataTaskWithURLRequest", test_dataTaskWithURLRequest), ("test_dataTaskWithURLCompletionHandler", test_dataTaskWithURLCompletionHandler), ("test_dataTaskWithURLRequestCompletionHandler", test_dataTaskWithURLRequestCompletionHandler), + ("test_dataTaskWithHttpInputStream", test_dataTaskWithHttpInputStream), ("test_downloadTaskWithURL", test_downloadTaskWithURL), ("test_downloadTaskWithURLRequest", test_downloadTaskWithURLRequest), ("test_downloadTaskWithRequestAndHandler", test_downloadTaskWithRequestAndHandler), @@ -119,6 +120,56 @@ class TestURLSession : LoopbackServerTest { waitForExpectations(timeout: 12) } + func test_dataTaskWithHttpInputStream() { + func randomString(length: Int) -> String { + let letters : NSString = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + let len = UInt32(letters.length) + + var randomString = "" + + for _ in 0 ..< length { + let rand = arc4random_uniform(len) + var nextChar = letters.character(at: Int(rand)) + randomString += NSString(characters: &nextChar, length: 1) as String + } + return randomString + } + + let delegate = HTTPBinResponseDelegateJSON() + + let dataString = randomString(length: 65537) + + let urlString = "http://httpbin.org/post" + let url = URL(string: urlString)! + let urlSession = URLSession(configuration: URLSessionConfiguration.default, delegate: delegate, delegateQueue: nil) + + var urlRequest = URLRequest(url: url) + urlRequest.httpMethod = "POST" + + guard let data = dataString.data(using: .utf8) else { + XCTFail() + return + } + + let inputStream = InputStream(data: data) + inputStream.open() + + urlRequest.httpBodyStream = inputStream + + urlRequest.setValue("en-us", forHTTPHeaderField: "Accept-Language") + urlRequest.setValue("text/xml; charset=utf-8", forHTTPHeaderField: "Content-Type") + urlRequest.setValue("chunked", forHTTPHeaderField: "Transfer-Encoding") + + let urlTask = urlSession.dataTask(with: urlRequest) + urlTask.resume() + + delegate.semaphore.wait() + + XCTAssertTrue(urlTask.response != nil) + XCTAssertTrue(delegate.response != nil) + XCTAssertTrue(delegate.response?.data == dataString) + } + func test_downloadTaskWithURL() { let urlString = "http://127.0.0.1:\(TestURLSession.serverPort)/country.txt" let url = URL(string: urlString)! @@ -879,6 +930,51 @@ class HTTPRedirectionDataTask : NSObject { } } + +class HTTPBinResponseDelegate: NSObject, URLSessionDataDelegate, URLSessionTaskDelegate { + let semaphore = DispatchSemaphore(value: 0) + let outputStream = OutputStream.toMemory() + var response: T? + + override init() { + outputStream.open() + } + + deinit { + outputStream.close() + } + + public func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) { + _ = data.withUnsafeBytes({ (bytes: UnsafePointer) in + outputStream.write(bytes, maxLength: data.count) + }) + } + + public func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) { + if let data = outputStream.property(forKey: .dataWrittenToMemoryStreamKey) as? NSData { + response = parseResposne(data: data._bridgeToSwift()) + } + semaphore.signal() + } + + public func parseResposne(data: Data) -> T? { + fatalError("") + } +} + +class HTTPBinResponseDelegateJSON: HTTPBinResponseDelegate { + override func parseResposne(data: Data) -> T? { + return try? JSONDecoder().decode(T.self, from: data) + } +} + +struct HTTPBinResponse: Codable { + let data: String + let headers: [String: String] + let origin: String + let url: String +} + extension HTTPRedirectionDataTask : URLSessionDataDelegate { public func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive response: URLResponse, completionHandler: @escaping (URLSession.ResponseDisposition) -> Void) { guard let httpresponse = response as? HTTPURLResponse else { fatalError() } From 821d6dfcb55bd5f47cf91841ffd109ed7f2ac3a3 Mon Sep 17 00:00:00 2001 From: Albert Aleksieiev Date: Mon, 24 Sep 2018 12:08:38 +0300 Subject: [PATCH 03/13] fix test compilation on Linux --- TestFoundation/TestURLSession.swift | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/TestFoundation/TestURLSession.swift b/TestFoundation/TestURLSession.swift index 286ea56d98..cb4b42808c 100644 --- a/TestFoundation/TestURLSession.swift +++ b/TestFoundation/TestURLSession.swift @@ -121,16 +121,23 @@ class TestURLSession : LoopbackServerTest { } func test_dataTaskWithHttpInputStream() { + func uniformRandom(_ max: Int) -> Int { +#if os(Linux) + return Int(random() % max) +#else + return Int(arc4random_uniform(UInt32(max))) +#endif + } func randomString(length: Int) -> String { - let letters : NSString = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" - let len = UInt32(letters.length) + let letters = Array("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") + let len = letters.count var randomString = "" for _ in 0 ..< length { - let rand = arc4random_uniform(len) - var nextChar = letters.character(at: Int(rand)) - randomString += NSString(characters: &nextChar, length: 1) as String + let rand = uniformRandom(len) + let nextChar = letters[Int(rand)] + randomString += String(nextChar) } return randomString } From 0f96d24d000a80e2beedcfa62ba7b6a6d7d0627c Mon Sep 17 00:00:00 2001 From: Albert Aleksieiev Date: Thu, 27 Sep 2018 12:32:00 +0300 Subject: [PATCH 04/13] Use a new unified random API --- TestFoundation/TestURLSession.swift | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/TestFoundation/TestURLSession.swift b/TestFoundation/TestURLSession.swift index cb4b42808c..ad015f929e 100644 --- a/TestFoundation/TestURLSession.swift +++ b/TestFoundation/TestURLSession.swift @@ -121,13 +121,6 @@ class TestURLSession : LoopbackServerTest { } func test_dataTaskWithHttpInputStream() { - func uniformRandom(_ max: Int) -> Int { -#if os(Linux) - return Int(random() % max) -#else - return Int(arc4random_uniform(UInt32(max))) -#endif - } func randomString(length: Int) -> String { let letters = Array("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") let len = letters.count @@ -135,8 +128,8 @@ class TestURLSession : LoopbackServerTest { var randomString = "" for _ in 0 ..< length { - let rand = uniformRandom(len) - let nextChar = letters[Int(rand)] + let rand = Int.random(in: 0.. Date: Sat, 20 Oct 2018 15:20:37 +0300 Subject: [PATCH 05/13] Use Internal echo server / parse HTTP Body from socket data (HTTP toy server) --- Foundation/URLSession/BodySource.swift | 34 +++---- Foundation/URLSession/NativeProtocol.swift | 2 +- TestFoundation/HTTPServer.swift | 30 +++++- TestFoundation/TestURLSession.swift | 103 ++++++--------------- 4 files changed, 76 insertions(+), 93 deletions(-) diff --git a/Foundation/URLSession/BodySource.swift b/Foundation/URLSession/BodySource.swift index 357eac1e38..a77953c063 100644 --- a/Foundation/URLSession/BodySource.swift +++ b/Foundation/URLSession/BodySource.swift @@ -67,26 +67,26 @@ internal final class _BodyStreamSource { extension _BodyStreamSource : _BodySource { func getNextChunk(withLength length: Int) -> _BodySourceDataChunk { - if inputStream.hasBytesAvailable { - let buffer = UnsafeMutableRawBufferPointer.allocate(count: length) - guard let pointer = buffer.baseAddress?.assumingMemoryBound(to: UInt8.self) else { - return .error - } - let readBytes = self.inputStream.read(pointer, maxLength: length) - if readBytes > 0 { - let dispatchData = DispatchData(bytes: UnsafeRawBufferPointer(buffer)) - return .data(dispatchData.subdata(in: 0 ..< readBytes)) - } - else if readBytes == 0 { - return .done - } - else { - return .error - } + guard inputStream.hasBytesAvailable else { + return .done } - else { + + + let buffer = UnsafeMutableRawBufferPointer.allocate(count: length) + guard let pointer = buffer.baseAddress?.assumingMemoryBound(to: UInt8.self) else { + return .error + } + let readBytes = self.inputStream.read(pointer, maxLength: length) + if readBytes > 0 { + let dispatchData = DispatchData(bytes: UnsafeRawBufferPointer(buffer)) + return .data(dispatchData.subdata(in: 0 ..< readBytes)) + } + else if readBytes == 0 { return .done } + else { + return .error + } } } diff --git a/Foundation/URLSession/NativeProtocol.swift b/Foundation/URLSession/NativeProtocol.swift index 4d0ec5c708..573e355e5c 100644 --- a/Foundation/URLSession/NativeProtocol.swift +++ b/Foundation/URLSession/NativeProtocol.swift @@ -260,7 +260,7 @@ internal class _NativeProtocol: URLProtocol, _EasyHandleDelegate { } func seekInputStream(to position: UInt64) throws { - // We will reset the body sourse and seek forward. + // We will reset the body source and seek forward. guard let session = task?.session as? URLSession else { fatalError() } if let delegate = session.delegate as? URLSessionTaskDelegate { delegate.urlSession(session, task: task!, needNewBodyStream: { [weak self] inputStream in diff --git a/TestFoundation/HTTPServer.swift b/TestFoundation/HTTPServer.swift index bc87cdd09f..8218feae7f 100644 --- a/TestFoundation/HTTPServer.swift +++ b/TestFoundation/HTTPServer.swift @@ -207,7 +207,30 @@ class _HTTPServer { } public func request() throws -> _HTTPRequest { - return try _HTTPRequest(request: socket.readData()) + var request = try _HTTPRequest(request: socket.readData()) + + if Int(request.getHeader(for: "Content-Length") ?? "0") ?? 0 > 0 + || (request.getHeader(for: "Transfer-Encoding") ?? "").lowercased() == "chunked" { + + // According to RFC7230 https://tools.ietf.org/html/rfc7230#section-3 + // We receive messageBody after the headers, so we need read from socket minimun 2 times + // + // HTTP-message structure + // + // start-line + // *( header-field CRLF ) + // CRLF + // [ message-body ] + // We receives '{numofbytes}\r\n{data}\r\n' + // TODO read data until the end + + let substr = try socket.readData().split(separator: "\r\n") + if substr.count >= 2 { + request.messageBody = String(substr[1]) + } + } + + return request } public func respond(with response: _HTTPResponse, startDelay: TimeInterval? = nil, sendDelay: TimeInterval? = nil, bodyChunks: Int? = nil) throws { @@ -293,6 +316,7 @@ struct _HTTPRequest { let method: Method let uri: String let body: String + var messageBody: String? let headers: [String] public init(request: String) { @@ -480,6 +504,10 @@ public class TestURLSessionServer { let httpResponse = _HTTPResponse(response: .REDIRECT, headers: "Location: \(value)", body: text) return httpResponse } + + if uri == "/echo" { + return _HTTPResponse(response: .OK, body: request.messageBody ?? request.body) + } return _HTTPResponse(response: .OK, body: capitals[String(uri.dropFirst())]!) } diff --git a/TestFoundation/TestURLSession.swift b/TestFoundation/TestURLSession.swift index ad015f929e..e6c9bea4f6 100644 --- a/TestFoundation/TestURLSession.swift +++ b/TestFoundation/TestURLSession.swift @@ -121,28 +121,19 @@ class TestURLSession : LoopbackServerTest { } func test_dataTaskWithHttpInputStream() { - func randomString(length: Int) -> String { - let letters = Array("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") - let len = letters.count - - var randomString = "" - - for _ in 0 ..< length { - let rand = Int.random(in: 0..() + let dataString = """ + Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras congue laoreet facilisis. Sed porta tristique orci. Fusce ut nisl dignissim, tempor tortor id, molestie neque. Nam non tincidunt mi. Integer ac diam quis leo aliquam congue et non magna. In porta mauris suscipit erat pulvinar, sed fringilla quam ornare. Nulla vulputate et ligula vitae sollicitudin. Nulla vel vehicula risus. Quisque eu urna ullamcorper, tincidunt ante vitae, aliquet sem. Suspendisse nec turpis placerat, porttitor ex vel, tristique orci. Maecenas pretium, augue non elementum imperdiet, diam ex vestibulum tortor, non ultrices ante enim iaculis ex. - let dataString = randomString(length: 65537) + Suspendisse ante eros, scelerisque ut molestie vitae, lacinia nec metus. Sed in feugiat sem. Nullam sed congue nulla, id vehicula mauris. Aliquam ultrices ultricies pellentesque. Etiam blandit ultrices quam in egestas. Donec a vulputate est, ut ultricies dui. In non maximus velit. + + Vivamus vehicula faucibus odio vel maximus. Vivamus elementum, quam at accumsan rhoncus, ex ligula maximus sem, sed pretium urna enim ut urna. Donec semper porta augue at faucibus. Quisque vel congue purus. Morbi vitae elit pellentesque, finibus lectus quis, laoreet nulla. Praesent in fermentum felis. Aenean vestibulum dictum lorem quis egestas. Sed dictum elementum est laoreet volutpat. + """ - let urlString = "http://httpbin.org/post" let url = URL(string: urlString)! - let urlSession = URLSession(configuration: URLSessionConfiguration.default, delegate: delegate, delegateQueue: nil) - + let urlSession = URLSession(configuration: URLSessionConfiguration.default) + var urlRequest = URLRequest(url: url) urlRequest.httpMethod = "POST" @@ -150,24 +141,33 @@ class TestURLSession : LoopbackServerTest { XCTFail() return } - + let inputStream = InputStream(data: data) inputStream.open() - + urlRequest.httpBodyStream = inputStream urlRequest.setValue("en-us", forHTTPHeaderField: "Accept-Language") urlRequest.setValue("text/xml; charset=utf-8", forHTTPHeaderField: "Content-Type") urlRequest.setValue("chunked", forHTTPHeaderField: "Transfer-Encoding") - - let urlTask = urlSession.dataTask(with: urlRequest) - urlTask.resume() - - delegate.semaphore.wait() - - XCTAssertTrue(urlTask.response != nil) - XCTAssertTrue(delegate.response != nil) - XCTAssertTrue(delegate.response?.data == dataString) + + let expect = expectation(description: "POST \(urlString): with HTTP Body as InputStream") + let task = urlSession.dataTask(with: urlRequest) { respData, response, error in + XCTAssertNotNil(respData) + XCTAssertNotNil(response) + XCTAssertNil(error) + + defer { expect.fulfill() } + guard let httpResponse = response as? HTTPURLResponse else { + XCTFail("response (\(response.debugDescription)) invalid") + return + } + + XCTAssertEqual(data, respData!, "Response Data and Data is not equal") + XCTAssertEqual(200, httpResponse.statusCode, "HTTP response code is not 200") + } + task.resume() + waitForExpectations(timeout: 12) } func test_downloadTaskWithURL() { @@ -930,51 +930,6 @@ class HTTPRedirectionDataTask : NSObject { } } - -class HTTPBinResponseDelegate: NSObject, URLSessionDataDelegate, URLSessionTaskDelegate { - let semaphore = DispatchSemaphore(value: 0) - let outputStream = OutputStream.toMemory() - var response: T? - - override init() { - outputStream.open() - } - - deinit { - outputStream.close() - } - - public func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) { - _ = data.withUnsafeBytes({ (bytes: UnsafePointer) in - outputStream.write(bytes, maxLength: data.count) - }) - } - - public func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) { - if let data = outputStream.property(forKey: .dataWrittenToMemoryStreamKey) as? NSData { - response = parseResposne(data: data._bridgeToSwift()) - } - semaphore.signal() - } - - public func parseResposne(data: Data) -> T? { - fatalError("") - } -} - -class HTTPBinResponseDelegateJSON: HTTPBinResponseDelegate { - override func parseResposne(data: Data) -> T? { - return try? JSONDecoder().decode(T.self, from: data) - } -} - -struct HTTPBinResponse: Codable { - let data: String - let headers: [String: String] - let origin: String - let url: String -} - extension HTTPRedirectionDataTask : URLSessionDataDelegate { public func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive response: URLResponse, completionHandler: @escaping (URLSession.ResponseDisposition) -> Void) { guard let httpresponse = response as? HTTPURLResponse else { fatalError() } From 9b7dd7d9f94c720df7ae4cb1306bea8eb5212424 Mon Sep 17 00:00:00 2001 From: Albert Aleksieiev Date: Sat, 20 Oct 2018 21:45:36 +0300 Subject: [PATCH 06/13] Codestyle: remove unnecessary spaces --- Foundation/URLSession/URLSessionTask.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Foundation/URLSession/URLSessionTask.swift b/Foundation/URLSession/URLSessionTask.swift index 1a390bf901..1c1c68ac10 100644 --- a/Foundation/URLSession/URLSessionTask.swift +++ b/Foundation/URLSession/URLSessionTask.swift @@ -57,7 +57,7 @@ open class URLSessionTask : NSObject, NSCopying { internal convenience init(session: URLSession, request: URLRequest, taskIdentifier: Int) { if let bodyData = request.httpBody { self.init(session: session, request: request, taskIdentifier: taskIdentifier, body: _Body.data(createDispatchData(bodyData))) - }else if let bodyStream = request.httpBodyStream { + } else if let bodyStream = request.httpBodyStream { self.init(session: session, request: request, taskIdentifier: taskIdentifier, body: _Body.stream(bodyStream)) } else { self.init(session: session, request: request, taskIdentifier: taskIdentifier, body: .none) From f601b0f8050f5a67ee6ba1d207dd2bef8101982f Mon Sep 17 00:00:00 2001 From: Albert Aleksieiev Date: Wed, 14 Nov 2018 13:40:02 +0200 Subject: [PATCH 07/13] fix deallocation - use DispatchData(bytesNoCopy with deallocator) --- Foundation/URLSession/BodySource.swift | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Foundation/URLSession/BodySource.swift b/Foundation/URLSession/BodySource.swift index d9ecb6803a..d83bcb797e 100644 --- a/Foundation/URLSession/BodySource.swift +++ b/Foundation/URLSession/BodySource.swift @@ -74,17 +74,20 @@ extension _BodyStreamSource : _BodySource { let buffer = UnsafeMutableRawBufferPointer.allocate(count: length) guard let pointer = buffer.baseAddress?.assumingMemoryBound(to: UInt8.self) else { + buffer.deallocate() return .error } let readBytes = self.inputStream.read(pointer, maxLength: length) if readBytes > 0 { - let dispatchData = DispatchData(bytes: UnsafeRawBufferPointer(buffer)) + let dispatchData = DispatchData(bytesNoCopy: UnsafeRawBufferPointer(buffer), deallocator: .free) return .data(dispatchData.subdata(in: 0 ..< readBytes)) } else if readBytes == 0 { + buffer.deallocate() return .done } else { + buffer.deallocate() return .error } } From 4f67097eff0dcaa885e437ae308277279e5681ec Mon Sep 17 00:00:00 2001 From: Albert Aleksieiev Date: Mon, 19 Nov 2018 17:03:45 +0200 Subject: [PATCH 08/13] fix deallocation - use buffer.deallocate --- Foundation/URLSession/BodySource.swift | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/Foundation/URLSession/BodySource.swift b/Foundation/URLSession/BodySource.swift index d83bcb797e..c9dcff3d40 100644 --- a/Foundation/URLSession/BodySource.swift +++ b/Foundation/URLSession/BodySource.swift @@ -70,24 +70,25 @@ extension _BodyStreamSource : _BodySource { guard inputStream.hasBytesAvailable else { return .done } - let buffer = UnsafeMutableRawBufferPointer.allocate(count: length) - guard let pointer = buffer.baseAddress?.assumingMemoryBound(to: UInt8.self) else { + defer { buffer.deallocate() + } + + guard let pointer = buffer.baseAddress?.assumingMemoryBound(to: UInt8.self) else { return .error } + let readBytes = self.inputStream.read(pointer, maxLength: length) if readBytes > 0 { - let dispatchData = DispatchData(bytesNoCopy: UnsafeRawBufferPointer(buffer), deallocator: .free) + let dispatchData = DispatchData(bytes: UnsafeRawBufferPointer(buffer)) return .data(dispatchData.subdata(in: 0 ..< readBytes)) } else if readBytes == 0 { - buffer.deallocate() return .done } else { - buffer.deallocate() return .error } } From 0c49fb1c25423c20a17865741cfd56b45f893364 Mon Sep 17 00:00:00 2001 From: Albert Aleksieiev Date: Fri, 23 Nov 2018 13:02:09 +0200 Subject: [PATCH 09/13] Use new API for allocation memory with aligment --- Foundation/URLSession/BodySource.swift | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/Foundation/URLSession/BodySource.swift b/Foundation/URLSession/BodySource.swift index c9dcff3d40..2058a140d0 100644 --- a/Foundation/URLSession/BodySource.swift +++ b/Foundation/URLSession/BodySource.swift @@ -70,11 +70,9 @@ extension _BodyStreamSource : _BodySource { guard inputStream.hasBytesAvailable else { return .done } - - let buffer = UnsafeMutableRawBufferPointer.allocate(count: length) - defer { - buffer.deallocate() - } + + let buffer = UnsafeMutableRawBufferPointer.allocate(byteCount: length, alignment: MemoryLayout.alignment) + defer { buffer.deallocate() } guard let pointer = buffer.baseAddress?.assumingMemoryBound(to: UInt8.self) else { return .error From f14cf81bffc36d9609d4d34905956fc765249206 Mon Sep 17 00:00:00 2001 From: Albert Aleksieiev Date: Mon, 26 Nov 2018 12:56:36 +0200 Subject: [PATCH 10/13] use DispatchData(bytesNoCopy:deallocator:) with custom deallocator --- Foundation/URLSession/BodySource.swift | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/Foundation/URLSession/BodySource.swift b/Foundation/URLSession/BodySource.swift index 2058a140d0..a94b5e7b7a 100644 --- a/Foundation/URLSession/BodySource.swift +++ b/Foundation/URLSession/BodySource.swift @@ -72,21 +72,23 @@ extension _BodyStreamSource : _BodySource { } let buffer = UnsafeMutableRawBufferPointer.allocate(byteCount: length, alignment: MemoryLayout.alignment) - defer { buffer.deallocate() } guard let pointer = buffer.baseAddress?.assumingMemoryBound(to: UInt8.self) else { + buffer.deallocate() return .error } let readBytes = self.inputStream.read(pointer, maxLength: length) if readBytes > 0 { - let dispatchData = DispatchData(bytes: UnsafeRawBufferPointer(buffer)) + let dispatchData = DispatchData(bytesNoCopy: UnsafeRawBufferPointer(buffer), deallocator: .custom(nil, { buffer.deallocate() })) return .data(dispatchData.subdata(in: 0 ..< readBytes)) } else if readBytes == 0 { + buffer.deallocate() return .done } else { + buffer.deallocate() return .error } } From d5adb0f5e0820f58add6d44c2b1a80ef878543fe Mon Sep 17 00:00:00 2001 From: Albert Aleksieiev Date: Mon, 21 Jan 2019 10:18:43 +0200 Subject: [PATCH 11/13] Implement seek InputStream --- Foundation/Stream.swift | 42 ++++++++++++++- Foundation/URLSession/NativeProtocol.swift | 1 + TestFoundation/TestStream.swift | 62 ++++++++++++++++++++++ 3 files changed, 104 insertions(+), 1 deletion(-) diff --git a/Foundation/Stream.swift b/Foundation/Stream.swift index 8e8c1fad4c..bb340ef16a 100644 --- a/Foundation/Stream.swift +++ b/Foundation/Stream.swift @@ -114,7 +114,10 @@ open class Stream: NSObject { // InputStream is an abstract class representing the base functionality of a read stream. // Subclassers are required to implement these methods. open class InputStream: Stream { - + enum _Error: Error { + case cantSeekInputStream + } + internal let _stream: CFReadStream! // reads up to length bytes into the supplied buffer, which must be at least of size len. Returns the actual number of bytes read. @@ -225,6 +228,43 @@ open class OutputStream : Stream { } } +extension InputStream { + func seek(to position: UInt64) throws { + guard position > 0 else { + return + } + + guard position < Int.max else { throw _Error.cantSeekInputStream } + + let bufferSize = 1024 + var remainingBytes = Int(position) + + let buffer = UnsafeMutableRawBufferPointer.allocate(byteCount: bufferSize, alignment: MemoryLayout.alignment) + + guard let pointer = buffer.baseAddress?.assumingMemoryBound(to: UInt8.self) else { + buffer.deallocate() + throw _Error.cantSeekInputStream + } + + if self.streamStatus == .notOpen { + self.open() + } + + while remainingBytes > 0 && self.hasBytesAvailable { + let read = self.read(pointer, maxLength: min(bufferSize, remainingBytes)) + if read == -1 { + throw _Error.cantSeekInputStream + } + remainingBytes -= read + } + + buffer.deallocate() + if remainingBytes != 0 { + throw _Error.cantSeekInputStream + } + } +} + // Discussion of this API is ongoing for its usage of AutoreleasingUnsafeMutablePointer #if false extension Stream { diff --git a/Foundation/URLSession/NativeProtocol.swift b/Foundation/URLSession/NativeProtocol.swift index eeb0e52e05..956a67afab 100644 --- a/Foundation/URLSession/NativeProtocol.swift +++ b/Foundation/URLSession/NativeProtocol.swift @@ -269,6 +269,7 @@ internal class _NativeProtocol: URLProtocol, _EasyHandleDelegate { case .transferInProgress(let currentTransferState): switch currentTransferState.requestBodySource { case is _BodyStreamSource: + try inputStream.seek(to: position) let drain = strongSelf.createTransferBodyDataDrain() let source = _BodyStreamSource(inputStream: inputStream) let transferState = _TransferState(url: url, bodyDataDrain: drain, bodySource: source) diff --git a/TestFoundation/TestStream.swift b/TestFoundation/TestStream.swift index 1cce487702..88f906a9c0 100644 --- a/TestFoundation/TestStream.swift +++ b/TestFoundation/TestStream.swift @@ -7,6 +7,23 @@ // See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors // +private extension Data { + init(reading input: InputStream) { + self.init() + input.open() + + let bufferSize = 1024 + let buffer = UnsafeMutablePointer.allocate(capacity: bufferSize) + while input.hasBytesAvailable { + let read = input.read(buffer, maxLength: bufferSize) + self.append(buffer, count: read) + } + buffer.deallocate(capacity: bufferSize) + + input.close() + } +} + class TestStream : XCTestCase { static var allTests: [(String, (TestStream) -> () throws -> Void)] { return [ @@ -15,6 +32,7 @@ class TestStream : XCTestCase { ("test_InputStreamWithFile", test_InputStreamWithFile), ("test_InputStreamHasBytesAvailable", test_InputStreamHasBytesAvailable), ("test_InputStreamInvalidPath", test_InputStreamInvalidPath), + ("test_InputStreamSeekToPosition", test_InputStreamSeekToPosition), ("test_outputStreamCreationToFile", test_outputStreamCreationToFile), ("test_outputStreamCreationToBuffer", test_outputStreamCreationToBuffer), ("test_outputStreamCreationWithUrl", test_outputStreamCreationWithUrl), @@ -116,6 +134,50 @@ class TestStream : XCTestCase { XCTAssertEqual(.error, fileStream.streamStatus) } + func test_InputStreamSeekToPosition() { + let str = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras congue laoreet facilisis. Sed porta tristique orci. Fusce ut nisl dignissim, tempor tortor id, molestie neque. Nam non tincidunt mi. Integer ac diam quis leo aliquam congue et non magna. In porta mauris suscipit erat pulvinar, sed fringilla quam ornare. Nulla vulputate et ligula vitae sollicitudin. Nulla vel vehicula risus. Quisque eu urna ullamcorper, tincidunt ante vitae, aliquet sem. Suspendisse nec turpis placerat, porttitor ex vel, tristique orci. Maecenas pretium, augue non elementum imperdiet, diam ex vestibulum tortor, non ultrices ante enim iaculis ex. Fusce ut nisl dignissim, tempor tortor id, molestie neque. Nam non tincidunt mi. Integer ac diam quis leo aliquam congue et non magna. In porta mauris suscipit erat pulvinar, sed fringilla quam ornare. Nulla vulputate et ligula vitae sollicitudin. Nulla vel vehicula risus. Quisque eu urna ullamcorper, tincidunt ante vitae, aliquet sem. Suspendisse nec turpis placerat, porttitor ex vel, tristique orci. Maecenas pretium, augue non elementum imperdiet, diam ex vestibulum tortor, non ultrices ante enim iaculis ex.Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras congue laoreet facilisis. Sed porta tristique orci. Fusce ut nisl dignissim, tempor tortor id, molestie neque. Nam non tincidunt mi. Integer ac diam quis leo aliquam congue et non magna. In porta mauris suscipit erat pulvinar, sed fringilla quam ornare. Nulla vulputate et ligula vitae sollicitudin. Nulla vel vehicula risus. Quisque eu urna ullamcorper, tincidunt ante vitae, aliquet sem. Suspendisse nec turpis placerat, porttitor ex vel." + XCTAssert(str.count > 1024) // str.count must be bigger than buffersize inside InputStream.seek func. + + func testSubdata(_ pos: UInt64) throws -> Data? { + guard let data = str.data(using: .utf8) else { + XCTFail() + return nil + } + + let stream = InputStream(data: data) + stream.open() + + try stream.seek(to: pos) + let streamData = Data(reading: stream) + + let subdata = data.subdata(in: Range(Int(pos).. Date: Mon, 21 Jan 2019 11:56:37 +0200 Subject: [PATCH 12/13] fix compilation of NativeProtocol `seekInputStream` func --- Foundation/URLSession/NativeProtocol.swift | 49 +++++++++++++--------- TestFoundation/TestStream.swift | 11 ++++- 2 files changed, 39 insertions(+), 21 deletions(-) diff --git a/Foundation/URLSession/NativeProtocol.swift b/Foundation/URLSession/NativeProtocol.swift index 956a67afab..3bee45e896 100644 --- a/Foundation/URLSession/NativeProtocol.swift +++ b/Foundation/URLSession/NativeProtocol.swift @@ -262,27 +262,38 @@ internal class _NativeProtocol: URLProtocol, _EasyHandleDelegate { func seekInputStream(to position: UInt64) throws { // We will reset the body source and seek forward. guard let session = task?.session as? URLSession else { fatalError() } + + var currentInputStream: InputStream? + if let delegate = session.delegate as? URLSessionTaskDelegate { - delegate.urlSession(session, task: task!, needNewBodyStream: { [weak self] inputStream in - if let strongSelf = self, let url = strongSelf.request.url, let inputStream = inputStream { - switch strongSelf.internalState { - case .transferInProgress(let currentTransferState): - switch currentTransferState.requestBodySource { - case is _BodyStreamSource: - try inputStream.seek(to: position) - let drain = strongSelf.createTransferBodyDataDrain() - let source = _BodyStreamSource(inputStream: inputStream) - let transferState = _TransferState(url: url, bodyDataDrain: drain, bodySource: source) - strongSelf.internalState = .transferInProgress(transferState) - default: - NSUnimplemented() - } - default: - //TODO: it's possible? - break - } - } + let dispatchGroup = DispatchGroup() + dispatchGroup.enter() + + delegate.urlSession(session, task: task!, needNewBodyStream: { inputStream in + currentInputStream = inputStream + dispatchGroup.leave() }) + + _ = dispatchGroup.wait(timeout: .now() + 7) + } + + if let url = self.request.url, let inputStream = currentInputStream { + switch self.internalState { + case .transferInProgress(let currentTransferState): + switch currentTransferState.requestBodySource { + case is _BodyStreamSource: + try inputStream.seek(to: position) + let drain = self.createTransferBodyDataDrain() + let source = _BodyStreamSource(inputStream: inputStream) + let transferState = _TransferState(url: url, bodyDataDrain: drain, bodySource: source) + self.internalState = .transferInProgress(transferState) + default: + NSUnimplemented() + } + default: + //TODO: it's possible? + break + } } } diff --git a/TestFoundation/TestStream.swift b/TestFoundation/TestStream.swift index 88f906a9c0..870fd60020 100644 --- a/TestFoundation/TestStream.swift +++ b/TestFoundation/TestStream.swift @@ -6,6 +6,13 @@ // See http://swift.org/LICENSE.txt for license information // See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors // +#if NS_FOUNDATION_ALLOWS_TESTABLE_IMPORT + #if (os(Linux) || os(Android)) + @testable import Foundation + #else + @testable import SwiftFoundation + #endif +#endif private extension Data { init(reading input: InputStream) { @@ -18,7 +25,7 @@ private extension Data { let read = input.read(buffer, maxLength: bufferSize) self.append(buffer, count: read) } - buffer.deallocate(capacity: bufferSize) + buffer.deallocate() input.close() } @@ -150,7 +157,7 @@ class TestStream : XCTestCase { try stream.seek(to: pos) let streamData = Data(reading: stream) - let subdata = data.subdata(in: Range(Int(pos).. Date: Thu, 24 Jan 2019 18:19:24 +0200 Subject: [PATCH 13/13] get rid of `NS_FOUNDATION_ALLOWS_TESTABLE_IMPORT` --- TestFoundation/TestStream.swift | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/TestFoundation/TestStream.swift b/TestFoundation/TestStream.swift index 870fd60020..c9763e6d36 100644 --- a/TestFoundation/TestStream.swift +++ b/TestFoundation/TestStream.swift @@ -6,14 +6,14 @@ // See http://swift.org/LICENSE.txt for license information // See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors // -#if NS_FOUNDATION_ALLOWS_TESTABLE_IMPORT - #if (os(Linux) || os(Android)) - @testable import Foundation - #else - @testable import SwiftFoundation - #endif + +#if (os(Linux) || os(Android)) + @testable import Foundation +#else + @testable import SwiftFoundation #endif + private extension Data { init(reading input: InputStream) { self.init()