Skip to content

Commit a3b2ed1

Browse files
committed
Add Iceberg Spark geospatial support library
1 parent a4a8c05 commit a3b2ed1

File tree

6 files changed

+323
-0
lines changed

6 files changed

+323
-0
lines changed

spark/iceberg/pom.xml

+117
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
<!--
2+
~ Licensed to the Apache Software Foundation (ASF) under one
3+
~ or more contributor license agreements. See the NOTICE file
4+
~ distributed with this work for additional information
5+
~ regarding copyright ownership. The ASF licenses this file
6+
~ to you under the Apache License, Version 2.0 (the
7+
~ "License"); you may not use this file except in compliance
8+
~ with the License. You may obtain a copy of the License at
9+
~
10+
~ http://www.apache.org/licenses/LICENSE-2.0
11+
~
12+
~ Unless required by applicable law or agreed to in writing,
13+
~ software distributed under the License is distributed on an
14+
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
~ KIND, either express or implied. See the License for the
16+
~ specific language governing permissions and limitations
17+
~ under the License.
18+
-->
19+
20+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
21+
<modelVersion>4.0.0</modelVersion>
22+
<parent>
23+
<groupId>org.apache.sedona</groupId>
24+
<artifactId>sedona-spark-parent-${spark.compat.version}_${scala.compat.version}</artifactId>
25+
<version>1.7.1-SNAPSHOT</version>
26+
<relativePath>../pom.xml</relativePath>
27+
</parent>
28+
<artifactId>sedona-spark-iceberg-${spark.compat.version}_${scala.compat.version}</artifactId>
29+
30+
<name>${project.groupId}:${project.artifactId}</name>
31+
<description>A cluster computing system for processing large-scale spatial data: Iceberg Spark Support.</description>
32+
<url>https://sedona.apache.org/</url>
33+
<packaging>jar</packaging>
34+
35+
<properties>
36+
<maven.deploy.skip>false</maven.deploy.skip>
37+
</properties>
38+
39+
<dependencies>
40+
<dependency>
41+
<groupId>org.apache.sedona</groupId>
42+
<artifactId>sedona-common</artifactId>
43+
<version>${project.version}</version>
44+
<scope>provided</scope>
45+
<exclusions>
46+
<exclusion>
47+
<groupId>com.fasterxml.jackson.core</groupId>
48+
<artifactId>*</artifactId>
49+
</exclusion>
50+
</exclusions>
51+
</dependency>
52+
<dependency>
53+
<groupId>org.apache.sedona</groupId>
54+
<artifactId>sedona-spark-common-${spark.compat.version}_${scala.compat.version}</artifactId>
55+
<version>${project.version}</version>
56+
<scope>provided</scope>
57+
</dependency>
58+
<dependency>
59+
<groupId>org.apache.spark</groupId>
60+
<artifactId>spark-core_${scala.compat.version}</artifactId>
61+
</dependency>
62+
<dependency>
63+
<groupId>org.apache.spark</groupId>
64+
<artifactId>spark-sql_${scala.compat.version}</artifactId>
65+
</dependency>
66+
<dependency>
67+
<groupId>org.apache.iceberg</groupId>
68+
<artifactId>iceberg-spark-runtime-${spark.compat.version}_${scala.compat.version}</artifactId>
69+
<version>${iceberg.version}</version>
70+
<scope>provided</scope>
71+
</dependency>
72+
<dependency>
73+
<groupId>org.locationtech.jts</groupId>
74+
<artifactId>jts-core</artifactId>
75+
<scope>provided</scope>
76+
</dependency>
77+
<dependency>
78+
<groupId>org.scala-lang</groupId>
79+
<artifactId>scala-library</artifactId>
80+
<scope>provided</scope>
81+
</dependency>
82+
<dependency>
83+
<groupId>org.scala-lang.modules</groupId>
84+
<artifactId>scala-collection-compat_${scala.compat.version}</artifactId>
85+
<scope>provided</scope>
86+
</dependency>
87+
<dependency>
88+
<groupId>org.scalatest</groupId>
89+
<artifactId>scalatest_${scala.compat.version}</artifactId>
90+
</dependency>
91+
</dependencies>
92+
<build>
93+
<sourceDirectory>src/main/scala</sourceDirectory>
94+
<plugins>
95+
<plugin>
96+
<groupId>net.alchim31.maven</groupId>
97+
<artifactId>scala-maven-plugin</artifactId>
98+
<executions>
99+
<execution>
100+
<id>attach-javadocs</id>
101+
<configuration>
102+
<skip>true</skip>
103+
</configuration>
104+
</execution>
105+
</executions>
106+
</plugin>
107+
<plugin>
108+
<groupId>org.scalatest</groupId>
109+
<artifactId>scalatest-maven-plugin</artifactId>
110+
</plugin>
111+
<plugin>
112+
<groupId>org.scalastyle</groupId>
113+
<artifactId>scalastyle-maven-plugin</artifactId>
114+
</plugin>
115+
</plugins>
116+
</build>
117+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
org.apache.sedona.SedonaGeospatialLibraryProvider
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.sedona
20+
21+
import SedonaGeospatialLibrary.BACKTICKS_PATTERN
22+
import org.apache.iceberg.expressions.{Expressions => IcebergExpressions}
23+
import org.apache.iceberg.spark.geo.spi.GeospatialLibrary
24+
import org.apache.iceberg.{Geography, expressions}
25+
import org.apache.sedona.common.geometryObjects.{Geography => SedonaGeography}
26+
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
27+
import org.apache.spark.sql.execution.datasources.{PushableColumn, PushableColumnBase}
28+
import org.apache.spark.sql.sedona_sql.UDT.{GeographyUDT, GeometryUDT}
29+
import org.apache.spark.sql.sedona_sql.expressions._
30+
import org.apache.spark.sql.types.DataType
31+
import org.locationtech.jts.geom.Geometry
32+
33+
import java.util.regex.Pattern
34+
35+
/**
 * Sedona-backed implementation of the Iceberg [[GeospatialLibrary]] SPI.
 *
 * Converts between JTS / Iceberg geospatial objects and the values handled by Sedona's
 * `GeometryUDT` / `GeographyUDT`, and translates a subset of Sedona spatial predicates into
 * Iceberg expressions.
 */
