Merge pull request #64818 from bigo-sg/too_large_hash_join

Refactor `HashJoin` to avoid `too large translation units`
This commit is contained in:
vdimir 2024-07-02 09:12:02 +00:00 committed by GitHub
commit 59bf6f16bd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 3120 additions and 2935 deletions

View File

@ -210,6 +210,7 @@ add_object_library(clickhouse_analyzer_passes Analyzer/Resolve)
add_object_library(clickhouse_planner Planner)
add_object_library(clickhouse_interpreters Interpreters)
add_object_library(clickhouse_interpreters_cache Interpreters/Cache)
add_object_library(clickhouse_interpreters_hash_join Interpreters/HashJoin)
add_object_library(clickhouse_interpreters_access Interpreters/Access)
add_object_library(clickhouse_interpreters_mysql Interpreters/MySQL)
add_object_library(clickhouse_interpreters_clusterproxy Interpreters/ClusterProxy)

View File

@ -5,7 +5,7 @@
#include <Functions/IFunction.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/HashJoin.h>
#include <Interpreters/HashJoin/HashJoin.h>
#include <Storages/StorageJoin.h>
#include <Storages/TableLockHolder.h>

View File

@ -5,7 +5,7 @@
#include <optional>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/HashJoin.h>
#include <Interpreters/HashJoin/HashJoin.h>
#include <Interpreters/IJoin.h>
#include <base/defines.h>
#include <base/types.h>

View File

@ -24,7 +24,7 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Interpreters/GraceHashJoin.h>
#include <Interpreters/HashJoin.h>
#include <Interpreters/HashJoin/HashJoin.h>
#include <Interpreters/JoinSwitcher.h>
#include <Interpreters/MergeJoin.h>
#include <Interpreters/DirectJoin.h>

View File

@ -3,7 +3,7 @@
#include <Formats/formatBlock.h>
#include <Interpreters/Context.h>
#include <Interpreters/GraceHashJoin.h>
#include <Interpreters/HashJoin.h>
#include <Interpreters/HashJoin/HashJoin.h>
#include <Interpreters/TableJoin.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <base/FnTraits.h>

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,138 @@
#include <Interpreters/HashJoin/AddedColumns.h>
#include <Interpreters/NullableUtils.h>
namespace DB
{
JoinOnKeyColumns::JoinOnKeyColumns(const Block & block, const Names & key_names_, const String & cond_column_name, const Sizes & key_sizes_)
: key_names(key_names_)
, materialized_keys_holder(JoinCommon::materializeColumns(
block, key_names)) /// Rare case, when keys are constant or low cardinality. To avoid code bloat, simply materialize them.
, key_columns(JoinCommon::getRawPointers(materialized_keys_holder))
, null_map(nullptr)
, null_map_holder(extractNestedColumnsAndNullMap(key_columns, null_map))
, join_mask_column(JoinCommon::getColumnAsMask(block, cond_column_name))
, key_sizes(key_sizes_)
{
}
template<> void AddedColumns<false>::buildOutput()
{
}
template<>
void AddedColumns<true>::buildOutput()
{
for (size_t i = 0; i < this->size(); ++i)
{
auto& col = columns[i];
size_t default_count = 0;
auto apply_default = [&]()
{
if (default_count > 0)
{
JoinCommon::addDefaultValues(*col, type_name[i].type, default_count);
default_count = 0;
}
};
for (size_t j = 0; j < lazy_output.blocks.size(); ++j)
{
if (!lazy_output.blocks[j])
{
default_count++;
continue;
}
apply_default();
const auto & column_from_block = reinterpret_cast<const Block *>(lazy_output.blocks[j])->getByPosition(right_indexes[i]);
/// If it's joinGetOrNull, we need to wrap not-nullable columns in StorageJoin.
if (is_join_get)
{
if (auto * nullable_col = typeid_cast<ColumnNullable *>(col.get());
nullable_col && !column_from_block.column->isNullable())
{
nullable_col->insertFromNotNullable(*column_from_block.column, lazy_output.row_nums[j]);
continue;
}
}
col->insertFrom(*column_from_block.column, lazy_output.row_nums[j]);
}
apply_default();
}
}
template<>
void AddedColumns<false>::applyLazyDefaults()
{
if (lazy_defaults_count)
{
for (size_t j = 0, size = right_indexes.size(); j < size; ++j)
JoinCommon::addDefaultValues(*columns[j], type_name[j].type, lazy_defaults_count);
lazy_defaults_count = 0;
}
}
template<>
void AddedColumns<true>::applyLazyDefaults()
{
}
template <>
void AddedColumns<false>::appendFromBlock(const Block & block, size_t row_num,const bool has_defaults)
{
if (has_defaults)
applyLazyDefaults();
#ifndef NDEBUG
checkBlock(block);
#endif
if (is_join_get)
{
size_t right_indexes_size = right_indexes.size();
for (size_t j = 0; j < right_indexes_size; ++j)
{
const auto & column_from_block = block.getByPosition(right_indexes[j]);
if (auto * nullable_col = nullable_column_ptrs[j])
nullable_col->insertFromNotNullable(*column_from_block.column, row_num);
else
columns[j]->insertFrom(*column_from_block.column, row_num);
}
}
else
{
size_t right_indexes_size = right_indexes.size();
for (size_t j = 0; j < right_indexes_size; ++j)
{
const auto & column_from_block = block.getByPosition(right_indexes[j]);
columns[j]->insertFrom(*column_from_block.column, row_num);
}
}
}
template <>
void AddedColumns<true>::appendFromBlock(const Block & block, size_t row_num, bool)
{
#ifndef NDEBUG
checkBlock(block);
#endif
if (has_columns_to_add)
{
lazy_output.blocks.emplace_back(reinterpret_cast<UInt64>(&block));
lazy_output.row_nums.emplace_back(static_cast<uint32_t>(row_num));
}
}
template<>
void AddedColumns<false>::appendDefaultRow()
{
++lazy_defaults_count;
}
template<>
void AddedColumns<true>::appendDefaultRow()
{
if (has_columns_to_add)
{
lazy_output.blocks.emplace_back(0);
lazy_output.row_nums.emplace_back(0);
}
}
}

View File

