|
31 | 31 | import io.grpc.stub.StreamObserver;
|
32 | 32 | import io.grpc.testing.GrpcServerRule;
|
33 | 33 | import java.util.ArrayDeque;
|
| 34 | +import java.util.Set; |
34 | 35 | import java.util.concurrent.TimeUnit;
|
35 | 36 | import org.junit.After;
|
36 | 37 | import org.junit.Before;
|
@@ -109,6 +110,46 @@ public void enterTerminalState_watch() throws Exception {
|
109 | 110 | assertThat(obs.responses).isEmpty();
|
110 | 111 | }
|
111 | 112 |
|
| 113 | + @Test |
| 114 | + public void watchNotifyException() throws Exception { |
| 115 | + manager.setStatus(SERVICE1, ServingStatus.SERVING); |
| 116 | + CancellableRespObserver cobs1 = new CancellableRespObserver(); |
| 117 | + service.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), cobs1); |
| 118 | + assertThat(cobs1.responses).hasSize(1); |
| 119 | + HealthCheckResponse resp = (HealthCheckResponse) cobs1.responses.poll(); |
| 120 | + assertThat(resp.getStatus()).isEqualTo(ServingStatus.SERVING); |
| 121 | + cobs1.responses.clear(); |
| 122 | + |
| 123 | + CancellableRespObserver cobs2 = new CancellableRespObserver(); |
| 124 | + service.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), cobs2); |
| 125 | + assertThat(cobs2.responses).hasSize(1); |
| 126 | + resp = (HealthCheckResponse) cobs2.responses.poll(); |
| 127 | + assertThat(resp.getStatus()).isEqualTo(ServingStatus.SERVING); |
| 128 | + cobs2.responses.clear(); |
| 129 | + |
| 130 | + // assert that both observers are in watchers |
| 131 | + Set<StreamObserver<HealthCheckResponse>> observers = service.watchersForTest(SERVICE1); |
| 132 | + assertThat(observers).hasSize(2); |
| 133 | + |
| 134 | + // cancel first Observer |
| 135 | + cobs1 = (CancellableRespObserver) observers.stream().findFirst().get(); |
| 136 | + cobs1.setCancelled(true); |
| 137 | + |
| 138 | + cobs2 = (CancellableRespObserver) observers.stream().skip(1).findFirst().get(); |
| 139 | + |
| 140 | + manager.setStatus(SERVICE1, ServingStatus.NOT_SERVING); |
| 141 | + |
| 142 | + // first observer should notify exception |
| 143 | + assertThat(cobs1.responses).isEmpty(); |
| 144 | + |
| 145 | + // second observer should be notified |
| 146 | + assertThat(cobs2.responses).hasSize(1); |
| 147 | + resp = (HealthCheckResponse) cobs2.responses.poll(); |
| 148 | + assertThat(resp.getStatus()).isEqualTo(ServingStatus.NOT_SERVING); |
| 149 | + cobs2.responses.clear(); |
| 150 | + |
| 151 | + } |
| 152 | + |
112 | 153 | @Test
|
113 | 154 | public void enterTerminalState_ignoreClear() throws Exception {
|
114 | 155 | manager.setStatus(SERVICE1, ServingStatus.SERVING);
|
@@ -311,4 +352,23 @@ public void onCompleted() {
|
311 | 352 | responses.add("onCompleted");
|
312 | 353 | }
|
313 | 354 | }
|
| 355 | + |
| 356 | + private static class CancellableRespObserver extends RespObserver { |
| 357 | + |
| 358 | + boolean cancelled = false; |
| 359 | + |
| 360 | + public void setCancelled(boolean cancelled) { |
| 361 | + this.cancelled = cancelled; |
| 362 | + } |
| 363 | + |
| 364 | + @Override |
| 365 | + public void onNext(HealthCheckResponse value) { |
| 366 | + if (cancelled) { |
| 367 | + throw Status.CANCELLED |
| 368 | + .withDescription("call already cancelled.") |
| 369 | + .asRuntimeException(); |
| 370 | + } |
| 371 | + super.onNext(value); |
| 372 | + } |
| 373 | + } |
314 | 374 | }
|
0 commit comments