class SedonaGeospatialLibrary extends GeospatialLibrary {

  /** Spark data type used to represent geometry values. */
  override def getGeometryType: DataType = GeometryUDT

  /** Spark data type used to represent geography values. */
  override def getGeographyType: DataType = GeographyUDT

  /** Serializes a JTS geometry with `GeometryUDT`. */
  override def fromGeometry(geometry: Geometry): AnyRef = GeometryUDT.serialize(geometry)

  /** Deserializes a `GeometryUDT` datum back into a JTS geometry. */
  override def toGeometry(datum: Any): Geometry = GeometryUDT.deserialize(datum)

  /** Wraps the Iceberg geography's geometry in a Sedona geography and serializes it. */
  override def fromGeography(geography: Geography): AnyRef =
    GeographyUDT.serialize(new SedonaGeography(geography.geometry()))

  /** Deserializes a `GeographyUDT` datum and re-wraps its geometry as an Iceberg geography. */
  override def toGeography(datum: Any): Geography = {
    val sedonaGeography = GeographyUDT.deserialize(datum)
    new Geography(sedonaGeography.getGeometry)
  }

  /**
   * Reports whether `expression` is a Sedona spatial predicate other than `ST_Disjoint`.
   *
   * @param expression
   *   Catalyst expression to inspect
   * @return
   *   `true` for any `ST_Predicate` except `ST_Disjoint`, `false` otherwise
   */
  override def isSpatialFilter(expression: Expression): Boolean = expression match {
    case _: ST_Disjoint => false
    case _: ST_Predicate => true
    case _ => false
  }

  /**
   * Translates a Sedona spatial predicate into an Iceberg expression.
   *
   * `ST_Intersects`, `ST_Contains`, `ST_Covers`, `ST_Within` and `ST_CoveredBy` are all mapped to
   * an Iceberg `stIntersects` expression.
   * NOTE(review): presumably this is a deliberately conservative superset used for pruning,
   * since each of these predicates implies intersection -- confirm.
   *
   * @param expression
   *   Catalyst expression to translate
   * @return
   *   the Iceberg expression, or `null` when the predicate is unsupported or its operands are not
   *   a pushable column paired with a non-null literal
   */
  override def translateToIceberg(expression: Expression): expressions.Expression = {
    val pushableColumn = PushableColumn(nestedPredicatePushdownEnabled = true)
    expression match {
      case ST_Intersects(_) | ST_Contains(_) | ST_Covers(_) | ST_Within(_) | ST_CoveredBy(_) =>
        val icebergExpr = resolveNameAndLiteral(expression.children, pushableColumn).map {
          case (name, value) =>
            IcebergExpressions.stIntersects(unquote(name), GeometryUDT.deserialize(value))
        }
        icebergExpr.orNull
      case _ => null
    }
  }

  /**
   * Strips quoting backticks from an attribute name: every backtick is dropped and the character
   * following it is kept, so a doubled backtick collapses to a single one.
   */
  private def unquote(attributeName: String): String = {
    val matcher = BACKTICKS_PATTERN.matcher(attributeName)
    matcher.replaceAll("$2")
  }

  /**
   * Extracts a (column name, literal value) pair from a two-operand expression list, accepting
   * the operands in either order.
   *
   * Null literal values are rejected so that callers never hand `null` to
   * `GeometryUDT.deserialize`; such filters are simply not translated.
   *
   * @return
   *   `Some((name, value))` when one operand is a pushable column and the other is a non-null
   *   literal, `None` otherwise
   */
  private def resolveNameAndLiteral(
      expressions: Seq[Expression],
      pushableColumn: PushableColumnBase): Option[(String, Any)] = {
    expressions match {
      case Seq(pushableColumn(name), Literal(v, _)) if v != null => Some((name, v))
      case Seq(Literal(v, _), pushableColumn(name)) if v != null => Some((name, v))
      case _ => None
    }
  }
}
85+
86+
/** Companion object holding constants shared by [[SedonaGeospatialLibrary]] instances. */
object SedonaGeospatialLibrary {