@ -0,0 +1,226 @@
#pragma once
#include <Interpreters/HashJoin/HashJoin.h>
#include <Interpreters/TableJoin.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
struct JoinOnKeyColumns
{
Names key_names;
Columns materialized_keys_holder;
ColumnRawPtrs key_columns;
ConstNullMapPtr null_map;
ColumnPtr null_map_holder;
/// Only rows where mask == true can be joined
JoinCommon::JoinMask join_mask_column;
Sizes key_sizes;
explicit JoinOnKeyColumns(const Block & block, const Names & key_names_, const String & cond_column_name, const Sizes & key_sizes_);
bool isRowFiltered(size_t i) const { return join_mask_column.isRowFiltered(i); }
};
template <bool lazy>
class AddedColumns
{
public:
struct TypeAndName
{
DataTypePtr type;
String name;
String qualified_name;
TypeAndName(DataTypePtr type_, const String & name_, const String & qualified_name_)
: type(type_), name(name_), qualified_name(qualified_name_)
{
}
};
struct LazyOutput
{
PaddedPODArray<UInt64> blocks;
PaddedPODArray<UInt32> row_nums;
};
AddedColumns(
const Block & left_block_,
const Block & block_with_columns_to_add,
const Block & saved_block_sample,
const HashJoin & join,
std::vector<JoinOnKeyColumns> && join_on_keys_,
ExpressionActionsPtr additional_filter_expression_,
bool is_asof_join,
bool is_join_get_)
: left_block(left_block_)
, join_on_keys(join_on_keys_)
, additional_filter_expression(additional_filter_expression_)
, rows_to_add(left_block.rows())
, is_join_get(is_join_get_)
{
size_t num_columns_to_add = block_with_columns_to_add.columns();
if (is_asof_join)
++num_columns_to_add;
if constexpr (lazy)
{
has_columns_to_add = num_columns_to_add > 0;
lazy_output.blocks.reserve(rows_to_add);
lazy_output.row_nums.reserve(rows_to_add);
}
columns.reserve(num_columns_to_add);
type_name.reserve(num_columns_to_add);
right_indexes.reserve(num_columns_to_add);
for (const auto & src_column : block_with_columns_to_add)
{
/// Column names `src_column.name` and `qualified_name` can differ for StorageJoin,
/// because it uses not qualified right block column names
auto qualified_name = join.getTableJoin().renamedRightColumnName(src_column.name);
/// Don't insert column if it's in left block
if (!left_block.has(qualified_name))
addColumn(src_column, qualified_name);
}
if (is_asof_join)
{
assert(join_on_keys.size() == 1);
const ColumnWithTypeAndName & right_asof_column = join.rightAsofKeyColumn();
addColumn(right_asof_column, right_asof_column.name);
left_asof_key = join_on_keys[0].key_columns.back();
}
for (auto & tn : type_name)
right_indexes.push_back(saved_block_sample.getPositionByName(tn.name));
nullable_column_ptrs.resize(right_indexes.size(), nullptr);
for (size_t j = 0; j < right_indexes.size(); ++j)
{
/** If it's joinGetOrNull, we will have nullable columns in result block
* even if right column is not nullable in storage (saved_block_sample).
*/
const auto & saved_column = saved_block_sample.getByPosition(right_indexes[j]).column;
if (columns[j]->isNullable() && !saved_column->isNullable())
nullable_column_ptrs[j] = typeid_cast<ColumnNullable *>(columns[j].get());
}
}
size_t size() const { return columns.size(); }
void buildOutput();
ColumnWithTypeAndName moveColumn(size_t i)
{
return ColumnWithTypeAndName(std::move(columns[i]), type_name[i].type, type_name[i].qualified_name);
}
void appendFromBlock(const Block & block, size_t row_num, bool has_default);
void appendDefaultRow();
void applyLazyDefaults();
const IColumn & leftAsofKey() const { return *left_asof_key; }
Block left_block;
std::vector<JoinOnKeyColumns> join_on_keys;
ExpressionActionsPtr additional_filter_expression;
size_t max_joined_block_rows = 0;
size_t rows_to_add;
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
bool need_filter = false;
IColumn::Filter filter;
void reserve(bool need_replicate)
{
if (!max_joined_block_rows)
return;
/// Do not allow big allocations when user set max_joined_block_rows to huge value
size_t reserve_size = std::min<size_t>(max_joined_block_rows, DEFAULT_BLOCK_SIZE * 2);
if (need_replicate)
/// Reserve 10% more space for columns, because some rows can be repeated
reserve_size = static_cast<size_t>(1.1 * reserve_size);
for (auto & column : columns)
column->reserve(reserve_size);
}
private:
void checkBlock(const Block & block)
{
for (size_t j = 0; j < right_indexes.size(); ++j)
{
const auto * column_from_block = block.getByPosition(right_indexes[j]).column.get();
const auto * dest_column = columns[j].get();
if (auto * nullable_col = nullable_column_ptrs[j])
{
if (!is_join_get)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Columns {} and {} can have different nullability only in joinGetOrNull",
dest_column->getName(), column_from_block->getName());
dest_column = nullable_col->getNestedColumnPtr().get();
}
/** Using dest_column->structureEquals(*column_from_block) will not work for low cardinality columns,
* because dictionaries can be different, while calling insertFrom on them is safe, for example:
* ColumnLowCardinality(size = 0, UInt8(size = 0), ColumnUnique(size = 1, String(size = 1)))
* and
* ColumnLowCardinality(size = 0, UInt16(size = 0), ColumnUnique(size = 1, String(size = 1)))
*/
if (typeid(*dest_column) != typeid(*column_from_block))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Columns {} and {} have different types {} and {}",
dest_column->getName(), column_from_block->getName(),
demangle(typeid(*dest_column).name()), demangle(typeid(*column_from_block).name()));
}
}
MutableColumns columns;
bool is_join_get;
std::vector<size_t> right_indexes;
std::vector<TypeAndName> type_name;
std::vector<ColumnNullable *> nullable_column_ptrs;
size_t lazy_defaults_count = 0;
/// for lazy
// The default row is represented by an empty RowRef, so that fixed-size blocks can be generated sequentially,
// default_count cannot represent the position of the row
LazyOutput lazy_output;
bool has_columns_to_add;
/// for ASOF
const IColumn * left_asof_key = nullptr;
void addColumn(const ColumnWithTypeAndName & src_column, const std::string & qualified_name)
{
columns.push_back(src_column.column->cloneEmpty());
columns.back()->reserve(src_column.column->size());
type_name.emplace_back(src_column.type, src_column.name, qualified_name);
}
};
/// Adapter class to pass into addFoundRowAll
/// In joinRightColumnsWithAdditionalFilter we don't want to add rows directly into AddedColumns,
/// because they need to be filtered by additional_filter_expression.
class PreSelectedRows : public std::vector<RowRef>
{
public:
void appendFromBlock(const Block & block, size_t row_num, bool /* has_default */) { this->emplace_back(&block, row_num); }
};
}

View File

@ -0,0 +1,11 @@
#include <Interpreters/HashJoin/HashJoinMethods.h>
namespace DB
{
template class HashJoinMethods<JoinKind::Full, JoinStrictness::RightAny, HashJoin::MapsOne>;
template class HashJoinMethods<JoinKind::Full, JoinStrictness::Any, HashJoin::MapsAll>;
template class HashJoinMethods<JoinKind::Full, JoinStrictness::All, HashJoin::MapsAll>;
template class HashJoinMethods<JoinKind::Full, JoinStrictness::Semi, HashJoin::MapsOne>;
template class HashJoinMethods<JoinKind::Full, JoinStrictness::Anti, HashJoin::MapsOne>;
template class HashJoinMethods<JoinKind::Full, JoinStrictness::Asof, HashJoin::MapsAsof>;
}

File diff suppressed because it is too large Load Diff

View File

