Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bring Apache Arrow classes into the arrow-spi library so they can be shared between plugins #17580

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ opentelemetry = "1.46.0"
opentelemetrysemconv = "1.29.0-alpha"

# arrow dependencies
arrow = "18.1.0"
arrow = "18.2.0"
flatbuffers = "2.0.0"

[libraries]
Expand Down
33 changes: 33 additions & 0 deletions libs/arrow-spi/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,41 @@ testingConventions.enabled = false

// Dependencies of the arrow-spi library. Everything here is declared with the
// `api` configuration rather than `implementation`.
// NOTE(review): with Gradle's java-library plugin, `api` dependencies are exposed
// on the compile classpath of consumers — presumably the intent here, so that
// plugins depending on arrow-spi see the same Arrow classes. Confirm.
dependencies {
api project(':libs:opensearch-core')
// Apache Arrow artifacts; all three share the single `versions.arrow` version.
api "org.apache.arrow:arrow-memory-core:${versions.arrow}"
api "org.apache.arrow:arrow-vector:${versions.arrow}"
api "org.apache.arrow:arrow-format:${versions.arrow}"
}

// Configures the `forbiddenApisMain` task to use the 'jdk-signatures' signature file.
// NOTE(review): `replaceSignatureFiles` presumably replaces the default signature
// set instead of adding to it — confirm against the build plugin that defines it.
tasks.named('forbiddenApisMain').configure {
replaceSignatureFiles 'jdk-signatures'
}

