Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(): memory accounting for JOIN/TNS algos #3429

Closed
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
32198d3
feat(PP,ByteStream): new counting memory allocator
drrtuy Nov 22, 2024
037a0b6
feat(RGData,StringStore): add counting allocator capabilities to thos…
drrtuy Nov 30, 2024
1d6076e
feat(): use boost::make_shared b/c most distros can't do allocate_sha…
drrtuy Dec 3, 2024
4f84e3d
feat(): propagate long strings SP type change
drrtuy Dec 4, 2024
780b113
feat(): change ref to atomic with ptr to atomic
drrtuy Dec 8, 2024
5741f81
feat(): dangling pointer/ref issue has been solved for both RGData an…
drrtuy Dec 13, 2024
6cdf9d9
feat(): propagated changes into STLPoolAllocator and friends
drrtuy Jan 10, 2025
24eee70
feat(): to fix the build
drrtuy Jan 11, 2025
fce53d0
feat(): accounts hash tables RAM allocations/removes STLPoolAllocator
drrtuy Jan 17, 2025
caae64c
feat(): TupleHashJoin now handles bad_alloc case switching to disk-ba…
drrtuy Jan 22, 2025
de5ac9a
feat(): restore user-space mem allocator
drrtuy Jan 23, 2025
4601530
feat(): aggregating CountingAllocator
drrtuy Feb 7, 2025
d3d819b
feat(): related unit tests fixes
drrtuy Feb 7, 2025
116c3a5
feat(): Replacing STLPoolAllocator with CountingAllocator for in-memo…
drrtuy Feb 14, 2025
3eaad65
feat(): first cleanup
drrtuy Feb 14, 2025
e01c24c
feat(): zerocopy TNS case and JOIN results RGData with CountingAllocator
drrtuy Feb 17, 2025
32eb40c
fix(): allocate Pointer vector in both TupleJoiner ctor
drrtuy Feb 20, 2025
3239b16
fix(): remove an additional and erroneous CountingAllocator for RGData…
drrtuy Feb 20, 2025
6604fa6
feat(): use CountingAllocator for DISTINCT
drrtuy Feb 21, 2025
3ef3012
fix(TNS): removed optimization
drrtuy Feb 21, 2025
a0b6ff3
feat(): replace getMaxDataSize with getMaxDataSizeWithStrings to accu…
drrtuy Mar 4, 2025
dbbbc90
feat(TNS): distribute SortingPQ that supports CountingAllocator
drrtuy Mar 10, 2025
c185c36
feat(TNS, sorting, distinct): TNS now accounts data used by RGDatas a…
drrtuy Mar 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions dbcon/joblist/batchprimitiveprocessor-jl.cpp
Original file line number Diff line number Diff line change
@@ -54,7 +54,7 @@ using namespace joiner;

