15
15
16
16
import java .io .IOException ;
17
17
import java .nio .ByteBuffer ;
18
+ import java .nio .channels .ClosedChannelException ;
18
19
import java .util .ArrayDeque ;
19
20
import java .util .ArrayList ;
20
21
import java .util .Collection ;
25
26
import java .util .Queue ;
26
27
import java .util .Set ;
27
28
29
+ import org .eclipse .jetty .http2 .ErrorCode ;
28
30
import org .eclipse .jetty .http2 .FlowControlStrategy ;
29
31
import org .eclipse .jetty .http2 .HTTP2Session ;
30
32
import org .eclipse .jetty .http2 .HTTP2Stream ;
33
+ import org .eclipse .jetty .http2 .frames .FrameType ;
31
34
import org .eclipse .jetty .http2 .frames .WindowUpdateFrame ;
32
35
import org .eclipse .jetty .http2 .hpack .HpackException ;
33
36
import org .eclipse .jetty .io .ByteBufferPool ;
@@ -92,10 +95,9 @@ public boolean prepend(HTTP2Session.Entry entry)
92
95
entries .offerFirst (entry );
93
96
if (LOG .isDebugEnabled ())
94
97
LOG .debug ("Prepended {}, entries={}" , entry , entries .size ());
98
+ return true ;
95
99
}
96
100
}
97
- if (closed == null )
98
- return true ;
99
101
closed (entry , closed );
100
102
return false ;
101
103
}
@@ -106,15 +108,17 @@ public boolean append(HTTP2Session.Entry entry)
106
108
try (AutoLock ignored = lock .lock ())
107
109
{
108
110
closed = terminated ;
111
+ // If it was not possible to HPACK encode, then allow to send a GOAWAY.
112
+ if (closed instanceof HpackException .SessionException && entry .frame ().getType () == FrameType .GO_AWAY )
113
+ closed = null ;
109
114
if (closed == null )
110
115
{
111
116
entries .offer (entry );
112
117
if (LOG .isDebugEnabled ())
113
118
LOG .debug ("Appended {}, entries={}, {}" , entry , entries .size (), this );
119
+ return true ;
114
120
}
115
121
}
116
- if (closed == null )
117
- return true ;
118
122
closed (entry , closed );
119
123
return false ;
120
124
}
@@ -130,10 +134,9 @@ public boolean append(List<HTTP2Session.Entry> list)
130
134
list .forEach (entries ::offer );
131
135
if (LOG .isDebugEnabled ())
132
136
LOG .debug ("Appended {}, entries={} {}" , list , entries .size (), this );
137
+ return true ;
133
138
}
134
139
}
135
- if (closed == null )
136
- return true ;
137
140
list .forEach (entry -> closed (entry , closed ));
138
141
return false ;
139
142
}
@@ -163,7 +166,21 @@ protected Action process() throws Throwable
163
166
try (AutoLock ignored = lock .lock ())
164
167
{
165
168
if (terminated != null )
166
- throw terminated ;
169
+ {
170
+ boolean rethrow = true ;
171
+ if (terminated instanceof HpackException .SessionException )
172
+ {
173
+ HTTP2Session .Entry entry = entries .peek ();
174
+ if (entry != null && entry .frame ().getType () == FrameType .GO_AWAY )
175
+ {
176
+ // Allow a SessionException to be processed once to send a GOAWAY.
177
+ terminated = new ClosedChannelException ().initCause (terminated );
178
+ rethrow = false ;
179
+ }
180
+ }
181
+ if (rethrow )
182
+ throw terminated ;
183
+ }
167
184
168
185
WindowEntry windowEntry ;
169
186
while ((windowEntry = windows .poll ()) != null )
@@ -248,6 +265,15 @@ protected Action process() throws Throwable
248
265
entry .failed (failure );
249
266
pending .remove ();
250
267
}
268
+ catch (HpackException .SessionException failure )
269
+ {
270
+ if (LOG .isDebugEnabled ())
271
+ LOG .debug ("Failure generating {}" , entry , failure );
272
+ onSessionFailure (failure );
273
+ // The method above will try to send
274
+ // a GOAWAY, so we will iterate again.
275
+ return Action .IDLE ;
276
+ }
251
277
catch (Throwable failure )
252
278
{
253
279
// Failure to generate the entry is catastrophic.
@@ -339,7 +365,23 @@ protected void onCompleteSuccess()
339
365
protected void onCompleteFailure (Throwable x )
340
366
{
341
367
accumulator .release ();
368
+ Throwable closed = fail (x );
369
+ // If the failure came from within the
370
+ // flusher, we need to close the connection.
371
+ if (closed == null )
372
+ session .onWriteFailure (x );
373
+ }
374
+
375
+ private void onSessionFailure (Throwable x )
376
+ {
377
+ accumulator .release ();
378
+ Throwable closed = fail (x );
379
+ if (closed == null )
380
+ session .close (ErrorCode .COMPRESSION_ERROR .code , null , NOOP );
381
+ }
342
382
383
+ private Throwable fail (Throwable x )
384
+ {
343
385
Throwable closed ;
344
386
Set <HTTP2Session .Entry > allEntries ;
345
387
try (AutoLock ignored = lock .lock ())
@@ -361,11 +403,7 @@ protected void onCompleteFailure(Throwable x)
361
403
allEntries .addAll (pendingEntries );
362
404
pendingEntries .clear ();
363
405
allEntries .forEach (entry -> entry .failed (x ));
364
-
365
- // If the failure came from within the
366
- // flusher, we need to close the connection.
367
- if (closed == null )
368
- session .onWriteFailure (x );
406
+ return closed ;
369
407
}
370
408
371
409
public void terminate (Throwable cause )
0 commit comments