@ -1,5 +1,6 @@
#pragma once
#include <memory>
#include <variant>
#include <optional>
#include <deque>
@ -36,47 +37,13 @@ class ExpressionActions;
namespace JoinStuff
{
/// Flags needed to implement RIGHT and FULL JOINs.
class JoinUsedFlags
{
using RawBlockPtr = const Block *;
using UsedFlagsForBlock = std::vector<std::atomic_bool>;
/// For multiple dijuncts each empty in hashmap stores flags for particular block
/// For single dicunct we store all flags in `nullptr` entry, index is the offset in FindResult
std::unordered_map<RawBlockPtr, UsedFlagsForBlock> flags;
bool need_flags;
public:
/// Update size for vector with flags.
/// Calling this method invalidates existing flags.
/// It can be called several times, but all of them should happen before using this structure.
template <JoinKind KIND, JoinStrictness STRICTNESS>
void reinit(size_t size_);
template <JoinKind KIND, JoinStrictness STRICTNESS>
void reinit(const Block * block_ptr);
bool getUsedSafe(size_t i) const;
bool getUsedSafe(const Block * block_ptr, size_t row_idx) const;
template <bool use_flags, bool flag_per_row, typename T>
void setUsed(const T & f);
template <bool use_flags, bool flag_per_row>
void setUsed(const Block * block, size_t row_num, size_t offset);
template <bool use_flags, bool flag_per_row, typename T>
bool getUsed(const T & f);
template <bool use_flags, bool flag_per_row, typename T>
bool setUsedOnce(const T & f);
};
class JoinUsedFlags;
}
template <JoinKind KIND, JoinStrictness STRICTNESS, typename MapsTemplate>
class HashJoinMethods;
/** Data structure for implementation of JOIN.
* It is just a hash table: keys -> rows of joined ("right") table.
* Additionally, CROSS JOIN is supported: instead of hash table, it use just set of blocks without keys.
@ -400,8 +367,8 @@ public:
const Block & savedBlockSample() const { return data->sample_block; }
bool isUsed(size_t off) const { return used_flags.getUsedSafe(off); }
bool isUsed(const Block * block_ptr, size_t row_idx) const { return used_flags.getUsedSafe(block_ptr, row_idx); }
bool isUsed(size_t off) const;
bool isUsed(const Block * block_ptr, size_t row_idx) const;
void debugKeys() const;
@ -414,6 +381,9 @@ private:
friend class JoinSource;
template <JoinKind KIND, JoinStrictness STRICTNESS, typename MapsTemplate>
friend class HashJoinMethods;
std::shared_ptr<TableJoin> table_join;
const JoinKind kind;
const JoinStrictness strictness;
@ -433,8 +403,7 @@ private:
/// Number of this flags equals to hashtable buffer size (plus one for zero value).
/// Changes in hash table broke correspondence,
/// so we must guarantee constantness of hash table during HashJoin lifetime (using method setLock)
mutable JoinStuff::JoinUsedFlags used_flags;
mutable std::unique_ptr<JoinStuff::JoinUsedFlags> used_flags;
RightTableDataPtr data;
bool have_compressed = false;
@ -476,13 +445,6 @@ private:
void initRightBlockStructure(Block & saved_block_sample);
template <JoinKind KIND, JoinStrictness STRICTNESS, typename Maps>
Block joinBlockImpl(
Block & block,
const Block & block_with_columns_to_add,
const std::vector<const Maps *> & maps_,
bool is_join_get = false) const;
void joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed) const;
static Type chooseMethod(JoinKind kind, const ColumnRawPtrs & key_columns, Sizes & key_sizes);

View File

@ -0,0 +1,954 @@
#pragma once
#include <Interpreters/HashJoin/HashJoin.h>
#include <Interpreters/HashJoin/KeyGetter.h>
#include <Interpreters/HashJoin/JoinFeatures.h>
#include <Interpreters/HashJoin/AddedColumns.h>
#include <Interpreters/HashJoin/KnowRowsHolder.h>
#include <Interpreters//HashJoin/JoinUsedFlags.h>
#include <Interpreters/JoinUtils.h>
#include <Interpreters/TableJoin.h>
#include <Interpreters/castColumn.h>
#include <Poco/Logger.h>
#include <Common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNSUPPORTED_JOIN_KEYS;
extern const int LOGICAL_ERROR;
}
/// Inserting an element into a hash table of the form `key -> reference to a string`, which will then be used by JOIN.
template <typename HashMap, typename KeyGetter>
struct Inserter
{
static ALWAYS_INLINE bool
insertOne(const HashJoin & join, HashMap & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
{
auto emplace_result = key_getter.emplaceKey(map, i, pool);
if (emplace_result.isInserted() || join.anyTakeLastRow())
{
new (&emplace_result.getMapped()) typename HashMap::mapped_type(stored_block, i);
return true;
}
return false;
}
static ALWAYS_INLINE void insertAll(const HashJoin &, HashMap & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
{
auto emplace_result = key_getter.emplaceKey(map, i, pool);
if (emplace_result.isInserted())
new (&emplace_result.getMapped()) typename HashMap::mapped_type(stored_block, i);
else
{
/// The first element of the list is stored in the value of the hash table, the rest in the pool.
emplace_result.getMapped().insert({stored_block, i}, pool);
}
}
static ALWAYS_INLINE void insertAsof(
HashJoin & join, HashMap & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool, const IColumn & asof_column)
{
auto emplace_result = key_getter.emplaceKey(map, i, pool);
typename HashMap::mapped_type * time_series_map = &emplace_result.getMapped();
TypeIndex asof_type = *join.getAsofType();
if (emplace_result.isInserted())
time_series_map = new (time_series_map) typename HashMap::mapped_type(createAsofRowRef(asof_type, join.getAsofInequality()));
(*time_series_map)->insert(asof_column, stored_block, i);
}
};
/// MapsTemplate is one of MapsOne, MapsAll and MapsAsof
template <JoinKind KIND, JoinStrictness STRICTNESS, typename MapsTemplate>
class HashJoinMethods
{
public:
static size_t insertFromBlockImpl(
HashJoin & join,
HashJoin::Type type,
MapsTemplate & maps,
size_t rows,
const ColumnRawPtrs & key_columns,
const Sizes & key_sizes,
Block * stored_block,
ConstNullMapPtr null_map,
UInt8ColumnDataPtr join_mask,
Arena & pool,
bool & is_inserted)
{
switch (type)
{
case HashJoin::Type::EMPTY:
[[fallthrough]];
case HashJoin::Type::CROSS:
/// Do nothing. We will only save block, and it is enough
is_inserted = true;
return 0;
#define M(TYPE) \
case HashJoin::Type::TYPE: \
return insertFromBlockImplTypeCase<typename KeyGetterForType<HashJoin::Type::TYPE, std::remove_reference_t<decltype(*maps.TYPE)>>::Type>(\
join, *maps.TYPE, rows, key_columns, key_sizes, stored_block, null_map, join_mask, pool, is_inserted); \
break;
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
}
}
using MapsTemplateVector = std::vector<const MapsTemplate *>;
static Block joinBlockImpl(
const HashJoin & join,
Block & block,
const Block & block_with_columns_to_add,
const MapsTemplateVector & maps_,
bool is_join_get = false)
{
constexpr JoinFeatures<KIND, STRICTNESS> join_features;
std::vector<JoinOnKeyColumns> join_on_keys;
const auto & onexprs = join.table_join->getClauses();
for (size_t i = 0; i < onexprs.size(); ++i)
{
const auto & key_names = !is_join_get ? onexprs[i].key_names_left : onexprs[i].key_names_right;
join_on_keys.emplace_back(block, key_names, onexprs[i].condColumnNames().first, join.key_sizes[i]);
}
size_t existing_columns = block.columns();
/** If you use FULL or RIGHT JOIN, then the columns from the "left" table must be materialized.
* Because if they are constants, then in the "not joined" rows, they may have different values
* - default values, which can differ from the values of these constants.
*/
if constexpr (join_features.right || join_features.full)
{
materializeBlockInplace(block);
}
/** For LEFT/INNER JOIN, the saved blocks do not contain keys.
* For FULL/RIGHT JOIN, the saved blocks contain keys;
* but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped.
* For ASOF, the last column is used as the ASOF column
*/
AddedColumns<!join_features.is_any_join> added_columns(
block,
block_with_columns_to_add,
join.savedBlockSample(),
join,
std::move(join_on_keys),
join.table_join->getMixedJoinExpression(),
join_features.is_asof_join,
is_join_get);
bool has_required_right_keys = (join.required_right_keys.columns() != 0);
added_columns.need_filter = join_features.need_filter || has_required_right_keys;
added_columns.max_joined_block_rows = join.max_joined_block_rows;
if (!added_columns.max_joined_block_rows)
added_columns.max_joined_block_rows = std::numeric_limits<size_t>::max();
else
added_columns.reserve(join_features.need_replication);
size_t num_joined = switchJoinRightColumns(maps_, added_columns, join.data->type, *join.used_flags);
/// Do not hold memory for join_on_keys anymore
added_columns.join_on_keys.clear();
Block remaining_block = sliceBlock(block, num_joined);
added_columns.buildOutput();
for (size_t i = 0; i < added_columns.size(); ++i)
block.insert(added_columns.moveColumn(i));
std::vector<size_t> right_keys_to_replicate [[maybe_unused]];
if constexpr (join_features.need_filter)
{
/// If ANY INNER | RIGHT JOIN - filter all the columns except the new ones.
for (size_t i = 0; i < existing_columns; ++i)
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(added_columns.filter, -1);
/// Add join key columns from right block if needed using value from left table because of equality
for (size_t i = 0; i < join.required_right_keys.columns(); ++i)
{
const auto & right_key = join.required_right_keys.getByPosition(i);
/// asof column is already in block.
if (join_features.is_asof_join && right_key.name == join.table_join->getOnlyClause().key_names_right.back())
continue;
const auto & left_column = block.getByName(join.required_right_keys_sources[i]);
const auto & right_col_name = join.getTableJoin().renamedRightColumnName(right_key.name);
auto right_col = copyLeftKeyColumnToRight(right_key.type, right_col_name, left_column);
block.insert(std::move(right_col));
}
}
else if (has_required_right_keys)
{
/// Add join key columns from right block if needed.
for (size_t i = 0; i < join.required_right_keys.columns(); ++i)
{
const auto & right_key = join.required_right_keys.getByPosition(i);
auto right_col_name = join.getTableJoin().renamedRightColumnName(right_key.name);
/// asof column is already in block.
if (join_features.is_asof_join && right_key.name == join.table_join->getOnlyClause().key_names_right.back())
continue;
const auto & left_column = block.getByName(join.required_right_keys_sources[i]);
auto right_col = copyLeftKeyColumnToRight(right_key.type, right_col_name, left_column, &added_columns.filter);
block.insert(std::move(right_col));
if constexpr (join_features.need_replication)
right_keys_to_replicate.push_back(block.getPositionByName(right_col_name));
}
}
if constexpr (join_features.need_replication)
{
std::unique_ptr<IColumn::Offsets> & offsets_to_replicate = added_columns.offsets_to_replicate;
/// If ALL ... JOIN - we replicate all the columns except the new ones.
for (size_t i = 0; i < existing_columns; ++i)
{
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->replicate(*offsets_to_replicate);
}
/// Replicate additional right keys
for (size_t pos : right_keys_to_replicate)
{
block.safeGetByPosition(pos).column = block.safeGetByPosition(pos).column->replicate(*offsets_to_replicate);
}
}
return remaining_block;
}
private:
template <typename KeyGetter, bool is_asof_join>
static KeyGetter createKeyGetter(const ColumnRawPtrs & key_columns, const Sizes & key_sizes)
{
if constexpr (is_asof_join)
{
auto key_column_copy = key_columns;
auto key_size_copy = key_sizes;
key_column_copy.pop_back();
key_size_copy.pop_back();
return KeyGetter(key_column_copy, key_size_copy, nullptr);
}
else
return KeyGetter(key_columns, key_sizes, nullptr);
}
template <typename KeyGetter, typename HashMap>
static size_t NO_INLINE insertFromBlockImplTypeCase(
HashJoin & join, HashMap & map, size_t rows, const ColumnRawPtrs & key_columns,
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, UInt8ColumnDataPtr join_mask, Arena & pool, bool & is_inserted)
{
[[maybe_unused]] constexpr bool mapped_one = std::is_same_v<typename HashMap::mapped_type, RowRef>;
constexpr bool is_asof_join = STRICTNESS == JoinStrictness::Asof;
const IColumn * asof_column [[maybe_unused]] = nullptr;
if constexpr (is_asof_join)
asof_column = key_columns.back();
auto key_getter = createKeyGetter<KeyGetter, is_asof_join>(key_columns, key_sizes);
/// For ALL and ASOF join always insert values
is_inserted = !mapped_one || is_asof_join;
for (size_t i = 0; i < rows; ++i)
{
if (null_map && (*null_map)[i])
{
/// nulls are not inserted into hash table,
/// keep them for RIGHT and FULL joins
is_inserted = true;
continue;
}
/// Check condition for right table from ON section
if (join_mask && !(*join_mask)[i])
continue;
if constexpr (is_asof_join)
Inserter<HashMap, KeyGetter>::insertAsof(join, map, key_getter, stored_block, i, pool, *asof_column);
else if constexpr (mapped_one)
is_inserted |= Inserter<HashMap, KeyGetter>::insertOne(join, map, key_getter, stored_block, i, pool);
else
Inserter<HashMap, KeyGetter>::insertAll(join, map, key_getter, stored_block, i, pool);
}
return map.getBufferSizeInCells();
}
template <typename AddedColumns>
static size_t switchJoinRightColumns(
const std::vector<const MapsTemplate *> & mapv,
AddedColumns & added_columns,
HashJoin::Type type,
JoinStuff::JoinUsedFlags & used_flags)
{
constexpr bool is_asof_join = STRICTNESS == JoinStrictness::Asof;
switch (type)
{
case HashJoin::Type::EMPTY: {
if constexpr (!is_asof_join)
{
using KeyGetter = KeyGetterEmpty<typename MapsTemplate::MappedType>;
std::vector<KeyGetter> key_getter_vector;
key_getter_vector.emplace_back();
using MapTypeVal = typename KeyGetter::MappedType;
std::vector<const MapTypeVal *> a_map_type_vector;
a_map_type_vector.emplace_back();
return joinRightColumnsSwitchNullability<KeyGetter>(
std::move(key_getter_vector), a_map_type_vector, added_columns, used_flags);
}
throw Exception(ErrorCodes::UNSUPPORTED_JOIN_KEYS, "Unsupported JOIN keys. Type: {}", type);
}
#define M(TYPE) \
case HashJoin::Type::TYPE: \
{ \
using MapTypeVal = const typename std::remove_reference_t<decltype(MapsTemplate::TYPE)>::element_type; \
using KeyGetter = typename KeyGetterForType<HashJoin::Type::TYPE, MapTypeVal>::Type; \
std::vector<const MapTypeVal *> a_map_type_vector(mapv.size()); \
std::vector<KeyGetter> key_getter_vector; \
for (size_t d = 0; d < added_columns.join_on_keys.size(); ++d) \
{ \
const auto & join_on_key = added_columns.join_on_keys[d]; \
a_map_type_vector[d] = mapv[d]->TYPE.get(); \
key_getter_vector.push_back(std::move(createKeyGetter<KeyGetter, is_asof_join>(join_on_key.key_columns, join_on_key.key_sizes))); \
} \
return joinRightColumnsSwitchNullability<KeyGetter>( \
std::move(key_getter_vector), a_map_type_vector, added_columns, used_flags); \
}
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
default:
throw Exception(ErrorCodes::UNSUPPORTED_JOIN_KEYS, "Unsupported JOIN keys (type: {})", type);
}
}
template <typename KeyGetter, typename Map, typename AddedColumns>
static size_t joinRightColumnsSwitchNullability(
std::vector<KeyGetter> && key_getter_vector,
const std::vector<const Map *> & mapv,
AddedColumns & added_columns,
JoinStuff::JoinUsedFlags & used_flags)
{
if (added_columns.need_filter)
{
return joinRightColumnsSwitchMultipleDisjuncts<KeyGetter, Map, true>(
std::forward<std::vector<KeyGetter>>(key_getter_vector), mapv, added_columns, used_flags);
}
else
{
return joinRightColumnsSwitchMultipleDisjuncts<KeyGetter, Map, false>(
std::forward<std::vector<KeyGetter>>(key_getter_vector), mapv, added_columns, used_flags);
}
}
template <typename KeyGetter, typename Map, bool need_filter, typename AddedColumns>
static size_t joinRightColumnsSwitchMultipleDisjuncts(
std::vector<KeyGetter> && key_getter_vector,
const std::vector<const Map *> & mapv,
AddedColumns & added_columns,
JoinStuff::JoinUsedFlags & used_flags)
{
constexpr JoinFeatures<KIND, STRICTNESS> join_features;
if constexpr (join_features.is_all_join)
{
if (added_columns.additional_filter_expression)
{
bool mark_per_row_used = join_features.right || join_features.full || mapv.size() > 1;
return joinRightColumnsWithAddtitionalFilter<KeyGetter, Map, join_features.need_replication>(
std::forward<std::vector<KeyGetter>>(key_getter_vector),
mapv,
added_columns,
used_flags,
need_filter,
join_features.need_flags,
join_features.add_missing,
mark_per_row_used);
}
}
if (added_columns.additional_filter_expression)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Additional filter expression is not supported for this JOIN");
return mapv.size() > 1 ? joinRightColumns<KeyGetter, Map, need_filter, true>(
std::forward<std::vector<KeyGetter>>(key_getter_vector), mapv, added_columns, used_flags)
: joinRightColumns<KeyGetter, Map, need_filter, false>(
std::forward<std::vector<KeyGetter>>(key_getter_vector), mapv, added_columns, used_flags);
}
/// Joins right table columns which indexes are present in right_indexes using specified map.
/// Makes filter (1 if row presented in right table) and returns offsets to replicate (for ALL JOINS).
template <typename KeyGetter, typename Map, bool need_filter, bool flag_per_row, typename AddedColumns>
static size_t joinRightColumns(
std::vector<KeyGetter> && key_getter_vector,
const std::vector<const Map *> & mapv,
AddedColumns & added_columns,
JoinStuff::JoinUsedFlags & used_flags)
{
constexpr JoinFeatures<KIND, STRICTNESS> join_features;
size_t rows = added_columns.rows_to_add;
if constexpr (need_filter)
added_columns.filter = IColumn::Filter(rows, 0);
Arena pool;
if constexpr (join_features.need_replication)
added_columns.offsets_to_replicate = std::make_unique<IColumn::Offsets>(rows);
IColumn::Offset current_offset = 0;
size_t max_joined_block_rows = added_columns.max_joined_block_rows;
size_t i = 0;
for (; i < rows; ++i)
{
if constexpr (join_features.need_replication)
{
if (unlikely(current_offset >= max_joined_block_rows))
{
added_columns.offsets_to_replicate->resize_assume_reserved(i);
added_columns.filter.resize_assume_reserved(i);
break;
}
}
bool right_row_found = false;
KnownRowsHolder<flag_per_row> known_rows;
for (size_t onexpr_idx = 0; onexpr_idx < added_columns.join_on_keys.size(); ++onexpr_idx)
{
const auto & join_keys = added_columns.join_on_keys[onexpr_idx];
if (join_keys.null_map && (*join_keys.null_map)[i])
continue;
bool row_acceptable = !join_keys.isRowFiltered(i);
using FindResult = typename KeyGetter::FindResult;
auto find_result = row_acceptable ? key_getter_vector[onexpr_idx].findKey(*(mapv[onexpr_idx]), i, pool) : FindResult();
if (find_result.isFound())
{
right_row_found = true;
auto & mapped = find_result.getMapped();
if constexpr (join_features.is_asof_join)
{
const IColumn & left_asof_key = added_columns.leftAsofKey();
auto row_ref = mapped->findAsof(left_asof_key, i);
if (row_ref.block)
{
setUsed<need_filter>(added_columns.filter, i);
if constexpr (flag_per_row)
used_flags.template setUsed<join_features.need_flags, flag_per_row>(row_ref.block, row_ref.row_num, 0);
else
used_flags.template setUsed<join_features.need_flags, flag_per_row>(find_result);
added_columns.appendFromBlock(*row_ref.block, row_ref.row_num, join_features.add_missing);
}
else
addNotFoundRow<join_features.add_missing, join_features.need_replication>(added_columns, current_offset);
}
else if constexpr (join_features.is_all_join)
{
setUsed<need_filter>(added_columns.filter, i);
used_flags.template setUsed<join_features.need_flags, flag_per_row>(find_result);
auto used_flags_opt = join_features.need_flags ? &used_flags : nullptr;
addFoundRowAll<Map, join_features.add_missing>(mapped, added_columns, current_offset, known_rows, used_flags_opt);
}
else if constexpr ((join_features.is_any_join || join_features.is_semi_join) && join_features.right)
{
/// Use first appeared left key + it needs left columns replication
bool used_once = used_flags.template setUsedOnce<join_features.need_flags, flag_per_row>(find_result);
if (used_once)
{
auto used_flags_opt = join_features.need_flags ? &used_flags : nullptr;
setUsed<need_filter>(added_columns.filter, i);
addFoundRowAll<Map, join_features.add_missing>(
mapped, added_columns, current_offset, known_rows, used_flags_opt);
}
}
else if constexpr (join_features.is_any_join && KIND == JoinKind::Inner)
{
bool used_once = used_flags.template setUsedOnce<join_features.need_flags, flag_per_row>(find_result);
/// Use first appeared left key only
if (used_once)
{
setUsed<need_filter>(added_columns.filter, i);
added_columns.appendFromBlock(*mapped.block, mapped.row_num, join_features.add_missing);
}
break;
}
else if constexpr (join_features.is_any_join && join_features.full)
{
/// TODO
}
else if constexpr (join_features.is_anti_join)
{
if constexpr (join_features.right && join_features.need_flags)
used_flags.template setUsed<join_features.need_flags, flag_per_row>(find_result);
}
else /// ANY LEFT, SEMI LEFT, old ANY (RightAny)
{
setUsed<need_filter>(added_columns.filter, i);
used_flags.template setUsed<join_features.need_flags, flag_per_row>(find_result);
added_columns.appendFromBlock(*mapped.block, mapped.row_num, join_features.add_missing);
if (join_features.is_any_or_semi_join)
{
break;
}
}
}
}
if (!right_row_found)
{
if constexpr (join_features.is_anti_join && join_features.left)
setUsed<need_filter>(added_columns.filter, i);
addNotFoundRow<join_features.add_missing, join_features.need_replication>(added_columns, current_offset);
}
if constexpr (join_features.need_replication)
{
(*added_columns.offsets_to_replicate)[i] = current_offset;
}
}
added_columns.applyLazyDefaults();
return i;
}
template <bool need_filter>
static void setUsed(IColumn::Filter & filter [[maybe_unused]], size_t pos [[maybe_unused]])
{
if constexpr (need_filter)
filter[pos] = 1;
}
template <typename AddedColumns>
static ColumnPtr buildAdditionalFilter(
size_t left_start_row,
const std::vector<RowRef> & selected_rows,
const std::vector<size_t> & row_replicate_offset,
AddedColumns & added_columns)
{
ColumnPtr result_column;
do
{
if (selected_rows.empty())
{
result_column = ColumnUInt8::create();
break;
}
const Block & sample_right_block = *selected_rows.begin()->block;
if (!sample_right_block || !added_columns.additional_filter_expression)
{
auto filter = ColumnUInt8::create();
filter->insertMany(1, selected_rows.size());
result_column = std::move(filter);
break;
}
auto required_cols = added_columns.additional_filter_expression->getRequiredColumnsWithTypes();
if (required_cols.empty())
{
Block block;
added_columns.additional_filter_expression->execute(block);
result_column = block.getByPosition(0).column->cloneResized(selected_rows.size());
break;
}
NameSet required_column_names;
for (auto & col : required_cols)
required_column_names.insert(col.name);
Block executed_block;
size_t right_col_pos = 0;
for (const auto & col : sample_right_block.getColumnsWithTypeAndName())
{
if (required_column_names.contains(col.name))
{
auto new_col = col.column->cloneEmpty();
for (const auto & selected_row : selected_rows)
{
const auto & src_col = selected_row.block->getByPosition(right_col_pos);
new_col->insertFrom(*src_col.column, selected_row.row_num);
}
executed_block.insert({std::move(new_col), col.type, col.name});
}
right_col_pos += 1;
}
if (!executed_block)
{
result_column = ColumnUInt8::create();
break;
}
for (const auto & col_name : required_column_names)
{
const auto * src_col = added_columns.left_block.findByName(col_name);
if (!src_col)
continue;
auto new_col = src_col->column->cloneEmpty();
size_t prev_left_offset = 0;
for (size_t i = 1; i < row_replicate_offset.size(); ++i)
{
const size_t & left_offset = row_replicate_offset[i];
size_t rows = left_offset - prev_left_offset;
if (rows)
new_col->insertManyFrom(*src_col->column, left_start_row + i - 1, rows);
prev_left_offset = left_offset;
}
executed_block.insert({std::move(new_col), src_col->type, col_name});
}
if (!executed_block)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"required columns: [{}], but not found any in left/right table. right table: {}, left table: {}",
required_cols.toString(),
sample_right_block.dumpNames(),
added_columns.left_block.dumpNames());
}
for (const auto & col : executed_block.getColumnsWithTypeAndName())
if (!col.column || !col.type)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Illegal nullptr column in input block: {}", executed_block.dumpStructure());
added_columns.additional_filter_expression->execute(executed_block);
result_column = executed_block.getByPosition(0).column->convertToFullColumnIfConst();
executed_block.clear();
} while (false);
result_column = result_column->convertToFullIfNeeded();
if (result_column->isNullable())
{
/// Convert Nullable(UInt8) to UInt8 ensuring that nulls are zeros
/// Trying to avoid copying data, since we are the only owner of the column.
ColumnPtr mask_column = assert_cast<const ColumnNullable &>(*result_column).getNullMapColumnPtr();
MutableColumnPtr mutable_column;
{
ColumnPtr nested_column = assert_cast<const ColumnNullable &>(*result_column).getNestedColumnPtr();
result_column.reset();
mutable_column = IColumn::mutate(std::move(nested_column));
}
auto & column_data = assert_cast<ColumnUInt8 &>(*mutable_column).getData();
const auto & mask_column_data = assert_cast<const ColumnUInt8 &>(*mask_column).getData();
for (size_t i = 0; i < column_data.size(); ++i)
{
if (mask_column_data[i])
column_data[i] = 0;
}
return mutable_column;
}
return result_column;
}
/// First to collect all matched rows refs by join keys, then filter out rows which are not true in additional filter expression.
template <typename KeyGetter, typename Map, bool need_replication, typename AddedColumns>
static size_t joinRightColumnsWithAddtitionalFilter(
std::vector<KeyGetter> && key_getter_vector,
const std::vector<const Map *> & mapv,
AddedColumns & added_columns,
JoinStuff::JoinUsedFlags & used_flags [[maybe_unused]],
bool need_filter [[maybe_unused]],
bool need_flags [[maybe_unused]],
bool add_missing [[maybe_unused]],
bool flag_per_row [[maybe_unused]])
{
size_t left_block_rows = added_columns.rows_to_add;
if (need_filter)
added_columns.filter = IColumn::Filter(left_block_rows, 0);
std::unique_ptr<Arena> pool;
if constexpr (need_replication)
added_columns.offsets_to_replicate = std::make_unique<IColumn::Offsets>(left_block_rows);
std::vector<size_t> row_replicate_offset;
row_replicate_offset.reserve(left_block_rows);
using FindResult = typename KeyGetter::FindResult;
size_t max_joined_block_rows = added_columns.max_joined_block_rows;
size_t left_row_iter = 0;
PreSelectedRows selected_rows;
selected_rows.reserve(left_block_rows);
std::vector<FindResult> find_results;
find_results.reserve(left_block_rows);
bool exceeded_max_block_rows = false;
IColumn::Offset total_added_rows = 0;
IColumn::Offset current_added_rows = 0;
auto collect_keys_matched_rows_refs = [&]()
{
pool = std::make_unique<Arena>();
find_results.clear();
row_replicate_offset.clear();
row_replicate_offset.push_back(0);
current_added_rows = 0;
selected_rows.clear();
for (; left_row_iter < left_block_rows; ++left_row_iter)
{
if constexpr (need_replication)
{
if (unlikely(total_added_rows + current_added_rows >= max_joined_block_rows))
{
break;
}
}
KnownRowsHolder<true> all_flag_known_rows;
KnownRowsHolder<false> single_flag_know_rows;
for (size_t join_clause_idx = 0; join_clause_idx < added_columns.join_on_keys.size(); ++join_clause_idx)
{
const auto & join_keys = added_columns.join_on_keys[join_clause_idx];
if (join_keys.null_map && (*join_keys.null_map)[left_row_iter])
continue;
bool row_acceptable = !join_keys.isRowFiltered(left_row_iter);
auto find_result = row_acceptable
? key_getter_vector[join_clause_idx].findKey(*(mapv[join_clause_idx]), left_row_iter, *pool)
: FindResult();
if (find_result.isFound())
{
auto & mapped = find_result.getMapped();
find_results.push_back(find_result);
if (flag_per_row)
addFoundRowAll<Map, false, true>(mapped, selected_rows, current_added_rows, all_flag_known_rows, nullptr);
else
addFoundRowAll<Map, false, false>(mapped, selected_rows, current_added_rows, single_flag_know_rows, nullptr);
}
}
row_replicate_offset.push_back(current_added_rows);
}
};
auto copy_final_matched_rows = [&](size_t left_start_row, ColumnPtr filter_col)
{
const PaddedPODArray<UInt8> & filter_flags = assert_cast<const ColumnUInt8 &>(*filter_col).getData();
size_t prev_replicated_row = 0;
auto selected_right_row_it = selected_rows.begin();
size_t find_result_index = 0;
for (size_t i = 1, n = row_replicate_offset.size(); i < n; ++i)
{
bool any_matched = false;
/// For all right join, flag_per_row is true, we need mark used flags for each row.
if (flag_per_row)
{
for (size_t replicated_row = prev_replicated_row; replicated_row < row_replicate_offset[i]; ++replicated_row)
{
if (filter_flags[replicated_row])
{
any_matched = true;
added_columns.appendFromBlock(*selected_right_row_it->block, selected_right_row_it->row_num, add_missing);
total_added_rows += 1;
if (need_flags)
used_flags.template setUsed<true, true>(selected_right_row_it->block, selected_right_row_it->row_num, 0);
}
++selected_right_row_it;
}
}
else
{
for (size_t replicated_row = prev_replicated_row; replicated_row < row_replicate_offset[i]; ++replicated_row)
{
if (filter_flags[replicated_row])
{
any_matched = true;
added_columns.appendFromBlock(*selected_right_row_it->block, selected_right_row_it->row_num, add_missing);
total_added_rows += 1;
}
++selected_right_row_it;
}
}
if (!any_matched)
{
if (add_missing)
addNotFoundRow<true, need_replication>(added_columns, total_added_rows);
else
addNotFoundRow<false, need_replication>(added_columns, total_added_rows);
}
else
{
if (!flag_per_row && need_flags)
used_flags.template setUsed<true, false>(find_results[find_result_index]);
if (need_filter)
setUsed<true>(added_columns.filter, left_start_row + i - 1);
if (add_missing)
added_columns.applyLazyDefaults();
}
find_result_index += (prev_replicated_row != row_replicate_offset[i]);
if constexpr (need_replication)
{
(*added_columns.offsets_to_replicate)[left_start_row + i - 1] = total_added_rows;
}
prev_replicated_row = row_replicate_offset[i];
}
};
while (left_row_iter < left_block_rows && !exceeded_max_block_rows)
{
auto left_start_row = left_row_iter;
collect_keys_matched_rows_refs();
if (selected_rows.size() != current_added_rows || row_replicate_offset.size() != left_row_iter - left_start_row + 1)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Sizes are mismatched. selected_rows.size:{}, current_added_rows:{}, row_replicate_offset.size:{}, left_row_iter: {}, "
"left_start_row: {}",
selected_rows.size(),
current_added_rows,
row_replicate_offset.size(),
left_row_iter,
left_start_row);
}
auto filter_col = buildAdditionalFilter(left_start_row, selected_rows, row_replicate_offset, added_columns);
copy_final_matched_rows(left_start_row, filter_col);
if constexpr (need_replication)
{
// Add a check for current_added_rows to avoid run the filter expression on too small size batch.
if (total_added_rows >= max_joined_block_rows || current_added_rows < 1024)
exceeded_max_block_rows = true;
}
}
if constexpr (need_replication)
{
added_columns.offsets_to_replicate->resize_assume_reserved(left_row_iter);
added_columns.filter.resize_assume_reserved(left_row_iter);
}
added_columns.applyLazyDefaults();
return left_row_iter;
}
/// Cut first num_rows rows from block in place and returns block with remaining rows
static Block sliceBlock(Block & block, size_t num_rows)
{
size_t total_rows = block.rows();
if (num_rows >= total_rows)
return {};
size_t remaining_rows = total_rows - num_rows;
Block remaining_block = block.cloneEmpty();
for (size_t i = 0; i < block.columns(); ++i)
{
auto & col = block.getByPosition(i);
remaining_block.getByPosition(i).column = col.column->cut(num_rows, remaining_rows);
col.column = col.column->cut(0, num_rows);
}
return remaining_block;
}
/** Since we do not store right key columns,
* this function is used to copy left key columns to right key columns.
* If the user requests some right columns, we just copy left key columns to right, since they are equal.
* Example: SELECT t1.key, t2.key FROM t1 FULL JOIN t2 ON t1.key = t2.key;
* In that case for matched rows in t2.key we will use values from t1.key.
* However, in some cases we might need to adjust the type of column, e.g. t1.key :: LowCardinality(String) and t2.key :: String
* Also, the nullability of the column might be different.
* Returns the right column after with necessary adjustments.
*/
static ColumnWithTypeAndName copyLeftKeyColumnToRight(
const DataTypePtr & right_key_type,
const String & renamed_right_column,
const ColumnWithTypeAndName & left_column,
const IColumn::Filter * null_map_filter = nullptr)
{
ColumnWithTypeAndName right_column = left_column;
right_column.name = renamed_right_column;
if (null_map_filter)
right_column.column = JoinCommon::filterWithBlanks(right_column.column, *null_map_filter);
bool should_be_nullable = isNullableOrLowCardinalityNullable(right_key_type);
if (null_map_filter)
correctNullabilityInplace(right_column, should_be_nullable, *null_map_filter);
else
correctNullabilityInplace(right_column, should_be_nullable);
if (!right_column.type->equals(*right_key_type))
{
right_column.column = castColumnAccurate(right_column, right_key_type);
right_column.type = right_key_type;
}
right_column.column = right_column.column->convertToFullColumnIfConst();
return right_column;
}
static void correctNullabilityInplace(ColumnWithTypeAndName & column, bool nullable)
{
if (nullable)
{
JoinCommon::convertColumnToNullable(column);
}
else
{
/// We have to replace values masked by NULLs with defaults.
if (column.column)
if (const auto * nullable_column = checkAndGetColumn<ColumnNullable>(&*column.column))
column.column = JoinCommon::filterWithBlanks(column.column, nullable_column->getNullMapColumn().getData(), true);
JoinCommon::removeColumnNullability(column);
}
}
static void correctNullabilityInplace(ColumnWithTypeAndName & column, bool nullable, const IColumn::Filter & negative_null_map)
{
if (nullable)
{
JoinCommon::convertColumnToNullable(column);
if (column.type->isNullable() && !negative_null_map.empty())
{
MutableColumnPtr mutable_column = IColumn::mutate(std::move(column.column));
assert_cast<ColumnNullable &>(*mutable_column).applyNegatedNullMap(negative_null_map);
column.column = std::move(mutable_column);
}
}
else
JoinCommon::removeColumnNullability(column);
}
};
/// Instantiate template class ahead in different .cpp files to avoid `too large translation unit`.
extern template class HashJoinMethods<JoinKind::Left, JoinStrictness::RightAny, HashJoin::MapsOne>;
extern template class HashJoinMethods<JoinKind::Left, JoinStrictness::Any, HashJoin::MapsOne>;
extern template class HashJoinMethods<JoinKind::Left, JoinStrictness::All, HashJoin::MapsAll>;
extern template class HashJoinMethods<JoinKind::Left, JoinStrictness::Semi, HashJoin::MapsOne>;
extern template class HashJoinMethods<JoinKind::Left, JoinStrictness::Anti, HashJoin::MapsOne>;
extern template class HashJoinMethods<JoinKind::Left, JoinStrictness::Asof, HashJoin::MapsAsof>;
extern template class HashJoinMethods<JoinKind::Right, JoinStrictness::RightAny, HashJoin::MapsOne>;
extern template class HashJoinMethods<JoinKind::Right, JoinStrictness::Any, HashJoin::MapsAll>;
extern template class HashJoinMethods<JoinKind::Right, JoinStrictness::All, HashJoin::MapsAll>;
extern template class HashJoinMethods<JoinKind::Right, JoinStrictness::Semi, HashJoin::MapsAll>;
extern template class HashJoinMethods<JoinKind::Right, JoinStrictness::Anti, HashJoin::MapsAll>;
extern template class HashJoinMethods<JoinKind::Right, JoinStrictness::Asof, HashJoin::MapsAsof>;
extern template class HashJoinMethods<JoinKind::Inner, JoinStrictness::RightAny, HashJoin::MapsOne>;
extern template class HashJoinMethods<JoinKind::Inner, JoinStrictness::Any, HashJoin::MapsOne>;
extern template class HashJoinMethods<JoinKind::Inner, JoinStrictness::All, HashJoin::MapsAll>;
extern template class HashJoinMethods<JoinKind::Inner, JoinStrictness::Semi, HashJoin::MapsOne>;
extern template class HashJoinMethods<JoinKind::Inner, JoinStrictness::Anti, HashJoin::MapsOne>;
extern template class HashJoinMethods<JoinKind::Inner, JoinStrictness::Asof, HashJoin::MapsAsof>;
extern template class HashJoinMethods<JoinKind::Full, JoinStrictness::RightAny, HashJoin::MapsOne>;
extern template class HashJoinMethods<JoinKind::Full, JoinStrictness::Any, HashJoin::MapsAll>;
extern template class HashJoinMethods<JoinKind::Full, JoinStrictness::All, HashJoin::MapsAll>;
extern template class HashJoinMethods<JoinKind::Full, JoinStrictness::Semi, HashJoin::MapsOne>;
extern template class HashJoinMethods<JoinKind::Full, JoinStrictness::Anti, HashJoin::MapsOne>;
extern template class HashJoinMethods<JoinKind::Full, JoinStrictness::Asof, HashJoin::MapsAsof>;
}

