Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added stats for phase results processors at search pipeline level #17559

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions release-notes/opensearch.release-notes-3.0.0-alpha1.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
- Fix Bug - Handle unsigned long in sorting order assertion of LongHashSet ([#17207](https://github.com/opensearch-project/OpenSearch/pull/17207))
- Implemented computation of segment replication stats at shard level ([#17055](https://github.com/opensearch-project/OpenSearch/pull/17055))
- [Rule Based Auto-tagging] Add in-memory attribute value store ([#17342](https://github.com/opensearch-project/OpenSearch/pull/17342))
- Added pipeline level stats for search phase result processor ([#17559](https://github.com/opensearch-project/OpenSearch/pull/17559))


### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,18 @@ protected void afterRequestProcessor(Processor processor, long timeInNanos) {}

protected void onRequestProcessorFailed(Processor processor) {}

protected void beforeTransformPhaseResults() {}

protected void afterTransformPhaseResults(long timeInNanos) {}

protected void onTransformPhaseResultsFailure() {}

protected void beforePhaseResultsProcessor(Processor processor) {}

protected void afterPhaseResultsProcessor(Processor processor, long timeInNanos) {}

protected void onPhaseResultsProcessorFailed(Processor processor) {}

protected void beforeTransformResponse() {}

protected void afterTransformResponse(long timeInNanos) {}
Expand Down Expand Up @@ -276,13 +288,22 @@ <Result extends SearchPhaseResult> void runSearchPhaseResultsTransformer(
String nextPhase,
PipelineProcessingContext requestContext
) throws SearchPipelineProcessingException {
long pipelineStart = relativeTimeSupplier.getAsLong();
beforeTransformPhaseResults();
try {
for (SearchPhaseResultsProcessor searchPhaseResultsProcessor : searchPhaseResultsProcessors) {
beforePhaseResultsProcessor(searchPhaseResultsProcessor);
long start = relativeTimeSupplier.getAsLong();
if (currentPhase.equals(searchPhaseResultsProcessor.getBeforePhase().getName())
&& nextPhase.equals(searchPhaseResultsProcessor.getAfterPhase().getName())) {
try {
searchPhaseResultsProcessor.process(searchPhaseResult, context, requestContext);
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
afterPhaseResultsProcessor(searchPhaseResultsProcessor, took);
} catch (Exception e) {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
afterPhaseResultsProcessor(searchPhaseResultsProcessor, took);
onPhaseResultsProcessorFailed(searchPhaseResultsProcessor);
if (searchPhaseResultsProcessor.isIgnoreFailure()) {
logger.warn(
"The exception from search phase results processor ["
Expand All @@ -300,7 +321,11 @@ <Result extends SearchPhaseResult> void runSearchPhaseResultsTransformer(
}
}
} catch (RuntimeException e) {
onTransformPhaseResultsFailure();
throw new SearchPipelineProcessingException(e);
} finally {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart);
afterTransformPhaseResults(took);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ class PipelineWithMetrics extends Pipeline {

private final OperationMetrics totalRequestMetrics;
private final OperationMetrics totalResponseMetrics;
private final OperationMetrics totalPhaseResultsMetrics;
private final OperationMetrics pipelineRequestMetrics = new OperationMetrics();
private final OperationMetrics pipelineResponseMetrics = new OperationMetrics();
private final Map<String, OperationMetrics> requestProcessorMetrics = new HashMap<>();
private final Map<String, OperationMetrics> responseProcessorMetrics = new HashMap<>();
private final Map<String, OperationMetrics> phaseResultsProcessorMetrics = new HashMap<>();

PipelineWithMetrics(
String id,
Expand All @@ -48,6 +50,7 @@ class PipelineWithMetrics extends Pipeline {
NamedWriteableRegistry namedWriteableRegistry,
OperationMetrics totalRequestMetrics,
OperationMetrics totalResponseMetrics,
OperationMetrics totalPhaseResultMetrics,
LongSupplier relativeTimeSupplier
) {
super(
Expand All @@ -62,12 +65,16 @@ class PipelineWithMetrics extends Pipeline {
);
this.totalRequestMetrics = totalRequestMetrics;
this.totalResponseMetrics = totalResponseMetrics;
this.totalPhaseResultsMetrics = totalPhaseResultMetrics;
for (Processor requestProcessor : getSearchRequestProcessors()) {
requestProcessorMetrics.putIfAbsent(getProcessorKey(requestProcessor), new OperationMetrics());
}
for (Processor responseProcessor : getSearchResponseProcessors()) {
responseProcessorMetrics.putIfAbsent(getProcessorKey(responseProcessor), new OperationMetrics());
}
for (Processor phaseResultsProcessor : getSearchPhaseResultsProcessors()) {
phaseResultsProcessorMetrics.putIfAbsent(getProcessorKey(phaseResultsProcessor), new OperationMetrics());
}
}

static PipelineWithMetrics create(
Expand All @@ -79,6 +86,7 @@ static PipelineWithMetrics create(
NamedWriteableRegistry namedWriteableRegistry,
OperationMetrics totalRequestProcessingMetrics,
OperationMetrics totalResponseProcessingMetrics,
OperationMetrics totalPhaseResultProcessingMetrics,
Processor.PipelineContext pipelineContext
) throws Exception {
String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY);
Expand Down Expand Up @@ -129,6 +137,7 @@ static PipelineWithMetrics create(
namedWriteableRegistry,
totalRequestProcessingMetrics,
totalResponseProcessingMetrics,
totalPhaseResultProcessingMetrics,
System::nanoTime
);

Expand Down Expand Up @@ -236,6 +245,39 @@ protected void onResponseProcessorFailed(Processor processor) {
responseProcessorMetrics.get(getProcessorKey(processor)).failed();
}

@Override
protected void beforeTransformPhaseResults() {
super.beforeTransformRequest();
totalRequestMetrics.before();
pipelineRequestMetrics.before();
}

@Override
protected void afterTransformPhaseResults(long timeInNanos) {
super.afterTransformRequest(timeInNanos);
totalRequestMetrics.after(timeInNanos);
pipelineRequestMetrics.after(timeInNanos);
}

@Override
protected void onTransformPhaseResultsFailure() {
super.onTransformRequestFailure();
totalRequestMetrics.failed();
pipelineRequestMetrics.failed();
}

protected void beforePhaseResultsProcessor(Processor processor) {
phaseResultsProcessorMetrics.get(getProcessorKey(processor)).before();
}

protected void afterPhaseResultsProcessor(Processor processor, long timeInNanos) {
phaseResultsProcessorMetrics.get(getProcessorKey(processor)).after(timeInNanos);
}

protected void onPhaseResultsProcessorFailed(Processor processor) {
phaseResultsProcessorMetrics.get(getProcessorKey(processor)).failed();
}

void copyMetrics(PipelineWithMetrics oldPipeline) {
pipelineRequestMetrics.add(oldPipeline.pipelineRequestMetrics);
pipelineResponseMetrics.add(oldPipeline.pipelineResponseMetrics);
Expand Down Expand Up @@ -272,5 +314,14 @@ void populateStats(SearchPipelineStats.Builder statsBuilder) {
String key = getProcessorKey(processor);
statsBuilder.addResponseProcessorStats(getId(), key, processor.getType(), responseProcessorMetrics.get(key));
}
for (Processor phaseResultsProcessor : getSearchPhaseResultsProcessors()) {
String key = getProcessorKey(phaseResultsProcessor);
statsBuilder.addPhaseResultsProcessorStats(
getId(),
key,
phaseResultsProcessor.getType(),
phaseResultsProcessorMetrics.get(key)
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class SearchPipelineService implements ClusterStateApplier, ReportingServ

private final OperationMetrics totalRequestProcessingMetrics = new OperationMetrics();
private final OperationMetrics totalResponseProcessingMetrics = new OperationMetrics();
private final OperationMetrics totalPhaseProcessingMetrics = new OperationMetrics();

public SearchPipelineService(
ClusterService clusterService,
Expand Down Expand Up @@ -191,6 +192,7 @@ void innerUpdatePipelines(SearchPipelineMetadata newSearchPipelineMetadata) {
namedWriteableRegistry,
totalRequestProcessingMetrics,
totalResponseProcessingMetrics,
totalPhaseProcessingMetrics,
new Processor.PipelineContext(Processor.PipelineSource.UPDATE_PIPELINE)
);
newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline));
Expand Down Expand Up @@ -288,6 +290,7 @@ void validatePipeline(Map<DiscoveryNode, SearchPipelineInfo> searchPipelineInfos
namedWriteableRegistry,
new OperationMetrics(), // Use ephemeral metrics for validation
new OperationMetrics(),
new OperationMetrics(),
new Processor.PipelineContext(Processor.PipelineSource.VALIDATE_PIPELINE)
);
List<Exception> exceptions = new ArrayList<>();
Expand Down Expand Up @@ -384,6 +387,7 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest, IndexNameEx
namedWriteableRegistry,
totalRequestProcessingMetrics,
totalResponseProcessingMetrics,
totalPhaseProcessingMetrics,
new Processor.PipelineContext(Processor.PipelineSource.SEARCH_REQUEST)
);
} catch (Exception e) {
Expand Down Expand Up @@ -457,7 +461,7 @@ public SearchPipelineInfo info() {

public SearchPipelineStats stats() {
SearchPipelineStats.Builder builder = new SearchPipelineStats.Builder();
builder.withTotalStats(totalRequestProcessingMetrics, totalResponseProcessingMetrics);
builder.withTotalStats(totalRequestProcessingMetrics, totalResponseProcessingMetrics, totalPhaseProcessingMetrics);
for (PipelineHolder pipelineHolder : pipelines.values()) {
PipelineWithMetrics pipeline = pipelineHolder.pipeline;
pipeline.populateStats(builder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,21 @@ public SearchPipelineStats(StreamInput in) throws IOException {
OperationStats processorStats = new OperationStats(in);
responseProcessorStats.add(new ProcessorStats(processorName, processorType, processorStats));
}
int numPhaseResultsProcessors = in.readVInt();
List<ProcessorStats> phaseResultsProcessorStats = new ArrayList<>(numPhaseResultsProcessors);
for (int j = 0; j < numPhaseResultsProcessors; j++) {
String processorName = in.readString();
String processorType = in.readString();
OperationStats processorStats = new OperationStats(in);
phaseResultsProcessorStats.add(new ProcessorStats(processorName, processorType, processorStats));
}
pipelineDetailStatsMap.put(
pipelineId,
new PipelineDetailStats(unmodifiableList(requestProcessorStats), unmodifiableList(responseProcessorStats))
new PipelineDetailStats(
unmodifiableList(requestProcessorStats),
unmodifiableList(phaseResultsProcessorStats),
unmodifiableList(responseProcessorStats)
)
);
}
this.perPipelineStats = unmodifiableList(perPipelineStats);
Expand Down Expand Up @@ -122,6 +134,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.endObject();
}
builder.endArray();
builder.startArray("phase_results_processors");
for (ProcessorStats processorStats : pipelineDetailStats.phaseResultsProcessorStats) {
builder.startObject();
processorStats.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
builder.endObject();
}
builder.endObject();
Expand Down Expand Up @@ -151,19 +170,32 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(processorStats.processorType);
processorStats.stats.writeTo(out);
}
out.writeVInt(pipelineDetailStats.phaseResultsProcessorStats.size());
for (ProcessorStats processorStats : pipelineDetailStats.phaseResultsProcessorStats) {
out.writeString(processorStats.processorName);
out.writeString(processorStats.processorType);
processorStats.stats.writeTo(out);
}
}
}

static class Builder {
private OperationStats totalRequestStats;
private OperationStats totalResponseStats;
private OperationStats totalPhaseResultsStats;
private final List<PerPipelineStats> perPipelineStats = new ArrayList<>();
private final Map<String, List<ProcessorStats>> requestProcessorStatsPerPipeline = new HashMap<>();
private final Map<String, List<ProcessorStats>> responseProcessorStatsPerPipeline = new HashMap<>();
private final Map<String, List<ProcessorStats>> phaseResultsProcessorStatsPerPipeline = new HashMap<>();

Builder withTotalStats(OperationMetrics totalRequestMetrics, OperationMetrics totalResponseMetrics) {
Builder withTotalStats(
OperationMetrics totalRequestMetrics,
OperationMetrics totalResponseMetrics,
OperationMetrics totalPhaseProcessingMetrics
) {
this.totalRequestStats = totalRequestMetrics.createStats();
this.totalResponseStats = totalResponseMetrics.createStats();
this.totalPhaseResultsStats = totalPhaseProcessingMetrics.createStats();
return this;
}

Expand All @@ -180,6 +212,17 @@ Builder addRequestProcessorStats(String pipelineId, String processorName, String
return this;
}

Builder addPhaseResultsProcessorStats(
String pipelineId,
String processorName,
String processorType,
OperationMetrics processorMetrics
) {
this.phaseResultsProcessorStatsPerPipeline.computeIfAbsent(pipelineId, k -> new ArrayList<>())
.add(new ProcessorStats(processorName, processorType, processorMetrics.createStats()));
return this;
}

Builder addResponseProcessorStats(
String pipelineId,
String processorName,
Expand All @@ -198,12 +241,17 @@ SearchPipelineStats build() {
pipelineStat.pipelineId,
emptyList()
);
List<ProcessorStats> phaseResultsProcessorStats = phaseResultsProcessorStatsPerPipeline.getOrDefault(
pipelineStat.pipelineId,
emptyList()
);
List<ProcessorStats> responseProcessorStats = responseProcessorStatsPerPipeline.getOrDefault(
pipelineStat.pipelineId,
emptyList()
);
PipelineDetailStats pipelineDetailStats = new PipelineDetailStats(
unmodifiableList(requestProcessorStats),
unmodifiableList(phaseResultsProcessorStats),
unmodifiableList(responseProcessorStats)
);
pipelineDetailStatsMap.put(pipelineStat.pipelineId, pipelineDetailStats);
Expand Down Expand Up @@ -256,10 +304,16 @@ public int hashCode() {

static class PipelineDetailStats {
private final List<ProcessorStats> requestProcessorStats;
private final List<ProcessorStats> phaseResultsProcessorStats;
private final List<ProcessorStats> responseProcessorStats;

public PipelineDetailStats(List<ProcessorStats> requestProcessorStats, List<ProcessorStats> responseProcessorStats) {
public PipelineDetailStats(
List<ProcessorStats> requestProcessorStats,
List<ProcessorStats> phaseResultsProcessorStats,
List<ProcessorStats> responseProcessorStats
) {
this.requestProcessorStats = requestProcessorStats;
this.phaseResultsProcessorStats = phaseResultsProcessorStats;
this.responseProcessorStats = responseProcessorStats;
}

Expand All @@ -271,17 +325,23 @@ public List<ProcessorStats> responseProcessorStats() {
return responseProcessorStats;
}

public List<ProcessorStats> phaseResultsProcessorStats() {
return phaseResultsProcessorStats;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PipelineDetailStats that = (PipelineDetailStats) o;
return requestProcessorStats.equals(that.requestProcessorStats) && responseProcessorStats.equals(that.responseProcessorStats);
return requestProcessorStats.equals(that.requestProcessorStats)
&& responseProcessorStats.equals(that.responseProcessorStats)
&& phaseResultsProcessorStats.equals(that.phaseResultsProcessorStats);
}

@Override
public int hashCode() {
return Objects.hash(requestProcessorStats, responseProcessorStats);
return Objects.hash(requestProcessorStats, responseProcessorStats, phaseResultsProcessorStats);
}
}

Expand Down
Loading
Loading