Merge pull request #67782 from ClickHouse/better_parallel_hash2

Scatter blocks in hash join without copying
This commit is contained in:
Nikita Taranov 2024-11-17 21:02:26 +00:00 committed by GitHub
commit c198e1cb1f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
32 changed files with 1118 additions and 363 deletions

View File

@ -52,6 +52,7 @@ private:
explicit ColumnVector(const size_t n) : data(n) {}
ColumnVector(const size_t n, const ValueType x) : data(n, x) {}
ColumnVector(const ColumnVector & src) : data(src.data.begin(), src.data.end()) {}
ColumnVector(Container::const_iterator begin, Container::const_iterator end) : data(begin, end) { }
/// Sugar constructor.
ColumnVector(std::initializer_list<T> il) : data{il} {}

View File

@ -150,6 +150,9 @@ Squash blocks passed to the external table to a specified size in bytes, if bloc
)", 0) \
DECLARE(UInt64, max_joined_block_size_rows, DEFAULT_BLOCK_SIZE, R"(
Maximum block size for JOIN result (if join algorithm supports it). 0 means unlimited.
)", 0) \
DECLARE(UInt64, min_joined_block_size_bytes, 524288, R"(
Minimum block size for JOIN result (if join algorithm supports it). 0 means unlimited.
)", 0) \
DECLARE(UInt64, max_insert_threads, 0, R"(
The maximum number of threads to execute the `INSERT SELECT` query.

View File

@ -80,6 +80,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"backup_restore_finish_timeout_after_error_sec", 0, 180, "New setting."},
{"query_plan_merge_filters", false, true, "Allow to merge filters in the query plan. This is required to properly support filter-push-down with a new analyzer."},
{"parallel_replicas_local_plan", false, true, "Use local plan for local replica in a query with parallel replicas"},
{"min_joined_block_size_bytes", 524288, 524288, "New setting."},
{"allow_experimental_bfloat16_type", false, false, "Add new experimental BFloat16 type"},
{"filesystem_cache_skip_download_if_exceeds_per_query_cache_write_limit", 1, 1, "Rename of setting skip_download_if_exceeds_query_cache_limit"},
{"filesystem_cache_prefer_bigger_buffer_size", true, true, "New setting"},

View File