View File

@ -0,0 +1,12 @@
#include <Interpreters/HashJoin/HashJoinMethods.h>
namespace DB
{
template class HashJoinMethods<JoinKind::Inner, JoinStrictness::RightAny, HashJoin::MapsOne>;
template class HashJoinMethods<JoinKind::Inner, JoinStrictness::Any, HashJoin::MapsOne>;
template class HashJoinMethods<JoinKind::Inner, JoinStrictness::All, HashJoin::MapsAll>;
template class HashJoinMethods<JoinKind::Inner, JoinStrictness::Semi, HashJoin::MapsOne>;
template class HashJoinMethods<JoinKind::Inner, JoinStrictness::Anti, HashJoin::MapsOne>;
template class HashJoinMethods<JoinKind::Inner, JoinStrictness::Asof, HashJoin::MapsAsof>;
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <Core/Joins.h>
#include <Interpreters/joinDispatch.h>
namespace DB
{
template <JoinKind KIND, JoinStrictness STRICTNESS>
struct JoinFeatures
{
static constexpr bool is_any_join = STRICTNESS == JoinStrictness::Any;
static constexpr bool is_any_or_semi_join = STRICTNESS == JoinStrictness::Any || STRICTNESS == JoinStrictness::RightAny || (STRICTNESS == JoinStrictness::Semi && KIND == JoinKind::Left);
static constexpr bool is_all_join = STRICTNESS == JoinStrictness::All;
static constexpr bool is_asof_join = STRICTNESS == JoinStrictness::Asof;
static constexpr bool is_semi_join = STRICTNESS == JoinStrictness::Semi;
static constexpr bool is_anti_join = STRICTNESS == JoinStrictness::Anti;
static constexpr bool left = KIND == JoinKind::Left;
static constexpr bool right = KIND == JoinKind::Right;
static constexpr bool inner = KIND == JoinKind::Inner;
static constexpr bool full = KIND == JoinKind::Full;
static constexpr bool need_replication = is_all_join || (is_any_join && right) || (is_semi_join && right);
static constexpr bool need_filter = !need_replication && (inner || right || (is_semi_join && left) || (is_anti_join && left));
static constexpr bool add_missing = (left || full) && !is_semi_join;
static constexpr bool need_flags = MapGetter<KIND, STRICTNESS>::flagged;
};
}

View File

@ -0,0 +1,154 @@
#pragma once
#include <vector>
#include <atomic>
#include <unordered_map>
#include <Core/Joins.h>
#include <Core/Block.h>
#include <Interpreters/joinDispatch.h>
namespace DB
{
namespace JoinStuff
{
/// Flags needed to implement RIGHT and FULL JOINs.
class JoinUsedFlags
{
using RawBlockPtr = const Block *;
using UsedFlagsForBlock = std::vector<std::atomic_bool>;
/// For multiple dijuncts each empty in hashmap stores flags for particular block
/// For single dicunct we store all flags in `nullptr` entry, index is the offset in FindResult
std::unordered_map<RawBlockPtr, UsedFlagsForBlock> flags;
bool need_flags;
public:
/// Update size for vector with flags.
/// Calling this method invalidates existing flags.
/// It can be called several times, but all of them should happen before using this structure.
template <JoinKind KIND, JoinStrictness STRICTNESS>
void reinit(size_t size)
{
if constexpr (MapGetter<KIND, STRICTNESS>::flagged)
{
assert(flags[nullptr].size() <= size);
need_flags = true;
// For one disjunct clause case, we don't need to reinit each time we call addBlockToJoin.
// and there is no value inserted in this JoinUsedFlags before addBlockToJoin finish.
// So we reinit only when the hash table is rehashed to a larger size.
if (flags.empty() || flags[nullptr].size() < size) [[unlikely]]
{
flags[nullptr] = std::vector<std::atomic_bool>(size);
}
}
}
template <JoinKind KIND, JoinStrictness STRICTNESS>
void reinit(const Block * block_ptr)
{
if constexpr (MapGetter<KIND, STRICTNESS>::flagged)
{
assert(flags[block_ptr].size() <= block_ptr->rows());
need_flags = true;
flags[block_ptr] = std::vector<std::atomic_bool>(block_ptr->rows());
}
}
bool getUsedSafe(size_t i) const
{
return getUsedSafe(nullptr, i);
}
bool getUsedSafe(const Block * block_ptr, size_t row_idx) const
{
if (auto it = flags.find(block_ptr); it != flags.end())
return it->second[row_idx].load();
return !need_flags;
}
template <bool use_flags, bool flag_per_row, typename FindResult>
void setUsed(const FindResult & f)
{
if constexpr (!use_flags)
return;
/// Could be set simultaneously from different threads.
if constexpr (flag_per_row)
{
auto & mapped = f.getMapped();
flags[mapped.block][mapped.row_num].store(true, std::memory_order_relaxed);
}
else
{
flags[nullptr][f.getOffset()].store(true, std::memory_order_relaxed);
}
}
template <bool use_flags, bool flag_per_row>
void setUsed(const Block * block, size_t row_num, size_t offset)
{
if constexpr (!use_flags)
return;
/// Could be set simultaneously from different threads.
if constexpr (flag_per_row)
{
flags[block][row_num].store(true, std::memory_order_relaxed);
}
else
{
flags[nullptr][offset].store(true, std::memory_order_relaxed);
}
}
template <bool use_flags, bool flag_per_row, typename FindResult>
bool getUsed(const FindResult & f)
{
if constexpr (!use_flags)
return true;
if constexpr (flag_per_row)
{
auto & mapped = f.getMapped();
return flags[mapped.block][mapped.row_num].load();
}
else
{
return flags[nullptr][f.getOffset()].load();
}
}
template <bool use_flags, bool flag_per_row, typename FindResult>
bool setUsedOnce(const FindResult & f)
{
if constexpr (!use_flags)
return true;
if constexpr (flag_per_row)
{
auto & mapped = f.getMapped();
/// fast check to prevent heavy CAS with seq_cst order
if (flags[mapped.block][mapped.row_num].load(std::memory_order_relaxed))
return false;
bool expected = false;
return flags[mapped.block][mapped.row_num].compare_exchange_strong(expected, true);
}
else
{
auto off = f.getOffset();
/// fast check to prevent heavy CAS with seq_cst order
if (flags[nullptr][off].load(std::memory_order_relaxed))
return false;
bool expected = false;
return flags[nullptr][off].compare_exchange_strong(expected, true);
}
}
};
}
}

View File

@ -0,0 +1,73 @@
#pragma once
#include <Interpreters/HashJoin/HashJoin.h>
namespace DB
{
template <typename Mapped>
class KeyGetterEmpty
{
public:
struct MappedType
{
using mapped_type = Mapped;
};
using FindResult = ColumnsHashing::columns_hashing_impl::FindResultImpl<Mapped, true>;
KeyGetterEmpty() = default;
FindResult findKey(MappedType, size_t, const Arena &) { return FindResult(); }
};
template <HashJoin::Type type, typename Value, typename Mapped>
struct KeyGetterForTypeImpl;
constexpr bool use_offset = true;
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<HashJoin::Type::key8, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt8, false, use_offset>;
};
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<HashJoin::Type::key16, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt16, false, use_offset>;
};
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<HashJoin::Type::key32, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt32, false, use_offset>;
};
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<HashJoin::Type::key64, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt64, false, use_offset>;
};
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<HashJoin::Type::key_string, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodString<Value, Mapped, true, false, use_offset>;
};
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<HashJoin::Type::key_fixed_string, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodFixedString<Value, Mapped, true, false, use_offset>;
};
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<HashJoin::Type::keys128, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodKeysFixed<Value, UInt128, Mapped, false, false, false, use_offset>;
};
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<HashJoin::Type::keys256, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodKeysFixed<Value, UInt256, Mapped, false, false, false, use_offset>;
};
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<HashJoin::Type::hashed, Value, Mapped>
{
using Type = ColumnsHashing::HashMethodHashed<Value, Mapped, false, use_offset>;
};
template <HashJoin::Type type, typename Data>
struct KeyGetterForType
{
using Value = typename Data::value_type;
using Mapped_t = typename Data::mapped_type;
using Mapped = std::conditional_t<std::is_const_v<Data>, const Mapped_t, Mapped_t>;
using Type = typename KeyGetterForTypeImpl<type, Value, Mapped>::Type;
};
}

