diff --git a/src/Columns/ColumnSet.h b/src/Columns/ColumnSet.h index 935a72e551a..95d2ab41eea 100644 --- a/src/Columns/ColumnSet.h +++ b/src/Columns/ColumnSet.h @@ -30,6 +30,7 @@ public: MutableColumnPtr cloneDummy(size_t s_) const override { return ColumnSet::create(s_, data); } FutureSetPtr getData() const { return data; } + void setData(FutureSetPtr data_) { data = std::move(data_); } // Used only for debugging, making it DUMPABLE Field operator[](size_t) const override { return {}; } diff --git a/src/Interpreters/HashJoin/HashJoin.cpp b/src/Interpreters/HashJoin/HashJoin.cpp index 1d0c6f75b8e..a36cb8c9d14 100644 --- a/src/Interpreters/HashJoin/HashJoin.cpp +++ b/src/Interpreters/HashJoin/HashJoin.cpp @@ -124,6 +124,22 @@ static void correctNullabilityInplace(ColumnWithTypeAndName & column, bool nulla } } +// static void filterBlock(Block & block, const IColumn::Filter & filter) +// { +// for (auto & elem : block) +// { +// if (elem.column->size() != filter.size()) +// throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of column {} doesn't match size of filter {}", +// elem.column->size(), filter.size()); + +// if (elem.column->empty()) +// { +// block.clear(); +// return; +// } +// } +// } + HashJoin::HashJoin( std::shared_ptr table_join_, const Block & right_sample_block_, @@ -561,6 +577,16 @@ bool HashJoin::addBlockToJoin(ScatteredBlock & source_block, bool check_limits) if (shrink_blocks) block_to_save = block_to_save.shrinkToFit(); + ScatteredBlock right_key_columns_for_filter; + if (save_right_key_columns_for_filter) + { + right_key_columns_for_filter = filterColumnsPresentInSampleBlock(source_block, right_table_keys); + if (shrink_blocks) + right_key_columns_for_filter.shrinkToFit(); + + data->right_key_columns_for_filter.resize(table_join->getClauses().size()); + } + size_t max_bytes_in_join = table_join->sizeLimits().max_bytes; size_t max_rows_in_join = table_join->sizeLimits().max_rows; @@ -599,7 +625,7 @@ bool HashJoin::addBlockToJoin(ScatteredBlock & source_block, bool check_limits) } doDebugAsserts(); - data->blocks_allocated_size += block_to_save.allocatedBytes(); + data->blocks_allocated_size += block_to_save.allocatedBytes() + right_key_columns_for_filter.allocatedBytes(); data->blocks.emplace_back(std::move(block_to_save)); const auto * stored_block = &data->blocks.back(); doDebugAsserts(); @@ -628,6 +654,22 @@ bool HashJoin::addBlockToJoin(ScatteredBlock & source_block, bool check_limits) save_nullmap |= (*null_map)[i]; } + if (save_right_key_columns_for_filter) + { + if (null_map) + right_key_columns_for_filter.filter(*null_map); + + right_key_columns_for_filter.filterBySelector(); + + const auto & required_names = right_keys_for_fiter_per_clause[onexpr_idx]; + + Block right_keys_for_clause; + for (const auto & name : required_names) + right_keys_for_clause.insert(right_key_columns_for_filter.getByName(name)); + + data->right_key_columns_for_filter[onexpr_idx].emplace_back(right_keys_for_clause); + } + auto join_mask_col = JoinCommon::getColumnAsMask(source_block.getSourceBlock(), onexprs[onexpr_idx].condColumnNames().second); /// Save blocks that do not hold conditions in ON section ColumnUInt8::MutablePtr not_joined_map = nullptr; @@ -1619,4 +1661,15 @@ void HashJoin::tryRerangeRightTableData() data->sorted = true; } +void HashJoin::saveRightKeyColumnsForFilter(std::vector keys_per_clause) +{ + if (keys_per_clause.size() != table_join->getClauses().size()) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Invalid number of clauses. 
Expected {}, got {}", + table_join->getClauses().size(), keys_per_clause.size()); + + save_right_key_columns_for_filter = true; + right_keys_for_fiter_per_clause = std::move(keys_per_clause); +} + } diff --git a/src/Interpreters/HashJoin/HashJoin.h b/src/Interpreters/HashJoin/HashJoin.h index e478bc66b3c..59fa621492c 100644 --- a/src/Interpreters/HashJoin/HashJoin.h +++ b/src/Interpreters/HashJoin/HashJoin.h @@ -355,6 +355,7 @@ public: Block sample_block; /// Block as it would appear in the BlockList ScatteredBlocksList blocks; /// Blocks of "right" table. BlockNullmapList blocks_nullmaps; /// Nullmaps for blocks of "right" table (if needed) + std::vector right_key_columns_for_filter; /// Additional data - strings for string keys and continuation elements of single-linked lists of references to rows. Arena pool; @@ -408,6 +409,8 @@ public: void materializeColumnsFromLeftBlock(Block & block) const; Block materializeColumnsFromRightBlock(Block block) const; + void saveRightKeyColumnsForFilter(std::vector keys_per_clause); + private: friend class NotJoinedHash; @@ -438,6 +441,8 @@ private: mutable std::unique_ptr used_flags; RightTableDataPtr data; bool have_compressed = false; + bool save_right_key_columns_for_filter = false; + std::vector right_keys_for_fiter_per_clause; std::vector key_sizes; diff --git a/src/Interpreters/PreparedSets.cpp b/src/Interpreters/PreparedSets.cpp index c69e2f84d42..beccadc3054 100644 --- a/src/Interpreters/PreparedSets.cpp +++ b/src/Interpreters/PreparedSets.cpp @@ -86,6 +86,23 @@ FutureSetFromTuple::FutureSetFromTuple(Block block, const Settings & settings) set->finishInsert(); } +FutureSetFromTuple::FutureSetFromTuple(Block block) +{ + SizeLimits no_limits; + set = std::make_shared(no_limits, 0, 0); + set->setHeader(block.cloneEmpty().getColumnsWithTypeAndName()); + + Columns columns; + columns.reserve(block.columns()); + for (const auto & column : block) + columns.emplace_back(column.column); + + set_key_columns.filter = ColumnUInt8::create(block.rows()); + + set->insertFromColumns(columns, set_key_columns); + set->finishInsert(); +} + DataTypes FutureSetFromTuple::getTypes() const { return set->getElementsTypes(); } SetPtr FutureSetFromTuple::buildOrderedSetInplace(const ContextPtr & context) diff --git a/src/Interpreters/PreparedSets.h b/src/Interpreters/PreparedSets.h index a6aee974d0e..25342ba25aa 100644 --- a/src/Interpreters/PreparedSets.h +++ b/src/Interpreters/PreparedSets.h @@ -77,6 +77,7 @@ class FutureSetFromTuple final : public FutureSet { public: FutureSetFromTuple(Block block, const Settings & settings); + explicit FutureSetFromTuple(Block block); SetPtr get() const override { return set; } SetPtr buildOrderedSetInplace(const ContextPtr & context) override; diff --git a/src/Processors/QueryPlan/JoinStep.cpp b/src/Processors/QueryPlan/JoinStep.cpp index d81ce2fda17..1523792fc63 100644 --- a/src/Processors/QueryPlan/JoinStep.cpp +++ b/src/Processors/QueryPlan/JoinStep.cpp @@ -5,8 +5,12 @@ #include #include #include +#include +#include #include #include +#include +#include namespace DB { @@ -37,8 +41,120 @@ std::vector> describeJoinActions(const JoinPtr & join) return description; } +ColumnsWithTypeAndName squashBlocks(const Names & keys, const BlocksList & blocks) +{ + ColumnsWithTypeAndName squashed; + std::vector positions; + + // std::cerr << "===== " << blocks.front().dumpStructure() << std::endl; + for (const auto & name : keys) + { + // std::cerr << ".... 
" << name << std::endl; + positions.push_back(blocks.front().getPositionByName(name)); + } + + bool first = true; + for (const auto & block : blocks) + { + if (first) + { + first = false; + for (size_t pos : positions) + squashed.push_back(blocks.front().getByPosition(pos)); + continue; + } + + for (size_t i = 0; i < positions.size(); ++i) + { + auto & sq_col = squashed[i]; + auto col_mutable = IColumn::mutate(std::move(sq_col.column)); + + const auto & rhs_col = block.getByPosition(positions[i]); + size_t rows = rhs_col.column->size(); + + col_mutable->insertRangeFrom(*rhs_col.column, 0, rows); + sq_col.column = std::move(col_mutable); + } + } + + return squashed; } +} + +void DynamicJoinFilters::filterDynamicPartsByFilledJoin(const IJoin & join) +{ + if (!parts) + return; + + const auto * hash_join = typeid_cast(&join); + if (!hash_join) + return; + + const auto & blocks = hash_join->getJoinedData()->right_key_columns_for_filter; + if (blocks.empty()) + return; + + const auto & settings = context->getSettingsRef(); + + for (size_t i = 0; i < clauses.size(); ++i) + { + const auto & clause = clauses[i]; + auto squashed = squashBlocks(clause.keys, blocks[i]); + + // std::cerr << "Right join data rows " << squashed.front().column->size() << std::endl; + + auto set = std::make_shared(squashed, settings); + clause.set->setData(std::move(set)); + } + + const auto & primary_key = metadata->getPrimaryKey(); + const Names & primary_key_column_names = primary_key.column_names; + + KeyCondition key_condition(&actions, context, primary_key_column_names, primary_key.expression); + // std::cerr << "======== " << key_condition.toString() << std::endl; + + auto log = getLogger("DynamicJoinFilter"); + + auto parts_with_lock = parts->parts_ranges_ptr->get(); + + size_t prev_marks = 0; + size_t post_marks = 0; + + for (auto & part_range : parts_with_lock.parts_ranges) + { + MarkRanges filtered_ranges; + for (auto & range : part_range.ranges) + { + prev_marks += range.getNumberOfMarks(); + + // std::cerr << "Range " << range.begin << ' ' << range.end << " has final mark " << part_range.data_part->index_granularity->hasFinalMark() << std::endl; + auto new_ranges = MergeTreeDataSelectExecutor::markRangesFromPKRange( + part_range.data_part, + range.begin, + range.end, + metadata, + key_condition, + {}, nullptr, settings, log); + + for (auto & new_range : new_ranges) + { + // std::cerr << "New Range " << new_range.begin << ' ' << new_range.end << std::endl; + if (new_range.getNumberOfMarks()) + { + post_marks += new_range.getNumberOfMarks(); + filtered_ranges.push_back(new_range); + } + } + } + + part_range.ranges = std::move(filtered_ranges); + } + + LOG_TRACE(log, "Reading {}/{} marks after filtration.", post_marks, prev_marks); +} + + JoinStep::JoinStep( const Header & left_header_, const Header & right_header_, @@ -56,6 +172,11 @@ JoinStep::JoinStep( updateInputHeaders({left_header_, right_header_}); } +void JoinStep::setDynamicFilter(DynamicJoinFiltersPtr dynamic_filter_) +{ + dynamic_filter = std::move(dynamic_filter_); +} + QueryPipelineBuilderPtr JoinStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &) { if (pipelines.size() != 2) @@ -69,10 +190,18 @@ QueryPipelineBuilderPtr JoinStep::updatePipeline(QueryPipelineBuilders pipelines return joined_pipeline; } + auto finish_callback = [filter = this->dynamic_filter, algo = this->join]() + { + LOG_TRACE(getLogger("JoinStep"), "Finish callback called"); + if (filter) + filter->filterDynamicPartsByFilledJoin(*algo); + }; + 
     auto pipeline = QueryPipelineBuilder::joinPipelinesRightLeft(
         std::move(pipelines[0]),
         std::move(pipelines[1]),
         join,
+        std::move(finish_callback),
         *output_header,
         max_block_size,
         min_block_size_bytes,
@@ -105,6 +234,13 @@ void JoinStep::describeActions(FormatSettings & settings) const
 
     for (const auto & [name, value] : describeJoinActions(join))
         settings.out << prefix << name << ": " << value << '\n';
+
+    if (dynamic_filter)
+    {
+        settings.out << prefix << "Dynamic Filter\n";
+        auto expression = std::make_shared<ExpressionActions>(dynamic_filter->actions.clone());
+        expression->describeActions(settings.out, prefix);
+    }
 }
 
 void JoinStep::describeActions(JSONBuilder::JSONMap & map) const
diff --git a/src/Processors/QueryPlan/JoinStep.h b/src/Processors/QueryPlan/JoinStep.h
index bc9b7600510..a0257336862 100644
--- a/src/Processors/QueryPlan/JoinStep.h
+++ b/src/Processors/QueryPlan/JoinStep.h
@@ -2,6 +2,8 @@
 #include
 #include
+#include
+#include
 
 namespace DB
 {
@@ -9,6 +11,31 @@ namespace DB
 class IJoin;
 using JoinPtr = std::shared_ptr<IJoin>;
 
+struct DynamiclyFilteredPartsRanges;
+using DynamiclyFilteredPartsRangesPtr = std::shared_ptr<DynamiclyFilteredPartsRanges>;
+
+class ColumnSet;
+
+/// This structure is used to filter left table by the right one after HashJoin is filled.
+struct DynamicJoinFilters
+{
+    struct Clause
+    {
+        ColumnSet * set;
+        Names keys;
+    };
+
+    std::vector<Clause> clauses;
+    DynamiclyFilteredPartsRangesPtr parts;
+    ActionsDAG actions;
+    ContextPtr context;
+    StorageMetadataPtr metadata;
+
+    void filterDynamicPartsByFilledJoin(const IJoin & join);
+};
+
+using DynamicJoinFiltersPtr = std::shared_ptr<DynamicJoinFilters>;
+
 /// Join two data streams.
 class JoinStep : public IQueryPlanStep
 {
@@ -35,6 +62,8 @@ public:
     void setJoin(JoinPtr join_) { join = std::move(join_); }
     bool allowPushDownToRight() const;
 
+    void setDynamicFilter(DynamicJoinFiltersPtr dynamic_filter_);
+
 private:
     void updateOutputHeader() override;
 
@@ -43,6 +72,8 @@ private:
     size_t min_block_size_bytes;
     size_t max_streams;
     bool keep_left_read_in_order;
+
+    DynamicJoinFiltersPtr dynamic_filter;
 };
 
 /// Special step for the case when Join is already filled.
diff --git a/src/Processors/QueryPlan/Optimizations/Optimizations.h b/src/Processors/QueryPlan/Optimizations/Optimizations.h
index 751d5182dc3..db805521361 100644
--- a/src/Processors/QueryPlan/Optimizations/Optimizations.h
+++ b/src/Processors/QueryPlan/Optimizations/Optimizations.h
@@ -114,6 +114,7 @@ void optimizePrewhere(Stack & stack, QueryPlan::Nodes & nodes);
 void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes);
 void optimizeAggregationInOrder(QueryPlan::Node & node, QueryPlan::Nodes &);
 void optimizeDistinctInOrder(QueryPlan::Node & node, QueryPlan::Nodes &);
+void optimizeFilterByJoinSet(QueryPlan::Node & node);
 
 /// A separate tree traverse to apply sorting properties after *InOrder optimizations.
 void applyOrder(const QueryPlanOptimizationSettings & optimization_settings, QueryPlan::Node & root);
diff --git a/src/Processors/QueryPlan/Optimizations/optimizeFilterByJoinSet.cpp b/src/Processors/QueryPlan/Optimizations/optimizeFilterByJoinSet.cpp
new file mode 100644
index 00000000000..e17d0cf73f6
--- /dev/null
+++ b/src/Processors/QueryPlan/Optimizations/optimizeFilterByJoinSet.cpp
@@ -0,0 +1,268 @@
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+// namespace Setting
+// {
+// extern const SettingsUInt64 allow_experimental_parallel_reading_from_replicas;
+// }
+
+
+namespace QueryPlanOptimizations
+{
+
+ReadFromMergeTree * findReadingStep(QueryPlan::Node & node)
+{
+    IQueryPlanStep * step = node.step.get();
+    if (auto * reading = typeid_cast<ReadFromMergeTree *>(step))
+        return reading;
+
+    if (node.children.size() != 1)
+        return nullptr;
+
+    if (typeid_cast<ExpressionStep *>(step) || typeid_cast<FilterStep *>(step) || typeid_cast<ArrayJoinStep *>(step))
+        return findReadingStep(*node.children.front());
+
+    if (auto * distinct = typeid_cast<DistinctStep *>(step); distinct && distinct->isPreliminary())
+        return findReadingStep(*node.children.front());
+
+    return nullptr;
+}
+
+
+void appendExpression(std::optional<ActionsDAG> & dag, const ActionsDAG & expression)
+{
+    if (dag)
+        dag->mergeInplace(expression.clone());
+    else
+        dag = expression.clone();
+}
+
+/// This function builds a common DAG which is a merge of DAGs from Filter and Expression steps chain.
+void buildSortingDAG(QueryPlan::Node & node, std::optional<ActionsDAG> & dag)
+{
+    IQueryPlanStep * step = node.step.get();
+    if (auto * reading = typeid_cast<ReadFromMergeTree *>(step))
+    {
+        if (const auto prewhere_info = reading->getPrewhereInfo())
+        {
+            //std::cerr << "====== Adding prewhere " << std::endl;
+            appendExpression(dag, prewhere_info->prewhere_actions);
+        }
+        return;
+    }
+
+    if (node.children.size() != 1)
+        return;
+
+    buildSortingDAG(*node.children.front(), dag);
+
+    if (typeid_cast(step))
+    {
+    }
+
+    if (auto * expression = typeid_cast<ExpressionStep *>(step))
+    {
+        const auto & actions = expression->getExpression();
+        appendExpression(dag, actions);
+    }
+
+    if (auto * filter = typeid_cast<FilterStep *>(step))
+    {
+        appendExpression(dag, filter->getExpression());
+    }
+
+    if (auto * array_join = typeid_cast<ArrayJoinStep *>(step))
+    {
+        const auto & array_joined_columns = array_join->getColumns();
+
+        if (dag)
+        {
+            std::unordered_set keys_set(array_joined_columns.begin(), array_joined_columns.end());
+
+            /// Remove array joined columns from outputs.
+            /// Types are changed after ARRAY JOIN, and we can't use this columns anyway.
+            ActionsDAG::NodeRawConstPtrs outputs;
+            outputs.reserve(dag->getOutputs().size());
+
+            for (const auto & output : dag->getOutputs())
+            {
+                if (!keys_set.contains(output->result_name))
+                    outputs.push_back(output);
+            }
+
+            dag->getOutputs() = std::move(outputs);
+        }
+    }
+}
+
+void optimizeFilterByJoinSet(QueryPlan::Node & node)
+{
+    auto * join_step = typeid_cast<JoinStep *>(node.step.get());
+    if (!join_step)
+        return;
+
+    // std::cerr << "optimizeFilterByJoinSet\n";
+
+    const auto & join = join_step->getJoin();
+    auto * hash_join = typeid_cast<HashJoin *>(join.get());
+    if (!hash_join)
+        return;
+
+    // std::cerr << "optimizeFilterByJoinSet got hash join\n";
+
+    const auto & table_join = join->getTableJoin();
+    if (table_join.kind() != JoinKind::Inner && table_join.kind() != JoinKind::Right)
+        return;
+
+    const auto & clauses = table_join.getClauses();
+    if (clauses.empty())
+        return;
+
+    // std::cerr << "optimizeFilterByJoinSetone class\n";
+
+    auto * reading = findReadingStep(*node.children.front());
+    if (!reading)
+        return;
+
+    if (reading->splitsRangesIntoIntersectionAndNonIntersecting() || reading->isQueryWithFinal())
+        return;
+
+    // if (reading->getContext()->getSettingsRef()[Setting::allow_experimental_parallel_reading_from_replicas])
+    //     return;
+
+    // std::cerr << "optimizeFilterByJoinSetone reading\n";
+
+    const auto & pk = reading->getStorageMetadata()->getPrimaryKey();
+    if (pk.column_names.empty())
+        return;
+
+    // std::cerr << "optimizeFilterByJoinSetone pk\n";
+
+    std::optional<ActionsDAG> dag;
+    buildSortingDAG(*node.children.front(), dag);
+
+    if (!dag)
+        dag = ActionsDAG(reading->getOutputHeader().getColumnsWithTypeAndName());
+
+    // std::cerr << "optimizeFilterByJoinSetone sorting dag " << dag->dumpDAG() << std::endl;
+
+    std::unordered_map outputs;
+    for (const auto & output : dag->getOutputs())
+        outputs.emplace(output->result_name, output);
+
+    const Block & right_source_columns = node.children.back()->step->getOutputHeader();
+    ActionsDAG::NodeRawConstPtrs predicates;
+    DynamicJoinFilters join_filters;
+    std::vector<Names> right_keys_per_clause;
+
+    FunctionOverloadResolverPtr func_tuple_builder = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionTuple>());
+
+    for (const auto & clause : clauses)
+    {
+        // Names left_keys;
+        Names right_keys;
+        std::vector<const ActionsDAG::Node *> left_columns;
+        std::vector<ColumnWithTypeAndName> right_columns;
+
+        size_t keys_size = clause.key_names_left.size();
+
+        for (size_t i = 0; i < keys_size; ++i)
+        {
+            const auto & left_name = clause.key_names_left[i];
+            const auto & right_name = clause.key_names_right[i];
+
+            // std::cerr << left_name << ' ' << right_name << std::endl;
+
+            auto it = outputs.find(left_name);
+            if (it != outputs.end())
+            {
+                // left_keys.push_back(left_name);
+                right_keys.push_back(right_name);
+                left_columns.push_back(it->second);
+                right_columns.push_back(right_source_columns.getByName(right_name));
+            }
+        }
+
+        if (left_columns.empty())
+            return;
+
+        // std::cerr << "optimizeFilterByJoinSetone some coluns\n";
+
+        const ActionsDAG::Node * in_lhs_arg = left_columns.front();
+        if (left_columns.size() > 1)
+            in_lhs_arg = &dag->addFunction(func_tuple_builder, std::move(left_columns), {});
+
+        auto context = reading->getContext();
+        auto test_set = std::make_shared<FutureSetFromTuple>(Block(right_columns), context->getSettingsRef());
+        auto column_set = ColumnSet::create(1, std::move(test_set));
+        ColumnSet * column_set_ptr = column_set.get();
+        ColumnPtr set_col = ColumnConst::create(std::move(column_set), 0);
+
+        const ActionsDAG::Node * in_rhs_arg = &dag->addColumn({set_col, std::make_shared<DataTypeSet>(), {}});
+
+        auto func_in = FunctionFactory::instance().get("in", context);
+        const ActionsDAG::Node * predicate = &dag->addFunction(func_in, {in_lhs_arg, in_rhs_arg}, {});
+
+        join_filters.clauses.emplace_back(column_set_ptr, right_keys);
+        right_keys_per_clause.emplace_back(std::move(right_keys));
+        predicates.emplace_back(predicate);
+    }
+
+    if (predicates.size() > 1)
+    {
+        FunctionOverloadResolverPtr func_builder_and = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionAnd>());
+        predicates = {&dag->addFunction(func_builder_and, std::move(predicates), {})};
+    }
+
+    dag->getOutputs() = std::move(predicates);
+    dag->removeUnusedActions();
+
+    // std::cerr << "optimizeFilterByJoinSetone dag " << dag->dumpDAG() << std::endl;
+
+    auto metadata_snapshot = reading->getStorageMetadata();
+    const auto & primary_key = metadata_snapshot->getPrimaryKey();
+    const Names & primary_key_column_names = primary_key.column_names;
+    auto context = reading->getContext();
+
+    KeyCondition key_condition(&*dag, context, primary_key_column_names, primary_key.expression);
+
+    // std::cerr << "optimizeFilterByJoinSetone matched cond " << key_condition.toString() << std::endl;
+
+    /// Condition is (join keys) IN (empty set).
+    if (key_condition.alwaysUnknownOrTrue())
+        return;
+
+    // std::cerr << "optimizeFilterByJoinSetone matched cond " << std::endl;
+
+    auto dynamic_parts = reading->useDynamiclyFilteredParts();
+
+    join_filters.actions = std::move(*dag);
+    join_filters.parts = dynamic_parts;
+    join_filters.context = context;
+    join_filters.metadata = metadata_snapshot;
+
+    join_step->setDynamicFilter(std::make_shared<DynamicJoinFilters>(std::move(join_filters)));
+    hash_join->saveRightKeyColumnsForFilter(std::move(right_keys_per_clause));
+}
+
+}
+}
diff --git a/src/Processors/QueryPlan/Optimizations/optimizeTree.cpp b/src/Processors/QueryPlan/Optimizations/optimizeTree.cpp
index 03418c752d4..cd4af0c8613 100644
--- a/src/Processors/QueryPlan/Optimizations/optimizeTree.cpp
+++ b/src/Processors/QueryPlan/Optimizations/optimizeTree.cpp
@@ -132,6 +132,8 @@ void optimizeTreeSecondPass(const QueryPlanOptimizationSettings & optimization_s
 
         if (optimization_settings.distinct_in_order)
             optimizeDistinctInOrder(*frame.node, nodes);
+
+        optimizeFilterByJoinSet(*frame.node);
     }
 
     /// Traverse all children first.
diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 6899dc7f5d6..6e6a230b00c 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -386,6 +386,16 @@ ReadFromMergeTree::ReadFromMergeTree( setStepDescription(data.getStorageID().getFullNameNotQuoted()); enable_vertical_final = query_info.isFinal() && context->getSettingsRef()[Setting::enable_vertical_final] && data.merging_params.mode == MergeTreeData::MergingParams::Replacing; + + double read_split_ranges_into_intersecting_and_non_intersecting_injection_probability + = settings[Setting::merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability]; + if (read_split_ranges_into_intersecting_and_non_intersecting_injection_probability > 0.0) + { + std::bernoulli_distribution fault(read_split_ranges_into_intersecting_and_non_intersecting_injection_probability); + + if (fault(thread_local_rng)) + read_split_ranges_into_intersecting_and_non_intersecting_injection = true; + } } std::unique_ptr ReadFromMergeTree::createLocalParallelReplicasReadingStep( @@ -454,6 +464,9 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas(RangesInDataParts parts_wit block_size, context); + if (dynamically_filtered_parts) + dynamically_filtered_parts->parts_ranges_ptr = pool->getPartsWithRanges(); + Pipes pipes; for (size_t i = 0; i < pool_settings.threads; ++i) @@ -502,7 +515,7 @@ Pipe ReadFromMergeTree::readFromPool( all_parts_are_remote &= is_remote; } - MergeTreeReadPoolPtr pool; + std::shared_ptr pool; bool allow_prefetched_remote = all_parts_are_remote && settings[Setting::allow_prefetched_read_pool_for_remote_filesystem] && MergeTreePrefetchedReadPool::checkReadMethodAllowed(reader_settings.read_settings.remote_fs_method); @@ -547,6 +560,9 @@ Pipe ReadFromMergeTree::readFromPool( context); } + if (dynamically_filtered_parts) + dynamically_filtered_parts->parts_ranges_ptr = pool->getPartsWithRanges(); + LOG_DEBUG(log, "Reading approx. {} rows with {} streams", total_rows, pool_settings.threads); Pipes pipes; @@ -581,7 +597,7 @@ Pipe ReadFromMergeTree::readInOrder( /// For reading in order it makes sense to read only /// one range per task to reduce number of read rows. bool has_limit_below_one_block = read_type != ReadType::Default && read_limit && read_limit < block_size.max_block_size_rows; - MergeTreeReadPoolPtr pool; + std::shared_ptr pool; if (is_parallel_reading_from_replicas) { @@ -645,6 +661,9 @@ Pipe ReadFromMergeTree::readInOrder( context); } + if (dynamically_filtered_parts) + dynamically_filtered_parts->parts_ranges_ptr = pool->getPartsWithRanges(); + /// If parallel replicas enabled, set total rows in progress here only on initiator with local plan /// Otherwise rows will counted multiple times const UInt64 in_order_limit = query_info.input_order_info ? query_info.input_order_info->limit : 0; @@ -876,14 +895,9 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(RangesInDataParts && parts_ auto read_type = is_parallel_reading_from_replicas ? 
ReadType::ParallelReplicas : ReadType::Default; - double read_split_ranges_into_intersecting_and_non_intersecting_injection_probability - = settings[Setting::merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability]; - std::bernoulli_distribution fault(read_split_ranges_into_intersecting_and_non_intersecting_injection_probability); - if (read_type != ReadType::ParallelReplicas && num_streams > 1 && - read_split_ranges_into_intersecting_and_non_intersecting_injection_probability > 0.0 && - fault(thread_local_rng) && + read_split_ranges_into_intersecting_and_non_intersecting_injection && !isQueryWithFinal() && data.merging_params.is_deleted_column.empty() && !prewhere_info) @@ -2202,6 +2216,14 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons pipeline.setQueryIdHolder(std::move(query_id_holder)); } +DynamiclyFilteredPartsRangesPtr ReadFromMergeTree::useDynamiclyFilteredParts() +{ + if (!dynamically_filtered_parts) + dynamically_filtered_parts = std::make_shared(); + + return dynamically_filtered_parts; +} + static const char * indexTypeToString(ReadFromMergeTree::IndexType type) { switch (type) diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h index 2b02fa82761..82a473dc5f0 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.h +++ b/src/Processors/QueryPlan/ReadFromMergeTree.h @@ -55,6 +55,13 @@ struct UsefulSkipIndexes std::vector merged_indices; }; +struct DynamiclyFilteredPartsRanges +{ + MergeTreeReadPartsRangesPtr parts_ranges_ptr; +}; + +using DynamiclyFilteredPartsRangesPtr = std::shared_ptr; + /// This step is created to read from MergeTree* table. /// For now, it takes a list of parts and creates source from it. class ReadFromMergeTree final : public SourceStepWithFilter @@ -212,6 +219,9 @@ public: void applyFilters(ActionDAGNodes added_filter_nodes) override; + DynamiclyFilteredPartsRangesPtr useDynamiclyFilteredParts(); + bool splitsRangesIntoIntersectionAndNonIntersecting() const { return read_split_ranges_into_intersecting_and_non_intersecting_injection; } + private: MergeTreeReaderSettings reader_settings; @@ -276,6 +286,8 @@ private: mutable AnalysisResultPtr analyzed_result_ptr; VirtualFields shared_virtual_fields; + DynamiclyFilteredPartsRangesPtr dynamically_filtered_parts; + bool is_parallel_reading_from_replicas; std::optional all_ranges_callback; std::optional read_task_callback; @@ -285,6 +297,8 @@ private: ExpressionActionsPtr virtual_row_conversion; std::optional number_of_current_replica; + + bool read_split_ranges_into_intersecting_and_non_intersecting_injection = false; }; } diff --git a/src/Processors/Transforms/JoiningTransform.cpp b/src/Processors/Transforms/JoiningTransform.cpp index 2862575b541..1af01abb988 100644 --- a/src/Processors/Transforms/JoiningTransform.cpp +++ b/src/Processors/Transforms/JoiningTransform.cpp @@ -241,9 +241,10 @@ Blocks JoiningTransform::readExecute(Chunk & chunk) return res; } -FillingRightJoinSideTransform::FillingRightJoinSideTransform(Block input_header, JoinPtr join_) +FillingRightJoinSideTransform::FillingRightJoinSideTransform(Block input_header, JoinPtr join_, std::function on_finish_callback_) : IProcessor({input_header}, {Block()}) , join(std::move(join_)) + , on_finish_callback(std::move(on_finish_callback_)) {} InputPort * FillingRightJoinSideTransform::addTotalsPort() @@ -312,12 +313,26 @@ IProcessor::Status FillingRightJoinSideTransform::prepare() return Status::Ready; } + if (!on_finish) + { + 
on_finish = true; + return Status::Ready; + } + output.finish(); return Status::Finished; } void FillingRightJoinSideTransform::work() { + if (on_finish) + { + join->tryRerangeRightTableData(); + if (on_finish_callback) + on_finish_callback(); + return; + } + auto & input = inputs.front(); auto block = input.getHeader().cloneWithColumns(chunk.detachColumns()); @@ -326,9 +341,6 @@ void FillingRightJoinSideTransform::work() else stop_reading = !join->addBlockToJoin(block); - if (input.isFinished()) - join->tryRerangeRightTableData(); - set_totals = for_totals; } diff --git a/src/Processors/Transforms/JoiningTransform.h b/src/Processors/Transforms/JoiningTransform.h index d0042983eb5..74a7d26eb6d 100644 --- a/src/Processors/Transforms/JoiningTransform.h +++ b/src/Processors/Transforms/JoiningTransform.h @@ -102,7 +102,7 @@ private: class FillingRightJoinSideTransform : public IProcessor { public: - FillingRightJoinSideTransform(Block input_header, JoinPtr join_); + FillingRightJoinSideTransform(Block input_header, JoinPtr join_, std::function on_finish_callback_); String getName() const override { return "FillingRightJoinSide"; } InputPort * addTotalsPort(); @@ -116,6 +116,9 @@ private: bool stop_reading = false; bool for_totals = false; bool set_totals = false; + bool on_finish = false; + + std::function on_finish_callback; }; diff --git a/src/QueryPipeline/QueryPipelineBuilder.cpp b/src/QueryPipeline/QueryPipelineBuilder.cpp index 6e644d8f6d5..be47afa2687 100644 --- a/src/QueryPipeline/QueryPipelineBuilder.cpp +++ b/src/QueryPipeline/QueryPipelineBuilder.cpp @@ -384,6 +384,7 @@ std::unique_ptr QueryPipelineBuilder::joinPipelinesRightLe std::unique_ptr left, std::unique_ptr right, JoinPtr join, + std::function finish_callback, const Block & output_header, size_t max_block_size, size_t min_block_size_bytes, @@ -446,7 +447,7 @@ std::unique_ptr QueryPipelineBuilder::joinPipelinesRightLe auto squashing = std::make_shared(right->getHeader(), 0, min_block_size_bytes); connect(*outport, squashing->getInputs().front()); processors.emplace_back(squashing); - auto adding_joined = std::make_shared(right->getHeader(), join); + auto adding_joined = std::make_shared(right->getHeader(), join, finish_callback); connect(squashing->getOutputPort(), adding_joined->getInputs().front()); processors.emplace_back(std::move(adding_joined)); } @@ -459,7 +460,7 @@ std::unique_ptr QueryPipelineBuilder::joinPipelinesRightLe { right->resize(1); - auto adding_joined = std::make_shared(right->getHeader(), join); + auto adding_joined = std::make_shared(right->getHeader(), join, finish_callback); InputPort * totals_port = nullptr; if (right->hasTotals()) totals_port = adding_joined->addTotalsPort(); diff --git a/src/QueryPipeline/QueryPipelineBuilder.h b/src/QueryPipeline/QueryPipelineBuilder.h index 312655b7b6d..d03560644ee 100644 --- a/src/QueryPipeline/QueryPipelineBuilder.h +++ b/src/QueryPipeline/QueryPipelineBuilder.h @@ -124,6 +124,7 @@ public: std::unique_ptr left, std::unique_ptr right, JoinPtr join, + std::function finish_callback, const Block & output_header, size_t max_block_size, size_t min_block_size_bytes, diff --git a/src/Storages/MergeTree/KeyCondition.cpp b/src/Storages/MergeTree/KeyCondition.cpp index 1bf568eebba..e017be00e29 100644 --- a/src/Storages/MergeTree/KeyCondition.cpp +++ b/src/Storages/MergeTree/KeyCondition.cpp @@ -1285,6 +1285,8 @@ bool KeyCondition::tryPrepareSetIndex( Columns transformed_set_columns = set_columns; + // std::cerr << "+++++ set col size " << set_columns.front()->size() << 
std::endl; + for (size_t indexes_mapping_index = 0; indexes_mapping_index < indexes_mapping_size; ++indexes_mapping_index) { const auto & key_column_type = data_types[indexes_mapping_index]; @@ -1309,6 +1311,8 @@ bool KeyCondition::tryPrepareSetIndex( is_constant_transformed = true; } + // std::cerr << set_element_type->getName() << " -> " << key_column_type->getName() << std::endl; + if (canBeSafelyCasted(set_element_type, key_column_type)) { transformed_set_columns[set_element_index] = castColumn({set_column, set_element_type, {}}, key_column_type); @@ -1383,6 +1387,8 @@ bool KeyCondition::tryPrepareSetIndex( set_columns = std::move(transformed_set_columns); + // std::cerr << "+++2++ set col size " << set_columns.front()->size() << std::endl; + out.set_index = std::make_shared(set_columns, std::move(indexes_mapping)); /// When not all key columns are used or when there are multiple elements in diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 52ea6db787d..0715ede248d 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -127,7 +127,7 @@ size_t MergeTreeDataSelectExecutor::getApproximateTotalRowsToRead( MarkRanges exact_ranges; for (const auto & part : parts) { - MarkRanges part_ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, {}, &exact_ranges, settings, log); + MarkRanges part_ranges = markRangesFromPKRange(part, 0, part->index_granularity->getMarksCount(), metadata_snapshot, key_condition, {}, &exact_ranges, settings, log); for (const auto & range : part_ranges) rows_count += part->index_granularity->getRowsCountInRange(range); } @@ -695,6 +695,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd CurrentMetrics::Increment metric(CurrentMetrics::FilteringMarksWithPrimaryKey); ranges.ranges = markRangesFromPKRange( part, + 0, part->index_granularity->getMarksCount(), metadata_snapshot, key_condition, part_offset_condition, @@ -1035,6 +1036,8 @@ size_t MergeTreeDataSelectExecutor::minMarksForConcurrentRead( /// If @exact_ranges is not null, fill it with ranges containing marks of fully matched records. MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange( const MergeTreeData::DataPartPtr & part, + size_t start_mark, + size_t end_mark, const StorageMetadataPtr & metadata_snapshot, const KeyCondition & key_condition, const std::optional & part_offset_condition, @@ -1045,10 +1048,16 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange( MarkRanges res; size_t marks_count = part->index_granularity->getMarksCount(); + bool has_final_mark = part->index_granularity->hasFinalMark(); + if (marks_count == 0) return res; - bool has_final_mark = part->index_granularity->hasFinalMark(); + if (has_final_mark && end_mark == marks_count) + --end_mark; + + if (start_mark >= end_mark) + return res; bool key_condition_useful = !key_condition.alwaysUnknownOrTrue(); bool part_offset_condition_useful = part_offset_condition && !part_offset_condition->alwaysUnknownOrTrue(); @@ -1056,11 +1065,7 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange( /// If index is not used. 
if (!key_condition_useful && !part_offset_condition_useful) { - if (has_final_mark) - res.push_back(MarkRange(0, marks_count - 1)); - else - res.push_back(MarkRange(0, marks_count)); - + res.push_back(MarkRange(start_mark, end_mark)); return res; } @@ -1207,7 +1212,7 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange( /// At each step, take the left segment and check if it fits. /// If fits, split it into smaller ones and put them on the stack. If not, discard it. /// If the segment is already of one mark length, add it to response and discard it. - std::vector ranges_stack = { {0, marks_count - (has_final_mark ? 1 : 0)} }; + std::vector ranges_stack = { {start_mark, end_mark} }; size_t steps = 0; @@ -1271,8 +1276,8 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange( MarkRange result_range; - size_t last_mark = marks_count - (has_final_mark ? 1 : 0); - size_t searched_left = 0; + size_t last_mark = end_mark; + size_t searched_left = start_mark; size_t searched_right = last_mark; bool check_left = false; diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h index d16d9243c14..46b5d4ea75a 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h @@ -66,6 +66,8 @@ public: static MarkRanges markRangesFromPKRange( const MergeTreeData::DataPartPtr & part, + size_t start_mark, + size_t end_mark, const StorageMetadataPtr & metadata_snapshot, const KeyCondition & key_condition, const std::optional & part_offset_condition, diff --git a/src/Storages/MergeTree/MergeTreePrefetchedReadPool.cpp b/src/Storages/MergeTree/MergeTreePrefetchedReadPool.cpp index 4e5389f2869..33a978f9d2e 100644 --- a/src/Storages/MergeTree/MergeTreePrefetchedReadPool.cpp +++ b/src/Storages/MergeTree/MergeTreePrefetchedReadPool.cpp @@ -117,16 +117,14 @@ MergeTreePrefetchedReadPool::MergeTreePrefetchedReadPool( params_, context_) , prefetch_threadpool(getContext()->getPrefetchThreadpool()) - , log(getLogger( - "MergeTreePrefetchedReadPool(" - + (parts_ranges.empty() ? "" : parts_ranges.front().data_part->storage.getStorageID().getNameForLogs()) + ")")) + , log(getLogger("MergeTreePrefetchedReadPool(" + (storage_snapshot_->storage.getStorageID().getNameForLogs()) + ")")) { - /// Tasks creation might also create a lost of readers - check they do not - /// do any time consuming operations in ctor. - ProfileEventTimeIncrement watch(ProfileEvents::MergeTreePrefetchedReadPoolInit); + // /// Tasks creation might also create a lost of readers - check they do not + // /// do any time consuming operations in ctor. 
+ // ProfileEventTimeIncrement watch(ProfileEvents::MergeTreePrefetchedReadPoolInit); - fillPerPartStatistics(); - fillPerThreadTasks(pool_settings.threads, pool_settings.sum_marks); + // fillPerPartStatistics(); + // fillPerThreadTasks(pool_settings.threads, pool_settings.sum_marks); } std::function MergeTreePrefetchedReadPool::createPrefetchedTask(IMergeTreeReader * reader, Priority priority) @@ -183,6 +181,22 @@ void MergeTreePrefetchedReadPool::startPrefetches() MergeTreeReadTaskPtr MergeTreePrefetchedReadPool::getTask(size_t task_idx, MergeTreeReadTask * previous_task) { + auto init = [this]() + { + auto parts_ranges_and_lock = parts_ranges_ptr->get(); + const auto & parts_ranges = parts_ranges_and_lock.parts_ranges; + + fillPerPartInfos(parts_ranges); + + /// Tasks creation might also create a lost of readers - check they do not + /// do any time consuming operations in ctor. + ProfileEventTimeIncrement watch(ProfileEvents::MergeTreePrefetchedReadPoolInit); + + fillPerPartStatistics(parts_ranges); + fillPerThreadTasks(parts_ranges, pool_settings.threads, pool_settings.sum_marks); + }; + std::call_once(init_flag, init); + std::lock_guard lock(mutex); if (per_thread_tasks.empty()) @@ -313,7 +327,7 @@ MergeTreeReadTaskPtr MergeTreePrefetchedReadPool::createTask(ThreadTask & task, return MergeTreeReadPoolBase::createTask(task.read_info, task.ranges, previous_task); } -void MergeTreePrefetchedReadPool::fillPerPartStatistics() +void MergeTreePrefetchedReadPool::fillPerPartStatistics(const RangesInDataParts & parts_ranges) { per_part_statistics.clear(); per_part_statistics.reserve(parts_ranges.size()); @@ -363,7 +377,7 @@ ALWAYS_INLINE inline String getPartNameForLogging(const DataPartPtr & part) } -void MergeTreePrefetchedReadPool::fillPerThreadTasks(size_t threads, size_t sum_marks) +void MergeTreePrefetchedReadPool::fillPerThreadTasks(const RangesInDataParts & parts_ranges, size_t threads, size_t sum_marks) { if (per_part_infos.empty()) return; diff --git a/src/Storages/MergeTree/MergeTreePrefetchedReadPool.h b/src/Storages/MergeTree/MergeTreePrefetchedReadPool.h index b94d4ea113a..cfe03d21b2f 100644 --- a/src/Storages/MergeTree/MergeTreePrefetchedReadPool.h +++ b/src/Storages/MergeTree/MergeTreePrefetchedReadPool.h @@ -14,7 +14,7 @@ using MergeTreeReaderPtr = std::unique_ptr; /// A class which is responsible for creating read tasks /// which are later taken by readers via getTask method. /// Does prefetching for the read tasks it creates. 
-class MergeTreePrefetchedReadPool : public MergeTreeReadPoolBase +class MergeTreePrefetchedReadPool final : public MergeTreeReadPoolBase { public: MergeTreePrefetchedReadPool( @@ -106,8 +106,8 @@ private: using TasksPerThread = std::map; using PartStatistics = std::vector; - void fillPerPartStatistics(); - void fillPerThreadTasks(size_t threads, size_t sum_marks); + void fillPerPartStatistics(const RangesInDataParts & parts_ranges); + void fillPerThreadTasks(const RangesInDataParts & parts_ranges, size_t threads, size_t sum_marks); void startPrefetches(); void createPrefetchedReadersForTask(ThreadTask & task); diff --git a/src/Storages/MergeTree/MergeTreeReadPool.cpp b/src/Storages/MergeTree/MergeTreeReadPool.cpp index d266ad55824..7de807f5647 100644 --- a/src/Storages/MergeTree/MergeTreeReadPool.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPool.cpp @@ -62,11 +62,23 @@ MergeTreeReadPool::MergeTreeReadPool( , backoff_settings{context_->getSettingsRef()} , backoff_state{pool_settings.threads} { - fillPerThreadInfo(pool_settings.threads, pool_settings.sum_marks); + //fillPerThreadInfo(pool_settings.threads, pool_settings.sum_marks); } MergeTreeReadTaskPtr MergeTreeReadPool::getTask(size_t task_idx, MergeTreeReadTask * previous_task) { + auto init = [this]() + { + auto parts_ranges_and_lock = parts_ranges_ptr->get(); + const auto & parts_ranges = parts_ranges_and_lock.parts_ranges; + + fillPerPartInfos(parts_ranges); + fillPerThreadInfo(parts_ranges, pool_settings.threads, pool_settings.sum_marks); + + LOG_TRACE(getLogger("MergeTreeReadPool"), "Init callback done"); + }; + std::call_once(init_flag, init); + const std::lock_guard lock{mutex}; /// If number of threads was lowered due to backoff, then will assign work only for maximum 'backoff_state.current_threads' threads. @@ -194,7 +206,7 @@ void MergeTreeReadPool::profileFeedback(ReadBufferFromFileBase::ProfileInfo info LOG_DEBUG(log, "Will lower number of threads to {}", backoff_state.current_threads); } -void MergeTreeReadPool::fillPerThreadInfo(size_t threads, size_t sum_marks) +void MergeTreeReadPool::fillPerThreadInfo(const RangesInDataParts & parts_ranges, size_t threads, size_t sum_marks) { if (threads > 1000000ull) throw Exception(ErrorCodes::CANNOT_SCHEDULE_TASK, "Too many threads ({}) requested", threads); @@ -213,7 +225,7 @@ void MergeTreeReadPool::fillPerThreadInfo(size_t threads, size_t sum_marks) using PartsInfo = std::vector; std::queue parts_queue; - auto per_part_sum_marks = getPerPartSumMarks(); + auto per_part_sum_marks = getPerPartSumMarks(parts_ranges); { /// Group parts by disk name. @@ -223,6 +235,9 @@ void MergeTreeReadPool::fillPerThreadInfo(size_t threads, size_t sum_marks) for (size_t i = 0; i < parts_ranges.size(); ++i) { + if (parts_ranges[i].ranges.empty()) + continue; + PartInfo part_info{parts_ranges[i], per_part_sum_marks[i], i}; if (parts_ranges[i].data_part->isStoredOnDisk()) parts_per_disk[parts_ranges[i].data_part->getDataPartStorage().getDiskName()].push_back(std::move(part_info)); diff --git a/src/Storages/MergeTree/MergeTreeReadPool.h b/src/Storages/MergeTree/MergeTreeReadPool.h index a0425f0951c..4badd807e64 100644 --- a/src/Storages/MergeTree/MergeTreeReadPool.h +++ b/src/Storages/MergeTree/MergeTreeReadPool.h @@ -19,7 +19,7 @@ namespace DB * it's workload, it either is signaled that no more work is available (`do_not_steal_tasks == false`) or * continues taking small batches from other threads' workloads (`do_not_steal_tasks == true`). 
*/ -class MergeTreeReadPool : public MergeTreeReadPoolBase +class MergeTreeReadPool final : public MergeTreeReadPoolBase { public: struct BackoffSettings; @@ -72,7 +72,7 @@ public: }; private: - void fillPerThreadInfo(size_t threads, size_t sum_marks); + void fillPerThreadInfo(const RangesInDataParts & parts_ranges, size_t threads, size_t sum_marks); mutable std::mutex mutex; diff --git a/src/Storages/MergeTree/MergeTreeReadPoolBase.cpp b/src/Storages/MergeTree/MergeTreeReadPoolBase.cpp index 15a87f463b4..749b12f2c49 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolBase.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPoolBase.cpp @@ -31,7 +31,7 @@ MergeTreeReadPoolBase::MergeTreeReadPoolBase( const MergeTreeReadTask::BlockSizeParams & block_size_params_, const ContextPtr & context_) : WithContext(context_) - , parts_ranges(std::move(parts_)) + , parts_ranges_ptr(std::make_shared(std::move(parts_))) , mutations_snapshot(std::move(mutations_snapshot_)) , shared_virtual_fields(std::move(shared_virtual_fields_)) , storage_snapshot(storage_snapshot_) @@ -44,9 +44,12 @@ MergeTreeReadPoolBase::MergeTreeReadPoolBase( , owned_mark_cache(context_->getGlobalContext()->getMarkCache()) , owned_uncompressed_cache(pool_settings_.use_uncompressed_cache ? context_->getGlobalContext()->getUncompressedCache() : nullptr) , header(storage_snapshot->getSampleBlockForColumns(column_names)) + , merge_tree_determine_task_size_by_prewhere_columns(context_->getSettingsRef()[Setting::merge_tree_determine_task_size_by_prewhere_columns]) + , merge_tree_min_bytes_per_task_for_remote_reading(context_->getSettingsRef()[Setting::merge_tree_min_bytes_per_task_for_remote_reading]) + , merge_tree_min_read_task_size(context_->getSettingsRef()[Setting::merge_tree_min_read_task_size]) , profile_callback([this](ReadBufferFromFileBase::ProfileInfo info_) { profileFeedback(info_); }) { - fillPerPartInfos(context_->getSettingsRef()); + //fillPerPartInfos(context_->getSettingsRef()); } static size_t getSizeOfColumns(const IMergeTreeDataPart & part, const Names & columns_to_read) @@ -75,10 +78,12 @@ calculateMinMarksPerTask( const Names & columns_to_read, const std::vector & prewhere_steps_columns, const MergeTreeReadPoolBase::PoolSettings & pool_settings, - const Settings & settings) + bool merge_tree_determine_task_size_by_prewhere_columns, + UInt64 merge_tree_min_read_task_size, + UInt64 merge_tree_min_bytes_per_task_for_remote_reading) { size_t min_marks_per_task - = std::max(settings[Setting::merge_tree_min_read_task_size], pool_settings.min_marks_for_concurrent_read); + = std::max(merge_tree_min_read_task_size, pool_settings.min_marks_for_concurrent_read); size_t avg_mark_bytes = 0; /// It is important to obtain marks count from the part itself instead of calling `part.getMarksCount()`, /// because `part` will report number of marks selected from this part by the query. @@ -90,13 +95,13 @@ calculateMinMarksPerTask( /// We assume that most of the time prewhere does it's job good meaning that lion's share of the rows is filtered out. /// Which means in turn that for most of the rows we will read only the columns from prewhere clause. /// So it makes sense to use only them for the estimation. - const auto & columns = settings[Setting::merge_tree_determine_task_size_by_prewhere_columns] && !prewhere_steps_columns.empty() + const auto & columns = merge_tree_determine_task_size_by_prewhere_columns && !prewhere_steps_columns.empty() ? 
getHeaviestSetOfColumnsAmongPrewhereSteps(*part.data_part, prewhere_steps_columns) : columns_to_read; const size_t part_compressed_bytes = getSizeOfColumns(*part.data_part, columns); avg_mark_bytes = std::max(part_compressed_bytes / part_marks_count, 1); - const auto min_bytes_per_task = settings[Setting::merge_tree_min_bytes_per_task_for_remote_reading]; + const auto min_bytes_per_task = merge_tree_min_bytes_per_task_for_remote_reading; /// We're taking min here because number of tasks shouldn't be too low - it will make task stealing impossible. /// We also create at least two tasks per thread to have something to steal from a slow thread. const auto heuristic_min_marks @@ -121,7 +126,7 @@ calculateMinMarksPerTask( return {min_marks_per_task, avg_mark_bytes}; } -void MergeTreeReadPoolBase::fillPerPartInfos(const Settings & settings) +void MergeTreeReadPoolBase::fillPerPartInfos(const RangesInDataParts & parts_ranges) { per_part_infos.reserve(parts_ranges.size()); is_part_on_remote_disk.reserve(parts_ranges.size()); @@ -186,12 +191,12 @@ void MergeTreeReadPoolBase::fillPerPartInfos(const Settings & settings) is_part_on_remote_disk.push_back(part_with_ranges.data_part->isStoredOnRemoteDisk()); std::tie(read_task_info.min_marks_per_task, read_task_info.approx_size_of_mark) - = calculateMinMarksPerTask(part_with_ranges, column_names, read_task_info.task_columns.pre_columns, pool_settings, settings); + = calculateMinMarksPerTask(part_with_ranges, column_names, read_task_info.task_columns.pre_columns, pool_settings, merge_tree_determine_task_size_by_prewhere_columns, merge_tree_min_read_task_size, merge_tree_min_bytes_per_task_for_remote_reading); per_part_infos.push_back(std::make_shared(std::move(read_task_info))); } } -std::vector MergeTreeReadPoolBase::getPerPartSumMarks() const +std::vector MergeTreeReadPoolBase::getPerPartSumMarks(const RangesInDataParts & parts_ranges) { std::vector per_part_sum_marks; per_part_sum_marks.reserve(parts_ranges.size()); diff --git a/src/Storages/MergeTree/MergeTreeReadPoolBase.h b/src/Storages/MergeTree/MergeTreeReadPoolBase.h index 19b26156433..cab268cdf25 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolBase.h +++ b/src/Storages/MergeTree/MergeTreeReadPoolBase.h @@ -6,6 +6,29 @@ namespace DB { +class MergeTreeReadPartsRanges +{ +public: + explicit MergeTreeReadPartsRanges(RangesInDataParts parts_ranges_) : parts_ranges(std::move(parts_ranges_)) {} + + struct LockedPartRannges + { + std::lock_guard lock; + RangesInDataParts & parts_ranges; + }; + + LockedPartRannges get() TSA_NO_THREAD_SAFETY_ANALYSIS + { + return {std::lock_guard(mutex), parts_ranges}; + } + +private: + RangesInDataParts parts_ranges; + std::mutex mutex; +}; + +using MergeTreeReadPartsRangesPtr = std::shared_ptr; + class MergeTreeReadPoolBase : public IMergeTreeReadPool, protected WithContext { public: @@ -38,9 +61,11 @@ public: Block getHeader() const override { return header; } + MergeTreeReadPartsRangesPtr getPartsWithRanges() { return parts_ranges_ptr; } + protected: /// Initialized in constructor - const RangesInDataParts parts_ranges; + const MergeTreeReadPartsRangesPtr parts_ranges_ptr; const MutationsSnapshotPtr mutations_snapshot; const VirtualFields shared_virtual_fields; const StorageSnapshotPtr storage_snapshot; @@ -53,9 +78,12 @@ protected: const MarkCachePtr owned_mark_cache; const UncompressedCachePtr owned_uncompressed_cache; const Block header; + const bool merge_tree_determine_task_size_by_prewhere_columns; + const UInt64 
merge_tree_min_bytes_per_task_for_remote_reading; + const UInt64 merge_tree_min_read_task_size; - void fillPerPartInfos(const Settings & settings); - std::vector getPerPartSumMarks() const; + void fillPerPartInfos(const RangesInDataParts & parts_ranges); + static std::vector getPerPartSumMarks(const RangesInDataParts & parts_ranges); MergeTreeReadTaskPtr createTask(MergeTreeReadTaskInfoPtr read_info, MergeTreeReadTask::Readers task_readers, MarkRanges ranges) const; @@ -66,6 +94,7 @@ protected: MergeTreeReadTask::Extras getExtras() const; + std::once_flag init_flag; std::vector per_part_infos; std::vector is_part_on_remote_disk; diff --git a/src/Storages/MergeTree/MergeTreeReadPoolInOrder.cpp b/src/Storages/MergeTree/MergeTreeReadPoolInOrder.cpp index c4244ecd982..284f4efe930 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolInOrder.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPoolInOrder.cpp @@ -37,13 +37,25 @@ MergeTreeReadPoolInOrder::MergeTreeReadPoolInOrder( , has_limit_below_one_block(has_limit_below_one_block_) , read_type(read_type_) { - per_part_mark_ranges.reserve(parts_ranges.size()); - for (const auto & part_with_ranges : parts_ranges) - per_part_mark_ranges.push_back(part_with_ranges.ranges); } MergeTreeReadTaskPtr MergeTreeReadPoolInOrder::getTask(size_t task_idx, MergeTreeReadTask * previous_task) { + auto init = [this]() + { + auto parts_ranges_and_lock = parts_ranges_ptr->get(); + + LOG_TRACE(getLogger("MergeTreeReadPoolInOrder"), "Init callback called\n{}", StackTrace().toString()); + const auto & parts_ranges = parts_ranges_and_lock.parts_ranges; + + fillPerPartInfos(parts_ranges); + + per_part_mark_ranges.reserve(parts_ranges.size()); + for (const auto & part_with_ranges : parts_ranges) + per_part_mark_ranges.push_back(part_with_ranges.ranges); + }; + std::call_once(init_flag, init); + if (task_idx >= per_part_infos.size()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Requested task with idx {}, but there are only {} parts", diff --git a/src/Storages/MergeTree/MergeTreeReadPoolInOrder.h b/src/Storages/MergeTree/MergeTreeReadPoolInOrder.h index 41f3ab1061c..b1671cc3a64 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolInOrder.h +++ b/src/Storages/MergeTree/MergeTreeReadPoolInOrder.h @@ -4,7 +4,7 @@ namespace DB { -class MergeTreeReadPoolInOrder : public MergeTreeReadPoolBase +class MergeTreeReadPoolInOrder final : public MergeTreeReadPoolBase { public: MergeTreeReadPoolInOrder( diff --git a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp index 8f06fc312c2..ffd82b78310 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp @@ -128,20 +128,31 @@ MergeTreeReadPoolParallelReplicas::MergeTreeReadPoolParallelReplicas( context_) , extension(std::move(extension_)) , coordination_mode(CoordinationMode::Default) - , min_marks_per_task(getMinMarksPerTask(pool_settings.min_marks_for_concurrent_read, per_part_infos)) - , mark_segment_size(chooseSegmentSize( - log, - context_->getSettingsRef()[Setting::parallel_replicas_mark_segment_size], - min_marks_per_task, - pool_settings.threads, - pool_settings.sum_marks, - extension.getTotalNodesCount())) + , parallel_replicas_mark_segment_size(context_->getSettingsRef()[Setting::parallel_replicas_mark_segment_size]) { - extension.sendInitialRequest(coordination_mode, parts_ranges, mark_segment_size); } MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicas::getTask(size_t 
/*task_idx*/, MergeTreeReadTask * previous_task) { + auto init = [this]() + { + auto parts_ranges_and_lock = parts_ranges_ptr->get(); + const auto & parts_ranges = parts_ranges_and_lock.parts_ranges; + + fillPerPartInfos(parts_ranges); + min_marks_per_task = getMinMarksPerTask(pool_settings.min_marks_for_concurrent_read, per_part_infos); + mark_segment_size = chooseSegmentSize( + log, + parallel_replicas_mark_segment_size, + min_marks_per_task, + pool_settings.threads, + pool_settings.sum_marks, + extension.getTotalNodesCount()); + + extension.sendInitialRequest(coordination_mode, parts_ranges, mark_segment_size); + }; + std::call_once(init_flag, init); + std::lock_guard lock(mutex); if (no_more_tasks_available) diff --git a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h index 63816340eb1..d9cc18485a1 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h +++ b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h @@ -5,7 +5,7 @@ namespace DB { -class MergeTreeReadPoolParallelReplicas : public MergeTreeReadPoolBase +class MergeTreeReadPoolParallelReplicas final : public MergeTreeReadPoolBase { public: MergeTreeReadPoolParallelReplicas( @@ -35,6 +35,7 @@ private: LoggerPtr log = getLogger("MergeTreeReadPoolParallelReplicas"); const ParallelReadingExtension extension; const CoordinationMode coordination_mode; + const size_t parallel_replicas_mark_segment_size; size_t min_marks_per_task{0}; size_t mark_segment_size{0}; RangesInDataPartsDescription buffered_ranges; diff --git a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.cpp b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.cpp index f13da426c45..e5b5435a3a4 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.cpp @@ -44,24 +44,34 @@ MergeTreeReadPoolParallelReplicasInOrder::MergeTreeReadPoolParallelReplicasInOrd , mode(mode_) , min_marks_per_task(pool_settings.min_marks_for_concurrent_read) { - for (const auto & info : per_part_infos) - min_marks_per_task = std::max(min_marks_per_task, info->min_marks_per_task); - - if (min_marks_per_task == 0) - throw Exception( - ErrorCodes::BAD_ARGUMENTS, "Chosen number of marks to read is zero (likely because of weird interference of settings)"); - - for (const auto & part : parts_ranges) - request.push_back({part.data_part->info, MarkRanges{}}); - - for (const auto & part : parts_ranges) - buffered_tasks.push_back({part.data_part->info, MarkRanges{}}); - - extension.sendInitialRequest(mode, parts_ranges, /*mark_segment_size_=*/0); } MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t task_idx, MergeTreeReadTask * previous_task) { + auto init = [this]() + { + auto parts_ranges_and_lock = parts_ranges_ptr->get(); + const auto & parts_ranges = parts_ranges_and_lock.parts_ranges; + + fillPerPartInfos(parts_ranges); + + for (const auto & info : per_part_infos) + min_marks_per_task = std::max(min_marks_per_task, info->min_marks_per_task); + + if (min_marks_per_task == 0) + throw Exception( + ErrorCodes::BAD_ARGUMENTS, "Chosen number of marks to read is zero (likely because of weird interference of settings)"); + + for (const auto & part : parts_ranges) + request.push_back({part.data_part->info, MarkRanges{}}); + + for (const auto & part : parts_ranges) + buffered_tasks.push_back({part.data_part->info, MarkRanges{}}); + + extension.sendInitialRequest(mode, 
parts_ranges, /*mark_segment_size_=*/0); + }; + std::call_once(init_flag, init); + std::lock_guard lock(mutex); if (task_idx >= per_part_infos.size()) diff --git a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h index a05dc54b529..6d6e08243e7 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h +++ b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h @@ -5,7 +5,7 @@ namespace DB { -class MergeTreeReadPoolParallelReplicasInOrder : public MergeTreeReadPoolBase +class MergeTreeReadPoolParallelReplicasInOrder final : public MergeTreeReadPoolBase { public: MergeTreeReadPoolParallelReplicasInOrder( diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp index 81497032b08..00bb160216f 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp +++ b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp @@ -401,6 +401,7 @@ public: if (!key_condition.alwaysFalse()) mark_ranges = MergeTreeDataSelectExecutor::markRangesFromPKRange( data_part, + 0, data_part->index_granularity->getMarksCount(), metadata_snapshot, key_condition, /*part_offset_condition=*/{}, diff --git a/src/Storages/MergeTree/RPNBuilder.cpp b/src/Storages/MergeTree/RPNBuilder.cpp index 8a74d189de7..eec24257871 100644 --- a/src/Storages/MergeTree/RPNBuilder.cpp +++ b/src/Storages/MergeTree/RPNBuilder.cpp @@ -297,7 +297,10 @@ FutureSetPtr tryGetSetFromDAGNode(const ActionsDAG::Node * dag_node) column = &column_const->getDataColumn(); if (const auto * column_set = typeid_cast(column)) + { + // std::cerr << ".... tryGetSetFromDAGNode " << reinterpret_cast(column_set) << std::endl; return column_set->getData(); + } return {}; } diff --git a/tests/queries/0_stateless/02242_system_filesystem_cache_log_table.sh b/tests/queries/0_stateless/02242_system_filesystem_cache_log_table.sh index fe016f5a27f..64b1922b640 100755 --- a/tests/queries/0_stateless/02242_system_filesystem_cache_log_table.sh +++ b/tests/queries/0_stateless/02242_system_filesystem_cache_log_table.sh @@ -25,5 +25,4 @@ for STORAGE_POLICY in 's3_cache' 'local_cache' 'azure_cache'; do $CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=0 --enable_filesystem_cache_log=1 --query "SELECT 2243, '$STORAGE_POLICY', * FROM test_2242 FORMAT Null" $CLICKHOUSE_CLIENT --query "SYSTEM FLUSH LOGS" $CLICKHOUSE_CLIENT --query "SELECT file_segment_range, read_type FROM system.filesystem_cache_log WHERE query_id = (SELECT query_id from system.query_log where query LIKE '%SELECT 2243%$STORAGE_POLICY%' AND current_database = currentDatabase() AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1) ORDER BY file_segment_range, read_type" - done diff --git a/tests/queries/0_stateless/02835_join_step_explain.reference b/tests/queries/0_stateless/02835_join_step_explain.reference index 06f4a9cfc99..f2157c04591 100644 --- a/tests/queries/0_stateless/02835_join_step_explain.reference +++ b/tests/queries/0_stateless/02835_join_step_explain.reference @@ -21,6 +21,12 @@ Positions: 4 0 2 1 Strictness: ALL Algorithm: HashJoin Clauses: [(__table1.id) = (__table2.id)] + Dynamic Filter + Actions: INPUT : 0 -> id UInt64 : 0 + COLUMN Const(Set) -> Set : 1 + ALIAS id :: 0 -> __table1.id UInt64 : 2 + FUNCTION in(__table1.id :: 2, :: 1) -> in(__table1.id, ) UInt8 : 0 + Positions: 0 Expression ((JOIN actions + Change column names to column identifiers)) Header: __table1.id UInt64 __table1.value_1 String @@ 
-78,6 +84,15 @@ Positions: 6 0 3 1 Algorithm: HashJoin ASOF inequality: LESS Clauses: [(__table1.id, __table1.value_2) = (__table2.id, __table2.value_2)] + Dynamic Filter + Actions: INPUT : 0 -> id UInt64 : 0 + INPUT : 1 -> value_2 UInt64 : 1 + COLUMN Const(Set) -> Set : 2 + ALIAS id :: 0 -> __table1.id UInt64 : 3 + ALIAS value_2 :: 1 -> __table1.value_2 UInt64 : 0 + FUNCTION tuple(__table1.id :: 3, __table1.value_2 :: 0) -> tuple(__table1.id, __table1.value_2) Tuple(UInt64, UInt64) : 1 + FUNCTION in(tuple(__table1.id, __table1.value_2) :: 1, :: 2) -> in(tuple(__table1.id, __table1.value_2), ) UInt8 : 0 + Positions: 0 Expression ((JOIN actions + Change column names to column identifiers)) Header: __table1.id UInt64 __table1.value_1 String diff --git a/tests/queries/0_stateless/03036_join_filter_push_down_equivalent_sets.reference b/tests/queries/0_stateless/03036_join_filter_push_down_equivalent_sets.reference index d0a3e7b02ae..d4b6f394f7c 100644 --- a/tests/queries/0_stateless/03036_join_filter_push_down_equivalent_sets.reference +++ b/tests/queries/0_stateless/03036_join_filter_push_down_equivalent_sets.reference @@ -26,6 +26,12 @@ Positions: 4 2 0 1 Strictness: ALL Algorithm: HashJoin Clauses: [(__table1.id) = (__table2.id)] + Dynamic Filter + Actions: INPUT : 0 -> id UInt64 : 0 + COLUMN Const(Set) -> Set : 1 + ALIAS id :: 0 -> __table1.id UInt64 : 2 + FUNCTION in(__table1.id :: 2, :: 1) -> in(__table1.id, ) UInt8 : 0 + Positions: 0 Filter (( + (JOIN actions + Change column names to column identifiers))) Header: __table1.id UInt64 __table1.value String @@ -93,6 +99,12 @@ Positions: 4 2 0 1 Strictness: ALL Algorithm: HashJoin Clauses: [(__table1.id) = (__table2.id)] + Dynamic Filter + Actions: INPUT : 0 -> id UInt64 : 0 + COLUMN Const(Set) -> Set : 1 + ALIAS id :: 0 -> __table1.id UInt64 : 2 + FUNCTION in(__table1.id :: 2, :: 1) -> in(__table1.id, ) UInt8 : 0 + Positions: 0 Filter (( + (JOIN actions + Change column names to column identifiers))) Header: __table1.id UInt64 __table1.value String @@ -160,6 +172,12 @@ Positions: 4 2 0 1 Strictness: ALL Algorithm: HashJoin Clauses: [(__table1.id) = (__table2.id)] + Dynamic Filter + Actions: INPUT : 0 -> id UInt64 : 0 + COLUMN Const(Set) -> Set : 1 + ALIAS id :: 0 -> __table1.id UInt64 : 2 + FUNCTION in(__table1.id :: 2, :: 1) -> in(__table1.id, ) UInt8 : 0 + Positions: 0 Filter (( + (JOIN actions + Change column names to column identifiers))) Header: __table1.id UInt64 __table1.value String @@ -392,6 +410,12 @@ Positions: 4 2 0 1 Strictness: ALL Algorithm: HashJoin Clauses: [(__table1.id) = (__table2.id)] + Dynamic Filter + Actions: INPUT : 0 -> id UInt64 : 0 + COLUMN Const(Set) -> Set : 1 + ALIAS id :: 0 -> __table1.id UInt64 : 2 + FUNCTION in(__table1.id :: 2, :: 1) -> in(__table1.id, ) UInt8 : 0 + Positions: 0 Expression ((JOIN actions + Change column names to column identifiers)) Header: __table1.id UInt64 __table1.value String @@ -453,6 +477,12 @@ Positions: 4 2 0 1 Strictness: ALL Algorithm: HashJoin Clauses: [(__table1.id) = (__table2.id)] + Dynamic Filter + Actions: INPUT : 0 -> id UInt64 : 0 + COLUMN Const(Set) -> Set : 1 + ALIAS id :: 0 -> __table1.id UInt64 : 2 + FUNCTION in(__table1.id :: 2, :: 1) -> in(__table1.id, ) UInt8 : 0 + Positions: 0 Filter (( + (JOIN actions + Change column names to column identifiers))) Header: __table1.id UInt64 __table1.value String diff --git a/tests/queries/0_stateless/03130_convert_outer_join_to_inner_join.reference b/tests/queries/0_stateless/03130_convert_outer_join_to_inner_join.reference index 
d35bdeff98b..b3233481876 100644 --- a/tests/queries/0_stateless/03130_convert_outer_join_to_inner_join.reference +++ b/tests/queries/0_stateless/03130_convert_outer_join_to_inner_join.reference @@ -21,6 +21,12 @@ Positions: 4 0 2 1 Strictness: ALL Algorithm: HashJoin Clauses: [(__table1.id) = (__table2.id)] + Dynamic Filter + Actions: INPUT : 0 -> id UInt64 : 0 + COLUMN Const(Set) -> Set : 1 + ALIAS id :: 0 -> __table1.id UInt64 : 2 + FUNCTION in(__table1.id :: 2, :: 1) -> in(__table1.id, ) UInt8 : 0 + Positions: 0 Expression Header: __table1.id UInt64 __table1.value String @@ -91,6 +97,12 @@ Positions: 4 0 2 1 Strictness: ALL Algorithm: HashJoin Clauses: [(__table1.id) = (__table2.id)] + Dynamic Filter + Actions: INPUT : 0 -> id UInt64 : 0 + COLUMN Const(Set) -> Set : 1 + ALIAS id :: 0 -> __table1.id UInt64 : 2 + FUNCTION in(__table1.id :: 2, :: 1) -> in(__table1.id, ) UInt8 : 0 + Positions: 0 Expression Header: __table1.id UInt64 __table1.value String @@ -161,6 +173,12 @@ Positions: 4 0 2 1 Strictness: ALL Algorithm: HashJoin Clauses: [(__table1.id) = (__table2.id)] + Dynamic Filter + Actions: INPUT : 0 -> id UInt64 : 0 + COLUMN Const(Set) -> Set : 1 + ALIAS id :: 0 -> __table1.id UInt64 : 2 + FUNCTION in(__table1.id :: 2, :: 1) -> in(__table1.id, ) UInt8 : 0 + Positions: 0 Expression Header: __table1.id UInt64 __table1.value String diff --git a/tests/queries/0_stateless/03254_filter_by_join_set.reference b/tests/queries/0_stateless/03254_filter_by_join_set.reference new file mode 100644 index 00000000000..4c282ffffa3 --- /dev/null +++ b/tests/queries/0_stateless/03254_filter_by_join_set.reference @@ -0,0 +1,71 @@ +5000000 5000000 5000000 0_r +5000100 5000100 5000100 1_r +5000200 5000200 5000200 2_r +5000300 5000300 5000300 3_r +5000400 5000400 5000400 4_r +5000500 5000500 5000500 5_r +5000600 5000600 5000600 6_r +5000700 5000700 5000700 7_r +5000800 5000800 5000800 8_r +5000900 5000900 5000900 9_r +5001000 5001000 5001000 10_r +5001100 5001100 5001100 11_r +5001200 5001200 5001200 12_r +5001300 5001300 5001300 13_r +5001400 5001400 5001400 14_r +5001500 5001500 5001500 15_r +5001600 5001600 5001600 16_r +5001700 5001700 5001700 17_r +5001800 5001800 5001800 18_r +5001900 5001900 5001900 19_r +Join (JOIN FillRightFirst) +Algorithm: HashJoin +Dynamic Filter +FUNCTION in(__table1.number :: 2, :: 1) -> in(__table1.number, ) UInt8 : 0 +0 +4000000 4000000 5000000 4000000 0_r +4000010 4000010 5000100 4000010 1_r +4000020 4000020 5000200 4000020 2_r +4000030 4000030 5000300 4000030 3_r +4000040 4000040 5000400 4000040 4_r +4000050 4000050 5000500 4000050 5_r +4000060 4000060 5000600 4000060 6_r +4000070 4000070 5000700 4000070 7_r +4000080 4000080 5000800 4000080 8_r +4000090 4000090 5000900 4000090 9_r +4000100 4000100 5001000 4000100 10_r +4000110 4000110 5001100 4000110 11_r +4000120 4000120 5001200 4000120 12_r +4000130 4000130 5001300 4000130 13_r +4000140 4000140 5001400 4000140 14_r +4000150 4000150 5001500 4000150 15_r +4000160 4000160 5001600 4000160 16_r +4000170 4000170 5001700 4000170 17_r +4000180 4000180 5001800 4000180 18_r +4000190 4000190 5001900 4000190 19_r +5000000 5000000 5000000 4000000 0_r +5000100 5000100 5000100 4000010 1_r +5000200 5000200 5000200 4000020 2_r +5000300 5000300 5000300 4000030 3_r +5000400 5000400 5000400 4000040 4_r +5000500 5000500 5000500 4000050 5_r +5000600 5000600 5000600 4000060 6_r +5000700 5000700 5000700 4000070 7_r +5000800 5000800 5000800 4000080 8_r +5000900 5000900 5000900 4000090 9_r +5001000 5001000 5001000 4000100 10_r +5001100 5001100 
5001100 4000110 11_r +5001200 5001200 5001200 4000120 12_r +5001300 5001300 5001300 4000130 13_r +5001400 5001400 5001400 4000140 14_r +5001500 5001500 5001500 4000150 15_r +5001600 5001600 5001600 4000160 16_r +5001700 5001700 5001700 4000170 17_r +5001800 5001800 5001800 4000180 18_r +5001900 5001900 5001900 4000190 19_r +Join (JOIN FillRightFirst) +Algorithm: HashJoin +Dynamic Filter +FUNCTION in(__table1.number : 3, :: 1) -> in(__table1.number, ) UInt8 : 0 +FUNCTION in(__table1.number :: 3, :: 2) -> in(__table1.number, ) UInt8 : 1 +0 diff --git a/tests/queries/0_stateless/03254_filter_by_join_set.sql b/tests/queries/0_stateless/03254_filter_by_join_set.sql new file mode 100644 index 00000000000..eb21ed88105 --- /dev/null +++ b/tests/queries/0_stateless/03254_filter_by_join_set.sql @@ -0,0 +1,29 @@ +-- Tags: long + +create table numbers_10m (number UInt64, s String) engine = MergeTree order by number settings index_granularity=8192, index_granularity_bytes=10000000; +insert into numbers_10m select number, toString(number) from numbers_mt(10e6) settings max_insert_threads=8; + +-- explain actions = 1 select number, s, k, v from numbers_10m inner join (select number * 100 + 5000000 as k, toString(number) || '_r' as v from numbers(20)) as r on number = r.k order by number; + +set merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability=0; + + + +-- first_query +select number, s, k, v from numbers_10m inner join (select number * 100 + 5000000 as k, toString(number) || '_r' as v from numbers(20)) as r on number = r.k order by number; + +select trimLeft(explain) from (EXPLAIN actions = 1 select number, s, k, v from numbers_10m inner join (select number * 100 + 5000000 as k, toString(number) || '_r' as v from numbers(20)) as r on number = r.k order by number settings enable_analyzer=1) where explain like '%Join%' or explain like '%Dynamic Filter%' or explain like '%FUNCTION in%'; + +system flush logs; + +select if(read_rows < 8192 * 3, 0, read_rows) from system.query_log where event_date >= today() - 1 and current_database = currentDatabase() and query like '-- first_query%' and type = 'QueryFinish'; +--select * from system.query_log where event_date >= today() - 1 and current_database = currentDatabase() and query like '-- first_query%' and type = 'QueryFinish' format Vertical; + +-- second_query +select number, s, k, k2, v from numbers_10m inner join (select number * 100 + 5000000 as k, number * 10 + 4000000 as k2, toString(number) || '_r' as v from numbers(20)) as r on number = r.k or number = r.k2 order by number; +select trimLeft(explain) from (EXPLAIN actions = 1 select number, s, k, k2, v from numbers_10m inner join (select number * 100 + 5000000 as k, number * 10 + 4000000 as k2, toString(number) || '_r' as v from numbers(20)) as r on number = r.k or number = r.k2 order by number) where explain like '%Join%' or explain like '%Dynamic Filter%' or explain like '%FUNCTION in%'; + +system flush logs; + +select if(read_rows < 8192 * 6, 0, read_rows) from system.query_log where event_date >= today() - 1 and current_database = currentDatabase() and query like '-- second_query%' and type = 'QueryFinish';
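
Note on the read-pool changes above: all three pools (in-order, parallel replicas, parallel replicas in-order) move work that previously ran in the constructor (filling per-part infos, deriving mark/segment sizes, sending the initial request to the coordinator) into the first getTask() call, guarded by the new std::once_flag init_flag, so the initialization sees the final set of part ranges obtained from parts_ranges_ptr. Below is a minimal, self-contained sketch of that call_once pattern; the names (LazyPool, derived_value, parts) are illustrative only and are not the actual ClickHouse types.

#include <iostream>
#include <mutex>
#include <numeric>
#include <stdexcept>
#include <vector>

// Sketch: defer derived-state computation from the constructor to the first
// task request; std::call_once guarantees it runs exactly once even if
// several threads call getTask() concurrently.
class LazyPool
{
public:
    explicit LazyPool(std::vector<size_t> parts_) : parts(std::move(parts_)) {}

    size_t getTask(size_t idx)
    {
        // Runs only on the first call; later calls (from any thread) skip it.
        std::call_once(init_flag, [this]
        {
            // Derived state is computed from the final view of `parts`,
            // mirroring how the pools now read parts_ranges lazily.
            derived_value = std::accumulate(parts.begin(), parts.end(), size_t{0});
        });

        if (idx >= parts.size())
            throw std::out_of_range("no such task");
        return parts[idx] + derived_value;
    }

private:
    std::once_flag init_flag;
    std::vector<size_t> parts;
    size_t derived_value = 0;
};

int main()
{
    LazyPool pool({1, 2, 3});
    std::cout << pool.getTask(0) << '\n'; // triggers one-time init, prints 7
    std::cout << pool.getTask(1) << '\n'; // init already done, prints 8
}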