mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-30 03:22:14 +00:00
Merge branch 'master' of github.com:ClickHouse/ClickHouse into divanik/add_local_and_azure_iceberg_support
This commit is contained in:
commit
40b45d84ee
@ -327,6 +327,7 @@ class IColumn;
|
||||
\
|
||||
M(Bool, join_use_nulls, false, "Use NULLs for non-joined rows of outer JOINs for types that can be inside Nullable. If false, use default value of corresponding columns data type.", IMPORTANT) \
|
||||
\
|
||||
M(Int32, join_output_by_rowlist_perkey_rows_threshold, 5, "The lower limit of per-key average rows in the right table to determine whether to output by row list in hash join.", 0) \
|
||||
M(JoinStrictness, join_default_strictness, JoinStrictness::All, "Set default strictness in JOIN query. Possible values: empty string, 'ANY', 'ALL'. If empty, query without strictness will throw exception.", 0) \
|
||||
M(Bool, any_join_distinct_right_table_keys, false, "Enable old ANY JOIN logic with many-to-one left-to-right table keys mapping for all ANY JOINs. It leads to confusing not equal results for 't1 ANY LEFT JOIN t2' and 't2 ANY RIGHT JOIN t1'. ANY RIGHT JOIN needs one-to-many keys mapping to be consistent with LEFT one.", IMPORTANT) \
|
||||
M(Bool, single_join_prefer_left_table, true, "For single JOIN in case of identifier ambiguity prefer left table", IMPORTANT) \
|
||||
|
@ -348,6 +348,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
|
||||
{"allow_experimental_time_series_table", false, false, "Added new setting to allow the TimeSeries table engine"},
|
||||
{"enable_analyzer", 1, 1, "Added an alias to a setting `allow_experimental_analyzer`."},
|
||||
{"optimize_functions_to_subcolumns", false, true, "Enabled settings by default"},
|
||||
{"join_output_by_rowlist_perkey_rows_threshold", 0, 5, "The lower limit of per-key average rows in the right table to determine whether to output by row list in hash join."},
|
||||
{"allow_experimental_vector_similarity_index", false, false, "Added new setting to allow experimental vector similarity indexes"},
|
||||
{"local_create_new_file_on_insert", false, false, "Enabled creating a new file on each insert in local object storage engine tables"}
|
||||
}
|
||||
|
@ -15,48 +15,115 @@ JoinOnKeyColumns::JoinOnKeyColumns(const Block & block, const Names & key_names_
|
||||
{
|
||||
}
|
||||
|
||||
template<> void AddedColumns<false>::buildOutput()
|
||||
{
|
||||
}
|
||||
template<>
|
||||
void AddedColumns<false>::buildOutput() {}
|
||||
|
||||
template<>
|
||||
void AddedColumns<false>::buildJoinGetOutput() {}
|
||||
|
||||
template<>
|
||||
template<bool from_row_list>
|
||||
void AddedColumns<false>::buildOutputFromBlocks() {}
|
||||
|
||||
template<>
|
||||
void AddedColumns<true>::buildOutput()
|
||||
{
|
||||
if (!output_by_row_list)
|
||||
buildOutputFromBlocks<false>();
|
||||
else
|
||||
{
|
||||
if (join_data_avg_perkey_rows < output_by_row_list_threshold)
|
||||
buildOutputFromBlocks<true>();
|
||||
else
|
||||
{
|
||||
for (size_t i = 0; i < this->size(); ++i)
|
||||
{
|
||||
auto & col = columns[i];
|
||||
size_t default_count = 0;
|
||||
auto apply_default = [&]()
|
||||
for (auto row_ref_i : lazy_output.row_refs)
|
||||
{
|
||||
if (default_count > 0)
|
||||
if (row_ref_i)
|
||||
{
|
||||
JoinCommon::addDefaultValues(*col, type_name[i].type, default_count);
|
||||
default_count = 0;
|
||||
const RowRefList * row_ref_list = reinterpret_cast<const RowRefList *>(row_ref_i);
|
||||
for (auto it = row_ref_list->begin(); it.ok(); ++it)
|
||||
col->insertFrom(*it->block->getByPosition(right_indexes[i]).column, it->row_num);
|
||||
}
|
||||
else
|
||||
type_name[i].type->insertDefaultInto(*col);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
for (size_t j = 0; j < lazy_output.blocks.size(); ++j)
|
||||
template<>
|
||||
void AddedColumns<true>::buildJoinGetOutput()
|
||||
{
|
||||
if (!lazy_output.blocks[j])
|
||||
for (size_t i = 0; i < this->size(); ++i)
|
||||
{
|
||||
default_count++;
|
||||
auto & col = columns[i];
|
||||
for (auto row_ref_i : lazy_output.row_refs)
|
||||
{
|
||||
if (!row_ref_i)
|
||||
{
|
||||
type_name[i].type->insertDefaultInto(*col);
|
||||
continue;
|
||||
}
|
||||
apply_default();
|
||||
const auto & column_from_block = reinterpret_cast<const Block *>(lazy_output.blocks[j])->getByPosition(right_indexes[i]);
|
||||
/// If it's joinGetOrNull, we need to wrap not-nullable columns in StorageJoin.
|
||||
if (is_join_get)
|
||||
const auto * row_ref = reinterpret_cast<const RowRef *>(row_ref_i);
|
||||
const auto & column_from_block = row_ref->block->getByPosition(right_indexes[i]);
|
||||
if (auto * nullable_col = typeid_cast<ColumnNullable *>(col.get()); nullable_col && !column_from_block.column->isNullable())
|
||||
nullable_col->insertFromNotNullable(*column_from_block.column, row_ref->row_num);
|
||||
else
|
||||
col->insertFrom(*column_from_block.column, row_ref->row_num);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template<>
|
||||
template<bool from_row_list>
|
||||
void AddedColumns<true>::buildOutputFromBlocks()
|
||||
{
|
||||
if (auto * nullable_col = typeid_cast<ColumnNullable *>(col.get());
|
||||
nullable_col && !column_from_block.column->isNullable())
|
||||
if (this->size() == 0)
|
||||
return;
|
||||
std::vector<const Block *> blocks;
|
||||
std::vector<UInt32> row_nums;
|
||||
blocks.reserve(lazy_output.row_refs.size());
|
||||
row_nums.reserve(lazy_output.row_refs.size());
|
||||
for (auto row_ref_i : lazy_output.row_refs)
|
||||
{
|
||||
nullable_col->insertFromNotNullable(*column_from_block.column, lazy_output.row_nums[j]);
|
||||
continue;
|
||||
if (row_ref_i)
|
||||
{
|
||||
if constexpr (from_row_list)
|
||||
{
|
||||
const RowRefList * row_ref_list = reinterpret_cast<const RowRefList *>(row_ref_i);
|
||||
for (auto it = row_ref_list->begin(); it.ok(); ++it)
|
||||
{
|
||||
blocks.emplace_back(it->block);
|
||||
row_nums.emplace_back(it->row_num);
|
||||
}
|
||||
}
|
||||
col->insertFrom(*column_from_block.column, lazy_output.row_nums[j]);
|
||||
else
|
||||
{
|
||||
const RowRef * row_ref = reinterpret_cast<const RowRefList *>(row_ref_i);
|
||||
blocks.emplace_back(row_ref->block);
|
||||
row_nums.emplace_back(row_ref->row_num);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
blocks.emplace_back(nullptr);
|
||||
row_nums.emplace_back(0);
|
||||
}
|
||||
}
|
||||
for (size_t i = 0; i < this->size(); ++i)
|
||||
{
|
||||
auto & col = columns[i];
|
||||
for (size_t j = 0; j < blocks.size(); ++j)
|
||||
{
|
||||
if (blocks[j])
|
||||
col->insertFrom(*blocks[j]->getByPosition(right_indexes[i]).column, row_nums[j]);
|
||||
else
|
||||
type_name[i].type->insertDefaultInto(*col);
|
||||
}
|
||||
apply_default();
|
||||
}
|
||||
}
|
||||
|
||||
@ -72,29 +139,27 @@ void AddedColumns<false>::applyLazyDefaults()
|
||||
}
|
||||
|
||||
template<>
|
||||
void AddedColumns<true>::applyLazyDefaults()
|
||||
{
|
||||
}
|
||||
void AddedColumns<true>::applyLazyDefaults() {}
|
||||
|
||||
template <>
|
||||
void AddedColumns<false>::appendFromBlock(const Block & block, size_t row_num,const bool has_defaults)
|
||||
void AddedColumns<false>::appendFromBlock(const RowRef * row_ref, const bool has_defaults)
|
||||
{
|
||||
if (has_defaults)
|
||||
applyLazyDefaults();
|
||||
|
||||
#ifndef NDEBUG
|
||||
checkBlock(block);
|
||||
checkBlock(*row_ref->block);
|
||||
#endif
|
||||
if (is_join_get)
|
||||
{
|
||||
size_t right_indexes_size = right_indexes.size();
|
||||
for (size_t j = 0; j < right_indexes_size; ++j)
|
||||
{
|
||||
const auto & column_from_block = block.getByPosition(right_indexes[j]);
|
||||
const auto & column_from_block = row_ref->block->getByPosition(right_indexes[j]);
|
||||
if (auto * nullable_col = nullable_column_ptrs[j])
|
||||
nullable_col->insertFromNotNullable(*column_from_block.column, row_num);
|
||||
nullable_col->insertFromNotNullable(*column_from_block.column, row_ref->row_num);
|
||||
else
|
||||
columns[j]->insertFrom(*column_from_block.column, row_num);
|
||||
columns[j]->insertFrom(*column_from_block.column, row_ref->row_num);
|
||||
}
|
||||
}
|
||||
else
|
||||
@ -102,22 +167,21 @@ void AddedColumns<false>::appendFromBlock(const Block & block, size_t row_num,co
|
||||
size_t right_indexes_size = right_indexes.size();
|
||||
for (size_t j = 0; j < right_indexes_size; ++j)
|
||||
{
|
||||
const auto & column_from_block = block.getByPosition(right_indexes[j]);
|
||||
columns[j]->insertFrom(*column_from_block.column, row_num);
|
||||
const auto & column_from_block = row_ref->block->getByPosition(right_indexes[j]);
|
||||
columns[j]->insertFrom(*column_from_block.column, row_ref->row_num);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template <>
|
||||
void AddedColumns<true>::appendFromBlock(const Block & block, size_t row_num, bool)
|
||||
void AddedColumns<true>::appendFromBlock(const RowRef * row_ref, bool)
|
||||
{
|
||||
#ifndef NDEBUG
|
||||
checkBlock(block);
|
||||
checkBlock(*row_ref->block);
|
||||
#endif
|
||||
if (has_columns_to_add)
|
||||
{
|
||||
lazy_output.blocks.emplace_back(reinterpret_cast<UInt64>(&block));
|
||||
lazy_output.row_nums.emplace_back(static_cast<uint32_t>(row_num));
|
||||
lazy_output.row_refs.emplace_back(reinterpret_cast<UInt64>(row_ref));
|
||||
}
|
||||
}
|
||||
template<>
|
||||
@ -131,8 +195,7 @@ void AddedColumns<true>::appendDefaultRow()
|
||||
{
|
||||
if (has_columns_to_add)
|
||||
{
|
||||
lazy_output.blocks.emplace_back(0);
|
||||
lazy_output.row_nums.emplace_back(0);
|
||||
lazy_output.row_refs.emplace_back(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -50,8 +50,7 @@ public:
|
||||
|
||||
struct LazyOutput
|
||||
{
|
||||
PaddedPODArray<UInt64> blocks;
|
||||
PaddedPODArray<UInt32> row_nums;
|
||||
PaddedPODArray<UInt64> row_refs;
|
||||
};
|
||||
|
||||
AddedColumns(
|
||||
@ -76,8 +75,7 @@ public:
|
||||
if constexpr (lazy)
|
||||
{
|
||||
has_columns_to_add = num_columns_to_add > 0;
|
||||
lazy_output.blocks.reserve(rows_to_add);
|
||||
lazy_output.row_nums.reserve(rows_to_add);
|
||||
lazy_output.row_refs.reserve(rows_to_add);
|
||||
}
|
||||
|
||||
columns.reserve(num_columns_to_add);
|
||||
@ -115,18 +113,22 @@ public:
|
||||
if (columns[j]->isNullable() && !saved_column->isNullable())
|
||||
nullable_column_ptrs[j] = typeid_cast<ColumnNullable *>(columns[j].get());
|
||||
}
|
||||
join_data_avg_perkey_rows = join.getJoinedData()->avgPerKeyRows();
|
||||
output_by_row_list_threshold = join.getTableJoin().outputByRowListPerkeyRowsThreshold();
|
||||
}
|
||||
|
||||
size_t size() const { return columns.size(); }
|
||||
|
||||
void buildOutput();
|
||||
|
||||
void buildJoinGetOutput();
|
||||
|
||||
ColumnWithTypeAndName moveColumn(size_t i)
|
||||
{
|
||||
return ColumnWithTypeAndName(std::move(columns[i]), type_name[i].type, type_name[i].qualified_name);
|
||||
}
|
||||
|
||||
void appendFromBlock(const Block & block, size_t row_num, bool has_default);
|
||||
void appendFromBlock(const RowRef * row_ref, bool has_default);
|
||||
|
||||
void appendDefaultRow();
|
||||
|
||||
@ -134,6 +136,8 @@ public:
|
||||
|
||||
const IColumn & leftAsofKey() const { return *left_asof_key; }
|
||||
|
||||
static constexpr bool isLazy() { return lazy; }
|
||||
|
||||
Block left_block;
|
||||
std::vector<JoinOnKeyColumns> join_on_keys;
|
||||
ExpressionActionsPtr additional_filter_expression;
|
||||
@ -142,6 +146,9 @@ public:
|
||||
size_t rows_to_add;
|
||||
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
|
||||
bool need_filter = false;
|
||||
bool output_by_row_list = false;
|
||||
size_t join_data_avg_perkey_rows = 0;
|
||||
size_t output_by_row_list_threshold = 0;
|
||||
IColumn::Filter filter;
|
||||
|
||||
void reserve(bool need_replicate)
|
||||
@ -212,15 +219,22 @@ private:
|
||||
columns.back()->reserve(src_column.column->size());
|
||||
type_name.emplace_back(src_column.type, src_column.name, qualified_name);
|
||||
}
|
||||
|
||||
/** Build output from the blocks that extract from `RowRef` or `RowRefList`, to avoid block cache miss which may cause performance slow down.
|
||||
* And This problem would happen it we directly build output from `RowRef` or `RowRefList`.
|
||||
*/
|
||||
template<bool from_row_list>
|
||||
void buildOutputFromBlocks();
|
||||
};
|
||||
|
||||
/// Adapter class to pass into addFoundRowAll
|
||||
/// In joinRightColumnsWithAdditionalFilter we don't want to add rows directly into AddedColumns,
|
||||
/// because they need to be filtered by additional_filter_expression.
|
||||
class PreSelectedRows : public std::vector<RowRef>
|
||||
class PreSelectedRows : public std::vector<const RowRef *>
|
||||
{
|
||||
public:
|
||||
void appendFromBlock(const Block & block, size_t row_num, bool /* has_default */) { this->emplace_back(&block, row_num); }
|
||||
void appendFromBlock(const RowRef * row_ref, bool /* has_default */) { this->emplace_back(row_ref); }
|
||||
static constexpr bool isLazy() { return false; }
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -495,7 +495,7 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
|
||||
}
|
||||
|
||||
size_t rows = source_block.rows();
|
||||
|
||||
data->rows_to_join += rows;
|
||||
const auto & right_key_names = table_join->getAllNames(JoinTableSide::Right);
|
||||
ColumnPtrMap all_key_columns(right_key_names.size());
|
||||
for (const auto & column_name : right_key_names)
|
||||
@ -647,7 +647,7 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
|
||||
total_bytes = getTotalByteCount();
|
||||
}
|
||||
}
|
||||
|
||||
data->keys_to_join = total_rows;
|
||||
shrinkStoredBlocksToFit(total_bytes);
|
||||
|
||||
return table_join->sizeLimits().check(total_rows, total_bytes, "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
|
||||
|
@ -345,6 +345,18 @@ public:
|
||||
|
||||
size_t blocks_allocated_size = 0;
|
||||
size_t blocks_nullmaps_allocated_size = 0;
|
||||
|
||||
/// Number of rows of right table to join
|
||||
size_t rows_to_join = 0;
|
||||
/// Number of keys of right table to join
|
||||
size_t keys_to_join = 0;
|
||||
|
||||
size_t avgPerKeyRows() const
|
||||
{
|
||||
if (keys_to_join == 0)
|
||||
return 0;
|
||||
return rows_to_join / keys_to_join;
|
||||
}
|
||||
};
|
||||
|
||||
using RightTableDataPtr = std::shared_ptr<RightTableData>;
|
||||
|
@ -83,6 +83,7 @@ public:
|
||||
const Block & block_with_columns_to_add,
|
||||
const MapsTemplateVector & maps_,
|
||||
bool is_join_get = false);
|
||||
|
||||
private:
|
||||
template <typename KeyGetter, bool is_asof_join>
|
||||
static KeyGetter createKeyGetter(const ColumnRawPtrs & key_columns, const Sizes & key_sizes);
|
||||
@ -128,7 +129,7 @@ private:
|
||||
template <typename AddedColumns>
|
||||
static ColumnPtr buildAdditionalFilter(
|
||||
size_t left_start_row,
|
||||
const std::vector<RowRef> & selected_rows,
|
||||
const std::vector<const RowRef *> & selected_rows,
|
||||
const std::vector<size_t> & row_replicate_offset,
|
||||
AddedColumns & added_columns);
|
||||
|
||||
|
@ -95,6 +95,9 @@ Block HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinBlockImpl(
|
||||
added_columns.join_on_keys.clear();
|
||||
Block remaining_block = sliceBlock(block, num_joined);
|
||||
|
||||
if (is_join_get)
|
||||
added_columns.buildJoinGetOutput();
|
||||
else
|
||||
added_columns.buildOutput();
|
||||
for (size_t i = 0; i < added_columns.size(); ++i)
|
||||
block.insert(added_columns.moveColumn(i));
|
||||
@ -339,6 +342,8 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinRightColumns(
|
||||
size_t rows = added_columns.rows_to_add;
|
||||
if constexpr (need_filter)
|
||||
added_columns.filter = IColumn::Filter(rows, 0);
|
||||
if constexpr (!flag_per_row && (STRICTNESS == JoinStrictness::All || (STRICTNESS == JoinStrictness::Semi && KIND == JoinKind::Right)))
|
||||
added_columns.output_by_row_list = true;
|
||||
|
||||
Arena pool;
|
||||
|
||||
@ -381,15 +386,15 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinRightColumns(
|
||||
const IColumn & left_asof_key = added_columns.leftAsofKey();
|
||||
|
||||
auto row_ref = mapped->findAsof(left_asof_key, i);
|
||||
if (row_ref.block)
|
||||
if (row_ref && row_ref->block)
|
||||
{
|
||||
setUsed<need_filter>(added_columns.filter, i);
|
||||
if constexpr (flag_per_row)
|
||||
used_flags.template setUsed<join_features.need_flags, flag_per_row>(row_ref.block, row_ref.row_num, 0);
|
||||
used_flags.template setUsed<join_features.need_flags, flag_per_row>(row_ref->block, row_ref->row_num, 0);
|
||||
else
|
||||
used_flags.template setUsed<join_features.need_flags, flag_per_row>(find_result);
|
||||
|
||||
added_columns.appendFromBlock(*row_ref.block, row_ref.row_num, join_features.add_missing);
|
||||
added_columns.appendFromBlock(row_ref, join_features.add_missing);
|
||||
}
|
||||
else
|
||||
addNotFoundRow<join_features.add_missing, join_features.need_replication>(added_columns, current_offset);
|
||||
@ -420,7 +425,7 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinRightColumns(
|
||||
if (used_once)
|
||||
{
|
||||
setUsed<need_filter>(added_columns.filter, i);
|
||||
added_columns.appendFromBlock(*mapped.block, mapped.row_num, join_features.add_missing);
|
||||
added_columns.appendFromBlock(&mapped, join_features.add_missing);
|
||||
}
|
||||
|
||||
break;
|
||||
@ -438,7 +443,7 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinRightColumns(
|
||||
{
|
||||
setUsed<need_filter>(added_columns.filter, i);
|
||||
used_flags.template setUsed<join_features.need_flags, flag_per_row>(find_result);
|
||||
added_columns.appendFromBlock(*mapped.block, mapped.row_num, join_features.add_missing);
|
||||
added_columns.appendFromBlock(&mapped, join_features.add_missing);
|
||||
|
||||
if (join_features.is_any_or_semi_join)
|
||||
{
|
||||
@ -477,7 +482,7 @@ template <JoinKind KIND, JoinStrictness STRICTNESS, typename MapsTemplate>
|
||||
template <typename AddedColumns>
|
||||
ColumnPtr HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::buildAdditionalFilter(
|
||||
size_t left_start_row,
|
||||
const std::vector<RowRef> & selected_rows,
|
||||
const std::vector<const RowRef *> & selected_rows,
|
||||
const std::vector<size_t> & row_replicate_offset,
|
||||
AddedColumns & added_columns)
|
||||
{
|
||||
@ -489,7 +494,7 @@ ColumnPtr HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::buildAdditionalFilter
|
||||
result_column = ColumnUInt8::create();
|
||||
break;
|
||||
}
|
||||
const Block & sample_right_block = *selected_rows.begin()->block;
|
||||
const Block & sample_right_block = *((*selected_rows.begin())->block);
|
||||
if (!sample_right_block || !added_columns.additional_filter_expression)
|
||||
{
|
||||
auto filter = ColumnUInt8::create();
|
||||
@ -519,8 +524,8 @@ ColumnPtr HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::buildAdditionalFilter
|
||||
auto new_col = col.column->cloneEmpty();
|
||||
for (const auto & selected_row : selected_rows)
|
||||
{
|
||||
const auto & src_col = selected_row.block->getByPosition(right_col_pos);
|
||||
new_col->insertFrom(*src_col.column, selected_row.row_num);
|
||||
const auto & src_col = selected_row->block->getByPosition(right_col_pos);
|
||||
new_col->insertFrom(*src_col.column, selected_row->row_num);
|
||||
}
|
||||
executed_block.insert({std::move(new_col), col.type, col.name});
|
||||
}
|
||||
@ -700,26 +705,24 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinRightColumnsWithAddt
|
||||
{
|
||||
// For inner join, we need mark each right row'flag, because we only use each right row once.
|
||||
auto used_once = used_flags.template setUsedOnce<join_features.need_flags, true>(
|
||||
selected_right_row_it->block, selected_right_row_it->row_num, 0);
|
||||
(*selected_right_row_it)->block, (*selected_right_row_it)->row_num, 0);
|
||||
if (used_once)
|
||||
{
|
||||
any_matched = true;
|
||||
total_added_rows += 1;
|
||||
added_columns.appendFromBlock(
|
||||
*selected_right_row_it->block, selected_right_row_it->row_num, join_features.add_missing);
|
||||
added_columns.appendFromBlock(*selected_right_row_it, join_features.add_missing);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
auto used_once = used_flags.template setUsedOnce<join_features.need_flags, true>(
|
||||
selected_right_row_it->block, selected_right_row_it->row_num, 0);
|
||||
(*selected_right_row_it)->block, (*selected_right_row_it)->row_num, 0);
|
||||
if (used_once)
|
||||
{
|
||||
any_matched = true;
|
||||
total_added_rows += 1;
|
||||
added_columns.appendFromBlock(
|
||||
*selected_right_row_it->block, selected_right_row_it->row_num, join_features.add_missing);
|
||||
added_columns.appendFromBlock(*selected_right_row_it, join_features.add_missing);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -727,16 +730,14 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinRightColumnsWithAddt
|
||||
{
|
||||
any_matched = true;
|
||||
if constexpr (join_features.right && join_features.need_flags)
|
||||
used_flags.template setUsed<true, true>(selected_right_row_it->block, selected_right_row_it->row_num, 0);
|
||||
used_flags.template setUsed<true, true>((*selected_right_row_it)->block, (*selected_right_row_it)->row_num, 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
any_matched = true;
|
||||
total_added_rows += 1;
|
||||
added_columns.appendFromBlock(
|
||||
*selected_right_row_it->block, selected_right_row_it->row_num, join_features.add_missing);
|
||||
used_flags.template setUsed<join_features.need_flags, true>(
|
||||
selected_right_row_it->block, selected_right_row_it->row_num, 0);
|
||||
added_columns.appendFromBlock(*selected_right_row_it, join_features.add_missing);
|
||||
used_flags.template setUsed<join_features.need_flags, true>((*selected_right_row_it)->block, (*selected_right_row_it)->row_num, 0);
|
||||
}
|
||||
}
|
||||
|
||||
@ -756,8 +757,7 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinRightColumnsWithAddt
|
||||
if (filter_flags[replicated_row])
|
||||
{
|
||||
any_matched = true;
|
||||
added_columns.appendFromBlock(
|
||||
*selected_right_row_it->block, selected_right_row_it->row_num, join_features.add_missing);
|
||||
added_columns.appendFromBlock(*selected_right_row_it, join_features.add_missing);
|
||||
total_added_rows += 1;
|
||||
}
|
||||
++selected_right_row_it;
|
||||
@ -767,8 +767,7 @@ size_t HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinRightColumnsWithAddt
|
||||
if (filter_flags[replicated_row])
|
||||
{
|
||||
any_matched = true;
|
||||
added_columns.appendFromBlock(
|
||||
*selected_right_row_it->block, selected_right_row_it->row_num, join_features.add_missing);
|
||||
added_columns.appendFromBlock(*selected_right_row_it, join_features.add_missing);
|
||||
total_added_rows += 1;
|
||||
selected_right_row_it = selected_right_row_it + row_replicate_offset[i] - replicated_row;
|
||||
break;
|
||||
|
@ -104,7 +104,7 @@ void addFoundRowAll(
|
||||
{
|
||||
if (!known_rows.isKnown(std::make_pair(it->block, it->row_num)))
|
||||
{
|
||||
added.appendFromBlock(*it->block, it->row_num, false);
|
||||
added.appendFromBlock(*it, false);
|
||||
++current_offset;
|
||||
if (!new_known_rows_ptr)
|
||||
{
|
||||
@ -124,11 +124,16 @@ void addFoundRowAll(
|
||||
known_rows.add(std::cbegin(*new_known_rows_ptr), std::cend(*new_known_rows_ptr));
|
||||
}
|
||||
}
|
||||
else if constexpr (AddedColumns::isLazy())
|
||||
{
|
||||
added.appendFromBlock(&mapped, false);
|
||||
current_offset += mapped.rows;
|
||||
}
|
||||
else
|
||||
{
|
||||
for (auto it = mapped.begin(); it.ok(); ++it)
|
||||
{
|
||||
added.appendFromBlock(*it->block, it->row_num, false);
|
||||
added.appendFromBlock(*it, false);
|
||||
++current_offset;
|
||||
}
|
||||
}
|
||||
|
@ -144,7 +144,7 @@ public:
|
||||
return low;
|
||||
}
|
||||
|
||||
RowRef findAsof(const IColumn & asof_column, size_t row_num) override
|
||||
RowRef * findAsof(const IColumn & asof_column, size_t row_num) override
|
||||
{
|
||||
sort();
|
||||
|
||||
@ -156,10 +156,10 @@ public:
|
||||
if (pos != entries.size())
|
||||
{
|
||||
size_t row_ref_index = entries[pos].row_ref_index;
|
||||
return row_refs[row_ref_index];
|
||||
return &row_refs[row_ref_index];
|
||||
}
|
||||
|
||||
return {nullptr, 0};
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
private:
|
||||
|
@ -122,7 +122,7 @@ struct RowRefList : RowRef
|
||||
};
|
||||
|
||||
RowRefList() {} /// NOLINT
|
||||
RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_) {}
|
||||
RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_), rows(1) {}
|
||||
|
||||
ForwardIterator begin() const { return ForwardIterator(this); }
|
||||
|
||||
@ -135,8 +135,11 @@ struct RowRefList : RowRef
|
||||
*next = Batch(nullptr);
|
||||
}
|
||||
next = next->insert(std::move(row_ref), pool);
|
||||
++rows;
|
||||
}
|
||||
|
||||
public:
|
||||
SizeT rows = 0;
|
||||
private:
|
||||
Batch * next = nullptr;
|
||||
};
|
||||
@ -158,7 +161,7 @@ struct SortedLookupVectorBase
|
||||
virtual void insert(const IColumn &, const Block *, size_t) = 0;
|
||||
|
||||
// This needs to be synchronized internally
|
||||
virtual RowRef findAsof(const IColumn &, size_t) = 0;
|
||||
virtual RowRef * findAsof(const IColumn &, size_t) = 0;
|
||||
};
|
||||
|
||||
|
||||
|
@ -115,6 +115,7 @@ TableJoin::TableJoin(const Settings & settings, VolumePtr tmp_volume_, Temporary
|
||||
, partial_merge_join_left_table_buffer_bytes(settings.partial_merge_join_left_table_buffer_bytes)
|
||||
, max_files_to_merge(settings.join_on_disk_max_files_to_merge)
|
||||
, temporary_files_codec(settings.temporary_files_codec)
|
||||
, output_by_rowlist_perkey_rows_threshold(settings.join_output_by_rowlist_perkey_rows_threshold)
|
||||
, max_memory_usage(settings.max_memory_usage)
|
||||
, tmp_volume(tmp_volume_)
|
||||
, tmp_data(tmp_data_)
|
||||
|
@ -148,6 +148,7 @@ private:
|
||||
const size_t partial_merge_join_left_table_buffer_bytes = 0;
|
||||
const size_t max_files_to_merge = 0;
|
||||
const String temporary_files_codec = "LZ4";
|
||||
const size_t output_by_rowlist_perkey_rows_threshold = 0;
|
||||
|
||||
/// Value if setting max_memory_usage for query, can be used when max_bytes_in_join is not specified.
|
||||
size_t max_memory_usage = 0;
|
||||
@ -295,6 +296,7 @@ public:
|
||||
return join_use_nulls && isRightOrFull(kind());
|
||||
}
|
||||
|
||||
size_t outputByRowListPerkeyRowsThreshold() const { return output_by_rowlist_perkey_rows_threshold; }
|
||||
size_t defaultMaxBytes() const { return default_max_bytes; }
|
||||
size_t maxJoinedBlockRows() const { return max_joined_block_rows; }
|
||||
size_t maxRowsInRightBlock() const { return partial_merge_join_rows_in_right_blocks; }
|
||||
|
@ -133,6 +133,20 @@ static ColumnWithTypeAndName readColumnWithStringData(const std::shared_ptr<arro
|
||||
std::shared_ptr<arrow::Buffer> buffer = chunk.value_data();
|
||||
const size_t chunk_length = chunk.length();
|
||||
|
||||
const size_t null_count = chunk.null_count();
|
||||
if (null_count == 0)
|
||||
{
|
||||
for (size_t offset_i = 0; offset_i != chunk_length; ++offset_i)
|
||||
{
|
||||
const auto * raw_data = buffer->data() + chunk.value_offset(offset_i);
|
||||
column_chars_t.insert_assume_reserved(raw_data, raw_data + chunk.value_length(offset_i));
|
||||
column_chars_t.emplace_back('\0');
|
||||
|
||||
column_offsets.emplace_back(column_chars_t.size());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
for (size_t offset_i = 0; offset_i != chunk_length; ++offset_i)
|
||||
{
|
||||
if (!chunk.IsNull(offset_i) && buffer)
|
||||
@ -145,6 +159,7 @@ static ColumnWithTypeAndName readColumnWithStringData(const std::shared_ptr<arro
|
||||
column_offsets.emplace_back(column_chars_t.size());
|
||||
}
|
||||
}
|
||||
}
|
||||
return {std::move(internal_column), std::move(internal_type), column_name};
|
||||
}
|
||||
|
||||
|
@ -1143,24 +1143,42 @@ readColumnWithStringData(const orc::ColumnVectorBatch * orc_column, const orc::T
|
||||
reserver_size += 1;
|
||||
}
|
||||
|
||||
column_chars_t.reserve(reserver_size);
|
||||
column_offsets.reserve(orc_str_column->numElements);
|
||||
column_chars_t.resize_exact(reserver_size);
|
||||
column_offsets.resize_exact(orc_str_column->numElements);
|
||||
|
||||
size_t curr_offset = 0;
|
||||
for (size_t i = 0; i < orc_str_column->numElements; ++i)
|
||||
if (!orc_str_column->hasNulls)
|
||||
{
|
||||
if (!orc_str_column->hasNulls || orc_str_column->notNull[i])
|
||||
for (size_t i = 0; i < orc_str_column->numElements; ++i)
|
||||
{
|
||||
const auto * buf = orc_str_column->data[i];
|
||||
size_t buf_size = orc_str_column->length[i];
|
||||
column_chars_t.insert_assume_reserved(buf, buf + buf_size);
|
||||
memcpy(&column_chars_t[curr_offset], buf, buf_size);
|
||||
curr_offset += buf_size;
|
||||
|
||||
column_chars_t[curr_offset] = 0;
|
||||
++curr_offset;
|
||||
|
||||
column_offsets[i] = curr_offset;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
for (size_t i = 0; i < orc_str_column->numElements; ++i)
|
||||
{
|
||||
if (orc_str_column->notNull[i])
|
||||
{
|
||||
const auto * buf = orc_str_column->data[i];
|
||||
size_t buf_size = orc_str_column->length[i];
|
||||
memcpy(&column_chars_t[curr_offset], buf, buf_size);
|
||||
curr_offset += buf_size;
|
||||
}
|
||||
|
||||
column_chars_t.push_back(0);
|
||||
column_chars_t[curr_offset] = 0;
|
||||
++curr_offset;
|
||||
|
||||
column_offsets.push_back(curr_offset);
|
||||
column_offsets[i] = curr_offset;
|
||||
}
|
||||
}
|
||||
return {std::move(internal_column), std::move(internal_type), column_name};
|
||||
}
|
||||
|
@ -445,7 +445,7 @@ def test_mysql_distributed(started_cluster):
|
||||
query = "SELECT * FROM ("
|
||||
for i in range(3):
|
||||
query += "SELECT name FROM test_replicas UNION DISTINCT "
|
||||
query += "SELECT name FROM test_replicas)"
|
||||
query += "SELECT name FROM test_replicas) ORDER BY name"
|
||||
|
||||
result = node2.query(query)
|
||||
assert result == "host2\nhost3\nhost4\n"
|
||||
@ -827,6 +827,9 @@ def test_settings(started_cluster):
|
||||
f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}"
|
||||
)
|
||||
|
||||
node1.query("DROP DATABASE IF EXISTS m")
|
||||
node1.query("DROP DATABASE IF EXISTS mm")
|
||||
|
||||
rw_timeout = 40123001
|
||||
connect_timeout = 40123002
|
||||
node1.query(
|
||||
@ -855,6 +858,9 @@ def test_settings(started_cluster):
|
||||
f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}"
|
||||
)
|
||||
|
||||
node1.query("DROP DATABASE m")
|
||||
node1.query("DROP DATABASE mm")
|
||||
|
||||
drop_mysql_table(conn, table_name)
|
||||
conn.close()
|
||||
|
||||
@ -930,6 +936,9 @@ def test_joins(started_cluster):
|
||||
|
||||
conn.commit()
|
||||
|
||||
node1.query("DROP TABLE IF EXISTS test_joins_table_users")
|
||||
node1.query("DROP TABLE IF EXISTS test_joins_table_tickets")
|
||||
|
||||
node1.query(
|
||||
"""
|
||||
CREATE TABLE test_joins_table_users
|
||||
@ -964,6 +973,9 @@ def test_joins(started_cluster):
|
||||
"""
|
||||
) == "281607\tFeedback\t2024-06-25 12:09:41\tuser@example.com\n"
|
||||
|
||||
node1.query("DROP TABLE test_joins_table_users")
|
||||
node1.query("DROP TABLE test_joins_table_tickets")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
with contextmanager(started_cluster)() as cluster:
|
||||
|
15
tests/performance/all_join_opt.xml
Normal file
15
tests/performance/all_join_opt.xml
Normal file
@ -0,0 +1,15 @@
|
||||
<test>
|
||||
<create_query>CREATE TABLE test (a Int64, b String, c LowCardinality(String)) ENGINE = MergeTree() ORDER BY a</create_query>
|
||||
<create_query>CREATE TABLE test1 (a Int64, b String, c LowCardinality(String)) ENGINE = MergeTree() ORDER BY a</create_query>
|
||||
|
||||
<fill_query>INSERT INTO test SELECT number % 10000, number % 10000, number % 10000 FROM numbers(10000000)</fill_query>
|
||||
<fill_query>INSERT INTO test1 SELECT number % 1000 , number % 1000, number % 1000 FROM numbers(100000)</fill_query>
|
||||
|
||||
<query tag='INNER'>SELECT MAX(test1.a) FROM test INNER JOIN test1 on test.b = test1.b</query>
|
||||
<query tag='LEFT'>SELECT MAX(test1.a) FROM test LEFT JOIN test1 on test.b = test1.b</query>
|
||||
<query tag='RIGHT'>SELECT MAX(test1.a) FROM test RIGHT JOIN test1 on test.b = test1.b</query>
|
||||
<query tag='FULL'>SELECT MAX(test1.a) FROM test FULL JOIN test1 on test.b = test1.b</query>
|
||||
|
||||
<drop_query>DROP TABLE IF EXISTS test</drop_query>
|
||||
<drop_query>DROP TABLE IF EXISTS test1</drop_query>
|
||||
</test>
|
Loading…
Reference in New Issue
Block a user