Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-14358 ConsumeKinesisStream added pausing on backpressure #9812

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

dariuszseweryn
Copy link
Contributor

Summary

NIFI-14358

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, as such NIFI-00000

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using mvn clean install -P contrib-check
    • JDK 21

Licensing

  • (no new) New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • (no new) New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • (no changes) Documentation formatting appears as expected in rendered files

Copy link
Contributor

@exceptionfactory exceptionfactory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for proposing these changes @dariuszseweryn. Honoring back pressure is an important improvement, but I think the interface naming and implementation needs to be reworked.

The Pause naming is very generic, and as this relates more specifically the consuming records, a more specific naming strategy would be more intuitive.

On the read side in the RecordProcessor classes, treating this as more of a conditional rather than consumePause() would be much clearer. There are some analogies to a Semaphore, although I don't think that actual class should be used here because of unnecessary complexity. The ManagedBlocker seems a bit closer to what is intended here. What do you think about naming the basic interface something like RecordProcessorBlocker? That highlights the usage in RecordProcessor and also connotes blocking, which is the purpose of the abstraction. Perhaps RecordProcessorLatch could be better, inspired by the CountDownLatch class, but not necessarily tied to that implementation. The basic method could be await() following the CountDownLatch pattern.

The write side in the ConsumeKinesisStream Processor class would need a different set of methods, which could be declared on a concrete class if only referenced in that Processor. On that side, I don't think it is necessary pass in the ProcessContext in an onTrigger() method. Instead, the Processor itself could check the available relationship status, and then call a method. As far as naming, perhaps something using block could be useful, such as block() and unblock().

With that background, this highlights some of the shortcomings of the current Processor design, as described in other Jira issues, but it seems like a reasonable way forward with the right implementation.

@dariuszseweryn
Copy link
Contributor Author

Hey @exceptionfactory
Thanks for the suggestions! Much appreciated as "naming things is hard™". I'll incorporate your suggestions.

Copy link
Contributor

@exceptionfactory exceptionfactory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates @dariuszseweryn, the naming looks like it is headed in a good direction. I noted a handful of additional naming consistency items, along with some questions around test and runtime code.

@dariuszseweryn
Copy link
Contributor Author

Yet again, great catches. I probably shouldn't code on evenings!

Copy link
Contributor

@exceptionfactory exceptionfactory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates @dariuszseweryn, I raised one more key question regarding the CountDownLatch instance, and some test-related items, but otherwise this looks close to completion.

@@ -72,7 +73,7 @@ public void setUp() {

// default test fixture will try operations twice with very little wait in between
fixture = new AbstractKinesisRecordProcessor(processSessionFactory, runner.getLogger(), "kinesis-test",
"endpoint-prefix", null, 10_000L, 1L, 2, DATE_TIME_FORMATTER) {
"endpoint-prefix", null, 10_000L, 1L, 2, DATE_TIME_FORMATTER, NoopRecordProcessorBlocker.create()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the StandardRecordProcessorBlocker be used here? It seems like the Noop instance is not required for standard behavior in this case.

onPauseFinished = true;
}

public void awaitAwaited() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public void awaitAwaited() {
public void awaitStarted() {

Comment on lines +86 to +95
private boolean onPauseAwaited = false;
private boolean onPauseFinished = false;

public void onPauseAwaited() {
onPauseAwaited = true;
}

public void onPauseFinished() {
onPauseFinished = true;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like these variables and methods should be updated to reflect the Blocker naming.

protected StandardRecordProcessorBlocker() { }

public void await() throws InterruptedException {
blocker.await();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if unblock() and block() get called while the RecordProcessor is waiting on this call, since the CountDownLatch instance would be changed in the block() method?

The CyclicBarrier can be reset, but doesn't have all the semantics that seem to be required.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants