Nikita Taranov 2024-10-21 21:58:49 +01:00
parent 01e6b6b300
commit 88006f6e09
3 changed files with 88 additions and 48 deletions

src/Interpreters/HashJoin/HashJoin.cpp

@@ -30,6 +30,8 @@
#include <Common/assert_cast.h>
#include <Common/formatReadable.h>
#include <Common/typeid_cast.h>
#include "Core/Block.h"
#include "Interpreters/HashJoin/ScatteredBlock.h"
#include <Interpreters/HashJoin/HashJoinMethods.h>
@@ -85,6 +87,11 @@ Block filterColumnsPresentInSampleBlock(const Block & block, const Block & sample_block)
return filtered_block;
}
+ScatteredBlock filterColumnsPresentInSampleBlock(const ScatteredBlock & block, const Block & sample_block)
+{
+return ScatteredBlock{filterColumnsPresentInSampleBlock(block.getSourceBlock(), sample_block)};
+}
Block materializeColumnsFromRightBlock(Block block, const Block & sample_block, const Names &)
{
for (const auto & sample_column : sample_block.getColumnsWithTypeAndName())
@@ -104,12 +111,6 @@ Block materializeColumnsFromRightBlock(Block block, const Block & sample_block, const Names &)
JoinCommon::convertColumnToNullable(column);
}
-// for (const auto & column_name : right_key_names)
-// {
-// auto & column = block.getByName(column_name).column;
-// column = recursiveRemoveSparse(column->convertToFullColumnIfConst())->convertToFullColumnIfLowCardinality();
-// }
return block;
}
}
@@ -557,7 +558,7 @@ bool HashJoin::addBlockToJoin(ScatteredBlock & source_block, bool check_limits)
}
}
-size_t rows = source_block.rows();
+const size_t rows = source_block.rows();
data->rows_to_join += rows;
const auto & right_key_names = table_join->getAllNames(JoinTableSide::Right);
ColumnPtrMap all_key_columns(right_key_names.size());
@@ -567,7 +568,7 @@ bool HashJoin::addBlockToJoin(ScatteredBlock & source_block, bool check_limits)
all_key_columns[column_name] = recursiveRemoveSparse(column->convertToFullColumnIfConst())->convertToFullColumnIfLowCardinality();
}
-Block block_to_save = filterColumnsPresentInSampleBlock(source_block.getSourceBlock(), savedBlockSample());
+ScatteredBlock block_to_save = filterColumnsPresentInSampleBlock(source_block, savedBlockSample());
if (shrink_blocks)
block_to_save = block_to_save.shrinkToFit();
@@ -583,7 +584,7 @@ bool HashJoin::addBlockToJoin(ScatteredBlock & source_block, bool check_limits)
{
tmp_stream = &tmp_data->createStream(right_sample_block);
}
-tmp_stream->write(block_to_save);
+tmp_stream->write(block_to_save.getSourceBlock());
return true;
}
@@ -595,7 +596,7 @@ bool HashJoin::addBlockToJoin(ScatteredBlock & source_block, bool check_limits)
if (storage_join_lock)
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "addBlockToJoin called when HashJoin locked to prevent updates");
-assertBlocksHaveEqualStructure(data->sample_block, block_to_save, "joined block");
+assertBlocksHaveEqualStructure(data->sample_block, block_to_save.getSourceBlock(), "joined block");
size_t min_bytes_to_compress = table_join->crossJoinMinBytesToCompress();
size_t min_rows_to_compress = table_join->crossJoinMinRowsToCompress();
@@ -609,12 +610,10 @@ bool HashJoin::addBlockToJoin(ScatteredBlock & source_block, bool check_limits)
have_compressed = true;
}
-/// In case of a scattered block we account for a proportional share of the source block's bytes.
-/// For non-scattered blocks it is the trivial (bytes * N / N) calculation.
-data->blocks_allocated_size += block_to_save.rows() ? block_to_save.allocatedBytes() * rows / block_to_save.rows() : 0;
+data->blocks_allocated_size += block_to_save.allocatedBytes();
doDebugAsserts();
data->blocks.emplace_back(std::move(block_to_save));
-Block * stored_block = &data->blocks.back();
+auto * stored_block = &data->blocks.back();
doDebugAsserts();
if (rows)
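The removed lines above did the proportional accounting at the call site; this commit moves it into `ScatteredBlock::allocatedBytes()` (see ScatteredBlock.h below) and applies the same formula to the null-map holders later in this function. A minimal standalone sketch of the arithmetic, with made-up sizes and a helper name invented for illustration:

```cpp
#include <cassert>
#include <cstddef>

/// A scattered block selects `selected_rows` of the `source_rows` rows of a
/// shared source block, so it is charged a proportional share of the bytes.
std::size_t proportionalShare(std::size_t allocated_bytes, std::size_t selected_rows, std::size_t source_rows)
{
    return source_rows ? allocated_bytes * selected_rows / source_rows : 0;
}

int main()
{
    /// A 1 MiB source block scattered into 4 equal buckets: each bucket is
    /// charged 256 KiB instead of the full 1 MiB four times over.
    assert(proportionalShare(1 << 20, 25'000, 100'000) == (1 << 18));
    /// A non-scattered block (N / N) is charged in full.
    assert(proportionalShare(1 << 20, 100'000, 100'000) == (1 << 20));
}
```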
@@ -679,7 +678,7 @@ bool HashJoin::addBlockToJoin(ScatteredBlock & source_block, bool check_limits)
map,
key_columns,
key_sizes[onexpr_idx],
-stored_block,
+&stored_block->getSourceBlock(),
source_block.getSelector(),
null_map,
join_mask_col.getData(),
@@ -687,7 +686,8 @@ bool HashJoin::addBlockToJoin(ScatteredBlock & source_block, bool check_limits)
is_inserted);
if (flag_per_row)
-used_flags->reinit<kind_, strictness_, std::is_same_v<std::decay_t<decltype(map)>, MapsAll>>(stored_block);
+used_flags->reinit<kind_, strictness_, std::is_same_v<std::decay_t<decltype(map)>, MapsAll>>(
+    &stored_block->getSourceBlock());
else if (is_inserted)
/// Number of buckets + 1 value from zero storage
used_flags->reinit<kind_, strictness_, std::is_same_v<std::decay_t<decltype(map)>, MapsAll>>(size + 1);
@@ -696,14 +696,16 @@ bool HashJoin::addBlockToJoin(ScatteredBlock & source_block, bool check_limits)
if (!flag_per_row && save_nullmap && is_inserted)
{
-data->blocks_nullmaps_allocated_size += null_map_holder->allocatedBytes();
-data->blocks_nullmaps.emplace_back(stored_block, null_map_holder);
+data->blocks_nullmaps_allocated_size
+    += null_map_holder->size() ? null_map_holder->allocatedBytes() * rows / null_map_holder->size() : 0;
+data->blocks_nullmaps.emplace_back(&stored_block->getSourceBlock(), null_map_holder);
}
if (!flag_per_row && not_joined_map && is_inserted)
{
-data->blocks_nullmaps_allocated_size += not_joined_map->allocatedBytes();
-data->blocks_nullmaps.emplace_back(stored_block, std::move(not_joined_map));
+data->blocks_nullmaps_allocated_size
+    += not_joined_map->size() ? not_joined_map->allocatedBytes() * rows / not_joined_map->size() : 0;
+data->blocks_nullmaps.emplace_back(&stored_block->getSourceBlock(), std::move(not_joined_map));
}
if (!flag_per_row && !is_inserted)
@@ -861,7 +863,7 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
}
};
-for (const Block & block_right : data->blocks)
+for (const auto & block_right : data->blocks)
{
++block_number;
if (block_number < start_right_block)
@@ -869,9 +871,12 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
/// The following statement cannot be substituted with `process_right_block(!have_compressed ? block_right : block_right.decompress())`
/// because that would copy `block_right` even when the non-compressed branch is taken (the common type of `block_right` and `block_right.decompress()` is `Block`).
if (!have_compressed)
-process_right_block(block_right);
+process_right_block(block_right.getSourceBlock());
else
-process_right_block(block_right.decompress());
+{
+chassert(!block_right.wasScattered()); /// Compression only happens for cross join
+process_right_block(block_right.getSourceBlock().decompress());
+}
if (rows_added > max_joined_block_rows)
{
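The comment a few lines up explains why the two branches are not folded into one ternary: the conditional operator materializes a common type, so mixing the lvalue `block_right` with the prvalue returned by `decompress()` copies the block even on the non-compressed branch. A standalone sketch of the pitfall, using a hypothetical `Data` type rather than the real `Block`:

```cpp
#include <iostream>

struct Data
{
    Data() = default;
    Data(const Data &) { std::cout << "copy\n"; }
    Data decompress() const { return Data{}; }
};

void process(const Data &) {}

int main()
{
    Data d;
    bool have_compressed = false;

    /// The common type of `d` (lvalue) and `d.decompress()` (prvalue) is `Data`,
    /// so the ternary copies `d` even when the first branch is taken.
    process(!have_compressed ? d : d.decompress()); /// prints "copy"

    /// Separate branches bind `d` directly to the const reference: no copy.
    if (!have_compressed)
        process(d);
    else
        process(d.decompress());
}
```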
@@ -1221,14 +1226,14 @@ private:
std::any position;
std::optional<HashJoin::BlockNullmapList::const_iterator> nulls_position;
-std::optional<BlocksList::const_iterator> used_position;
+std::optional<HashJoin::ScatteredBlocksList::const_iterator> used_position;
-size_t fillColumnsFromData(const BlocksList & blocks, MutableColumns & columns_right)
+size_t fillColumnsFromData(const HashJoin::ScatteredBlocksList & blocks, MutableColumns & columns_right)
{
if (!position.has_value())
-position = std::make_any<BlocksList::const_iterator>(blocks.begin());
+position = std::make_any<HashJoin::ScatteredBlocksList::const_iterator>(blocks.begin());
-auto & block_it = std::any_cast<BlocksList::const_iterator &>(position);
+auto & block_it = std::any_cast<HashJoin::ScatteredBlocksList::const_iterator &>(position);
auto end = blocks.end();
size_t rows_added = 0;
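A note on the pattern this hunk touches: `position` is a type-erased cursor kept in `std::any` and recovered by reference via `std::any_cast<T &>`, so the iterator type has to be updated consistently at both the `make_any` and the `any_cast` site, exactly as the hunk does. A minimal sketch of the idiom, with names invented for illustration:

```cpp
#include <any>
#include <list>

/// Resumable iteration: the cursor lives in a type-erased `std::any`
/// (a member of the enclosing class in the real code) and is advanced in place.
int drain(std::any & position, const std::list<int> & xs)
{
    if (!position.has_value())
        position = std::make_any<std::list<int>::const_iterator>(xs.begin());

    /// any_cast to a reference type yields a reference to the stored iterator,
    /// so the ++ below mutates the cursor inside `position`.
    auto & it = std::any_cast<std::list<int>::const_iterator &>(position);

    int sum = 0;
    for (; it != xs.end(); ++it)
        sum += *it;
    return sum;
}
```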
@@ -1288,11 +1293,11 @@ private:
for (auto & it = *used_position; it != end && rows_added < max_block_size; ++it)
{
-const Block & mapped_block = *it;
+const auto & mapped_block = *it;
for (size_t row = 0; row < mapped_block.rows(); ++row)
{
-if (!parent.isUsed(&mapped_block, row))
+if (!parent.isUsed(&mapped_block.getSourceBlock(), row))
{
for (size_t colnum = 0; colnum < columns_keys_and_right.size(); ++colnum)
{
@@ -1418,16 +1423,24 @@ void HashJoin::reuseJoinedData(const HashJoin & join)
}
}
-BlocksList HashJoin::releaseJoinedBlocks(bool restructure)
+BlocksList HashJoin::releaseJoinedBlocks(bool restructure [[maybe_unused]])
{
LOG_TRACE(
log, "{}Join data is being released, {} bytes and {} rows in hash table", instance_log_id, getTotalByteCount(), getTotalRowCount());
-BlocksList right_blocks = std::move(data->blocks);
+auto extract_source_blocks = [](ScatteredBlocksList && blocks)
+{
+BlocksList result;
+for (auto & block : blocks)
+result.emplace_back(std::move(block).getSourceBlock());
+return result;
+};
+ScatteredBlocksList right_blocks = std::move(data->blocks);
if (!restructure)
{
data.reset();
-return right_blocks;
+return extract_source_blocks(std::move(right_blocks));
}
data->maps.clear();
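The `std::move(block).getSourceBlock()` call in the new lambda above suggests `getSourceBlock()` is ref-qualified, so an expiring ScatteredBlock can hand out its source `Block` without a copy; that overload is not visible in this diff, so the sketch below is an assumption about its shape, using a hypothetical `Holder` type:

```cpp
#include <utility>
#include <vector>

struct Holder
{
    std::vector<int> data;

    /// Ref-qualified accessors: lvalues get a reference to the payload,
    /// rvalues (e.g. std::move(holder)) may steal it.
    const std::vector<int> & getData() const & { return data; }
    std::vector<int> && getData() && { return std::move(data); }
};

int main()
{
    Holder h{{1, 2, 3}};
    auto copied = h.getData();           /// lvalue overload: copies the vector
    auto moved = std::move(h).getData(); /// rvalue overload: moves, no copy
    (void)copied;
    (void)moved;
}
```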
@@ -1441,7 +1454,7 @@ BlocksList HashJoin::releaseJoinedBlocks(bool restructure)
if (!right_blocks.empty())
{
positions.reserve(right_sample_block.columns());
-const Block & tmp_block = *right_blocks.begin();
+const Block & tmp_block = right_blocks.begin()->getSourceBlock();
for (const auto & sample_column : right_sample_block)
{
positions.emplace_back(tmp_block.getPositionByName(sample_column.name));
@@ -1449,12 +1462,12 @@ BlocksList HashJoin::releaseJoinedBlocks(bool restructure)
}
}
-for (Block & saved_block : right_blocks)
+for (ScatteredBlock & saved_block : right_blocks)
{
Block restored_block;
for (size_t i = 0; i < positions.size(); ++i)
{
-auto & column = saved_block.getByPosition(positions[i]);
+auto & column = saved_block.getSourceBlock().getByPosition(positions[i]);
correctNullabilityInplace(column, is_nullable[i]);
restored_block.insert(column);
}
@@ -1519,7 +1532,6 @@ bool HashJoin::isUsed(const Block * block_ptr, size_t row_idx) const
return used_flags->getUsedSafe(block_ptr, row_idx);
}
bool HashJoin::needUsedFlagsForPerRightTableRow(std::shared_ptr<TableJoin> table_join_) const
{
if (!table_join_->oneDisjunct())
@@ -1538,7 +1550,7 @@ void HashJoin::tryRerangeRightTableDataImpl(Map & map [[maybe_unused]])
throw Exception(ErrorCodes::LOGICAL_ERROR, "Only left or inner join table can be reranged.");
else
{
-auto merge_rows_into_one_block = [&](BlocksList & blocks, RowRefList & rows_ref)
+auto merge_rows_into_one_block = [&](ScatteredBlocksList & blocks, RowRefList & rows_ref)
{
auto it = rows_ref.begin();
if (it.ok())
@@ -1550,7 +1562,7 @@ void HashJoin::tryRerangeRightTableDataImpl(Map & map [[maybe_unused]])
{
return;
}
-auto & block = blocks.back();
+auto & block = blocks.back().getSourceBlock();
size_t start_row = block.rows();
for (; it.ok(); ++it)
{
@@ -1567,23 +1579,22 @@ void HashJoin::tryRerangeRightTableDataImpl(Map & map [[maybe_unused]])
}
};
-auto visit_rows_map = [&](BlocksList & blocks, MapsAll & rows_map)
+auto visit_rows_map = [&](ScatteredBlocksList & blocks, MapsAll & rows_map)
{
switch (data->type)
{
-#define M(TYPE) \
-case Type::TYPE: \
-{\
-rows_map.TYPE->forEachMapped([&](RowRefList & rows_ref) { merge_rows_into_one_block(blocks, rows_ref); }); \
-break; \
-}
+#define M(TYPE) \
+case Type::TYPE: { \
+rows_map.TYPE->forEachMapped([&](RowRefList & rows_ref) { merge_rows_into_one_block(blocks, rows_ref); }); \
+break; \
+}
APPLY_FOR_JOIN_VARIANTS(M)
-#undef M
+#undef M
default:
break;
}
};
-BlocksList sorted_blocks;
+ScatteredBlocksList sorted_blocks;
visit_rows_map(sorted_blocks, map);
doDebugAsserts();
data->blocks.swap(sorted_blocks);
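The reformatted `M` macro above is the usual X-macro dispatch: `APPLY_FOR_JOIN_VARIANTS(M)` stamps out one `case` per hash-table variant, and `M` is defined just before the expansion and undefined right after. A generic standalone sketch of the technique, with a hypothetical variant list (not the real `APPLY_FOR_JOIN_VARIANTS`):

```cpp
#include <iostream>

/// Hypothetical variant list; the real APPLY_FOR_JOIN_VARIANTS enumerates
/// the supported hash-map layouts (key8, key16, keys128, ...).
#define APPLY_FOR_VARIANTS(M) \
    M(key8) \
    M(key16) \
    M(keys128)

enum class Type
{
    key8,
    key16,
    keys128
};

void dispatch(Type type)
{
    switch (type)
    {
#define M(TYPE) \
    case Type::TYPE: { \
        std::cout << "visiting " #TYPE "\n"; \
        break; \
    }
        APPLY_FOR_VARIANTS(M)
#undef M
    }
}

int main()
{
    dispatch(Type::key16); /// prints "visiting key16"
}
```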

src/Interpreters/HashJoin/HashJoin.h

@@ -337,6 +337,8 @@ public:
using RawBlockPtr = const Block *;
using BlockNullmapList = std::deque<std::pair<RawBlockPtr, ColumnPtr>>;
+using ScatteredBlocksList = std::list<ScatteredBlock>;
struct RightTableData
{
Type type = Type::EMPTY;
@@ -344,7 +346,7 @@ public:
std::vector<MapsVariant> maps;
Block sample_block; /// Block as it would appear in the BlockList
-BlocksList blocks; /// Blocks of "right" table.
+ScatteredBlocksList blocks; /// Blocks of "right" table.
BlockNullmapList blocks_nullmaps; /// Nullmaps for blocks of "right" table (if needed)
/// Additional data - strings for string keys and continuation elements of single-linked lists of references to rows.

src/Interpreters/HashJoin/ScatteredBlock.h

@@ -6,6 +6,9 @@
#include <base/defines.h>
#include <Common/PODArray.h>
+#include <Poco/Logger.h>
+#include <Common/logger_useful.h>
#include <boost/noncopyable.hpp>
namespace DB
@@ -225,6 +228,30 @@ struct ScatteredBlock : private boost::noncopyable
/// Accounts only selected rows
size_t rows() const { return selector.size(); }
+/// In case of a scattered block we account for a proportional share of the source block's bytes.
+/// For non-scattered blocks it is the trivial (bytes * N / N) calculation.
+size_t allocatedBytes() const { return block.rows() ? block.allocatedBytes() * rows() / block.rows() : 0; }
+ScatteredBlock shrinkToFit() const
+{
+if (wasScattered())
+{
+LOG_TEST(getLogger("HashJoin"), "shrinkToFit() is not supported for ScatteredBlock because blocks are shared");
+return ScatteredBlock{block};
+}
+return ScatteredBlock{block.shrinkToFit()};
+}
+ScatteredBlock compress() const
+{
+chassert(!wasScattered());
+if (wasScattered())
+throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot compress scattered block");
+return ScatteredBlock{block.compress()};
+}
+const auto & getByPosition(size_t i) const { return block.getByPosition(i); }
/// Whether `block` was scattered, i.e. `selector` != [0, block.rows())
bool wasScattered() const
{
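Taken together, the new members rely on one invariant: a ScatteredBlock is a shared source Block plus a selector of row indices, and `wasScattered()` (the selector no longer being the trivial [0, block.rows()) range, per the comment above) gates the operations that need exclusive ownership of whole rows, such as `shrinkToFit()` and `compress()`. A simplified, hypothetical stand-in to illustrate the shape, not the real class:

```cpp
#include <cstddef>
#include <vector>

/// Simplified stand-in for ScatteredBlock: a (conceptually shared) source
/// payload plus the indices of the rows that belong to this instance.
struct MiniScatteredBlock
{
    std::vector<int> source;      /// stands in for the source Block
    std::vector<std::size_t> selector; /// selected row indices

    std::size_t rows() const { return selector.size(); }

    /// Scattered == the selector is no longer the trivial [0, source.size()).
    bool wasScattered() const
    {
        if (selector.size() != source.size())
            return true;
        for (std::size_t i = 0; i < selector.size(); ++i)
            if (selector[i] != i)
                return true;
        return false;
    }

    /// Proportional share of the source bytes, as in allocatedBytes() above.
    std::size_t allocatedBytes() const
    {
        return source.empty() ? 0 : source.size() * sizeof(int) * rows() / source.size();
    }
};
```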