@ -5,9 +5,12 @@
#include <Core/Names.h>
#include <Core/NamesAndTypes.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/Serializations/ISerialization.h>
#include <Interpreters/ConcurrentHashJoin.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/HashJoin/ScatteredBlock.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/TableJoin.h>
#include <Interpreters/createBlockSelector.h>
@ -26,6 +29,12 @@
#include <Common/setThreadName.h>
#include <Common/typeid_cast.h>
#include <algorithm>
#include <numeric>
#include <vector>
using namespace DB;
namespace ProfileEvents
{
extern const Event HashJoinPreallocatedElementsInHashTables;
@ -118,9 +127,7 @@ ConcurrentHashJoin::ConcurrentHashJoin(
auto inner_hash_join = std::make_shared<InternalHashJoin>();
inner_hash_join->data = std::make_unique<HashJoin>(
table_join_, right_sample_block, any_take_last_row_, reserve_size, fmt::format("concurrent{}", idx));
/// Non zero `max_joined_block_rows` allows to process block partially and return not processed part.
/// TODO: It's not handled properly in ConcurrentHashJoin case, so we set it to 0 to disable this feature.
inner_hash_join->data->setMaxJoinedBlockRows(0);
inner_hash_join->data->setMaxJoinedBlockRows(table_join->maxJoinedBlockRows());
hash_joins[idx] = std::move(inner_hash_join);
});
}
@ -167,10 +174,13 @@ ConcurrentHashJoin::~ConcurrentHashJoin()
}
}
bool ConcurrentHashJoin::addBlockToJoin(const Block & right_block, bool check_limits)
bool ConcurrentHashJoin::addBlockToJoin(const Block & right_block_, bool check_limits)
{
Blocks dispatched_blocks = dispatchBlock(table_join->getOnlyClause().key_names_right, right_block);
/// We materialize columns here to avoid materializing them multiple times on different threads
/// (inside different `hash_join`-s) because the block will be shared.
Block right_block = hash_joins[0]->data->materializeColumnsFromRightBlock(right_block_);
auto dispatched_blocks = dispatchBlock(table_join->getOnlyClause().key_names_right, std::move(right_block));
size_t blocks_left = 0;
for (const auto & block : dispatched_blocks)
{
@ -213,19 +223,52 @@ bool ConcurrentHashJoin::addBlockToJoin(const Block & right_block, bool check_li
void ConcurrentHashJoin::joinBlock(Block & block, std::shared_ptr<ExtraBlock> & /*not_processed*/)
{
Blocks dispatched_blocks = dispatchBlock(table_join->getOnlyClause().key_names_left, block);
Blocks res;
ExtraScatteredBlocks extra_blocks;
joinBlock(block, extra_blocks, res);
chassert(!extra_blocks.rows());
block = concatenateBlocks(res);
}
void ConcurrentHashJoin::joinBlock(Block & block, ExtraScatteredBlocks & extra_blocks, std::vector<Block> & res)
{
ScatteredBlocks dispatched_blocks;
auto & remaining_blocks = extra_blocks.remaining_blocks;
if (extra_blocks.rows())
{
dispatched_blocks.swap(remaining_blocks);
}
else
{
hash_joins[0]->data->materializeColumnsFromLeftBlock(block);
dispatched_blocks = dispatchBlock(table_join->getOnlyClause().key_names_left, std::move(block));
}
block = {};
/// Just in case, should be no-op always
remaining_blocks.resize(slots);
chassert(res.empty());
res.clear();
res.reserve(dispatched_blocks.size());
for (size_t i = 0; i < dispatched_blocks.size(); ++i)
{
std::shared_ptr<ExtraBlock> none_extra_block;
auto & hash_join = hash_joins[i];
auto & dispatched_block = dispatched_blocks[i];
hash_join->data->joinBlock(dispatched_block, none_extra_block);
if (dispatched_block && (i == 0 || dispatched_block.rows()))
hash_join->data->joinBlock(dispatched_block, remaining_blocks[i]);
if (none_extra_block && !none_extra_block->empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "not_processed should be empty");
}
block = concatenateBlocks(dispatched_blocks);
for (size_t i = 0; i < dispatched_blocks.size(); ++i)
{
auto & dispatched_block = dispatched_blocks[i];
if (dispatched_block && (i == 0 || dispatched_block.rows()))
res.emplace_back(std::move(dispatched_block).getSourceBlock());
}
}
void ConcurrentHashJoin::checkTypesOfKeys(const Block & block) const
@ -304,10 +347,9 @@ static ALWAYS_INLINE IColumn::Selector hashToSelector(const WeakHash32 & hash, s
return selector;
}
IColumn::Selector ConcurrentHashJoin::selectDispatchBlock(const Strings & key_columns_names, const Block & from_block)
IColumn::Selector selectDispatchBlock(size_t num_shards, const Strings & key_columns_names, const Block & from_block)
{
size_t num_rows = from_block.rows();
size_t num_shards = hash_joins.size();
WeakHash32 hash(num_rows);
for (const auto & key_name : key_columns_names)
@ -319,30 +361,78 @@ IColumn::Selector ConcurrentHashJoin::selectDispatchBlock(const Strings & key_co
return hashToSelector(hash, num_shards);
}
Blocks ConcurrentHashJoin::dispatchBlock(const Strings & key_columns_names, const Block & from_block)
ScatteredBlocks scatterBlocksByCopying(size_t num_shards, const IColumn::Selector & selector, const Block & from_block)
{
/// TODO: use JoinCommon::scatterBlockByHash
size_t num_shards = hash_joins.size();
size_t num_cols = from_block.columns();
IColumn::Selector selector = selectDispatchBlock(key_columns_names, from_block);
Blocks result(num_shards);
Blocks blocks(num_shards);
for (size_t i = 0; i < num_shards; ++i)
result[i] = from_block.cloneEmpty();
blocks[i] = from_block.cloneEmpty();
for (size_t i = 0; i < num_cols; ++i)
for (size_t i = 0; i < from_block.columns(); ++i)
{
auto dispatched_columns = from_block.getByPosition(i).column->scatter(num_shards, selector);
assert(result.size() == dispatched_columns.size());
chassert(blocks.size() == dispatched_columns.size());
for (size_t block_index = 0; block_index < num_shards; ++block_index)
{
result[block_index].getByPosition(i).column = std::move(dispatched_columns[block_index]);
blocks[block_index].getByPosition(i).column = std::move(dispatched_columns[block_index]);
}
}
ScatteredBlocks result;
result.reserve(num_shards);
for (size_t i = 0; i < num_shards; ++i)
result.emplace_back(std::move(blocks[i]));
return result;
}
ScatteredBlocks scatterBlocksWithSelector(size_t num_shards, const IColumn::Selector & selector, const Block & from_block)
{
std::vector<ScatteredBlock::IndexesPtr> selectors(num_shards);
for (size_t i = 0; i < num_shards; ++i)
{
selectors[i] = ScatteredBlock::Indexes::create();
selectors[i]->reserve(selector.size() / num_shards + 1);
}
for (size_t i = 0; i < selector.size(); ++i)
{
const size_t shard = selector[i];
selectors[shard]->getData().push_back(i);
}
ScatteredBlocks result;
result.reserve(num_shards);
for (size_t i = 0; i < num_shards; ++i)
result.emplace_back(from_block, std::move(selectors[i]));
return result;
}
ScatteredBlocks ConcurrentHashJoin::dispatchBlock(const Strings & key_columns_names, Block && from_block)
{
size_t num_shards = hash_joins.size();
if (num_shards == 1)
{
ScatteredBlocks res;
res.emplace_back(std::move(from_block));
return res;
}
IColumn::Selector selector = selectDispatchBlock(num_shards, key_columns_names, from_block);
/// With zero-copy approach we won't copy the source columns, but will create a new one with indices.
/// This is not beneficial when the whole set of columns is e.g. a single small column.
constexpr auto threshold = sizeof(IColumn::Selector::value_type);
const auto & data_types = from_block.getDataTypes();
const bool use_zero_copy_approach
= std::accumulate(
data_types.begin(),
data_types.end(),
0u,
[](size_t sum, const DataTypePtr & type)
{ return sum + (type->haveMaximumSizeOfValue() ? type->getMaximumSizeOfValueInMemory() : threshold + 1); })
> threshold;
return use_zero_copy_approach ? scatterBlocksWithSelector(num_shards, selector, from_block)
: scatterBlocksByCopying(num_shards, selector, from_block);
}
UInt64 calculateCacheKey(
std::shared_ptr<TableJoin> & table_join, const QueryTreeNodePtr & right_table_expression, const SelectQueryInfo & select_query_info)
{

View File

@ -47,7 +47,7 @@ public:
std::string getName() const override { return "ConcurrentHashJoin"; }
const TableJoin & getTableJoin() const override { return *table_join; }
bool addBlockToJoin(const Block & block, bool check_limits) override;
bool addBlockToJoin(const Block & right_block_, bool check_limits) override;
void checkTypesOfKeys(const Block & block) const override;
void joinBlock(Block & block, std::shared_ptr<ExtraBlock> & not_processed) override;
void setTotals(const Block & block) override;
@ -57,6 +57,9 @@ public:
bool alwaysReturnsEmptySet() const override;
bool supportParallelJoin() const override { return true; }
bool isScatteredJoin() const override { return true; }
void joinBlock(Block & block, ExtraScatteredBlocks & extra_blocks, std::vector<Block> & res) override;
IBlocksStreamPtr
getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override;
@ -78,8 +81,7 @@ private:
std::mutex totals_mutex;
Block totals;
IColumn::Selector selectDispatchBlock(const Strings & key_columns_names, const Block & from_block);
Blocks dispatchBlock(const Strings & key_columns_names, const Block & from_block);
ScatteredBlocks dispatchBlock(const Strings & key_columns_names, Block && from_block);
};
UInt64 calculateCacheKey(

View File

@ -3,14 +3,16 @@
namespace DB
{
JoinOnKeyColumns::JoinOnKeyColumns(const Block & block, const Names & key_names_, const String & cond_column_name, const Sizes & key_sizes_)
: key_names(key_names_)
, materialized_keys_holder(JoinCommon::materializeColumns(
block, key_names)) /// Rare case, when keys are constant or low cardinality. To avoid code bloat, simply materialize them.
JoinOnKeyColumns::JoinOnKeyColumns(
const ScatteredBlock & block_, const Names & key_names_, const String & cond_column_name, const Sizes & key_sizes_)
: block(block_)
, key_names(key_names_)
/// Rare case, when keys are constant or low cardinality. To avoid code bloat, simply materialize them.
, materialized_keys_holder(JoinCommon::materializeColumns(block.getSourceBlock(), key_names))
, key_columns(JoinCommon::getRawPointers(materialized_keys_holder))
, null_map(nullptr)
, null_map_holder(extractNestedColumnsAndNullMap(key_columns, null_map))
, join_mask_column(JoinCommon::getColumnAsMask(block, cond_column_name))
, join_mask_column(JoinCommon::getColumnAsMask(block.getSourceBlock(), cond_column_name))
, key_sizes(key_sizes_)
{
}

View File

@ -1,4 +1,6 @@
#pragma once
#include <Core/Defines.h>
#include <Interpreters/HashJoin/HashJoin.h>
#include <Interpreters/TableJoin.h>
@ -14,6 +16,8 @@ using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
struct JoinOnKeyColumns
{
const ScatteredBlock & block;
Names key_names;
Columns materialized_keys_holder;
@ -27,9 +31,13 @@ struct JoinOnKeyColumns
Sizes key_sizes;
explicit JoinOnKeyColumns(const Block & block, const Names & key_names_, const String & cond_column_name, const Sizes & key_sizes_);
JoinOnKeyColumns(
const ScatteredBlock & block, const Names & key_names_, const String & cond_column_name, const Sizes & key_sizes_);
bool isRowFiltered(size_t i) const { return join_mask_column.isRowFiltered(i); }
bool isRowFiltered(size_t i) const
{
return join_mask_column.isRowFiltered(i);
}
};
template <bool lazy>
@ -54,7 +62,7 @@ public:
};
AddedColumns(
const Block & left_block_,
const ScatteredBlock & left_block_,
const Block & block_with_columns_to_add,
const Block & saved_block_sample,
const HashJoin & join,
@ -62,10 +70,11 @@ public:
ExpressionActionsPtr additional_filter_expression_,
bool is_asof_join,
bool is_join_get_)
: left_block(left_block_)
: src_block(left_block_)
, left_block(left_block_.getSourceBlock())
, join_on_keys(join_on_keys_)
, additional_filter_expression(additional_filter_expression_)
, rows_to_add(left_block.rows())
, rows_to_add(left_block_.rows())
, join_data_avg_perkey_rows(join.getJoinedData()->avgPerKeyRows())
, output_by_row_list_threshold(join.getTableJoin().outputByRowListPerkeyRowsThreshold())
, join_data_sorted(join.getJoinedData()->sorted)
@ -139,6 +148,7 @@ public:
static constexpr bool isLazy() { return lazy; }
const ScatteredBlock & src_block;
Block left_block;
std::vector<JoinOnKeyColumns> join_on_keys;
ExpressionActionsPtr additional_filter_expression;
@ -159,7 +169,7 @@ public:
return;
/// Do not allow big allocations when user set max_joined_block_rows to huge value
size_t reserve_size = std::min<size_t>(max_joined_block_rows, DEFAULT_BLOCK_SIZE * 2);
size_t reserve_size = std::min<size_t>(max_joined_block_rows, rows_to_add * 2);
if (need_replicate)
/// Reserve 10% more space for columns, because some rows can be repeated
@ -218,7 +228,7 @@ private:
void addColumn(const ColumnWithTypeAndName & src_column, const std::string & qualified_name)
{
columns.push_back(src_column.column->cloneEmpty());
columns.back()->reserve(src_column.column->size());
columns.back()->reserve(rows_to_add);
type_name.emplace_back(src_column.type, src_column.name, qualified_name);
}

View File

@ -13,24 +13,23 @@
#include <Common/logger_useful.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/HashJoin/HashJoin.h>
#include <Interpreters/JoinUtils.h>
#include <Interpreters/TableJoin.h>
#include <Interpreters/joinDispatch.h>
#include <Interpreters/NullableUtils.h>
#include <Interpreters/RowRefs.h>
#include <Interpreters/TableJoin.h>
#include <Interpreters/joinDispatch.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <Common/Exception.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/formatReadable.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <Common/typeid_cast.h>
#include <Interpreters/HashJoin/HashJoinMethods.h>
#include <Interpreters/HashJoin/JoinUsedFlags.h>
@ -40,16 +39,16 @@ namespace DB
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int NO_SUCH_COLUMN_IN_TABLE;
extern const int INCOMPATIBLE_TYPE_OF_JOIN;
extern const int UNSUPPORTED_JOIN_KEYS;
extern const int LOGICAL_ERROR;
extern const int SYNTAX_ERROR;
extern const int SET_SIZE_LIMIT_EXCEEDED;
extern const int TYPE_MISMATCH;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int INVALID_JOIN_ON_EXPRESSION;
extern const int NOT_IMPLEMENTED;
extern const int NO_SUCH_COLUMN_IN_TABLE;
extern const int INCOMPATIBLE_TYPE_OF_JOIN;
extern const int UNSUPPORTED_JOIN_KEYS;
extern const int LOGICAL_ERROR;
extern const int SYNTAX_ERROR;
extern const int SET_SIZE_LIMIT_EXCEEDED;
extern const int TYPE_MISMATCH;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int INVALID_JOIN_ON_EXPRESSION;
}
namespace
@ -72,6 +71,40 @@ Int64 getCurrentQueryMemoryUsage()
return 0;
}
Block filterColumnsPresentInSampleBlock(const Block & block, const Block & sample_block)
{
Block filtered_block;
for (const auto & sample_column : sample_block.getColumnsWithTypeAndName())
filtered_block.insert(block.getByName(sample_column.name));
return filtered_block;
}
ScatteredBlock filterColumnsPresentInSampleBlock(const ScatteredBlock & block, const Block & sample_block)
{
return ScatteredBlock{filterColumnsPresentInSampleBlock(block.getSourceBlock(), sample_block)};
}
Block materializeColumnsFromRightBlock(Block block, const Block & sample_block, const Names &)
{
for (const auto & sample_column : sample_block.getColumnsWithTypeAndName())
{
auto & column = block.getByName(sample_column.name);
/// There's no optimization for right side const columns. Remove constness if any.
column.column = recursiveRemoveSparse(column.column->convertToFullColumnIfConst());
if (column.column->lowCardinality() && !sample_column.column->lowCardinality())
{
column.column = column.column->convertToFullColumnIfLowCardinality();
column.type = removeLowCardinality(column.type);
}
if (sample_column.column->isNullable())
JoinCommon::convertColumnToNullable(column);
}
return block;
}
}
static void correctNullabilityInplace(ColumnWithTypeAndName & column, bool nullable)
@ -91,8 +124,12 @@ static void correctNullabilityInplace(ColumnWithTypeAndName & column, bool nulla
}
}
HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_,
bool any_take_last_row_, size_t reserve_num_, const String & instance_id_)
HashJoin::HashJoin(
std::shared_ptr<TableJoin> table_join_,
const Block & right_sample_block_,
bool any_take_last_row_,
size_t reserve_num_,
const String & instance_id_)
: table_join(table_join_)
, kind(table_join->kind())
, strictness(table_join->strictness())
@ -107,8 +144,15 @@ HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_s
, instance_log_id(!instance_id_.empty() ? "(" + instance_id_ + ") " : "")
, log(getLogger("HashJoin"))
{
LOG_TRACE(log, "{}Keys: {}, datatype: {}, kind: {}, strictness: {}, right header: {}",
instance_log_id, TableJoin::formatClauses(table_join->getClauses(), true), data->type, kind, strictness, right_sample_block.dumpStructure());
LOG_TRACE(
log,
"{}Keys: {}, datatype: {}, kind: {}, strictness: {}, right header: {}",
instance_log_id,
TableJoin::formatClauses(table_join->getClauses(), true),
data->type,
kind,
strictness,
right_sample_block.dumpStructure());
validateAdditionalFilterExpression(table_join->getMixedJoinExpression());
@ -252,8 +296,8 @@ HashJoin::Type HashJoin::chooseMethod(JoinKind kind, const ColumnRawPtrs & key_c
};
const auto * key_column = key_columns[0];
if (is_string_column(key_column) ||
(isColumnConst(*key_column) && is_string_column(assert_cast<const ColumnConst *>(key_column)->getDataColumnPtr().get())))
if (is_string_column(key_column)
|| (isColumnConst(*key_column) && is_string_column(assert_cast<const ColumnConst *>(key_column)->getDataColumnPtr().get())))
return Type::key_string;
}
@ -323,7 +367,8 @@ size_t HashJoin::getTotalRowCount() const
auto prefer_use_maps_all = table_join->getMixedJoinExpression() != nullptr;
for (const auto & map : data->maps)
{
joinDispatch(kind, strictness, map, prefer_use_maps_all, [&](auto, auto, auto & map_) { res += map_.getTotalRowCount(data->type); });
joinDispatch(
kind, strictness, map, prefer_use_maps_all, [&](auto, auto, auto & map_) { res += map_.getTotalRowCount(data->type); });
}
}
@ -338,16 +383,22 @@ void HashJoin::doDebugAsserts() const
debug_blocks_allocated_size += block.allocatedBytes();
if (data->blocks_allocated_size != debug_blocks_allocated_size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "data->blocks_allocated_size != debug_blocks_allocated_size ({} != {})",
data->blocks_allocated_size, debug_blocks_allocated_size);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"data->blocks_allocated_size != debug_blocks_allocated_size ({} != {})",
data->blocks_allocated_size,
debug_blocks_allocated_size);
size_t debug_blocks_nullmaps_allocated_size = 0;
for (const auto & nullmap : data->blocks_nullmaps)
debug_blocks_nullmaps_allocated_size += nullmap.second->allocatedBytes();
debug_blocks_nullmaps_allocated_size += nullmap.allocatedBytes();
if (data->blocks_nullmaps_allocated_size != debug_blocks_nullmaps_allocated_size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "data->blocks_nullmaps_allocated_size != debug_blocks_nullmaps_allocated_size ({} != {})",
data->blocks_nullmaps_allocated_size, debug_blocks_nullmaps_allocated_size);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"data->blocks_nullmaps_allocated_size != debug_blocks_nullmaps_allocated_size ({} != {})",
data->blocks_nullmaps_allocated_size,
debug_blocks_nullmaps_allocated_size);
#endif
}
@ -369,7 +420,12 @@ size_t HashJoin::getTotalByteCount() const
auto prefer_use_maps_all = table_join->getMixedJoinExpression() != nullptr;
for (const auto & map : data->maps)
{
joinDispatch(kind, strictness, map, prefer_use_maps_all, [&](auto, auto, auto & map_) { res += map_.getTotalByteCountImpl(data->type); });
joinDispatch(
kind,
strictness,
map,
prefer_use_maps_all,
[&](auto, auto, auto & map_) { res += map_.getTotalByteCountImpl(data->type); });
}
}
return res;
@ -386,11 +442,8 @@ void HashJoin::initRightBlockStructure(Block & saved_block_sample)
bool multiple_disjuncts = !table_join->oneDisjunct();
/// We could remove key columns for LEFT | INNER HashJoin but we should keep them for JoinSwitcher (if any).
bool save_key_columns = table_join->isEnabledAlgorithm(JoinAlgorithm::AUTO) ||
table_join->isEnabledAlgorithm(JoinAlgorithm::GRACE_HASH) ||
isRightOrFull(kind) ||
multiple_disjuncts ||
table_join->getMixedJoinExpression();
bool save_key_columns = table_join->isEnabledAlgorithm(JoinAlgorithm::AUTO) || table_join->isEnabledAlgorithm(JoinAlgorithm::GRACE_HASH)
|| isRightOrFull(kind) || multiple_disjuncts || table_join->getMixedJoinExpression();
if (save_key_columns)
{
saved_block_sample = right_table_keys.cloneEmpty();
@ -411,29 +464,27 @@ void HashJoin::initRightBlockStructure(Block & saved_block_sample)
}
}
void HashJoin::materializeColumnsFromLeftBlock(Block & block) const
{
/** If you use FULL or RIGHT JOIN, then the columns from the "left" table must be materialized.
* Because if they are constants, then in the "not joined" rows, they may have different values
* - default values, which can differ from the values of these constants.
*/
if (kind == JoinKind::Right || kind == JoinKind::Full)
{
materializeBlockInplace(block);
}
}
Block HashJoin::materializeColumnsFromRightBlock(Block block) const
{
return DB::materializeColumnsFromRightBlock(std::move(block), savedBlockSample(), table_join->getAllNames(JoinTableSide::Right));
}
Block HashJoin::prepareRightBlock(const Block & block, const Block & saved_block_sample_)
{
Block structured_block;
for (const auto & sample_column : saved_block_sample_.getColumnsWithTypeAndName())
{
ColumnWithTypeAndName column = block.getByName(sample_column.name);
/// There's no optimization for right side const columns. Remove constness if any.
column.column = recursiveRemoveSparse(column.column->convertToFullColumnIfConst());
if (column.column->lowCardinality() && !sample_column.column->lowCardinality())
{
column.column = column.column->convertToFullColumnIfLowCardinality();
column.type = removeLowCardinality(column.type);
}
if (sample_column.column->isNullable())
JoinCommon::convertColumnToNullable(column);
structured_block.insert(std::move(column));
}
return structured_block;
Block prepared_block = DB::materializeColumnsFromRightBlock(block, saved_block_sample_, {});
return filterColumnsPresentInSampleBlock(prepared_block, saved_block_sample_);
}
Block HashJoin::prepareRightBlock(const Block & block) const
@ -441,15 +492,22 @@ Block HashJoin::prepareRightBlock(const Block & block) const
return prepareRightBlock(block, savedBlockSample());
}
bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
bool HashJoin::addBlockToJoin(const Block & source_block, bool check_limits)
{
auto materialized = materializeColumnsFromRightBlock(source_block);
auto scattered_block = ScatteredBlock{materialized};
return addBlockToJoin(scattered_block, check_limits);
}
bool HashJoin::addBlockToJoin(ScatteredBlock & source_block, bool check_limits)
{
if (!data)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Join data was released");
/// RowRef::SizeT is uint32_t (not size_t) for hash table Cell memory efficiency.
/// It's possible to split bigger blocks and insert them by parts here. But it would be a dead code.
if (unlikely(source_block_.rows() > std::numeric_limits<RowRef::SizeT>::max()))
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Too many rows in right table block for HashJoin: {}", source_block_.rows());
if (unlikely(source_block.rows() > std::numeric_limits<RowRef::SizeT>::max()))
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Too many rows in right table block for HashJoin: {}", source_block.rows());
/** We do not allocate memory for stored blocks inside HashJoin, only for hash table.
* In case when we have all the blocks allocated before the first `addBlockToJoin` call, will already be quite high.
@ -458,7 +516,6 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
if (!memory_usage_before_adding_blocks)
memory_usage_before_adding_blocks = getCurrentQueryMemoryUsage();
Block source_block = source_block_;
if (strictness == JoinStrictness::Asof)
{
chassert(kind == JoinKind::Left || kind == JoinKind::Inner);
@ -467,7 +524,7 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
/// We support only INNER/LEFT ASOF join, so rows with NULLs never return from the right joined table.
/// So filter them out here not to handle in implementation.
const auto & asof_key_name = table_join->getOnlyClause().key_names_right.back();
auto & asof_column = source_block.getByName(asof_key_name);
const auto & asof_column = source_block.getByName(asof_key_name);
if (asof_column.type->isNullable())
{
@ -485,13 +542,12 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
for (size_t i = 0; i < asof_column_nullable.size(); ++i)
negative_null_map[i] = !asof_column_nullable[i];
for (auto & column : source_block)
column.column = column.column->filter(negative_null_map, -1);
source_block.filter(negative_null_map);
}
}
}
size_t rows = source_block.rows();
const size_t rows = source_block.rows();
data->rows_to_join += rows;
const auto & right_key_names = table_join->getAllNames(JoinTableSide::Right);
ColumnPtrMap all_key_columns(right_key_names.size());
@ -501,7 +557,7 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
all_key_columns[column_name] = recursiveRemoveSparse(column->convertToFullColumnIfConst())->convertToFullColumnIfLowCardinality();
}
Block block_to_save = prepareRightBlock(source_block);
ScatteredBlock block_to_save = filterColumnsPresentInSampleBlock(source_block, savedBlockSample());
if (shrink_blocks)
block_to_save = block_to_save.shrinkToFit();
@ -515,7 +571,8 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
if (!tmp_stream)
tmp_stream.emplace(right_sample_block, tmp_data.get());
tmp_stream.value()->write(block_to_save);
chassert(!source_block.wasScattered()); /// We don't run parallel_hash for cross join
tmp_stream.value()->write(block_to_save.getSourceBlock());
return true;
}
@ -527,7 +584,7 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
if (storage_join_lock)
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "addBlockToJoin called when HashJoin locked to prevent updates");
assertBlocksHaveEqualStructure(data->sample_block, block_to_save, "joined block");
assertBlocksHaveEqualStructure(data->sample_block, block_to_save.getSourceBlock(), "joined block");
size_t min_bytes_to_compress = table_join->crossJoinMinBytesToCompress();
size_t min_rows_to_compress = table_join->crossJoinMinRowsToCompress();
@ -536,6 +593,7 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
&& ((min_bytes_to_compress && getTotalByteCount() >= min_bytes_to_compress)
|| (min_rows_to_compress && getTotalRowCount() >= min_rows_to_compress)))
{
chassert(!source_block.wasScattered()); /// We don't run parallel_hash for cross join
block_to_save = block_to_save.compress();
have_compressed = true;
}
@ -543,7 +601,7 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
doDebugAsserts();
data->blocks_allocated_size += block_to_save.allocatedBytes();
data->blocks.emplace_back(std::move(block_to_save));
Block * stored_block = &data->blocks.back();
const auto * stored_block = &data->blocks.back();
doDebugAsserts();
if (rows)
@ -570,7 +628,7 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
save_nullmap |= (*null_map)[i];
}
auto join_mask_col = JoinCommon::getColumnAsMask(source_block, onexprs[onexpr_idx].condColumnNames().second);
auto join_mask_col = JoinCommon::getColumnAsMask(source_block.getSourceBlock(), onexprs[onexpr_idx].condColumnNames().second);
/// Save blocks that do not hold conditions in ON section
ColumnUInt8::MutablePtr not_joined_map = nullptr;
if (!flag_per_row && isRightOrFull(kind) && join_mask_col.hasData())
@ -595,39 +653,45 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
bool is_inserted = false;
if (kind != JoinKind::Cross)
{
joinDispatch(kind, strictness, data->maps[onexpr_idx], prefer_use_maps_all, [&](auto kind_, auto strictness_, auto & map)
{
size_t size = HashJoinMethods<kind_, strictness_, std::decay_t<decltype(map)>>::insertFromBlockImpl(
joinDispatch(
kind,
strictness,
data->maps[onexpr_idx],
prefer_use_maps_all,
[&](auto kind_, auto strictness_, auto & map)
{
size_t size = HashJoinMethods<kind_, strictness_, std::decay_t<decltype(map)>>::insertFromBlockImpl(
*this,
data->type,
map,
rows,
key_columns,
key_sizes[onexpr_idx],
stored_block,
&stored_block->getSourceBlock(),
source_block.getSelector(),
null_map,
join_mask_col.getData(),
data->pool,
is_inserted);
if (flag_per_row)
used_flags->reinit<kind_, strictness_, std::is_same_v<std::decay_t<decltype(map)>, MapsAll>>(stored_block);
else if (is_inserted)
/// Number of buckets + 1 value from zero storage
used_flags->reinit<kind_, strictness_, std::is_same_v<std::decay_t<decltype(map)>, MapsAll>>(size + 1);
});
if (flag_per_row)
used_flags->reinit<kind_, strictness_, std::is_same_v<std::decay_t<decltype(map)>, MapsAll>>(
&stored_block->getSourceBlock());
else if (is_inserted)
/// Number of buckets + 1 value from zero storage
used_flags->reinit<kind_, strictness_, std::is_same_v<std::decay_t<decltype(map)>, MapsAll>>(size + 1);
});
}
if (!flag_per_row && save_nullmap && is_inserted)
{
data->blocks_nullmaps_allocated_size += null_map_holder->allocatedBytes();
data->blocks_nullmaps.emplace_back(stored_block, null_map_holder);
data->blocks_nullmaps_allocated_size += data->blocks_nullmaps.back().allocatedBytes();
}
if (!flag_per_row && not_joined_map && is_inserted)
{
data->blocks_nullmaps_allocated_size += not_joined_map->allocatedBytes();
data->blocks_nullmaps.emplace_back(stored_block, std::move(not_joined_map));
data->blocks_nullmaps_allocated_size += data->blocks_nullmaps.back().allocatedBytes();
}
if (!flag_per_row && !is_inserted)
@ -654,7 +718,6 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_optimize)
{
Int64 current_memory_usage = getCurrentQueryMemoryUsage();
Int64 query_memory_usage_delta = current_memory_usage - memory_usage_before_adding_blocks;
Int64 max_total_bytes_for_query = memory_usage_before_adding_blocks ? table_join->getMaxMemoryUsage() : 0;
@ -671,15 +734,19 @@ void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_
* is bigger than half of all memory available for query,
* then shrink stored blocks to fit.
*/
shrink_blocks = (max_total_bytes_in_join && total_bytes_in_join > max_total_bytes_in_join / 2) ||
(max_total_bytes_for_query && query_memory_usage_delta > max_total_bytes_for_query / 2);
shrink_blocks = (max_total_bytes_in_join && total_bytes_in_join > max_total_bytes_in_join / 2)
|| (max_total_bytes_for_query && query_memory_usage_delta > max_total_bytes_for_query / 2);
if (!shrink_blocks)
return;
}
LOG_DEBUG(log, "Shrinking stored blocks, memory consumption is {} {} calculated by join, {} {} by memory tracker",
ReadableSize(total_bytes_in_join), max_total_bytes_in_join ? fmt::format("/ {}", ReadableSize(max_total_bytes_in_join)) : "",
ReadableSize(query_memory_usage_delta), max_total_bytes_for_query ? fmt::format("/ {}", ReadableSize(max_total_bytes_for_query)) : "");
LOG_DEBUG(
log,
"Shrinking stored blocks, memory consumption is {} {} calculated by join, {} {} by memory tracker",
ReadableSize(total_bytes_in_join),
max_total_bytes_in_join ? fmt::format("/ {}", ReadableSize(max_total_bytes_in_join)) : "",
ReadableSize(query_memory_usage_delta),
max_total_bytes_for_query ? fmt::format("/ {}", ReadableSize(max_total_bytes_for_query)) : "");
for (auto & stored_block : data->blocks)
{
@ -692,10 +759,13 @@ void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_
if (old_size >= new_size)
{
if (data->blocks_allocated_size < old_size - new_size)
throw Exception(ErrorCodes::LOGICAL_ERROR,
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Blocks allocated size value is broken: "
"blocks_allocated_size = {}, old_size = {}, new_size = {}",
data->blocks_allocated_size, old_size, new_size);
data->blocks_allocated_size,
old_size,
new_size);
data->blocks_allocated_size -= old_size - new_size;
}
@ -710,9 +780,13 @@ void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_
Int64 new_current_memory_usage = getCurrentQueryMemoryUsage();
LOG_DEBUG(log, "Shrunk stored blocks {} freed ({} by memory tracker), new memory consumption is {} ({} by memory tracker)",
ReadableSize(total_bytes_in_join - new_total_bytes_in_join), ReadableSize(current_memory_usage - new_current_memory_usage),
ReadableSize(new_total_bytes_in_join), ReadableSize(new_current_memory_usage));
LOG_DEBUG(
log,
"Shrunk stored blocks {} freed ({} by memory tracker), new memory consumption is {} ({} by memory tracker)",
ReadableSize(total_bytes_in_join - new_total_bytes_in_join),
ReadableSize(current_memory_usage - new_current_memory_usage),
ReadableSize(new_total_bytes_in_join),
ReadableSize(new_current_memory_usage));
total_bytes_in_join = new_total_bytes_in_join;
}
@ -776,7 +850,7 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
}
};
for (const Block & block_right : data->blocks)
for (const auto & block_right : data->blocks)
{
++block_number;
if (block_number < start_right_block)
@ -784,9 +858,12 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
/// The following statement cannot be substituted with `process_right_block(!have_compressed ? block_right : block_right.decompress())`
/// because it will lead to copying of `block_right` even if its branch is taken (because common type of `block_right` and `block_right.decompress()` is `Block`).
if (!have_compressed)
process_right_block(block_right);
process_right_block(block_right.getSourceBlock());
else
process_right_block(block_right.decompress());
{
chassert(!block_right.wasScattered()); /// Compression only happens for cross join and scattering only for concurrent hash
process_right_block(block_right.getSourceBlock().decompress());
}
if (rows_added > max_joined_block_rows)
{
@ -837,9 +914,11 @@ DataTypePtr HashJoin::joinGetCheckAndGetReturnType(const DataTypes & data_types,
{
size_t num_keys = data_types.size();
if (right_table_keys.columns() != num_keys)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Number of arguments for function joinGet{} doesn't match: passed, should be equal to {}",
toString(or_null ? "OrNull" : ""), toString(num_keys));
toString(or_null ? "OrNull" : ""),
toString(num_keys));
for (size_t i = 0; i < num_keys; ++i)
{
@ -848,8 +927,13 @@ DataTypePtr HashJoin::joinGetCheckAndGetReturnType(const DataTypes & data_types,
auto left_type = removeNullable(recursiveRemoveLowCardinality(left_type_origin));
auto right_type = removeNullable(recursiveRemoveLowCardinality(right_type_origin));
if (!left_type->equals(*right_type))
throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch in joinGet key {}: "
"found type {}, while the needed type is {}", i, left_type->getName(), right_type->getName());
throw Exception(
ErrorCodes::TYPE_MISMATCH,
"Type mismatch in joinGet key {}: "
"found type {}, while the needed type is {}",
i,
left_type->getName(),
right_type->getName());
}
if (!sample_block_with_columns_to_add.has(column_name))
@ -865,8 +949,7 @@ DataTypePtr HashJoin::joinGetCheckAndGetReturnType(const DataTypes & data_types,
/// TODO: return array of values when strictness == JoinStrictness::All
ColumnWithTypeAndName HashJoin::joinGet(const Block & block, const Block & block_with_columns_to_add) const
{
bool is_valid = (strictness == JoinStrictness::Any || strictness == JoinStrictness::RightAny)
&& kind == JoinKind::Left;
bool is_valid = (strictness == JoinStrictness::Any || strictness == JoinStrictness::RightAny) && kind == JoinKind::Left;
if (!is_valid)
throw Exception(ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN, "joinGet only supports StorageJoin of type Left Any");
const auto & key_names_right = table_join->getOnlyClause().key_names_right;
@ -880,12 +963,14 @@ ColumnWithTypeAndName HashJoin::joinGet(const Block & block, const Block & block
keys.insert(std::move(key));
}
static_assert(!MapGetter<JoinKind::Left, JoinStrictness::Any, false>::flagged,
"joinGet are not protected from hash table changes between block processing");
static_assert(
!MapGetter<JoinKind::Left, JoinStrictness::Any, false>::flagged,
"joinGet are not protected from hash table changes between block processing");
std::vector<const MapsOne *> maps_vector;
maps_vector.push_back(&std::get<MapsOne>(data->maps[0]));
HashJoinMethods<JoinKind::Left, JoinStrictness::Any, MapsOne>::joinBlockImpl(*this, keys, block_with_columns_to_add, maps_vector, /* is_join_get = */ true);
HashJoinMethods<JoinKind::Left, JoinStrictness::Any, MapsOne>::joinBlockImpl(
*this, keys, block_with_columns_to_add, maps_vector, /* is_join_get = */ true);
return keys.getByPosition(keys.columns() - 1);
}
@ -906,8 +991,7 @@ void HashJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
{
auto cond_column_name = onexpr.condColumnNames();
JoinCommon::checkTypesOfKeys(
block, onexpr.key_names_left, cond_column_name.first,
right_sample_block, onexpr.key_names_right, cond_column_name.second);
block, onexpr.key_names_left, cond_column_name.first, right_sample_block, onexpr.key_names_right, cond_column_name.second);
}
if (kind == JoinKind::Cross)
@ -916,20 +1000,85 @@ void HashJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
return;
}
if (kind == JoinKind::Right || kind == JoinKind::Full)
{
materializeBlockInplace(block);
}
materializeColumnsFromLeftBlock(block);
bool prefer_use_maps_all = table_join->getMixedJoinExpression() != nullptr;
{
std::vector<const std::decay_t<decltype(data->maps[0])> * > maps_vector;
std::vector<const std::decay_t<decltype(data->maps[0])> *> maps_vector;
for (size_t i = 0; i < table_join->getClauses().size(); ++i)
maps_vector.push_back(&data->maps[i]);
if (joinDispatch(kind, strictness, maps_vector, prefer_use_maps_all, [&](auto kind_, auto strictness_, auto & maps_vector_)
if (joinDispatch(
kind,
strictness,
maps_vector,
prefer_use_maps_all,
[&](auto kind_, auto strictness_, auto & maps_vector_)
{
Block remaining_block;
if constexpr (std::is_same_v<std::decay_t<decltype(maps_vector_)>, std::vector<const MapsAll *>>)
{
remaining_block = HashJoinMethods<kind_, strictness_, MapsAll>::joinBlockImpl(
*this, block, sample_block_with_columns_to_add, maps_vector_);
}
else if constexpr (std::is_same_v<std::decay_t<decltype(maps_vector_)>, std::vector<const MapsOne *>>)
{
remaining_block = HashJoinMethods<kind_, strictness_, MapsOne>::joinBlockImpl(
*this, block, sample_block_with_columns_to_add, maps_vector_);
}
else if constexpr (std::is_same_v<std::decay_t<decltype(maps_vector_)>, std::vector<const MapsAsof *>>)
{
remaining_block = HashJoinMethods<kind_, strictness_, MapsAsof>::joinBlockImpl(
*this, block, sample_block_with_columns_to_add, maps_vector_);
}
else
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown maps type");
}
if (remaining_block.rows())
not_processed = std::make_shared<ExtraBlock>(ExtraBlock{std::move(remaining_block)});
else
not_processed.reset();
}))
{
/// Joined
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong JOIN combination: {} {}", strictness, kind);
}
}
void HashJoin::joinBlock(ScatteredBlock & block, ScatteredBlock & remaining_block)
{
if (!data)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot join after data has been released");
chassert(kind == JoinKind::Left || kind == JoinKind::Inner);
for (const auto & onexpr : table_join->getClauses())
{
auto cond_column_name = onexpr.condColumnNames();
JoinCommon::checkTypesOfKeys(
block.getSourceBlock(),
onexpr.key_names_left,
cond_column_name.first,
right_sample_block,
onexpr.key_names_right,
cond_column_name.second);
}
std::vector<const std::decay_t<decltype(data->maps[0])> *> maps_vector;
for (size_t i = 0; i < table_join->getClauses().size(); ++i)
maps_vector.push_back(&data->maps[i]);
bool prefer_use_maps_all = table_join->getMixedJoinExpression() != nullptr;
const bool joined = joinDispatch(
kind,
strictness,
maps_vector,
prefer_use_maps_all,
[&](auto kind_, auto strictness_, auto & maps_vector_)
{
Block remaining_block;
if constexpr (std::is_same_v<std::decay_t<decltype(maps_vector_)>, std::vector<const MapsAll *>>)
{
remaining_block = HashJoinMethods<kind_, strictness_, MapsAll>::joinBlockImpl(
@ -949,17 +1098,9 @@ void HashJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown maps type");
}
if (remaining_block.rows())
not_processed = std::make_shared<ExtraBlock>(ExtraBlock{std::move(remaining_block)});
else
not_processed.reset();
}))
{
/// Joined
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong JOIN combination: {} {}", strictness, kind);
}
});
chassert(joined);
}
HashJoin::~HashJoin()
@ -1023,10 +1164,7 @@ class NotJoinedHash final : public NotJoinedBlocks::RightColumnsFiller
{
public:
NotJoinedHash(const HashJoin & parent_, UInt64 max_block_size_, bool flag_per_row_)
: parent(parent_)
, max_block_size(max_block_size_)
, flag_per_row(flag_per_row_)
, current_block_start(0)
: parent(parent_), max_block_size(max_block_size_), flag_per_row(flag_per_row_), current_block_start(0)
{
if (parent.data == nullptr)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot join after data has been released");
@ -1043,14 +1181,12 @@ public:
}
else
{
auto fill_callback = [&](auto, auto, auto & map)
{
rows_added = fillColumnsFromMap(map, columns_right);
};
auto fill_callback = [&](auto, auto, auto & map) { rows_added = fillColumnsFromMap(map, columns_right); };
bool prefer_use_maps_all = parent.table_join->getMixedJoinExpression() != nullptr;
if (!joinDispatch(parent.kind, parent.strictness, parent.data->maps.front(), prefer_use_maps_all, fill_callback))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown JOIN strictness '{}' (must be on of: ANY, ALL, ASOF)", parent.strictness);
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Unknown JOIN strictness '{}' (must be on of: ANY, ALL, ASOF)", parent.strictness);
}
if (!flag_per_row)
@ -1070,14 +1206,14 @@ private:
std::any position;
std::optional<HashJoin::BlockNullmapList::const_iterator> nulls_position;
std::optional<BlocksList::const_iterator> used_position;
std::optional<HashJoin::ScatteredBlocksList::const_iterator> used_position;
size_t fillColumnsFromData(const BlocksList & blocks, MutableColumns & columns_right)
size_t fillColumnsFromData(const HashJoin::ScatteredBlocksList & blocks, MutableColumns & columns_right)
{
if (!position.has_value())
position = std::make_any<BlocksList::const_iterator>(blocks.begin());
position = std::make_any<HashJoin::ScatteredBlocksList::const_iterator>(blocks.begin());
auto & block_it = std::any_cast<BlocksList::const_iterator &>(position);
auto & block_it = std::any_cast<HashJoin::ScatteredBlocksList::const_iterator &>(position);
auto end = blocks.end();
size_t rows_added = 0;
@ -1113,11 +1249,11 @@ private:
{
switch (parent.data->type)
{
#define M(TYPE) \
case HashJoin::Type::TYPE: \
return fillColumns(*maps.TYPE, columns_keys_and_right);
#define M(TYPE) \
case HashJoin::Type::TYPE: \
return fillColumns(*maps.TYPE, columns_keys_and_right);
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
#undef M
default:
throw Exception(ErrorCodes::UNSUPPORTED_JOIN_KEYS, "Unsupported JOIN keys (type: {})", parent.data->type);
}
@ -1137,11 +1273,11 @@ private:
for (auto & it = *used_position; it != end && rows_added < max_block_size; ++it)
{
const Block & mapped_block = *it;
const auto & mapped_block = *it;
for (size_t row = 0; row < mapped_block.rows(); ++row)
{
if (!parent.isUsed(&mapped_block, row))
if (!parent.isUsed(&mapped_block.getSourceBlock(), row))
{
for (size_t colnum = 0; colnum < columns_keys_and_right.size(); ++colnum)
{
@ -1194,10 +1330,10 @@ private:
for (auto & it = *nulls_position; it != end && rows_added < max_block_size; ++it)
{
const auto * block = it->first;
const auto * block = it->block;
ConstNullMapPtr nullmap = nullptr;
if (it->second)
nullmap = &assert_cast<const ColumnUInt8 &>(*it->second).getData();
if (it->column)
nullmap = &assert_cast<const ColumnUInt8 &>(*it->column).getData();
for (size_t row = 0; row < block->rows(); ++row)
{
@ -1212,9 +1348,8 @@ private:
}
};
IBlocksStreamPtr HashJoin::getNonJoinedBlocks(const Block & left_sample_block,
const Block & result_sample_block,
UInt64 max_block_size) const
IBlocksStreamPtr
HashJoin::getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const
{
if (!JoinCommon::hasNonJoinedBlocks(*table_join))
return {};
@ -1227,9 +1362,14 @@ IBlocksStreamPtr HashJoin::getNonJoinedBlocks(const Block & left_sample_block,
size_t expected_columns_count = left_columns_count + required_right_keys.columns() + sample_block_with_columns_to_add.columns();
if (expected_columns_count != result_sample_block.columns())
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected number of columns in result sample block: {} instead of {} ({} + {} + {})",
result_sample_block.columns(), expected_columns_count,
left_columns_count, required_right_keys.columns(), sample_block_with_columns_to_add.columns());
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Unexpected number of columns in result sample block: {} instead of {} ({} + {} + {})",
result_sample_block.columns(),
expected_columns_count,
left_columns_count,
required_right_keys.columns(),
sample_block_with_columns_to_add.columns());
}
}
@ -1250,22 +1390,37 @@ void HashJoin::reuseJoinedData(const HashJoin & join)
bool prefer_use_maps_all = join.table_join->getMixedJoinExpression() != nullptr;
for (auto & map : data->maps)
{
joinDispatch(kind, strictness, map, prefer_use_maps_all, [this](auto kind_, auto strictness_, auto & map_)
{
used_flags->reinit<kind_, strictness_, std::is_same_v<std::decay_t<decltype(map_)>, MapsAll>>(map_.getBufferSizeInCells(data->type) + 1);
});
joinDispatch(
kind,
strictness,
map,
prefer_use_maps_all,
[this](auto kind_, auto strictness_, auto & map_)
{
used_flags->reinit<kind_, strictness_, std::is_same_v<std::decay_t<decltype(map_)>, MapsAll>>(
map_.getBufferSizeInCells(data->type) + 1);
});
}
}
BlocksList HashJoin::releaseJoinedBlocks(bool restructure)
BlocksList HashJoin::releaseJoinedBlocks(bool restructure [[maybe_unused]])
{
LOG_TRACE(log, "{}Join data is being released, {} bytes and {} rows in hash table", instance_log_id, getTotalByteCount(), getTotalRowCount());
LOG_TRACE(
log, "{}Join data is being released, {} bytes and {} rows in hash table", instance_log_id, getTotalByteCount(), getTotalRowCount());
BlocksList right_blocks = std::move(data->blocks);
auto extract_source_blocks = [](ScatteredBlocksList && blocks)
{
BlocksList result;
for (auto & block : blocks)
result.emplace_back(std::move(block).getSourceBlock());
return result;
};
ScatteredBlocksList right_blocks = std::move(data->blocks);
if (!restructure)
{
data.reset();
return right_blocks;
return extract_source_blocks(std::move(right_blocks));
}
data->maps.clear();
@ -1279,7 +1434,7 @@ BlocksList HashJoin::releaseJoinedBlocks(bool restructure)
if (!right_blocks.empty())
{
positions.reserve(right_sample_block.columns());
const Block & tmp_block = *right_blocks.begin();
const Block & tmp_block = right_blocks.begin()->getSourceBlock();
for (const auto & sample_column : right_sample_block)
{
positions.emplace_back(tmp_block.getPositionByName(sample_column.name));
@ -1287,12 +1442,12 @@ BlocksList HashJoin::releaseJoinedBlocks(bool restructure)
}
}
for (Block & saved_block : right_blocks)
for (ScatteredBlock & saved_block : right_blocks)
{
Block restored_block;
for (size_t i = 0; i < positions.size(); ++i)
{
auto & column = saved_block.getByPosition(positions[i]);
auto & column = saved_block.getSourceBlock().getByPosition(positions[i]);
correctNullabilityInplace(column, is_nullable[i]);
restored_block.insert(column);
}
@ -1318,7 +1473,8 @@ void HashJoin::validateAdditionalFilterExpression(ExpressionActionsPtr additiona
if (expression_sample_block.columns() != 1)
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Unexpected expression in JOIN ON section. Expected single column, got '{}'",
expression_sample_block.dumpStructure());
}
@ -1326,7 +1482,8 @@ void HashJoin::validateAdditionalFilterExpression(ExpressionActionsPtr additiona
auto type = removeNullable(expression_sample_block.getByPosition(0).type);
if (!type->equals(*std::make_shared<DataTypeUInt8>()))
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Unexpected expression in JOIN ON section. Expected boolean (UInt8), got '{}'. expression:\n{}",
expression_sample_block.getByPosition(0).type->getName(),
additional_filter_expression->dumpActions());
@ -1334,10 +1491,12 @@ void HashJoin::validateAdditionalFilterExpression(ExpressionActionsPtr additiona
bool is_supported = ((strictness == JoinStrictness::All) && (isInnerOrLeft(kind) || isRightOrFull(kind)))
|| ((strictness == JoinStrictness::Semi || strictness == JoinStrictness::Any || strictness == JoinStrictness::Anti)
&& (isLeft(kind) || isRight(kind))) || (strictness == JoinStrictness::Any && (isInner(kind)));
&& (isLeft(kind) || isRight(kind)))
|| (strictness == JoinStrictness::Any && (isInner(kind)));
if (!is_supported)
{
throw Exception(ErrorCodes::INVALID_JOIN_ON_EXPRESSION,
throw Exception(
ErrorCodes::INVALID_JOIN_ON_EXPRESSION,
"Non equi condition '{}' from JOIN ON section is supported only for ALL INNER/LEFT/FULL/RIGHT JOINs",
expression_sample_block.getByPosition(0).name);
}
@ -1353,7 +1512,6 @@ bool HashJoin::isUsed(const Block * block_ptr, size_t row_idx) const
return used_flags->getUsedSafe(block_ptr, row_idx);
}
bool HashJoin::needUsedFlagsForPerRightTableRow(std::shared_ptr<TableJoin> table_join_) const
{
if (!table_join_->oneDisjunct())
@ -1372,7 +1530,7 @@ void HashJoin::tryRerangeRightTableDataImpl(Map & map [[maybe_unused]])
throw Exception(ErrorCodes::LOGICAL_ERROR, "Only left or inner join table can be reranged.");
else
{
auto merge_rows_into_one_block = [&](BlocksList & blocks, RowRefList & rows_ref)
auto merge_rows_into_one_block = [&](ScatteredBlocksList & blocks, RowRefList & rows_ref)
{
auto it = rows_ref.begin();
if (it.ok())
@ -1384,7 +1542,7 @@ void HashJoin::tryRerangeRightTableDataImpl(Map & map [[maybe_unused]])
{
return;
}
auto & block = blocks.back();
auto & block = blocks.back().getSourceBlock();
size_t start_row = block.rows();
for (; it.ok(); ++it)
{
@ -1401,23 +1559,22 @@ void HashJoin::tryRerangeRightTableDataImpl(Map & map [[maybe_unused]])
}
};
auto visit_rows_map = [&](BlocksList & blocks, MapsAll & rows_map)
auto visit_rows_map = [&](ScatteredBlocksList & blocks, MapsAll & rows_map)
{
switch (data->type)
{
#define M(TYPE) \
case Type::TYPE: \
{\
rows_map.TYPE->forEachMapped([&](RowRefList & rows_ref) { merge_rows_into_one_block(blocks, rows_ref); }); \
break; \
}
#define M(TYPE) \
case Type::TYPE: { \
rows_map.TYPE->forEachMapped([&](RowRefList & rows_ref) { merge_rows_into_one_block(blocks, rows_ref); }); \
break; \
}
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
#undef M
default:
break;
}
};
BlocksList sorted_blocks;
ScatteredBlocksList sorted_blocks;
visit_rows_map(sorted_blocks, map);
doDebugAsserts();
data->blocks.swap(sorted_blocks);

View File

@ -1,9 +1,11 @@
#pragma once
#include <memory>
#include <variant>
#include <optional>
#include <algorithm>
#include <deque>
#include <memory>
#include <optional>
#include <ranges>
#include <variant>
#include <vector>
#include <Parsers/ASTTablesInSelectQuery.h>
@ -12,22 +14,19 @@
#include <Interpreters/AggregationCommon.h>
#include <Interpreters/RowRefs.h>
#include <Common/Arena.h>
#include <Common/ColumnsHashing.h>
#include <Common/HashTable/HashMap.h>
#include <Common/HashTable/FixedHashMap.h>
#include <Storages/TableLockHolder.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
#include <QueryPipeline/SizeLimits.h>
#include <Columns/ColumnString.h>
#include <Core/Block.h>
#include <Storages/IStorage_fwd.h>
#include <Interpreters/HashJoin/ScatteredBlock.h>
#include <Interpreters/IKeyValueEntity.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <QueryPipeline/SizeLimits.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/TableLockHolder.h>
#include <Common/Arena.h>
#include <Common/ColumnsHashing.h>
#include <Common/HashTable/FixedHashMap.h>
#include <Common/HashTable/HashMap.h>
namespace DB
{
@ -142,13 +141,21 @@ public:
*/
bool addBlockToJoin(const Block & source_block_, bool check_limits) override;
/// Called directly from ConcurrentJoin::addBlockToJoin
bool addBlockToJoin(ScatteredBlock & source_block_, bool check_limits);
void checkTypesOfKeys(const Block & block) const override;
using IJoin::joinBlock;
/** Join data from the map (that was previously built by calls to addBlockToJoin) to the block with data from "left" table.
* Could be called from different threads in parallel.
*/
void joinBlock(Block & block, ExtraBlockPtr & not_processed) override;
/// Called directly from ConcurrentJoin::joinBlock
void joinBlock(ScatteredBlock & block, ScatteredBlock & remaining_block);
/// Check joinGet arguments and infer the return type.
DataTypePtr joinGetCheckAndGetReturnType(const DataTypes & data_types, const String & column_name, bool or_null) const;
@ -327,8 +334,17 @@ public:
using MapsVariant = std::variant<MapsOne, MapsAll, MapsAsof>;
using RawBlockPtr = const Block *;
using BlockNullmapList = std::deque<std::pair<RawBlockPtr, ColumnPtr>>;
using RawBlockPtr = const ScatteredBlock *;
struct NullMapHolder
{
size_t allocatedBytes() const { return !column->empty() ? column->allocatedBytes() * block->rows() / column->size() : 0; }
RawBlockPtr block;
ColumnPtr column;
};
using BlockNullmapList = std::deque<NullMapHolder>;
using ScatteredBlocksList = std::list<ScatteredBlock>;
struct RightTableData
{
@ -337,7 +353,7 @@ public:
std::vector<MapsVariant> maps;
Block sample_block; /// Block as it would appear in the BlockList
BlocksList blocks; /// Blocks of "right" table.
ScatteredBlocksList blocks; /// Blocks of "right" table.
BlockNullmapList blocks_nullmaps; /// Nullmaps for blocks of "right" table (if needed)
/// Additional data - strings for string keys and continuation elements of single-linked lists of references to rows.
@ -389,6 +405,9 @@ public:
void setMaxJoinedBlockRows(size_t value) { max_joined_block_rows = value; }
void materializeColumnsFromLeftBlock(Block & block) const;
Block materializeColumnsFromRightBlock(Block block) const;
private:
friend class NotJoinedHash;
@ -473,5 +492,4 @@ private:
void tryRerangeRightTableDataImpl(Map & map);
void doDebugAsserts() const;
};
}

View File

@ -19,7 +19,7 @@ template <typename HashMap, typename KeyGetter>
struct Inserter
{
static ALWAYS_INLINE bool
insertOne(const HashJoin & join, HashMap & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
insertOne(const HashJoin & join, HashMap & map, KeyGetter & key_getter, const Block * stored_block, size_t i, Arena & pool)
{
auto emplace_result = key_getter.emplaceKey(map, i, pool);
@ -31,7 +31,8 @@ struct Inserter
return false;
}
static ALWAYS_INLINE void insertAll(const HashJoin &, HashMap & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
static ALWAYS_INLINE void
insertAll(const HashJoin &, HashMap & map, KeyGetter & key_getter, const Block * stored_block, size_t i, Arena & pool)
{
auto emplace_result = key_getter.emplaceKey(map, i, pool);
@ -45,7 +46,13 @@ struct Inserter
}
static ALWAYS_INLINE void insertAsof(
HashJoin & join, HashMap & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool, const IColumn & asof_column)
HashJoin & join,
HashMap & map,
KeyGetter & key_getter,
const Block * stored_block,
size_t i,
Arena & pool,
const IColumn & asof_column)
{
auto emplace_result = key_getter.emplaceKey(map, i, pool);
typename HashMap::mapped_type * time_series_map = &emplace_result.getMapped();
@ -66,10 +73,10 @@ public:
HashJoin & join,
HashJoin::Type type,
MapsTemplate & maps,
size_t rows,
const ColumnRawPtrs & key_columns,
const Sizes & key_sizes,
Block * stored_block,
const Block * stored_block,
const ScatteredBlock::Selector & selector,
ConstNullMapPtr null_map,
UInt8ColumnDataPtr join_mask,
Arena & pool,
@ -83,14 +90,30 @@ public:
const Block & block_with_columns_to_add,
const MapsTemplateVector & maps_,
bool is_join_get = false);
static ScatteredBlock joinBlockImpl(
const HashJoin & join,
ScatteredBlock & block,
const Block & block_with_columns_to_add,
const MapsTemplateVector & maps_,
bool is_join_get = false);
private:
template <typename KeyGetter, bool is_asof_join>
static KeyGetter createKeyGetter(const ColumnRawPtrs & key_columns, const Sizes & key_sizes);
template <typename KeyGetter, typename HashMap>
template <typename KeyGetter, typename HashMap, typename Selector>
static size_t insertFromBlockImplTypeCase(
HashJoin & join, HashMap & map, size_t rows, const ColumnRawPtrs & key_columns,
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, UInt8ColumnDataPtr join_mask, Arena & pool, bool & is_inserted);
HashJoin & join,
HashMap & map,
const ColumnRawPtrs & key_columns,
const Sizes & key_sizes,
const Block * stored_block,
const Selector & selector,
ConstNullMapPtr null_map,
UInt8ColumnDataPtr join_mask,
Arena & pool,
bool & is_inserted);
template <typename AddedColumns>
static size_t switchJoinRightColumns(
@ -115,12 +138,13 @@ private:
/// Joins right table columns which indexes are present in right_indexes using specified map.
/// Makes filter (1 if row presented in right table) and returns offsets to replicate (for ALL JOINS).
template <typename KeyGetter, typename Map, bool need_filter, bool flag_per_row, typename AddedColumns>
template <typename KeyGetter, typename Map, bool need_filter, bool flag_per_row, typename AddedColumns, typename Selector>
static size_t joinRightColumns(
std::vector<KeyGetter> && key_getter_vector,
const std::vector<const Map *> & mapv,
AddedColumns & added_columns,
JoinStuff::JoinUsedFlags & used_flags);
JoinStuff::JoinUsedFlags & used_flags,
const Selector & selector);
template <bool need_filter>
static void setUsed(IColumn::Filter & filter [[maybe_unused]], size_t pos [[maybe_unused]]);

View File

@ -1,5 +1,8 @@
#pragma once
#include <type_traits>
#include <Interpreters/HashJoin/HashJoinMethods.h>
#include "Columns/IColumn.h"
#include "Interpreters/HashJoin/ScatteredBlock.h"
namespace DB
{
@ -13,10 +16,10 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::insertFromBlockImpl(
HashJoin & join,
HashJoin::Type type,
MapsTemplate & maps,
size_t rows,
const ColumnRawPtrs & key_columns,
const Sizes & key_sizes,
Block * stored_block,
const Block * stored_block,
const ScatteredBlock::Selector & selector,
ConstNullMapPtr null_map,
UInt8ColumnDataPtr join_mask,
Arena & pool,
@ -33,9 +36,14 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::insertFromBlockImpl(
#define M(TYPE) \
case HashJoin::Type::TYPE: \
return insertFromBlockImplTypeCase< \
typename KeyGetterForType<HashJoin::Type::TYPE, std::remove_reference_t<decltype(*maps.TYPE)>>::Type>( \
join, *maps.TYPE, rows, key_columns, key_sizes, stored_block, null_map, join_mask, pool, is_inserted); \
if (selector.isContinuousRange()) \
return insertFromBlockImplTypeCase< \
typename KeyGetterForType<HashJoin::Type::TYPE, std::remove_reference_t<decltype(*maps.TYPE)>>::Type>( \
join, *maps.TYPE, key_columns, key_sizes, stored_block, selector.getRange(), null_map, join_mask, pool, is_inserted); \
else \
return insertFromBlockImplTypeCase< \
typename KeyGetterForType<HashJoin::Type::TYPE, std::remove_reference_t<decltype(*maps.TYPE)>>::Type>( \
join, *maps.TYPE, key_columns, key_sizes, stored_block, selector.getIndexes(), null_map, join_mask, pool, is_inserted); \
break;
APPLY_FOR_JOIN_VARIANTS(M)
@ -46,6 +54,22 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::insertFromBlockImpl(
template <JoinKind KIND, JoinStrictness STRICTNESS, typename MapsTemplate>
Block HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinBlockImpl(
const HashJoin & join, Block & block, const Block & block_with_columns_to_add, const MapsTemplateVector & maps_, bool is_join_get)
{
ScatteredBlock scattered_block{block};
auto ret = joinBlockImpl(join, scattered_block, block_with_columns_to_add, maps_, is_join_get);
ret.filterBySelector();
scattered_block.filterBySelector();
block = std::move(scattered_block.getSourceBlock());
return ret.getSourceBlock();
}
template <JoinKind KIND, JoinStrictness STRICTNESS, typename MapsTemplate>
ScatteredBlock HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinBlockImpl(
const HashJoin & join,
ScatteredBlock & block,
const Block & block_with_columns_to_add,
const MapsTemplateVector & maps_,
bool is_join_get)
{
constexpr JoinFeatures<KIND, STRICTNESS, MapsTemplate> join_features;
@ -56,16 +80,8 @@ Block HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinBlockImpl(
const auto & key_names = !is_join_get ? onexprs[i].key_names_left : onexprs[i].key_names_right;
join_on_keys.emplace_back(block, key_names, onexprs[i].condColumnNames().first, join.key_sizes[i]);
}
size_t existing_columns = block.columns();
/** If you use FULL or RIGHT JOIN, then the columns from the "left" table must be materialized.
* Because if they are constants, then in the "not joined" rows, they may have different values
* - default values, which can differ from the values of these constants.
*/
if constexpr (join_features.right || join_features.full)
{
materializeBlockInplace(block);
}
auto & source_block = block.getSourceBlock();
size_t existing_columns = source_block.columns();
/** For LEFT/INNER JOIN, the saved blocks do not contain keys.
* For FULL/RIGHT JOIN, the saved blocks contain keys;
@ -90,26 +106,28 @@ Block HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinBlockImpl(
else
added_columns.reserve(join_features.need_replication);
size_t num_joined = switchJoinRightColumns(maps_, added_columns, join.data->type, *join.used_flags);
const size_t num_joined = switchJoinRightColumns(maps_, added_columns, join.data->type, *join.used_flags);
/// Do not hold memory for join_on_keys anymore
added_columns.join_on_keys.clear();
Block remaining_block = sliceBlock(block, num_joined);
auto remaining_block = block.cut(num_joined);
if (is_join_get)
added_columns.buildJoinGetOutput();
else
added_columns.buildOutput();
if constexpr (join_features.need_filter)
block.filter(added_columns.filter);
block.filterBySelector();
for (size_t i = 0; i < added_columns.size(); ++i)
block.insert(added_columns.moveColumn(i));
source_block.insert(added_columns.moveColumn(i));
std::vector<size_t> right_keys_to_replicate [[maybe_unused]];
if constexpr (join_features.need_filter)
{
/// If ANY INNER | RIGHT JOIN - filter all the columns except the new ones.
for (size_t i = 0; i < existing_columns; ++i)
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(added_columns.filter, -1);
/// Add join key columns from right block if needed using value from left table because of equality
for (size_t i = 0; i < join.required_right_keys.columns(); ++i)
{
@ -121,7 +139,7 @@ Block HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinBlockImpl(
const auto & left_column = block.getByName(join.required_right_keys_sources[i]);
const auto & right_col_name = join.getTableJoin().renamedRightColumnName(right_key.name);
auto right_col = copyLeftKeyColumnToRight(right_key.type, right_col_name, left_column);
block.insert(std::move(right_col));
source_block.insert(std::move(right_col));
}
}
else if (has_required_right_keys)
@ -137,28 +155,28 @@ Block HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinBlockImpl(
const auto & left_column = block.getByName(join.required_right_keys_sources[i]);
auto right_col = copyLeftKeyColumnToRight(right_key.type, right_col_name, left_column, &added_columns.filter);
block.insert(std::move(right_col));
source_block.insert(std::move(right_col));
if constexpr (join_features.need_replication)
right_keys_to_replicate.push_back(block.getPositionByName(right_col_name));
right_keys_to_replicate.push_back(source_block.getPositionByName(right_col_name));
}
}
if constexpr (join_features.need_replication)
{
std::unique_ptr<IColumn::Offsets> & offsets_to_replicate = added_columns.offsets_to_replicate;
IColumn::Offsets & offsets = *added_columns.offsets_to_replicate;
/// If ALL ... JOIN - we replicate all the columns except the new ones.
chassert(block);
chassert(offsets.size() == block.rows());
auto && columns = block.getSourceBlock().getColumns();
for (size_t i = 0; i < existing_columns; ++i)
{
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->replicate(*offsets_to_replicate);
}
/// Replicate additional right keys
columns[i] = columns[i]->replicate(offsets);
for (size_t pos : right_keys_to_replicate)
{
block.safeGetByPosition(pos).column = block.safeGetByPosition(pos).column->replicate(*offsets_to_replicate);
}
columns[pos] = columns[pos]->replicate(offsets);
block.getSourceBlock().setColumns(columns);
block = ScatteredBlock(std::move(block).getSourceBlock());
}
return remaining_block;
}
@ -180,14 +198,14 @@ KeyGetter HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::createKeyGetter(const
}
template <JoinKind KIND, JoinStrictness STRICTNESS, typename MapsTemplate>
template <typename KeyGetter, typename HashMap>
template <typename KeyGetter, typename HashMap, typename Selector>
size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::insertFromBlockImplTypeCase(
HashJoin & join,
HashMap & map,
size_t rows,
const ColumnRawPtrs & key_columns,
const Sizes & key_sizes,
Block * stored_block,
const Block * stored_block,
const Selector & selector,
ConstNullMapPtr null_map,
UInt8ColumnDataPtr join_mask,
Arena & pool,
@ -205,9 +223,22 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::insertFromBlockImplTypeC
/// For ALL and ASOF join always insert values
is_inserted = !mapped_one || is_asof_join;
size_t rows = 0;
if constexpr (std::is_same_v<std::decay_t<Selector>, ScatteredBlock::Indexes>)
rows = selector.getData().size();
else
rows = selector.second - selector.first;
for (size_t i = 0; i < rows; ++i)
{
if (null_map && (*null_map)[i])
size_t ind = 0;
if constexpr (std::is_same_v<std::decay_t<Selector>, ScatteredBlock::Indexes>)
ind = selector.getData()[i];
else
ind = selector.first + i;
chassert(!null_map || ind < null_map->size());
if (null_map && (*null_map)[ind])
{
/// nulls are not inserted into hash table,
/// keep them for RIGHT and FULL joins
@ -216,15 +247,16 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::insertFromBlockImplTypeC
}
/// Check condition for right table from ON section
if (join_mask && !(*join_mask)[i])
chassert(!join_mask || ind < join_mask->size());
if (join_mask && !(*join_mask)[ind])
continue;
if constexpr (is_asof_join)
Inserter<HashMap, KeyGetter>::insertAsof(join, map, key_getter, stored_block, i, pool, *asof_column);
Inserter<HashMap, KeyGetter>::insertAsof(join, map, key_getter, stored_block, ind, pool, *asof_column);
else if constexpr (mapped_one)
is_inserted |= Inserter<HashMap, KeyGetter>::insertOne(join, map, key_getter, stored_block, i, pool);
is_inserted |= Inserter<HashMap, KeyGetter>::insertOne(join, map, key_getter, stored_block, ind, pool);
else
Inserter<HashMap, KeyGetter>::insertAll(join, map, key_getter, stored_block, i, pool);
Inserter<HashMap, KeyGetter>::insertAll(join, map, key_getter, stored_block, ind, pool);
}
return map.getBufferSizeInCells();
}
@ -318,26 +350,43 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinRightColumnsSwitchMu
if (added_columns.additional_filter_expression)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Additional filter expression is not supported for this JOIN");
return mapv.size() > 1 ? joinRightColumns<KeyGetter, Map, need_filter, true>(
std::forward<std::vector<KeyGetter>>(key_getter_vector), mapv, added_columns, used_flags)
: joinRightColumns<KeyGetter, Map, need_filter, false>(
std::forward<std::vector<KeyGetter>>(key_getter_vector), mapv, added_columns, used_flags);
auto & block = added_columns.src_block;
if (block.getSelector().isContinuousRange())
{
if (mapv.size() > 1)
return joinRightColumns<KeyGetter, Map, need_filter, true>(
std::move(key_getter_vector), mapv, added_columns, used_flags, block.getSelector().getRange());
else
return joinRightColumns<KeyGetter, Map, need_filter, false>(
std::move(key_getter_vector), mapv, added_columns, used_flags, block.getSelector().getRange());
}
else
{
if (mapv.size() > 1)
return joinRightColumns<KeyGetter, Map, need_filter, true>(
std::move(key_getter_vector), mapv, added_columns, used_flags, block.getSelector().getIndexes());
else
return joinRightColumns<KeyGetter, Map, need_filter, false>(
std::move(key_getter_vector), mapv, added_columns, used_flags, block.getSelector().getIndexes());
}
}
/// Joins right table columns which indexes are present in right_indexes using specified map.
/// Makes filter (1 if row presented in right table) and returns offsets to replicate (for ALL JOINS).
template <JoinKind KIND, JoinStrictness STRICTNESS, typename MapsTemplate>
template <typename KeyGetter, typename Map, bool need_filter, bool flag_per_row, typename AddedColumns>
template <typename KeyGetter, typename Map, bool need_filter, bool flag_per_row, typename AddedColumns, typename Selector>
size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinRightColumns(
std::vector<KeyGetter> && key_getter_vector,
const std::vector<const Map *> & mapv,
AddedColumns & added_columns,
JoinStuff::JoinUsedFlags & used_flags)
JoinStuff::JoinUsedFlags & used_flags,
const Selector & selector)
{
constexpr JoinFeatures<KIND, STRICTNESS, MapsTemplate> join_features;
size_t rows = added_columns.rows_to_add;
auto & block = added_columns.src_block;
size_t rows = block.rows();
if constexpr (need_filter)
added_columns.filter = IColumn::Filter(rows, 0);
if constexpr (!flag_per_row && (STRICTNESS == JoinStrictness::All || (STRICTNESS == JoinStrictness::Semi && KIND == JoinKind::Right)))
@ -353,6 +402,12 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinRightColumns(
size_t i = 0;
for (; i < rows; ++i)
{
size_t ind = 0;
if constexpr (std::is_same_v<std::decay_t<Selector>, ScatteredBlock::Indexes>)
ind = selector.getData()[i];
else
ind = selector.first + i;
if constexpr (join_features.need_replication)
{
if (unlikely(current_offset >= max_joined_block_rows))
@ -368,12 +423,12 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinRightColumns(
for (size_t onexpr_idx = 0; onexpr_idx < added_columns.join_on_keys.size(); ++onexpr_idx)
{
const auto & join_keys = added_columns.join_on_keys[onexpr_idx];
if (join_keys.null_map && (*join_keys.null_map)[i])
if (join_keys.null_map && (*join_keys.null_map)[ind])
continue;
bool row_acceptable = !join_keys.isRowFiltered(i);
bool row_acceptable = !join_keys.isRowFiltered(ind);
using FindResult = typename KeyGetter::FindResult;
auto find_result = row_acceptable ? key_getter_vector[onexpr_idx].findKey(*(mapv[onexpr_idx]), i, pool) : FindResult();
auto find_result = row_acceptable ? key_getter_vector[onexpr_idx].findKey(*(mapv[onexpr_idx]), ind, pool) : FindResult();
if (find_result.isFound())
{
@ -383,7 +438,7 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinRightColumns(
{
const IColumn & left_asof_key = added_columns.leftAsofKey();
auto row_ref = mapped->findAsof(left_asof_key, i);
auto row_ref = mapped->findAsof(left_asof_key, ind);
if (row_ref && row_ref->block)
{
setUsed<need_filter>(added_columns.filter, i);
@ -848,23 +903,6 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinRightColumnsWithAddt
return left_row_iter;
}
template <JoinKind KIND, JoinStrictness STRICTNESS, typename MapsTemplate>
Block HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::sliceBlock(Block & block, size_t num_rows)
{
size_t total_rows = block.rows();
if (num_rows >= total_rows)
return {};
size_t remaining_rows = total_rows - num_rows;
Block remaining_block = block.cloneEmpty();
for (size_t i = 0; i < block.columns(); ++i)
{
auto & col = block.getByPosition(i);
remaining_block.getByPosition(i).column = col.column->cut(num_rows, remaining_rows);
col.column = col.column->cut(0, num_rows);
}
return remaining_block;
}
template <JoinKind KIND, JoinStrictness STRICTNESS, typename MapsTemplate>
ColumnWithTypeAndName HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::copyLeftKeyColumnToRight(
const DataTypePtr & right_key_type,

View File

@ -0,0 +1,337 @@
#pragma once
#include <Columns/ColumnsNumber.h>
#include <Columns/IColumn.h>
#include <Core/Block.h>
#include <base/defines.h>
#include <Common/PODArray.h>
#include <Poco/Logger.h>
#include <Common/logger_useful.h>
#include <boost/noncopyable.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace detail
{
/// Previously ConcurrentHashJoin used IColumn::scatter method to split input blocks to sub-blocks by hash.
/// To avoid copying of columns, we introduce a new class ScatteredBlock that holds a block and a selector.
/// So now each threads get a copy of the source input block and a selector that tells which rows are meant for the given thread.
/// Selector can be seen as just a list of indexes or rows that belong to the given thread.
/// One optimization is to use a continuous range instead of explicit list of indexes when selector contains all indexes from [L, R).
class Selector
{
public:
using Range = std::pair<size_t, size_t>;
using Indexes = ColumnUInt64;
using IndexesPtr = ColumnUInt64::MutablePtr;
/// [begin, end)
Selector(size_t begin, size_t end) : data(Range{begin, end}) { }
Selector() : Selector(0, 0) { }
explicit Selector(size_t size) : Selector(0, size) { }
explicit Selector(IndexesPtr && selector_) : data(initializeFromSelector(std::move(selector_))) { }
class Iterator
{
public:
using iterator_category = std::forward_iterator_tag;
using value_type = size_t;
using difference_type = std::ptrdiff_t;
using pointer = size_t *;
using reference = size_t &;
Iterator(const Selector & selector_, size_t idx_) : selector(selector_), idx(idx_) { }
size_t ALWAYS_INLINE operator*() const
{
chassert(idx < selector.size());
if (idx >= selector.size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Index {} out of range size {}", idx, selector.size());
return selector[idx];
}
Iterator & ALWAYS_INLINE operator++()
{
if (idx >= selector.size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Index {} out of range size {}", idx, selector.size());
++idx;
return *this;
}
bool ALWAYS_INLINE operator!=(const Iterator & other) const { return idx != other.idx; }
private:
const Selector & selector;
size_t idx;
};
Iterator begin() const { return Iterator(*this, 0); }
Iterator end() const { return Iterator(*this, size()); }
size_t ALWAYS_INLINE operator[](size_t idx) const
{
chassert(idx < size());
if (std::holds_alternative<Range>(data))
{
const auto range = std::get<Range>(data);
return range.first + idx;
}
else
{
return std::get<IndexesPtr>(data)->getData()[idx];
}
}
size_t size() const
{
if (std::holds_alternative<Range>(data))
{
const auto range = std::get<Range>(data);
return range.second - range.first;
}
else
{
return std::get<IndexesPtr>(data)->size();
}
}
/// First selector contains first `num_rows` rows, second selector contains the rest
std::pair<Selector, Selector> split(size_t num_rows)
{
chassert(num_rows <= size());
if (std::holds_alternative<Range>(data))
{
const auto range = std::get<Range>(data);
if (num_rows == 0)
return {Selector(), Selector{range.first, range.second}};
if (num_rows == size())
return {Selector{range.first, range.second}, Selector()};
return {Selector(range.first, range.first + num_rows), Selector(range.first + num_rows, range.second)};
}
else
{
const auto & selector = std::get<IndexesPtr>(data)->getData();
auto && left = Selector(Indexes::create(selector.begin(), selector.begin() + num_rows));
auto && right = Selector(Indexes::create(selector.begin() + num_rows, selector.end()));
return {std::move(left), std::move(right)};
}
}
bool isContinuousRange() const { return std::holds_alternative<Range>(data); }
Range getRange() const
{
chassert(isContinuousRange());
return std::get<Range>(data);
}
const Indexes & getIndexes() const
{
chassert(!isContinuousRange());
return *std::get<IndexesPtr>(data);
}
std::string toString() const
{
if (std::holds_alternative<Range>(data))
{
const auto range = std::get<Range>(data);
return fmt::format("[{}, {})", range.first, range.second);
}
else
{
const auto & selector = std::get<IndexesPtr>(data)->getData();
return fmt::format("({})", fmt::join(selector, ","));
}
}
private:
using Data = std::variant<Range, IndexesPtr>;
Data initializeFromSelector(IndexesPtr && selector_)
{
const auto & selector = selector_->getData();
if (selector.empty())
return Range{0, 0};
/// selector represents continuous range
if (selector.back() == selector.front() + selector.size() - 1)
return Range{selector.front(), selector.front() + selector.size()};
return std::move(selector_);
}
Data data;
};
}
/// Source block + list of selected rows. See detail::Selector for more details.
struct ScatteredBlock : private boost::noncopyable
{
using Selector = detail::Selector;
using Indexes = Selector::Indexes;
using IndexesPtr = Selector::IndexesPtr;
ScatteredBlock() = default;
explicit ScatteredBlock(Block block_) : block(std::move(block_)), selector(block.rows()) { }
ScatteredBlock(Block block_, IndexesPtr && selector_) : block(std::move(block_)), selector(std::move(selector_)) { }
ScatteredBlock(Block block_, Selector selector_) : block(std::move(block_)), selector(std::move(selector_)) { }
ScatteredBlock(ScatteredBlock && other) noexcept : block(std::move(other.block)), selector(std::move(other.selector))
{
other.block.clear();
other.selector = {};
}
ScatteredBlock & operator=(ScatteredBlock && other) noexcept
{
if (this != &other)
{
block = std::move(other.block);
selector = std::move(other.selector);
other.block.clear();
other.selector = {};
}
return *this;
}
Block & getSourceBlock() & { return block; }
const Block & getSourceBlock() const & { return block; }
Block && getSourceBlock() && { return std::move(block); }
const auto & getSelector() const { return selector; }
explicit operator bool() const { return !!block; }
/// Accounts only selected rows
size_t rows() const { return selector.size(); }
/// In case of scattered block we account proportional share of the source block bytes.
/// For not scattered columns it will be trivial (bytes * N / N) calculation.
size_t allocatedBytes() const { return block.rows() ? block.allocatedBytes() * rows() / block.rows() : 0; }
ScatteredBlock shrinkToFit() const
{
if (wasScattered())
{
LOG_TEST(getLogger("HashJoin"), "shrinkToFit() is not supported for ScatteredBlock because blocks are shared");
return ScatteredBlock{block};
}
return ScatteredBlock{block.shrinkToFit()};
}
ScatteredBlock compress() const
{
if (wasScattered())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot compress ScatteredBlock");
return ScatteredBlock{block.compress()};
}
const auto & getByPosition(size_t i) const { return block.getByPosition(i); }
/// Whether `block` was scattered, i.e. `selector` != [0, block.rows())
bool wasScattered() const
{
return selector.size() != block.rows();
}
const ColumnWithTypeAndName & getByName(const std::string & name) const
{
return block.getByName(name);
}
/// Filters selector by mask discarding rows for which filter is false
void filter(const IColumn::Filter & filter)
{
chassert(block && block.rows() == filter.size());
IndexesPtr new_selector = Indexes::create();
new_selector->reserve(selector.size());
std::copy_if(
selector.begin(), selector.end(), std::back_inserter(new_selector->getData()), [&](size_t idx) { return filter[idx]; });
selector = Selector(std::move(new_selector));
}
/// Applies `selector` to the `block` in-place
void filterBySelector()
{
if (!block || !wasScattered())
return;
if (selector.isContinuousRange())
{
const auto range = selector.getRange();
for (size_t i = 0; i < block.columns(); ++i)
{
auto & col = block.getByPosition(i);
col.column = col.column->cut(range.first, range.second - range.first);
}
selector = Selector(block.rows());
return;
}
/// The general case when `selector` is non-trivial (likely the result of applying a filter)
auto columns = block.getColumns();
for (auto & col : columns)
col = col->index(selector.getIndexes(), /*limit*/ 0);
block.setColumns(columns);
selector = Selector(block.rows());
}
/// Cut first `num_rows` rows from `block` in place and returns block with remaining rows
ScatteredBlock cut(size_t num_rows)
{
SCOPE_EXIT(filterBySelector());
if (num_rows >= rows())
return ScatteredBlock{Block{}};
chassert(block);
auto && [first_num_rows, remaining_selector] = selector.split(num_rows);
auto remaining = ScatteredBlock{block, std::move(remaining_selector)};
selector = std::move(first_num_rows);
return remaining;
}
private:
Block block;
Selector selector;
};
using ScatteredBlocks = std::vector<ScatteredBlock>;
struct ExtraScatteredBlocks
{
ScatteredBlocks remaining_blocks;
bool rows() const
{
return std::ranges::any_of(remaining_blocks, [](const auto & bl) { return bl.rows(); });
}
};
}

View File

@ -2,9 +2,10 @@
#include <memory>
#include <Core/Names.h>
#include <Core/Block.h>
#include <Columns/IColumn.h>
#include <Core/Block.h>
#include <Core/Names.h>
#include <Interpreters/HashJoin/ScatteredBlock.h>
#include <Common/Exception.h>
namespace DB
@ -89,6 +90,13 @@ public:
/// Could be called from different threads in parallel.
virtual void joinBlock(Block & block, std::shared_ptr<ExtraBlock> & not_processed) = 0;
virtual bool isScatteredJoin() const { return false; }
virtual void joinBlock(
[[maybe_unused]] Block & block, [[maybe_unused]] ExtraScatteredBlocks & extra_blocks, [[maybe_unused]] std::vector<Block> & res)
{
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "joinBlock is not supported for {}", getName());
}
/** Set/Get totals for right table
* Keep "totals" (separate part of dataset, see WITH TOTALS) to use later.
*/

View File

@ -1887,6 +1887,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
joined_plan->getCurrentHeader(),
expressions.join,
settings[Setting::max_block_size],
0,
max_streams,
analysis_result.optimize_read_in_order);

View File

@ -104,6 +104,7 @@ namespace Setting
extern const SettingsBool optimize_move_to_prewhere;
extern const SettingsBool optimize_move_to_prewhere_if_final;
extern const SettingsBool use_concurrency_control;
extern const SettingsUInt64 min_joined_block_size_bytes;
}
namespace ErrorCodes
@ -1649,6 +1650,7 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(
right_plan.getCurrentHeader(),
std::move(join_algorithm),
settings[Setting::max_block_size],
settings[Setting::min_joined_block_size_bytes],
settings[Setting::max_threads],
false /*optimize_read_in_order*/);

View File

@ -1,9 +1,10 @@
#include <Processors/QueryPlan/JoinStep.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Transforms/JoiningTransform.h>
#include <IO/Operators.h>
#include <Interpreters/IJoin.h>
#include <Interpreters/TableJoin.h>
#include <IO/Operators.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/Transforms/JoiningTransform.h>
#include <Processors/Transforms/SquashingTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Common/JSONBuilder.h>
#include <Common/typeid_cast.h>
@ -43,9 +44,14 @@ JoinStep::JoinStep(
const Header & right_header_,
JoinPtr join_,
size_t max_block_size_,
size_t min_block_size_bytes_,
size_t max_streams_,
bool keep_left_read_in_order_)
: join(std::move(join_)), max_block_size(max_block_size_), max_streams(max_streams_), keep_left_read_in_order(keep_left_read_in_order_)
: join(std::move(join_))
, max_block_size(max_block_size_)
, min_block_size_bytes(min_block_size_bytes_)
, max_streams(max_streams_)
, keep_left_read_in_order(keep_left_read_in_order_)
{
updateInputHeaders({left_header_, right_header_});
}
@ -63,15 +69,24 @@ QueryPipelineBuilderPtr JoinStep::updatePipeline(QueryPipelineBuilders pipelines
return joined_pipeline;
}
return QueryPipelineBuilder::joinPipelinesRightLeft(
auto pipeline = QueryPipelineBuilder::joinPipelinesRightLeft(
std::move(pipelines[0]),
std::move(pipelines[1]),
join,
*output_header,
max_block_size,
min_block_size_bytes,
max_streams,
keep_left_read_in_order,
&processors);
if (join->supportParallelJoin())
{
pipeline->addSimpleTransform([&](const Block & header)
{ return std::make_shared<SimpleSquashingChunksTransform>(header, 0, min_block_size_bytes); });
}
return pipeline;
}
bool JoinStep::allowPushDownToRight() const

View File

@ -18,6 +18,7 @@ public:
const Header & right_header_,
JoinPtr join_,
size_t max_block_size_,
size_t min_block_size_bytes_,
size_t max_streams_,
bool keep_left_read_in_order_);
@ -39,6 +40,7 @@ private:
JoinPtr join;
size_t max_block_size;
size_t min_block_size_bytes;
size_t max_streams;
bool keep_left_read_in_order;
};

View File

@ -75,8 +75,9 @@ IProcessor::Status JoiningTransform::prepare()
/// Output if has data.
if (has_output)
{
output.push(std::move(output_chunk));
has_output = false;
output.push(std::move(output_chunks.front()));
output_chunks.pop_front();
has_output = !output_chunks.empty();
return Status::PortFull;
}
@ -122,10 +123,10 @@ void JoiningTransform::work()
{
if (has_input)
{
chassert(output_chunks.empty());
transform(input_chunk);
output_chunk.swap(input_chunk);
has_input = not_processed != nullptr;
has_output = !output_chunk.empty();
has_output = !output_chunks.empty();
}
else
{
@ -153,8 +154,7 @@ void JoiningTransform::work()
return;
}
auto rows = block.rows();
output_chunk.setColumns(block.getColumns(), rows);
output_chunks.emplace_back(block.getColumns(), block.rows());
has_output = true;
}
}
@ -173,7 +173,7 @@ void JoiningTransform::transform(Chunk & chunk)
}
}
Block block;
Blocks res;
if (on_totals)
{
const auto & left_totals = inputs.front().getHeader().cloneWithColumns(chunk.detachColumns());
@ -184,39 +184,58 @@ void JoiningTransform::transform(Chunk & chunk)
if (default_totals && !right_totals)
return;
block = outputs.front().getHeader().cloneEmpty();
JoinCommon::joinTotals(left_totals, right_totals, join->getTableJoin(), block);
res.emplace_back();
res.back() = outputs.front().getHeader().cloneEmpty();
JoinCommon::joinTotals(left_totals, right_totals, join->getTableJoin(), res.back());
}
else
block = readExecute(chunk);
auto num_rows = block.rows();
chunk.setColumns(block.getColumns(), num_rows);
res = readExecute(chunk);
std::ranges::for_each(res, [this](Block & block) { output_chunks.emplace_back(block.getColumns(), block.rows()); });
}
Block JoiningTransform::readExecute(Chunk & chunk)
Blocks JoiningTransform::readExecute(Chunk & chunk)
{
Block res;
Blocks res;
Block block;
auto join_block = [&]()
{
if (join->isScatteredJoin())
{
join->joinBlock(block, remaining_blocks, res);
if (remaining_blocks.rows())
not_processed = std::make_shared<ExtraBlock>();
else
not_processed.reset();
}
else
{
join->joinBlock(block, not_processed);
res.push_back(std::move(block));
}
};
if (!not_processed)
{
if (chunk.hasColumns())
res = inputs.front().getHeader().cloneWithColumns(chunk.detachColumns());
block = inputs.front().getHeader().cloneWithColumns(chunk.detachColumns());
if (res)
join->joinBlock(res, not_processed);
if (block)
join_block();
}
else if (not_processed->empty()) /// There's not processed data inside expression.
{
if (chunk.hasColumns())
res = inputs.front().getHeader().cloneWithColumns(chunk.detachColumns());
block = inputs.front().getHeader().cloneWithColumns(chunk.detachColumns());
not_processed.reset();
join->joinBlock(res, not_processed);
join_block();
}
else
{
res = std::move(not_processed->block);
join->joinBlock(res, not_processed);
block = std::move(not_processed->block);
join_block();
}
return res;

View File

@ -1,6 +1,10 @@
#pragma once
#include <Processors/IProcessor.h>
#include <Interpreters/HashJoin/ScatteredBlock.h>
#include <Processors/Chunk.h>
#include <Processors/IProcessor.h>
#include <deque>
#include <memory>
namespace DB
@ -66,7 +70,7 @@ protected:
private:
Chunk input_chunk;
Chunk output_chunk;
std::deque<Chunk> output_chunks;
bool has_input = false;
bool has_output = false;
bool stop_reading = false;
@ -80,13 +84,16 @@ private:
bool default_totals;
bool initialized = false;
/// Only used with ConcurrentHashJoin
ExtraScatteredBlocks remaining_blocks;
ExtraBlockPtr not_processed;
FinishCounterPtr finish_counter;
IBlocksStreamPtr non_joined_blocks;
size_t max_block_size;
Block readExecute(Chunk & chunk);
Blocks readExecute(Chunk & chunk);
};
/// Fills Join with block from right table.

