Skip to content

Commit 790a563

Browse files
Merge branch 'master' into tzhang-si-readme
2 parents 62fc1dd + f1fc322 commit 790a563

File tree

4 files changed

+80
-16
lines changed

4 files changed

+80
-16
lines changed

src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestChannel.java

+30-4
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.concurrent.CompletableFuture;
1010
import javax.annotation.Nullable;
1111
import net.snowflake.ingest.streaming.internal.ColumnProperties;
12+
import net.snowflake.ingest.utils.SFException;
1213

1314
/**
1415
* A logical partition that represents a connection to a single Snowflake table, data will be
@@ -67,17 +68,27 @@ public interface SnowflakeStreamingIngestChannel {
6768
String getFullyQualifiedTableName();
6869

6970
/**
70-
* @return a boolean which indicates whether the channel is valid
71+
* @return a boolean which indicates whether the channel is valid. Typically, this means whether
72+
* the current instance is the owner of the channel, i.e., if another Client opens a {@link
73+
* SnowflakeStreamingIngestChannel} of the same name then the current instance will be
74+
* considered "invalid" as its persisted epoch will have increased. If this returns false then
75+
* calling `insertRow(s)` on this channel instance will result in an {@link SFException} and
76+
* no further writes to the channel will be accepted.
77+
* <p>>Note: there may be a delay between server-side invalidations and the Client detecting
78+
* it so this may not immediately return false in the event of a server-side invalidation.
7179
*/
7280
boolean isValid();
7381

7482
/**
75-
* @return a boolean which indicates whether the channel is closed
83+
* @return a boolean which indicates whether the channel is closed. If true this means that the
84+
* current {@link SnowflakeStreamingIngestChannel} will not accept any additional rows on
85+
* calls to `insertRow(s)`
7686
*/
7787
boolean isClosed();
7888

