mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-04 21:42:39 +00:00
879 lines
30 KiB
C++
879 lines
30 KiB
C++
#include <Interpreters/JoinUtils.h>
|
|
|
|
#include <Columns/ColumnConst.h>
|
|
#include <Columns/ColumnLowCardinality.h>
|
|
#include <Columns/ColumnSparse.h>
|
|
#include <Columns/ColumnNullable.h>
|
|
|
|
#include <DataTypes/DataTypeLowCardinality.h>
|
|
#include <DataTypes/DataTypeNullable.h>
|
|
#include <DataTypes/DataTypesNumber.h>
|
|
|
|
#include <Interpreters/ActionsDAG.h>
|
|
#include <Interpreters/TableJoin.h>
|
|
|
|
#include <IO/WriteHelpers.h>
|
|
|
|
#include <Common/HashTable/Hash.h>
|
|
#include <Common/WeakHash.h>
|
|
|
|
#include <base/FnTraits.h>
|
|
|
|
namespace DB
|
|
{
|
|
|
|
namespace ErrorCodes
|
|
{
|
|
extern const int INVALID_JOIN_ON_EXPRESSION;
|
|
extern const int LOGICAL_ERROR;
|
|
extern const int TYPE_MISMATCH;
|
|
}
|
|
|
|
namespace
|
|
{
|
|
|
|
void insertFromNullableOrDefault(MutableColumnPtr & dst, const ColumnNullable * nullable_col)
|
|
{
|
|
const auto & nested = nullable_col->getNestedColumn();
|
|
const auto & nullmap = nullable_col->getNullMapColumn().getData();
|
|
if (auto * lc = typeid_cast<ColumnLowCardinality *>(dst.get()); lc && !nested.lowCardinality())
|
|
{
|
|
for (size_t i = 0; i < nullable_col->size(); ++i)
|
|
{
|
|
if (nullmap[i])
|
|
lc->insertDefault();
|
|
else
|
|
lc->insertRangeFromFullColumn(nested, i, 1);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
for (size_t i = 0; i < nullable_col->size(); ++i)
|
|
{
|
|
if (nullmap[i])
|
|
dst->insertDefault();
|
|
else
|
|
dst->insertFrom(nested, i);
|
|
}
|
|
}
|
|
}
|
|
|
|
ColumnPtr changeLowCardinality(const ColumnPtr & column, const ColumnPtr & dst_sample)
|
|
{
|
|
if (dst_sample->lowCardinality())
|
|
{
|
|
MutableColumnPtr lc = dst_sample->cloneEmpty();
|
|
if (const auto * nullable_col = typeid_cast<const ColumnNullable *>(column.get()))
|
|
insertFromNullableOrDefault(lc, nullable_col);
|
|
else
|
|
typeid_cast<ColumnLowCardinality &>(*lc).insertRangeFromFullColumn(*column, 0, column->size());
|
|
return lc;
|
|
}
|
|
|
|
return column->convertToFullColumnIfLowCardinality();
|
|
}
|
|
|
|
struct LowcardAndNull
|
|
{
|
|
bool is_lowcard;
|
|
bool is_nullable;
|
|
};
|
|
|
|
LowcardAndNull getLowcardAndNullability(const ColumnPtr & col)
|
|
{
|
|
if (col->lowCardinality())
|
|
{
|
|
/// Currently only `LowCardinality(Nullable(T))` is possible, but not `Nullable(LowCardinality(T))`
|
|
assert(!col->canBeInsideNullable());
|
|
const auto * col_as_lc = assert_cast<const ColumnLowCardinality *>(col.get());
|
|
return {true, col_as_lc->nestedIsNullable()};
|
|
}
|
|
return {false, col->isNullable()};
|
|
}
|
|
|
|
}
|
|
|
|
namespace JoinCommon
|
|
{
|
|
|
|
void changeLowCardinalityInplace(ColumnWithTypeAndName & column)
|
|
{
|
|
if (column.type->lowCardinality())
|
|
{
|
|
column.type = recursiveRemoveLowCardinality(column.type);
|
|
column.column = column.column->convertToFullColumnIfLowCardinality();
|
|
}
|
|
else
|
|
{
|
|
column.type = std::make_shared<DataTypeLowCardinality>(column.type);
|
|
MutableColumnPtr lc = column.type->createColumn();
|
|
typeid_cast<ColumnLowCardinality &>(*lc).insertRangeFromFullColumn(*column.column, 0, column.column->size());
|
|
column.column = std::move(lc);
|
|
}
|
|
}
|
|
|
|
bool canBecomeNullable(const DataTypePtr & type)
|
|
{
|
|
bool can_be_inside = type->canBeInsideNullable();
|
|
if (const auto * low_cardinality_type = typeid_cast<const DataTypeLowCardinality *>(type.get()))
|
|
can_be_inside |= low_cardinality_type->getDictionaryType()->canBeInsideNullable();
|
|
return can_be_inside;
|
|
}
|
|
|
|
bool isNullable(const DataTypePtr & type)
|
|
{
|
|
bool is_nullable = type->isNullable();
|
|
if (const auto * low_cardinality_type = typeid_cast<const DataTypeLowCardinality *>(type.get()))
|
|
is_nullable |= low_cardinality_type->getDictionaryType()->isNullable();
|
|
return is_nullable;
|
|
}
|
|
|
|
/// Add nullability to type.
|
|
/// Note: LowCardinality(T) transformed to LowCardinality(Nullable(T))
|
|
DataTypePtr convertTypeToNullable(const DataTypePtr & type)
|
|
{
|
|
if (isNullable(type))
|
|
return type;
|
|
|
|
if (const auto * low_cardinality_type = typeid_cast<const DataTypeLowCardinality *>(type.get()))
|
|
{
|
|
const auto & dict_type = low_cardinality_type->getDictionaryType();
|
|
if (dict_type->canBeInsideNullable())
|
|
return std::make_shared<DataTypeLowCardinality>(makeNullable(dict_type));
|
|
}
|
|
|
|
if (type->canBeInsideNullable())
|
|
return makeNullable(type);
|
|
|
|
return type;
|
|
}
|
|
|
|
/// Convert column to nullable. If column LowCardinality or Const, convert nested column.
|
|
/// Returns nullptr if conversion cannot be performed.
|
|
static ColumnPtr tryConvertColumnToNullable(ColumnPtr col)
|
|
{
|
|
if (col->isSparse())
|
|
col = recursiveRemoveSparse(col);
|
|
|
|
if (isColumnNullable(*col) || col->canBeInsideNullable())
|
|
return makeNullable(col);
|
|
|
|
if (col->lowCardinality())
|
|
{
|
|
auto mut_col = IColumn::mutate(std::move(col));
|
|
ColumnLowCardinality * col_lc = assert_cast<ColumnLowCardinality *>(mut_col.get());
|
|
if (col_lc->nestedIsNullable())
|
|
{
|
|
return mut_col;
|
|
}
|
|
else if (col_lc->nestedCanBeInsideNullable())
|
|
{
|
|
col_lc->nestedToNullable();
|
|
return mut_col;
|
|
}
|
|
}
|
|
else if (const ColumnConst * col_const = checkAndGetColumn<ColumnConst>(*col))
|
|
{
|
|
const auto & nested = col_const->getDataColumnPtr();
|
|
if (nested->isNullable() || nested->canBeInsideNullable())
|
|
{
|
|
return makeNullable(col);
|
|
}
|
|
else if (nested->lowCardinality())
|
|
{
|
|
ColumnPtr nested_nullable = tryConvertColumnToNullable(nested);
|
|
if (nested_nullable)
|
|
return ColumnConst::create(nested_nullable, col_const->size());
|
|
}
|
|
}
|
|
return nullptr;
|
|
}
|
|
|
|
void convertColumnToNullable(ColumnWithTypeAndName & column)
|
|
{
|
|
if (!column.column)
|
|
{
|
|
column.type = convertTypeToNullable(column.type);
|
|
return;
|
|
}
|
|
|
|
ColumnPtr nullable_column = tryConvertColumnToNullable(column.column);
|
|
if (nullable_column)
|
|
{
|
|
column.type = convertTypeToNullable(column.type);
|
|
column.column = std::move(nullable_column);
|
|
}
|
|
}
|
|
|
|
void convertColumnsToNullable(Block & block, size_t starting_pos)
|
|
{
|
|
for (size_t i = starting_pos; i < block.columns(); ++i)
|
|
convertColumnToNullable(block.getByPosition(i));
|
|
}
|
|
|
|
void convertColumnsToNullable(MutableColumns & mutable_columns, size_t starting_pos)
|
|
{
|
|
for (size_t i = starting_pos; i < mutable_columns.size(); ++i)
|
|
{
|
|
ColumnPtr column = std::move(mutable_columns[i]);
|
|
column = makeNullable(column);
|
|
mutable_columns[i] = IColumn::mutate(std::move(column));
|
|
}
|
|
}
|
|
|
|
/// @warning It assumes that every NULL has default value in nested column (or it does not matter)
|
|
void removeColumnNullability(ColumnWithTypeAndName & column)
|
|
{
|
|
if (column.type->lowCardinality())
|
|
{
|
|
/// LowCardinality(Nullable(T)) case
|
|
const auto & dict_type = typeid_cast<const DataTypeLowCardinality *>(column.type.get())->getDictionaryType();
|
|
column.type = std::make_shared<DataTypeLowCardinality>(removeNullable(dict_type));
|
|
|
|
if (column.column && column.column->lowCardinality())
|
|
{
|
|
auto mut_col = IColumn::mutate(std::move(column.column));
|
|
ColumnLowCardinality * col_as_lc = typeid_cast<ColumnLowCardinality *>(mut_col.get());
|
|
if (col_as_lc && col_as_lc->nestedIsNullable())
|
|
col_as_lc->nestedRemoveNullable();
|
|
column.column = std::move(mut_col);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
column.type = removeNullable(column.type);
|
|
|
|
if (column.column && column.column->isNullable())
|
|
{
|
|
column.column = column.column->convertToFullColumnIfConst();
|
|
const auto * nullable_col = checkAndGetColumn<ColumnNullable>(*column.column);
|
|
if (!nullable_col)
|
|
{
|
|
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Column '{}' is expected to be nullable", column.dumpStructure());
|
|
}
|
|
|
|
MutableColumnPtr mutable_column = nullable_col->getNestedColumn().cloneEmpty();
|
|
insertFromNullableOrDefault(mutable_column, nullable_col);
|
|
column.column = std::move(mutable_column);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Change both column nullability and low cardinality
|
|
void changeColumnRepresentation(const ColumnPtr & src_column, ColumnPtr & dst_column)
|
|
{
|
|
bool nullable_src = src_column->isNullable();
|
|
bool nullable_dst = dst_column->isNullable();
|
|
|
|
ColumnPtr dst_not_null = JoinCommon::emptyNotNullableClone(dst_column);
|
|
bool lowcard_src = JoinCommon::emptyNotNullableClone(src_column)->lowCardinality();
|
|
bool lowcard_dst = dst_not_null->lowCardinality();
|
|
bool change_lowcard = lowcard_src != lowcard_dst;
|
|
|
|
if (nullable_src && !nullable_dst)
|
|
{
|
|
const auto * nullable = checkAndGetColumn<ColumnNullable>(*src_column);
|
|
if (change_lowcard)
|
|
dst_column = changeLowCardinality(nullable->getNestedColumnPtr(), dst_column);
|
|
else
|
|
dst_column = nullable->getNestedColumnPtr();
|
|
}
|
|
else if (!nullable_src && nullable_dst)
|
|
{
|
|
if (change_lowcard)
|
|
dst_column = makeNullable(changeLowCardinality(src_column, dst_not_null));
|
|
else
|
|
dst_column = makeNullable(src_column);
|
|
}
|
|
else /// same nullability
|
|
{
|
|
if (change_lowcard)
|
|
{
|
|
if (const auto * nullable = checkAndGetColumn<ColumnNullable>(*src_column))
|
|
{
|
|
dst_column = makeNullable(changeLowCardinality(nullable->getNestedColumnPtr(), dst_not_null));
|
|
assert_cast<ColumnNullable &>(*dst_column->assumeMutable()).applyNullMap(nullable->getNullMapColumn());
|
|
}
|
|
else
|
|
dst_column = changeLowCardinality(src_column, dst_not_null);
|
|
}
|
|
else
|
|
dst_column = src_column;
|
|
}
|
|
}
|
|
|
|
ColumnPtr emptyNotNullableClone(const ColumnPtr & column)
|
|
{
|
|
if (column->isNullable())
|
|
return checkAndGetColumn<ColumnNullable>(*column)->getNestedColumnPtr()->cloneEmpty();
|
|
return column->cloneEmpty();
|
|
}
|
|
|
|
ColumnRawPtrs materializeColumnsInplace(Block & block, const Names & names)
|
|
{
|
|
ColumnRawPtrs ptrs;
|
|
ptrs.reserve(names.size());
|
|
|
|
for (const auto & column_name : names)
|
|
{
|
|
auto & column = block.getByName(column_name).column;
|
|
column = recursiveRemoveLowCardinality(recursiveRemoveSparse(column->convertToFullColumnIfConst()));
|
|
ptrs.push_back(column.get());
|
|
}
|
|
|
|
return ptrs;
|
|
}
|
|
|
|
ColumnRawPtrMap materializeColumnsInplaceMap(Block & block, const Names & names)
|
|
{
|
|
ColumnRawPtrMap ptrs;
|
|
ptrs.reserve(names.size());
|
|
|
|
for (const auto & column_name : names)
|
|
{
|
|
auto & column = block.getByName(column_name);
|
|
column.column = recursiveRemoveLowCardinality(column.column->convertToFullColumnIfConst());
|
|
column.type = recursiveRemoveLowCardinality(column.type);
|
|
ptrs[column_name] = column.column.get();
|
|
}
|
|
|
|
return ptrs;
|
|
}
|
|
|
|
ColumnPtr materializeColumn(const Block & block, const String & column_name)
|
|
{
|
|
const auto & src_column = block.getByName(column_name).column;
|
|
return recursiveRemoveLowCardinality(
|
|
recursiveRemoveSparse(src_column->convertToFullColumnIfConst()));
|
|
}
|
|
|
|
Columns materializeColumns(const Block & block, const Names & names)
|
|
{
|
|
Columns materialized;
|
|
materialized.reserve(names.size());
|
|
|
|
for (const auto & column_name : names)
|
|
{
|
|
materialized.emplace_back(materializeColumn(block, column_name));
|
|
}
|
|
|
|
return materialized;
|
|
}
|
|
|
|
ColumnRawPtrs getRawPointers(const Columns & columns)
|
|
{
|
|
ColumnRawPtrs ptrs;
|
|
ptrs.reserve(columns.size());
|
|
|
|
for (const auto & column : columns)
|
|
ptrs.push_back(column.get());
|
|
|
|
return ptrs;
|
|
}
|
|
|
|
void convertToFullColumnsInplace(Block & block)
|
|
{
|
|
for (size_t i = 0; i < block.columns(); ++i)
|
|
{
|
|
auto & col = block.getByPosition(i);
|
|
col.column = recursiveRemoveLowCardinality(recursiveRemoveSparse(col.column));
|
|
col.type = recursiveRemoveLowCardinality(col.type);
|
|
}
|
|
}
|
|
|
|
void convertToFullColumnsInplace(Block & block, const Names & names, bool change_type)
|
|
{
|
|
for (const String & column_name : names)
|
|
{
|
|
auto & col = block.getByName(column_name);
|
|
col.column = recursiveRemoveLowCardinality(recursiveRemoveSparse(col.column));
|
|
if (change_type)
|
|
col.type = recursiveRemoveLowCardinality(col.type);
|
|
}
|
|
}
|
|
|
|
void restoreLowCardinalityInplace(Block & block, const Names & lowcard_keys)
|
|
{
|
|
for (const auto & column_name : lowcard_keys)
|
|
{
|
|
if (!block.has(column_name))
|
|
continue;
|
|
if (auto & col = block.getByName(column_name); !col.type->lowCardinality())
|
|
JoinCommon::changeLowCardinalityInplace(col);
|
|
}
|
|
|
|
for (size_t i = 0; i < block.columns(); ++i)
|
|
{
|
|
auto & col = block.getByPosition(i);
|
|
if (col.type->lowCardinality() && col.column && !col.column->lowCardinality())
|
|
col.column = changeLowCardinality(col.column, col.type->createColumn());
|
|
}
|
|
}
|
|
|
|
ColumnRawPtrs extractKeysForJoin(const Block & block_keys, const Names & key_names)
|
|
{
|
|
size_t keys_size = key_names.size();
|
|
ColumnRawPtrs key_columns(keys_size);
|
|
|
|
for (size_t i = 0; i < keys_size; ++i)
|
|
{
|
|
const String & column_name = key_names[i];
|
|
key_columns[i] = block_keys.getByName(column_name).column.get();
|
|
|
|
/// We will join only keys, where all components are not NULL.
|
|
if (const auto * nullable = checkAndGetColumn<ColumnNullable>(*key_columns[i]))
|
|
key_columns[i] = &nullable->getNestedColumn();
|
|
|
|
if (const auto * sparse = checkAndGetColumn<ColumnSparse>(*key_columns[i]))
|
|
key_columns[i] = &sparse->getValuesColumn();
|
|
}
|
|
|
|
return key_columns;
|
|
}
|
|
|
|
void checkTypesOfKeys(const Block & block_left, const Names & key_names_left,
|
|
const Block & block_right, const Names & key_names_right)
|
|
{
|
|
size_t keys_size = key_names_left.size();
|
|
|
|
for (size_t i = 0; i < keys_size; ++i)
|
|
{
|
|
DataTypePtr left_type = removeNullable(recursiveRemoveLowCardinality(block_left.getByName(key_names_left[i]).type));
|
|
DataTypePtr right_type = removeNullable(recursiveRemoveLowCardinality(block_right.getByName(key_names_right[i]).type));
|
|
|
|
if (!left_type->equals(*right_type))
|
|
{
|
|
throw DB::Exception(
|
|
ErrorCodes::TYPE_MISMATCH,
|
|
"Type mismatch of columns to JOIN by: {} :: {} at left, {} :: {} at right",
|
|
key_names_left[i], left_type->getName(),
|
|
key_names_right[i], right_type->getName());
|
|
}
|
|
}
|
|
}
|
|
|
|
void checkTypesOfKeys(const Block & block_left, const Names & key_names_left, const String & condition_name_left,
|
|
const Block & block_right, const Names & key_names_right, const String & condition_name_right)
|
|
{
|
|
checkTypesOfKeys(block_left, key_names_left, block_right, key_names_right);
|
|
checkTypesOfMasks(block_left, condition_name_left, block_right, condition_name_right);
|
|
}
|
|
|
|
void checkTypesOfMasks(const Block & block_left, const String & condition_name_left,
|
|
const Block & block_right, const String & condition_name_right)
|
|
{
|
|
auto check_cond_column_type = [](const Block & block, const String & col_name)
|
|
{
|
|
if (col_name.empty())
|
|
return;
|
|
|
|
DataTypePtr dtype = removeNullable(recursiveRemoveLowCardinality(block.getByName(col_name).type));
|
|
|
|
if (!dtype->equals(DataTypeUInt8{}))
|
|
throw Exception(ErrorCodes::INVALID_JOIN_ON_EXPRESSION,
|
|
"Expected logical expression in JOIN ON section, got unexpected column '{}' of type '{}'",
|
|
col_name, dtype->getName());
|
|
};
|
|
check_cond_column_type(block_left, condition_name_left);
|
|
check_cond_column_type(block_right, condition_name_right);
|
|
}
|
|
|
|
void createMissedColumns(Block & block)
|
|
{
|
|
for (size_t i = 0; i < block.columns(); ++i)
|
|
{
|
|
auto & column = block.getByPosition(i);
|
|
if (!column.column) //-V1051
|
|
column.column = column.type->createColumn();
|
|
}
|
|
}
|
|
|
|
/// Append totals from right to left block, correct types if needed
|
|
void joinTotals(Block left_totals, Block right_totals, const TableJoin & table_join, Block & out_block)
|
|
{
|
|
if (table_join.forceNullableLeft())
|
|
JoinCommon::convertColumnsToNullable(left_totals);
|
|
|
|
if (table_join.forceNullableRight())
|
|
JoinCommon::convertColumnsToNullable(right_totals);
|
|
|
|
for (auto & col : out_block)
|
|
{
|
|
if (const auto * left_col = left_totals.findByName(col.name))
|
|
col = *left_col;
|
|
else if (const auto * right_col = right_totals.findByName(col.name))
|
|
col = *right_col;
|
|
else
|
|
col.column = col.type->createColumnConstWithDefaultValue(1)->convertToFullColumnIfConst();
|
|
|
|
/// In case of using `arrayJoin` we can get more or less rows than one
|
|
if (col.column->size() != 1)
|
|
col.column = col.column->cloneResized(1);
|
|
}
|
|
}
|
|
|
|
void addDefaultValues(IColumn & column, const DataTypePtr & type, size_t count)
|
|
{
|
|
column.reserve(column.size() + count);
|
|
for (size_t i = 0; i < count; ++i)
|
|
type->insertDefaultInto(column);
|
|
}
|
|
|
|
bool typesEqualUpToNullability(DataTypePtr left_type, DataTypePtr right_type)
|
|
{
|
|
DataTypePtr left_type_strict = removeNullable(recursiveRemoveLowCardinality(left_type));
|
|
DataTypePtr right_type_strict = removeNullable(recursiveRemoveLowCardinality(right_type));
|
|
return left_type_strict->equals(*right_type_strict);
|
|
}
|
|
|
|
JoinMask getColumnAsMask(const Block & block, const String & column_name)
|
|
{
|
|
if (column_name.empty())
|
|
return JoinMask(true);
|
|
|
|
const auto & src_col = block.getByName(column_name);
|
|
|
|
DataTypePtr col_type = recursiveRemoveLowCardinality(src_col.type);
|
|
if (isNothing(col_type))
|
|
return JoinMask(false);
|
|
|
|
if (const auto * const_cond = checkAndGetColumn<ColumnConst>(*src_col.column))
|
|
{
|
|
return JoinMask(const_cond->getBool(0));
|
|
}
|
|
|
|
ColumnPtr join_condition_col = recursiveRemoveLowCardinality(src_col.column->convertToFullColumnIfConst());
|
|
if (const auto * nullable_col = typeid_cast<const ColumnNullable *>(join_condition_col.get()))
|
|
{
|
|
if (isNothing(assert_cast<const DataTypeNullable &>(*col_type).getNestedType()))
|
|
return JoinMask(false);
|
|
|
|
/// Return nested column with NULL set to false
|
|
const auto & nest_col = assert_cast<const ColumnUInt8 &>(nullable_col->getNestedColumn());
|
|
const auto & null_map = nullable_col->getNullMapColumn();
|
|
|
|
auto res = ColumnUInt8::create(nullable_col->size(), 0);
|
|
for (size_t i = 0, sz = nullable_col->size(); i < sz; ++i)
|
|
res->getData()[i] = !null_map.getData()[i] && nest_col.getData()[i];
|
|
return JoinMask(std::move(res));
|
|
}
|
|
else
|
|
return JoinMask(std::move(join_condition_col));
|
|
}
|
|
|
|
|
|
void splitAdditionalColumns(const Names & key_names, const Block & sample_block, Block & block_keys, Block & block_others)
|
|
{
|
|
block_others = materializeBlock(sample_block);
|
|
|
|
for (const String & column_name : key_names)
|
|
{
|
|
/// Extract right keys with correct keys order. There could be the same key names.
|
|
if (!block_keys.has(column_name))
|
|
{
|
|
auto & col = block_others.getByName(column_name);
|
|
block_keys.insert(col);
|
|
block_others.erase(column_name);
|
|
}
|
|
}
|
|
}
|
|
|
|
template <Fn<size_t(size_t)> Sharder>
|
|
static IColumn::Selector hashToSelector(const WeakHash32 & hash, Sharder sharder)
|
|
{
|
|
const auto & hashes = hash.getData();
|
|
size_t num_rows = hashes.size();
|
|
|
|
IColumn::Selector selector(num_rows);
|
|
for (size_t i = 0; i < num_rows; ++i)
|
|
selector[i] = sharder(intHashCRC32(hashes[i]));
|
|
return selector;
|
|
}
|
|
|
|
template <Fn<size_t(size_t)> Sharder>
|
|
static Blocks scatterBlockByHashImpl(const Strings & key_columns_names, const Block & block, size_t num_shards, Sharder sharder)
|
|
{
|
|
size_t num_rows = block.rows();
|
|
size_t num_cols = block.columns();
|
|
|
|
/// Use non-standard initial value so as not to degrade hash map performance inside shard that uses the same CRC32 algorithm.
|
|
WeakHash32 hash(num_rows);
|
|
for (const auto & key_name : key_columns_names)
|
|
{
|
|
ColumnPtr key_col = materializeColumn(block, key_name);
|
|
key_col->updateWeakHash32(hash);
|
|
}
|
|
auto selector = hashToSelector(hash, sharder);
|
|
|
|
Blocks result;
|
|
result.reserve(num_shards);
|
|
for (size_t i = 0; i < num_shards; ++i)
|
|
{
|
|
result.emplace_back(block.cloneEmpty());
|
|
}
|
|
|
|
for (size_t i = 0; i < num_cols; ++i)
|
|
{
|
|
auto dispatched_columns = block.getByPosition(i).column->scatter(num_shards, selector);
|
|
assert(result.size() == dispatched_columns.size());
|
|
for (size_t block_index = 0; block_index < num_shards; ++block_index)
|
|
{
|
|
result[block_index].getByPosition(i).column = std::move(dispatched_columns[block_index]);
|
|
}
|
|
}
|
|
return result;
|
|
}
|
|
|
|
static Blocks scatterBlockByHashPow2(const Strings & key_columns_names, const Block & block, size_t num_shards)
|
|
{
|
|
size_t mask = num_shards - 1;
|
|
return scatterBlockByHashImpl(key_columns_names, block, num_shards, [mask](size_t hash) { return hash & mask; });
|
|
}
|
|
|
|
static Blocks scatterBlockByHashGeneric(const Strings & key_columns_names, const Block & block, size_t num_shards)
|
|
{
|
|
return scatterBlockByHashImpl(key_columns_names, block, num_shards, [num_shards](size_t hash) { return hash % num_shards; });
|
|
}
|
|
|
|
Blocks scatterBlockByHash(const Strings & key_columns_names, const Block & block, size_t num_shards)
|
|
{
|
|
if (num_shards == 0)
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Number of shards must be positive");
|
|
UNUSED(scatterBlockByHashPow2);
|
|
// if (likely(isPowerOf2(num_shards)))
|
|
// return scatterBlockByHashPow2(key_columns_names, block, num_shards);
|
|
return scatterBlockByHashGeneric(key_columns_names, block, num_shards);
|
|
}
|
|
|
|
template<typename T>
|
|
static Blocks scatterBlockByHashForList(const Strings & key_columns_names, const T & blocks, size_t num_shards)
|
|
{
|
|
std::vector<Blocks> scattered_blocks(num_shards);
|
|
for (const auto & block : blocks)
|
|
{
|
|
if (block.rows() == 0)
|
|
continue;
|
|
auto scattered = scatterBlockByHash(key_columns_names, block, num_shards);
|
|
for (size_t i = 0; i < num_shards; ++i)
|
|
scattered_blocks[i].emplace_back(std::move(scattered[i]));
|
|
}
|
|
|
|
Blocks result;
|
|
result.reserve(num_shards);
|
|
for (size_t i = 0; i < num_shards; ++i)
|
|
{
|
|
result.emplace_back(concatenateBlocks(scattered_blocks[i]));
|
|
}
|
|
return result;
|
|
}
|
|
|
|
Blocks scatterBlockByHash(const Strings & key_columns_names, const Blocks & blocks, size_t num_shards)
|
|
{
|
|
return scatterBlockByHashForList(key_columns_names, blocks, num_shards);
|
|
}
|
|
|
|
Blocks scatterBlockByHash(const Strings & key_columns_names, const BlocksList & blocks, size_t num_shards)
|
|
{
|
|
return scatterBlockByHashForList(key_columns_names, blocks, num_shards);
|
|
}
|
|
|
|
bool hasNonJoinedBlocks(const TableJoin & table_join)
|
|
{
|
|
return table_join.strictness() != JoinStrictness::Asof && table_join.strictness() != JoinStrictness::Semi
|
|
&& isRightOrFull(table_join.kind());
|
|
}
|
|
|
|
ColumnPtr filterWithBlanks(ColumnPtr src_column, const IColumn::Filter & filter, bool inverse_filter)
|
|
{
|
|
ColumnPtr column = src_column->convertToFullColumnIfConst();
|
|
MutableColumnPtr mut_column = column->cloneEmpty();
|
|
mut_column->reserve(column->size());
|
|
|
|
if (inverse_filter)
|
|
{
|
|
for (size_t row = 0; row < filter.size(); ++row)
|
|
{
|
|
if (filter[row])
|
|
mut_column->insertDefault();
|
|
else
|
|
mut_column->insertFrom(*column, row);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
for (size_t row = 0; row < filter.size(); ++row)
|
|
{
|
|
if (filter[row])
|
|
mut_column->insertFrom(*column, row);
|
|
else
|
|
mut_column->insertDefault();
|
|
}
|
|
}
|
|
|
|
return mut_column;
|
|
}
|
|
|
|
}
|
|
|
|
NotJoinedBlocks::NotJoinedBlocks(std::unique_ptr<RightColumnsFiller> filler_,
|
|
const Block & result_sample_block_,
|
|
size_t left_columns_count,
|
|
const TableJoin & table_join)
|
|
: filler(std::move(filler_))
|
|
, saved_block_sample(filler->getEmptyBlock())
|
|
, result_sample_block(materializeBlock(result_sample_block_))
|
|
{
|
|
const auto & left_to_right_key_remap = table_join.leftToRightKeyRemap();
|
|
for (size_t left_pos = 0; left_pos < left_columns_count; ++left_pos)
|
|
{
|
|
/// We need right 'x' for 'RIGHT JOIN ... USING(x)'
|
|
auto left_name = result_sample_block.getByPosition(left_pos).name;
|
|
const auto & right_key = left_to_right_key_remap.find(left_name);
|
|
if (right_key != left_to_right_key_remap.end())
|
|
{
|
|
size_t right_key_pos = saved_block_sample.getPositionByName(right_key->second);
|
|
setRightIndex(right_key_pos, left_pos);
|
|
}
|
|
else
|
|
column_indices_left.emplace_back(left_pos);
|
|
}
|
|
|
|
/// `saved_block_sample` may contains non unique column names, get any of them
|
|
/// (e.g. in case of `... JOIN (SELECT a, a, b FROM table) as t2`)
|
|
for (const auto & [right_name, right_pos] : saved_block_sample.getNamesToIndexesMap())
|
|
{
|
|
String column_name(right_name);
|
|
if (table_join.getStorageJoin())
|
|
{
|
|
/// StorageJoin operates with original non qualified column names, so apply renaming here
|
|
column_name = table_join.renamedRightColumnName(column_name);
|
|
}
|
|
|
|
/// Start from left_columns_count to don't remap left keys twice. We need only qualified right keys here
|
|
/// `result_sample_block` may contains non unique column names, need to set index for all of them
|
|
for (size_t result_pos = left_columns_count; result_pos < result_sample_block.columns(); ++result_pos)
|
|
{
|
|
const auto & result_name = result_sample_block.getByPosition(result_pos).name;
|
|
if (result_name == column_name)
|
|
setRightIndex(right_pos, result_pos);
|
|
}
|
|
}
|
|
|
|
if (column_indices_left.size() + column_indices_right.size() + same_result_keys.size() != result_sample_block.columns())
|
|
{
|
|
throw Exception(
|
|
ErrorCodes::LOGICAL_ERROR,
|
|
"Error in columns mapping in JOIN: assertion failed {} + {} + {} != {}; "
|
|
"Result block [{}], Saved block [{}]",
|
|
column_indices_left.size(), column_indices_right.size(), same_result_keys.size(), result_sample_block.columns(),
|
|
result_sample_block.dumpNames(), saved_block_sample.dumpNames());
|
|
}
|
|
}
|
|
|
|
void NotJoinedBlocks::setRightIndex(size_t right_pos, size_t result_position)
|
|
{
|
|
if (!column_indices_right.contains(right_pos))
|
|
{
|
|
column_indices_right[right_pos] = result_position;
|
|
extractColumnChanges(right_pos, result_position);
|
|
}
|
|
else
|
|
same_result_keys[result_position] = column_indices_right[right_pos];
|
|
}
|
|
|
|
void NotJoinedBlocks::extractColumnChanges(size_t right_pos, size_t result_pos)
|
|
{
|
|
auto src_props = getLowcardAndNullability(saved_block_sample.getByPosition(right_pos).column);
|
|
auto dst_props = getLowcardAndNullability(result_sample_block.getByPosition(result_pos).column);
|
|
|
|
if (src_props.is_nullable != dst_props.is_nullable)
|
|
right_nullability_changes.push_back({result_pos, dst_props.is_nullable});
|
|
|
|
if (src_props.is_lowcard != dst_props.is_lowcard)
|
|
right_lowcard_changes.push_back({result_pos, dst_props.is_lowcard});
|
|
}
|
|
|
|
void NotJoinedBlocks::correctLowcardAndNullability(Block & block)
|
|
{
|
|
for (auto & [pos, added] : right_nullability_changes)
|
|
{
|
|
auto & col = block.getByPosition(pos);
|
|
if (added)
|
|
JoinCommon::convertColumnToNullable(col);
|
|
else
|
|
JoinCommon::removeColumnNullability(col);
|
|
}
|
|
|
|
for (auto & [pos, added] : right_lowcard_changes)
|
|
{
|
|
auto & col = block.getByPosition(pos);
|
|
if (added)
|
|
{
|
|
if (!col.type->lowCardinality())
|
|
col.type = std::make_shared<DataTypeLowCardinality>(col.type);
|
|
col.column = changeLowCardinality(col.column, col.type->createColumn());
|
|
}
|
|
else
|
|
{
|
|
col.column = recursiveRemoveLowCardinality(col.column);
|
|
col.type = recursiveRemoveLowCardinality(col.type);
|
|
}
|
|
}
|
|
}
|
|
|
|
void NotJoinedBlocks::addLeftColumns(Block & block, size_t rows_added) const
|
|
{
|
|
for (size_t pos : column_indices_left)
|
|
{
|
|
auto & col = block.getByPosition(pos);
|
|
|
|
auto mut_col = col.column->cloneEmpty();
|
|
JoinCommon::addDefaultValues(*mut_col, col.type, rows_added);
|
|
col.column = std::move(mut_col);
|
|
}
|
|
}
|
|
|
|
void NotJoinedBlocks::addRightColumns(Block & block, MutableColumns & columns_right) const
|
|
{
|
|
for (const auto & pr : column_indices_right)
|
|
{
|
|
auto & right_column = columns_right[pr.first];
|
|
auto & result_column = block.getByPosition(pr.second).column;
|
|
result_column = std::move(right_column);
|
|
}
|
|
}
|
|
|
|
void NotJoinedBlocks::copySameKeys(Block & block) const
|
|
{
|
|
for (const auto & pr : same_result_keys)
|
|
{
|
|
auto & src_column = block.getByPosition(pr.second).column;
|
|
auto & dst_column = block.getByPosition(pr.first).column;
|
|
JoinCommon::changeColumnRepresentation(src_column, dst_column);
|
|
}
|
|
}
|
|
|
|
Block NotJoinedBlocks::nextImpl()
|
|
{
|
|
Block result_block = result_sample_block.cloneEmpty();
|
|
{
|
|
Block right_block = filler->getEmptyBlock();
|
|
MutableColumns columns_right = right_block.cloneEmptyColumns();
|
|
size_t rows_added = filler->fillColumns(columns_right);
|
|
if (rows_added == 0)
|
|
return {};
|
|
|
|
addLeftColumns(result_block, rows_added);
|
|
addRightColumns(result_block, columns_right);
|
|
}
|
|
copySameKeys(result_block);
|
|
correctLowcardAndNullability(result_block);
|
|
|
|
#ifndef NDEBUG
|
|
assertBlocksHaveEqualStructure(result_block, result_sample_block, "NotJoinedBlocks");
|
|
#endif
|
|
return result_block;
|
|
}
|
|
|
|
}
|