View File

@ -78,7 +78,7 @@ Chunk SimpleSquashingChunksTransform::generate()
bool SimpleSquashingChunksTransform::canGenerate()
{
return !squashed_chunk.empty();
return squashed_chunk.hasRows();
}
Chunk SimpleSquashingChunksTransform::getRemaining()

View File

@ -26,6 +26,7 @@
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Transforms/PasteJoinTransform.h>
#include <Processors/Transforms/SquashingTransform.h>
#include <Processors/Transforms/TotalsHavingTransform.h>
#include <QueryPipeline/narrowPipe.h>
#include <Common/CurrentThread.h>
@ -385,6 +386,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLe
JoinPtr join,
const Block & output_header,
size_t max_block_size,
size_t min_block_size_bytes,
size_t max_streams,
bool keep_left_read_in_order,
Processors * collected_processors)
@ -441,9 +443,12 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLe
Processors processors;
for (auto & outport : outports)
{
auto squashing = std::make_shared<SimpleSquashingChunksTransform>(right->getHeader(), 0, min_block_size_bytes);
connect(*outport, squashing->getInputs().front());
processors.emplace_back(squashing);
auto adding_joined = std::make_shared<FillingRightJoinSideTransform>(right->getHeader(), join);
connect(*outport, adding_joined->getInputs().front());
processors.emplace_back(adding_joined);
connect(squashing->getOutputPort(), adding_joined->getInputs().front());
processors.emplace_back(std::move(adding_joined));
}
return processors;
};
@ -497,10 +502,13 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLe
Block left_header = left->getHeader();
for (size_t i = 0; i < num_streams; ++i)
{
auto squashing = std::make_shared<SimpleSquashingChunksTransform>(left->getHeader(), 0, min_block_size_bytes);
connect(**lit, squashing->getInputs().front());
auto joining = std::make_shared<JoiningTransform>(
left_header, output_header, join, max_block_size, false, default_totals, finish_counter);
connect(**lit, joining->getInputs().front());
connect(squashing->getOutputPort(), joining->getInputs().front());
connect(**rit, joining->getInputs().back());
if (delayed_root)
{
@ -532,6 +540,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLe
if (collected_processors)
collected_processors->emplace_back(joining);
left->pipe.processors->emplace_back(std::move(squashing));
left->pipe.processors->emplace_back(std::move(joining));
}

View File

@ -126,6 +126,7 @@ public:
JoinPtr join,
const Block & output_header,
size_t max_block_size,
size_t min_block_size_bytes,
size_t max_streams,
bool keep_left_read_in_order,
Processors * collected_processors = nullptr);

