mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
Merge pull request #58278 from liuneng1994/optimize-all-join
Lazy build join output to improve performance of ALL join
This commit is contained in:
commit
a8eeb899e3
@ -1019,6 +1019,7 @@ struct JoinOnKeyColumns
|
||||
bool isRowFiltered(size_t i) const { return join_mask_column.isRowFiltered(i); }
|
||||
};
|
||||
|
||||
template <bool lazy>
|
||||
class AddedColumns
|
||||
{
|
||||
public:
|
||||
@ -1034,6 +1035,12 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
struct LazyOutput
|
||||
{
|
||||
PaddedPODArray<UInt64> blocks;
|
||||
PaddedPODArray<UInt32> row_nums;
|
||||
};
|
||||
|
||||
AddedColumns(
|
||||
const Block & left_block,
|
||||
const Block & block_with_columns_to_add,
|
||||
@ -1050,6 +1057,13 @@ public:
|
||||
if (is_asof_join)
|
||||
++num_columns_to_add;
|
||||
|
||||
if constexpr (lazy)
|
||||
{
|
||||
has_columns_to_add = num_columns_to_add > 0;
|
||||
lazy_output.blocks.reserve(rows_to_add);
|
||||
lazy_output.row_nums.reserve(rows_to_add);
|
||||
}
|
||||
|
||||
columns.reserve(num_columns_to_add);
|
||||
type_name.reserve(num_columns_to_add);
|
||||
right_indexes.reserve(num_columns_to_add);
|
||||
@ -1089,81 +1103,18 @@ public:
|
||||
|
||||
size_t size() const { return columns.size(); }
|
||||
|
||||
void buildOutput();
|
||||
|
||||
ColumnWithTypeAndName moveColumn(size_t i)
|
||||
{
|
||||
return ColumnWithTypeAndName(std::move(columns[i]), type_name[i].type, type_name[i].qualified_name);
|
||||
}
|
||||
|
||||
void appendFromBlock(const Block & block, size_t row_num, bool has_default);
|
||||
|
||||
template <bool has_defaults>
|
||||
void appendFromBlock(const Block & block, size_t row_num)
|
||||
{
|
||||
if constexpr (has_defaults)
|
||||
applyLazyDefaults();
|
||||
void appendDefaultRow();
|
||||
|
||||
#ifndef NDEBUG
|
||||
for (size_t j = 0; j < right_indexes.size(); ++j)
|
||||
{
|
||||
const auto * column_from_block = block.getByPosition(right_indexes[j]).column.get();
|
||||
const auto * dest_column = columns[j].get();
|
||||
if (auto * nullable_col = nullable_column_ptrs[j])
|
||||
{
|
||||
if (!is_join_get)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Columns {} and {} can have different nullability only in joinGetOrNull",
|
||||
dest_column->getName(), column_from_block->getName());
|
||||
dest_column = nullable_col->getNestedColumnPtr().get();
|
||||
}
|
||||
/** Using dest_column->structureEquals(*column_from_block) will not work for low cardinality columns,
|
||||
* because dictionaries can be different, while calling insertFrom on them is safe, for example:
|
||||
* ColumnLowCardinality(size = 0, UInt8(size = 0), ColumnUnique(size = 1, String(size = 1)))
|
||||
* and
|
||||
* ColumnLowCardinality(size = 0, UInt16(size = 0), ColumnUnique(size = 1, String(size = 1)))
|
||||
*/
|
||||
if (typeid(*dest_column) != typeid(*column_from_block))
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Columns {} and {} have different types {} and {}",
|
||||
dest_column->getName(), column_from_block->getName(),
|
||||
demangle(typeid(*dest_column).name()), demangle(typeid(*column_from_block).name()));
|
||||
}
|
||||
#endif
|
||||
|
||||
if (is_join_get)
|
||||
{
|
||||
size_t right_indexes_size = right_indexes.size();
|
||||
for (size_t j = 0; j < right_indexes_size; ++j)
|
||||
{
|
||||
const auto & column_from_block = block.getByPosition(right_indexes[j]);
|
||||
if (auto * nullable_col = nullable_column_ptrs[j])
|
||||
nullable_col->insertFromNotNullable(*column_from_block.column, row_num);
|
||||
else
|
||||
columns[j]->insertFrom(*column_from_block.column, row_num);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
size_t right_indexes_size = right_indexes.size();
|
||||
for (size_t j = 0; j < right_indexes_size; ++j)
|
||||
{
|
||||
const auto & column_from_block = block.getByPosition(right_indexes[j]);
|
||||
columns[j]->insertFrom(*column_from_block.column, row_num);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void appendDefaultRow()
|
||||
{
|
||||
++lazy_defaults_count;
|
||||
}
|
||||
|
||||
void applyLazyDefaults()
|
||||
{
|
||||
if (lazy_defaults_count)
|
||||
{
|
||||
for (size_t j = 0, size = right_indexes.size(); j < size; ++j)
|
||||
JoinCommon::addDefaultValues(*columns[j], type_name[j].type, lazy_defaults_count);
|
||||
lazy_defaults_count = 0;
|
||||
}
|
||||
}
|
||||
void applyLazyDefaults();
|
||||
|
||||
const IColumn & leftAsofKey() const { return *left_asof_key; }
|
||||
|
||||
@ -1192,16 +1143,50 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
std::vector<TypeAndName> type_name;
|
||||
MutableColumns columns;
|
||||
std::vector<ColumnNullable *> nullable_column_ptrs;
|
||||
|
||||
void checkBlock(const Block & block)
|
||||
{
|
||||
for (size_t j = 0; j < right_indexes.size(); ++j)
|
||||
{
|
||||
const auto * column_from_block = block.getByPosition(right_indexes[j]).column.get();
|
||||
const auto * dest_column = columns[j].get();
|
||||
if (auto * nullable_col = nullable_column_ptrs[j])
|
||||
{
|
||||
if (!is_join_get)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Columns {} and {} can have different nullability only in joinGetOrNull",
|
||||
dest_column->getName(), column_from_block->getName());
|
||||
dest_column = nullable_col->getNestedColumnPtr().get();
|
||||
}
|
||||
/** Using dest_column->structureEquals(*column_from_block) will not work for low cardinality columns,
|
||||
* because dictionaries can be different, while calling insertFrom on them is safe, for example:
|
||||
* ColumnLowCardinality(size = 0, UInt8(size = 0), ColumnUnique(size = 1, String(size = 1)))
|
||||
* and
|
||||
* ColumnLowCardinality(size = 0, UInt16(size = 0), ColumnUnique(size = 1, String(size = 1)))
|
||||
*/
|
||||
if (typeid(*dest_column) != typeid(*column_from_block))
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Columns {} and {} have different types {} and {}",
|
||||
dest_column->getName(), column_from_block->getName(),
|
||||
demangle(typeid(*dest_column).name()), demangle(typeid(*column_from_block).name()));
|
||||
}
|
||||
}
|
||||
|
||||
MutableColumns columns;
|
||||
bool is_join_get;
|
||||
std::vector<size_t> right_indexes;
|
||||
std::vector<TypeAndName> type_name;
|
||||
std::vector<ColumnNullable *> nullable_column_ptrs;
|
||||
size_t lazy_defaults_count = 0;
|
||||
|
||||
/// for lazy
|
||||
// The default row is represented by an empty RowRef, so that fixed-size blocks can be generated sequentially,
|
||||
// default_count cannot represent the position of the row
|
||||
LazyOutput lazy_output;
|
||||
bool has_columns_to_add;
|
||||
|
||||
/// for ASOF
|
||||
const IColumn * left_asof_key = nullptr;
|
||||
|
||||
bool is_join_get;
|
||||
|
||||
void addColumn(const ColumnWithTypeAndName & src_column, const std::string & qualified_name)
|
||||
{
|
||||
@ -1210,6 +1195,126 @@ private:
|
||||
type_name.emplace_back(src_column.type, src_column.name, qualified_name);
|
||||
}
|
||||
};
|
||||
template<> void AddedColumns<false>::buildOutput()
|
||||
{
|
||||
}
|
||||
|
||||
template<>
|
||||
void AddedColumns<true>::buildOutput()
|
||||
{
|
||||
for (size_t i = 0; i < this->size(); ++i)
|
||||
{
|
||||
auto& col = columns[i];
|
||||
size_t default_count = 0;
|
||||
auto apply_default = [&]()
|
||||
{
|
||||
if (default_count > 0)
|
||||
{
|
||||
JoinCommon::addDefaultValues(*col, type_name[i].type, default_count);
|
||||
default_count = 0;
|
||||
}
|
||||
};
|
||||
|
||||
for (size_t j = 0; j < lazy_output.blocks.size(); ++j)
|
||||
{
|
||||
if (!lazy_output.blocks[j])
|
||||
{
|
||||
default_count ++;
|
||||
continue;
|
||||
}
|
||||
apply_default();
|
||||
const auto & column_from_block = reinterpret_cast<const Block *>(lazy_output.blocks[j])->getByPosition(right_indexes[i]);
|
||||
/// If it's joinGetOrNull, we need to wrap not-nullable columns in StorageJoin.
|
||||
if (is_join_get)
|
||||
{
|
||||
if (auto * nullable_col = typeid_cast<ColumnNullable *>(col.get());
|
||||
nullable_col && !column_from_block.column->isNullable())
|
||||
{
|
||||
nullable_col->insertFromNotNullable(*column_from_block.column, lazy_output.row_nums[j]);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
col->insertFrom(*column_from_block.column, lazy_output.row_nums[j]);
|
||||
}
|
||||
apply_default();
|
||||
}
|
||||
}
|
||||
|
||||
template<>
|
||||
void AddedColumns<false>::applyLazyDefaults()
|
||||
{
|
||||
if (lazy_defaults_count)
|
||||
{
|
||||
for (size_t j = 0, size = right_indexes.size(); j < size; ++j)
|
||||
JoinCommon::addDefaultValues(*columns[j], type_name[j].type, lazy_defaults_count);
|
||||
lazy_defaults_count = 0;
|
||||
}
|
||||
}
|
||||
|
||||
template<>
|
||||
void AddedColumns<true>::applyLazyDefaults()
|
||||
{
|
||||
}
|
||||
|
||||
template <>
|
||||
void AddedColumns<false>::appendFromBlock(const Block & block, size_t row_num,const bool has_defaults)
|
||||
{
|
||||
if (has_defaults)
|
||||
applyLazyDefaults();
|
||||
|
||||
#ifndef NDEBUG
|
||||
checkBlock(block);
|
||||
#endif
|
||||
if (is_join_get)
|
||||
{
|
||||
size_t right_indexes_size = right_indexes.size();
|
||||
for (size_t j = 0; j < right_indexes_size; ++j)
|
||||
{
|
||||
const auto & column_from_block = block.getByPosition(right_indexes[j]);
|
||||
if (auto * nullable_col = nullable_column_ptrs[j])
|
||||
nullable_col->insertFromNotNullable(*column_from_block.column, row_num);
|
||||
else
|
||||
columns[j]->insertFrom(*column_from_block.column, row_num);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
size_t right_indexes_size = right_indexes.size();
|
||||
for (size_t j = 0; j < right_indexes_size; ++j)
|
||||
{
|
||||
const auto & column_from_block = block.getByPosition(right_indexes[j]);
|
||||
columns[j]->insertFrom(*column_from_block.column, row_num);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template <>
|
||||
void AddedColumns<true>::appendFromBlock(const Block & block, size_t row_num, bool)
|
||||
{
|
||||
#ifndef NDEBUG
|
||||
checkBlock(block);
|
||||
#endif
|
||||
if (has_columns_to_add)
|
||||
{
|
||||
lazy_output.blocks.emplace_back(reinterpret_cast<UInt64>(&block));
|
||||
lazy_output.row_nums.emplace_back(static_cast<uint32_t>(row_num));
|
||||
}
|
||||
}
|
||||
template<>
|
||||
void AddedColumns<false>::appendDefaultRow()
|
||||
{
|
||||
++lazy_defaults_count;
|
||||
}
|
||||
|
||||
template<>
|
||||
void AddedColumns<true>::appendDefaultRow()
|
||||
{
|
||||
if (has_columns_to_add)
|
||||
{
|
||||
lazy_output.blocks.emplace_back(0);
|
||||
lazy_output.row_nums.emplace_back(0);
|
||||
}
|
||||
}
|
||||
|
||||
template <JoinKind KIND, JoinStrictness STRICTNESS>
|
||||
struct JoinFeatures
|
||||
@ -1308,7 +1413,7 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
template <typename Map, bool add_missing, bool multiple_disjuncts>
|
||||
template <typename Map, bool add_missing, bool multiple_disjuncts, typename AddedColumns>
|
||||
void addFoundRowAll(
|
||||
const typename Map::mapped_type & mapped,
|
||||
AddedColumns & added,
|
||||
@ -1327,7 +1432,7 @@ void addFoundRowAll(
|
||||
{
|
||||
if (!known_rows.isKnown(std::make_pair(it->block, it->row_num)))
|
||||
{
|
||||
added.appendFromBlock<false>(*it->block, it->row_num);
|
||||
added.appendFromBlock(*it->block, it->row_num, false);
|
||||
++current_offset;
|
||||
if (!new_known_rows_ptr)
|
||||
{
|
||||
@ -1351,13 +1456,13 @@ void addFoundRowAll(
|
||||
{
|
||||
for (auto it = mapped.begin(); it.ok(); ++it)
|
||||
{
|
||||
added.appendFromBlock<false>(*it->block, it->row_num);
|
||||
added.appendFromBlock(*it->block, it->row_num, false);
|
||||
++current_offset;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template <bool add_missing, bool need_offset>
|
||||
template <bool add_missing, bool need_offset, typename AddedColumns>
|
||||
void addNotFoundRow(AddedColumns & added [[maybe_unused]], IColumn::Offset & current_offset [[maybe_unused]])
|
||||
{
|
||||
if constexpr (add_missing)
|
||||
@ -1377,7 +1482,7 @@ void setUsed(IColumn::Filter & filter [[maybe_unused]], size_t pos [[maybe_unuse
|
||||
|
||||
/// Joins right table columns which indexes are present in right_indexes using specified map.
|
||||
/// Makes filter (1 if row presented in right table) and returns offsets to replicate (for ALL JOINS).
|
||||
template <JoinKind KIND, JoinStrictness STRICTNESS, typename KeyGetter, typename Map, bool need_filter, bool multiple_disjuncts>
|
||||
template <JoinKind KIND, JoinStrictness STRICTNESS, typename KeyGetter, typename Map, bool need_filter, bool multiple_disjuncts, typename AddedColumns>
|
||||
NO_INLINE size_t joinRightColumns(
|
||||
std::vector<KeyGetter> && key_getter_vector,
|
||||
const std::vector<const Map *> & mapv,
|
||||
@ -1440,7 +1545,7 @@ NO_INLINE size_t joinRightColumns(
|
||||
else
|
||||
used_flags.template setUsed<join_features.need_flags, multiple_disjuncts>(find_result);
|
||||
|
||||
added_columns.appendFromBlock<join_features.add_missing>(*row_ref.block, row_ref.row_num);
|
||||
added_columns.appendFromBlock(*row_ref.block, row_ref.row_num, join_features.add_missing);
|
||||
}
|
||||
else
|
||||
addNotFoundRow<join_features.add_missing, join_features.need_replication>(added_columns, current_offset);
|
||||
@ -1471,7 +1576,7 @@ NO_INLINE size_t joinRightColumns(
|
||||
if (used_once)
|
||||
{
|
||||
setUsed<need_filter>(added_columns.filter, i);
|
||||
added_columns.appendFromBlock<join_features.add_missing>(*mapped.block, mapped.row_num);
|
||||
added_columns.appendFromBlock(*mapped.block, mapped.row_num, join_features.add_missing);
|
||||
}
|
||||
|
||||
break;
|
||||
@ -1489,7 +1594,7 @@ NO_INLINE size_t joinRightColumns(
|
||||
{
|
||||
setUsed<need_filter>(added_columns.filter, i);
|
||||
used_flags.template setUsed<join_features.need_flags, multiple_disjuncts>(find_result);
|
||||
added_columns.appendFromBlock<join_features.add_missing>(*mapped.block, mapped.row_num);
|
||||
added_columns.appendFromBlock(*mapped.block, mapped.row_num, join_features.add_missing);
|
||||
|
||||
if (join_features.is_any_or_semi_join)
|
||||
{
|
||||
@ -1516,7 +1621,7 @@ NO_INLINE size_t joinRightColumns(
|
||||
return i;
|
||||
}
|
||||
|
||||
template <JoinKind KIND, JoinStrictness STRICTNESS, typename KeyGetter, typename Map, bool need_filter>
|
||||
template <JoinKind KIND, JoinStrictness STRICTNESS, typename KeyGetter, typename Map, bool need_filter, typename AddedColumns>
|
||||
size_t joinRightColumnsSwitchMultipleDisjuncts(
|
||||
std::vector<KeyGetter> && key_getter_vector,
|
||||
const std::vector<const Map *> & mapv,
|
||||
@ -1528,7 +1633,7 @@ size_t joinRightColumnsSwitchMultipleDisjuncts(
|
||||
: joinRightColumns<KIND, STRICTNESS, KeyGetter, Map, need_filter, false>(std::forward<std::vector<KeyGetter>>(key_getter_vector), mapv, added_columns, used_flags);
|
||||
}
|
||||
|
||||
template <JoinKind KIND, JoinStrictness STRICTNESS, typename KeyGetter, typename Map>
|
||||
template <JoinKind KIND, JoinStrictness STRICTNESS, typename KeyGetter, typename Map, typename AddedColumns>
|
||||
size_t joinRightColumnsSwitchNullability(
|
||||
std::vector<KeyGetter> && key_getter_vector,
|
||||
const std::vector<const Map *> & mapv,
|
||||
@ -1545,7 +1650,7 @@ size_t joinRightColumnsSwitchNullability(
|
||||
}
|
||||
}
|
||||
|
||||
template <JoinKind KIND, JoinStrictness STRICTNESS, typename Maps>
|
||||
template <JoinKind KIND, JoinStrictness STRICTNESS, typename Maps, typename AddedColumns>
|
||||
size_t switchJoinRightColumns(
|
||||
const std::vector<const Maps *> & mapv,
|
||||
AddedColumns & added_columns,
|
||||
@ -1680,14 +1785,9 @@ Block HashJoin::joinBlockImpl(
|
||||
* but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped.
|
||||
* For ASOF, the last column is used as the ASOF column
|
||||
*/
|
||||
AddedColumns added_columns(
|
||||
block,
|
||||
block_with_columns_to_add,
|
||||
savedBlockSample(),
|
||||
*this,
|
||||
std::move(join_on_keys),
|
||||
join_features.is_asof_join,
|
||||
is_join_get);
|
||||
AddedColumns<!join_features.is_any_join> added_columns(
|
||||
block, block_with_columns_to_add, savedBlockSample(), *this, std::move(join_on_keys), join_features.is_asof_join, is_join_get);
|
||||
|
||||
|
||||
bool has_required_right_keys = (required_right_keys.columns() != 0);
|
||||
added_columns.need_filter = join_features.need_filter || has_required_right_keys;
|
||||
@ -1702,6 +1802,7 @@ Block HashJoin::joinBlockImpl(
|
||||
added_columns.join_on_keys.clear();
|
||||
Block remaining_block = sliceBlock(block, num_joined);
|
||||
|
||||
added_columns.buildOutput();
|
||||
for (size_t i = 0; i < added_columns.size(); ++i)
|
||||
block.insert(added_columns.moveColumn(i));
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user