View File

@ -0,0 +1,148 @@
#pragma once
#include <Interpreters/HashJoin/HashJoin.h>
#include <Common/ColumnsHashingImpl.h>
#include <Interpreters/RowRefs.h>
#include <Interpreters/HashJoin/JoinUsedFlags.h>
namespace DB
{
template <bool flag_per_row>
class KnownRowsHolder;
/// Keep already joined rows to prevent duplication if many disjuncts
/// if for a particular pair of rows condition looks like TRUE or TRUE or TRUE
/// we want to have it once in resultset
template<>
class KnownRowsHolder<true>
{
public:
using Type = std::pair<const Block *, DB::RowRef::SizeT>;
private:
static const size_t MAX_LINEAR = 16; // threshold to switch from Array to Set
using ArrayHolder = std::array<Type, MAX_LINEAR>;
using SetHolder = std::set<Type>;
using SetHolderPtr = std::unique_ptr<SetHolder>;
ArrayHolder array_holder;
SetHolderPtr set_holder_ptr;
size_t items;
public:
KnownRowsHolder()
: items(0)
{
}
template<class InputIt>
void add(InputIt from, InputIt to)
{
const size_t new_items = std::distance(from, to);
if (items + new_items <= MAX_LINEAR)
{
std::copy(from, to, &array_holder[items]);
}
else
{
if (items <= MAX_LINEAR)
{
set_holder_ptr = std::make_unique<SetHolder>();
set_holder_ptr->insert(std::cbegin(array_holder), std::cbegin(array_holder) + items);
}
set_holder_ptr->insert(from, to);
}
items += new_items;
}
template<class Needle>
bool isKnown(const Needle & needle)
{
return items <= MAX_LINEAR
? std::find(std::cbegin(array_holder), std::cbegin(array_holder) + items, needle) != std::cbegin(array_holder) + items
: set_holder_ptr->find(needle) != set_holder_ptr->end();
}
};
template<>
class KnownRowsHolder<false>
{
public:
template<class InputIt>
void add(InputIt, InputIt)
{
}
template<class Needle>
static bool isKnown(const Needle &)
{
return false;
}
};
template <typename Mapped, bool need_offset = false>
using FindResultImpl = ColumnsHashing::columns_hashing_impl::FindResultImpl<Mapped, true>;
template <typename Map, bool add_missing, bool flag_per_row, typename AddedColumns>
void addFoundRowAll(
const typename Map::mapped_type & mapped,
AddedColumns & added,
IColumn::Offset & current_offset,
KnownRowsHolder<flag_per_row> & known_rows [[maybe_unused]],
JoinStuff::JoinUsedFlags * used_flags [[maybe_unused]])
{
if constexpr (add_missing)
added.applyLazyDefaults();
if constexpr (flag_per_row)
{
std::unique_ptr<std::vector<KnownRowsHolder<true>::Type>> new_known_rows_ptr;
for (auto it = mapped.begin(); it.ok(); ++it)
{
if (!known_rows.isKnown(std::make_pair(it->block, it->row_num)))
{
added.appendFromBlock(*it->block, it->row_num, false);
++current_offset;
if (!new_known_rows_ptr)
{
new_known_rows_ptr = std::make_unique<std::vector<KnownRowsHolder<true>::Type>>();
}
new_known_rows_ptr->push_back(std::make_pair(it->block, it->row_num));
if (used_flags)
{
used_flags->JoinStuff::JoinUsedFlags::setUsedOnce<true, flag_per_row>(
FindResultImpl<const RowRef, false>(*it, true, 0));
}
}
}
if (new_known_rows_ptr)
{
known_rows.add(std::cbegin(*new_known_rows_ptr), std::cend(*new_known_rows_ptr));
}
}
else
{
for (auto it = mapped.begin(); it.ok(); ++it)
{
added.appendFromBlock(*it->block, it->row_num, false);
++current_offset;
}
}
}
template <bool add_missing, bool need_offset, typename AddedColumns>
void addNotFoundRow(AddedColumns & added [[maybe_unused]], IColumn::Offset & current_offset [[maybe_unused]])
{
if constexpr (add_missing)
{
added.appendDefaultRow();
if constexpr (need_offset)
++current_offset;
}
}
}

