Nikolai Kochetov 2024-11-20 15:11:56 -08:00 committed by GitHub
commit ff5515cce5
39 changed files with 928 additions and 86 deletions

View File

@ -30,6 +30,7 @@ public:
MutableColumnPtr cloneDummy(size_t s_) const override { return ColumnSet::create(s_, data); }
FutureSetPtr getData() const { return data; }
void setData(FutureSetPtr data_) { data = std::move(data_); }
// Used only for debugging, to make the column dumpable
Field operator[](size_t) const override { return {}; }

View File

@ -124,6 +124,22 @@ static void correctNullabilityInplace(ColumnWithTypeAndName & column, bool nulla
}
}
// static void filterBlock(Block & block, const IColumn::Filter & filter)
// {
// for (auto & elem : block)
// {
// if (elem.column->size() != filter.size())
// throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of column {} doesn't match size of filter {}",
// elem.column->size(), filter.size());
// if (elem.column->empty())
// {
// block.clear();
// return;
// }
// }
// }
HashJoin::HashJoin(
std::shared_ptr<TableJoin> table_join_,
const Block & right_sample_block_,
@ -561,6 +577,16 @@ bool HashJoin::addBlockToJoin(ScatteredBlock & source_block, bool check_limits)
if (shrink_blocks)
block_to_save = block_to_save.shrinkToFit();
ScatteredBlock right_key_columns_for_filter;
if (save_right_key_columns_for_filter)
{
right_key_columns_for_filter = filterColumnsPresentInSampleBlock(source_block, right_table_keys);
if (shrink_blocks)
right_key_columns_for_filter.shrinkToFit();
data->right_key_columns_for_filter.resize(table_join->getClauses().size());
}
size_t max_bytes_in_join = table_join->sizeLimits().max_bytes;
size_t max_rows_in_join = table_join->sizeLimits().max_rows;
@ -599,7 +625,7 @@ bool HashJoin::addBlockToJoin(ScatteredBlock & source_block, bool check_limits)
}
doDebugAsserts();
data->blocks_allocated_size += block_to_save.allocatedBytes();
data->blocks_allocated_size += block_to_save.allocatedBytes() + right_key_columns_for_filter.allocatedBytes();
data->blocks.emplace_back(std::move(block_to_save));
const auto * stored_block = &data->blocks.back();
doDebugAsserts();
@ -628,6 +654,22 @@ bool HashJoin::addBlockToJoin(ScatteredBlock & source_block, bool check_limits)
save_nullmap |= (*null_map)[i];
}
if (save_right_key_columns_for_filter)
{
if (null_map)
right_key_columns_for_filter.filter(*null_map);
right_key_columns_for_filter.filterBySelector();
const auto & required_names = right_keys_for_filter_per_clause[onexpr_idx];
Block right_keys_for_clause;
for (const auto & name : required_names)
right_keys_for_clause.insert(right_key_columns_for_filter.getByName(name));
data->right_key_columns_for_filter[onexpr_idx].emplace_back(right_keys_for_clause);
}
auto join_mask_col = JoinCommon::getColumnAsMask(source_block.getSourceBlock(), onexprs[onexpr_idx].condColumnNames().second);
/// Save blocks that do not hold conditions in ON section
ColumnUInt8::MutablePtr not_joined_map = nullptr;
@ -1619,4 +1661,15 @@ void HashJoin::tryRerangeRightTableData()
data->sorted = true;
}
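/// Enable collecting the right-table key columns (per ON clause) while the join is filled;
/// they are later used to build the dynamic filter sets for the left table.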
void HashJoin::saveRightKeyColumnsForFilter(std::vector<Names> keys_per_clause)
{
if (keys_per_clause.size() != table_join->getClauses().size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Invalid number of clauses. Expected {}, got {}",
table_join->getClauses().size(), keys_per_clause.size());
save_right_key_columns_for_filter = true;
right_keys_for_filter_per_clause = std::move(keys_per_clause);
}
}

View File

@ -355,6 +355,7 @@ public:
Block sample_block; /// Block as it would appear in the BlockList
ScatteredBlocksList blocks; /// Blocks of "right" table.
BlockNullmapList blocks_nullmaps; /// Nullmaps for blocks of "right" table (if needed)
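/// Right-table key columns saved per ON clause while filling the join; used to build dynamic filter sets for the left table.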
std::vector<BlocksList> right_key_columns_for_filter;
/// Additional data - strings for string keys and continuation elements of single-linked lists of references to rows.
Arena pool;
@ -408,6 +409,8 @@ public:
void materializeColumnsFromLeftBlock(Block & block) const;
Block materializeColumnsFromRightBlock(Block block) const;
void saveRightKeyColumnsForFilter(std::vector<Names> keys_per_clause);
private:
friend class NotJoinedHash;
@ -438,6 +441,8 @@ private:
mutable std::unique_ptr<JoinStuff::JoinUsedFlags> used_flags;
RightTableDataPtr data;
bool have_compressed = false;
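/// Set via saveRightKeyColumnsForFilter(): when true, the key columns listed in right_keys_for_filter_per_clause are saved for every added block.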
bool save_right_key_columns_for_filter = false;
std::vector<Names> right_keys_for_filter_per_clause;
std::vector<Sizes> key_sizes;

View File

@ -86,6 +86,23 @@ FutureSetFromTuple::FutureSetFromTuple(Block block, const Settings & settings)
set->finishInsert();
}
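/// Same as the constructor above, but builds the set without size limits from Settings.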
FutureSetFromTuple::FutureSetFromTuple(Block block)
{
SizeLimits no_limits;
set = std::make_shared<Set>(no_limits, 0, 0);
set->setHeader(block.cloneEmpty().getColumnsWithTypeAndName());
Columns columns;
columns.reserve(block.columns());
for (const auto & column : block)
columns.emplace_back(column.column);
set_key_columns.filter = ColumnUInt8::create(block.rows());
set->insertFromColumns(columns, set_key_columns);
set->finishInsert();
}
DataTypes FutureSetFromTuple::getTypes() const { return set->getElementsTypes(); }
SetPtr FutureSetFromTuple::buildOrderedSetInplace(const ContextPtr & context)

View File

@ -77,6 +77,7 @@ class FutureSetFromTuple final : public FutureSet
{
public:
FutureSetFromTuple(Block block, const Settings & settings);
explicit FutureSetFromTuple(Block block);
SetPtr get() const override { return set; }
SetPtr buildOrderedSetInplace(const ContextPtr & context) override;

View File

@ -5,8 +5,12 @@
#include <Processors/Transforms/JoiningTransform.h>
#include <Processors/Transforms/SquashingTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Interpreters/HashJoin/HashJoin.h>
#include <IO/Operators.h>
#include <Common/JSONBuilder.h>
#include <Common/typeid_cast.h>
#include <Columns/ColumnSet.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
namespace DB
{
@ -37,8 +41,120 @@ std::vector<std::pair<String, String>> describeJoinActions(const JoinPtr & join)
return description;
}
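/// Concatenate the given key columns from all blocks in the list into single squashed columns (one per key).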
ColumnsWithTypeAndName squashBlocks(const Names & keys, const BlocksList & blocks)
{
ColumnsWithTypeAndName squashed;
std::vector<size_t> positions;
// std::cerr << "===== " << blocks.front().dumpStructure() << std::endl;
for (const auto & name : keys)
{
// std::cerr << ".... " << name << std::endl;
positions.push_back(blocks.front().getPositionByName(name));
}
bool first = true;
for (const auto & block : blocks)
{
if (first)
{
first = false;
for (size_t pos : positions)
squashed.push_back(blocks.front().getByPosition(pos));
continue;
}
for (size_t i = 0; i < positions.size(); ++i)
{
auto & sq_col = squashed[i];
auto col_mutable = IColumn::mutate(std::move(sq_col.column));
const auto & rhs_col = block.getByPosition(positions[i]);
size_t rows = rhs_col.column->size();
col_mutable->insertRangeFrom(*rhs_col.column, 0, rows);
sq_col.column = std::move(col_mutable);
}
}
return squashed;
}
}
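/// Called once the HashJoin is filled: build sets from the collected right-table key columns,
/// plug them into the ColumnSet placeholders of the filter DAG, and re-filter each part's mark
/// ranges with the resulting primary key condition.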
void DynamicJoinFilters::filterDynamicPartsByFilledJoin(const IJoin & join)
{
if (!parts)
return;
const auto * hash_join = typeid_cast<const HashJoin *>(&join);
if (!hash_join)
return;
const auto & blocks = hash_join->getJoinedData()->right_key_columns_for_filter;
if (blocks.empty())
return;
const auto & settings = context->getSettingsRef();
for (size_t i = 0; i < clauses.size(); ++i)
{
const auto & clause = clauses[i];
auto squashed = squashBlocks(clause.keys, blocks[i]);
// std::cerr << "Right join data rows " << squashed.front().column->size() << std::endl;
auto set = std::make_shared<FutureSetFromTuple>(squashed, settings);
clause.set->setData(std::move(set));
}
const auto & primary_key = metadata->getPrimaryKey();
const Names & primary_key_column_names = primary_key.column_names;
KeyCondition key_condition(&actions, context, primary_key_column_names, primary_key.expression);
// std::cerr << "======== " << key_condition.toString() << std::endl;
auto log = getLogger("DynamicJoinFilter");
auto parts_with_lock = parts->parts_ranges_ptr->get();
size_t prev_marks = 0;
size_t post_marks = 0;
for (auto & part_range : parts_with_lock.parts_ranges)
{
MarkRanges filtered_ranges;
for (auto & range : part_range.ranges)
{
prev_marks += range.getNumberOfMarks();
// std::cerr << "Range " << range.begin << ' ' << range.end << " has final mark " << part_range.data_part->index_granularity->hasFinalMark() << std::endl;
auto new_ranges = MergeTreeDataSelectExecutor::markRangesFromPKRange(
part_range.data_part,
range.begin,
range.end,
metadata,
key_condition,
{}, nullptr, settings, log);
for (auto & new_range : new_ranges)
{
// std::cerr << "New Range " << new_range.begin << ' ' << new_range.end << std::endl;
if (new_range.getNumberOfMarks())
{
post_marks += new_range.getNumberOfMarks();
filtered_ranges.push_back(new_range);
}
}
}
part_range.ranges = std::move(filtered_ranges);
}
LOG_TRACE(log, "Reading {}/{} marks after filtration.", post_marks, prev_marks);
}
JoinStep::JoinStep(
const Header & left_header_,
const Header & right_header_,
@ -56,6 +172,11 @@ JoinStep::JoinStep(
updateInputHeaders({left_header_, right_header_});
}
void JoinStep::setDynamicFilter(DynamicJoinFiltersPtr dynamic_filter_)
{
dynamic_filter = std::move(dynamic_filter_);
}
QueryPipelineBuilderPtr JoinStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &)
{
if (pipelines.size() != 2)
@ -69,10 +190,18 @@ QueryPipelineBuilderPtr JoinStep::updatePipeline(QueryPipelineBuilders pipelines
return joined_pipeline;
}
auto finish_callback = [filter = this->dynamic_filter, algo = this->join]()
{
LOG_TRACE(getLogger("JoinStep"), "Finish callback called");
if (filter)
filter->filterDynamicPartsByFilledJoin(*algo);
};
auto pipeline = QueryPipelineBuilder::joinPipelinesRightLeft(
std::move(pipelines[0]),
std::move(pipelines[1]),
join,
std::move(finish_callback),
*output_header,
max_block_size,
min_block_size_bytes,
@ -105,6 +234,13 @@ void JoinStep::describeActions(FormatSettings & settings) const
for (const auto & [name, value] : describeJoinActions(join))
settings.out << prefix << name << ": " << value << '\n';
if (dynamic_filter)
{
settings.out << prefix << "Dynamic Filter\n";
auto expression = std::make_shared<ExpressionActions>(dynamic_filter->actions.clone());
expression->describeActions(settings.out, prefix);
}
}
void JoinStep::describeActions(JSONBuilder::JSONMap & map) const

View File

@ -2,6 +2,8 @@
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Core/Joins.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
namespace DB
{
@ -9,6 +11,31 @@ namespace DB
class IJoin;
using JoinPtr = std::shared_ptr<IJoin>;
struct DynamicallyFilteredPartsRanges;
using DynamicallyFilteredPartsRangesPtr = std::shared_ptr<DynamicallyFilteredPartsRanges>;
class ColumnSet;
/// This structure is used to filter the left table by the right one after the HashJoin is filled.
struct DynamicJoinFilters
{
struct Clause
{
ColumnSet * set;
Names keys;
};
std::vector<Clause> clauses;
DynamicallyFilteredPartsRangesPtr parts;
ActionsDAG actions;
ContextPtr context;
StorageMetadataPtr metadata;
void filterDynamicPartsByFilledJoin(const IJoin & join);
};
using DynamicJoinFiltersPtr = std::shared_ptr<DynamicJoinFilters>;
/// Join two data streams.
class JoinStep : public IQueryPlanStep
{
@ -35,6 +62,8 @@ public:
void setJoin(JoinPtr join_) { join = std::move(join_); }
bool allowPushDownToRight() const;
void setDynamicFilter(DynamicJoinFiltersPtr dynamic_filter_);
private:
void updateOutputHeader() override;
@ -43,6 +72,8 @@ private:
size_t min_block_size_bytes;
size_t max_streams;
bool keep_left_read_in_order;
DynamicJoinFiltersPtr dynamic_filter;
};
/// Special step for the case when Join is already filled.

View File

@ -114,6 +114,7 @@ void optimizePrewhere(Stack & stack, QueryPlan::Nodes & nodes);
void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes);
void optimizeAggregationInOrder(QueryPlan::Node & node, QueryPlan::Nodes &);
void optimizeDistinctInOrder(QueryPlan::Node & node, QueryPlan::Nodes &);
void optimizeFilterByJoinSet(QueryPlan::Node & node);
/// A separate tree traverse to apply sorting properties after *InOrder optimizations.
void applyOrder(const QueryPlanOptimizationSettings & optimization_settings, QueryPlan::Node & root);

View File

@ -0,0 +1,268 @@
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/Optimizations/actionsDAGUtils.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Core/Settings.h>
#include <Columns/ColumnSet.h>
#include <Columns/ColumnConst.h>
#include <DataTypes/DataTypeSet.h>
#include <Functions/FunctionFactory.h>
#include <Functions/tuple.h>
#include <Interpreters/IJoin.h>
#include <Interpreters/TableJoin.h>
#include <Interpreters/HashJoin/HashJoin.h>
#include <Functions/FunctionsLogical.h>
namespace DB
{
// namespace Setting
// {
// extern const SettingsUInt64 allow_experimental_parallel_reading_from_replicas;
// }
namespace QueryPlanOptimizations
{
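/// Descend through Expression, Filter, ArrayJoin and preliminary Distinct steps to find the underlying ReadFromMergeTree step, if any.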
ReadFromMergeTree * findReadingStep(QueryPlan::Node & node)
{
IQueryPlanStep * step = node.step.get();
if (auto * reading = typeid_cast<ReadFromMergeTree *>(step))
return reading;
if (node.children.size() != 1)
return nullptr;
if (typeid_cast<ExpressionStep *>(step) || typeid_cast<FilterStep *>(step) || typeid_cast<ArrayJoinStep *>(step))
return findReadingStep(*node.children.front());
if (auto * distinct = typeid_cast<DistinctStep *>(step); distinct && distinct->isPreliminary())
return findReadingStep(*node.children.front());
return nullptr;
}
void appendExpression(std::optional<ActionsDAG> & dag, const ActionsDAG & expression)
{
if (dag)
dag->mergeInplace(expression.clone());
else
dag = expression.clone();
}
/// This function builds a common DAG which is a merge of DAGs from Filter and Expression steps chain.
void buildSortingDAG(QueryPlan::Node & node, std::optional<ActionsDAG> & dag)
{
IQueryPlanStep * step = node.step.get();
if (auto * reading = typeid_cast<ReadFromMergeTree *>(step))
{
if (const auto prewhere_info = reading->getPrewhereInfo())
{
//std::cerr << "====== Adding prewhere " << std::endl;
appendExpression(dag, prewhere_info->prewhere_actions);
}
return;
}
if (node.children.size() != 1)
return;
buildSortingDAG(*node.children.front(), dag);
if (typeid_cast<DistinctStep *>(step))
{
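    /// DistinctStep adds no expressions of its own; nothing to append.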
}
if (auto * expression = typeid_cast<ExpressionStep *>(step))
{
const auto & actions = expression->getExpression();
appendExpression(dag, actions);
}
if (auto * filter = typeid_cast<FilterStep *>(step))
{
appendExpression(dag, filter->getExpression());
}
if (auto * array_join = typeid_cast<ArrayJoinStep *>(step))
{
const auto & array_joined_columns = array_join->getColumns();
if (dag)
{
std::unordered_set<std::string_view> keys_set(array_joined_columns.begin(), array_joined_columns.end());
/// Remove array joined columns from outputs.
/// Types are changed after ARRAY JOIN, and we can't use these columns anyway.
ActionsDAG::NodeRawConstPtrs outputs;
outputs.reserve(dag->getOutputs().size());
for (const auto & output : dag->getOutputs())
{
if (!keys_set.contains(output->result_name))
outputs.push_back(output);
}
dag->getOutputs() = std::move(outputs);
}
}
}
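/// Try to attach a dynamic filter to an Inner/Right hash join: if the left input reads from MergeTree
/// and the join keys form a condition usable with the primary key, build a '(left keys) IN (set)' DAG
/// whose sets are filled from the right table once the join is built, and register it so the left
/// table's mark ranges can be pruned before reading.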
void optimizeFilterByJoinSet(QueryPlan::Node & node)
{
auto * join_step = typeid_cast<JoinStep *>(node.step.get());
if (!join_step)
return;
// std::cerr << "optimizeFilterByJoinSet\n";
const auto & join = join_step->getJoin();
auto * hash_join = typeid_cast<HashJoin *>(join.get());
if (!hash_join)
return;
// std::cerr << "optimizeFilterByJoinSet got hash join\n";
const auto & table_join = join->getTableJoin();
if (table_join.kind() != JoinKind::Inner && table_join.kind() != JoinKind::Right)
return;
const auto & clauses = table_join.getClauses();
if (clauses.empty())
return;
// std::cerr << "optimizeFilterByJoinSetone class\n";
auto * reading = findReadingStep(*node.children.front());
if (!reading)
return;
if (reading->splitsRangesIntoIntersectionAndNonIntersecting() || reading->isQueryWithFinal())
return;
// if (reading->getContext()->getSettingsRef()[Setting::allow_experimental_parallel_reading_from_replicas])
// return;
// std::cerr << "optimizeFilterByJoinSetone reading\n";
const auto & pk = reading->getStorageMetadata()->getPrimaryKey();
if (pk.column_names.empty())
return;
// std::cerr << "optimizeFilterByJoinSetone pk\n";
std::optional<ActionsDAG> dag;
buildSortingDAG(*node.children.front(), dag);
if (!dag)
dag = ActionsDAG(reading->getOutputHeader().getColumnsWithTypeAndName());
// std::cerr << "optimizeFilterByJoinSetone sorting dag " << dag->dumpDAG() << std::endl;
std::unordered_map<std::string_view, const ActionsDAG::Node *> outputs;
for (const auto & output : dag->getOutputs())
outputs.emplace(output->result_name, output);
const Block & right_source_columns = node.children.back()->step->getOutputHeader();
ActionsDAG::NodeRawConstPtrs predicates;
DynamicJoinFilters join_filters;
std::vector<Names> right_keys_per_clause;
FunctionOverloadResolverPtr func_tuple_builder = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionTuple>());
for (const auto & clause : clauses)
{
// Names left_keys;
Names right_keys;
std::vector<const ActionsDAG::Node *> left_columns;
std::vector<ColumnWithTypeAndName> right_columns;
size_t keys_size = clause.key_names_left.size();
for (size_t i = 0; i < keys_size; ++i)
{
const auto & left_name = clause.key_names_left[i];
const auto & right_name = clause.key_names_right[i];
// std::cerr << left_name << ' ' << right_name << std::endl;
auto it = outputs.find(left_name);
if (it != outputs.end())
{
// left_keys.push_back(left_name);
right_keys.push_back(right_name);
left_columns.push_back(it->second);
right_columns.push_back(right_source_columns.getByName(right_name));
}
}
if (left_columns.empty())
return;
// std::cerr << "optimizeFilterByJoinSetone some coluns\n";
const ActionsDAG::Node * in_lhs_arg = left_columns.front();
if (left_columns.size() > 1)
in_lhs_arg = &dag->addFunction(func_tuple_builder, std::move(left_columns), {});
auto context = reading->getContext();
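/// Create a placeholder set from the right-side header just to build the DAG and KeyCondition;
/// the actual data is attached to the ColumnSet after the join is filled.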
auto test_set = std::make_shared<FutureSetFromTuple>(Block(right_columns), context->getSettingsRef());
auto column_set = ColumnSet::create(1, std::move(test_set));
ColumnSet * column_set_ptr = column_set.get();
ColumnPtr set_col = ColumnConst::create(std::move(column_set), 0);
const ActionsDAG::Node * in_rhs_arg = &dag->addColumn({set_col, std::make_shared<DataTypeSet>(), {}});
auto func_in = FunctionFactory::instance().get("in", context);
const ActionsDAG::Node * predicate = &dag->addFunction(func_in, {in_lhs_arg, in_rhs_arg}, {});
join_filters.clauses.emplace_back(column_set_ptr, right_keys);
right_keys_per_clause.emplace_back(std::move(right_keys));
predicates.emplace_back(predicate);
}
if (predicates.size() > 1)
{
FunctionOverloadResolverPtr func_builder_or = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionOr>());
predicates = {&dag->addFunction(func_builder_or, std::move(predicates), {})};
}
dag->getOutputs() = std::move(predicates);
dag->removeUnusedActions();
// std::cerr << "optimizeFilterByJoinSetone dag " << dag->dumpDAG() << std::endl;
auto metadata_snapshot = reading->getStorageMetadata();
const auto & primary_key = metadata_snapshot->getPrimaryKey();
const Names & primary_key_column_names = primary_key.column_names;
auto context = reading->getContext();
KeyCondition key_condition(&*dag, context, primary_key_column_names, primary_key.expression);
// std::cerr << "optimizeFilterByJoinSetone matched cond " << key_condition.toString() << std::endl;
/// The condition is '(join keys) IN (set)' with a still-empty placeholder set; bail out if the primary key cannot use it at all.
if (key_condition.alwaysUnknownOrTrue())
return;
// std::cerr << "optimizeFilterByJoinSetone matched cond " << std::endl;
auto dynamic_parts = reading->useDynamicallyFilteredParts();
join_filters.actions = std::move(*dag);
join_filters.parts = dynamic_parts;
join_filters.context = context;
join_filters.metadata = metadata_snapshot;
join_step->setDynamicFilter(std::make_shared<DynamicJoinFilters>(std::move(join_filters)));
hash_join->saveRightKeyColumnsForFilter(std::move(right_keys_per_clause));
}
}
}

View File

@ -132,6 +132,8 @@ void optimizeTreeSecondPass(const QueryPlanOptimizationSettings & optimization_s
if (optimization_settings.distinct_in_order)
optimizeDistinctInOrder(*frame.node, nodes);
optimizeFilterByJoinSet(*frame.node);
}
/// Traverse all children first.

View File

@ -386,6 +386,16 @@ ReadFromMergeTree::ReadFromMergeTree(
setStepDescription(data.getStorageID().getFullNameNotQuoted());
enable_vertical_final = query_info.isFinal() && context->getSettingsRef()[Setting::enable_vertical_final]
&& data.merging_params.mode == MergeTreeData::MergingParams::Replacing;
double read_split_ranges_into_intersecting_and_non_intersecting_injection_probability
= settings[Setting::merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability];
if (read_split_ranges_into_intersecting_and_non_intersecting_injection_probability > 0.0)
{
std::bernoulli_distribution fault(read_split_ranges_into_intersecting_and_non_intersecting_injection_probability);
if (fault(thread_local_rng))
read_split_ranges_into_intersecting_and_non_intersecting_injection = true;
}
}
std::unique_ptr<ReadFromMergeTree> ReadFromMergeTree::createLocalParallelReplicasReadingStep(
@ -454,6 +464,9 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas(RangesInDataParts parts_wit
block_size,
context);
if (dynamically_filtered_parts)
dynamically_filtered_parts->parts_ranges_ptr = pool->getPartsWithRanges();
Pipes pipes;
for (size_t i = 0; i < pool_settings.threads; ++i)
@ -502,7 +515,7 @@ Pipe ReadFromMergeTree::readFromPool(
all_parts_are_remote &= is_remote;
}
MergeTreeReadPoolPtr pool;
std::shared_ptr<MergeTreeReadPoolBase> pool;
bool allow_prefetched_remote = all_parts_are_remote && settings[Setting::allow_prefetched_read_pool_for_remote_filesystem]
&& MergeTreePrefetchedReadPool::checkReadMethodAllowed(reader_settings.read_settings.remote_fs_method);
@ -547,6 +560,9 @@ Pipe ReadFromMergeTree::readFromPool(
context);
}
if (dynamically_filtered_parts)
dynamically_filtered_parts->parts_ranges_ptr = pool->getPartsWithRanges();
LOG_DEBUG(log, "Reading approx. {} rows with {} streams", total_rows, pool_settings.threads);
Pipes pipes;
@ -581,7 +597,7 @@ Pipe ReadFromMergeTree::readInOrder(
/// For reading in order it makes sense to read only
/// one range per task to reduce number of read rows.
bool has_limit_below_one_block = read_type != ReadType::Default && read_limit && read_limit < block_size.max_block_size_rows;
MergeTreeReadPoolPtr pool;
std::shared_ptr<MergeTreeReadPoolBase> pool;
if (is_parallel_reading_from_replicas)
{
@ -645,6 +661,9 @@ Pipe ReadFromMergeTree::readInOrder(
context);
}
if (dynamically_filtered_parts)
dynamically_filtered_parts->parts_ranges_ptr = pool->getPartsWithRanges();
/// If parallel replicas are enabled, set total rows in progress here only on the initiator with a local plan.
/// Otherwise rows will be counted multiple times.
const UInt64 in_order_limit = query_info.input_order_info ? query_info.input_order_info->limit : 0;
@ -876,14 +895,9 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(RangesInDataParts && parts_
auto read_type = is_parallel_reading_from_replicas ? ReadType::ParallelReplicas : ReadType::Default;
double read_split_ranges_into_intersecting_and_non_intersecting_injection_probability
= settings[Setting::merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability];
std::bernoulli_distribution fault(read_split_ranges_into_intersecting_and_non_intersecting_injection_probability);
if (read_type != ReadType::ParallelReplicas &&
num_streams > 1 &&
read_split_ranges_into_intersecting_and_non_intersecting_injection_probability > 0.0 &&
fault(thread_local_rng) &&
read_split_ranges_into_intersecting_and_non_intersecting_injection &&
!isQueryWithFinal() &&
data.merging_params.is_deleted_column.empty() &&
!prewhere_info)
@ -2202,6 +2216,14 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
pipeline.setQueryIdHolder(std::move(query_id_holder));
}
DynamicallyFilteredPartsRangesPtr ReadFromMergeTree::useDynamicallyFilteredParts()
{
if (!dynamically_filtered_parts)
dynamically_filtered_parts = std::make_shared<DynamicallyFilteredPartsRanges>();
return dynamically_filtered_parts;
}
static const char * indexTypeToString(ReadFromMergeTree::IndexType type)
{
switch (type)

View File

@ -55,6 +55,13 @@ struct UsefulSkipIndexes
std::vector<MergedDataSkippingIndexAndCondition> merged_indices;
};
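/// Holds the shared parts-with-ranges list of the read pool so the ranges can be narrowed after the plan is built (e.g. by the dynamic join filter).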
struct DynamicallyFilteredPartsRanges
{
MergeTreeReadPartsRangesPtr parts_ranges_ptr;
};
using DynamicallyFilteredPartsRangesPtr = std::shared_ptr<DynamicallyFilteredPartsRanges>;
/// This step is created to read from MergeTree* table.
/// For now, it takes a list of parts and creates source from it.
class ReadFromMergeTree final : public SourceStepWithFilter
@ -212,6 +219,9 @@ public:
void applyFilters(ActionDAGNodes added_filter_nodes) override;
DynamicallyFilteredPartsRangesPtr useDynamicallyFilteredParts();
bool splitsRangesIntoIntersectionAndNonIntersecting() const { return read_split_ranges_into_intersecting_and_non_intersecting_injection; }
private:
MergeTreeReaderSettings reader_settings;
@ -276,6 +286,8 @@ private:
mutable AnalysisResultPtr analyzed_result_ptr;
VirtualFields shared_virtual_fields;
DynamicallyFilteredPartsRangesPtr dynamically_filtered_parts;
bool is_parallel_reading_from_replicas;
std::optional<MergeTreeAllRangesCallback> all_ranges_callback;
std::optional<MergeTreeReadTaskCallback> read_task_callback;
@ -285,6 +297,8 @@ private:
ExpressionActionsPtr virtual_row_conversion;
std::optional<size_t> number_of_current_replica;
bool read_split_ranges_into_intersecting_and_non_intersecting_injection = false;
};
}

View File

@ -241,9 +241,10 @@ Blocks JoiningTransform::readExecute(Chunk & chunk)
return res;
}
FillingRightJoinSideTransform::FillingRightJoinSideTransform(Block input_header, JoinPtr join_)
FillingRightJoinSideTransform::FillingRightJoinSideTransform(Block input_header, JoinPtr join_, std::function<void()> on_finish_callback_)
: IProcessor({input_header}, {Block()})
, join(std::move(join_))
, on_finish_callback(std::move(on_finish_callback_))
{}
InputPort * FillingRightJoinSideTransform::addTotalsPort()
@ -312,12 +313,26 @@ IProcessor::Status FillingRightJoinSideTransform::prepare()
return Status::Ready;
}
if (!on_finish)
{
on_finish = true;
return Status::Ready;
}
output.finish();
return Status::Finished;
}
void FillingRightJoinSideTransform::work()
{
if (on_finish)
{
join->tryRerangeRightTableData();
if (on_finish_callback)
on_finish_callback();
return;
}
auto & input = inputs.front();
auto block = input.getHeader().cloneWithColumns(chunk.detachColumns());
@ -326,9 +341,6 @@ void FillingRightJoinSideTransform::work()
else
stop_reading = !join->addBlockToJoin(block);
if (input.isFinished())
join->tryRerangeRightTableData();
set_totals = for_totals;
}

View File

@ -102,7 +102,7 @@ private:
class FillingRightJoinSideTransform : public IProcessor
{
public:
FillingRightJoinSideTransform(Block input_header, JoinPtr join_);
FillingRightJoinSideTransform(Block input_header, JoinPtr join_, std::function<void()> on_finish_callback_);
String getName() const override { return "FillingRightJoinSide"; }
InputPort * addTotalsPort();
@ -116,6 +116,9 @@ private:
bool stop_reading = false;
bool for_totals = false;
bool set_totals = false;
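/// Set when the input is exhausted; gives work() one extra run to rerange the right table data and invoke on_finish_callback.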
bool on_finish = false;
std::function<void()> on_finish_callback;
};

View File

@ -384,6 +384,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLe
std::unique_ptr<QueryPipelineBuilder> left,
std::unique_ptr<QueryPipelineBuilder> right,
JoinPtr join,
std::function<void()> finish_callback,
const Block & output_header,
size_t max_block_size,
size_t min_block_size_bytes,
@ -446,7 +447,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLe
auto squashing = std::make_shared<SimpleSquashingChunksTransform>(right->getHeader(), 0, min_block_size_bytes);
connect(*outport, squashing->getInputs().front());
processors.emplace_back(squashing);
auto adding_joined = std::make_shared<FillingRightJoinSideTransform>(right->getHeader(), join);
auto adding_joined = std::make_shared<FillingRightJoinSideTransform>(right->getHeader(), join, finish_callback);
connect(squashing->getOutputPort(), adding_joined->getInputs().front());
processors.emplace_back(std::move(adding_joined));
}
@ -459,7 +460,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLe
{
right->resize(1);
auto adding_joined = std::make_shared<FillingRightJoinSideTransform>(right->getHeader(), join);
auto adding_joined = std::make_shared<FillingRightJoinSideTransform>(right->getHeader(), join, finish_callback);
InputPort * totals_port = nullptr;
if (right->hasTotals())
totals_port = adding_joined->addTotalsPort();

View File

@ -124,6 +124,7 @@ public:
std::unique_ptr<QueryPipelineBuilder> left,
std::unique_ptr<QueryPipelineBuilder> right,
JoinPtr join,
std::function<void()> finish_callback,
const Block & output_header,
size_t max_block_size,
size_t min_block_size_bytes,

View File

@ -1285,6 +1285,8 @@ bool KeyCondition::tryPrepareSetIndex(
Columns transformed_set_columns = set_columns;
// std::cerr << "+++++ set col size " << set_columns.front()->size() << std::endl;
for (size_t indexes_mapping_index = 0; indexes_mapping_index < indexes_mapping_size; ++indexes_mapping_index)
{
const auto & key_column_type = data_types[indexes_mapping_index];
@ -1309,6 +1311,8 @@ bool KeyCondition::tryPrepareSetIndex(
is_constant_transformed = true;
}
// std::cerr << set_element_type->getName() << " -> " << key_column_type->getName() << std::endl;
if (canBeSafelyCasted(set_element_type, key_column_type))
{
transformed_set_columns[set_element_index] = castColumn({set_column, set_element_type, {}}, key_column_type);
@ -1383,6 +1387,8 @@ bool KeyCondition::tryPrepareSetIndex(
set_columns = std::move(transformed_set_columns);
// std::cerr << "+++2++ set col size " << set_columns.front()->size() << std::endl;
out.set_index = std::make_shared<MergeTreeSetIndex>(set_columns, std::move(indexes_mapping));
/// When not all key columns are used or when there are multiple elements in

View File

@ -127,7 +127,7 @@ size_t MergeTreeDataSelectExecutor::getApproximateTotalRowsToRead(
MarkRanges exact_ranges;
for (const auto & part : parts)
{
MarkRanges part_ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, {}, &exact_ranges, settings, log);
MarkRanges part_ranges = markRangesFromPKRange(part, 0, part->index_granularity->getMarksCount(), metadata_snapshot, key_condition, {}, &exact_ranges, settings, log);
for (const auto & range : part_ranges)
rows_count += part->index_granularity->getRowsCountInRange(range);
}
@ -695,6 +695,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
CurrentMetrics::Increment metric(CurrentMetrics::FilteringMarksWithPrimaryKey);
ranges.ranges = markRangesFromPKRange(
part,
0, part->index_granularity->getMarksCount(),
metadata_snapshot,
key_condition,
part_offset_condition,
@ -1035,6 +1036,8 @@ size_t MergeTreeDataSelectExecutor::minMarksForConcurrentRead(
/// If @exact_ranges is not null, fill it with ranges containing marks of fully matched records.
MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
const MergeTreeData::DataPartPtr & part,
size_t start_mark,
size_t end_mark,
const StorageMetadataPtr & metadata_snapshot,
const KeyCondition & key_condition,
const std::optional<KeyCondition> & part_offset_condition,
@ -1045,10 +1048,16 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
MarkRanges res;
size_t marks_count = part->index_granularity->getMarksCount();
bool has_final_mark = part->index_granularity->hasFinalMark();
if (marks_count == 0)
return res;
bool has_final_mark = part->index_granularity->hasFinalMark();
if (has_final_mark && end_mark == marks_count)
--end_mark;
if (start_mark >= end_mark)
return res;
bool key_condition_useful = !key_condition.alwaysUnknownOrTrue();
bool part_offset_condition_useful = part_offset_condition && !part_offset_condition->alwaysUnknownOrTrue();
@ -1056,11 +1065,7 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
/// If index is not used.
if (!key_condition_useful && !part_offset_condition_useful)
{
if (has_final_mark)
res.push_back(MarkRange(0, marks_count - 1));
else
res.push_back(MarkRange(0, marks_count));
res.push_back(MarkRange(start_mark, end_mark));
return res;
}
@ -1207,7 +1212,7 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
/// At each step, take the left segment and check if it fits.
/// If fits, split it into smaller ones and put them on the stack. If not, discard it.
/// If the segment is already of one mark length, add it to response and discard it.
std::vector<MarkRange> ranges_stack = { {0, marks_count - (has_final_mark ? 1 : 0)} };
std::vector<MarkRange> ranges_stack = { {start_mark, end_mark} };
size_t steps = 0;
@ -1271,8 +1276,8 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
MarkRange result_range;
size_t last_mark = marks_count - (has_final_mark ? 1 : 0);
size_t searched_left = 0;
size_t last_mark = end_mark;
size_t searched_left = start_mark;
size_t searched_right = last_mark;
bool check_left = false;

View File

@ -66,6 +66,8 @@ public:
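/// start_mark and end_mark restrict the scan to a sub-range of the part's marks;
/// pass 0 and the part's marks count to cover the whole part (the final mark, if present, is excluded automatically).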
static MarkRanges markRangesFromPKRange(
const MergeTreeData::DataPartPtr & part,
size_t start_mark,
size_t end_mark,
const StorageMetadataPtr & metadata_snapshot,
const KeyCondition & key_condition,
const std::optional<KeyCondition> & part_offset_condition,

View File

@ -117,16 +117,14 @@ MergeTreePrefetchedReadPool::MergeTreePrefetchedReadPool(
params_,
context_)
, prefetch_threadpool(getContext()->getPrefetchThreadpool())
, log(getLogger(
"MergeTreePrefetchedReadPool("
+ (parts_ranges.empty() ? "" : parts_ranges.front().data_part->storage.getStorageID().getNameForLogs()) + ")"))
, log(getLogger("MergeTreePrefetchedReadPool(" + (storage_snapshot_->storage.getStorageID().getNameForLogs()) + ")"))
{
/// Tasks creation might also create a lot of readers - check that they do not
/// do any time-consuming operations in the ctor.
ProfileEventTimeIncrement<Milliseconds> watch(ProfileEvents::MergeTreePrefetchedReadPoolInit);
// /// Tasks creation might also create a lot of readers - check that they do not
// /// do any time-consuming operations in the ctor.
// ProfileEventTimeIncrement<Milliseconds> watch(ProfileEvents::MergeTreePrefetchedReadPoolInit);
fillPerPartStatistics();
fillPerThreadTasks(pool_settings.threads, pool_settings.sum_marks);
// fillPerPartStatistics();
// fillPerThreadTasks(pool_settings.threads, pool_settings.sum_marks);
}
std::function<void()> MergeTreePrefetchedReadPool::createPrefetchedTask(IMergeTreeReader * reader, Priority priority)
@ -183,6 +181,22 @@ void MergeTreePrefetchedReadPool::startPrefetches()
MergeTreeReadTaskPtr MergeTreePrefetchedReadPool::getTask(size_t task_idx, MergeTreeReadTask * previous_task)
{
auto init = [this]()
{
auto parts_ranges_and_lock = parts_ranges_ptr->get();
const auto & parts_ranges = parts_ranges_and_lock.parts_ranges;
fillPerPartInfos(parts_ranges);
/// Tasks creation might also create a lot of readers - check that they do not
/// do any time-consuming operations in the ctor.
ProfileEventTimeIncrement<Milliseconds> watch(ProfileEvents::MergeTreePrefetchedReadPoolInit);
fillPerPartStatistics(parts_ranges);
fillPerThreadTasks(parts_ranges, pool_settings.threads, pool_settings.sum_marks);
};
std::call_once(init_flag, init);
std::lock_guard lock(mutex);
if (per_thread_tasks.empty())
@ -313,7 +327,7 @@ MergeTreeReadTaskPtr MergeTreePrefetchedReadPool::createTask(ThreadTask & task,
return MergeTreeReadPoolBase::createTask(task.read_info, task.ranges, previous_task);
}
void MergeTreePrefetchedReadPool::fillPerPartStatistics()
void MergeTreePrefetchedReadPool::fillPerPartStatistics(const RangesInDataParts & parts_ranges)
{
per_part_statistics.clear();
per_part_statistics.reserve(parts_ranges.size());
@ -363,7 +377,7 @@ ALWAYS_INLINE inline String getPartNameForLogging(const DataPartPtr & part)
}
void MergeTreePrefetchedReadPool::fillPerThreadTasks(size_t threads, size_t sum_marks)
void MergeTreePrefetchedReadPool::fillPerThreadTasks(const RangesInDataParts & parts_ranges, size_t threads, size_t sum_marks)
{
if (per_part_infos.empty())
return;

View File

@ -14,7 +14,7 @@ using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
/// A class which is responsible for creating read tasks
/// which are later taken by readers via getTask method.
/// Does prefetching for the read tasks it creates.
class MergeTreePrefetchedReadPool : public MergeTreeReadPoolBase
class MergeTreePrefetchedReadPool final : public MergeTreeReadPoolBase
{
public:
MergeTreePrefetchedReadPool(
@ -106,8 +106,8 @@ private:
using TasksPerThread = std::map<size_t, ThreadTasks>;
using PartStatistics = std::vector<PartStatistic>;
void fillPerPartStatistics();
void fillPerThreadTasks(size_t threads, size_t sum_marks);
void fillPerPartStatistics(const RangesInDataParts & parts_ranges);
void fillPerThreadTasks(const RangesInDataParts & parts_ranges, size_t threads, size_t sum_marks);
void startPrefetches();
void createPrefetchedReadersForTask(ThreadTask & task);

View File

@ -62,11 +62,23 @@ MergeTreeReadPool::MergeTreeReadPool(
, backoff_settings{context_->getSettingsRef()}
, backoff_state{pool_settings.threads}
{
fillPerThreadInfo(pool_settings.threads, pool_settings.sum_marks);
//fillPerThreadInfo(pool_settings.threads, pool_settings.sum_marks);
}
MergeTreeReadTaskPtr MergeTreeReadPool::getTask(size_t task_idx, MergeTreeReadTask * previous_task)
{
auto init = [this]()
{
auto parts_ranges_and_lock = parts_ranges_ptr->get();
const auto & parts_ranges = parts_ranges_and_lock.parts_ranges;
fillPerPartInfos(parts_ranges);
fillPerThreadInfo(parts_ranges, pool_settings.threads, pool_settings.sum_marks);
LOG_TRACE(getLogger("MergeTreeReadPool"), "Init callback done");
};
std::call_once(init_flag, init);
const std::lock_guard lock{mutex};
/// If number of threads was lowered due to backoff, then will assign work only for maximum 'backoff_state.current_threads' threads.
@ -194,7 +206,7 @@ void MergeTreeReadPool::profileFeedback(ReadBufferFromFileBase::ProfileInfo info
LOG_DEBUG(log, "Will lower number of threads to {}", backoff_state.current_threads);
}
void MergeTreeReadPool::fillPerThreadInfo(size_t threads, size_t sum_marks)
void MergeTreeReadPool::fillPerThreadInfo(const RangesInDataParts & parts_ranges, size_t threads, size_t sum_marks)
{
if (threads > 1000000ull)
throw Exception(ErrorCodes::CANNOT_SCHEDULE_TASK, "Too many threads ({}) requested", threads);
@ -213,7 +225,7 @@ void MergeTreeReadPool::fillPerThreadInfo(size_t threads, size_t sum_marks)
using PartsInfo = std::vector<PartInfo>;
std::queue<PartsInfo> parts_queue;
auto per_part_sum_marks = getPerPartSumMarks();
auto per_part_sum_marks = getPerPartSumMarks(parts_ranges);
{
/// Group parts by disk name.
@ -223,6 +235,9 @@ void MergeTreeReadPool::fillPerThreadInfo(size_t threads, size_t sum_marks)
for (size_t i = 0; i < parts_ranges.size(); ++i)
{
if (parts_ranges[i].ranges.empty())
continue;
PartInfo part_info{parts_ranges[i], per_part_sum_marks[i], i};
if (parts_ranges[i].data_part->isStoredOnDisk())
parts_per_disk[parts_ranges[i].data_part->getDataPartStorage().getDiskName()].push_back(std::move(part_info));

View File

@ -19,7 +19,7 @@ namespace DB
* its workload, it either is signaled that no more work is available (`do_not_steal_tasks == false`) or
* continues taking small batches from other threads' workloads (`do_not_steal_tasks == true`).
*/
class MergeTreeReadPool : public MergeTreeReadPoolBase
class MergeTreeReadPool final : public MergeTreeReadPoolBase
{
public:
struct BackoffSettings;
@ -72,7 +72,7 @@ public:
};
private:
void fillPerThreadInfo(size_t threads, size_t sum_marks);
void fillPerThreadInfo(const RangesInDataParts & parts_ranges, size_t threads, size_t sum_marks);
mutable std::mutex mutex;

View File

@ -31,7 +31,7 @@ MergeTreeReadPoolBase::MergeTreeReadPoolBase(
const MergeTreeReadTask::BlockSizeParams & block_size_params_,
const ContextPtr & context_)
: WithContext(context_)
, parts_ranges(std::move(parts_))
, parts_ranges_ptr(std::make_shared<MergeTreeReadPartsRanges>(std::move(parts_)))
, mutations_snapshot(std::move(mutations_snapshot_))
, shared_virtual_fields(std::move(shared_virtual_fields_))
, storage_snapshot(storage_snapshot_)
@ -44,9 +44,12 @@ MergeTreeReadPoolBase::MergeTreeReadPoolBase(
, owned_mark_cache(context_->getGlobalContext()->getMarkCache())
, owned_uncompressed_cache(pool_settings_.use_uncompressed_cache ? context_->getGlobalContext()->getUncompressedCache() : nullptr)
, header(storage_snapshot->getSampleBlockForColumns(column_names))
, merge_tree_determine_task_size_by_prewhere_columns(context_->getSettingsRef()[Setting::merge_tree_determine_task_size_by_prewhere_columns])
, merge_tree_min_bytes_per_task_for_remote_reading(context_->getSettingsRef()[Setting::merge_tree_min_bytes_per_task_for_remote_reading])
, merge_tree_min_read_task_size(context_->getSettingsRef()[Setting::merge_tree_min_read_task_size])
, profile_callback([this](ReadBufferFromFileBase::ProfileInfo info_) { profileFeedback(info_); })
{
fillPerPartInfos(context_->getSettingsRef());
//fillPerPartInfos(context_->getSettingsRef());
}
static size_t getSizeOfColumns(const IMergeTreeDataPart & part, const Names & columns_to_read)
@ -75,10 +78,12 @@ calculateMinMarksPerTask(
const Names & columns_to_read,
const std::vector<NamesAndTypesList> & prewhere_steps_columns,
const MergeTreeReadPoolBase::PoolSettings & pool_settings,
const Settings & settings)
bool merge_tree_determine_task_size_by_prewhere_columns,
UInt64 merge_tree_min_read_task_size,
UInt64 merge_tree_min_bytes_per_task_for_remote_reading)
{
size_t min_marks_per_task
= std::max<size_t>(settings[Setting::merge_tree_min_read_task_size], pool_settings.min_marks_for_concurrent_read);
= std::max<size_t>(merge_tree_min_read_task_size, pool_settings.min_marks_for_concurrent_read);
size_t avg_mark_bytes = 0;
/// It is important to obtain marks count from the part itself instead of calling `part.getMarksCount()`,
/// because `part` will report number of marks selected from this part by the query.
@ -90,13 +95,13 @@ calculateMinMarksPerTask(
/// We assume that most of the time prewhere does its job well, meaning that the lion's share of the rows is filtered out.
/// Which in turn means that for most of the rows we will read only the columns from the prewhere clause.
/// So it makes sense to use only them for the estimation.
const auto & columns = settings[Setting::merge_tree_determine_task_size_by_prewhere_columns] && !prewhere_steps_columns.empty()
const auto & columns = merge_tree_determine_task_size_by_prewhere_columns && !prewhere_steps_columns.empty()
? getHeaviestSetOfColumnsAmongPrewhereSteps(*part.data_part, prewhere_steps_columns)
: columns_to_read;
const size_t part_compressed_bytes = getSizeOfColumns(*part.data_part, columns);
avg_mark_bytes = std::max<size_t>(part_compressed_bytes / part_marks_count, 1);
const auto min_bytes_per_task = settings[Setting::merge_tree_min_bytes_per_task_for_remote_reading];
const auto min_bytes_per_task = merge_tree_min_bytes_per_task_for_remote_reading;
/// We're taking min here because number of tasks shouldn't be too low - it will make task stealing impossible.
/// We also create at least two tasks per thread to have something to steal from a slow thread.
const auto heuristic_min_marks
@ -121,7 +126,7 @@ calculateMinMarksPerTask(
return {min_marks_per_task, avg_mark_bytes};
}
void MergeTreeReadPoolBase::fillPerPartInfos(const Settings & settings)
void MergeTreeReadPoolBase::fillPerPartInfos(const RangesInDataParts & parts_ranges)
{
per_part_infos.reserve(parts_ranges.size());
is_part_on_remote_disk.reserve(parts_ranges.size());
@ -186,12 +191,12 @@ void MergeTreeReadPoolBase::fillPerPartInfos(const Settings & settings)
is_part_on_remote_disk.push_back(part_with_ranges.data_part->isStoredOnRemoteDisk());
std::tie(read_task_info.min_marks_per_task, read_task_info.approx_size_of_mark)
= calculateMinMarksPerTask(part_with_ranges, column_names, read_task_info.task_columns.pre_columns, pool_settings, settings);
= calculateMinMarksPerTask(part_with_ranges, column_names, read_task_info.task_columns.pre_columns, pool_settings, merge_tree_determine_task_size_by_prewhere_columns, merge_tree_min_read_task_size, merge_tree_min_bytes_per_task_for_remote_reading);
per_part_infos.push_back(std::make_shared<MergeTreeReadTaskInfo>(std::move(read_task_info)));
}
}
std::vector<size_t> MergeTreeReadPoolBase::getPerPartSumMarks() const
std::vector<size_t> MergeTreeReadPoolBase::getPerPartSumMarks(const RangesInDataParts & parts_ranges)
{
std::vector<size_t> per_part_sum_marks;
per_part_sum_marks.reserve(parts_ranges.size());

View File

@ -6,6 +6,29 @@
namespace DB
{
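/// A mutex-protected list of parts with mark ranges shared between ReadFromMergeTree and the read pools.
/// get() returns the list together with a held lock, so the ranges can be updated (e.g. by the dynamic join filter) before tasks are created.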
class MergeTreeReadPartsRanges
{
public:
explicit MergeTreeReadPartsRanges(RangesInDataParts parts_ranges_) : parts_ranges(std::move(parts_ranges_)) {}
struct LockedPartRanges
{
std::lock_guard<std::mutex> lock;
RangesInDataParts & parts_ranges;
};
LockedPartRanges get() TSA_NO_THREAD_SAFETY_ANALYSIS
{
return {std::lock_guard(mutex), parts_ranges};
}
private:
RangesInDataParts parts_ranges;
std::mutex mutex;
};
using MergeTreeReadPartsRangesPtr = std::shared_ptr<MergeTreeReadPartsRanges>;
class MergeTreeReadPoolBase : public IMergeTreeReadPool, protected WithContext
{
public:
@ -38,9 +61,11 @@ public:
Block getHeader() const override { return header; }
MergeTreeReadPartsRangesPtr getPartsWithRanges() { return parts_ranges_ptr; }
protected:
/// Initialized in constructor
const RangesInDataParts parts_ranges;
const MergeTreeReadPartsRangesPtr parts_ranges_ptr;
const MutationsSnapshotPtr mutations_snapshot;
const VirtualFields shared_virtual_fields;
const StorageSnapshotPtr storage_snapshot;
@ -53,9 +78,12 @@ protected:
const MarkCachePtr owned_mark_cache;
const UncompressedCachePtr owned_uncompressed_cache;
const Block header;
const bool merge_tree_determine_task_size_by_prewhere_columns;
const UInt64 merge_tree_min_bytes_per_task_for_remote_reading;
const UInt64 merge_tree_min_read_task_size;
void fillPerPartInfos(const Settings & settings);
std::vector<size_t> getPerPartSumMarks() const;
void fillPerPartInfos(const RangesInDataParts & parts_ranges);
static std::vector<size_t> getPerPartSumMarks(const RangesInDataParts & parts_ranges);
MergeTreeReadTaskPtr createTask(MergeTreeReadTaskInfoPtr read_info, MergeTreeReadTask::Readers task_readers, MarkRanges ranges) const;
@ -66,6 +94,7 @@ protected:
MergeTreeReadTask::Extras getExtras() const;
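/// Per-part initialization is deferred to the first getTask() call so that the shared parts ranges can still be filtered after the pool has been created.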
std::once_flag init_flag;
std::vector<MergeTreeReadTaskInfoPtr> per_part_infos;
std::vector<bool> is_part_on_remote_disk;

View File

@ -37,13 +37,25 @@ MergeTreeReadPoolInOrder::MergeTreeReadPoolInOrder(
, has_limit_below_one_block(has_limit_below_one_block_)
, read_type(read_type_)
{
per_part_mark_ranges.reserve(parts_ranges.size());
for (const auto & part_with_ranges : parts_ranges)
per_part_mark_ranges.push_back(part_with_ranges.ranges);
}
MergeTreeReadTaskPtr MergeTreeReadPoolInOrder::getTask(size_t task_idx, MergeTreeReadTask * previous_task)
{
auto init = [this]()
{
auto parts_ranges_and_lock = parts_ranges_ptr->get();
LOG_TRACE(getLogger("MergeTreeReadPoolInOrder"), "Init callback called\n{}", StackTrace().toString());
const auto & parts_ranges = parts_ranges_and_lock.parts_ranges;
fillPerPartInfos(parts_ranges);
per_part_mark_ranges.reserve(parts_ranges.size());
for (const auto & part_with_ranges : parts_ranges)
per_part_mark_ranges.push_back(part_with_ranges.ranges);
};
std::call_once(init_flag, init);
if (task_idx >= per_part_infos.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Requested task with idx {}, but there are only {} parts",

View File

@ -4,7 +4,7 @@
namespace DB
{
class MergeTreeReadPoolInOrder : public MergeTreeReadPoolBase
class MergeTreeReadPoolInOrder final : public MergeTreeReadPoolBase
{
public:
MergeTreeReadPoolInOrder(

View File

@ -128,20 +128,31 @@ MergeTreeReadPoolParallelReplicas::MergeTreeReadPoolParallelReplicas(
context_)
, extension(std::move(extension_))
, coordination_mode(CoordinationMode::Default)
, min_marks_per_task(getMinMarksPerTask(pool_settings.min_marks_for_concurrent_read, per_part_infos))
, mark_segment_size(chooseSegmentSize(
log,
context_->getSettingsRef()[Setting::parallel_replicas_mark_segment_size],
min_marks_per_task,
pool_settings.threads,
pool_settings.sum_marks,
extension.getTotalNodesCount()))
, parallel_replicas_mark_segment_size(context_->getSettingsRef()[Setting::parallel_replicas_mark_segment_size])
{
extension.sendInitialRequest(coordination_mode, parts_ranges, mark_segment_size);
}
MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicas::getTask(size_t /*task_idx*/, MergeTreeReadTask * previous_task)
{
auto init = [this]()
{
auto parts_ranges_and_lock = parts_ranges_ptr->get();
const auto & parts_ranges = parts_ranges_and_lock.parts_ranges;
fillPerPartInfos(parts_ranges);
min_marks_per_task = getMinMarksPerTask(pool_settings.min_marks_for_concurrent_read, per_part_infos);
mark_segment_size = chooseSegmentSize(
log,
parallel_replicas_mark_segment_size,
min_marks_per_task,
pool_settings.threads,
pool_settings.sum_marks,
extension.getTotalNodesCount());
extension.sendInitialRequest(coordination_mode, parts_ranges, mark_segment_size);
};
std::call_once(init_flag, init);
std::lock_guard lock(mutex);
if (no_more_tasks_available)

View File

@ -5,7 +5,7 @@
namespace DB
{
class MergeTreeReadPoolParallelReplicas : public MergeTreeReadPoolBase
class MergeTreeReadPoolParallelReplicas final : public MergeTreeReadPoolBase
{
public:
MergeTreeReadPoolParallelReplicas(
@ -35,6 +35,7 @@ private:
LoggerPtr log = getLogger("MergeTreeReadPoolParallelReplicas");
const ParallelReadingExtension extension;
const CoordinationMode coordination_mode;
const size_t parallel_replicas_mark_segment_size;
size_t min_marks_per_task{0};
size_t mark_segment_size{0};
RangesInDataPartsDescription buffered_ranges;

View File

@ -44,24 +44,34 @@ MergeTreeReadPoolParallelReplicasInOrder::MergeTreeReadPoolParallelReplicasInOrd
, mode(mode_)
, min_marks_per_task(pool_settings.min_marks_for_concurrent_read)
{
for (const auto & info : per_part_infos)
min_marks_per_task = std::max(min_marks_per_task, info->min_marks_per_task);
if (min_marks_per_task == 0)
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Chosen number of marks to read is zero (likely because of weird interference of settings)");
for (const auto & part : parts_ranges)
request.push_back({part.data_part->info, MarkRanges{}});
for (const auto & part : parts_ranges)
buffered_tasks.push_back({part.data_part->info, MarkRanges{}});
extension.sendInitialRequest(mode, parts_ranges, /*mark_segment_size_=*/0);
}
MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t task_idx, MergeTreeReadTask * previous_task)
{
auto init = [this]()
{
auto parts_ranges_and_lock = parts_ranges_ptr->get();
const auto & parts_ranges = parts_ranges_and_lock.parts_ranges;
fillPerPartInfos(parts_ranges);
for (const auto & info : per_part_infos)
min_marks_per_task = std::max(min_marks_per_task, info->min_marks_per_task);
if (min_marks_per_task == 0)
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Chosen number of marks to read is zero (likely because of weird interference of settings)");
for (const auto & part : parts_ranges)
request.push_back({part.data_part->info, MarkRanges{}});
for (const auto & part : parts_ranges)
buffered_tasks.push_back({part.data_part->info, MarkRanges{}});
extension.sendInitialRequest(mode, parts_ranges, /*mark_segment_size_=*/0);
};
std::call_once(init_flag, init);
std::lock_guard lock(mutex);
if (task_idx >= per_part_infos.size())

View File

@ -5,7 +5,7 @@
namespace DB
{
class MergeTreeReadPoolParallelReplicasInOrder : public MergeTreeReadPoolBase
class MergeTreeReadPoolParallelReplicasInOrder final : public MergeTreeReadPoolBase
{
public:
MergeTreeReadPoolParallelReplicasInOrder(

View File

@ -401,6 +401,7 @@ public:
if (!key_condition.alwaysFalse())
mark_ranges = MergeTreeDataSelectExecutor::markRangesFromPKRange(
data_part,
0, data_part->index_granularity->getMarksCount(),
metadata_snapshot,
key_condition,
/*part_offset_condition=*/{},

View File

@ -297,7 +297,10 @@ FutureSetPtr tryGetSetFromDAGNode(const ActionsDAG::Node * dag_node)
column = &column_const->getDataColumn();
if (const auto * column_set = typeid_cast<const ColumnSet *>(column))
{
// std::cerr << ".... tryGetSetFromDAGNode " << reinterpret_cast<const void *>(column_set) << std::endl;
return column_set->getData();
}
return {};
}

View File

@ -25,5 +25,4 @@ for STORAGE_POLICY in 's3_cache' 'local_cache' 'azure_cache'; do
$CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=0 --enable_filesystem_cache_log=1 --query "SELECT 2243, '$STORAGE_POLICY', * FROM test_2242 FORMAT Null"
$CLICKHOUSE_CLIENT --query "SYSTEM FLUSH LOGS"
$CLICKHOUSE_CLIENT --query "SELECT file_segment_range, read_type FROM system.filesystem_cache_log WHERE query_id = (SELECT query_id from system.query_log where query LIKE '%SELECT 2243%$STORAGE_POLICY%' AND current_database = currentDatabase() AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1) ORDER BY file_segment_range, read_type"
done

View File

@ -21,6 +21,12 @@ Positions: 4 0 2 1
Strictness: ALL
Algorithm: HashJoin
Clauses: [(__table1.id) = (__table2.id)]
Dynamic Filter
Actions: INPUT : 0 -> id UInt64 : 0
COLUMN Const(Set) -> Set : 1
ALIAS id :: 0 -> __table1.id UInt64 : 2
FUNCTION in(__table1.id :: 2, :: 1) -> in(__table1.id, ) UInt8 : 0
Positions: 0
Expression ((JOIN actions + Change column names to column identifiers))
Header: __table1.id UInt64
__table1.value_1 String
@ -78,6 +84,15 @@ Positions: 6 0 3 1
Algorithm: HashJoin
ASOF inequality: LESS
Clauses: [(__table1.id, __table1.value_2) = (__table2.id, __table2.value_2)]
Dynamic Filter
Actions: INPUT : 0 -> id UInt64 : 0
INPUT : 1 -> value_2 UInt64 : 1
COLUMN Const(Set) -> Set : 2
ALIAS id :: 0 -> __table1.id UInt64 : 3
ALIAS value_2 :: 1 -> __table1.value_2 UInt64 : 0
FUNCTION tuple(__table1.id :: 3, __table1.value_2 :: 0) -> tuple(__table1.id, __table1.value_2) Tuple(UInt64, UInt64) : 1
FUNCTION in(tuple(__table1.id, __table1.value_2) :: 1, :: 2) -> in(tuple(__table1.id, __table1.value_2), ) UInt8 : 0
Positions: 0
Expression ((JOIN actions + Change column names to column identifiers))
Header: __table1.id UInt64
__table1.value_1 String

View File

@ -26,6 +26,12 @@ Positions: 4 2 0 1
Strictness: ALL
Algorithm: HashJoin
Clauses: [(__table1.id) = (__table2.id)]
Dynamic Filter
Actions: INPUT : 0 -> id UInt64 : 0
COLUMN Const(Set) -> Set : 1
ALIAS id :: 0 -> __table1.id UInt64 : 2
FUNCTION in(__table1.id :: 2, :: 1) -> in(__table1.id, ) UInt8 : 0
Positions: 0
Filter (( + (JOIN actions + Change column names to column identifiers)))
Header: __table1.id UInt64
__table1.value String
@ -93,6 +99,12 @@ Positions: 4 2 0 1
Strictness: ALL
Algorithm: HashJoin
Clauses: [(__table1.id) = (__table2.id)]
Dynamic Filter
Actions: INPUT : 0 -> id UInt64 : 0
COLUMN Const(Set) -> Set : 1
ALIAS id :: 0 -> __table1.id UInt64 : 2
FUNCTION in(__table1.id :: 2, :: 1) -> in(__table1.id, ) UInt8 : 0
Positions: 0
Filter (( + (JOIN actions + Change column names to column identifiers)))
Header: __table1.id UInt64
__table1.value String
@ -160,6 +172,12 @@ Positions: 4 2 0 1
Strictness: ALL
Algorithm: HashJoin
Clauses: [(__table1.id) = (__table2.id)]
Dynamic Filter
Actions: INPUT : 0 -> id UInt64 : 0
COLUMN Const(Set) -> Set : 1
ALIAS id :: 0 -> __table1.id UInt64 : 2
FUNCTION in(__table1.id :: 2, :: 1) -> in(__table1.id, ) UInt8 : 0
Positions: 0
Filter (( + (JOIN actions + Change column names to column identifiers)))
Header: __table1.id UInt64
__table1.value String
@ -392,6 +410,12 @@ Positions: 4 2 0 1
Strictness: ALL
Algorithm: HashJoin
Clauses: [(__table1.id) = (__table2.id)]
Dynamic Filter
Actions: INPUT : 0 -> id UInt64 : 0
COLUMN Const(Set) -> Set : 1
ALIAS id :: 0 -> __table1.id UInt64 : 2
FUNCTION in(__table1.id :: 2, :: 1) -> in(__table1.id, ) UInt8 : 0
Positions: 0
Expression ((JOIN actions + Change column names to column identifiers))
Header: __table1.id UInt64
__table1.value String
@ -453,6 +477,12 @@ Positions: 4 2 0 1
Strictness: ALL
Algorithm: HashJoin
Clauses: [(__table1.id) = (__table2.id)]
Dynamic Filter
Actions: INPUT : 0 -> id UInt64 : 0
COLUMN Const(Set) -> Set : 1
ALIAS id :: 0 -> __table1.id UInt64 : 2
FUNCTION in(__table1.id :: 2, :: 1) -> in(__table1.id, ) UInt8 : 0
Positions: 0
Filter (( + (JOIN actions + Change column names to column identifiers)))
Header: __table1.id UInt64
__table1.value String

View File

@ -21,6 +21,12 @@ Positions: 4 0 2 1
Strictness: ALL
Algorithm: HashJoin
Clauses: [(__table1.id) = (__table2.id)]
Dynamic Filter
Actions: INPUT : 0 -> id UInt64 : 0
COLUMN Const(Set) -> Set : 1
ALIAS id :: 0 -> __table1.id UInt64 : 2
FUNCTION in(__table1.id :: 2, :: 1) -> in(__table1.id, ) UInt8 : 0
Positions: 0
Expression
Header: __table1.id UInt64
__table1.value String
@ -91,6 +97,12 @@ Positions: 4 0 2 1
Strictness: ALL
Algorithm: HashJoin
Clauses: [(__table1.id) = (__table2.id)]
Dynamic Filter
Actions: INPUT : 0 -> id UInt64 : 0
COLUMN Const(Set) -> Set : 1
ALIAS id :: 0 -> __table1.id UInt64 : 2
FUNCTION in(__table1.id :: 2, :: 1) -> in(__table1.id, ) UInt8 : 0
Positions: 0
Expression
Header: __table1.id UInt64
__table1.value String
@ -161,6 +173,12 @@ Positions: 4 0 2 1
Strictness: ALL
Algorithm: HashJoin
Clauses: [(__table1.id) = (__table2.id)]
Dynamic Filter
Actions: INPUT : 0 -> id UInt64 : 0
COLUMN Const(Set) -> Set : 1
ALIAS id :: 0 -> __table1.id UInt64 : 2
FUNCTION in(__table1.id :: 2, :: 1) -> in(__table1.id, ) UInt8 : 0
Positions: 0
Expression
Header: __table1.id UInt64
__table1.value String

View File

@ -0,0 +1,71 @@
5000000 5000000 5000000 0_r
5000100 5000100 5000100 1_r
5000200 5000200 5000200 2_r
5000300 5000300 5000300 3_r
5000400 5000400 5000400 4_r
5000500 5000500 5000500 5_r
5000600 5000600 5000600 6_r
5000700 5000700 5000700 7_r
5000800 5000800 5000800 8_r
5000900 5000900 5000900 9_r
5001000 5001000 5001000 10_r
5001100 5001100 5001100 11_r
5001200 5001200 5001200 12_r
5001300 5001300 5001300 13_r
5001400 5001400 5001400 14_r
5001500 5001500 5001500 15_r
5001600 5001600 5001600 16_r
5001700 5001700 5001700 17_r
5001800 5001800 5001800 18_r
5001900 5001900 5001900 19_r
Join (JOIN FillRightFirst)
Algorithm: HashJoin
Dynamic Filter
FUNCTION in(__table1.number :: 2, :: 1) -> in(__table1.number, ) UInt8 : 0
0
4000000 4000000 5000000 4000000 0_r
4000010 4000010 5000100 4000010 1_r
4000020 4000020 5000200 4000020 2_r
4000030 4000030 5000300 4000030 3_r
4000040 4000040 5000400 4000040 4_r
4000050 4000050 5000500 4000050 5_r
4000060 4000060 5000600 4000060 6_r
4000070 4000070 5000700 4000070 7_r
4000080 4000080 5000800 4000080 8_r
4000090 4000090 5000900 4000090 9_r
4000100 4000100 5001000 4000100 10_r
4000110 4000110 5001100 4000110 11_r
4000120 4000120 5001200 4000120 12_r
4000130 4000130 5001300 4000130 13_r
4000140 4000140 5001400 4000140 14_r
4000150 4000150 5001500 4000150 15_r
4000160 4000160 5001600 4000160 16_r
4000170 4000170 5001700 4000170 17_r
4000180 4000180 5001800 4000180 18_r
4000190 4000190 5001900 4000190 19_r
5000000 5000000 5000000 4000000 0_r
5000100 5000100 5000100 4000010 1_r
5000200 5000200 5000200 4000020 2_r
5000300 5000300 5000300 4000030 3_r
5000400 5000400 5000400 4000040 4_r
5000500 5000500 5000500 4000050 5_r
5000600 5000600 5000600 4000060 6_r
5000700 5000700 5000700 4000070 7_r
5000800 5000800 5000800 4000080 8_r
5000900 5000900 5000900 4000090 9_r
5001000 5001000 5001000 4000100 10_r
5001100 5001100 5001100 4000110 11_r
5001200 5001200 5001200 4000120 12_r
5001300 5001300 5001300 4000130 13_r
5001400 5001400 5001400 4000140 14_r
5001500 5001500 5001500 4000150 15_r
5001600 5001600 5001600 4000160 16_r
5001700 5001700 5001700 4000170 17_r
5001800 5001800 5001800 4000180 18_r
5001900 5001900 5001900 4000190 19_r
Join (JOIN FillRightFirst)
Algorithm: HashJoin
Dynamic Filter
FUNCTION in(__table1.number : 3, :: 1) -> in(__table1.number, ) UInt8 : 0
FUNCTION in(__table1.number :: 3, :: 2) -> in(__table1.number, ) UInt8 : 1
0

View File

@ -0,0 +1,27 @@
-- Tags: long
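-- The right side produces only 20 keys, so the dynamic join filter should prune reading of the
-- 10M-row left table down to a few granules (checked below via read_rows from system.query_log).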
create table numbers_10m (number UInt64, s String) engine = MergeTree order by number settings index_granularity=8192, index_granularity_bytes=10000000;
insert into numbers_10m select number, toString(number) from numbers_mt(10e6) settings max_insert_threads=8;
-- explain actions = 1 select number, s, k, v from numbers_10m inner join (select number * 100 + 5000000 as k, toString(number) || '_r' as v from numbers(20)) as r on number = r.k order by number;
set merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability=0;
-- first_query
select number, s, k, v from numbers_10m inner join (select number * 100 + 5000000 as k, toString(number) || '_r' as v from numbers(20)) as r on number = r.k order by number;
select trimLeft(explain) from (EXPLAIN actions = 1 select number, s, k, v from numbers_10m inner join (select number * 100 + 5000000 as k, toString(number) || '_r' as v from numbers(20)) as r on number = r.k order by number settings enable_analyzer=1) where explain like '%Join%' or explain like '%Dynamic Filter%' or explain like '%FUNCTION in%';
system flush logs;
select if(read_rows < 8192 * 3, 0, read_rows) from system.query_log where event_date >= today() - 1 and current_database = currentDatabase() and query like '-- first_query%' and type = 'QueryFinish';
--select * from system.query_log where event_date >= today() - 1 and current_database = currentDatabase() and query like '-- first_query%' and type = 'QueryFinish' format Vertical;
-- second_query
select number, s, k, k2, v from numbers_10m inner join (select number * 100 + 5000000 as k, number * 10 + 4000000 as k2, toString(number) || '_r' as v from numbers(20)) as r on number = r.k or number = r.k2 order by number;
select trimLeft(explain) from (EXPLAIN actions = 1 select number, s, k, k2, v from numbers_10m inner join (select number * 100 + 5000000 as k, number * 10 + 4000000 as k2, toString(number) || '_r' as v from numbers(20)) as r on number = r.k or number = r.k2 order by number) where explain like '%Join%' or explain like '%Dynamic Filter%' or explain like '%FUNCTION in%';
select if(read_rows < 8192 * 6, 0, read_rows) from system.query_log where event_date >= today() - 1 and current_database = currentDatabase() and query like '-- first_query%' and type = 'QueryFinish';