View File

@ -5,11 +5,12 @@
<fill_query>INSERT INTO test SELECT number % 10000, number % 10000, number % 10000 FROM numbers(10000000)</fill_query>
<fill_query>INSERT INTO test1 SELECT number % 1000 , number % 1000, number % 1000 FROM numbers(100000)</fill_query>
<query tag='INNER'>SELECT MAX(test1.a) FROM test INNER JOIN test1 on test.b = test1.b</query>
<query tag='INNER'>SELECT MAX(test1.a) FROM test INNER JOIN test1 on test.b = test1.b settings join_algorithm='hash'</query>
<query tag='INNER'>SELECT MAX(test1.a) FROM test INNER JOIN test1 on test.b = test1.b settings join_algorithm='parallel_hash'</query>
<query tag='LEFT'>SELECT MAX(test1.a) FROM test LEFT JOIN test1 on test.b = test1.b</query>
<query tag='RIGHT'>SELECT MAX(test1.a) FROM test RIGHT JOIN test1 on test.b = test1.b</query>
<query tag='FULL'>SELECT MAX(test1.a) FROM test FULL JOIN test1 on test.b = test1.b</query>
<drop_query>DROP TABLE IF EXISTS test</drop_query>
<drop_query>DROP TABLE IF EXISTS test1</drop_query>
</test>
</test>