View File

@ -0,0 +1,11 @@
#include <Interpreters/HashJoin/HashJoinMethods.h>
namespace DB
{
template class HashJoinMethods<JoinKind::Left, JoinStrictness::RightAny, HashJoin::MapsOne>;
template class HashJoinMethods<JoinKind::Left, JoinStrictness::Any, HashJoin::MapsOne>;
template class HashJoinMethods<JoinKind::Left, JoinStrictness::All, HashJoin::MapsAll>;
template class HashJoinMethods<JoinKind::Left, JoinStrictness::Semi, HashJoin::MapsOne>;
template class HashJoinMethods<JoinKind::Left, JoinStrictness::Anti, HashJoin::MapsOne>;
template class HashJoinMethods<JoinKind::Left, JoinStrictness::Asof, HashJoin::MapsAsof>;
}

View File

@ -0,0 +1,11 @@
#include <Interpreters/HashJoin/HashJoinMethods.h>
namespace DB
{
template class HashJoinMethods<JoinKind::Right, JoinStrictness::RightAny, HashJoin::MapsOne>;
template class HashJoinMethods<JoinKind::Right, JoinStrictness::Any, HashJoin::MapsAll>;
template class HashJoinMethods<JoinKind::Right, JoinStrictness::All, HashJoin::MapsAll>;
template class HashJoinMethods<JoinKind::Right, JoinStrictness::Semi, HashJoin::MapsAll>;
template class HashJoinMethods<JoinKind::Right, JoinStrictness::Anti, HashJoin::MapsAll>;
template class HashJoinMethods<JoinKind::Right, JoinStrictness::Asof, HashJoin::MapsAsof>;
}