  // Group 1 is a backtick; group 2 is the single character that follows it,
  // or the empty match at end of input.
  private val BACKTICKS_PATTERN: Pattern =
    Pattern.compile("""([`])(.|$)""")
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.sedona
20+
21+
import org.apache.iceberg.spark.geo.spi.GeospatialLibrary;
22+
import org.apache.iceberg.spark.geo.spi.GeospatialLibraryProvider;
23+
24+
/**
 * [[GeospatialLibraryProvider]] implementation that exposes [[SedonaGeospatialLibrary]] under the
 * name `sedona`.
 */
class SedonaGeospatialLibraryProvider extends GeospatialLibraryProvider {

  /** Identifier of this provider. */
  override def name(): String = "sedona"

  /** Builds a fresh [[SedonaGeospatialLibrary]] on every call. */
  override def create(): GeospatialLibrary = {
    val library = new SedonaGeospatialLibrary()
    library
  }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.sedona
20+
21+
import org.apache.spark.sql.SparkSession
22+
import org.scalatest.BeforeAndAfterAll
23+
import org.scalatest.funsuite.AnyFunSuite
24+
25+
import java.io.File
26+
27+
/**
 * End-to-end smoke test: creates Iceberg tables with GEOMETRY and GEOGRAPHY columns in a
 * temporary Hadoop catalog, writes rows through Sedona constructors and reads them back,
 * including a spatially filtered read.
 */
class TestGeospatial extends AnyFunSuite with BeforeAndAfterAll {

  // Warehouse location for the `local` Hadoop catalog; assigned in beforeAll.
  private var warehouse: File = _

  override protected def beforeAll(): Unit = {
    super.beforeAll()
    // Reserve a unique path, then remove the placeholder file so the path is free
    // for the catalog to use as its warehouse.
    warehouse = File.createTempFile("warehouse", null)
    warehouse.delete()
  }

  override protected def afterAll(): Unit = {
    // File.delete() cannot remove a non-empty directory, so walk the tree instead.
    try Option(warehouse).foreach(deleteRecursively)
    finally super.afterAll()
  }

  /** Deletes `file`, descending into it first when it is a directory. */
  private def deleteRecursively(file: File): Unit = {
    // listFiles() returns null when `file` is not a directory, hence the Option wrapper.
    Option(file.listFiles()).foreach(_.foreach(deleteRecursively))
    file.delete()
  }

  test("test geospatial") {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.kryo.registrator", "org.apache.sedona.core.serde.SedonaKryoRegistrator")
      .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.local.type", "hadoop")
      .config("spark.sql.catalog.local.warehouse", warehouse.getAbsolutePath)
      .config(
        "spark.sql.extensions",
        "org.apache.sedona.sql.SedonaSqlExtensions,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .config("spark.sedona.enableParserExtension", "false")
      .getOrCreate()

    try {
      spark
        .sql(
          "CREATE OR REPLACE TABLE local.tmp.geom_table (id INT, geom GEOMETRY) USING iceberg TBLPROPERTIES ('format-version' = '3')")
        .show()
      spark
        .sql("""
          INSERT INTO local.tmp.geom_table VALUES
          (1, ST_GeomFromText('POLYGON ((0 0, 0 10, 10 10, 10 0, 0 0))')),
          (2, ST_GeomFromText('POINT (100 40)'))
          """)
        .show()

      val geomRows = spark.sql("SELECT * FROM local.tmp.geom_table")
      geomRows.show()
      assert(geomRows.count() == 2)

      // Only the polygon (id 1) contains POINT (1 2); POINT (100 40) lies far outside it.
      val intersecting = spark
        .sql("""
          SELECT * FROM local.tmp.geom_table WHERE ST_Intersects(geom, ST_GeomFromText('POINT (1 2)'))
          """)
      intersecting.show()
      assert(intersecting.collect().map(_.getInt(0)).toSeq == Seq(1))

      spark
        .sql(
          "CREATE OR REPLACE TABLE local.tmp.geog_table (id INT, geog GEOGRAPHY) USING iceberg TBLPROPERTIES ('format-version' = '3')")
        .show()
      spark
        .sql("""
          INSERT INTO local.tmp.geog_table VALUES
          (1, ST_GeogFromWKT('POLYGON ((0 0, 0 10, 10 10, 10 0, 0 0))')),
          (2, ST_GeogFromWKT('POINT (100 40)'))
          """)
        .show()

      val geogRows = spark.sql("SELECT * FROM local.tmp.geog_table")
      geogRows.show()
      assert(geogRows.count() == 2)
    } finally {
      // Release the local Spark context even when an assertion or query fails.
      spark.stop()
    }
  }
}

spark/pom.xml

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838

3939
<modules>
4040
<module>common</module>
41+
<module>iceberg</module>
4142
<module>spark-${spark.compat.version}</module>
4243
</modules>
4344

0 commit comments

Comments
 (0)