View File

@ -9,6 +9,7 @@
<name>settings</name>
<values>
<value>join_algorithm='hash'</value>
<value>join_algorithm='parallel_hash'</value>
<value>join_algorithm='grace_hash'</value>
</values>
</substitution>

View File

@ -1,3 +1,4 @@
<test>
<query tag='INNER'>SELECT count(c) FROM numbers_mt(100000000) AS a INNER JOIN (SELECT number, toString(number) AS c FROM numbers(2000000)) AS b ON (a.number % 10000000) = b.number</query>
<query tag='INNER'>SELECT count(c) FROM numbers_mt(100000000) AS a INNER JOIN (SELECT number, toString(number) AS c FROM numbers(2000000)) AS b ON (a.number % 10000000) = b.number settings join_algorithm='hash'</query>
<query tag='INNER'>SELECT count(c) FROM numbers_mt(100000000) AS a INNER JOIN (SELECT number, toString(number) AS c FROM numbers(2000000)) AS b ON (a.number % 10000000) = b.number settings join_algorithm='parallel_hash'</query>
</test>

View File

@ -13,7 +13,8 @@
<query short='1' tag='ANY LEFT IN'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042)</query>
<query tag='INNER'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 = 20042</query>
<query tag='INNER KEY'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='INNER KEY'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 settings query_plan_filter_push_down = 0 settings join_algorithm='hash'</query>
<query tag='INNER KEY'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 settings query_plan_filter_push_down = 0 settings join_algorithm='parallel_hash'</query>
<query tag='INNER ON'>SELECT COUNT() FROM ints l INNER JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042</query>
<query tag='INNER IN'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042)</query>