View File

@ -1,6 +1,6 @@
#include <Common/typeid_cast.h>
#include <Interpreters/JoinSwitcher.h>
#include <Interpreters/HashJoin.h>
#include <Interpreters/HashJoin/HashJoin.h>
#include <Interpreters/MergeJoin.h>
#include <Interpreters/JoinUtils.h>

View File

@ -3,7 +3,7 @@
#include <array>
#include <base/constexpr_helpers.h>
#include <Interpreters/HashJoin.h>
#include <Interpreters/HashJoin/HashJoin.h>
/** Used in implementation of Join to process different data structures.

View File

@ -53,7 +53,7 @@
#include <Interpreters/ArrayJoinAction.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/HashJoin.h>
#include <Interpreters/HashJoin/HashJoin.h>
#include <Interpreters/IJoin.h>
#include <Interpreters/TableJoin.h>
#include <Interpreters/getCustomKeyFilterForParallelReplicas.h>

View File

@ -29,7 +29,7 @@
#include <Dictionaries/IDictionary.h>
#include <Interpreters/IKeyValueEntity.h>
#include <Interpreters/HashJoin.h>
#include <Interpreters/HashJoin/HashJoin.h>
#include <Interpreters/MergeJoin.h>
#include <Interpreters/FullSortingMergeJoin.h>
#include <Interpreters/ConcurrentHashJoin.h>

View File

@ -2,7 +2,7 @@
#include <Storages/StorageFactory.h>
#include <Storages/StorageSet.h>
#include <Storages/TableLockHolder.h>
#include <Interpreters/HashJoin.h>
#include <Interpreters/HashJoin/HashJoin.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTIdentifier_fwd.h>