-
Notifications
You must be signed in to change notification settings - Fork 3.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[KIP-899] Allow producer and consumer clients to rebootstrap #4980
base: purge-brokers-plus-test-fixes
Are you sure you want to change the base?
Conversation
🎉 All Contributor License Agreements have been signed. Ready to merge. |
5bff66d
to
313f6a0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First pass comments for the code. Testing ongoing.
@@ -12,6 +12,7 @@ message.copy.max.bytes | * | 0 .. 1000000000 | 65535 | |||
receive.message.max.bytes | * | 1000 .. 2147483647 | 100000000 | medium | Maximum Kafka protocol response message size. This serves as a safety precaution to avoid memory exhaustion in case of protocol hickups. This value must be at least `fetch.max.bytes` + 512 to allow for protocol overhead; the value is adjusted automatically unless the configuration property is explicitly set. <br>*Type: integer* | |||
max.in.flight.requests.per.connection | * | 1 .. 1000000 | 1000000 | low | Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one. <br>*Type: integer* | |||
max.in.flight | * | 1 .. 1000000 | 1000000 | low | Alias for `max.in.flight.requests.per.connection`: Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one. <br>*Type: integer* | |||
metadata.recovery.strategy | * | none, rebootstrap | rebootstrap | low | Controls how the client recovers when none of the brokers known to it is available. If set to `none`, the client fails with a fatal error. If set to `rebootstrap`, the client repeats the bootstrap process using `bootstrap.servers` and brokers added through `rd_kafka_brokers_add()`. Rebootstrapping is useful when a client communicates with brokers so infrequently that the set of brokers may change entirely before the client refreshes metadata. Metadata recovery is triggered when all last-known brokers appear unavailable simultaneously. <br>*Type: enum value* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with the decision to keep rebootstrap
as default for librdkafka even though Java has none
as default for now.
Currently in librdkafka, we always keep bootstrap brokers and hence KIP-899 was not really required. But now with purging unavaiable brokers, its best to not keep bootstrap brokers always and implement KIP-899 as default.
@@ -303,6 +303,8 @@ struct rd_kafka_s { | |||
* and are not internal or logical. */ | |||
rd_atomic32_t rk_broker_down_cnt; | |||
|
|||
/**< Additional bootstrap servers list. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can elaborate on the comment that this is the list of brokers added through rd_kafka_brokers_add()
method and doesn't include bootstrap brokers provided through conf.
if (rk->rk_conf.metadata_recovery_strategy == | ||
RD_KAFKA_METADATA_RECOVERY_STRATEGY_NONE) { | ||
rd_kafka_set_fatal_error( | ||
rk, RD_KAFKA_RESP_ERR_REBOOTSTRAP_REQUIRED, "%s", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RD_KAFKA_RESP_ERR_REBOOTSTRAP_REQUIRED
is not defined in this PR. That is part of KIP-1102 PR. Please move it here.
{_RK_GLOBAL, "metadata.recovery.strategy", _RK_C_S2I, | ||
_RK(metadata_recovery_strategy), | ||
"Controls how the client recovers when none of the brokers known to it " | ||
"is available. If set to `none`, the client fails with a fatal error. " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should explain about default case i.e. rebootstrap
first.
7e236f5
to
925c2b2
Compare
313f6a0
to
b845ac2
Compare
@@ -344,6 +344,7 @@ void rd_kafka_broker_set_state(rd_kafka_broker_t *rkb, int state) { | |||
rd_atomic32_get( | |||
&rkb->rkb_rk->rk_logical_broker_cnt) && | |||
!rd_kafka_terminating(rkb->rkb_rk)) { | |||
rd_kafka_rebootstrap(rkb->rkb_rk); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally we should be removing all the other brokers. We are rebootstraping and we don't need old brokers. We will be sending metadata request to these new bootstrap brokers.
This can be an improvement for now maybe.
925c2b2
to
2830d02
Compare
b845ac2
to
aeda39e
Compare
No description provided.