Skip to content

Commit 107264e

Browse files
committed
Bring Apache Arrow classes into the arrow-spi library so that they can be shared between plugins
Signed-off-by: Andriy Redko <[email protected]>
1 parent 1275017 commit 107264e

33 files changed

+7093
-24
lines changed

gradle/libs.versions.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ opentelemetry = "1.46.0"
8484
opentelemetrysemconv = "1.29.0-alpha"
8585

8686
# arrow dependencies
87-
arrow = "18.1.0"
87+
arrow = "18.2.0"
8888
flatbuffers = "2.0.0"
8989

9090
[libraries]

libs/arrow-spi/build.gradle

+33
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,41 @@ testingConventions.enabled = false
1313

1414
dependencies {
1515
api project(':libs:opensearch-core')
16+
api "org.apache.arrow:arrow-memory-core:${versions.arrow}"
17+
api "org.apache.arrow:arrow-vector:${versions.arrow}"
18+
api "org.apache.arrow:arrow-format:${versions.arrow}"
1619
}
1720

1821
tasks.named('forbiddenApisMain').configure {
1922
replaceSignatureFiles 'jdk-signatures'
2023
}
24+
25+
tasks.named('thirdPartyAudit').configure {
26+
ignoreMissingClasses(
27+
'org.slf4j.Logger',
28+
'org.slf4j.LoggerFactory',
29+
'com.fasterxml.jackson.databind.JsonNode',
30+
'com.fasterxml.jackson.databind.MapperFeature',
31+
'com.fasterxml.jackson.databind.MappingJsonFactory',
32+
'com.fasterxml.jackson.databind.ObjectMapper',
33+
'com.fasterxml.jackson.databind.ObjectReader',
34+
'com.fasterxml.jackson.databind.ObjectWriter',
35+
'com.fasterxml.jackson.databind.SerializerProvider',
36+
'com.fasterxml.jackson.databind.json.JsonMapper',
37+
'com.fasterxml.jackson.databind.json.JsonMapper$Builder',
38+
'com.fasterxml.jackson.databind.node.ObjectNode',
39+
'com.fasterxml.jackson.databind.ser.std.StdSerializer',
40+
'com.google.flatbuffers.BaseVector',
41+
'com.google.flatbuffers.Constants',
42+
'com.google.flatbuffers.FlatBufferBuilder',
43+
'com.google.flatbuffers.IntVector',
44+
'com.google.flatbuffers.LongVector',
45+
'com.google.flatbuffers.Struct',
46+
'com.google.flatbuffers.Table',
47+
'org.apache.commons.codec.binary.Hex'
48+
)
49+
ignoreViolations(
50+
'org.apache.arrow.memory.util.MemoryUtil',
51+
'org.apache.arrow.memory.util.MemoryUtil$1'
52+
)
53+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
34c4c28ddc01038498b89125c3a9f2329c2227ae
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
af2e83d46089243245decb0c4a6f8bc4231a35f0
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
8c4ecbc286b28d738730514cd79567d52666c086

libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/StreamManager.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public interface StreamManager extends AutoCloseable {
3434
* @return A StreamTicket that can be used to access the stream
3535
* @throws IllegalArgumentException if producer is null or parentTaskId is invalid
3636
*/
37-
<VectorRoot, Allocator> StreamTicket registerStream(StreamProducer<VectorRoot, Allocator> producer, TaskId parentTaskId);
37+
StreamTicket registerStream(StreamProducer producer, TaskId parentTaskId);
3838

3939
/**
4040
* Creates a stream reader for consuming Arrow data using a valid ticket.
@@ -46,7 +46,7 @@ public interface StreamManager extends AutoCloseable {
4646
* @throws IllegalArgumentException if the ticket is invalid
4747
* @throws IllegalStateException if the stream has been cancelled or closed
4848
*/
49-
<VectorRoot> StreamReader<VectorRoot> getStreamReader(StreamTicket ticket);
49+
StreamReader getStreamReader(StreamTicket ticket);
5050

5151
/**
5252
* Gets the StreamTicketFactory instance associated with this StreamManager.

libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/StreamProducer.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
package org.opensearch.arrow.spi;
1010

11+
import org.apache.arrow.memory.BufferAllocator;
12+
import org.apache.arrow.vector.VectorSchemaRoot;
1113
import org.opensearch.common.annotation.ExperimentalApi;
1214
import org.opensearch.core.tasks.TaskId;
1315

@@ -75,7 +77,7 @@
7577
* @see StreamReader
7678
*/
7779
@ExperimentalApi
78-
public interface StreamProducer<VectorRoot, Allocator> extends Closeable {
80+
public interface StreamProducer extends Closeable {
7981

8082
/**
8183
* Creates a VectorSchemaRoot that defines the schema for this stream. This schema will be used
@@ -84,7 +86,7 @@ public interface StreamProducer<VectorRoot, Allocator> extends Closeable {
8486
* @param allocator The allocator to use for creating vectors
8587
* @return A new VectorSchemaRoot instance
8688
*/
87-
VectorRoot createRoot(Allocator allocator);
89+
VectorSchemaRoot createRoot(BufferAllocator allocator);
8890

8991
/**
9092
* Creates a job that will produce the stream data in batches. The job will populate
@@ -93,7 +95,7 @@ public interface StreamProducer<VectorRoot, Allocator> extends Closeable {
9395
* @param allocator The allocator to use for any additional memory allocations
9496
* @return A new BatchedJob instance
9597
*/
96-
BatchedJob<VectorRoot> createJob(Allocator allocator);
98+
BatchedJob createJob(BufferAllocator allocator);
9799

98100
/**
99101
* Provides an estimate of the total number of rows that will be produced.
@@ -111,7 +113,7 @@ public interface StreamProducer<VectorRoot, Allocator> extends Closeable {
111113
/**
112114
* BatchedJob interface for producing stream data in batches.
113115
*/
114-
interface BatchedJob<VectorRoot> {
116+
interface BatchedJob {
115117

116118
/**
117119
* Executes the batch processing job. Implementations should populate the root with data
@@ -120,7 +122,7 @@ interface BatchedJob<VectorRoot> {
120122
* @param root The VectorSchemaRoot to populate with data
121123
* @param flushSignal Signal to coordinate with consumers
122124
*/
123-
void run(VectorRoot root, FlushSignal flushSignal);
125+
void run(VectorSchemaRoot root, FlushSignal flushSignal);
124126

125127
/**
126128
* Called to signal producer when the job is canceled.

libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/StreamReader.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.arrow.spi;
1010

11+
import org.apache.arrow.vector.VectorSchemaRoot;
1112
import org.opensearch.common.annotation.ExperimentalApi;
1213

1314
import java.io.Closeable;
@@ -36,7 +37,7 @@
3637
* @see StreamProducer
3738
*/
3839
@ExperimentalApi
39-
public interface StreamReader<VectorRoot> extends Closeable {
40+
public interface StreamReader extends Closeable {
4041

4142
/**
4243
* Blocking request to load next batch into root.
@@ -51,5 +52,5 @@ public interface StreamReader<VectorRoot> extends Closeable {
5152
*
5253
* @return the VectorSchemaRoot
5354
*/
54-
VectorRoot getRoot();
55+
VectorSchemaRoot getRoot();
5556
}

plugins/arrow-flight-rpc/build.gradle

+3-8
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,7 @@ dependencies {
2020
implementation project(':libs:opensearch-arrow-spi')
2121
compileOnly 'org.checkerframework:checker-qual:3.44.0'
2222

23-
implementation "org.apache.arrow:arrow-vector:${versions.arrow}"
24-
implementation "org.apache.arrow:arrow-format:${versions.arrow}"
2523
implementation "org.apache.arrow:flight-core:${versions.arrow}"
26-
implementation "org.apache.arrow:arrow-memory-core:${versions.arrow}"
2724

2825
runtimeOnly "org.apache.arrow:arrow-memory-netty:${versions.arrow}"
2926
runtimeOnly "org.apache.arrow:arrow-memory-netty-buffer-patch:${versions.arrow}"
@@ -227,7 +224,8 @@ tasks.named('thirdPartyAudit').configure {
227224
'reactor.blockhound.BlockHound$Builder',
228225
'reactor.blockhound.integration.BlockHoundIntegration',
229226

230-
'com.google.protobuf.util.Timestamps'
227+
'com.google.protobuf.util.Timestamps',
228+
'io.grpc.stub.BlockingClientCall'
231229
)
232230
ignoreViolations(
233231
// Guava internal classes
@@ -293,9 +291,6 @@ tasks.named('thirdPartyAudit').configure {
293291
'io.netty.util.internal.shaded.org.jctools.queues.unpadded.MpscUnpaddedArrayQueueProducerLimitField',
294292
'io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess',
295293
'io.netty.util.internal.shaded.org.jctools.util.UnsafeLongArrayAccess',
296-
'io.netty.util.internal.shaded.org.jctools.util.UnsafeRefArrayAccess',
297-
'org.apache.arrow.memory.util.MemoryUtil',
298-
'org.apache.arrow.memory.util.MemoryUtil$1'
299-
294+
'io.netty.util.internal.shaded.org.jctools.util.UnsafeRefArrayAccess'
300295
)
301296
}

plugins/arrow-flight-rpc/licenses/arrow-format-18.1.0.jar.sha1

-1
This file was deleted.

plugins/arrow-flight-rpc/licenses/arrow-memory-core-18.1.0.jar.sha1

-1
This file was deleted.

plugins/arrow-flight-rpc/licenses/arrow-memory-netty-18.1.0.jar.sha1

-1
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
2b084a03bad216e0ac39d40a6b36d0659709b0b4

plugins/arrow-flight-rpc/licenses/arrow-memory-netty-buffer-patch-18.1.0.jar.sha1

-1
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
3dc14b412efd53189cc6be49dcc1920580efc76e

plugins/arrow-flight-rpc/licenses/arrow-vector-18.1.0.jar.sha1

-1
This file was deleted.

plugins/arrow-flight-rpc/licenses/flight-core-18.1.0.jar.sha1

-1
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
032f5b195a45dca93ecb103474ef0b2b01512895
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
34c4c28ddc01038498b89125c3a9f2329c2227ae

0 commit comments

Comments
 (0)