add lazyAddedColumns

This commit is contained in:
liuneng 2024-02-22 15:34:31 +08:00
parent 93fc7a293f
commit b0943ab3e8

View File

@ -1034,12 +1034,6 @@ public:
}
};
/// Buffers for output that is recorded first and materialised later.
/// Both arrays are reserved with the same capacity (rows_to_add) by the constructor.
/// NOTE(review): presumably the arrays are parallel, i.e. entry i of `blocks` and entry i of
/// `row_nums` together identify one source row — confirm against buildOutput().
struct LazyOutput
{
/// One UInt64 per recorded row; presumably identifies the source block — TODO confirm encoding.
PaddedPODArray<UInt64> blocks;
/// One UInt32 per recorded row; presumably the row index inside that block — TODO confirm.
PaddedPODArray<UInt32> row_nums;
};
AddedColumns(
const Block & left_block,
const Block & block_with_columns_to_add,
@ -1055,12 +1049,9 @@ public:
size_t num_columns_to_add = block_with_columns_to_add.columns();
if (is_asof_join)
++num_columns_to_add;
has_columns_to_add = num_columns_to_add > 0;
columns.reserve(num_columns_to_add);
type_name.reserve(num_columns_to_add);
right_indexes.reserve(num_columns_to_add);
lazy_output.blocks.reserve(rows_to_add);
lazy_output.row_nums.reserve(rows_to_add);
for (const auto & src_column : block_with_columns_to_add)
{
@ -1095,9 +1086,162 @@ public:
}
}
/// Virtual because AddedColumns is used as a polymorphic base (see LazyAddedColumns) and may be
/// destroyed through a pointer to the base class. Defaulted rather than given an empty body,
/// since there is nothing to clean up by hand.
virtual ~AddedColumns() = default;
/// Number of output columns currently held in `columns`.
size_t size() const
{
    return columns.size();
}
void buildOutput()
virtual void buildOutput()
{
}
/// Packs the i-th accumulated column together with its type and qualified name.
/// columns[i] is handed over as an rvalue, so treat that slot as consumed after the call.
ColumnWithTypeAndName moveColumn(size_t i)
{
    const auto & meta = type_name[i];
    return ColumnWithTypeAndName(std::move(columns[i]), meta.type, meta.qualified_name);
}
virtual void appendFromBlock(const Block & block, size_t row_num, bool has_defaults)
{
if (has_defaults)
applyLazyDefaults();
#ifndef NDEBUG
for (size_t j = 0; j < right_indexes.size(); ++j)
{
const auto * column_from_block = block.getByPosition(right_indexes[j]).column.get();
const auto * dest_column = columns[j].get();
if (auto * nullable_col = nullable_column_ptrs[j])
{
if (!is_join_get)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Columns {} and {} can have different nullability only in joinGetOrNull",
dest_column->getName(), column_from_block->getName());
dest_column = nullable_col->getNestedColumnPtr().get();
}
/** Using dest_column->structureEquals(*column_from_block) will not work for low cardinality columns,
* because dictionaries can be different, while calling insertFrom on them is safe, for example:
* ColumnLowCardinality(size = 0, UInt8(size = 0), ColumnUnique(size = 1, String(size = 1)))
* and
* ColumnLowCardinality(size = 0, UInt16(size = 0), ColumnUnique(size = 1, String(size = 1)))
*/
if (typeid(*dest_column) != typeid(*column_from_block))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Columns {} and {} have different types {} and {}",
dest_column->getName(), column_from_block->getName(),
demangle(typeid(*dest_column).name()), demangle(typeid(*column_from_block).name()));
}
#endif
if (is_join_get)
{
size_t right_indexes_size = right_indexes.size();
for (size_t j = 0; j < right_indexes_size; ++j)
{
const auto & column_from_block = block.getByPosition(right_indexes[j]);
if (auto * nullable_col = nullable_column_ptrs[j])
nullable_col->insertFromNotNullable(*column_from_block.column, row_num);
else
columns[j]->insertFrom(*column_from_block.column, row_num);
}
}
else
{
size_t right_indexes_size = right_indexes.size();
for (size_t j = 0; j < right_indexes_size; ++j)
{
const auto & column_from_block = block.getByPosition(right_indexes[j]);
columns[j]->insertFrom(*column_from_block.column, row_num);
}
}
}
/// Registers one more pending default row. The columns are not touched here;
/// the counted rows are inserted in bulk by applyLazyDefaults().
virtual void appendDefaultRow()
{
    lazy_defaults_count += 1;
}
virtual void applyLazyDefaults()
{
if (lazy_defaults_count)
{
for (size_t j = 0, size = right_indexes.size(); j < size; ++j)
JoinCommon::addDefaultValues(*columns[j], type_name[j].type, lazy_defaults_count);
lazy_defaults_count = 0;
}
}
/// Returns the column pointed to by `left_asof_key`.
/// The pointer is dereferenced without a check and is initialised to nullptr, so this must only
/// be called after it has been set — presumably only for ASOF joins (TODO confirm in the constructor).
const IColumn & leftAsofKey() const { return *left_asof_key; }
std::vector<JoinOnKeyColumns> join_on_keys;
size_t max_joined_block_rows = 0;
size_t rows_to_add;
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
bool need_filter = false;
IColumn::Filter filter;
/// Pre-allocates space in every output column, based on `max_joined_block_rows`.
///
/// `need_replicate` - when true, 10% extra is reserved because some rows can be repeated.
///
/// Does nothing when `max_joined_block_rows` is zero.
void reserve(bool need_replicate)
{
    if (max_joined_block_rows == 0)
        return;

    /// Cap the request so that a huge user-supplied max_joined_block_rows cannot cause a big allocation.
    const size_t capped_rows = std::min<size_t>(max_joined_block_rows, DEFAULT_BLOCK_SIZE * 2);

    const size_t reserve_size = need_replicate
        ? static_cast<size_t>(1.1 * capped_rows)
        : capped_rows;

    for (auto & column : columns)
        column->reserve(reserve_size);
}
protected:
MutableColumns columns;
bool is_join_get;
std::vector<size_t> right_indexes;
std::vector<TypeAndName> type_name;
std::vector<ColumnNullable *> nullable_column_ptrs;
private:
size_t lazy_defaults_count = 0;
/// for ASOF
const IColumn * left_asof_key = nullptr;
/// Registers a new output column modelled on `src_column`: an empty clone of its column is
/// appended to `columns` with capacity reserved for as many rows as the source holds, and the
/// matching (type, name, qualified_name) entry is appended to `type_name`.
void addColumn(const ColumnWithTypeAndName & src_column, const std::string & qualified_name)
{
    const auto & source = src_column.column;

    columns.push_back(source->cloneEmpty());
    auto & added = columns.back();
    added->reserve(source->size());

    type_name.emplace_back(src_column.type, src_column.name, qualified_name);
}
};
class LazyAddedColumns : public AddedColumns
{
public:
/// Buffers for output that is recorded first and materialised later.
/// Both arrays are reserved with the same capacity (rows_to_add) by the LazyAddedColumns constructor.
/// NOTE(review): presumably the arrays are parallel, i.e. entry i of `blocks` and entry i of
/// `row_nums` together identify one source row — confirm against buildOutput().
struct LazyOutput
{
/// One UInt64 per recorded row; presumably identifies the source block — TODO confirm encoding.
PaddedPODArray<UInt64> blocks;
/// One UInt32 per recorded row; presumably the row index inside that block — TODO confirm.
PaddedPODArray<UInt32> row_nums;
};
/// Forwards every argument to the AddedColumns constructor unchanged, then sets up the
/// state specific to lazy output.
LazyAddedColumns(
    const Block & left_block,
    const Block & block_with_columns_to_add,
    const Block & saved_block_sample,
    const HashJoin & join,
    std::vector<JoinOnKeyColumns> && join_on_keys_,
    bool is_asof_join,
    bool is_join_get_)
    : AddedColumns(
        left_block,
        block_with_columns_to_add,
        saved_block_sample,
        join,
        std::move(join_on_keys_),
        is_asof_join,
        is_join_get_)
{
    /// Flag checked by the appendFromBlock / appendDefaultRow overrides of this class.
    has_columns_to_add = block_with_columns_to_add.columns() != 0;

    /// Both lazy buffers get room for rows_to_add entries up front.
    lazy_output.blocks.reserve(rows_to_add);
    lazy_output.row_nums.reserve(rows_to_add);
}
virtual void buildOutput() override
{
for (size_t i = 0; i < this->size(); ++i)
{
@ -1137,13 +1281,7 @@ public:
}
}
ColumnWithTypeAndName moveColumn(size_t i)
{
return ColumnWithTypeAndName(std::move(columns[i]), type_name[i].type, type_name[i].qualified_name);
}
void appendFromBlock(const Block & block, size_t row_num)
virtual void appendFromBlock(const Block & block, size_t row_num, bool) override
{
#ifndef NDEBUG
for (size_t j = 0; j < right_indexes.size(); ++j)
@ -1154,8 +1292,8 @@ public:
{
if (!is_join_get)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Columns {} and {} can have different nullability only in joinGetOrNull",
dest_column->getName(), column_from_block->getName());
"Columns {} and {} can have different nullability only in joinGetOrNull",
dest_column->getName(), column_from_block->getName());
dest_column = nullable_col->getNestedColumnPtr().get();
}
/** Using dest_column->structureEquals(*column_from_block) will not work for low cardinality columns,
@ -1166,8 +1304,8 @@ public:
*/
if (typeid(*dest_column) != typeid(*column_from_block))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Columns {} and {} have different types {} and {}",
dest_column->getName(), column_from_block->getName(),
demangle(typeid(*dest_column).name()), demangle(typeid(*column_from_block).name()));
dest_column->getName(), column_from_block->getName(),
demangle(typeid(*dest_column).name()), demangle(typeid(*column_from_block).name()));
}
#endif
if (has_columns_to_add)
@ -1177,7 +1315,7 @@ public:
}
}
void appendDefaultRow()
virtual void appendDefaultRow() override
{
if (has_columns_to_add)
{
@ -1186,54 +1324,14 @@ public:
}
}
const IColumn & leftAsofKey() const { return *left_asof_key; }
std::vector<JoinOnKeyColumns> join_on_keys;
virtual void applyLazyDefaults() override { }
private :
// The default row is represented by an empty RowRef, so that fixed-size blocks can be generated sequentially,
// default_count cannot represent the position of the row
LazyOutput lazy_output;
size_t max_joined_block_rows = 0;
size_t rows_to_add;
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
bool need_filter = false;
IColumn::Filter filter;
void reserve(bool need_replicate)
{
if (!max_joined_block_rows)
return;
/// Do not allow big allocations when user set max_joined_block_rows to huge value
size_t reserve_size = std::min<size_t>(max_joined_block_rows, DEFAULT_BLOCK_SIZE * 2);
if (need_replicate)
/// Reserve 10% more space for columns, because some rows can be repeated
reserve_size = static_cast<size_t>(1.1 * reserve_size);
for (auto & column : columns)
column->reserve(reserve_size);
}
private:
std::vector<TypeAndName> type_name;
MutableColumns columns;
std::vector<ColumnNullable *> nullable_column_ptrs;
std::vector<size_t> right_indexes;
bool has_columns_to_add;
/// for ASOF
const IColumn * left_asof_key = nullptr;
bool is_join_get;
void addColumn(const ColumnWithTypeAndName & src_column, const std::string & qualified_name)
{
columns.push_back(src_column.column->cloneEmpty());
columns.back()->reserve(src_column.column->size());
type_name.emplace_back(src_column.type, src_column.name, qualified_name);
}
};
template <JoinKind KIND, JoinStrictness STRICTNESS>
@ -1333,7 +1431,7 @@ public:
}
};
template <typename Map, bool multiple_disjuncts>
template <typename Map, bool add_missing, bool multiple_disjuncts>
void addFoundRowAll(
const typename Map::mapped_type & mapped,
AddedColumns & added,
@ -1341,6 +1439,9 @@ void addFoundRowAll(
KnownRowsHolder<multiple_disjuncts> & known_rows [[maybe_unused]],
JoinStuff::JoinUsedFlags * used_flags [[maybe_unused]])
{
if constexpr (add_missing)
added.applyLazyDefaults();
if constexpr (multiple_disjuncts)
{
std::unique_ptr<std::vector<KnownRowsHolder<true>::Type>> new_known_rows_ptr;
@ -1349,7 +1450,7 @@ void addFoundRowAll(
{
if (!known_rows.isKnown(std::make_pair(it->block, it->row_num)))
{
added.appendFromBlock(*it->block, it->row_num);
added.appendFromBlock(*it->block, it->row_num, false);
++current_offset;
if (!new_known_rows_ptr)
{
@ -1373,7 +1474,7 @@ void addFoundRowAll(
{
for (auto it = mapped.begin(); it.ok(); ++it)
{
added.appendFromBlock(*it->block, it->row_num);
added.appendFromBlock(*it->block, it->row_num, false);
++current_offset;
}
}
@ -1462,7 +1563,7 @@ NO_INLINE size_t joinRightColumns(
else
used_flags.template setUsed<join_features.need_flags, multiple_disjuncts>(find_result);
added_columns.appendFromBlock(*row_ref.block, row_ref.row_num);
added_columns.appendFromBlock(*row_ref.block, row_ref.row_num, join_features.add_missing);
}
else
addNotFoundRow<join_features.add_missing, join_features.need_replication>(added_columns, current_offset);
@ -1472,7 +1573,7 @@ NO_INLINE size_t joinRightColumns(
setUsed<need_filter>(added_columns.filter, i);
used_flags.template setUsed<join_features.need_flags, multiple_disjuncts>(find_result);
auto used_flags_opt = join_features.need_flags ? &used_flags : nullptr;
addFoundRowAll<Map>(mapped, added_columns, current_offset, known_rows, used_flags_opt);
addFoundRowAll<Map, join_features.add_missing>(mapped, added_columns, current_offset, known_rows, used_flags_opt);
}
else if constexpr ((join_features.is_any_join || join_features.is_semi_join) && join_features.right)
{
@ -1482,7 +1583,7 @@ NO_INLINE size_t joinRightColumns(
{
auto used_flags_opt = join_features.need_flags ? &used_flags : nullptr;
setUsed<need_filter>(added_columns.filter, i);
addFoundRowAll<Map>(mapped, added_columns, current_offset, known_rows, used_flags_opt);
addFoundRowAll<Map, join_features.add_missing>(mapped, added_columns, current_offset, known_rows, used_flags_opt);
}
}
else if constexpr (join_features.is_any_join && KIND == JoinKind::Inner)
@ -1493,7 +1594,7 @@ NO_INLINE size_t joinRightColumns(
if (used_once)
{
setUsed<need_filter>(added_columns.filter, i);
added_columns.appendFromBlock(*mapped.block, mapped.row_num);
added_columns.appendFromBlock(*mapped.block, mapped.row_num, join_features.add_missing);
}
break;
@ -1511,7 +1612,7 @@ NO_INLINE size_t joinRightColumns(
{
setUsed<need_filter>(added_columns.filter, i);
used_flags.template setUsed<join_features.need_flags, multiple_disjuncts>(find_result);
added_columns.appendFromBlock(*mapped.block, mapped.row_num);
added_columns.appendFromBlock(*mapped.block, mapped.row_num, join_features.add_missing);
if (join_features.is_any_or_semi_join)
{
@ -1701,14 +1802,19 @@ Block HashJoin::joinBlockImpl(
* but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped.
* For ASOF, the last column is used as the ASOF column
*/
AddedColumns added_columns(
block,
block_with_columns_to_add,
savedBlockSample(),
*this,
std::move(join_on_keys),
join_features.is_asof_join,
is_join_get);
std::unique_ptr<AddedColumns> added_columns_ptr;
if (!join_features.is_any_join)
{
added_columns_ptr = std::make_unique<AddedColumns>(
block, block_with_columns_to_add, savedBlockSample(), *this, std::move(join_on_keys), join_features.is_asof_join, is_join_get);
}
else
{
added_columns_ptr = std::make_unique<LazyAddedColumns>(
block, block_with_columns_to_add, savedBlockSample(), *this, std::move(join_on_keys), join_features.is_asof_join, is_join_get);
}
AddedColumns & added_columns = * added_columns_ptr;
bool has_required_right_keys = (required_right_keys.columns() != 0);
added_columns.need_filter = join_features.need_filter || has_required_right_keys;