ensure only the last processor could access non-joined blocks

This commit is contained in:
lgbo-ustc 2023-05-09 11:17:09 +08:00
parent 29ade23397
commit 603c024eb0
5 changed files with 115 additions and 67 deletions

View File

@ -472,9 +472,9 @@ bool GraceHashJoin::alwaysReturnsEmptySet() const
return hash_join_is_empty;
}
// This is only be called for bucket[0] at present. other buckets' non-joined blocks are generated in
// DelayedBlocks.
// There is a finished counter in JoiningTransform, only the last JoiningTransform could call this function.
// There is a finished counter in JoiningTransform/DelayedJoinedBlocksWorkerTransform,
// only the last processor could call this function to ensure all used flags have been inserted.
// To support delayed stream mode, need to keep the hash join before next getDelayedBlocks call.
IBlocksStreamPtr
GraceHashJoin::getNonJoinedBlocks(const Block & left_sample_block_, const Block & result_sample_block_, UInt64 max_block_size_) const
{
@ -489,60 +489,30 @@ public:
Buckets buckets_,
InMemoryJoinPtr hash_join_,
const Names & left_key_names_,
const Names & right_key_names_,
const Block & left_sample_block_,
const Block & result_sample_block_,
size_t max_block_size_)
const Names & right_key_names_)
: current_bucket(current_bucket_)
, buckets(std::move(buckets_))
, hash_join(std::move(hash_join_))
, left_reader(buckets[current_bucket]->getLeftTableReader())
, left_key_names(left_key_names_)
, right_key_names(right_key_names_)
, left_sample_block(left_sample_block_)
, result_sample_block(result_sample_block_)
, max_block_size(max_block_size_)
{
}
// One DelayedBlocks is shared among multiple DelayedJoinedBlocksWorkerTransforms, need locks for
// - reading from left_reader. left_reader.read() has had a lock inside.
// - reading non-joined blocks from hash_join. Since iterate on non-joined blocks will has state
// changed inside.
Block nextImpl() override
{
// there is data race case wihthout this lock:
// 1. thread 1 read the last block from left_reader, but not finish the join
// 2. thread 2 try to read from non-joined blocks. Since thread 1 has not finished,
// the used flags in the hash_join is incomplete, then thread 2 return invalid mismatch rows.
std::lock_guard lock(non_joined_blocks_mutex);
Block block;
size_t num_buckets = buckets.size();
size_t current_idx = buckets[current_bucket]->idx;
do
{
// When left reader finish, return non-joined blocks.
// empty block means the end of this stream.
if (!is_left_reader_finished)
// One DelayedBlocks is shared among multiple DelayedJoinedBlocksWorkerTransform.
// There is a lock inside left_reader.read().
block = left_reader.read();
if (!block)
{
block = left_reader.read();
if (!block)
{
is_left_reader_finished = true;
non_joined_blocks = hash_join->getNonJoinedBlocks(left_sample_block, result_sample_block, max_block_size);
}
}
if (is_left_reader_finished)
{
if (non_joined_blocks)
{
return non_joined_blocks->next();
}
else
return {};
return {};
}
// block comes from left_reader, need to join with right table to get the result.
@ -585,12 +555,6 @@ public:
Names left_key_names;
Names right_key_names;
Block left_sample_block;
Block result_sample_block;
size_t max_block_size = 0;
bool is_left_reader_finished = false;
IBlocksStreamPtr non_joined_blocks = nullptr;
std::mutex non_joined_blocks_mutex;
};
IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
@ -636,18 +600,7 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
LOG_TRACE(log, "Loaded bucket {} with {}(/{}) rows",
bucket_idx, hash_join->getTotalRowCount(), num_rows);
auto result_sample_block = left_sample_block;
ExtraBlockPtr tmp;
hash_join->joinBlock(result_sample_block, tmp);
return std::make_unique<DelayedBlocks>(
current_bucket->idx,
buckets,
hash_join,
left_key_names,
right_key_names,
left_sample_block,
result_sample_block,
max_block_size);
return std::make_unique<DelayedBlocks>(current_bucket->idx, buckets, hash_join, left_key_names, right_key_names);
}
LOG_TRACE(log, "Finished loading all {} buckets", buckets.size());

View File

