first working version

This commit is contained in:
Maxim Alexeev 2024-05-06 23:22:41 +03:00
parent 1b7b4fc858
commit 1660a4fe86

View File

@ -64,6 +64,7 @@ struct NotProcessedCrossJoin : public ExtraBlock
{
size_t left_position;
size_t right_block;
std::unique_ptr<InputReader> reader;
};
@ -256,7 +257,6 @@ HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_s
, instance_log_id(!instance_id_.empty() ? "(" + instance_id_ + ") " : "")
, log(getLogger("HashJoin"))
{
LOG_INFO(log, "KEK CONSTRUCTOR {}\n", reserve_num);
LOG_TRACE(log, "{}Keys: {}, datatype: {}, kind: {}, strictness: {}, right header: {}",
instance_log_id, TableJoin::formatClauses(table_join->getClauses(), true), data->type, kind, strictness, right_sample_block.dumpStructure());
@ -830,14 +830,23 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
if (shrink_blocks)
block_to_save = block_to_save.shrinkToFit();
size_t max_bytes_in_join = table_join->sizeLimits().max_bytes;
size_t max_rows_in_join = table_join->sizeLimits().max_rows;
if (!table_join->sizeLimits().hasLimits())
{
max_bytes_in_join = table_join->defaultMaxBytes();
}
if (kind == JoinKind::Cross)
if (kind == JoinKind::Cross
&& (tmp_stream || (max_bytes_in_join && getTotalByteCount() + block_to_save.allocatedBytes() >= max_bytes_in_join)
|| (max_rows_in_join && getTotalRowCount() + block_to_save.rows() >= max_rows_in_join)))
{
if (tmp_stream == nullptr)
{
tmp_stream = &tmp_data->createStream(right_sample_block);
}
tmp_stream->write(block_to_save);
return true;
}
size_t total_rows = 0;
@ -2251,11 +2260,13 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
{
size_t start_left_row = 0;
size_t start_right_block = 0;
std::unique_ptr<InputReader> reader = nullptr;
if (not_processed)
{
auto & continuation = static_cast<NotProcessedCrossJoin &>(*not_processed);
start_left_row = continuation.left_position;
start_right_block = continuation.right_block;
reader = std::move(continuation.reader);
not_processed.reset();
}
@ -2317,8 +2328,11 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
if (tmp_stream)
{
tmp_stream->finishWritingAsyncSafe();
auto reader = tmp_stream->getReadStream();
if (reader == nullptr)
{
tmp_stream->finishWritingAsyncSafe();
reader = tmp_stream->getReadStream();
}
while (auto block_right = reader->read())
{
process_right_block(block_right);
@ -2330,7 +2344,7 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
if (rows_added > max_joined_block_rows)
{
not_processed = std::make_shared<NotProcessedCrossJoin>(
NotProcessedCrossJoin{{block.cloneEmpty()}, left_row, block_number + 1});
NotProcessedCrossJoin{{block.cloneEmpty()}, left_row, block_number + 1, std::move(reader)});
not_processed->block.swap(block);
break;
}