diff --git a/Foundation/Stream.swift b/Foundation/Stream.swift index 8e8c1fad4c..bb340ef16a 100644 --- a/Foundation/Stream.swift +++ b/Foundation/Stream.swift @@ -114,7 +114,10 @@ open class Stream: NSObject { // InputStream is an abstract class representing the base functionality of a read stream. // Subclassers are required to implement these methods. open class InputStream: Stream { - + enum _Error: Error { + case cantSeekInputStream + } + internal let _stream: CFReadStream! // reads up to length bytes into the supplied buffer, which must be at least of size len. Returns the actual number of bytes read. @@ -225,6 +228,43 @@ open class OutputStream : Stream { } } +extension InputStream { + func seek(to position: UInt64) throws { + guard position > 0 else { + return + } + + guard position < Int.max else { throw _Error.cantSeekInputStream } + + let bufferSize = 1024 + var remainingBytes = Int(position) + + let buffer = UnsafeMutableRawBufferPointer.allocate(byteCount: bufferSize, alignment: MemoryLayout.alignment) + + guard let pointer = buffer.baseAddress?.assumingMemoryBound(to: UInt8.self) else { + buffer.deallocate() + throw _Error.cantSeekInputStream + } + + if self.streamStatus == .notOpen { + self.open() + } + + while remainingBytes > 0 && self.hasBytesAvailable { + let read = self.read(pointer, maxLength: min(bufferSize, remainingBytes)) + if read == -1 { + throw _Error.cantSeekInputStream + } + remainingBytes -= read + } + + buffer.deallocate() + if remainingBytes != 0 { + throw _Error.cantSeekInputStream + } + } +} + // Discussion of this API is ongoing for its usage of AutoreleasingUnsafeMutablePointer #if false extension Stream { diff --git a/Foundation/URLSession/BodySource.swift b/Foundation/URLSession/BodySource.swift index c5344473f6..a94b5e7b7a 100644 --- a/Foundation/URLSession/BodySource.swift +++ b/Foundation/URLSession/BodySource.swift @@ -57,6 +57,43 @@ internal enum _BodySourceDataChunk { case error } +internal final class _BodyStreamSource { + let inputStream: InputStream + + init(inputStream: InputStream) { + self.inputStream = inputStream + } +} + +extension _BodyStreamSource : _BodySource { + func getNextChunk(withLength length: Int) -> _BodySourceDataChunk { + guard inputStream.hasBytesAvailable else { + return .done + } + + let buffer = UnsafeMutableRawBufferPointer.allocate(byteCount: length, alignment: MemoryLayout.alignment) + + guard let pointer = buffer.baseAddress?.assumingMemoryBound(to: UInt8.self) else { + buffer.deallocate() + return .error + } + + let readBytes = self.inputStream.read(pointer, maxLength: length) + if readBytes > 0 { + let dispatchData = DispatchData(bytesNoCopy: UnsafeRawBufferPointer(buffer), deallocator: .custom(nil, { buffer.deallocate() })) + return .data(dispatchData.subdata(in: 0 ..< readBytes)) + } + else if readBytes == 0 { + buffer.deallocate() + return .done + } + else { + buffer.deallocate() + return .error + } + } +} + /// A body data source backed by `DispatchData`. internal final class _BodyDataSource { var data: DispatchData! diff --git a/Foundation/URLSession/NativeProtocol.swift b/Foundation/URLSession/NativeProtocol.swift index d29924dec3..3bee45e896 100644 --- a/Foundation/URLSession/NativeProtocol.swift +++ b/Foundation/URLSession/NativeProtocol.swift @@ -261,7 +261,40 @@ internal class _NativeProtocol: URLProtocol, _EasyHandleDelegate { func seekInputStream(to position: UInt64) throws { // We will reset the body source and seek forward. - NSUnimplemented() + guard let session = task?.session as? URLSession else { fatalError() } + + var currentInputStream: InputStream? + + if let delegate = session.delegate as? URLSessionTaskDelegate { + let dispatchGroup = DispatchGroup() + dispatchGroup.enter() + + delegate.urlSession(session, task: task!, needNewBodyStream: { inputStream in + currentInputStream = inputStream + dispatchGroup.leave() + }) + + _ = dispatchGroup.wait(timeout: .now() + 7) + } + + if let url = self.request.url, let inputStream = currentInputStream { + switch self.internalState { + case .transferInProgress(let currentTransferState): + switch currentTransferState.requestBodySource { + case is _BodyStreamSource: + try inputStream.seek(to: position) + let drain = self.createTransferBodyDataDrain() + let source = _BodyStreamSource(inputStream: inputStream) + let transferState = _TransferState(url: url, bodyDataDrain: drain, bodySource: source) + self.internalState = .transferInProgress(transferState) + default: + NSUnimplemented() + } + default: + //TODO: it's possible? + break + } + } } func updateProgressMeter(with propgress: _EasyHandle._Progress) { @@ -313,8 +346,9 @@ internal class _NativeProtocol: URLProtocol, _EasyHandleDelegate { self?.easyHandle.unpauseSend() }) return _TransferState(url: url, bodyDataDrain: drain,bodySource: source) - case .stream: - NSUnimplemented() + case .stream(let inputStream): + let source = _BodyStreamSource(inputStream: inputStream) + return _TransferState(url: url, bodyDataDrain: drain, bodySource: source) } } diff --git a/Foundation/URLSession/URLSessionTask.swift b/Foundation/URLSession/URLSessionTask.swift index d87a2c69c2..1c1c68ac10 100644 --- a/Foundation/URLSession/URLSessionTask.swift +++ b/Foundation/URLSession/URLSessionTask.swift @@ -57,6 +57,8 @@ open class URLSessionTask : NSObject, NSCopying { internal convenience init(session: URLSession, request: URLRequest, taskIdentifier: Int) { if let bodyData = request.httpBody { self.init(session: session, request: request, taskIdentifier: taskIdentifier, body: _Body.data(createDispatchData(bodyData))) + } else if let bodyStream = request.httpBodyStream { + self.init(session: session, request: request, taskIdentifier: taskIdentifier, body: _Body.stream(bodyStream)) } else { self.init(session: session, request: request, taskIdentifier: taskIdentifier, body: .none) } diff --git a/TestFoundation/HTTPServer.swift b/TestFoundation/HTTPServer.swift index 062f39c52d..3d32c03145 100644 --- a/TestFoundation/HTTPServer.swift +++ b/TestFoundation/HTTPServer.swift @@ -210,7 +210,30 @@ class _HTTPServer { } public func request() throws -> _HTTPRequest { - return try _HTTPRequest(request: socket.readData()) + var request = try _HTTPRequest(request: socket.readData()) + + if Int(request.getHeader(for: "Content-Length") ?? "0") ?? 0 > 0 + || (request.getHeader(for: "Transfer-Encoding") ?? "").lowercased() == "chunked" { + + // According to RFC7230 https://tools.ietf.org/html/rfc7230#section-3 + // We receive messageBody after the headers, so we need read from socket minimun 2 times + // + // HTTP-message structure + // + // start-line + // *( header-field CRLF ) + // CRLF + // [ message-body ] + // We receives '{numofbytes}\r\n{data}\r\n' + // TODO read data until the end + + let substr = try socket.readData().split(separator: "\r\n") + if substr.count >= 2 { + request.messageBody = String(substr[1]) + } + } + + return request } public func respond(with response: _HTTPResponse, startDelay: TimeInterval? = nil, sendDelay: TimeInterval? = nil, bodyChunks: Int? = nil) throws { @@ -324,6 +347,7 @@ struct _HTTPRequest { let method: Method let uri: String let body: String + var messageBody: String? let headers: [String] public init(request: String) { @@ -533,12 +557,18 @@ public class TestURLSessionServer { let httpResponse = _HTTPResponse(response: .REDIRECT, headers: "Location: \(value)", body: text) return httpResponse } + + if uri == "/echo" { + return _HTTPResponse(response: .OK, body: request.messageBody ?? request.body) + } + if uri == "/redirect-with-default-port" { let text = request.getCommaSeparatedHeaders() let host = request.headers[1].components(separatedBy: " ")[1] let ip = host.components(separatedBy: ":")[0] let httpResponse = _HTTPResponse(response: .REDIRECT, headers: "Location: http://\(ip)/redirected-with-default-port", body: text) return httpResponse + } return _HTTPResponse(response: .OK, body: capitals[String(uri.dropFirst())]!) } diff --git a/TestFoundation/TestStream.swift b/TestFoundation/TestStream.swift index 1cce487702..c9763e6d36 100644 --- a/TestFoundation/TestStream.swift +++ b/TestFoundation/TestStream.swift @@ -7,6 +7,30 @@ // See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors // +#if (os(Linux) || os(Android)) + @testable import Foundation +#else + @testable import SwiftFoundation +#endif + + +private extension Data { + init(reading input: InputStream) { + self.init() + input.open() + + let bufferSize = 1024 + let buffer = UnsafeMutablePointer.allocate(capacity: bufferSize) + while input.hasBytesAvailable { + let read = input.read(buffer, maxLength: bufferSize) + self.append(buffer, count: read) + } + buffer.deallocate() + + input.close() + } +} + class TestStream : XCTestCase { static var allTests: [(String, (TestStream) -> () throws -> Void)] { return [ @@ -15,6 +39,7 @@ class TestStream : XCTestCase { ("test_InputStreamWithFile", test_InputStreamWithFile), ("test_InputStreamHasBytesAvailable", test_InputStreamHasBytesAvailable), ("test_InputStreamInvalidPath", test_InputStreamInvalidPath), + ("test_InputStreamSeekToPosition", test_InputStreamSeekToPosition), ("test_outputStreamCreationToFile", test_outputStreamCreationToFile), ("test_outputStreamCreationToBuffer", test_outputStreamCreationToBuffer), ("test_outputStreamCreationWithUrl", test_outputStreamCreationWithUrl), @@ -116,6 +141,50 @@ class TestStream : XCTestCase { XCTAssertEqual(.error, fileStream.streamStatus) } + func test_InputStreamSeekToPosition() { + let str = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras congue laoreet facilisis. Sed porta tristique orci. Fusce ut nisl dignissim, tempor tortor id, molestie neque. Nam non tincidunt mi. Integer ac diam quis leo aliquam congue et non magna. In porta mauris suscipit erat pulvinar, sed fringilla quam ornare. Nulla vulputate et ligula vitae sollicitudin. Nulla vel vehicula risus. Quisque eu urna ullamcorper, tincidunt ante vitae, aliquet sem. Suspendisse nec turpis placerat, porttitor ex vel, tristique orci. Maecenas pretium, augue non elementum imperdiet, diam ex vestibulum tortor, non ultrices ante enim iaculis ex. Fusce ut nisl dignissim, tempor tortor id, molestie neque. Nam non tincidunt mi. Integer ac diam quis leo aliquam congue et non magna. In porta mauris suscipit erat pulvinar, sed fringilla quam ornare. Nulla vulputate et ligula vitae sollicitudin. Nulla vel vehicula risus. Quisque eu urna ullamcorper, tincidunt ante vitae, aliquet sem. Suspendisse nec turpis placerat, porttitor ex vel, tristique orci. Maecenas pretium, augue non elementum imperdiet, diam ex vestibulum tortor, non ultrices ante enim iaculis ex.Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras congue laoreet facilisis. Sed porta tristique orci. Fusce ut nisl dignissim, tempor tortor id, molestie neque. Nam non tincidunt mi. Integer ac diam quis leo aliquam congue et non magna. In porta mauris suscipit erat pulvinar, sed fringilla quam ornare. Nulla vulputate et ligula vitae sollicitudin. Nulla vel vehicula risus. Quisque eu urna ullamcorper, tincidunt ante vitae, aliquet sem. Suspendisse nec turpis placerat, porttitor ex vel." + XCTAssert(str.count > 1024) // str.count must be bigger than buffersize inside InputStream.seek func. + + func testSubdata(_ pos: UInt64) throws -> Data? { + guard let data = str.data(using: .utf8) else { + XCTFail() + return nil + } + + let stream = InputStream(data: data) + stream.open() + + try stream.seek(to: pos) + let streamData = Data(reading: stream) + + let subdata = data[Int(pos)..