12
12
//
13
13
//===----------------------------------------------------------------------===//
14
14
15
+ import NIOConcurrencyHelpers
15
16
import NIOCore
16
17
import NIOHTTPTypes
17
18
@@ -27,10 +28,11 @@ public enum HBRequestBody: Sendable, AsyncSequence {
27
28
28
29
public func makeAsyncIterator( ) -> HBStreamedRequestBody . AsyncIterator {
29
30
switch self {
30
- case . byteBuffer:
31
- /// The server always creates the HBRequestBody as a stream. If it is converted
32
- /// into a single ByteBuffer it cannot be treated as a stream after that
33
- preconditionFailure ( " Cannot convert collapsed request body back into a sequence " )
31
+ case . byteBuffer( let buffer) :
32
+ let ( stream, source) = NIOAsyncChannelInboundStream< HTTPRequestPart> . makeTestingStream( )
33
+ source. yield ( . body( buffer) )
34
+ source. finish ( )
35
+ return HBStreamedRequestBody ( iterator: stream. makeAsyncIterator ( ) ) . makeAsyncIterator ( )
34
36
case . stream( let streamer) :
35
37
return streamer. makeAsyncIterator ( )
36
38
}
@@ -49,13 +51,19 @@ public enum HBRequestBody: Sendable, AsyncSequence {
49
51
}
50
52
51
53
/// Request body that is a stream of ByteBuffers.
52
- public struct HBStreamedRequestBody : Sendable , AsyncSequence {
54
+ ///
55
+ /// This is a unicast async sequence that allows a single iterator to be created.
56
+ public final class HBStreamedRequestBody : Sendable , AsyncSequence {
53
57
public typealias Element = ByteBuffer
54
58
public typealias InboundStream = NIOAsyncChannelInboundStream < HTTPRequestPart >
55
59
60
+ private let underlyingIterator : UnsafeTransfer < NIOAsyncChannelInboundStream < HTTPRequestPart > . AsyncIterator >
61
+ private let alreadyIterated : NIOLockedValueBox < Bool >
62
+
56
63
/// Initialize HBStreamedRequestBody from AsyncIterator of a NIOAsyncChannelInboundStream
57
64
public init ( iterator: InboundStream . AsyncIterator ) {
58
65
self . underlyingIterator = . init( iterator)
66
+ self . alreadyIterated = . init( false )
59
67
}
60
68
61
69
/// Async Iterator for HBStreamedRequestBody
@@ -65,9 +73,9 @@ public struct HBStreamedRequestBody: Sendable, AsyncSequence {
65
73
private var underlyingIterator : InboundStream . AsyncIterator
66
74
private var done : Bool
67
75
68
- init ( underlyingIterator: InboundStream . AsyncIterator ) {
76
+ init ( underlyingIterator: InboundStream . AsyncIterator , done : Bool = false ) {
69
77
self . underlyingIterator = underlyingIterator
70
- self . done = false
78
+ self . done = done
71
79
}
72
80
73
81
public mutating func next( ) async throws -> ByteBuffer ? {
@@ -88,8 +96,15 @@ public struct HBStreamedRequestBody: Sendable, AsyncSequence {
88
96
}
89
97
90
98
public func makeAsyncIterator( ) -> AsyncIterator {
91
- AsyncIterator ( underlyingIterator: self . underlyingIterator. wrappedValue)
99
+ // verify if an iterator has already been created. If it has then create an
100
+ // iterator that returns nothing. This could be a precondition failure (currently
101
+ // an assert) as you should not be allowed to do this.
102
+ let done = self . alreadyIterated. withLockedValue {
103
+ assert ( $0 == false , " Can only create iterator from request body once " )
104
+ let done = $0
105
+ $0 = true
106
+ return done
107
+ }
108
+ return AsyncIterator ( underlyingIterator: self . underlyingIterator. wrappedValue, done: done)
92
109
}
93
-
94
- private var underlyingIterator : UnsafeTransfer < NIOAsyncChannelInboundStream < HTTPRequestPart > . AsyncIterator >
95
110
}
0 commit comments