namespace joblist
{
BatchPrimitiveProcessorJL::BatchPrimitiveProcessorJL(const ResourceManager* rm)
BatchPrimitiveProcessorJL::BatchPrimitiveProcessorJL(ResourceManager* rm)
: ot(BPS_ELEMENT_TYPE)
, needToSetLBID(true)
, count(1)
@@ -81,6 +81,7 @@ BatchPrimitiveProcessorJL::BatchPrimitiveProcessorJL(const ResourceManager* rm)
, fJoinerChunkSize(rm->getJlJoinerChunkSize())
, hasSmallOuterJoin(false)
, _priority(1)
, rm_(rm)
{
PMJoinerCount = 0;
uuid = bu::nil_generator()();
@@ -791,7 +792,7 @@ void BatchPrimitiveProcessorJL::getRowGroupData(ByteStream& in, vector<RGData>*
if (in.length() == 0)
{
// done, return an empty RG
rgData = RGData(org, 0);
rgData = RGData(org, 0U);
org.setData(&rgData);
org.resetRowGroup(0);
out->push_back(rgData);
@@ -926,7 +927,7 @@ RGData BatchPrimitiveProcessorJL::getErrorRowGroupData(uint16_t error) const
RGData ret;
rowgroup::RowGroup rg(projectionRG);

ret = RGData(rg, 0);
ret = RGData(rg, 0U);
rg.setData(&ret);
// rg.convertToInlineDataInPlace();
rg.resetRowGroup(0);
@@ -1412,7 +1413,7 @@ bool BatchPrimitiveProcessorJL::pickNextJoinerNum()
for (i = 0; i < PMJoinerCount; i++)
{
joinerNum = (joinerNum + 1) % PMJoinerCount;
if (posByJoinerNum[joinerNum] != tJoiners[joinerNum]->getSmallSide()->size())
if (posByJoinerNum[joinerNum] != tJoiners[joinerNum]->getSmallSide().size())
break;
}
if (i == PMJoinerCount)
@@ -1425,10 +1426,9 @@ bool BatchPrimitiveProcessorJL::pickNextJoinerNum()
/* XXXPAT: Going to interleave across joiners to take advantage of the new locking env in PrimProc */
bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)
{
uint32_t size = 0, toSend, i, j;
uint32_t toSend, i, j;
ISMPacketHeader ism;
Row r;
vector<Row::Pointer>* tSmallSide;
joiner::TypelessData tlData;
uint32_t smallKeyCol;
uint32_t largeKeyCol;
@@ -1451,8 +1451,8 @@ bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)
}

memset((void*)&ism, 0, sizeof(ism));
tSmallSide = tJoiners[joinerNum]->getSmallSide();
size = tSmallSide->size();
auto& tSmallSide = tJoiners[joinerNum]->getSmallSide();
auto size = tSmallSide.size();

#if 0
if (joinerNum == PMJoinerCount - 1 && pos == size)
@@ -1497,11 +1497,13 @@ bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)

if (tJoiners[joinerNum]->isTypelessJoin())
{
utils::FixedAllocator fa(tlKeyLens[joinerNum], true);
// TODO: change RM ptr to ref b/c its scope and lifetime lasts till the end of the program.
auto alloc = rm_->getAllocator<utils::FixedAllocatorBufType>();
utils::FixedAllocator fa(alloc, tlKeyLens[joinerNum], true);

for (i = pos; i < pos + toSend; i++)
{
r.setPointer((*tSmallSide)[i]);
r.setPointer(tSmallSide[i]);
isNull = false;
bSignedUnsigned = tJoiners[joinerNum]->isSignedUnsignedJoin();

@@ -1568,7 +1570,7 @@ bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)

for (i = pos, j = 0; i < pos + toSend; ++i, ++j)
{
r.setPointer((*tSmallSide)[i]);
r.setPointer(tSmallSide[i]);

if (r.getColType(smallKeyCol) == CalpontSystemCatalog::LONGDOUBLE)
{
@@ -1641,7 +1643,7 @@ bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)

for (i = pos; i < pos + toSend; i++, tmpRow.nextRow())
{
r.setPointer((*tSmallSide)[i]);
r.setPointer(tSmallSide[i]);
copyRow(r, &tmpRow);
}

4 changes: 3 additions & 1 deletion dbcon/joblist/batchprimitiveprocessor-jl.h
Original file line number Diff line number Diff line change
@@ -59,7 +59,7 @@ class BatchPrimitiveProcessorJL
{
public:
/* Constructor used by the JobStep */
explicit BatchPrimitiveProcessorJL(const ResourceManager* rm);
explicit BatchPrimitiveProcessorJL(ResourceManager* rm);
~BatchPrimitiveProcessorJL();

/* Interface used by the JobStep */
@@ -385,6 +385,8 @@ class BatchPrimitiveProcessorJL

boost::uuids::uuid uuid;

joblist::ResourceManager* rm_ = nullptr;

friend class CommandJL;
friend class ColumnCommandJL;
friend class PassThruCommandJL;
4 changes: 2 additions & 2 deletions dbcon/joblist/diskjoinstep.cpp
Original file line number Diff line number Diff line change
@@ -481,10 +481,10 @@ void DiskJoinStep::joinFcn()
// cout << "inserting a full RG" << endl;
if (thjs)
{
if (!thjs->getMemory(l_outputRG.getMaxDataSize()))
if (!thjs->getMemory(l_outputRG.getMaxDataSizeWithStrings()))
{
// calculate guess of size required for error message
uint64_t memReqd = (unmatched.size() * outputRG.getDataSize(1)) / 1048576;
uint64_t memReqd = (l_outputRG.getMaxDataSizeWithStrings()) / 1048576;
Message::Args args;
args.add(memReqd);
args.add(thjs->resourceManager->getConfiguredUMMemLimit() / 1048576);
4 changes: 2 additions & 2 deletions dbcon/joblist/distributedenginecomm.cpp
Original file line number Diff line number Diff line change
@@ -414,7 +414,7 @@ void DistributedEngineComm::Listen(boost::shared_ptr<MessageQueueClient> client,
// eventually let jobstep error out.
std::unique_lock lk(fMlock);
MessageQueueMap::iterator map_tok;
sbs.reset(new ByteStream(0));
sbs.reset(new ByteStream(0U));

for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
{
@@ -1097,7 +1097,7 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_
std::unique_lock lk(fMlock);
// std::cout << "WARNING: DEC WRITE BROKEN PIPE. PMS index = " << index << std::endl;
MessageQueueMap::iterator map_tok;
sbs.reset(new ByteStream(0));
sbs.reset(new ByteStream(0U));

for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
{
38 changes: 22 additions & 16 deletions dbcon/joblist/groupconcat.cpp
Original file line number Diff line number Diff line change
@@ -769,6 +769,7 @@ void GroupConcatOrderBy::processRow(const rowgroup::Row& row)
if (concatColIsNull(row))
return;

auto& orderByQueue = getQueue();
// if the row count is less than the limit
if (fCurrentLength < fGroupConcatLen)
{
@@ -777,7 +778,7 @@ void GroupConcatOrderBy::processRow(const rowgroup::Row& row)
int16_t estLen = lengthEstimate(fRow0);
fRow0.setRid(estLen);
OrderByRow newRow(fRow0, fRule);
fOrderByQueue.push(newRow);
orderByQueue.push(newRow);
fCurrentLength += estLen;

// add to the distinct map
@@ -807,11 +808,11 @@ void GroupConcatOrderBy::processRow(const rowgroup::Row& row)
}
}

else if (fOrderByCond.size() > 0 && fRule.less(row.getPointer(), fOrderByQueue.top().fData))
else if (fOrderByCond.size() > 0 && fRule.less(row.getPointer(), orderByQueue.top().fData))
{
OrderByRow swapRow = fOrderByQueue.top();
OrderByRow swapRow = orderByQueue.top();
fRow1.setData(swapRow.fData);
fOrderByQueue.pop();
orderByQueue.pop();
fCurrentLength -= fRow1.getRelRid();
fRow2.setData(swapRow.fData);

@@ -831,17 +832,20 @@ void GroupConcatOrderBy::processRow(const rowgroup::Row& row)
fRow2.setRid(estLen);
fCurrentLength += estLen;

fOrderByQueue.push(swapRow);
orderByQueue.push(swapRow);
}
}

void GroupConcatOrderBy::merge(GroupConcator* gc)
{
GroupConcatOrderBy* go = dynamic_cast<GroupConcatOrderBy*>(gc);

while (go->fOrderByQueue.empty() == false)
auto& orderByQueue = getQueue();
auto mergeQueue = go->getQueue();

while (mergeQueue.empty() == false)
{
const OrderByRow& row = go->fOrderByQueue.top();
const OrderByRow& row = mergeQueue.top();

// check if the distinct row already exists
if (fDistinct && fDistinctMap->find(row.fData) != fDistinctMap->end())
@@ -852,7 +856,7 @@ void GroupConcatOrderBy::merge(GroupConcator* gc)
// if the row count is less than the limit
else if (fCurrentLength < fGroupConcatLen)
{
fOrderByQueue.push(row);
orderByQueue.push(row);
row1.setData(row.fData);
fCurrentLength += row1.getRelRid();

@@ -861,11 +865,11 @@ void GroupConcatOrderBy::merge(GroupConcator* gc)
fDistinctMap->insert(row.fData);
}

else if (fOrderByCond.size() > 0 && fRule.less(row.fData, fOrderByQueue.top().fData))
else if (fOrderByCond.size() > 0 && fRule.less(row.fData, orderByQueue.top().fData))
{
OrderByRow swapRow = fOrderByQueue.top();
OrderByRow swapRow = orderByQueue.top();
row1.setData(swapRow.fData);
fOrderByQueue.pop();
orderByQueue.pop();
fCurrentLength -= row1.getRelRid();

if (fDistinct)
@@ -877,10 +881,10 @@ void GroupConcatOrderBy::merge(GroupConcator* gc)
row1.setData(row.fData);
fCurrentLength += row1.getRelRid();

fOrderByQueue.push(row);
orderByQueue.push(row);
}

go->fOrderByQueue.pop();
mergeQueue.pop();
}
}

@@ -891,10 +895,12 @@ uint8_t* GroupConcatOrderBy::getResultImpl(const string& sep)

// need to reverse the order
stack<OrderByRow> rowStack;
while (fOrderByQueue.size() > 0)
auto& orderByQueue = getQueue();

while (orderByQueue.size() > 0)
{
rowStack.push(fOrderByQueue.top());
fOrderByQueue.pop();
rowStack.push(orderByQueue.top());
orderByQueue.pop();
}

size_t prevResultSize = 0;
6 changes: 6 additions & 0 deletions dbcon/joblist/jobstep.cpp
Original file line number Diff line number Diff line change
@@ -217,6 +217,12 @@ void JobStep::handleException(std::exception_ptr e, const int errorCode, const u
{
std::rethrow_exception(e);
}
// Add it here for now to handle potential bad_alloc exceptions
catch (std::bad_alloc& exc)
{
std::cerr << methodName << " caught a bad_alloc exception. " << std::endl;
catchHandler(methodName + " caught " + exc.what(), errorCode, fErrorInfo, fSessionId);
}
catch (const IDBExcept& iex)
{
std::cerr << methodName << " caught a internal exception. " << std::endl;
38 changes: 22 additions & 16 deletions dbcon/joblist/jsonarrayagg.cpp
Original file line number Diff line number Diff line change
@@ -767,6 +767,8 @@ void JsonArrayAggOrderBy::processRow(const rowgroup::Row& row)
if (concatColIsNull(row))
return;

auto& orderByQueue = getQueue();

// if the row count is less than the limit
if (fCurrentLength < fGroupConcatLen)
{
@@ -775,7 +777,7 @@ void JsonArrayAggOrderBy::processRow(const rowgroup::Row& row)
int16_t estLen = lengthEstimate(fRow0);
fRow0.setRid(estLen);
OrderByRow newRow(fRow0, fRule);
fOrderByQueue.push(newRow);
orderByQueue.push(newRow);
fCurrentLength += estLen;

// add to the distinct map
@@ -805,11 +807,11 @@ void JsonArrayAggOrderBy::processRow(const rowgroup::Row& row)
}
}

else if (fOrderByCond.size() > 0 && fRule.less(row.getPointer(), fOrderByQueue.top().fData))
else if (fOrderByCond.size() > 0 && fRule.less(row.getPointer(), orderByQueue.top().fData))
{
OrderByRow swapRow = fOrderByQueue.top();
OrderByRow swapRow = orderByQueue.top();
fRow1.setData(swapRow.fData);
fOrderByQueue.pop();
orderByQueue.pop();
fCurrentLength -= fRow1.getRelRid();
fRow2.setData(swapRow.fData);

@@ -829,17 +831,20 @@ void JsonArrayAggOrderBy::processRow(const rowgroup::Row& row)
fRow2.setRid(estLen);
fCurrentLength += estLen;

fOrderByQueue.push(swapRow);
orderByQueue.push(swapRow);
}
}

void JsonArrayAggOrderBy::merge(GroupConcator* gc)
{
JsonArrayAggOrderBy* go = dynamic_cast<JsonArrayAggOrderBy*>(gc);

while (go->fOrderByQueue.empty() == false)
auto& orderByQueue = getQueue();
auto mergeQueue = go->getQueue();

while (mergeQueue.empty() == false)
{
const OrderByRow& row = go->fOrderByQueue.top();
const OrderByRow& row = mergeQueue.top();

// check if the distinct row already exists
if (fDistinct && fDistinctMap->find(row.fData) != fDistinctMap->end())
@@ -850,7 +855,7 @@ void JsonArrayAggOrderBy::merge(GroupConcator* gc)
// if the row count is less than the limit
else if (fCurrentLength < fGroupConcatLen)
{
fOrderByQueue.push(row);
orderByQueue.push(row);
row1.setData(row.fData);
fCurrentLength += row1.getRelRid();

@@ -859,11 +864,11 @@ void JsonArrayAggOrderBy::merge(GroupConcator* gc)
fDistinctMap->insert(row.fData);
}

else if (fOrderByCond.size() > 0 && fRule.less(row.fData, fOrderByQueue.top().fData))
else if (fOrderByCond.size() > 0 && fRule.less(row.fData, orderByQueue.top().fData))
{
OrderByRow swapRow = fOrderByQueue.top();
OrderByRow swapRow = orderByQueue.top();
row1.setData(swapRow.fData);
fOrderByQueue.pop();
orderByQueue.pop();
fCurrentLength -= row1.getRelRid();

if (fDistinct)
@@ -875,10 +880,10 @@ void JsonArrayAggOrderBy::merge(GroupConcator* gc)
row1.setData(row.fData);
fCurrentLength += row1.getRelRid();

fOrderByQueue.push(row);
orderByQueue.push(row);
}

go->fOrderByQueue.pop();
mergeQueue.pop();
}
}

@@ -889,11 +894,12 @@ uint8_t* JsonArrayAggOrderBy::getResultImpl(const string&)

// need to reverse the order
stack<OrderByRow> rowStack;
auto& orderByQueue = getQueue();

while (fOrderByQueue.size() > 0)
while (orderByQueue.size() > 0)
{
rowStack.push(fOrderByQueue.top());
fOrderByQueue.pop();
rowStack.push(orderByQueue.top());
orderByQueue.pop();
}
if (rowStack.size() > 0)
{
Loading