mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
refactor hash join to reduce target size
This commit is contained in:
parent
fd59d7e02f
commit
6d3836aca4
@ -210,6 +210,7 @@ add_object_library(clickhouse_analyzer_passes Analyzer/Resolve)
|
|||||||
add_object_library(clickhouse_planner Planner)
|
add_object_library(clickhouse_planner Planner)
|
||||||
add_object_library(clickhouse_interpreters Interpreters)
|
add_object_library(clickhouse_interpreters Interpreters)
|
||||||
add_object_library(clickhouse_interpreters_cache Interpreters/Cache)
|
add_object_library(clickhouse_interpreters_cache Interpreters/Cache)
|
||||||
|
add_object_library(clickhouse_interpreters_hash_join Interpreters/HashJoin)
|
||||||
add_object_library(clickhouse_interpreters_access Interpreters/Access)
|
add_object_library(clickhouse_interpreters_access Interpreters/Access)
|
||||||
add_object_library(clickhouse_interpreters_mysql Interpreters/MySQL)
|
add_object_library(clickhouse_interpreters_mysql Interpreters/MySQL)
|
||||||
add_object_library(clickhouse_interpreters_clusterproxy Interpreters/ClusterProxy)
|
add_object_library(clickhouse_interpreters_clusterproxy Interpreters/ClusterProxy)
|
||||||
|
@ -5,7 +5,7 @@
|
|||||||
#include <Functions/IFunction.h>
|
#include <Functions/IFunction.h>
|
||||||
#include <Interpreters/Context.h>
|
#include <Interpreters/Context.h>
|
||||||
#include <Interpreters/DatabaseCatalog.h>
|
#include <Interpreters/DatabaseCatalog.h>
|
||||||
#include <Interpreters/HashJoin.h>
|
#include <Interpreters/HashJoin/HashJoin.h>
|
||||||
#include <Storages/StorageJoin.h>
|
#include <Storages/StorageJoin.h>
|
||||||
#include <Storages/TableLockHolder.h>
|
#include <Storages/TableLockHolder.h>
|
||||||
|
|
||||||
|
@ -5,7 +5,7 @@
|
|||||||
#include <optional>
|
#include <optional>
|
||||||
#include <Interpreters/Context.h>
|
#include <Interpreters/Context.h>
|
||||||
#include <Interpreters/ExpressionActions.h>
|
#include <Interpreters/ExpressionActions.h>
|
||||||
#include <Interpreters/HashJoin.h>
|
#include <Interpreters/HashJoin/HashJoin.h>
|
||||||
#include <Interpreters/IJoin.h>
|
#include <Interpreters/IJoin.h>
|
||||||
#include <base/defines.h>
|
#include <base/defines.h>
|
||||||
#include <base/types.h>
|
#include <base/types.h>
|
||||||
|
@ -24,7 +24,7 @@
|
|||||||
#include <Interpreters/ExpressionAnalyzer.h>
|
#include <Interpreters/ExpressionAnalyzer.h>
|
||||||
#include <Interpreters/ExternalDictionariesLoader.h>
|
#include <Interpreters/ExternalDictionariesLoader.h>
|
||||||
#include <Interpreters/GraceHashJoin.h>
|
#include <Interpreters/GraceHashJoin.h>
|
||||||
#include <Interpreters/HashJoin.h>
|
#include <Interpreters/HashJoin/HashJoin.h>
|
||||||
#include <Interpreters/JoinSwitcher.h>
|
#include <Interpreters/JoinSwitcher.h>
|
||||||
#include <Interpreters/MergeJoin.h>
|
#include <Interpreters/MergeJoin.h>
|
||||||
#include <Interpreters/DirectJoin.h>
|
#include <Interpreters/DirectJoin.h>
|
||||||
|
@ -3,7 +3,7 @@
|
|||||||
#include <Formats/formatBlock.h>
|
#include <Formats/formatBlock.h>
|
||||||
#include <Interpreters/Context.h>
|
#include <Interpreters/Context.h>
|
||||||
#include <Interpreters/GraceHashJoin.h>
|
#include <Interpreters/GraceHashJoin.h>
|
||||||
#include <Interpreters/HashJoin.h>
|
#include <Interpreters/HashJoin/HashJoin.h>
|
||||||
#include <Interpreters/TableJoin.h>
|
#include <Interpreters/TableJoin.h>
|
||||||
#include <Interpreters/TemporaryDataOnDisk.h>
|
#include <Interpreters/TemporaryDataOnDisk.h>
|
||||||
#include <base/FnTraits.h>
|
#include <base/FnTraits.h>
|
||||||
|
138
src/Interpreters/HashJoin/AddedColumns.cpp
Normal file
138
src/Interpreters/HashJoin/AddedColumns.cpp
Normal file
@ -0,0 +1,138 @@
|
|||||||
|
#include <Interpreters/HashJoin/AddedColumns.h>
|
||||||
|
#include <Interpreters/NullableUtils.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
/// Prepares the key columns of one ON clause for lookup in the hash table.
///
/// The initializers rely on the declaration order of the members: the materialized holder
/// must exist before raw pointers are taken from it, and `null_map` must be reset before
/// extractNestedColumnsAndNullMap() receives it together with `key_columns`.
JoinOnKeyColumns::JoinOnKeyColumns(
    const Block & block,
    const Names & key_names_,
    const String & cond_column_name,
    const Sizes & key_sizes_)
    : key_names(key_names_)
    /// Rare case, when keys are constant or low cardinality. To avoid code bloat, simply materialize them.
    , materialized_keys_holder(JoinCommon::materializeColumns(block, key_names))
    , key_columns(JoinCommon::getRawPointers(materialized_keys_holder))
    , null_map(nullptr)
    , null_map_holder(extractNestedColumnsAndNullMap(key_columns, null_map))
    /// Mask built from the condition column named `cond_column_name`.
    , join_mask_column(JoinCommon::getColumnAsMask(block, cond_column_name))
    , key_sizes(key_sizes_)
{
}
|
||||||
|
|
||||||
|
/// Non-lazy specialization: intentionally a no-op.
template <>
void AddedColumns<false>::buildOutput()
{
    /// Nothing to do here.
}
|
||||||
|
|
||||||
|
template<>
|
||||||
|
void AddedColumns<true>::buildOutput()
|
||||||
|
{
|
||||||
|
for (size_t i = 0; i < this->size(); ++i)
|
||||||
|
{
|
||||||
|
auto& col = columns[i];
|
||||||
|
size_t default_count = 0;
|
||||||
|
auto apply_default = [&]()
|
||||||
|
{
|
||||||
|
if (default_count > 0)
|
||||||
|
{
|
||||||
|
JoinCommon::addDefaultValues(*col, type_name[i].type, default_count);
|
||||||
|
default_count = 0;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
for (size_t j = 0; j < lazy_output.blocks.size(); ++j)
|
||||||
|
{
|
||||||
|
if (!lazy_output.blocks[j])
|
||||||
|
{
|
||||||
|
default_count++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
apply_default();
|
||||||
|
const auto & column_from_block = reinterpret_cast<const Block *>(lazy_output.blocks[j])->getByPosition(right_indexes[i]);
|
||||||
|
/// If it's joinGetOrNull, we need to wrap not-nullable columns in StorageJoin.
|
||||||
|
if (is_join_get)
|
||||||
|
{
|
||||||
|
if (auto * nullable_col = typeid_cast<ColumnNullable *>(col.get());
|
||||||
|
nullable_col && !column_from_block.column->isNullable())
|
||||||
|
{
|
||||||
|
nullable_col->insertFromNotNullable(*column_from_block.column, lazy_output.row_nums[j]);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
col->insertFrom(*column_from_block.column, lazy_output.row_nums[j]);
|
||||||
|
}
|
||||||
|
apply_default();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
template<>
|
||||||
|
void AddedColumns<false>::applyLazyDefaults()
|
||||||
|
{
|
||||||
|
if (lazy_defaults_count)
|
||||||
|
{
|
||||||
|
for (size_t j = 0, size = right_indexes.size(); j < size; ++j)
|
||||||
|
JoinCommon::addDefaultValues(*columns[j], type_name[j].type, lazy_defaults_count);
|
||||||
|
lazy_defaults_count = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
template<>
|
||||||
|
void AddedColumns<true>::applyLazyDefaults()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
template <>
|
||||||
|
void AddedColumns<false>::appendFromBlock(const Block & block, size_t row_num,const bool has_defaults)
|
||||||
|
{
|
||||||
|
if (has_defaults)
|
||||||
|
applyLazyDefaults();
|
||||||
|
|
||||||
|
#ifndef NDEBUG
|
||||||
|
checkBlock(block);
|
||||||
|
#endif
|
||||||
|
if (is_join_get)
|
||||||
|
{
|
||||||
|
size_t right_indexes_size = right_indexes.size();
|
||||||
|
for (size_t j = 0; j < right_indexes_size; ++j)
|
||||||
|
{
|
||||||
|
const auto & column_from_block = block.getByPosition(right_indexes[j]);
|
||||||
|
if (auto * nullable_col = nullable_column_ptrs[j])
|
||||||
|
nullable_col->insertFromNotNullable(*column_from_block.column, row_num);
|
||||||
|
else
|
||||||
|
columns[j]->insertFrom(*column_from_block.column, row_num);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
size_t right_indexes_size = right_indexes.size();
|
||||||
|
for (size_t j = 0; j < right_indexes_size; ++j)
|
||||||
|
{
|
||||||
|
const auto & column_from_block = block.getByPosition(right_indexes[j]);
|
||||||
|
columns[j]->insertFrom(*column_from_block.column, row_num);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
template <>
|
||||||
|
void AddedColumns<true>::appendFromBlock(const Block & block, size_t row_num, bool)
|
||||||
|
{
|
||||||
|
#ifndef NDEBUG
|
||||||
|
checkBlock(block);
|
||||||
|
#endif
|
||||||
|
if (has_columns_to_add)
|
||||||
|
{
|
||||||
|
lazy_output.blocks.emplace_back(reinterpret_cast<UInt64>(&block));
|
||||||
|
lazy_output.row_nums.emplace_back(static_cast<uint32_t>(row_num));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
template<>
|
||||||
|
void AddedColumns<false>::appendDefaultRow()
|
||||||
|
{
|
||||||
|
++lazy_defaults_count;
|
||||||
|
}
|
||||||
|
|
||||||
|
template<>
|
||||||
|
void AddedColumns<true>::appendDefaultRow()
|
||||||
|
{
|
||||||
|
if (has_columns_to_add)
|
||||||
|
{
|
||||||
|
lazy_output.blocks.emplace_back(0);
|
||||||
|
lazy_output.row_nums.emplace_back(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
226
src/Interpreters/HashJoin/AddedColumns.h
Normal file
226
src/Interpreters/HashJoin/AddedColumns.h
Normal file
@ -0,0 +1,226 @@
|
|||||||
|
#pragma once
|
||||||
|
#include <Interpreters/HashJoin/HashJoin.h>
|
||||||
|
#include <Interpreters/TableJoin.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
namespace ErrorCodes
|
||||||
|
{
|
||||||
|
extern const int LOGICAL_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
class ExpressionActions;
|
||||||
|
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
|
||||||
|
|
||||||
|
struct JoinOnKeyColumns
|
||||||
|
{
|
||||||
|
Names key_names;
|
||||||
|
|
||||||
|
Columns materialized_keys_holder;
|
||||||
|
ColumnRawPtrs key_columns;
|
||||||
|
|
||||||
|
ConstNullMapPtr null_map;
|
||||||
|
ColumnPtr null_map_holder;
|
||||||
|
|
||||||
|
/// Only rows where mask == true can be joined
|
||||||
|
JoinCommon::JoinMask join_mask_column;
|
||||||
|
|
||||||
|
Sizes key_sizes;
|
||||||
|
|
||||||
|
explicit JoinOnKeyColumns(const Block & block, const Names & key_names_, const String & cond_column_name, const Sizes & key_sizes_);
|
||||||
|
|
||||||
|
bool isRowFiltered(size_t i) const { return join_mask_column.isRowFiltered(i); }
|
||||||
|
};
|
||||||
|
|
||||||
|
template <bool lazy>
|
||||||
|
class AddedColumns
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
struct TypeAndName
|
||||||
|
{
|
||||||
|
DataTypePtr type;
|
||||||
|
String name;
|
||||||
|
String qualified_name;
|
||||||
|
|
||||||
|
TypeAndName(DataTypePtr type_, const String & name_, const String & qualified_name_)
|
||||||
|
: type(type_), name(name_), qualified_name(qualified_name_)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct LazyOutput
|
||||||
|
{
|
||||||
|
PaddedPODArray<UInt64> blocks;
|
||||||
|
PaddedPODArray<UInt32> row_nums;
|
||||||
|
};
|
||||||
|
|
||||||
|
AddedColumns(
|
||||||
|
const Block & left_block_,
|
||||||
|
const Block & block_with_columns_to_add,
|
||||||
|
const Block & saved_block_sample,
|
||||||
|
const HashJoin & join,
|
||||||
|
std::vector<JoinOnKeyColumns> && join_on_keys_,
|
||||||
|
ExpressionActionsPtr additional_filter_expression_,
|
||||||
|
bool is_asof_join,
|
||||||
|
bool is_join_get_)
|
||||||
|
: left_block(left_block_)
|
||||||
|
, join_on_keys(join_on_keys_)
|
||||||
|
, additional_filter_expression(additional_filter_expression_)
|
||||||
|
, rows_to_add(left_block.rows())
|
||||||
|
, is_join_get(is_join_get_)
|
||||||
|
{
|
||||||
|
size_t num_columns_to_add = block_with_columns_to_add.columns();
|
||||||
|
if (is_asof_join)
|
||||||
|
++num_columns_to_add;
|
||||||
|
|
||||||
|
if constexpr (lazy)
|
||||||
|
{
|
||||||
|
has_columns_to_add = num_columns_to_add > 0;
|
||||||
|
lazy_output.blocks.reserve(rows_to_add);
|
||||||
|
lazy_output.row_nums.reserve(rows_to_add);
|
||||||
|
}
|
||||||
|
|
||||||
|
columns.reserve(num_columns_to_add);
|
||||||
|
type_name.reserve(num_columns_to_add);
|
||||||
|
right_indexes.reserve(num_columns_to_add);
|
||||||
|
|
||||||
|
for (const auto & src_column : block_with_columns_to_add)
|
||||||
|
{
|
||||||
|
/// Column names `src_column.name` and `qualified_name` can differ for StorageJoin,
|
||||||
|
/// because it uses not qualified right block column names
|
||||||
|
auto qualified_name = join.getTableJoin().renamedRightColumnName(src_column.name);
|
||||||
|
/// Don't insert column if it's in left block
|
||||||
|
if (!left_block.has(qualified_name))
|
||||||
|
addColumn(src_column, qualified_name);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (is_asof_join)
|
||||||
|
{
|
||||||
|
assert(join_on_keys.size() == 1);
|
||||||
|
const ColumnWithTypeAndName & right_asof_column = join.rightAsofKeyColumn();
|
||||||
|
addColumn(right_asof_column, right_asof_column.name);
|
||||||
|
left_asof_key = join_on_keys[0].key_columns.back();
|
||||||
|
}
|
||||||
|
|
||||||
|
for (auto & tn : type_name)
|
||||||
|
right_indexes.push_back(saved_block_sample.getPositionByName(tn.name));
|
||||||
|
|
||||||
|
nullable_column_ptrs.resize(right_indexes.size(), nullptr);
|
||||||
|
for (size_t j = 0; j < right_indexes.size(); ++j)
|
||||||
|
{
|
||||||
|
/** If it's joinGetOrNull, we will have nullable columns in result block
|
||||||
|
* even if right column is not nullable in storage (saved_block_sample).
|
||||||
|
*/
|
||||||
|
const auto & saved_column = saved_block_sample.getByPosition(right_indexes[j]).column;
|
||||||
|
if (columns[j]->isNullable() && !saved_column->isNullable())
|
||||||
|
nullable_column_ptrs[j] = typeid_cast<ColumnNullable *>(columns[j].get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t size() const { return columns.size(); }
|
||||||
|
|
||||||
|
void buildOutput();
|
||||||
|
|
||||||
|
ColumnWithTypeAndName moveColumn(size_t i)
|
||||||
|
{
|
||||||
|
return ColumnWithTypeAndName(std::move(columns[i]), type_name[i].type, type_name[i].qualified_name);
|
||||||
|
}
|
||||||
|
|
||||||
|
void appendFromBlock(const Block & block, size_t row_num, bool has_default);
|
||||||
|
|
||||||
|
void appendDefaultRow();
|
||||||
|
|
||||||
|
void applyLazyDefaults();
|
||||||
|
|
||||||
|
const IColumn & leftAsofKey() const { return *left_asof_key; }
|
||||||
|
|
||||||
|
Block left_block;
|
||||||
|
std::vector<JoinOnKeyColumns> join_on_keys;
|
||||||
|
ExpressionActionsPtr additional_filter_expression;
|
||||||
|
|
||||||
|
size_t max_joined_block_rows = 0;
|
||||||
|
size_t rows_to_add;
|
||||||
|
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
|
||||||
|
bool need_filter = false;
|
||||||
|
IColumn::Filter filter;
|
||||||
|
|
||||||
|
void reserve(bool need_replicate)
|
||||||
|
{
|
||||||
|
if (!max_joined_block_rows)
|
||||||
|
return;
|
||||||
|
|
||||||
|
/// Do not allow big allocations when user set max_joined_block_rows to huge value
|
||||||
|
size_t reserve_size = std::min<size_t>(max_joined_block_rows, DEFAULT_BLOCK_SIZE * 2);
|
||||||
|
|
||||||
|
if (need_replicate)
|
||||||
|
/// Reserve 10% more space for columns, because some rows can be repeated
|
||||||
|
reserve_size = static_cast<size_t>(1.1 * reserve_size);
|
||||||
|
|
||||||
|
for (auto & column : columns)
|
||||||
|
column->reserve(reserve_size);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
|
||||||
|
void checkBlock(const Block & block)
|
||||||
|
{
|
||||||
|
for (size_t j = 0; j < right_indexes.size(); ++j)
|
||||||
|
{
|
||||||
|
const auto * column_from_block = block.getByPosition(right_indexes[j]).column.get();
|
||||||
|
const auto * dest_column = columns[j].get();
|
||||||
|
if (auto * nullable_col = nullable_column_ptrs[j])
|
||||||
|
{
|
||||||
|
if (!is_join_get)
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||||
|
"Columns {} and {} can have different nullability only in joinGetOrNull",
|
||||||
|
dest_column->getName(), column_from_block->getName());
|
||||||
|
dest_column = nullable_col->getNestedColumnPtr().get();
|
||||||
|
}
|
||||||
|
/** Using dest_column->structureEquals(*column_from_block) will not work for low cardinality columns,
|
||||||
|
* because dictionaries can be different, while calling insertFrom on them is safe, for example:
|
||||||
|
* ColumnLowCardinality(size = 0, UInt8(size = 0), ColumnUnique(size = 1, String(size = 1)))
|
||||||
|
* and
|
||||||
|
* ColumnLowCardinality(size = 0, UInt16(size = 0), ColumnUnique(size = 1, String(size = 1)))
|
||||||
|
*/
|
||||||
|
if (typeid(*dest_column) != typeid(*column_from_block))
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Columns {} and {} have different types {} and {}",
|
||||||
|
dest_column->getName(), column_from_block->getName(),
|
||||||
|
demangle(typeid(*dest_column).name()), demangle(typeid(*column_from_block).name()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
MutableColumns columns;
|
||||||
|
bool is_join_get;
|
||||||
|
std::vector<size_t> right_indexes;
|
||||||
|
std::vector<TypeAndName> type_name;
|
||||||
|
std::vector<ColumnNullable *> nullable_column_ptrs;
|
||||||
|
size_t lazy_defaults_count = 0;
|
||||||
|
|
||||||
|
/// for lazy
|
||||||
|
// The default row is represented by an empty RowRef, so that fixed-size blocks can be generated sequentially,
|
||||||
|
// default_count cannot represent the position of the row
|
||||||
|
LazyOutput lazy_output;
|
||||||
|
bool has_columns_to_add;
|
||||||
|
|
||||||
|
/// for ASOF
|
||||||
|
const IColumn * left_asof_key = nullptr;
|
||||||
|
|
||||||
|
|
||||||
|
void addColumn(const ColumnWithTypeAndName & src_column, const std::string & qualified_name)
|
||||||
|
{
|
||||||
|
columns.push_back(src_column.column->cloneEmpty());
|
||||||
|
columns.back()->reserve(src_column.column->size());
|
||||||
|
type_name.emplace_back(src_column.type, src_column.name, qualified_name);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Adapter class to pass into addFoundRowAll
|
||||||
|
/// In joinRightColumnsWithAdditionalFilter we don't want to add rows directly into AddedColumns,
|
||||||
|
/// because they need to be filtered by additional_filter_expression.
|
||||||
|
class PreSelectedRows : public std::vector<RowRef>
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
void appendFromBlock(const Block & block, size_t row_num, bool /* has_default */) { this->emplace_back(&block, row_num); }
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
11
src/Interpreters/HashJoin/FullHashJoin.cpp
Normal file
11
src/Interpreters/HashJoin/FullHashJoin.cpp
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
#include <Interpreters/HashJoin/HashJoinMethods.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
/// Explicit instantiations of HashJoinMethods for JoinKind::Full, grouped by map type.

/// HashJoin::MapsOne
template class HashJoinMethods<JoinKind::Full, JoinStrictness::RightAny, HashJoin::MapsOne>;
template class HashJoinMethods<JoinKind::Full, JoinStrictness::Semi, HashJoin::MapsOne>;
template class HashJoinMethods<JoinKind::Full, JoinStrictness::Anti, HashJoin::MapsOne>;

/// HashJoin::MapsAll
template class HashJoinMethods<JoinKind::Full, JoinStrictness::Any, HashJoin::MapsAll>;
template class HashJoinMethods<JoinKind::Full, JoinStrictness::All, HashJoin::MapsAll>;

/// HashJoin::MapsAsof
template class HashJoinMethods<JoinKind::Full, JoinStrictness::Asof, HashJoin::MapsAsof>;
|
||||||
|
}
|
1328
src/Interpreters/HashJoin/HashJoin.cpp
Normal file
1328
src/Interpreters/HashJoin/HashJoin.cpp
Normal file
File diff suppressed because it is too large
Load Diff
@ -1,5 +1,6 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include <memory>
|
||||||
#include <variant>
|
#include <variant>
|
||||||
#include <optional>
|
#include <optional>
|
||||||
#include <deque>
|
#include <deque>
|
||||||
@ -36,47 +37,13 @@ class ExpressionActions;
|
|||||||
|
|
||||||
namespace JoinStuff
|
namespace JoinStuff
|
||||||
{
|
{
|
||||||
|
|
||||||
/// Flags needed to implement RIGHT and FULL JOINs.
|
/// Flags needed to implement RIGHT and FULL JOINs.
|
||||||
class JoinUsedFlags
|
class JoinUsedFlags;
|
||||||
{
|
|
||||||
using RawBlockPtr = const Block *;
|
|
||||||
using UsedFlagsForBlock = std::vector<std::atomic_bool>;
|
|
||||||
|
|
||||||
/// For multiple dijuncts each empty in hashmap stores flags for particular block
|
|
||||||
/// For single dicunct we store all flags in `nullptr` entry, index is the offset in FindResult
|
|
||||||
std::unordered_map<RawBlockPtr, UsedFlagsForBlock> flags;
|
|
||||||
|
|
||||||
bool need_flags;
|
|
||||||
|
|
||||||
public:
|
|
||||||
/// Update size for vector with flags.
|
|
||||||
/// Calling this method invalidates existing flags.
|
|
||||||
/// It can be called several times, but all of them should happen before using this structure.
|
|
||||||
template <JoinKind KIND, JoinStrictness STRICTNESS>
|
|
||||||
void reinit(size_t size_);
|
|
||||||
|
|
||||||
template <JoinKind KIND, JoinStrictness STRICTNESS>
|
|
||||||
void reinit(const Block * block_ptr);
|
|
||||||
|
|
||||||
bool getUsedSafe(size_t i) const;
|
|
||||||
bool getUsedSafe(const Block * block_ptr, size_t row_idx) const;
|
|
||||||
|
|
||||||
template <bool use_flags, bool flag_per_row, typename T>
|
|
||||||
void setUsed(const T & f);
|
|
||||||
|
|
||||||
template <bool use_flags, bool flag_per_row>
|
|
||||||
void setUsed(const Block * block, size_t row_num, size_t offset);
|
|
||||||
|
|
||||||
template <bool use_flags, bool flag_per_row, typename T>
|
|
||||||
bool getUsed(const T & f);
|
|
||||||
|
|
||||||
template <bool use_flags, bool flag_per_row, typename T>
|
|
||||||
bool setUsedOnce(const T & f);
|
|
||||||
};
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <JoinKind KIND, JoinStrictness STRICTNESS, typename MapsTemplate>
|
||||||
|
class HashJoinMethods;
|
||||||
|
|
||||||
/** Data structure for implementation of JOIN.
|
/** Data structure for implementation of JOIN.
|
||||||
* It is just a hash table: keys -> rows of joined ("right") table.
|
* It is just a hash table: keys -> rows of joined ("right") table.
|
||||||
* Additionally, CROSS JOIN is supported: instead of hash table, it use just set of blocks without keys.
|
* Additionally, CROSS JOIN is supported: instead of hash table, it use just set of blocks without keys.
|
||||||
@ -400,8 +367,8 @@ public:
|
|||||||
|
|
||||||
const Block & savedBlockSample() const { return data->sample_block; }
|
const Block & savedBlockSample() const { return data->sample_block; }
|
||||||
|
|
||||||
bool isUsed(size_t off) const { return used_flags.getUsedSafe(off); }
|
bool isUsed(size_t off) const;
|
||||||
bool isUsed(const Block * block_ptr, size_t row_idx) const { return used_flags.getUsedSafe(block_ptr, row_idx); }
|
bool isUsed(const Block * block_ptr, size_t row_idx) const;
|
||||||
|
|
||||||
void debugKeys() const;
|
void debugKeys() const;
|
||||||
|
|
||||||
@ -414,6 +381,9 @@ private:
|
|||||||
|
|
||||||
friend class JoinSource;
|
friend class JoinSource;
|
||||||
|
|
||||||
|
template <JoinKind KIND, JoinStrictness STRICTNESS, typename MapsTemplate>
|
||||||
|
friend class HashJoinMethods;
|
||||||
|
|
||||||
std::shared_ptr<TableJoin> table_join;
|
std::shared_ptr<TableJoin> table_join;
|
||||||
const JoinKind kind;
|
const JoinKind kind;
|
||||||
const JoinStrictness strictness;
|
const JoinStrictness strictness;
|
||||||
@ -433,8 +403,7 @@ private:
|
|||||||
/// Number of this flags equals to hashtable buffer size (plus one for zero value).
|
/// Number of this flags equals to hashtable buffer size (plus one for zero value).
|
||||||
/// Changes in hash table broke correspondence,
|
/// Changes in hash table broke correspondence,
|
||||||
/// so we must guarantee constantness of hash table during HashJoin lifetime (using method setLock)
|
/// so we must guarantee constantness of hash table during HashJoin lifetime (using method setLock)
|
||||||
mutable JoinStuff::JoinUsedFlags used_flags;
|
mutable std::unique_ptr<JoinStuff::JoinUsedFlags> used_flags;
|
||||||
|
|
||||||
RightTableDataPtr data;
|
RightTableDataPtr data;
|
||||||
bool have_compressed = false;
|
bool have_compressed = false;
|
||||||
|
|
||||||
@ -476,13 +445,6 @@ private:
|
|||||||
|
|
||||||
void initRightBlockStructure(Block & saved_block_sample);
|
void initRightBlockStructure(Block & saved_block_sample);
|
||||||
|
|
||||||
template <JoinKind KIND, JoinStrictness STRICTNESS, typename Maps>
|
|
||||||
Block joinBlockImpl(
|
|
||||||
Block & block,
|
|
||||||
const Block & block_with_columns_to_add,
|
|
||||||
const std::vector<const Maps *> & maps_,
|
|
||||||
bool is_join_get = false) const;
|
|
||||||
|
|
||||||
void joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed) const;
|
void joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed) const;
|
||||||
|
|
||||||
static Type chooseMethod(JoinKind kind, const ColumnRawPtrs & key_columns, Sizes & key_sizes);
|
static Type chooseMethod(JoinKind kind, const ColumnRawPtrs & key_columns, Sizes & key_sizes);
|
956
src/Interpreters/HashJoin/HashJoinMethods.h
Normal file
956
src/Interpreters/HashJoin/HashJoinMethods.h
Normal file
@ -0,0 +1,956 @@
|
|||||||
|
#pragma once
|
||||||
|
#include <Interpreters/HashJoin/HashJoin.h>
|
||||||
|
#include <Interpreters/HashJoin/KeyGetter.h>
|
||||||
|
#include <Interpreters/HashJoin/JoinFeatures.h>
|
||||||
|
#include <Interpreters/HashJoin/AddedColumns.h>
|
||||||
|
#include <Interpreters/HashJoin/KnowRowsHolder.h>
|
||||||
|
#include <Interpreters//HashJoin/JoinUsedFlags.h>
|
||||||
|
#include <Interpreters/JoinUtils.h>
|
||||||
|
#include <Interpreters/TableJoin.h>
|
||||||
|
#include <Interpreters/castColumn.h>
|
||||||
|
|
||||||
|
#include <Poco/Logger.h>
|
||||||
|
#include <Common/logger_useful.h>
|
||||||
|
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
namespace ErrorCodes
|
||||||
|
{
|
||||||
|
extern const int UNSUPPORTED_JOIN_KEYS;
|
||||||
|
extern const int LOGICAL_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Inserting an element into a hash table of the form `key -> reference to a row`, which will then be used by JOIN.
|
||||||
|
template <typename HashMap, typename KeyGetter>
|
||||||
|
struct Inserter
|
||||||
|
{
|
||||||
|
static ALWAYS_INLINE bool
|
||||||
|
insertOne(const HashJoin & join, HashMap & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
|
||||||
|
{
|
||||||
|
auto emplace_result = key_getter.emplaceKey(map, i, pool);
|
||||||
|
|
||||||
|
if (emplace_result.isInserted() || join.anyTakeLastRow())
|
||||||
|
{
|
||||||
|
new (&emplace_result.getMapped()) typename HashMap::mapped_type(stored_block, i);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
static ALWAYS_INLINE void insertAll(const HashJoin &, HashMap & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
|
||||||
|
{
|
||||||
|
auto emplace_result = key_getter.emplaceKey(map, i, pool);
|
||||||
|
|
||||||
|
if (emplace_result.isInserted())
|
||||||
|
new (&emplace_result.getMapped()) typename HashMap::mapped_type(stored_block, i);
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/// The first element of the list is stored in the value of the hash table, the rest in the pool.
|
||||||
|
emplace_result.getMapped().insert({stored_block, i}, pool);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static ALWAYS_INLINE void insertAsof(
|
||||||
|
HashJoin & join, HashMap & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool, const IColumn & asof_column)
|
||||||
|
{
|
||||||
|
auto emplace_result = key_getter.emplaceKey(map, i, pool);
|
||||||
|
typename HashMap::mapped_type * time_series_map = &emplace_result.getMapped();
|
||||||
|
|
||||||
|
TypeIndex asof_type = *join.getAsofType();
|
||||||
|
if (emplace_result.isInserted())
|
||||||
|
time_series_map = new (time_series_map) typename HashMap::mapped_type(createAsofRowRef(asof_type, join.getAsofInequality()));
|
||||||
|
(*time_series_map)->insert(asof_column, stored_block, i);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
/// MapsTemplate is one of MapsOne, MapsAll and MapsAsof
|
||||||
|
template <JoinKind KIND, JoinStrictness STRICTNESS, typename MapsTemplate>
|
||||||
|
class HashJoinMethods
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
/// Dispatches insertion of `stored_block` rows into the hash map variant selected by `type`.
///
/// For EMPTY and CROSS nothing is inserted: `is_inserted` is set to true and 0 is returned.
/// For every other variant the call is forwarded to insertFromBlockImplTypeCase() with the
/// matching key getter, and its result is returned.
/// Throws LOGICAL_ERROR if `type` matches none of the known variants, instead of flowing off
/// the end of a value-returning function.
static size_t insertFromBlockImpl(
    HashJoin & join,
    HashJoin::Type type,
    MapsTemplate & maps,
    size_t rows,
    const ColumnRawPtrs & key_columns,
    const Sizes & key_sizes,
    Block * stored_block,
    ConstNullMapPtr null_map,
    UInt8ColumnDataPtr join_mask,
    Arena & pool,
    bool & is_inserted)
{
    switch (type)
    {
        case HashJoin::Type::EMPTY:
            [[fallthrough]];
        case HashJoin::Type::CROSS:
            /// Do nothing. We will only save block, and it is enough
            is_inserted = true;
            return 0;

#define M(TYPE) \
        case HashJoin::Type::TYPE: \
            return insertFromBlockImplTypeCase<typename KeyGetterForType<HashJoin::Type::TYPE, std::remove_reference_t<decltype(*maps.TYPE)>>::Type>( \
                join, *maps.TYPE, rows, key_columns, key_sizes, stored_block, null_map, join_mask, pool, is_inserted);

        APPLY_FOR_JOIN_VARIANTS(M)
#undef M
    }

    /// Reached only if `type` holds a value outside the handled enumerators.
    throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown HashJoin type in insertFromBlockImpl");
}
|
||||||
|
|
||||||
|
using MapsTemplateVector = std::vector<const MapsTemplate *>;
|
||||||
|
|
||||||
|
static Block joinBlockImpl(
|
||||||
|
const HashJoin & join,
|
||||||
|
Block & block,
|
||||||
|
const Block & block_with_columns_to_add,
|
||||||
|
const MapsTemplateVector & maps_,
|
||||||
|
bool is_join_get = false)
|
||||||
|
{
|
||||||
|
constexpr JoinFeatures<KIND, STRICTNESS> join_features;
|
||||||
|
|
||||||
|
std::vector<JoinOnKeyColumns> join_on_keys;
|
||||||
|
const auto & onexprs = join.table_join->getClauses();
|
||||||
|
for (size_t i = 0; i < onexprs.size(); ++i)
|
||||||
|
{
|
||||||
|
const auto & key_names = !is_join_get ? onexprs[i].key_names_left : onexprs[i].key_names_right;
|
||||||
|
join_on_keys.emplace_back(block, key_names, onexprs[i].condColumnNames().first, join.key_sizes[i]);
|
||||||
|
}
|
||||||
|
size_t existing_columns = block.columns();
|
||||||
|
|
||||||
|
/** If you use FULL or RIGHT JOIN, then the columns from the "left" table must be materialized.
|
||||||
|
* Because if they are constants, then in the "not joined" rows, they may have different values
|
||||||
|
* - default values, which can differ from the values of these constants.
|
||||||
|
*/
|
||||||
|
if constexpr (join_features.right || join_features.full)
|
||||||
|
{
|
||||||
|
materializeBlockInplace(block);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** For LEFT/INNER JOIN, the saved blocks do not contain keys.
|
||||||
|
* For FULL/RIGHT JOIN, the saved blocks contain keys;
|
||||||
|
* but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped.
|
||||||
|
* For ASOF, the last column is used as the ASOF column
|
||||||
|
*/
|
||||||
|
AddedColumns<!join_features.is_any_join> added_columns(
|
||||||
|
block,
|
||||||
|
block_with_columns_to_add,
|
||||||
|
join.savedBlockSample(),
|
||||||
|
join,
|
||||||
|
std::move(join_on_keys),
|
||||||
|
join.table_join->getMixedJoinExpression(),
|
||||||
|
join_features.is_asof_join,
|
||||||
|
is_join_get);
|
||||||
|
|
||||||
|
bool has_required_right_keys = (join.required_right_keys.columns() != 0);
|
||||||
|
added_columns.need_filter = join_features.need_filter || has_required_right_keys;
|
||||||
|
added_columns.max_joined_block_rows = join.max_joined_block_rows;
|
||||||
|
if (!added_columns.max_joined_block_rows)
|
||||||
|
added_columns.max_joined_block_rows = std::numeric_limits<size_t>::max();
|
||||||
|
else
|
||||||
|
added_columns.reserve(join_features.need_replication);
|
||||||
|
|
||||||
|
size_t num_joined = switchJoinRightColumns(maps_, added_columns, join.data->type, *join.used_flags);
|
||||||
|
/// Do not hold memory for join_on_keys anymore
|
||||||
|
added_columns.join_on_keys.clear();
|
||||||
|
Block remaining_block = sliceBlock(block, num_joined);
|
||||||
|
|
||||||
|
added_columns.buildOutput();
|
||||||
|
for (size_t i = 0; i < added_columns.size(); ++i)
|
||||||
|
block.insert(added_columns.moveColumn(i));
|
||||||
|
|
||||||
|
std::vector<size_t> right_keys_to_replicate [[maybe_unused]];
|
||||||
|
|
||||||
|
if constexpr (join_features.need_filter)
|
||||||
|
{
|
||||||
|
/// If ANY INNER | RIGHT JOIN - filter all the columns except the new ones.
|
||||||
|
for (size_t i = 0; i < existing_columns; ++i)
|
||||||
|
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(added_columns.filter, -1);
|
||||||
|
|
||||||
|
/// Add join key columns from right block if needed using value from left table because of equality
|
||||||
|
for (size_t i = 0; i < join.required_right_keys.columns(); ++i)
|
||||||
|
{
|
||||||
|
const auto & right_key = join.required_right_keys.getByPosition(i);
|
||||||
|
/// asof column is already in block.
|
||||||
|
if (join_features.is_asof_join && right_key.name == join.table_join->getOnlyClause().key_names_right.back())
|
||||||
|
continue;
|
||||||
|
|
||||||
|
const auto & left_column = block.getByName(join.required_right_keys_sources[i]);
|
||||||
|
const auto & right_col_name = join.getTableJoin().renamedRightColumnName(right_key.name);
|
||||||
|
auto right_col = copyLeftKeyColumnToRight(right_key.type, right_col_name, left_column);
|
||||||
|
block.insert(std::move(right_col));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (has_required_right_keys)
|
||||||
|
{
|
||||||
|
/// Add join key columns from right block if needed.
|
||||||
|
for (size_t i = 0; i < join.required_right_keys.columns(); ++i)
|
||||||
|
{
|
||||||
|
const auto & right_key = join.required_right_keys.getByPosition(i);
|
||||||
|
auto right_col_name = join.getTableJoin().renamedRightColumnName(right_key.name);
|
||||||
|
/// asof column is already in block.
|
||||||
|
if (join_features.is_asof_join && right_key.name == join.table_join->getOnlyClause().key_names_right.back())
|
||||||
|
continue;
|
||||||
|
|
||||||
|
const auto & left_column = block.getByName(join.required_right_keys_sources[i]);
|
||||||
|
auto right_col = copyLeftKeyColumnToRight(right_key.type, right_col_name, left_column, &added_columns.filter);
|
||||||
|
block.insert(std::move(right_col));
|
||||||
|
|
||||||
|
if constexpr (join_features.need_replication)
|
||||||
|
right_keys_to_replicate.push_back(block.getPositionByName(right_col_name));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if constexpr (join_features.need_replication)
|
||||||
|
{
|
||||||
|
std::unique_ptr<IColumn::Offsets> & offsets_to_replicate = added_columns.offsets_to_replicate;
|
||||||
|
|
||||||
|
/// If ALL ... JOIN - we replicate all the columns except the new ones.
|
||||||
|
for (size_t i = 0; i < existing_columns; ++i)
|
||||||
|
{
|
||||||
|
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->replicate(*offsets_to_replicate);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Replicate additional right keys
|
||||||
|
for (size_t pos : right_keys_to_replicate)
|
||||||
|
{
|
||||||
|
block.safeGetByPosition(pos).column = block.safeGetByPosition(pos).column->replicate(*offsets_to_replicate);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return remaining_block;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
template <typename KeyGetter, bool is_asof_join>
|
||||||
|
static KeyGetter createKeyGetter(const ColumnRawPtrs & key_columns, const Sizes & key_sizes)
|
||||||
|
{
|
||||||
|
if constexpr (is_asof_join)
|
||||||
|
{
|
||||||
|
auto key_column_copy = key_columns;
|
||||||
|
auto key_size_copy = key_sizes;
|
||||||
|
key_column_copy.pop_back();
|
||||||
|
key_size_copy.pop_back();
|
||||||
|
return KeyGetter(key_column_copy, key_size_copy, nullptr);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
return KeyGetter(key_columns, key_sizes, nullptr);
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename KeyGetter, typename HashMap>
|
||||||
|
static size_t NO_INLINE insertFromBlockImplTypeCase(
|
||||||
|
HashJoin & join, HashMap & map, size_t rows, const ColumnRawPtrs & key_columns,
|
||||||
|
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, UInt8ColumnDataPtr join_mask, Arena & pool, bool & is_inserted)
|
||||||
|
{
|
||||||
|
[[maybe_unused]] constexpr bool mapped_one = std::is_same_v<typename HashMap::mapped_type, RowRef>;
|
||||||
|
constexpr bool is_asof_join = STRICTNESS == JoinStrictness::Asof;
|
||||||
|
|
||||||
|
const IColumn * asof_column [[maybe_unused]] = nullptr;
|
||||||
|
if constexpr (is_asof_join)
|
||||||
|
asof_column = key_columns.back();
|
||||||
|
|
||||||
|
auto key_getter = createKeyGetter<KeyGetter, is_asof_join>(key_columns, key_sizes);
|
||||||
|
|
||||||
|
/// For ALL and ASOF join always insert values
|
||||||
|
is_inserted = !mapped_one || is_asof_join;
|
||||||
|
|
||||||
|
for (size_t i = 0; i < rows; ++i)
|
||||||
|
{
|
||||||
|
if (null_map && (*null_map)[i])
|
||||||
|
{
|
||||||
|
/// nulls are not inserted into hash table,
|
||||||
|
/// keep them for RIGHT and FULL joins
|
||||||
|
is_inserted = true;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check condition for right table from ON section
|
||||||
|
if (join_mask && !(*join_mask)[i])
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if constexpr (is_asof_join)
|
||||||
|
Inserter<HashMap, KeyGetter>::insertAsof(join, map, key_getter, stored_block, i, pool, *asof_column);
|
||||||
|
else if constexpr (mapped_one)
|
||||||
|
is_inserted |= Inserter<HashMap, KeyGetter>::insertOne(join, map, key_getter, stored_block, i, pool);
|
||||||
|
else
|
||||||
|
Inserter<HashMap, KeyGetter>::insertAll(join, map, key_getter, stored_block, i, pool);
|
||||||
|
}
|
||||||
|
return map.getBufferSizeInCells();
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename AddedColumns>
|
||||||
|
static size_t switchJoinRightColumns(
|
||||||
|
const std::vector<const MapsTemplate *> & mapv,
|
||||||
|
AddedColumns & added_columns,
|
||||||
|
HashJoin::Type type,
|
||||||
|
JoinStuff::JoinUsedFlags & used_flags)
|
||||||
|
{
|
||||||
|
constexpr bool is_asof_join = STRICTNESS == JoinStrictness::Asof;
|
||||||
|
switch (type)
|
||||||
|
{
|
||||||
|
case HashJoin::Type::EMPTY: {
|
||||||
|
if constexpr (!is_asof_join)
|
||||||
|
{
|
||||||
|
using KeyGetter = KeyGetterEmpty<typename MapsTemplate::MappedType>;
|
||||||
|
std::vector<KeyGetter> key_getter_vector;
|
||||||
|
key_getter_vector.emplace_back();
|
||||||
|
|
||||||
|
using MapTypeVal = typename KeyGetter::MappedType;
|
||||||
|
std::vector<const MapTypeVal *> a_map_type_vector;
|
||||||
|
a_map_type_vector.emplace_back();
|
||||||
|
return joinRightColumnsSwitchNullability<KeyGetter>(
|
||||||
|
std::move(key_getter_vector), a_map_type_vector, added_columns, used_flags);
|
||||||
|
}
|
||||||
|
throw Exception(ErrorCodes::UNSUPPORTED_JOIN_KEYS, "Unsupported JOIN keys. Type: {}", type);
|
||||||
|
}
|
||||||
|
#define M(TYPE) \
|
||||||
|
case HashJoin::Type::TYPE: \
|
||||||
|
{ \
|
||||||
|
using MapTypeVal = const typename std::remove_reference_t<decltype(MapsTemplate::TYPE)>::element_type; \
|
||||||
|
using KeyGetter = typename KeyGetterForType<HashJoin::Type::TYPE, MapTypeVal>::Type; \
|
||||||
|
std::vector<const MapTypeVal *> a_map_type_vector(mapv.size()); \
|
||||||
|
std::vector<KeyGetter> key_getter_vector; \
|
||||||
|
for (size_t d = 0; d < added_columns.join_on_keys.size(); ++d) \
|
||||||
|
{ \
|
||||||
|
const auto & join_on_key = added_columns.join_on_keys[d]; \
|
||||||
|
a_map_type_vector[d] = mapv[d]->TYPE.get(); \
|
||||||
|
key_getter_vector.push_back(std::move(createKeyGetter<KeyGetter, is_asof_join>(join_on_key.key_columns, join_on_key.key_sizes))); \
|
||||||
|
} \
|
||||||
|
return joinRightColumnsSwitchNullability<KeyGetter>( \
|
||||||
|
std::move(key_getter_vector), a_map_type_vector, added_columns, used_flags); \
|
||||||
|
}
|
||||||
|
APPLY_FOR_JOIN_VARIANTS(M)
|
||||||
|
#undef M
|
||||||
|
|
||||||
|
default:
|
||||||
|
throw Exception(ErrorCodes::UNSUPPORTED_JOIN_KEYS, "Unsupported JOIN keys (type: {})", type);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename KeyGetter, typename Map, typename AddedColumns>
|
||||||
|
static size_t joinRightColumnsSwitchNullability(
|
||||||
|
std::vector<KeyGetter> && key_getter_vector,
|
||||||
|
const std::vector<const Map *> & mapv,
|
||||||
|
AddedColumns & added_columns,
|
||||||
|
JoinStuff::JoinUsedFlags & used_flags)
|
||||||
|
{
|
||||||
|
if (added_columns.need_filter)
|
||||||
|
{
|
||||||
|
return joinRightColumnsSwitchMultipleDisjuncts<KeyGetter, Map, true>(
|
||||||
|
std::forward<std::vector<KeyGetter>>(key_getter_vector), mapv, added_columns, used_flags);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return joinRightColumnsSwitchMultipleDisjuncts<KeyGetter, Map, false>(
|
||||||
|
std::forward<std::vector<KeyGetter>>(key_getter_vector), mapv, added_columns, used_flags);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename KeyGetter, typename Map, bool need_filter, typename AddedColumns>
|
||||||
|
static size_t joinRightColumnsSwitchMultipleDisjuncts(
|
||||||
|
std::vector<KeyGetter> && key_getter_vector,
|
||||||
|
const std::vector<const Map *> & mapv,
|
||||||
|
AddedColumns & added_columns,
|
||||||
|
JoinStuff::JoinUsedFlags & used_flags)
|
||||||
|
{
|
||||||
|
constexpr JoinFeatures<KIND, STRICTNESS> join_features;
|
||||||
|
if constexpr (join_features.is_all_join)
|
||||||
|
{
|
||||||
|
if (added_columns.additional_filter_expression)
|
||||||
|
{
|
||||||
|
bool mark_per_row_used = join_features.right || join_features.full || mapv.size() > 1;
|
||||||
|
return joinRightColumnsWithAddtitionalFilter<KeyGetter, Map, join_features.need_replication>(
|
||||||
|
std::forward<std::vector<KeyGetter>>(key_getter_vector),
|
||||||
|
mapv,
|
||||||
|
added_columns,
|
||||||
|
used_flags,
|
||||||
|
need_filter,
|
||||||
|
join_features.need_flags,
|
||||||
|
join_features.add_missing,
|
||||||
|
mark_per_row_used);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (added_columns.additional_filter_expression)
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Additional filter expression is not supported for this JOIN");
|
||||||
|
|
||||||
|
return mapv.size() > 1 ? joinRightColumns<KeyGetter, Map, need_filter, true>(
|
||||||
|
std::forward<std::vector<KeyGetter>>(key_getter_vector), mapv, added_columns, used_flags)
|
||||||
|
: joinRightColumns<KeyGetter, Map, need_filter, false>(
|
||||||
|
std::forward<std::vector<KeyGetter>>(key_getter_vector), mapv, added_columns, used_flags);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Joins right table columns which indexes are present in right_indexes using specified map.
|
||||||
|
/// Makes filter (1 if row presented in right table) and returns offsets to replicate (for ALL JOINS).
|
||||||
|
template <typename KeyGetter, typename Map, bool need_filter, bool flag_per_row, typename AddedColumns>
|
||||||
|
static size_t joinRightColumns(
|
||||||
|
std::vector<KeyGetter> && key_getter_vector,
|
||||||
|
const std::vector<const Map *> & mapv,
|
||||||
|
AddedColumns & added_columns,
|
||||||
|
JoinStuff::JoinUsedFlags & used_flags)
|
||||||
|
{
|
||||||
|
constexpr JoinFeatures<KIND, STRICTNESS> join_features;
|
||||||
|
|
||||||
|
size_t rows = added_columns.rows_to_add;
|
||||||
|
if constexpr (need_filter)
|
||||||
|
added_columns.filter = IColumn::Filter(rows, 0);
|
||||||
|
|
||||||
|
Arena pool;
|
||||||
|
|
||||||
|
if constexpr (join_features.need_replication)
|
||||||
|
added_columns.offsets_to_replicate = std::make_unique<IColumn::Offsets>(rows);
|
||||||
|
|
||||||
|
IColumn::Offset current_offset = 0;
|
||||||
|
size_t max_joined_block_rows = added_columns.max_joined_block_rows;
|
||||||
|
size_t i = 0;
|
||||||
|
for (; i < rows; ++i)
|
||||||
|
{
|
||||||
|
if constexpr (join_features.need_replication)
|
||||||
|
{
|
||||||
|
if (unlikely(current_offset >= max_joined_block_rows))
|
||||||
|
{
|
||||||
|
added_columns.offsets_to_replicate->resize_assume_reserved(i);
|
||||||
|
added_columns.filter.resize_assume_reserved(i);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool right_row_found = false;
|
||||||
|
|
||||||
|
KnownRowsHolder<flag_per_row> known_rows;
|
||||||
|
for (size_t onexpr_idx = 0; onexpr_idx < added_columns.join_on_keys.size(); ++onexpr_idx)
|
||||||
|
{
|
||||||
|
const auto & join_keys = added_columns.join_on_keys[onexpr_idx];
|
||||||
|
if (join_keys.null_map && (*join_keys.null_map)[i])
|
||||||
|
continue;
|
||||||
|
|
||||||
|
bool row_acceptable = !join_keys.isRowFiltered(i);
|
||||||
|
using FindResult = typename KeyGetter::FindResult;
|
||||||
|
auto find_result = row_acceptable ? key_getter_vector[onexpr_idx].findKey(*(mapv[onexpr_idx]), i, pool) : FindResult();
|
||||||
|
|
||||||
|
if (find_result.isFound())
|
||||||
|
{
|
||||||
|
right_row_found = true;
|
||||||
|
auto & mapped = find_result.getMapped();
|
||||||
|
if constexpr (join_features.is_asof_join)
|
||||||
|
{
|
||||||
|
const IColumn & left_asof_key = added_columns.leftAsofKey();
|
||||||
|
|
||||||
|
auto row_ref = mapped->findAsof(left_asof_key, i);
|
||||||
|
if (row_ref.block)
|
||||||
|
{
|
||||||
|
setUsed<need_filter>(added_columns.filter, i);
|
||||||
|
if constexpr (flag_per_row)
|
||||||
|
used_flags.template setUsed<join_features.need_flags, flag_per_row>(row_ref.block, row_ref.row_num, 0);
|
||||||
|
else
|
||||||
|
used_flags.template setUsed<join_features.need_flags, flag_per_row>(find_result);
|
||||||
|
|
||||||
|
added_columns.appendFromBlock(*row_ref.block, row_ref.row_num, join_features.add_missing);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
addNotFoundRow<join_features.add_missing, join_features.need_replication>(added_columns, current_offset);
|
||||||
|
}
|
||||||
|
else if constexpr (join_features.is_all_join)
|
||||||
|
{
|
||||||
|
setUsed<need_filter>(added_columns.filter, i);
|
||||||
|
used_flags.template setUsed<join_features.need_flags, flag_per_row>(find_result);
|
||||||
|
auto used_flags_opt = join_features.need_flags ? &used_flags : nullptr;
|
||||||
|
addFoundRowAll<Map, join_features.add_missing>(mapped, added_columns, current_offset, known_rows, used_flags_opt);
|
||||||
|
}
|
||||||
|
else if constexpr ((join_features.is_any_join || join_features.is_semi_join) && join_features.right)
|
||||||
|
{
|
||||||
|
/// Use first appeared left key + it needs left columns replication
|
||||||
|
bool used_once = used_flags.template setUsedOnce<join_features.need_flags, flag_per_row>(find_result);
|
||||||
|
if (used_once)
|
||||||
|
{
|
||||||
|
auto used_flags_opt = join_features.need_flags ? &used_flags : nullptr;
|
||||||
|
setUsed<need_filter>(added_columns.filter, i);
|
||||||
|
addFoundRowAll<Map, join_features.add_missing>(
|
||||||
|
mapped, added_columns, current_offset, known_rows, used_flags_opt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if constexpr (join_features.is_any_join && KIND == JoinKind::Inner)
|
||||||
|
{
|
||||||
|
bool used_once = used_flags.template setUsedOnce<join_features.need_flags, flag_per_row>(find_result);
|
||||||
|
|
||||||
|
/// Use first appeared left key only
|
||||||
|
if (used_once)
|
||||||
|
{
|
||||||
|
setUsed<need_filter>(added_columns.filter, i);
|
||||||
|
added_columns.appendFromBlock(*mapped.block, mapped.row_num, join_features.add_missing);
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else if constexpr (join_features.is_any_join && join_features.full)
|
||||||
|
{
|
||||||
|
/// TODO
|
||||||
|
}
|
||||||
|
else if constexpr (join_features.is_anti_join)
|
||||||
|
{
|
||||||
|
if constexpr (join_features.right && join_features.need_flags)
|
||||||
|
used_flags.template setUsed<join_features.need_flags, flag_per_row>(find_result);
|
||||||
|
}
|
||||||
|
else /// ANY LEFT, SEMI LEFT, old ANY (RightAny)
|
||||||
|
{
|
||||||
|
setUsed<need_filter>(added_columns.filter, i);
|
||||||
|
used_flags.template setUsed<join_features.need_flags, flag_per_row>(find_result);
|
||||||
|
added_columns.appendFromBlock(*mapped.block, mapped.row_num, join_features.add_missing);
|
||||||
|
|
||||||
|
if (join_features.is_any_or_semi_join)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!right_row_found)
|
||||||
|
{
|
||||||
|
if constexpr (join_features.is_anti_join && join_features.left)
|
||||||
|
setUsed<need_filter>(added_columns.filter, i);
|
||||||
|
addNotFoundRow<join_features.add_missing, join_features.need_replication>(added_columns, current_offset);
|
||||||
|
}
|
||||||
|
|
||||||
|
if constexpr (join_features.need_replication)
|
||||||
|
{
|
||||||
|
(*added_columns.offsets_to_replicate)[i] = current_offset;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
added_columns.applyLazyDefaults();
|
||||||
|
return i;
|
||||||
|
}
|
||||||
|
|
||||||
|
template <bool need_filter>
|
||||||
|
static void setUsed(IColumn::Filter & filter [[maybe_unused]], size_t pos [[maybe_unused]])
|
||||||
|
{
|
||||||
|
if constexpr (need_filter)
|
||||||
|
filter[pos] = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename AddedColumns>
|
||||||
|
static ColumnPtr buildAdditionalFilter(
|
||||||
|
size_t left_start_row,
|
||||||
|
const std::vector<RowRef> & selected_rows,
|
||||||
|
const std::vector<size_t> & row_replicate_offset,
|
||||||
|
AddedColumns & added_columns)
|
||||||
|
{
|
||||||
|
ColumnPtr result_column;
|
||||||
|
do
|
||||||
|
{
|
||||||
|
if (selected_rows.empty())
|
||||||
|
{
|
||||||
|
result_column = ColumnUInt8::create();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
const Block & sample_right_block = *selected_rows.begin()->block;
|
||||||
|
if (!sample_right_block || !added_columns.additional_filter_expression)
|
||||||
|
{
|
||||||
|
auto filter = ColumnUInt8::create();
|
||||||
|
filter->insertMany(1, selected_rows.size());
|
||||||
|
result_column = std::move(filter);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto required_cols = added_columns.additional_filter_expression->getRequiredColumnsWithTypes();
|
||||||
|
if (required_cols.empty())
|
||||||
|
{
|
||||||
|
Block block;
|
||||||
|
added_columns.additional_filter_expression->execute(block);
|
||||||
|
result_column = block.getByPosition(0).column->cloneResized(selected_rows.size());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
NameSet required_column_names;
|
||||||
|
for (auto & col : required_cols)
|
||||||
|
required_column_names.insert(col.name);
|
||||||
|
|
||||||
|
Block executed_block;
|
||||||
|
size_t right_col_pos = 0;
|
||||||
|
for (const auto & col : sample_right_block.getColumnsWithTypeAndName())
|
||||||
|
{
|
||||||
|
if (required_column_names.contains(col.name))
|
||||||
|
{
|
||||||
|
auto new_col = col.column->cloneEmpty();
|
||||||
|
for (const auto & selected_row : selected_rows)
|
||||||
|
{
|
||||||
|
const auto & src_col = selected_row.block->getByPosition(right_col_pos);
|
||||||
|
new_col->insertFrom(*src_col.column, selected_row.row_num);
|
||||||
|
}
|
||||||
|
executed_block.insert({std::move(new_col), col.type, col.name});
|
||||||
|
}
|
||||||
|
right_col_pos += 1;
|
||||||
|
}
|
||||||
|
if (!executed_block)
|
||||||
|
{
|
||||||
|
result_column = ColumnUInt8::create();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const auto & col_name : required_column_names)
|
||||||
|
{
|
||||||
|
const auto * src_col = added_columns.left_block.findByName(col_name);
|
||||||
|
if (!src_col)
|
||||||
|
continue;
|
||||||
|
auto new_col = src_col->column->cloneEmpty();
|
||||||
|
size_t prev_left_offset = 0;
|
||||||
|
for (size_t i = 1; i < row_replicate_offset.size(); ++i)
|
||||||
|
{
|
||||||
|
const size_t & left_offset = row_replicate_offset[i];
|
||||||
|
size_t rows = left_offset - prev_left_offset;
|
||||||
|
if (rows)
|
||||||
|
new_col->insertManyFrom(*src_col->column, left_start_row + i - 1, rows);
|
||||||
|
prev_left_offset = left_offset;
|
||||||
|
}
|
||||||
|
executed_block.insert({std::move(new_col), src_col->type, col_name});
|
||||||
|
}
|
||||||
|
if (!executed_block)
|
||||||
|
{
|
||||||
|
throw Exception(
|
||||||
|
ErrorCodes::LOGICAL_ERROR,
|
||||||
|
"required columns: [{}], but not found any in left/right table. right table: {}, left table: {}",
|
||||||
|
required_cols.toString(),
|
||||||
|
sample_right_block.dumpNames(),
|
||||||
|
added_columns.left_block.dumpNames());
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const auto & col : executed_block.getColumnsWithTypeAndName())
|
||||||
|
if (!col.column || !col.type)
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Illegal nullptr column in input block: {}", executed_block.dumpStructure());
|
||||||
|
|
||||||
|
added_columns.additional_filter_expression->execute(executed_block);
|
||||||
|
result_column = executed_block.getByPosition(0).column->convertToFullColumnIfConst();
|
||||||
|
executed_block.clear();
|
||||||
|
} while (false);
|
||||||
|
|
||||||
|
result_column = result_column->convertToFullIfNeeded();
|
||||||
|
if (result_column->isNullable())
|
||||||
|
{
|
||||||
|
/// Convert Nullable(UInt8) to UInt8 ensuring that nulls are zeros
|
||||||
|
/// Trying to avoid copying data, since we are the only owner of the column.
|
||||||
|
ColumnPtr mask_column = assert_cast<const ColumnNullable &>(*result_column).getNullMapColumnPtr();
|
||||||
|
|
||||||
|
MutableColumnPtr mutable_column;
|
||||||
|
{
|
||||||
|
ColumnPtr nested_column = assert_cast<const ColumnNullable &>(*result_column).getNestedColumnPtr();
|
||||||
|
result_column.reset();
|
||||||
|
mutable_column = IColumn::mutate(std::move(nested_column));
|
||||||
|
}
|
||||||
|
|
||||||
|
auto & column_data = assert_cast<ColumnUInt8 &>(*mutable_column).getData();
|
||||||
|
const auto & mask_column_data = assert_cast<const ColumnUInt8 &>(*mask_column).getData();
|
||||||
|
for (size_t i = 0; i < column_data.size(); ++i)
|
||||||
|
{
|
||||||
|
if (mask_column_data[i])
|
||||||
|
column_data[i] = 0;
|
||||||
|
}
|
||||||
|
return mutable_column;
|
||||||
|
}
|
||||||
|
return result_column;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// First to collect all matched rows refs by join keys, then filter out rows which are not true in additional filter expression.
|
||||||
|
template <typename KeyGetter, typename Map, bool need_replication, typename AddedColumns>
|
||||||
|
static size_t joinRightColumnsWithAddtitionalFilter(
|
||||||
|
std::vector<KeyGetter> && key_getter_vector,
|
||||||
|
const std::vector<const Map *> & mapv,
|
||||||
|
AddedColumns & added_columns,
|
||||||
|
JoinStuff::JoinUsedFlags & used_flags [[maybe_unused]],
|
||||||
|
bool need_filter [[maybe_unused]],
|
||||||
|
bool need_flags [[maybe_unused]],
|
||||||
|
bool add_missing [[maybe_unused]],
|
||||||
|
bool flag_per_row [[maybe_unused]])
|
||||||
|
{
|
||||||
|
size_t left_block_rows = added_columns.rows_to_add;
|
||||||
|
if (need_filter)
|
||||||
|
added_columns.filter = IColumn::Filter(left_block_rows, 0);
|
||||||
|
|
||||||
|
std::unique_ptr<Arena> pool;
|
||||||
|
|
||||||
|
if constexpr (need_replication)
|
||||||
|
added_columns.offsets_to_replicate = std::make_unique<IColumn::Offsets>(left_block_rows);
|
||||||
|
|
||||||
|
std::vector<size_t> row_replicate_offset;
|
||||||
|
row_replicate_offset.reserve(left_block_rows);
|
||||||
|
|
||||||
|
using FindResult = typename KeyGetter::FindResult;
|
||||||
|
size_t max_joined_block_rows = added_columns.max_joined_block_rows;
|
||||||
|
size_t left_row_iter = 0;
|
||||||
|
PreSelectedRows selected_rows;
|
||||||
|
selected_rows.reserve(left_block_rows);
|
||||||
|
std::vector<FindResult> find_results;
|
||||||
|
find_results.reserve(left_block_rows);
|
||||||
|
bool exceeded_max_block_rows = false;
|
||||||
|
IColumn::Offset total_added_rows = 0;
|
||||||
|
IColumn::Offset current_added_rows = 0;
|
||||||
|
|
||||||
|
auto collect_keys_matched_rows_refs = [&]()
|
||||||
|
{
|
||||||
|
pool = std::make_unique<Arena>();
|
||||||
|
find_results.clear();
|
||||||
|
row_replicate_offset.clear();
|
||||||
|
row_replicate_offset.push_back(0);
|
||||||
|
current_added_rows = 0;
|
||||||
|
selected_rows.clear();
|
||||||
|
for (; left_row_iter < left_block_rows; ++left_row_iter)
|
||||||
|
{
|
||||||
|
if constexpr (need_replication)
|
||||||
|
{
|
||||||
|
if (unlikely(total_added_rows + current_added_rows >= max_joined_block_rows))
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
KnownRowsHolder<true> all_flag_known_rows;
|
||||||
|
KnownRowsHolder<false> single_flag_know_rows;
|
||||||
|
for (size_t join_clause_idx = 0; join_clause_idx < added_columns.join_on_keys.size(); ++join_clause_idx)
|
||||||
|
{
|
||||||
|
const auto & join_keys = added_columns.join_on_keys[join_clause_idx];
|
||||||
|
if (join_keys.null_map && (*join_keys.null_map)[left_row_iter])
|
||||||
|
continue;
|
||||||
|
|
||||||
|
bool row_acceptable = !join_keys.isRowFiltered(left_row_iter);
|
||||||
|
auto find_result = row_acceptable
|
||||||
|
? key_getter_vector[join_clause_idx].findKey(*(mapv[join_clause_idx]), left_row_iter, *pool)
|
||||||
|
: FindResult();
|
||||||
|
|
||||||
|
if (find_result.isFound())
|
||||||
|
{
|
||||||
|
auto & mapped = find_result.getMapped();
|
||||||
|
find_results.push_back(find_result);
|
||||||
|
if (flag_per_row)
|
||||||
|
addFoundRowAll<Map, false, true>(mapped, selected_rows, current_added_rows, all_flag_known_rows, nullptr);
|
||||||
|
else
|
||||||
|
addFoundRowAll<Map, false, false>(mapped, selected_rows, current_added_rows, single_flag_know_rows, nullptr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
row_replicate_offset.push_back(current_added_rows);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
auto copy_final_matched_rows = [&](size_t left_start_row, ColumnPtr filter_col)
|
||||||
|
{
|
||||||
|
const PaddedPODArray<UInt8> & filter_flags = assert_cast<const ColumnUInt8 &>(*filter_col).getData();
|
||||||
|
|
||||||
|
size_t prev_replicated_row = 0;
|
||||||
|
auto selected_right_row_it = selected_rows.begin();
|
||||||
|
size_t find_result_index = 0;
|
||||||
|
for (size_t i = 1, n = row_replicate_offset.size(); i < n; ++i)
|
||||||
|
{
|
||||||
|
bool any_matched = false;
|
||||||
|
/// For all right join, flag_per_row is true, we need mark used flags for each row.
|
||||||
|
if (flag_per_row)
|
||||||
|
{
|
||||||
|
for (size_t replicated_row = prev_replicated_row; replicated_row < row_replicate_offset[i]; ++replicated_row)
|
||||||
|
{
|
||||||
|
if (filter_flags[replicated_row])
|
||||||
|
{
|
||||||
|
any_matched = true;
|
||||||
|
added_columns.appendFromBlock(*selected_right_row_it->block, selected_right_row_it->row_num, add_missing);
|
||||||
|
total_added_rows += 1;
|
||||||
|
if (need_flags)
|
||||||
|
used_flags.template setUsed<true, true>(selected_right_row_it->block, selected_right_row_it->row_num, 0);
|
||||||
|
}
|
||||||
|
++selected_right_row_it;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
for (size_t replicated_row = prev_replicated_row; replicated_row < row_replicate_offset[i]; ++replicated_row)
|
||||||
|
{
|
||||||
|
if (filter_flags[replicated_row])
|
||||||
|
{
|
||||||
|
any_matched = true;
|
||||||
|
added_columns.appendFromBlock(*selected_right_row_it->block, selected_right_row_it->row_num, add_missing);
|
||||||
|
total_added_rows += 1;
|
||||||
|
}
|
||||||
|
++selected_right_row_it;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!any_matched)
|
||||||
|
{
|
||||||
|
if (add_missing)
|
||||||
|
addNotFoundRow<true, need_replication>(added_columns, total_added_rows);
|
||||||
|
else
|
||||||
|
addNotFoundRow<false, need_replication>(added_columns, total_added_rows);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (!flag_per_row && need_flags)
|
||||||
|
used_flags.template setUsed<true, false>(find_results[find_result_index]);
|
||||||
|
if (need_filter)
|
||||||
|
setUsed<true>(added_columns.filter, left_start_row + i - 1);
|
||||||
|
if (add_missing)
|
||||||
|
added_columns.applyLazyDefaults();
|
||||||
|
}
|
||||||
|
find_result_index += (prev_replicated_row != row_replicate_offset[i]);
|
||||||
|
|
||||||
|
if constexpr (need_replication)
|
||||||
|
{
|
||||||
|
(*added_columns.offsets_to_replicate)[left_start_row + i - 1] = total_added_rows;
|
||||||
|
}
|
||||||
|
prev_replicated_row = row_replicate_offset[i];
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
while (left_row_iter < left_block_rows && !exceeded_max_block_rows)
|
||||||
|
{
|
||||||
|
auto left_start_row = left_row_iter;
|
||||||
|
collect_keys_matched_rows_refs();
|
||||||
|
if (selected_rows.size() != current_added_rows || row_replicate_offset.size() != left_row_iter - left_start_row + 1)
|
||||||
|
{
|
||||||
|
throw Exception(
|
||||||
|
ErrorCodes::LOGICAL_ERROR,
|
||||||
|
"Sizes are mismatched. selected_rows.size:{}, current_added_rows:{}, row_replicate_offset.size:{}, left_row_iter: {}, "
|
||||||
|
"left_start_row: {}",
|
||||||
|
selected_rows.size(),
|
||||||
|
current_added_rows,
|
||||||
|
row_replicate_offset.size(),
|
||||||
|
left_row_iter,
|
||||||
|
left_start_row);
|
||||||
|
}
|
||||||
|
auto filter_col = buildAdditionalFilter(left_start_row, selected_rows, row_replicate_offset, added_columns);
|
||||||
|
copy_final_matched_rows(left_start_row, filter_col);
|
||||||
|
|
||||||
|
if constexpr (need_replication)
|
||||||
|
{
|
||||||
|
// Add a check for current_added_rows to avoid run the filter expression on too small size batch.
|
||||||
|
if (total_added_rows >= max_joined_block_rows || current_added_rows < 1024)
|
||||||
|
{
|
||||||
|
exceeded_max_block_rows = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if constexpr (need_replication)
|
||||||
|
{
|
||||||
|
added_columns.offsets_to_replicate->resize_assume_reserved(left_row_iter);
|
||||||
|
added_columns.filter.resize_assume_reserved(left_row_iter);
|
||||||
|
}
|
||||||
|
added_columns.applyLazyDefaults();
|
||||||
|
return left_row_iter;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Cut first num_rows rows from block in place and returns block with remaining rows
|
||||||
|
static Block sliceBlock(Block & block, size_t num_rows)
|
||||||
|
{
|
||||||
|
size_t total_rows = block.rows();
|
||||||
|
if (num_rows >= total_rows)
|
||||||
|
return {};
|
||||||
|
size_t remaining_rows = total_rows - num_rows;
|
||||||
|
Block remaining_block = block.cloneEmpty();
|
||||||
|
for (size_t i = 0; i < block.columns(); ++i)
|
||||||
|
{
|
||||||
|
auto & col = block.getByPosition(i);
|
||||||
|
remaining_block.getByPosition(i).column = col.column->cut(num_rows, remaining_rows);
|
||||||
|
col.column = col.column->cut(0, num_rows);
|
||||||
|
}
|
||||||
|
return remaining_block;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Since we do not store right key columns,
|
||||||
|
* this function is used to copy left key columns to right key columns.
|
||||||
|
* If the user requests some right columns, we just copy left key columns to right, since they are equal.
|
||||||
|
* Example: SELECT t1.key, t2.key FROM t1 FULL JOIN t2 ON t1.key = t2.key;
|
||||||
|
* In that case for matched rows in t2.key we will use values from t1.key.
|
||||||
|
* However, in some cases we might need to adjust the type of column, e.g. t1.key :: LowCardinality(String) and t2.key :: String
|
||||||
|
* Also, the nullability of the column might be different.
|
||||||
|
* Returns the right column after with necessary adjustments.
|
||||||
|
*/
|
||||||
|
static ColumnWithTypeAndName copyLeftKeyColumnToRight(
|
||||||
|
const DataTypePtr & right_key_type,
|
||||||
|
const String & renamed_right_column,
|
||||||
|
const ColumnWithTypeAndName & left_column,
|
||||||
|
const IColumn::Filter * null_map_filter = nullptr)
|
||||||
|
{
|
||||||
|
ColumnWithTypeAndName right_column = left_column;
|
||||||
|
right_column.name = renamed_right_column;
|
||||||
|
|
||||||
|
if (null_map_filter)
|
||||||
|
right_column.column = JoinCommon::filterWithBlanks(right_column.column, *null_map_filter);
|
||||||
|
|
||||||
|
bool should_be_nullable = isNullableOrLowCardinalityNullable(right_key_type);
|
||||||
|
if (null_map_filter)
|
||||||
|
correctNullabilityInplace(right_column, should_be_nullable, *null_map_filter);
|
||||||
|
else
|
||||||
|
correctNullabilityInplace(right_column, should_be_nullable);
|
||||||
|
|
||||||
|
if (!right_column.type->equals(*right_key_type))
|
||||||
|
{
|
||||||
|
right_column.column = castColumnAccurate(right_column, right_key_type);
|
||||||
|
right_column.type = right_key_type;
|
||||||
|
}
|
||||||
|
|
||||||
|
right_column.column = right_column.column->convertToFullColumnIfConst();
|
||||||
|
return right_column;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Adjusts `column` in place so that it is nullable exactly when `nullable` is true.
static void correctNullabilityInplace(ColumnWithTypeAndName & column, bool nullable)
{
    if (nullable)
    {
        JoinCommon::convertColumnToNullable(column);
        return;
    }

    /// We have to replace values masked by NULLs with defaults.
    if (column.column)
    {
        const auto * as_nullable = checkAndGetColumn<ColumnNullable>(&*column.column);
        if (as_nullable)
            column.column = JoinCommon::filterWithBlanks(column.column, as_nullable->getNullMapColumn().getData(), true);
    }

    JoinCommon::removeColumnNullability(column);
}
|
||||||
|
|
||||||
|
/// Adjusts the nullability of `column` in place.
/// If `nullable` is false the nullability is removed. Otherwise the column is converted to nullable and,
/// when the resulting type is nullable and `negative_null_map` is not empty, the map is applied
/// through ColumnNullable::applyNegatedNullMap.
static void correctNullabilityInplace(ColumnWithTypeAndName & column, bool nullable, const IColumn::Filter & negative_null_map)
{
    if (!nullable)
    {
        JoinCommon::removeColumnNullability(column);
        return;
    }

    JoinCommon::convertColumnToNullable(column);

    const bool has_map_to_apply = column.type->isNullable() && !negative_null_map.empty();
    if (!has_map_to_apply)
        return;

    /// Take ownership of the column to modify its null map, then hand it back.
    MutableColumnPtr owned_column = IColumn::mutate(std::move(column.column));
    assert_cast<ColumnNullable &>(*owned_column).applyNegatedNullMap(negative_null_map);
    column.column = std::move(owned_column);
}
|
||||||
|
};
|
||||||
|
|
||||||
|
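/// The template class is explicitly instantiated in separate .cpp files to avoid a `too large translation unit`;
/// the `extern template` declarations below suppress implicit instantiation in every other translation unit.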
|
||||||
|
extern template class HashJoinMethods<JoinKind::Left, JoinStrictness::RightAny, HashJoin::MapsOne>;
|
||||||
|
extern template class HashJoinMethods<JoinKind::Left, JoinStrictness::Any, HashJoin::MapsOne>;
|
||||||
|
extern template class HashJoinMethods<JoinKind::Left, JoinStrictness::All, HashJoin::MapsAll>;
|
||||||
|
extern template class HashJoinMethods<JoinKind::Left, JoinStrictness::Semi, HashJoin::MapsOne>;
|
||||||
|
extern template class HashJoinMethods<JoinKind::Left, JoinStrictness::Anti, HashJoin::MapsOne>;
|
||||||
|
extern template class HashJoinMethods<JoinKind::Left, JoinStrictness::Asof, HashJoin::MapsAsof>;
|
||||||
|
|
||||||
|
extern template class HashJoinMethods<JoinKind::Right, JoinStrictness::RightAny, HashJoin::MapsOne>;
|
||||||
|
extern template class HashJoinMethods<JoinKind::Right, JoinStrictness::Any, HashJoin::MapsAll>;
|
||||||
|
extern template class HashJoinMethods<JoinKind::Right, JoinStrictness::All, HashJoin::MapsAll>;
|
||||||
|
extern template class HashJoinMethods<JoinKind::Right, JoinStrictness::Semi, HashJoin::MapsAll>;
|
||||||
|
extern template class HashJoinMethods<JoinKind::Right, JoinStrictness::Anti, HashJoin::MapsAll>;
|
||||||
|
extern template class HashJoinMethods<JoinKind::Right, JoinStrictness::Asof, HashJoin::MapsAsof>;
|
||||||
|
|
||||||
|
extern template class HashJoinMethods<JoinKind::Inner, JoinStrictness::RightAny, HashJoin::MapsOne>;
|
||||||
|
extern template class HashJoinMethods<JoinKind::Inner, JoinStrictness::Any, HashJoin::MapsOne>;
|
||||||
|
extern template class HashJoinMethods<JoinKind::Inner, JoinStrictness::All, HashJoin::MapsAll>;
|
||||||
|
extern template class HashJoinMethods<JoinKind::Inner, JoinStrictness::Semi, HashJoin::MapsOne>;
|
||||||
|
extern template class HashJoinMethods<JoinKind::Inner, JoinStrictness::Anti, HashJoin::MapsOne>;
|
||||||
|
extern template class HashJoinMethods<JoinKind::Inner, JoinStrictness::Asof, HashJoin::MapsAsof>;
|
||||||
|
|
||||||
|
extern template class HashJoinMethods<JoinKind::Full, JoinStrictness::RightAny, HashJoin::MapsOne>;
|
||||||
|
extern template class HashJoinMethods<JoinKind::Full, JoinStrictness::Any, HashJoin::MapsAll>;
|
||||||
|
extern template class HashJoinMethods<JoinKind::Full, JoinStrictness::All, HashJoin::MapsAll>;
|
||||||
|
extern template class HashJoinMethods<JoinKind::Full, JoinStrictness::Semi, HashJoin::MapsOne>;
|
||||||
|
extern template class HashJoinMethods<JoinKind::Full, JoinStrictness::Anti, HashJoin::MapsOne>;
|
||||||
|
extern template class HashJoinMethods<JoinKind::Full, JoinStrictness::Asof, HashJoin::MapsAsof>;
|
||||||
|
}
|
||||||
|
|
12
src/Interpreters/HashJoin/InnerHashJoin.cpp
Normal file
12
src/Interpreters/HashJoin/InnerHashJoin.cpp
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
|
||||||
|
#include <Interpreters/HashJoin/HashJoinMethods.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
/// Explicit instantiation definitions of HashJoinMethods for JoinKind::Inner, one per JoinStrictness.
template class HashJoinMethods<JoinKind::Inner, JoinStrictness::RightAny, HashJoin::MapsOne>;
|
||||||
|
template class HashJoinMethods<JoinKind::Inner, JoinStrictness::Any, HashJoin::MapsOne>;
|
||||||
|
template class HashJoinMethods<JoinKind::Inner, JoinStrictness::All, HashJoin::MapsAll>;
|
||||||
|
template class HashJoinMethods<JoinKind::Inner, JoinStrictness::Semi, HashJoin::MapsOne>;
|
||||||
|
template class HashJoinMethods<JoinKind::Inner, JoinStrictness::Anti, HashJoin::MapsOne>;
|
||||||
|
template class HashJoinMethods<JoinKind::Inner, JoinStrictness::Asof, HashJoin::MapsAsof>;
|
||||||
|
}
|
28
src/Interpreters/HashJoin/JoinFeatures.h
Normal file
28
src/Interpreters/HashJoin/JoinFeatures.h
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
#pragma once
|
||||||
|
#include <Core/Joins.h>
|
||||||
|
#include <Interpreters/joinDispatch.h>
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
template <JoinKind KIND, JoinStrictness STRICTNESS>
|
||||||
|
struct JoinFeatures
|
||||||
|
{
|
||||||
|
static constexpr bool is_any_join = STRICTNESS == JoinStrictness::Any;
|
||||||
|
static constexpr bool is_any_or_semi_join = STRICTNESS == JoinStrictness::Any || STRICTNESS == JoinStrictness::RightAny || (STRICTNESS == JoinStrictness::Semi && KIND == JoinKind::Left);
|
||||||
|
static constexpr bool is_all_join = STRICTNESS == JoinStrictness::All;
|
||||||
|
static constexpr bool is_asof_join = STRICTNESS == JoinStrictness::Asof;
|
||||||
|
static constexpr bool is_semi_join = STRICTNESS == JoinStrictness::Semi;
|
||||||
|
static constexpr bool is_anti_join = STRICTNESS == JoinStrictness::Anti;
|
||||||
|
|
||||||
|
static constexpr bool left = KIND == JoinKind::Left;
|
||||||
|
static constexpr bool right = KIND == JoinKind::Right;
|
||||||
|
static constexpr bool inner = KIND == JoinKind::Inner;
|
||||||
|
static constexpr bool full = KIND == JoinKind::Full;
|
||||||
|
|
||||||
|
static constexpr bool need_replication = is_all_join || (is_any_join && right) || (is_semi_join && right);
|
||||||
|
static constexpr bool need_filter = !need_replication && (inner || right || (is_semi_join && left) || (is_anti_join && left));
|
||||||
|
static constexpr bool add_missing = (left || full) && !is_semi_join;
|
||||||
|
|
||||||
|
static constexpr bool need_flags = MapGetter<KIND, STRICTNESS>::flagged;
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
154
src/Interpreters/HashJoin/JoinUsedFlags.h
Normal file
154
src/Interpreters/HashJoin/JoinUsedFlags.h
Normal file
@ -0,0 +1,154 @@
|
|||||||
|
#pragma once
|
||||||
|
#include <vector>
|
||||||
|
#include <atomic>
|
||||||
|
#include <unordered_map>
|
||||||
|
#include <Core/Joins.h>
|
||||||
|
#include <Core/Block.h>
|
||||||
|
#include <Interpreters/joinDispatch.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
namespace JoinStuff
|
||||||
|
{
|
||||||
|
/// Flags needed to implement RIGHT and FULL JOINs.
|
||||||
|
class JoinUsedFlags
|
||||||
|
{
|
||||||
|
using RawBlockPtr = const Block *;
|
||||||
|
using UsedFlagsForBlock = std::vector<std::atomic_bool>;
|
||||||
|
|
||||||
|
/// For multiple dijuncts each empty in hashmap stores flags for particular block
|
||||||
|
/// For single dicunct we store all flags in `nullptr` entry, index is the offset in FindResult
|
||||||
|
std::unordered_map<RawBlockPtr, UsedFlagsForBlock> flags;
|
||||||
|
|
||||||
|
bool need_flags;
|
||||||
|
|
||||||
|
public:
|
||||||
|
/// Update size for vector with flags.
|
||||||
|
/// Calling this method invalidates existing flags.
|
||||||
|
/// It can be called several times, but all of them should happen before using this structure.
|
||||||
|
template <JoinKind KIND, JoinStrictness STRICTNESS>
|
||||||
|
void reinit(size_t size)
|
||||||
|
{
|
||||||
|
if constexpr (MapGetter<KIND, STRICTNESS>::flagged)
|
||||||
|
{
|
||||||
|
assert(flags[nullptr].size() <= size);
|
||||||
|
need_flags = true;
|
||||||
|
// For one disjunct clause case, we don't need to reinit each time we call addBlockToJoin.
|
||||||
|
// and there is no value inserted in this JoinUsedFlags before addBlockToJoin finish.
|
||||||
|
// So we reinit only when the hash table is rehashed to a larger size.
|
||||||
|
if (flags.empty() || flags[nullptr].size() < size) [[unlikely]]
|
||||||
|
{
|
||||||
|
flags[nullptr] = std::vector<std::atomic_bool>(size);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
template <JoinKind KIND, JoinStrictness STRICTNESS>
|
||||||
|
void reinit(const Block * block_ptr)
|
||||||
|
{
|
||||||
|
if constexpr (MapGetter<KIND, STRICTNESS>::flagged)
|
||||||
|
{
|
||||||
|
assert(flags[block_ptr].size() <= block_ptr->rows());
|
||||||
|
need_flags = true;
|
||||||
|
flags[block_ptr] = std::vector<std::atomic_bool>(block_ptr->rows());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool getUsedSafe(size_t i) const
|
||||||
|
{
|
||||||
|
return getUsedSafe(nullptr, i);
|
||||||
|
}
|
||||||
|
bool getUsedSafe(const Block * block_ptr, size_t row_idx) const
|
||||||
|
{
|
||||||
|
if (auto it = flags.find(block_ptr); it != flags.end())
|
||||||
|
return it->second[row_idx].load();
|
||||||
|
return !need_flags;
|
||||||
|
}
|
||||||
|
|
||||||
|
template <bool use_flags, bool flag_per_row, typename FindResult>
|
||||||
|
void setUsed(const FindResult & f)
|
||||||
|
{
|
||||||
|
if constexpr (!use_flags)
|
||||||
|
return;
|
||||||
|
|
||||||
|
/// Could be set simultaneously from different threads.
|
||||||
|
if constexpr (flag_per_row)
|
||||||
|
{
|
||||||
|
auto & mapped = f.getMapped();
|
||||||
|
flags[mapped.block][mapped.row_num].store(true, std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
flags[nullptr][f.getOffset()].store(true, std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
template <bool use_flags, bool flag_per_row>
|
||||||
|
void setUsed(const Block * block, size_t row_num, size_t offset)
|
||||||
|
{
|
||||||
|
if constexpr (!use_flags)
|
||||||
|
return;
|
||||||
|
|
||||||
|
/// Could be set simultaneously from different threads.
|
||||||
|
if constexpr (flag_per_row)
|
||||||
|
{
|
||||||
|
flags[block][row_num].store(true, std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
flags[nullptr][offset].store(true, std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
template <bool use_flags, bool flag_per_row, typename FindResult>
|
||||||
|
bool getUsed(const FindResult & f)
|
||||||
|
{
|
||||||
|
if constexpr (!use_flags)
|
||||||
|
return true;
|
||||||
|
|
||||||
|
if constexpr (flag_per_row)
|
||||||
|
{
|
||||||
|
auto & mapped = f.getMapped();
|
||||||
|
return flags[mapped.block][mapped.row_num].load();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return flags[nullptr][f.getOffset()].load();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
template <bool use_flags, bool flag_per_row, typename FindResult>
|
||||||
|
bool setUsedOnce(const FindResult & f)
|
||||||
|
{
|
||||||
|
if constexpr (!use_flags)
|
||||||
|
return true;
|
||||||
|
|
||||||
|
if constexpr (flag_per_row)
|
||||||
|
{
|
||||||
|
auto & mapped = f.getMapped();
|
||||||
|
|
||||||
|
/// fast check to prevent heavy CAS with seq_cst order
|
||||||
|
if (flags[mapped.block][mapped.row_num].load(std::memory_order_relaxed))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
bool expected = false;
|
||||||
|
return flags[mapped.block][mapped.row_num].compare_exchange_strong(expected, true);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
auto off = f.getOffset();
|
||||||
|
|
||||||
|
/// fast check to prevent heavy CAS with seq_cst order
|
||||||
|
if (flags[nullptr][off].load(std::memory_order_relaxed))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
bool expected = false;
|
||||||
|
return flags[nullptr][off].compare_exchange_strong(expected, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
73
src/Interpreters/HashJoin/KeyGetter.h
Normal file
73
src/Interpreters/HashJoin/KeyGetter.h
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
#pragma once
|
||||||
|
#include <Interpreters/HashJoin/HashJoin.h>
|
||||||
|
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
template <typename Mapped>
|
||||||
|
class KeyGetterEmpty
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
struct MappedType
|
||||||
|
{
|
||||||
|
using mapped_type = Mapped;
|
||||||
|
};
|
||||||
|
|
||||||
|
using FindResult = ColumnsHashing::columns_hashing_impl::FindResultImpl<Mapped, true>;
|
||||||
|
|
||||||
|
KeyGetterEmpty() = default;
|
||||||
|
|
||||||
|
FindResult findKey(MappedType, size_t, const Arena &) { return FindResult(); }
|
||||||
|
};
|
||||||
|
|
||||||
|
template <HashJoin::Type type, typename Value, typename Mapped>
|
||||||
|
struct KeyGetterForTypeImpl;
|
||||||
|
|
||||||
|
constexpr bool use_offset = true;
|
||||||
|
|
||||||
|
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<HashJoin::Type::key8, Value, Mapped>
|
||||||
|
{
|
||||||
|
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt8, false, use_offset>;
|
||||||
|
};
|
||||||
|
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<HashJoin::Type::key16, Value, Mapped>
|
||||||
|
{
|
||||||
|
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt16, false, use_offset>;
|
||||||
|
};
|
||||||
|
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<HashJoin::Type::key32, Value, Mapped>
|
||||||
|
{
|
||||||
|
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt32, false, use_offset>;
|
||||||
|
};
|
||||||
|
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<HashJoin::Type::key64, Value, Mapped>
|
||||||
|
{
|
||||||
|
using Type = ColumnsHashing::HashMethodOneNumber<Value, Mapped, UInt64, false, use_offset>;
|
||||||
|
};
|
||||||
|
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<HashJoin::Type::key_string, Value, Mapped>
|
||||||
|
{
|
||||||
|
using Type = ColumnsHashing::HashMethodString<Value, Mapped, true, false, use_offset>;
|
||||||
|
};
|
||||||
|
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<HashJoin::Type::key_fixed_string, Value, Mapped>
|
||||||
|
{
|
||||||
|
using Type = ColumnsHashing::HashMethodFixedString<Value, Mapped, true, false, use_offset>;
|
||||||
|
};
|
||||||
|
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<HashJoin::Type::keys128, Value, Mapped>
|
||||||
|
{
|
||||||
|
using Type = ColumnsHashing::HashMethodKeysFixed<Value, UInt128, Mapped, false, false, false, use_offset>;
|
||||||
|
};
|
||||||
|
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<HashJoin::Type::keys256, Value, Mapped>
|
||||||
|
{
|
||||||
|
using Type = ColumnsHashing::HashMethodKeysFixed<Value, UInt256, Mapped, false, false, false, use_offset>;
|
||||||
|
};
|
||||||
|
template <typename Value, typename Mapped> struct KeyGetterForTypeImpl<HashJoin::Type::hashed, Value, Mapped>
|
||||||
|
{
|
||||||
|
using Type = ColumnsHashing::HashMethodHashed<Value, Mapped, false, use_offset>;
|
||||||
|
};
|
||||||
|
|
||||||
|
template <HashJoin::Type type, typename Data>
|
||||||
|
struct KeyGetterForType
|
||||||
|
{
|
||||||
|
using Value = typename Data::value_type;
|
||||||
|
using Mapped_t = typename Data::mapped_type;
|
||||||
|
using Mapped = std::conditional_t<std::is_const_v<Data>, const Mapped_t, Mapped_t>;
|
||||||
|
using Type = typename KeyGetterForTypeImpl<type, Value, Mapped>::Type;
|
||||||
|
};
|
||||||
|
}
|
148
src/Interpreters/HashJoin/KnowRowsHolder.h
Normal file
148
src/Interpreters/HashJoin/KnowRowsHolder.h
Normal file
@ -0,0 +1,148 @@
|
|||||||
|
#pragma once
|
||||||
|
#include <Interpreters/HashJoin/HashJoin.h>
|
||||||
|
#include <Common/ColumnsHashingImpl.h>
|
||||||
|
#include <Interpreters/RowRefs.h>
|
||||||
|
#include <Interpreters/HashJoin/JoinUsedFlags.h>
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
template <bool flag_per_row>
|
||||||
|
class KnownRowsHolder;
|
||||||
|
|
||||||
|
/// Keep already joined rows to prevent duplication if many disjuncts
|
||||||
|
/// if for a particular pair of rows condition looks like TRUE or TRUE or TRUE
|
||||||
|
/// we want to have it once in resultset
|
||||||
|
template<>
|
||||||
|
class KnownRowsHolder<true>
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
using Type = std::pair<const Block *, DB::RowRef::SizeT>;
|
||||||
|
|
||||||
|
private:
|
||||||
|
static const size_t MAX_LINEAR = 16; // threshold to switch from Array to Set
|
||||||
|
using ArrayHolder = std::array<Type, MAX_LINEAR>;
|
||||||
|
using SetHolder = std::set<Type>;
|
||||||
|
using SetHolderPtr = std::unique_ptr<SetHolder>;
|
||||||
|
|
||||||
|
ArrayHolder array_holder;
|
||||||
|
SetHolderPtr set_holder_ptr;
|
||||||
|
|
||||||
|
size_t items;
|
||||||
|
|
||||||
|
public:
|
||||||
|
KnownRowsHolder()
|
||||||
|
: items(0)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
template<class InputIt>
|
||||||
|
void add(InputIt from, InputIt to)
|
||||||
|
{
|
||||||
|
const size_t new_items = std::distance(from, to);
|
||||||
|
if (items + new_items <= MAX_LINEAR)
|
||||||
|
{
|
||||||
|
std::copy(from, to, &array_holder[items]);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (items <= MAX_LINEAR)
|
||||||
|
{
|
||||||
|
set_holder_ptr = std::make_unique<SetHolder>();
|
||||||
|
set_holder_ptr->insert(std::cbegin(array_holder), std::cbegin(array_holder) + items);
|
||||||
|
}
|
||||||
|
set_holder_ptr->insert(from, to);
|
||||||
|
}
|
||||||
|
items += new_items;
|
||||||
|
}
|
||||||
|
|
||||||
|
template<class Needle>
|
||||||
|
bool isKnown(const Needle & needle)
|
||||||
|
{
|
||||||
|
return items <= MAX_LINEAR
|
||||||
|
? std::find(std::cbegin(array_holder), std::cbegin(array_holder) + items, needle) != std::cbegin(array_holder) + items
|
||||||
|
: set_holder_ptr->find(needle) != set_holder_ptr->end();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
template<>
|
||||||
|
class KnownRowsHolder<false>
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
template<class InputIt>
|
||||||
|
void add(InputIt, InputIt)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
template<class Needle>
|
||||||
|
static bool isKnown(const Needle &)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Shorthand for ColumnsHashing::columns_hashing_impl::FindResultImpl.
/// NOTE(review): `need_offset` is accepted but not forwarded; the alias always expands to
/// FindResultImpl<Mapped, true>. Confirm this is intended.
template <typename Mapped, bool need_offset = false>
|
||||||
|
using FindResultImpl = ColumnsHashing::columns_hashing_impl::FindResultImpl<Mapped, true>;
|
||||||
|
|
||||||
|
|
||||||
|
template <typename Map, bool add_missing, bool flag_per_row, typename AddedColumns>
|
||||||
|
void addFoundRowAll(
|
||||||
|
const typename Map::mapped_type & mapped,
|
||||||
|
AddedColumns & added,
|
||||||
|
IColumn::Offset & current_offset,
|
||||||
|
KnownRowsHolder<flag_per_row> & known_rows [[maybe_unused]],
|
||||||
|
JoinStuff::JoinUsedFlags * used_flags [[maybe_unused]])
|
||||||
|
{
|
||||||
|
if constexpr (add_missing)
|
||||||
|
added.applyLazyDefaults();
|
||||||
|
|
||||||
|
if constexpr (flag_per_row)
|
||||||
|
{
|
||||||
|
std::unique_ptr<std::vector<KnownRowsHolder<true>::Type>> new_known_rows_ptr;
|
||||||
|
|
||||||
|
for (auto it = mapped.begin(); it.ok(); ++it)
|
||||||
|
{
|
||||||
|
if (!known_rows.isKnown(std::make_pair(it->block, it->row_num)))
|
||||||
|
{
|
||||||
|
added.appendFromBlock(*it->block, it->row_num, false);
|
||||||
|
++current_offset;
|
||||||
|
if (!new_known_rows_ptr)
|
||||||
|
{
|
||||||
|
new_known_rows_ptr = std::make_unique<std::vector<KnownRowsHolder<true>::Type>>();
|
||||||
|
}
|
||||||
|
new_known_rows_ptr->push_back(std::make_pair(it->block, it->row_num));
|
||||||
|
if (used_flags)
|
||||||
|
{
|
||||||
|
used_flags->JoinStuff::JoinUsedFlags::setUsedOnce<true, flag_per_row>(
|
||||||
|
FindResultImpl<const RowRef, false>(*it, true, 0));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (new_known_rows_ptr)
|
||||||
|
{
|
||||||
|
known_rows.add(std::cbegin(*new_known_rows_ptr), std::cend(*new_known_rows_ptr));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
for (auto it = mapped.begin(); it.ok(); ++it)
|
||||||
|
{
|
||||||
|
added.appendFromBlock(*it->block, it->row_num, false);
|
||||||
|
++current_offset;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
template <bool add_missing, bool need_offset, typename AddedColumns>
|
||||||
|
void addNotFoundRow(AddedColumns & added [[maybe_unused]], IColumn::Offset & current_offset [[maybe_unused]])
|
||||||
|
{
|
||||||
|
if constexpr (add_missing)
|
||||||
|
{
|
||||||
|
added.appendDefaultRow();
|
||||||
|
if constexpr (need_offset)
|
||||||
|
++current_offset;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
11
src/Interpreters/HashJoin/LeftHashJoin.cpp
Normal file
11
src/Interpreters/HashJoin/LeftHashJoin.cpp
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
#include <Interpreters/HashJoin/HashJoinMethods.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
/// Explicit instantiation definitions of HashJoinMethods for JoinKind::Left, one per JoinStrictness.
template class HashJoinMethods<JoinKind::Left, JoinStrictness::RightAny, HashJoin::MapsOne>;
|
||||||
|
template class HashJoinMethods<JoinKind::Left, JoinStrictness::Any, HashJoin::MapsOne>;
|
||||||
|
template class HashJoinMethods<JoinKind::Left, JoinStrictness::All, HashJoin::MapsAll>;
|
||||||
|
template class HashJoinMethods<JoinKind::Left, JoinStrictness::Semi, HashJoin::MapsOne>;
|
||||||
|
template class HashJoinMethods<JoinKind::Left, JoinStrictness::Anti, HashJoin::MapsOne>;
|
||||||
|
template class HashJoinMethods<JoinKind::Left, JoinStrictness::Asof, HashJoin::MapsAsof>;
|
||||||
|
}
|
11
src/Interpreters/HashJoin/RightHashJoin.cpp
Normal file
11
src/Interpreters/HashJoin/RightHashJoin.cpp
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
#include <Interpreters/HashJoin/HashJoinMethods.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
/// Explicit instantiation definitions of HashJoinMethods for JoinKind::Right, one per JoinStrictness.
template class HashJoinMethods<JoinKind::Right, JoinStrictness::RightAny, HashJoin::MapsOne>;
|
||||||
|
template class HashJoinMethods<JoinKind::Right, JoinStrictness::Any, HashJoin::MapsAll>;
|
||||||
|
template class HashJoinMethods<JoinKind::Right, JoinStrictness::All, HashJoin::MapsAll>;
|
||||||
|
template class HashJoinMethods<JoinKind::Right, JoinStrictness::Semi, HashJoin::MapsAll>;
|
||||||
|
template class HashJoinMethods<JoinKind::Right, JoinStrictness::Anti, HashJoin::MapsAll>;
|
||||||
|
template class HashJoinMethods<JoinKind::Right, JoinStrictness::Asof, HashJoin::MapsAsof>;
|
||||||
|
}
|
@ -1,6 +1,6 @@
|
|||||||
#include <Common/typeid_cast.h>
|
#include <Common/typeid_cast.h>
|
||||||
#include <Interpreters/JoinSwitcher.h>
|
#include <Interpreters/JoinSwitcher.h>
|
||||||
#include <Interpreters/HashJoin.h>
|
#include <Interpreters/HashJoin/HashJoin.h>
|
||||||
#include <Interpreters/MergeJoin.h>
|
#include <Interpreters/MergeJoin.h>
|
||||||
#include <Interpreters/JoinUtils.h>
|
#include <Interpreters/JoinUtils.h>
|
||||||
|
|
||||||
|
@ -3,7 +3,7 @@
|
|||||||
#include <array>
|
#include <array>
|
||||||
#include <base/constexpr_helpers.h>
|
#include <base/constexpr_helpers.h>
|
||||||
|
|
||||||
#include <Interpreters/HashJoin.h>
|
#include <Interpreters/HashJoin/HashJoin.h>
|
||||||
|
|
||||||
|
|
||||||
/** Used in implementation of Join to process different data structures.
|
/** Used in implementation of Join to process different data structures.
|
||||||
|
@ -53,7 +53,7 @@
|
|||||||
#include <Interpreters/ArrayJoinAction.h>
|
#include <Interpreters/ArrayJoinAction.h>
|
||||||
#include <Interpreters/Context.h>
|
#include <Interpreters/Context.h>
|
||||||
#include <Interpreters/DatabaseCatalog.h>
|
#include <Interpreters/DatabaseCatalog.h>
|
||||||
#include <Interpreters/HashJoin.h>
|
#include <Interpreters/HashJoin/HashJoin.h>
|
||||||
#include <Interpreters/IJoin.h>
|
#include <Interpreters/IJoin.h>
|
||||||
#include <Interpreters/TableJoin.h>
|
#include <Interpreters/TableJoin.h>
|
||||||
#include <Interpreters/getCustomKeyFilterForParallelReplicas.h>
|
#include <Interpreters/getCustomKeyFilterForParallelReplicas.h>
|
||||||
|
@ -29,7 +29,7 @@
|
|||||||
|
|
||||||
#include <Dictionaries/IDictionary.h>
|
#include <Dictionaries/IDictionary.h>
|
||||||
#include <Interpreters/IKeyValueEntity.h>
|
#include <Interpreters/IKeyValueEntity.h>
|
||||||
#include <Interpreters/HashJoin.h>
|
#include <Interpreters/HashJoin/HashJoin.h>
|
||||||
#include <Interpreters/MergeJoin.h>
|
#include <Interpreters/MergeJoin.h>
|
||||||
#include <Interpreters/FullSortingMergeJoin.h>
|
#include <Interpreters/FullSortingMergeJoin.h>
|
||||||
#include <Interpreters/ConcurrentHashJoin.h>
|
#include <Interpreters/ConcurrentHashJoin.h>
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
#include <Storages/StorageFactory.h>
|
#include <Storages/StorageFactory.h>
|
||||||
#include <Storages/StorageSet.h>
|
#include <Storages/StorageSet.h>
|
||||||
#include <Storages/TableLockHolder.h>
|
#include <Storages/TableLockHolder.h>
|
||||||
#include <Interpreters/HashJoin.h>
|
#include <Interpreters/HashJoin/HashJoin.h>
|
||||||
#include <Interpreters/Context.h>
|
#include <Interpreters/Context.h>
|
||||||
#include <Parsers/ASTCreateQuery.h>
|
#include <Parsers/ASTCreateQuery.h>
|
||||||
#include <Parsers/ASTIdentifier_fwd.h>
|
#include <Parsers/ASTIdentifier_fwd.h>
|
||||||
|
Loading…
Reference in New Issue
Block a user