@ -311,8 +311,16 @@ void FillingRightJoinSideTransform::work()
}
DelayedJoinedBlocksWorkerTransform::DelayedJoinedBlocksWorkerTransform(Block output_header)
: IProcessor(InputPorts{Block()}, OutputPorts{output_header})
DelayedJoinedBlocksWorkerTransform::DelayedJoinedBlocksWorkerTransform(
Block left_header_,
Block output_header_,
size_t max_block_size_,
JoinPtr join_)
: IProcessor(InputPorts{Block()}, OutputPorts{output_header_})
, left_header(left_header_)
, output_header(output_header_)
, max_block_size(max_block_size_)
, join(join_)
{
}
@ -366,6 +374,7 @@ IProcessor::Status DelayedJoinedBlocksWorkerTransform::prepare()
if (!data.chunk.hasChunkInfo())
throw Exception(ErrorCodes::LOGICAL_ERROR, "DelayedJoinedBlocksWorkerTransform must have chunk info");
task = std::dynamic_pointer_cast<const DelayedBlocksTask>(data.chunk.getChunkInfo());
}
else
{
@ -386,12 +395,24 @@ void DelayedJoinedBlocksWorkerTransform::work()
{
if (!task)
return;
Block block;
if (!left_delayed_stream_finished)
{
block = task->delayed_blocks->next();
Block block = task->delayed_blocks->next();
if (!block)
{
left_delayed_stream_finished = true;
block = nextNonJoinedBlock();
}
}
else
{
block = nextNonJoinedBlock();
}
if (!block)
{
task.reset();
resetTask();
return;
}
@ -400,6 +421,38 @@ void DelayedJoinedBlocksWorkerTransform::work()
output_chunk.setColumns(block.getColumns(), rows);
}
void DelayedJoinedBlocksWorkerTransform::resetTask()
{
task.reset();
left_delayed_stream_finished = false;
setup_non_joined_stream = false;
non_joined_delay_stream = nullptr;
}
Block DelayedJoinedBlocksWorkerTransform::nextNonJoinedBlock()
{
if (!setup_non_joined_stream)
{
setup_non_joined_stream = true;
// Before read from non-joined stream, all blocks in left file reader must have been joined.
// For example, in HashJoin, it may return invalid mismatch rows from non-joined stream before
// the all blocks in left file reader have been finished, since the used flags are incomplete.
// To make only one processor could read from non-joined stream seems be a easy way.
if (task && task->left_delayed_stream_finish_counter->isLast())
{
if (!non_joined_delay_stream)
{
non_joined_delay_stream = join->getNonJoinedBlocks(left_header, output_header, max_block_size);
}
}
}
if (non_joined_delay_stream)
{
return non_joined_delay_stream->next();
}
return {};
}
DelayedJoinedBlocksTransform::DelayedJoinedBlocksTransform(size_t num_streams, JoinPtr join_)
: IProcessor(InputPorts{}, OutputPorts(num_streams, Block()))
, join(std::move(join_))
@ -451,10 +504,14 @@ IProcessor::Status DelayedJoinedBlocksTransform::prepare()
if (delayed_blocks)
{
// This counter is used to ensure that only the last DelayedJoinedBlocksWorkerTransform
// could read non-joined blocks from the join.
auto left_delayed_stream_finished_counter = std::make_shared<JoiningTransform::FinishCounter>(outputs.size());
for (auto & output : outputs)
{
Chunk chunk;
chunk.setChunkInfo(std::make_shared<DelayedBlocksTask>(delayed_blocks));
auto task = std::make_shared<DelayedBlocksTask>(delayed_blocks, left_delayed_stream_finished_counter);
chunk.setChunkInfo(task);
output.push(std::move(chunk));
}
delayed_blocks = nullptr;

View File

@ -116,9 +116,14 @@ class DelayedBlocksTask : public ChunkInfo
public:
explicit DelayedBlocksTask() : finished(true) {}
explicit DelayedBlocksTask(IBlocksStreamPtr delayed_blocks_) : delayed_blocks(std::move(delayed_blocks_)) {}
explicit DelayedBlocksTask(IBlocksStreamPtr delayed_blocks_, JoiningTransform::FinishCounterPtr left_delayed_stream_finish_counter_)
: delayed_blocks(std::move(delayed_blocks_))
, left_delayed_stream_finish_counter(left_delayed_stream_finish_counter_)
{
}
IBlocksStreamPtr delayed_blocks = nullptr;
JoiningTransform::FinishCounterPtr left_delayed_stream_finish_counter = nullptr;
bool finished = false;
};
@ -147,7 +152,11 @@ private:
class DelayedJoinedBlocksWorkerTransform : public IProcessor
{
public:
explicit DelayedJoinedBlocksWorkerTransform(Block output_header);
explicit DelayedJoinedBlocksWorkerTransform(
Block left_header_,
Block output_header_,
size_t max_block_size_,
JoinPtr join_);
String getName() const override { return "DelayedJoinedBlocksWorkerTransform"; }
@ -155,10 +164,19 @@ public:
void work() override;
private:
Block left_header;
Block output_header;
size_t max_block_size;
JoinPtr join;
DelayedBlocksTaskPtr task;
Chunk output_chunk;
bool finished = false;
bool left_delayed_stream_finished = false;
bool setup_non_joined_stream = false;
IBlocksStreamPtr non_joined_delay_stream = nullptr;
void resetTask();
Block nextNonJoinedBlock();
};
}

View File

@ -491,7 +491,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLe
if (delayed_root)
{
// Process delayed joined blocks when all JoiningTransform are finished.
auto delayed = std::make_shared<DelayedJoinedBlocksWorkerTransform>(joined_header);
auto delayed = std::make_shared<DelayedJoinedBlocksWorkerTransform>(left_header, joined_header, max_block_size, join);
if (delayed->getInputs().size() != 1 || delayed->getOutputs().size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "DelayedJoinedBlocksWorkerTransform should have one input and one output");

View File

@ -360,6 +360,7 @@ t1 ALL FULL JOIN tn2 | bs = {{ block_size }}
2 \N 5 0
3 3 4 4
t1 ALL FULL JOIN USING tn2 | bs = {{ block_size }}
{% if join_algorithm == 'full_sorting_merge' -%}
1 4 5
1 4 5
2 5 0
@ -377,6 +378,25 @@ t1 ALL FULL JOIN USING tn2 | bs = {{ block_size }}
\N 0 5
\N 0 5
\N 0 5
{% else -%}
0 0 5
0 0 5
0 0 5
0 0 5
0 0 5
0 0 5
0 0 5
1 4 5
1 4 5
2 5 0
2 5 0
2 5 0
2 5 0
2 5 0
2 5 0
2 5 0
3 4 4
{% endif -%}
tn1 ALL INNER JOIN t2 | bs = {{ block_size }}
1 1 4 5
1 1 4 5