Add GroupNode and parallelize TableFunctionProcessorNode #15136

Open
wants to merge 125 commits into
base: master
125 commits
ccb57c8
refactor
Cpaulyz Nov 19, 2024
ea028ef
refactor
Cpaulyz Nov 19, 2024
90d34e1
save
Cpaulyz Nov 19, 2024
0d971bc
save
Cpaulyz Nov 19, 2024
4490c55
rollback scalar function definition
Cpaulyz Nov 19, 2024
e1dc67e
add license
Cpaulyz Nov 19, 2024
9983999
add license
Cpaulyz Nov 19, 2024
93ad64b
fix it
Cpaulyz Nov 19, 2024
6a68d33
Merge branch 'table_udsf' into udsf
Cpaulyz Nov 19, 2024
499ae65
save
Cpaulyz Nov 19, 2024
007a2d8
spotless
Cpaulyz Nov 19, 2024
b8a4fb6
save
Cpaulyz Nov 19, 2024
f55f48d
fix builtin
Cpaulyz Nov 19, 2024
7321535
save
Cpaulyz Nov 19, 2024
87dc7ea
fix it
Cpaulyz Nov 19, 2024
fa3c683
fix it
Cpaulyz Nov 19, 2024
c215e63
fix it
Cpaulyz Nov 20, 2024
bc0873d
fix it
Cpaulyz Nov 20, 2024
656dc4c
fix it
Cpaulyz Nov 20, 2024
dbe07ca
fix review
Cpaulyz Nov 26, 2024
f30dda4
split drop function plan
Cpaulyz Nov 26, 2024
54b24c2
fix clear throw NPE
Cpaulyz Nov 26, 2024
fbe6fde
Merge branch 'table_udsf' into udsf
Cpaulyz Nov 27, 2024
0ed1d85
merge master
Cpaulyz Nov 27, 2024
b1912aa
revert useless change
Cpaulyz Nov 27, 2024
1e2ce48
Save
Cpaulyz Nov 27, 2024
e8ea11c
add IT
Cpaulyz Nov 27, 2024
b8d24a2
add license
Cpaulyz Nov 27, 2024
ebcd1b9
rename beforeStart and add getLocalDate
Cpaulyz Nov 28, 2024
5797329
Merge branch 'master' into udsf
Cpaulyz Nov 28, 2024
30be7e5
modify getdatatype
Cpaulyz Nov 28, 2024
69c6ec3
fix review
Cpaulyz Dec 4, 2024
324cac7
spotless
Cpaulyz Dec 4, 2024
736b709
add ut and fix it
Cpaulyz Dec 4, 2024
7fb82cf
save
Cpaulyz Dec 4, 2024
8f067a5
update pom and add date IT
Cpaulyz Dec 4, 2024
60fd763
add license
Cpaulyz Dec 4, 2024
bfb32a1
fix
Cpaulyz Dec 4, 2024
89adefc
fix cpp client
Cpaulyz Dec 4, 2024
2bb60b2
save
Cpaulyz Dec 5, 2024
7ce6d2b
save
Cpaulyz Dec 9, 2024
502775e
merge master
Cpaulyz Dec 9, 2024
b7141d1
fix it
Cpaulyz Dec 9, 2024
3f4e8e0
add aggregate IT
Cpaulyz Dec 10, 2024
864abf1
remove useless code
Cpaulyz Dec 10, 2024
db2b84b
exp
Cpaulyz Dec 11, 2024
0b33fb8
add removable
Cpaulyz Dec 12, 2024
1e7e8cb
delete useless:
Cpaulyz Dec 12, 2024
1374d01
done
Cpaulyz Dec 12, 2024
eb05523
resolve conflict
Cpaulyz Dec 13, 2024
852d728
save
Cpaulyz Dec 17, 2024
a9a696f
save
Cpaulyz Dec 17, 2024
806e5ea
analyze arguments
Cpaulyz Dec 17, 2024
5e6907b
save
Cpaulyz Dec 25, 2024
6e7f105
merge master
Cpaulyz Dec 25, 2024
2257a35
save
Cpaulyz Dec 29, 2024
d837a8e
save
Cpaulyz Jan 4, 2025
ba6de95
save
Cpaulyz Jan 7, 2025
6aa4546
merge master
Cpaulyz Jan 7, 2025
34c96ad
save
Cpaulyz Jan 8, 2025
60a9470
add PartitionRecognizer UT
Cpaulyz Jan 15, 2025
cb66448
add leaf operator
Cpaulyz Jan 16, 2025
87fc892
save
Cpaulyz Jan 20, 2025
602a726
merge master
Cpaulyz Feb 3, 2025
b9dd299
adjust api
Cpaulyz Feb 5, 2025
b27a282
add analyzer test
Cpaulyz Feb 7, 2025
dd881da
merge master
Cpaulyz Feb 7, 2025
53709c9
remove useless code
Cpaulyz Feb 7, 2025
82522ed
Add IT, OptimizedRule, example and register
Cpaulyz Feb 8, 2025
726c991
Add license
Cpaulyz Feb 8, 2025
03464e3
add serderialize method, fix UT and IT
Cpaulyz Feb 9, 2025
249691c
done
Cpaulyz Feb 22, 2025
0d43c04
update example
Cpaulyz Feb 22, 2025
d592a8b
merge
Cpaulyz Feb 22, 2025
1902b24
adjust package structure
Cpaulyz Feb 22, 2025
ec07201
Merge branch 'udtf' into udtf-optimize
Cpaulyz Feb 22, 2025
a8a5486
refactor
Cpaulyz Feb 22, 2025
4ba4807
fix pass through
Cpaulyz Feb 22, 2025
a8a6510
fix pass through
Cpaulyz Feb 22, 2025
22238fc
resolve conflict
Cpaulyz Feb 22, 2025
7723e8d
Merge branch 'udtf' into udtf-optimize
Cpaulyz Feb 22, 2025
ef93c66
fix UT
Cpaulyz Feb 22, 2025
de19f73
fix IT
Cpaulyz Feb 23, 2025
6f3d73b
Merge branch 'udtf' into udtf-optimize
Cpaulyz Feb 23, 2025
bc073a0
save
Cpaulyz Feb 23, 2025
fff00ed
remove prune when empty clause
Cpaulyz Feb 23, 2025
88f4815
resolve todo
Cpaulyz Feb 23, 2025
d734b70
save
Cpaulyz Feb 23, 2025
eeb91ec
save
Cpaulyz Feb 23, 2025
f1775e9
change collect node
Cpaulyz Feb 24, 2025
d2f3471
save
Cpaulyz Feb 25, 2025
55cae6b
Add window built in function
Cpaulyz Feb 28, 2025
9b5514d
refactor
Cpaulyz Feb 28, 2025
e4bc4ee
save
Cpaulyz Feb 28, 2025
689e4be
optimize SliceCache
Cpaulyz Mar 1, 2025
78f002e
Fix Agg
Cpaulyz Mar 1, 2025
d969f36
Save
Cpaulyz Mar 7, 2025
7f837fd
resolve conflict
Cpaulyz Mar 12, 2025
1ba9e50
resolve conflict
Cpaulyz Mar 12, 2025
de0bf7b
resolve conflict
Cpaulyz Mar 12, 2025
8d50c9b
Sae
Cpaulyz Mar 14, 2025
f3b40fa
save
Cpaulyz Mar 17, 2025
49b5285
Merge branch 'master' into udtf-optimize
Cpaulyz Mar 17, 2025
3327dfa
Save
Cpaulyz Mar 17, 2025
5385332
merge
Cpaulyz Mar 17, 2025
2470e4a
fix ci
Cpaulyz Mar 17, 2025
bc3fed4
Merge branch 'udtf-optimize' into builtin-udtf
Cpaulyz Mar 17, 2025
b6bda6d
save
Cpaulyz Mar 17, 2025
f44fd70
Save
Cpaulyz Mar 17, 2025
9dc0c9d
save
Cpaulyz Mar 19, 2025
893d9ff
Resolve conflict
Cpaulyz Mar 19, 2025
8630cb6
Save
Cpaulyz Mar 19, 2025
1496caa
save
Cpaulyz Mar 19, 2025
c3c3e39
save
Cpaulyz Mar 19, 2025
3f0820a
save
Cpaulyz Mar 21, 2025
b03a000
fix bug
Cpaulyz Mar 21, 2025
0002ea5
fix it
Cpaulyz Mar 21, 2025
b88175b
save
Cpaulyz Mar 24, 2025
0a5d3b0
save
Cpaulyz Mar 24, 2025
cb08966
Save
Cpaulyz Mar 24, 2025
9b0c0c0
Merge branch 'master' into builtin-udtf
Cpaulyz Mar 24, 2025
a7ec664
optimize parallel group
Cpaulyz Mar 24, 2025
9793b54
save
Cpaulyz Mar 24, 2025
6c1514a
fix it
Cpaulyz Mar 24, 2025
102d4fc
FIX IT
Cpaulyz Mar 24, 2025
@@ -48,10 +48,23 @@ public class TableFunctionAnalysis {
   // table argument
   private final Map<String, List<Integer>> requiredColumns;

+  /**
+   * The `requireRecordSnapshot` field is used to inform the Analyzer of whether the table function
+   * processor requires a record snapshot. Default value is true, which means that the table
+   * function processor will receive a new record object for each record. Record object reference
+   * can be kept for future use. Otherwise, if it is false, the table function processor will always
+   * receive the same record object but with different values. It will perform better, but the
+   * processor must not keep the record object reference for future use.
+   */
+  private final boolean requireRecordSnapshot;
+
   private TableFunctionAnalysis(
-      Optional<DescribedSchema> properColumnSchema, Map<String, List<Integer>> requiredColumns) {
+      Optional<DescribedSchema> properColumnSchema,
+      Map<String, List<Integer>> requiredColumns,
+      boolean requiredRecordSnapshot) {
     this.properColumnSchema = requireNonNull(properColumnSchema, "returnedType is null");
     this.requiredColumns = requiredColumns;
+    this.requireRecordSnapshot = requiredRecordSnapshot;
   }

   public Optional<DescribedSchema> getProperColumnSchema() {
@@ -62,13 +75,18 @@ public Map<String, List<Integer>> getRequiredColumns() {
     return requiredColumns;
   }

+  public boolean isRequireRecordSnapshot() {
+    return requireRecordSnapshot;
+  }
+
   public static Builder builder() {
     return new Builder();
   }

   public static final class Builder {
     private DescribedSchema properColumnSchema;
     private final Map<String, List<Integer>> requiredColumns = new HashMap<>();
+    private boolean requireRecordSnapshot = true;

     private Builder() {}

@@ -82,8 +100,14 @@ public Builder requiredColumns(String tableArgument, List<Integer> columns) {
       return this;
     }

+    public Builder requireRecordSnapshot(boolean requireRecordSnapshot) {
+      this.requireRecordSnapshot = requireRecordSnapshot;
+      return this;
+    }
+
     public TableFunctionAnalysis build() {
-      return new TableFunctionAnalysis(Optional.ofNullable(properColumnSchema), requiredColumns);
+      return new TableFunctionAnalysis(
+          Optional.ofNullable(properColumnSchema), requiredColumns, requireRecordSnapshot);
     }
   }
 }
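
Reader's note on the `requireRecordSnapshot` Javadoc above: the difference between the two modes is easiest to see with a toy model. The sketch below does not use any IoTDB class; `Record`, `retain`, and `RecordSnapshotSketch` are hypothetical names invented for illustration. It only assumes what the Javadoc states: with the flag set to true the processor gets a fresh object per row, and with false it gets one shared object whose values are overwritten.

```java
import java.util.ArrayList;
import java.util.List;

public class RecordSnapshotSketch {

  /** Hypothetical stand-in for a record handed to a table function processor. */
  static final class Record {
    long value;

    Record(long value) {
      this.value = value;
    }
  }

  /**
   * Feeds each input value to a "processor" that keeps every record reference it receives, then
   * reports the values visible through those kept references once all rows have been delivered.
   */
  static List<Long> retain(long[] input, boolean requireRecordSnapshot) {
    Record shared = new Record(0); // reused when no snapshot is requested
    List<Record> kept = new ArrayList<>();
    for (long v : input) {
      Record delivered;
      if (requireRecordSnapshot) {
        delivered = new Record(v); // fresh object per row: safe to keep
      } else {
        shared.value = v; // same object, new contents: cheaper, but unsafe to keep
        delivered = shared;
      }
      kept.add(delivered);
    }
    List<Long> seen = new ArrayList<>();
    for (Record r : kept) {
      seen.add(r.value);
    }
    return seen;
  }

  public static void main(String[] args) {
    System.out.println(retain(new long[] {1, 2, 3}, true)); // prints [1, 2, 3]
    System.out.println(retain(new long[] {1, 2, 3}, false)); // prints [3, 3, 3]
  }
}
```

In the second call every kept reference points at the same object, so only the last row's value survives. That is why a processor that opts out via `requireRecordSnapshot(false)` must copy any values it needs instead of holding on to the record.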
@@ -37,22 +37,24 @@

 public class PartitionRecognizer {

-  private SortKey partitionKey;
   private final Comparator<SortKey> partitionComparator;
   private final List<Integer> requiredChannels;
   private final List<Integer> passThroughChannels;
   private final List<Type> inputDataTypes;
-  private TsBlock currentTsBlock = null;
+
   private boolean noMoreData = false;
+
+  private SortKey currentPartitionKey;
+  private TsBlock currentTsBlock = null;
   private int currentIndex = 0;
-  private PartitionState state = PartitionState.INIT_STATE;
+  private PartitionState currentState = PartitionState.INIT_STATE;

   public PartitionRecognizer(
       List<Integer> partitionChannels,
       List<Integer> requiredChannels,
       List<Integer> passThroughChannels,
       List<TSDataType> inputDataTypes) {
-    this.partitionKey = null;
+    this.currentPartitionKey = null;
     if (partitionChannels.isEmpty()) {
       // always return 0
       this.partitionComparator = (o1, o2) -> 0;
@@ -84,37 +86,39 @@ public void noMoreData() {

   public PartitionState nextState() {
     updateState();
-    return state;
+    return currentState;
   }

   private void updateState() {
-    switch (state.getStateType()) {
+    switch (currentState.getStateType()) {
       case INIT:
-        state = handleInitState();
+        currentState = handleInitState();
         break;
       case ITERATING:
       case NEW_PARTITION:
-        state = handleIteratingOrNewPartitionState();
+        currentState = handleIteratingOrNewPartitionState();
         break;
       case NEED_MORE_DATA:
-        state = handleNeedMoreDataState();
+        currentState = handleNeedMoreDataState();
         break;
       case FINISHED:
         // do nothing
         return;
     }
-    if (PartitionState.NEED_MORE_DATA_STATE.equals(state)) {
+    if (PartitionState.NEED_MORE_DATA_STATE.equals(currentState)) {
       currentIndex = 0;
       currentTsBlock = null;
     }
   }

   private PartitionState handleInitState() {
-    if (currentTsBlock == null || currentTsBlock.isEmpty()) {
+    if (noMoreData) {
+      return PartitionState.FINISHED_STATE;
+    } else if (currentTsBlock == null || currentTsBlock.isEmpty()) {
       return PartitionState.INIT_STATE;
     }
     // init the partition Key as the first row
-    partitionKey = new SortKey(currentTsBlock, currentIndex);
+    currentPartitionKey = new SortKey(currentTsBlock, currentIndex);
     int endPartitionIndex = findNextDifferentRowIndex();
     Slice slice = getSlice(currentIndex, endPartitionIndex);
     currentIndex = endPartitionIndex;
@@ -156,15 +160,39 @@ private PartitionState handleIteratingOrNewPartitionState() {
    * all rows have the same partition values, return the position count of the current TsBlock.
    */
   private int findNextDifferentRowIndex() {
-    SortKey compareKey = new SortKey(currentTsBlock, currentIndex);
-    while (compareKey.rowIndex < currentTsBlock.getPositionCount()) {
-      if (partitionComparator.compare(partitionKey, compareKey) != 0) {
-        partitionKey = compareKey;
-        return compareKey.rowIndex;
+    int totalRows = currentTsBlock.getPositionCount();
+
+    // check if all rows have the same partition values
+    SortKey compareKey = new SortKey(currentTsBlock, totalRows - 1);
+    if (partitionComparator.compare(currentPartitionKey, compareKey) == 0) {
+      return totalRows;
+    }
+
+    // check the first row
+    compareKey.rowIndex = currentIndex;
+    if (partitionComparator.compare(currentPartitionKey, compareKey) != 0) {
+      currentPartitionKey = compareKey;
+      return currentIndex;
+    }
+
+    // binary search to find the next different partition values
+    int low = currentIndex;
+    int high = totalRows - 1;
+    int firstDiff = totalRows;
+    while (low <= high) {
+      compareKey.rowIndex = low + (high - low) / 2;
+      int cmp = partitionComparator.compare(currentPartitionKey, compareKey);
+      if (cmp == 0) {
+        low = compareKey.rowIndex + 1;
+      } else {
+        // try to find earlier different row
+        firstDiff = compareKey.rowIndex;
+        high = compareKey.rowIndex - 1;
       }
-      compareKey.rowIndex++;
     }
-    return compareKey.rowIndex;
+    compareKey.rowIndex = firstDiff;
+    currentPartitionKey = compareKey;
+    return firstDiff;
   }

   private Slice getSlice(int startPartitionIndex, int endPartitionIndex) {
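
Reader's note on the new `findNextDifferentRowIndex`: replacing the linear scan with a binary search is only valid if all rows equal to the current partition key are contiguous from `currentIndex` onward, which presumably holds because the input arrives grouped by the partition columns (an assumption; the diff does not state it). The sketch below shows the same search on a plain `int[]` of keys. `PartitionBoundarySketch` and `firstDifferentRow` are hypothetical names, and unlike the PR code it takes the reference key from the starting row rather than from a key carried over from a previous block.

```java
public class PartitionBoundarySketch {

  /**
   * Returns the index of the first row at or after {@code from} whose key differs from
   * {@code keys[from]}, or {@code keys.length} if every remaining row has the same key.
   * Assumes rows with equal keys are contiguous (input grouped by key).
   */
  static int firstDifferentRow(int[] keys, int from) {
    int current = keys[from];
    int low = from;
    int high = keys.length - 1;
    int firstDiff = keys.length; // default: no differing row found
    while (low <= high) {
      int mid = low + (high - low) / 2; // avoids int overflow of (low + high)
      if (keys[mid] == current) {
        low = mid + 1; // boundary lies to the right of mid
      } else {
        firstDiff = mid; // candidate boundary; look for an earlier one
        high = mid - 1;
      }
    }
    return firstDiff;
  }

  public static void main(String[] args) {
    int[] keys = {7, 7, 7, 9, 9, 12};
    System.out.println(firstDifferentRow(keys, 0)); // prints 3
    System.out.println(firstDifferentRow(keys, 3)); // prints 5
    System.out.println(firstDifferentRow(keys, 5)); // prints 6
  }
}
```

Each call does O(log n) comparisons instead of O(n), which matters most when a TsBlock holds a few long partitions. The early "last row equals current key" check in the PR short-circuits the common case where the whole block belongs to one partition.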