7989
/**
80-
* Close the channel, this function will make sure all the data in this channel is committed
90+
* Close the channel. Closing entails draining any outstanding data in the Channel's buffer and
91+
* marking the Channel as no longer being able to accept writes via `insertRow(s)`*
8192
*
8293
* @return a completable future which will be completed when the channel is closed
8394
*/
@@ -206,7 +217,7 @@ public interface SnowflakeStreamingIngestChannel {
206217
* </li>
207218
*
208219
* </ul>
209-
*
220+
* <p>
210221
* For TIMESTAMP_LTZ and TIMESTAMP_TZ, all input without timezone will be by default interpreted in the timezone "America/Los_Angeles". This can be changed by calling {@link net.snowflake.ingest.streaming.OpenChannelRequest.OpenChannelRequestBuilder#setDefaultTimezone(ZoneId)}.
211222
* </td>
212223
* <tr>
@@ -248,6 +259,9 @@ public interface SnowflakeStreamingIngestChannel {
248259
* @param offsetToken offset of given row, used for replay in case of failures. It could be null
249260
* if you don't plan on replaying or can't replay
250261
* @return insert response that possibly contains errors because of insertion failures
262+
* @throws SFException if the channel is not considered "valid". Typically, this means that
263+
* another Client has claimed ownership of the Channel. Writes to this channel will be
264+
* rejected and result in this exception being thrown.
251265
*/
252266
InsertValidationResponse insertRow(Map<String, Object> row, @Nullable String offsetToken);
253267

@@ -262,6 +276,9 @@ public interface SnowflakeStreamingIngestChannel {
262276
* @param endOffsetToken end offset of the batch/row-set, used for replay in case of failures, *
263277
* It could be null if you don't plan on replaying or can't replay
264278
* @return insert response that possibly contains errors because of insertion failures
279+
* @throws SFException if the channel is not considered "valid". Typically, this means that
280+
* another Client has claimed ownership of the Channel. Writes to this channel will be
281+
* rejected and result in this exception being thrown.
265282
*/
266283
InsertValidationResponse insertRows(
267284
Iterable<Map<String, Object>> rows,
@@ -271,6 +288,10 @@ InsertValidationResponse insertRows(
271288
/**
272289
* Insert a batch of rows into the channel with the end offset token only, please see {@link
273290
* SnowflakeStreamingIngestChannel#insertRows(Iterable, String, String)} for more information.
291+
*
292+
* @throws SFException if the channel is not considered "valid". Typically, this means that
293+
* another Client has claimed ownership of the Channel. Writes to this channel will be
294+
* rejected and result in this exception being thrown.
274295
*/
275296
InsertValidationResponse insertRows(
276297
Iterable<Map<String, Object>> rows, @Nullable String offsetToken);
@@ -279,6 +300,11 @@ InsertValidationResponse insertRows(
279300
* Get the latest committed offset token from Snowflake
280301
*
281302
* @return the latest committed offset token
303+
* @throws SFException if Snowflake returns an invalid response code from its `status` endpoint
304+
* which typically indicates that the channel needs to be re-opened. This is evaluated with
305+
* respect to the current Client epoch, i.e. if another Client opens the channel with the same
306+
* name this will throw an exception as the current instant is not the active version of the
307+
* channel.
282308
*/
283309
@Nullable
284310
String getLatestCommittedOffsetToken();

src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java

+6
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,12 @@ static class FlushInfo {
4343
/**
4444
* Add a channel to the channel cache
4545
*
46+
* <p>Note: if there was a previous instance of the channel then the old one is considered
47+
* "invalid". Callers with a reference to the old channel object will have their writes rejected
48+
* as the channel reference will be marked "invalid". Similarly, calls to fetch the current status
49+
* of old versions of the channel will have an exception thrown as the channel is considered
50+
* invalid.
51+
*
4652
* @param channel
4753
*/
4854
void addChannel(SnowflakeStreamingIngestChannelInternal<T> channel) {

src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java

+41-10
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ public boolean isClosed() {
220220
return this.isClosed;
221221
}
222222

223-
/** Mark the channel as closed */
223+
/** Mark the channel as closed, meaning that no additional writes can be made to the channel */
224224
void markClosed() {
225225
this.isClosed = true;
226226
logger.logInfo(
@@ -262,6 +262,14 @@ public CompletableFuture<Void> close() {
262262
return this.close(false);
263263
}
264264

265+
/**
266+
* Attempts to close the channel by draining any outstanding data and rejecting further writes to
267+
* the channel.
268+
*
269+
* @param drop if true, the channel will be dropped after all data is successfully committed.
270+
* @throws SFException when the channel is invalid or closed, meaning that no new rows can be
271+
* inserted via this instance of the channel.
272+
*/
265273
@Override
266274
public CompletableFuture<Void> close(boolean drop) {
267275
checkValidation();
@@ -302,7 +310,7 @@ public CompletableFuture<Void> close(boolean drop) {
302310
}
303311

304312
/**
305-
* Setup the column fields and vectors using the column metadata from the server
313+
* Set up the column fields and vectors using the column metadata from the server
306314
*
307315
* @param columns
308316
*/
@@ -322,10 +330,15 @@ void setupSchema(List<ColumnMetadata> columns) {
322330
/**
323331
* The row is represented using Map where the key is column name and the value is a row of data
324332
*
333+
* <p>Notes: this may throttle on the calling thread if there is insufficient memory to accept the
334+
* supplied rows. This is done by sleeping in the current thread and checking memory conditions.
335+
* Thus, the caller should expect this to not return immediately.
336+
*
325337
* @param row object data to write
326338
* @param offsetToken offset of given row, used for replay in case of failures
327339
* @return insert response that possibly contains errors because of insertion failures
328-
* @throws SFException when the channel is invalid or closed
340+
* @throws SFException when the channel is invalid or closed, meaning that no new rows can be
341+
* inserted via this instance of the channel.
329342
*/
330343
@Override
331344
public InsertValidationResponse insertRow(Map<String, Object> row, String offsetToken) {
@@ -344,11 +357,19 @@ public InsertValidationResponse insertRow(Map<String, Object> row, String offset
344357
* SnowflakeStreamingIngestChannel#insertRow(Map, String)} for more information about accepted
345358
* values.
346359
*
360+
* <p>Notes: this may throttle on the calling thread if there is insufficient memory to accept the
361+
* supplied rows. This is done by sleeping in the current thread and checking memory conditions.
362+
* Thus, the caller should expect this to not return immediately.
363+
*
347364
* @param rows object data to write
348365
* @param startOffsetToken start offset of the batch/row-set
349366
* @param endOffsetToken end offset of the batch/row-set, used for replay in case of failures, *
350367
* It could be null if you don't plan on replaying or can't replay
351-
* @return insert response that possibly contains errors because of insertion failures
368+
* @return insert response that possibly contains errors because of insertion failures throws
369+
* SFException if the channel is closed or is considered "invalid", i.e. another Client
370+
* instance has opened the Channel.
371+
* @throws SFException when the channel is invalid or closed, meaning that no new rows can be
372+
* inserted via this instance of the channel.
352373
*/
353374
@Override
354375
public InsertValidationResponse insertRows(
@@ -401,11 +422,12 @@ void collectRowSize(float rowSize) {
401422
}
402423

403424
/**
404-
* Get the latest committed offset token from Snowflake, an exception will be thrown if the
405-
* channel becomes invalid due to errors and the channel needs to be reopened in order to return a
406-
* valid offset token
425+
* Get the latest committed offset token from Snowflake for this Channel instance.
407426
*
408-
* @return the latest committed offset token
427+
* @return the latest committed offset token for this Channel instance.
428+
* @throws SFException if the current instance of the Channel is not the current owner OR if the
429+
* caller cannot communicate with Snowflake. Typically, this indicates that another Client
430+
* instance has opened up the Channel with the same name.
409431
*/
410432
@Override
411433
public String getLatestCommittedOffsetToken() {
@@ -427,7 +449,11 @@ public Map<String, ColumnProperties> getTableSchema() {
427449
return this.tableColumns;
428450
}
429451

430-
/** Check whether we need to throttle the insertRows API */
452+
/**
453+
* Potentially throttles a call to the `insertRow(s)` APIs if needed. Note that throttling applies
454+
* to the calling thread by sleeping on the curernt thread and checking whether memory conditions
455+
* are suitable to accepting the current row.
456+
*/
431457
void throttleInsertIfNeeded(MemoryInfoProvider memoryInfoProvider) {
432458
int retry = 0;
433459
while ((hasLowRuntimeMemory(memoryInfoProvider)
@@ -471,7 +497,12 @@ private boolean hasLowRuntimeMemory(MemoryInfoProvider memoryInfoProvider) {
471497
return hasLowRuntimeMemory;
472498
}
473499

474-
/** Check whether the channel is still valid, cleanup and throw an error if not */
500+
/**
501+
* Check whether the channel is considered "valid", i.e. the current instance of the channel
502+
* matches the server-side persisted Client epoch.
503+
*
504+
* @throws SFException if the current instance of the channel is no longer considered valid
505+
*/
475506
private void checkValidation() {
476507
if (!isValid()) {
477508
this.owningClient.removeChannelIfSequencersMatch(this);

src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -481,10 +481,11 @@ public Map<String, String> getLatestCommittedOffsetTokens(
481481
}
482482

483483
/**
484-
* Fetch channels status from Snowflake
484+
* Fetch the status of one or more Channels from Snowflake
485485
*
486-
* @param channels a list of channels that we want to get the status on
486+
* @param channels a list of channels that we want to get the status of
487487
* @return a ChannelsStatusResponse object
488+
* @throws SFException if the caller cannot communicate with Snowflake
488489
*/
489490
ChannelsStatusResponse getChannelsStatus(
490491
List<SnowflakeStreamingIngestChannelInternal<?>> channels) {

0 commit comments

Comments
 (0)