use template class

This commit is contained in:
liuneng 2024-02-26 15:34:56 +08:00
parent 2279885c3e
commit 94f78ac44b

View File

@ -1019,6 +1019,7 @@ struct JoinOnKeyColumns
bool isRowFiltered(size_t i) const { return join_mask_column.isRowFiltered(i); } bool isRowFiltered(size_t i) const { return join_mask_column.isRowFiltered(i); }
}; };
template <bool lazy>
class AddedColumns class AddedColumns
{ {
public: public:
@ -1034,6 +1035,12 @@ public:
} }
}; };
struct LazyOutput
{
PaddedPODArray<UInt64> blocks;
PaddedPODArray<UInt32> row_nums;
};
AddedColumns( AddedColumns(
const Block & left_block, const Block & left_block,
const Block & block_with_columns_to_add, const Block & block_with_columns_to_add,
@ -1047,6 +1054,14 @@ public:
, is_join_get(is_join_get_) , is_join_get(is_join_get_)
{ {
size_t num_columns_to_add = block_with_columns_to_add.columns(); size_t num_columns_to_add = block_with_columns_to_add.columns();
if constexpr (lazy)
{
has_columns_to_add = block_with_columns_to_add.columns() > 0;
lazy_output.blocks.reserve(rows_to_add);
lazy_output.row_nums.reserve(rows_to_add);
}
if (is_asof_join) if (is_asof_join)
++num_columns_to_add; ++num_columns_to_add;
columns.reserve(num_columns_to_add); columns.reserve(num_columns_to_add);
@ -1086,76 +1101,21 @@ public:
} }
} }
virtual ~AddedColumns() = default;
size_t size() const { return columns.size(); } size_t size() const { return columns.size(); }
virtual void buildOutput() { } void buildOutput();
ColumnWithTypeAndName moveColumn(size_t i) ColumnWithTypeAndName moveColumn(size_t i)
{ {
return ColumnWithTypeAndName(std::move(columns[i]), type_name[i].type, type_name[i].qualified_name); return ColumnWithTypeAndName(std::move(columns[i]), type_name[i].type, type_name[i].qualified_name);
} }
virtual void appendFromBlock(const Block & block, size_t row_num, bool has_defaults) void appendFromBlock(const Block & block, size_t row_num, bool has_default);
{
if (has_defaults)
applyLazyDefaults();
#ifndef NDEBUG void appendDefaultRow();
for (size_t j = 0; j < right_indexes.size(); ++j)
{
const auto * column_from_block = block.getByPosition(right_indexes[j]).column.get();
const auto * dest_column = columns[j].get();
if (auto * nullable_col = nullable_column_ptrs[j])
{
if (!is_join_get)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Columns {} and {} can have different nullability only in joinGetOrNull",
dest_column->getName(), column_from_block->getName());
dest_column = nullable_col->getNestedColumnPtr().get();
}
/** Using dest_column->structureEquals(*column_from_block) will not work for low cardinality columns,
* because dictionaries can be different, while calling insertFrom on them is safe, for example:
* ColumnLowCardinality(size = 0, UInt8(size = 0), ColumnUnique(size = 1, String(size = 1)))
* and
* ColumnLowCardinality(size = 0, UInt16(size = 0), ColumnUnique(size = 1, String(size = 1)))
*/
if (typeid(*dest_column) != typeid(*column_from_block))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Columns {} and {} have different types {} and {}",
dest_column->getName(), column_from_block->getName(),
demangle(typeid(*dest_column).name()), demangle(typeid(*column_from_block).name()));
}
#endif
if (is_join_get)
{
size_t right_indexes_size = right_indexes.size();
for (size_t j = 0; j < right_indexes_size; ++j)
{
const auto & column_from_block = block.getByPosition(right_indexes[j]);
if (auto * nullable_col = nullable_column_ptrs[j])
nullable_col->insertFromNotNullable(*column_from_block.column, row_num);
else
columns[j]->insertFrom(*column_from_block.column, row_num);
}
}
else
{
size_t right_indexes_size = right_indexes.size();
for (size_t j = 0; j < right_indexes_size; ++j)
{
const auto & column_from_block = block.getByPosition(right_indexes[j]);
columns[j]->insertFrom(*column_from_block.column, row_num);
}
}
}
virtual void appendDefaultRow()
{
++lazy_defaults_count;
}
virtual void applyLazyDefaults() void applyLazyDefaults()
{ {
if (lazy_defaults_count) if (lazy_defaults_count)
{ {
@ -1197,6 +1157,12 @@ protected:
std::vector<size_t> right_indexes; std::vector<size_t> right_indexes;
std::vector<TypeAndName> type_name; std::vector<TypeAndName> type_name;
std::vector<ColumnNullable *> nullable_column_ptrs; std::vector<ColumnNullable *> nullable_column_ptrs;
// The default row is represented by an empty RowRef, so that fixed-size blocks can be generated sequentially,
// default_count cannot represent the position of the row
LazyOutput lazy_output;
bool has_columns_to_add;
private: private:
size_t lazy_defaults_count = 0; size_t lazy_defaults_count = 0;
/// for ASOF /// for ASOF
@ -1210,123 +1176,153 @@ private:
type_name.emplace_back(src_column.type, src_column.name, qualified_name); type_name.emplace_back(src_column.type, src_column.name, qualified_name);
} }
}; };
template<> void AddedColumns<false>::buildOutput()
class LazyAddedColumns : public AddedColumns
{ {
public: }
struct LazyOutput template<>
void AddedColumns<true>::buildOutput()
{
for (size_t i = 0; i < this->size(); ++i)
{ {
PaddedPODArray<UInt64> blocks; auto& col = columns[i];
PaddedPODArray<UInt32> row_nums; size_t default_count = 0;
}; auto apply_default = [&]()
LazyAddedColumns(
const Block & left_block,
const Block & block_with_columns_to_add,
const Block & saved_block_sample,
const HashJoin & join,
std::vector<JoinOnKeyColumns> && join_on_keys_,
bool is_asof_join,
bool is_join_get_)
: AddedColumns(left_block, block_with_columns_to_add, saved_block_sample, join, std::move(join_on_keys_), is_asof_join, is_join_get_)
{
has_columns_to_add = block_with_columns_to_add.columns() > 0;
lazy_output.blocks.reserve(rows_to_add);
lazy_output.row_nums.reserve(rows_to_add);
}
void buildOutput() override
{
for (size_t i = 0; i < this->size(); ++i)
{ {
auto& col = columns[i]; if (default_count > 0)
size_t default_count = 0;
auto apply_default = [&]()
{ {
if (default_count > 0) JoinCommon::addDefaultValues(*col, type_name[i].type, default_count);
{ default_count = 0;
JoinCommon::addDefaultValues(*col, type_name[i].type, default_count); }
default_count = 0; };
}
};
for (size_t j = 0; j < lazy_output.blocks.size(); ++j) for (size_t j = 0; j < lazy_output.blocks.size(); ++j)
{
if (!lazy_output.blocks[j])
{ {
if (!lazy_output.blocks[j]) default_count ++;
{ continue;
default_count ++;
continue;
}
apply_default();
const auto & column_from_block = reinterpret_cast<const Block *>(lazy_output.blocks[j])->getByPosition(right_indexes[i]);
/// If it's joinGetOrNull, we need to wrap not-nullable columns in StorageJoin.
if (is_join_get)
{
if (auto * nullable_col = typeid_cast<ColumnNullable *>(col.get());
nullable_col && !column_from_block.column->isNullable())
{
nullable_col->insertFromNotNullable(*column_from_block.column, lazy_output.row_nums[j]);
continue;
}
}
col->insertFrom(*column_from_block.column, lazy_output.row_nums[j]);
} }
apply_default(); apply_default();
} const auto & column_from_block = reinterpret_cast<const Block *>(lazy_output.blocks[j])->getByPosition(right_indexes[i]);
} /// If it's joinGetOrNull, we need to wrap not-nullable columns in StorageJoin.
if (is_join_get)
void appendFromBlock(const Block & block, size_t row_num, bool) override
{
#ifndef NDEBUG
for (size_t j = 0; j < right_indexes.size(); ++j)
{
const auto * column_from_block = block.getByPosition(right_indexes[j]).column.get();
const auto * dest_column = columns[j].get();
if (auto * nullable_col = nullable_column_ptrs[j])
{ {
if (!is_join_get) if (auto * nullable_col = typeid_cast<ColumnNullable *>(col.get());
throw Exception(ErrorCodes::LOGICAL_ERROR, nullable_col && !column_from_block.column->isNullable())
"Columns {} and {} can have different nullability only in joinGetOrNull", {
dest_column->getName(), column_from_block->getName()); nullable_col->insertFromNotNullable(*column_from_block.column, lazy_output.row_nums[j]);
dest_column = nullable_col->getNestedColumnPtr().get(); continue;
}
} }
/** Using dest_column->structureEquals(*column_from_block) will not work for low cardinality columns, col->insertFrom(*column_from_block.column, lazy_output.row_nums[j]);
}
apply_default();
}
}
template <>
void AddedColumns<false>::appendFromBlock(const Block & block, size_t row_num,const bool has_defaults)
{
if (has_defaults)
applyLazyDefaults();
#ifndef NDEBUG
for (size_t j = 0; j < right_indexes.size(); ++j)
{
const auto * column_from_block = block.getByPosition(right_indexes[j]).column.get();
const auto * dest_column = columns[j].get();
if (auto * nullable_col = nullable_column_ptrs[j])
{
if (!is_join_get)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Columns {} and {} can have different nullability only in joinGetOrNull",
dest_column->getName(), column_from_block->getName());
dest_column = nullable_col->getNestedColumnPtr().get();
}
/** Using dest_column->structureEquals(*column_from_block) will not work for low cardinality columns,
* because dictionaries can be different, while calling insertFrom on them is safe, for example: * because dictionaries can be different, while calling insertFrom on them is safe, for example:
* ColumnLowCardinality(size = 0, UInt8(size = 0), ColumnUnique(size = 1, String(size = 1))) * ColumnLowCardinality(size = 0, UInt8(size = 0), ColumnUnique(size = 1, String(size = 1)))
* and * and
* ColumnLowCardinality(size = 0, UInt16(size = 0), ColumnUnique(size = 1, String(size = 1))) * ColumnLowCardinality(size = 0, UInt16(size = 0), ColumnUnique(size = 1, String(size = 1)))
*/ */
if (typeid(*dest_column) != typeid(*column_from_block)) if (typeid(*dest_column) != typeid(*column_from_block))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Columns {} and {} have different types {} and {}", throw Exception(ErrorCodes::LOGICAL_ERROR, "Columns {} and {} have different types {} and {}",
dest_column->getName(), column_from_block->getName(), dest_column->getName(), column_from_block->getName(),
demangle(typeid(*dest_column).name()), demangle(typeid(*column_from_block).name())); demangle(typeid(*dest_column).name()), demangle(typeid(*column_from_block).name()));
} }
#endif #endif
if (has_columns_to_add) if (is_join_get)
{
lazy_output.blocks.emplace_back(reinterpret_cast<UInt64>(&block));
lazy_output.row_nums.emplace_back(static_cast<uint32_t>(row_num));
}
}
void appendDefaultRow() override
{ {
if (has_columns_to_add) size_t right_indexes_size = right_indexes.size();
for (size_t j = 0; j < right_indexes_size; ++j)
{ {
lazy_output.blocks.emplace_back(0); const auto & column_from_block = block.getByPosition(right_indexes[j]);
lazy_output.row_nums.emplace_back(0); if (auto * nullable_col = nullable_column_ptrs[j])
nullable_col->insertFromNotNullable(*column_from_block.column, row_num);
else
columns[j]->insertFrom(*column_from_block.column, row_num);
} }
} }
else
{
size_t right_indexes_size = right_indexes.size();
for (size_t j = 0; j < right_indexes_size; ++j)
{
const auto & column_from_block = block.getByPosition(right_indexes[j]);
columns[j]->insertFrom(*column_from_block.column, row_num);
}
}
}
void applyLazyDefaults() override { } template <>
void AddedColumns<true>::appendFromBlock(const Block & block, size_t row_num, bool)
{
#ifndef NDEBUG
for (size_t j = 0; j < right_indexes.size(); ++j)
{
const auto * column_from_block = block.getByPosition(right_indexes[j]).column.get();
const auto * dest_column = columns[j].get();
if (auto * nullable_col = nullable_column_ptrs[j])
{
if (!is_join_get)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Columns {} and {} can have different nullability only in joinGetOrNull",
dest_column->getName(), column_from_block->getName());
dest_column = nullable_col->getNestedColumnPtr().get();
}
/** Using dest_column->structureEquals(*column_from_block) will not work for low cardinality columns,
* because dictionaries can be different, while calling insertFrom on them is safe, for example:
* ColumnLowCardinality(size = 0, UInt8(size = 0), ColumnUnique(size = 1, String(size = 1)))
* and
* ColumnLowCardinality(size = 0, UInt16(size = 0), ColumnUnique(size = 1, String(size = 1)))
*/
if (typeid(*dest_column) != typeid(*column_from_block))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Columns {} and {} have different types {} and {}",
dest_column->getName(), column_from_block->getName(),
demangle(typeid(*dest_column).name()), demangle(typeid(*column_from_block).name()));
}
#endif
if (has_columns_to_add)
{
lazy_output.blocks.emplace_back(reinterpret_cast<UInt64>(&block));
lazy_output.row_nums.emplace_back(static_cast<uint32_t>(row_num));
}
}
template<>
void AddedColumns<false>::appendDefaultRow()
{
++lazy_defaults_count;
}
private : template<>
// The default row is represented by an empty RowRef, so that fixed-size blocks can be generated sequentially, void AddedColumns<true>::appendDefaultRow()
// default_count cannot represent the position of the row {
LazyOutput lazy_output; if (has_columns_to_add)
bool has_columns_to_add; {
lazy_output.blocks.emplace_back(0);
}; lazy_output.row_nums.emplace_back(0);
}
}
template <JoinKind KIND, JoinStrictness STRICTNESS> template <JoinKind KIND, JoinStrictness STRICTNESS>
struct JoinFeatures struct JoinFeatures
@ -1425,7 +1421,7 @@ public:
} }
}; };
template <typename Map, bool add_missing, bool multiple_disjuncts> template <typename Map, bool add_missing, bool multiple_disjuncts, typename AddedColumns>
void addFoundRowAll( void addFoundRowAll(
const typename Map::mapped_type & mapped, const typename Map::mapped_type & mapped,
AddedColumns & added, AddedColumns & added,
@ -1474,7 +1470,7 @@ void addFoundRowAll(
} }
} }
template <bool add_missing, bool need_offset> template <bool add_missing, bool need_offset, typename AddedColumns>
void addNotFoundRow(AddedColumns & added [[maybe_unused]], IColumn::Offset & current_offset [[maybe_unused]]) void addNotFoundRow(AddedColumns & added [[maybe_unused]], IColumn::Offset & current_offset [[maybe_unused]])
{ {
if constexpr (add_missing) if constexpr (add_missing)
@ -1494,7 +1490,7 @@ void setUsed(IColumn::Filter & filter [[maybe_unused]], size_t pos [[maybe_unuse
/// Joins right table columns which indexes are present in right_indexes using specified map. /// Joins right table columns which indexes are present in right_indexes using specified map.
/// Makes filter (1 if row presented in right table) and returns offsets to replicate (for ALL JOINS). /// Makes filter (1 if row presented in right table) and returns offsets to replicate (for ALL JOINS).
template <JoinKind KIND, JoinStrictness STRICTNESS, typename KeyGetter, typename Map, bool need_filter, bool multiple_disjuncts> template <JoinKind KIND, JoinStrictness STRICTNESS, typename KeyGetter, typename Map, bool need_filter, bool multiple_disjuncts, typename AddedColumns>
NO_INLINE size_t joinRightColumns( NO_INLINE size_t joinRightColumns(
std::vector<KeyGetter> && key_getter_vector, std::vector<KeyGetter> && key_getter_vector,
const std::vector<const Map *> & mapv, const std::vector<const Map *> & mapv,
@ -1633,7 +1629,7 @@ NO_INLINE size_t joinRightColumns(
return i; return i;
} }
template <JoinKind KIND, JoinStrictness STRICTNESS, typename KeyGetter, typename Map, bool need_filter> template <JoinKind KIND, JoinStrictness STRICTNESS, typename KeyGetter, typename Map, bool need_filter, typename AddedColumns>
size_t joinRightColumnsSwitchMultipleDisjuncts( size_t joinRightColumnsSwitchMultipleDisjuncts(
std::vector<KeyGetter> && key_getter_vector, std::vector<KeyGetter> && key_getter_vector,
const std::vector<const Map *> & mapv, const std::vector<const Map *> & mapv,
@ -1645,7 +1641,7 @@ size_t joinRightColumnsSwitchMultipleDisjuncts(
: joinRightColumns<KIND, STRICTNESS, KeyGetter, Map, need_filter, false>(std::forward<std::vector<KeyGetter>>(key_getter_vector), mapv, added_columns, used_flags); : joinRightColumns<KIND, STRICTNESS, KeyGetter, Map, need_filter, false>(std::forward<std::vector<KeyGetter>>(key_getter_vector), mapv, added_columns, used_flags);
} }
template <JoinKind KIND, JoinStrictness STRICTNESS, typename KeyGetter, typename Map> template <JoinKind KIND, JoinStrictness STRICTNESS, typename KeyGetter, typename Map, typename AddedColumns>
size_t joinRightColumnsSwitchNullability( size_t joinRightColumnsSwitchNullability(
std::vector<KeyGetter> && key_getter_vector, std::vector<KeyGetter> && key_getter_vector,
const std::vector<const Map *> & mapv, const std::vector<const Map *> & mapv,
@ -1662,7 +1658,7 @@ size_t joinRightColumnsSwitchNullability(
} }
} }
template <JoinKind KIND, JoinStrictness STRICTNESS, typename Maps> template <JoinKind KIND, JoinStrictness STRICTNESS, typename Maps, typename AddedColumns>
size_t switchJoinRightColumns( size_t switchJoinRightColumns(
const std::vector<const Maps *> & mapv, const std::vector<const Maps *> & mapv,
AddedColumns & added_columns, AddedColumns & added_columns,
@ -1797,19 +1793,9 @@ Block HashJoin::joinBlockImpl(
* but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped. * but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped.
* For ASOF, the last column is used as the ASOF column * For ASOF, the last column is used as the ASOF column
*/ */
std::unique_ptr<AddedColumns> added_columns_ptr; AddedColumns<!join_features.is_any_join> added_columns(
if (!join_features.is_any_join) block, block_with_columns_to_add, savedBlockSample(), *this, std::move(join_on_keys), join_features.is_asof_join, is_join_get);
{
added_columns_ptr = std::make_unique<AddedColumns>(
block, block_with_columns_to_add, savedBlockSample(), *this, std::move(join_on_keys), join_features.is_asof_join, is_join_get);
}
else
{
added_columns_ptr = std::make_unique<LazyAddedColumns>(
block, block_with_columns_to_add, savedBlockSample(), *this, std::move(join_on_keys), join_features.is_asof_join, is_join_get);
}
AddedColumns & added_columns = * added_columns_ptr;
bool has_required_right_keys = (required_right_keys.columns() != 0); bool has_required_right_keys = (required_right_keys.columns() != 0);
added_columns.need_filter = join_features.need_filter || has_required_right_keys; added_columns.need_filter = join_features.need_filter || has_required_right_keys;