File diff suppressed because one or more lines are too long

View File

@ -478,6 +478,8 @@ for query_index in queries_to_run:
client_seconds = time.perf_counter() - start_seconds
print(f"client-time\t{query_index}\t{client_seconds}\t{server_seconds}")
median = [statistics.median(t) for t in all_server_times]
print(f"median\t{query_index}\t{median[0]}")
# Run additional profiling queries to collect profile data, but only if test times appeared to be different.
# We have to do it after normal runs because otherwise it will affect test statistics too much
@ -491,7 +493,6 @@ for query_index in queries_to_run:
pvalue = stats.ttest_ind(
all_server_times[0], all_server_times[1], equal_var=False
).pvalue
median = [statistics.median(t) for t in all_server_times]
# Keep this consistent with the value used in report. Should eventually move
# to (median[1] - median[0]) / min(median), which is compatible with "times"
# difference we use in report (max(median) / min(median)).

View File

@ -9,7 +9,8 @@
<max_threads>1</max_threads>
</settings>
<query>SELECT 1 FROM hits_10m_words AS l ANY LEFT JOIN hits_10m_words AS r USING (word) FORMAT Null</query>
<query>SELECT 1 FROM hits_10m_words AS l ANY LEFT JOIN hits_10m_words AS r USING (word) FORMAT Null settings join_algorithm='hash'</query>
<query>SELECT 1 FROM hits_10m_words AS l ANY LEFT JOIN hits_10m_words AS r USING (word) FORMAT Null settings join_algorithm='parallel_hash'</query>
<query>SELECT 1 FROM strings AS l ANY LEFT JOIN strings AS r USING (short) FORMAT Null</query>
<query>SELECT 1 FROM strings AS l ANY LEFT JOIN strings AS r USING (long) FORMAT Null</query>