// Configures the `thirdPartyAudit` task for this library: one list of classes
// passed to `ignoreMissingClasses`, and one list passed to `ignoreViolations`.
// NOTE(review): the missing classes are presumably optional references made by
// the Arrow jars to libraries that are not declared as dependencies of this
// module (SLF4J, Jackson databind, FlatBuffers, commons-codec) — confirm.
tasks.named('thirdPartyAudit').configure {
ignoreMissingClasses(
// SLF4J logging API
'org.slf4j.Logger',
'org.slf4j.LoggerFactory',
// Jackson databind
'com.fasterxml.jackson.databind.JsonNode',
'com.fasterxml.jackson.databind.MapperFeature',
'com.fasterxml.jackson.databind.MappingJsonFactory',
'com.fasterxml.jackson.databind.ObjectMapper',
'com.fasterxml.jackson.databind.ObjectReader',
'com.fasterxml.jackson.databind.ObjectWriter',
'com.fasterxml.jackson.databind.SerializerProvider',
'com.fasterxml.jackson.databind.json.JsonMapper',
'com.fasterxml.jackson.databind.json.JsonMapper$Builder',
'com.fasterxml.jackson.databind.node.ObjectNode',
'com.fasterxml.jackson.databind.ser.std.StdSerializer',
// Google FlatBuffers
'com.google.flatbuffers.BaseVector',
'com.google.flatbuffers.Constants',
'com.google.flatbuffers.FlatBufferBuilder',
'com.google.flatbuffers.IntVector',
'com.google.flatbuffers.LongVector',
'com.google.flatbuffers.Struct',
'com.google.flatbuffers.Table',
// Apache commons-codec
'org.apache.commons.codec.binary.Hex'
)
// Arrow memory utility class and its anonymous inner class ($1) are exempted
// from audit violations.
// NOTE(review): presumably because of internal/unsafe JDK API usage — confirm.
ignoreViolations(
'org.apache.arrow.memory.util.MemoryUtil',
'org.apache.arrow.memory.util.MemoryUtil$1'
)
}
1 change: 1 addition & 0 deletions libs/arrow-spi/licenses/arrow-format-18.2.0.jar.sha1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
34c4c28ddc01038498b89125c3a9f2329c2227ae
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
af2e83d46089243245decb0c4a6f8bc4231a35f0
1 change: 1 addition & 0 deletions libs/arrow-spi/licenses/arrow-vector-18.2.0.jar.sha1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
8c4ecbc286b28d738730514cd79567d52666c086
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public interface StreamManager extends AutoCloseable {
* @return A StreamTicket that can be used to access the stream
* @throws IllegalArgumentException if producer is null or parentTaskId is invalid
*/
<VectorRoot, Allocator> StreamTicket registerStream(StreamProducer<VectorRoot, Allocator> producer, TaskId parentTaskId);
StreamTicket registerStream(StreamProducer producer, TaskId parentTaskId);

/**
* Creates a stream reader for consuming Arrow data using a valid ticket.
Expand All @@ -46,7 +46,7 @@ public interface StreamManager extends AutoCloseable {
* @throws IllegalArgumentException if the ticket is invalid
* @throws IllegalStateException if the stream has been cancelled or closed
*/
<VectorRoot> StreamReader<VectorRoot> getStreamReader(StreamTicket ticket);
StreamReader getStreamReader(StreamTicket ticket);

/**
* Gets the StreamTicketFactory instance associated with this StreamManager.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.arrow.spi;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.core.tasks.TaskId;

Expand Down Expand Up @@ -75,7 +77,7 @@
* @see StreamReader
*/
@ExperimentalApi
public interface StreamProducer<VectorRoot, Allocator> extends Closeable {
public interface StreamProducer extends Closeable {

/**
* Creates a VectorSchemaRoot that defines the schema for this stream. This schema will be used
Expand All @@ -84,7 +86,7 @@ public interface StreamProducer<VectorRoot, Allocator> extends Closeable {
* @param allocator The allocator to use for creating vectors
* @return A new VectorSchemaRoot instance
*/
VectorRoot createRoot(Allocator allocator);
VectorSchemaRoot createRoot(BufferAllocator allocator);

/**
* Creates a job that will produce the stream data in batches. The job will populate
Expand All @@ -93,7 +95,7 @@ public interface StreamProducer<VectorRoot, Allocator> extends Closeable {
* @param allocator The allocator to use for any additional memory allocations
* @return A new BatchedJob instance
*/
BatchedJob<VectorRoot> createJob(Allocator allocator);
BatchedJob createJob(BufferAllocator allocator);

/**
* Provides an estimate of the total number of rows that will be produced.
Expand All @@ -111,7 +113,7 @@ public interface StreamProducer<VectorRoot, Allocator> extends Closeable {
/**
* BatchedJob interface for producing stream data in batches.
*/
interface BatchedJob<VectorRoot> {
interface BatchedJob {

/**
* Executes the batch processing job. Implementations should populate the root with data
Expand All @@ -120,7 +122,7 @@ interface BatchedJob<VectorRoot> {
* @param root The VectorSchemaRoot to populate with data
* @param flushSignal Signal to coordinate with consumers
*/
void run(VectorRoot root, FlushSignal flushSignal);
void run(VectorSchemaRoot root, FlushSignal flushSignal);

/**
* Called to signal producer when the job is canceled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.arrow.spi;

import org.apache.arrow.vector.VectorSchemaRoot;
import org.opensearch.common.annotation.ExperimentalApi;

import java.io.Closeable;
Expand Down Expand Up @@ -36,7 +37,7 @@
* @see StreamProducer
*/
@ExperimentalApi
public interface StreamReader<VectorRoot> extends Closeable {
public interface StreamReader extends Closeable {

/**
* Blocking request to load next batch into root.
Expand All @@ -51,5 +52,5 @@ public interface StreamReader<VectorRoot> extends Closeable {
*
* @return the VectorSchemaRoot
*/
VectorRoot getRoot();
VectorSchemaRoot getRoot();
}
11 changes: 3 additions & 8 deletions plugins/arrow-flight-rpc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ dependencies {
implementation project(':libs:opensearch-arrow-spi')
compileOnly 'org.checkerframework:checker-qual:3.44.0'

implementation "org.apache.arrow:arrow-vector:${versions.arrow}"
implementation "org.apache.arrow:arrow-format:${versions.arrow}"
implementation "org.apache.arrow:flight-core:${versions.arrow}"
implementation "org.apache.arrow:arrow-memory-core:${versions.arrow}"

runtimeOnly "org.apache.arrow:arrow-memory-netty:${versions.arrow}"
runtimeOnly "org.apache.arrow:arrow-memory-netty-buffer-patch:${versions.arrow}"
Expand Down Expand Up @@ -227,7 +224,8 @@ tasks.named('thirdPartyAudit').configure {
'reactor.blockhound.BlockHound$Builder',
'reactor.blockhound.integration.BlockHoundIntegration',

'com.google.protobuf.util.Timestamps'
'com.google.protobuf.util.Timestamps',
'io.grpc.stub.BlockingClientCall'
)
ignoreViolations(
// Guava internal classes
Expand Down Expand Up @@ -293,9 +291,6 @@ tasks.named('thirdPartyAudit').configure {
'io.netty.util.internal.shaded.org.jctools.queues.unpadded.MpscUnpaddedArrayQueueProducerLimitField',
'io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess',
'io.netty.util.internal.shaded.org.jctools.util.UnsafeLongArrayAccess',
'io.netty.util.internal.shaded.org.jctools.util.UnsafeRefArrayAccess',
'org.apache.arrow.memory.util.MemoryUtil',
'org.apache.arrow.memory.util.MemoryUtil$1'

'io.netty.util.internal.shaded.org.jctools.util.UnsafeRefArrayAccess'
)
}

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2b084a03bad216e0ac39d40a6b36d0659709b0b4

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3dc14b412efd53189cc6be49dcc1920580efc76e

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
032f5b195a45dca93ecb103474ef0b2b01512895
1 change: 1 addition & 0 deletions server/licenses/arrow-format-18.2.0.jar.sha1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
34c4c28ddc01038498b89125c3a9f2329c2227ae
Loading
Loading