View File

@ -1,2 +1,3 @@
SET min_joined_block_size_bytes = 0;
SET max_block_size = 6;
SELECT blockSize() bs FROM (SELECT 1 s) js1 ALL RIGHT JOIN (SELECT arrayJoin([2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3]) s) js2 USING (s) GROUP BY bs ORDER BY bs;

View File

@ -7,6 +7,7 @@ CREATE TABLE t2 (id Int) ENGINE = MergeTree ORDER BY id;
INSERT INTO t1 VALUES (1), (2);
INSERT INTO t2 SELECT number + 5 AS x FROM (SELECT * FROM system.numbers LIMIT 1111);
SET min_joined_block_size_bytes = 0;
SET max_block_size = 100;
SELECT count() == 2222 FROM t1 JOIN t2 ON 1 = 1;

View File

@ -8,6 +8,8 @@ CREATE table t2 (a UInt64) ENGINE = Memory;
INSERT INTO t2 SELECT number % 2 FROM numbers(10);
SET min_joined_block_size_bytes = 0;
-- block size is always multiple of 5 because we have 5 rows for each key in right table
-- we do not split rows corresponding to the same key
@ -27,11 +29,9 @@ SETTINGS max_joined_block_size_rows = 10;
SELECT '--';
-- parallel_hash doen't support max_joined_block_size_rows
SET join_algorithm = 'parallel_hash';
SELECT max(bs) > 10, b FROM (
SELECT max(bs) <= 10, b FROM (
SELECT blockSize() as bs, * FROM t1 JOIN t2 ON t1.a = t2.a
) GROUP BY b
ORDER BY b