mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-21 01:00:48 +00:00
Fix some tests.
This commit is contained in:
parent
cbdf3752ef
commit
1aeb705b20
@ -88,9 +88,20 @@ size_t minMarksForConcurrentRead(
|
||||
|
||||
}
|
||||
|
||||
struct ReadFromMergeTree::AnalysisResult
|
||||
{
|
||||
RangesInDataParts parts_with_ranges;
|
||||
MergeTreeDataSelectSamplingData sampling;
|
||||
bool sample_factor_column_queried = false;
|
||||
String query_id;
|
||||
IndexStats index_stats;
|
||||
Names column_names_to_read;
|
||||
ReadFromMergeTree::ReadType read_type = ReadFromMergeTree::ReadType::Default;
|
||||
};
|
||||
|
||||
ReadFromMergeTree::ReadFromMergeTree(
|
||||
const SelectQueryInfo & query_info_,
|
||||
const PartitionIdToMaxBlock * max_block_numbers_to_read_,
|
||||
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read_,
|
||||
ContextPtr context_,
|
||||
const MergeTreeData & data_,
|
||||
StorageMetadataPtr metadata_snapshot_,
|
||||
@ -107,7 +118,7 @@ ReadFromMergeTree::ReadFromMergeTree(
|
||||
data_.getPartitionValueType(),
|
||||
virt_column_names_)})
|
||||
, query_info(std::move(query_info_))
|
||||
, max_block_numbers_to_read(max_block_numbers_to_read_)
|
||||
, max_block_numbers_to_read(std::move(max_block_numbers_to_read_))
|
||||
, context(std::move(context_))
|
||||
, data(data_)
|
||||
, metadata_snapshot(std::move(metadata_snapshot_))
|
||||
@ -842,26 +853,26 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
|
||||
return Pipe::unitePipes(std::move(partition_pipes));
|
||||
}
|
||||
|
||||
void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &)
|
||||
ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTreeData::DataPartsVector parts) const
|
||||
{
|
||||
auto parts = std::move(prepared_parts);
|
||||
AnalysisResult result;
|
||||
|
||||
size_t total_parts = parts.size();
|
||||
|
||||
auto part_values = MergeTreeDataSelectExecutor::filterPartsByVirtualColumns(data, parts, query_info.query, context);
|
||||
if (part_values && part_values->empty())
|
||||
{
|
||||
pipeline.init(Pipe(std::make_shared<NullSource>(getOutputStream().header)));
|
||||
return;
|
||||
}
|
||||
return result;
|
||||
|
||||
result.column_names_to_read = real_column_names;
|
||||
|
||||
/// If there are only virtual columns in the query, you must request at least one non-virtual one.
|
||||
if (real_column_names.empty())
|
||||
if (result.column_names_to_read.empty())
|
||||
{
|
||||
NamesAndTypesList available_real_columns = metadata_snapshot->getColumns().getAllPhysical();
|
||||
real_column_names.push_back(ExpressionActions::getSmallestColumn(available_real_columns));
|
||||
result.column_names_to_read.push_back(ExpressionActions::getSmallestColumn(available_real_columns));
|
||||
}
|
||||
|
||||
metadata_snapshot->check(real_column_names, data.getVirtuals(), data.getStorageID());
|
||||
metadata_snapshot->check(result.column_names_to_read, data.getVirtuals(), data.getStorageID());
|
||||
|
||||
// Build and check if primary key is used when necessary
|
||||
const auto & primary_key = metadata_snapshot->getPrimaryKey();
|
||||
@ -881,28 +892,26 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build
|
||||
auto query_context = context->hasQueryContext() ? context->getQueryContext() : context;
|
||||
|
||||
MergeTreeDataSelectExecutor::filterPartsByPartition(
|
||||
metadata_snapshot_base, data, query_info, context, query_context, parts, part_values, max_block_numbers_to_read, log, index_stats);
|
||||
metadata_snapshot_base, data, query_info, context, query_context, parts, part_values, max_block_numbers_to_read.get(), log, result.index_stats);
|
||||
|
||||
bool sample_factor_column_queried = false;
|
||||
for (const auto & col : virt_column_names)
|
||||
if (col == "_sample_factor")
|
||||
sample_factor_column_queried = true;
|
||||
result.sample_factor_column_queried = true;
|
||||
|
||||
auto sampling = MergeTreeDataSelectExecutor::getSampling(
|
||||
result.sampling = MergeTreeDataSelectExecutor::getSampling(
|
||||
select, parts, metadata_snapshot, key_condition,
|
||||
data, log, sample_factor_column_queried, metadata_snapshot->getColumns().getAllPhysical(), context);
|
||||
data, log, result.sample_factor_column_queried, metadata_snapshot->getColumns().getAllPhysical(), context);
|
||||
|
||||
if (sampling.read_nothing)
|
||||
{
|
||||
pipeline.init(Pipe(std::make_shared<NullSource>(getOutputStream().header)));
|
||||
return;
|
||||
}
|
||||
if (result.sampling.read_nothing)
|
||||
return result;
|
||||
|
||||
size_t total_marks_pk = 0;
|
||||
for (const auto & part : parts)
|
||||
total_marks_pk += part->index_granularity.getMarksCountWithoutFinal();
|
||||
|
||||
auto parts_with_ranges = MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes(
|
||||
size_t parts_before_pk = parts.size();
|
||||
|
||||
result.parts_with_ranges = MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes(
|
||||
std::move(parts),
|
||||
metadata_snapshot,
|
||||
query_info,
|
||||
@ -911,18 +920,18 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build
|
||||
settings.reader_settings,
|
||||
log,
|
||||
settings.num_streams,
|
||||
index_stats,
|
||||
result.index_stats,
|
||||
true);
|
||||
|
||||
size_t sum_marks_pk = total_marks_pk;
|
||||
for (const auto & stat : index_stats)
|
||||
for (const auto & stat : result.index_stats)
|
||||
if (stat.type == IndexType::PrimaryKey)
|
||||
sum_marks_pk = stat.num_granules_after;
|
||||
|
||||
size_t sum_marks = 0;
|
||||
size_t sum_ranges = 0;
|
||||
|
||||
for (const auto & part : parts_with_ranges)
|
||||
for (const auto & part : result.parts_with_ranges)
|
||||
{
|
||||
sum_ranges += part.ranges.size();
|
||||
sum_marks += part.getMarksCount();
|
||||
@ -931,31 +940,53 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build
|
||||
LOG_DEBUG(
|
||||
log,
|
||||
"Selected {}/{} parts by partition key, {} parts by primary key, {}/{} marks by primary key, {} marks to read from {} ranges",
|
||||
parts.size(),
|
||||
parts_before_pk,
|
||||
total_parts,
|
||||
parts_with_ranges.size(),
|
||||
result.parts_with_ranges.size(),
|
||||
sum_marks_pk,
|
||||
total_marks_pk,
|
||||
sum_marks,
|
||||
sum_ranges);
|
||||
|
||||
String query_id = MergeTreeDataSelectExecutor::checkLimits(data, parts_with_ranges, context);
|
||||
result.query_id = MergeTreeDataSelectExecutor::checkLimits(data, result.parts_with_ranges, context);
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::SelectedParts, parts_with_ranges.size());
|
||||
ProfileEvents::increment(ProfileEvents::SelectedParts, result.parts_with_ranges.size());
|
||||
ProfileEvents::increment(ProfileEvents::SelectedRanges, sum_ranges);
|
||||
ProfileEvents::increment(ProfileEvents::SelectedMarks, sum_marks);
|
||||
|
||||
const auto & input_order_info = query_info.input_order_info
|
||||
? query_info.input_order_info
|
||||
: (query_info.projection ? query_info.projection->input_order_info : nullptr);
|
||||
|
||||
const auto & q_settings = context->getSettingsRef();
|
||||
if ((q_settings.optimize_read_in_order || q_settings.optimize_aggregation_in_order) && input_order_info)
|
||||
result.read_type = (input_order_info->direction > 0) ? ReadType::InOrder
|
||||
: ReadType::InReverseOrder;
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &)
|
||||
{
|
||||
auto result = selectRangesToRead(prepared_parts);
|
||||
if (result.parts_with_ranges.empty())
|
||||
{
|
||||
pipeline.init(Pipe(std::make_shared<NullSource>(getOutputStream().header)));
|
||||
return;
|
||||
}
|
||||
|
||||
/// Projection, that needed to drop columns, which have appeared by execution
|
||||
/// of some extra expressions, and to allow execute the same expressions later.
|
||||
/// NOTE: It may lead to double computation of expressions.
|
||||
ActionsDAGPtr result_projection;
|
||||
|
||||
Names column_names_to_read = real_column_names;
|
||||
if (!select.final() && sampling.use_sampling)
|
||||
Names column_names_to_read = std::move(result.column_names_to_read);
|
||||
const auto & select = query_info.query->as<ASTSelectQuery &>();
|
||||
if (!select.final() && result.sampling.use_sampling)
|
||||
{
|
||||
/// Add columns needed for `sample_by_ast` to `column_names_to_read`.
|
||||
/// Skip this if final was used, because such columns were already added from PK.
|
||||
std::vector<String> add_columns = sampling.filter_expression->getRequiredColumns().getNames();
|
||||
std::vector<String> add_columns = result.sampling.filter_expression->getRequiredColumns().getNames();
|
||||
column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
|
||||
std::sort(column_names_to_read.begin(), column_names_to_read.end());
|
||||
column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()),
|
||||
@ -985,7 +1016,7 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build
|
||||
column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end());
|
||||
|
||||
pipe = spreadMarkRangesAmongStreamsFinal(
|
||||
std::move(parts_with_ranges),
|
||||
std::move(result.parts_with_ranges),
|
||||
column_names_to_read,
|
||||
result_projection);
|
||||
}
|
||||
@ -999,7 +1030,7 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build
|
||||
auto sorting_key_prefix_expr = ExpressionAnalyzer(order_key_prefix_ast, syntax_result, context).getActionsDAG(false);
|
||||
|
||||
pipe = spreadMarkRangesAmongStreamsWithOrder(
|
||||
std::move(parts_with_ranges),
|
||||
std::move(result.parts_with_ranges),
|
||||
column_names_to_read,
|
||||
sorting_key_prefix_expr,
|
||||
result_projection,
|
||||
@ -1008,7 +1039,7 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build
|
||||
else
|
||||
{
|
||||
pipe = spreadMarkRangesAmongStreams(
|
||||
std::move(parts_with_ranges),
|
||||
std::move(result.parts_with_ranges),
|
||||
column_names_to_read);
|
||||
}
|
||||
|
||||
@ -1018,15 +1049,15 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build
|
||||
return;
|
||||
}
|
||||
|
||||
if (sampling.use_sampling)
|
||||
if (result.sampling.use_sampling)
|
||||
{
|
||||
auto sampling_actions = std::make_shared<ExpressionActions>(sampling.filter_expression);
|
||||
auto sampling_actions = std::make_shared<ExpressionActions>(result.sampling.filter_expression);
|
||||
pipe.addSimpleTransform([&](const Block & header)
|
||||
{
|
||||
return std::make_shared<FilterTransform>(
|
||||
header,
|
||||
sampling_actions,
|
||||
sampling.filter_function->getColumnName(),
|
||||
result.sampling.filter_function->getColumnName(),
|
||||
false);
|
||||
});
|
||||
}
|
||||
@ -1041,12 +1072,12 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build
|
||||
}
|
||||
|
||||
/// By the way, if a distributed query or query to a Merge table is made, then the `_sample_factor` column can have different values.
|
||||
if (sample_factor_column_queried)
|
||||
if (result.sample_factor_column_queried)
|
||||
{
|
||||
ColumnWithTypeAndName column;
|
||||
column.name = "_sample_factor";
|
||||
column.type = std::make_shared<DataTypeFloat64>();
|
||||
column.column = column.type->createColumnConst(0, Field(sampling.used_sample_factor));
|
||||
column.column = column.type->createColumnConst(0, Field(result.sampling.used_sample_factor));
|
||||
|
||||
auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
|
||||
auto adding_column_action = std::make_shared<ExpressionActions>(adding_column_dag);
|
||||
@ -1073,8 +1104,8 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build
|
||||
processors.emplace_back(processor);
|
||||
|
||||
// Attach QueryIdHolder if needed
|
||||
if (!query_id.empty())
|
||||
pipe.addQueryIdHolder(std::make_shared<QueryIdHolder>(query_id, data));
|
||||
if (!result.query_id.empty())
|
||||
pipe.addQueryIdHolder(std::make_shared<QueryIdHolder>(result.query_id, data));
|
||||
|
||||
pipeline.init(std::move(pipe));
|
||||
}
|
||||
@ -1098,45 +1129,50 @@ static const char * indexTypeToString(ReadFromMergeTree::IndexType type)
|
||||
__builtin_unreachable();
|
||||
}
|
||||
|
||||
// static const char * readTypeToString(ReadFromMergeTree::ReadType type)
|
||||
// {
|
||||
// switch (type)
|
||||
// {
|
||||
// case ReadFromMergeTree::ReadType::Default:
|
||||
// return "Default";
|
||||
// case ReadFromMergeTree::ReadType::InOrder:
|
||||
// return "InOrder";
|
||||
// case ReadFromMergeTree::ReadType::InReverseOrder:
|
||||
// return "InReverseOrder";
|
||||
// }
|
||||
static const char * readTypeToString(ReadFromMergeTree::ReadType type)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case ReadFromMergeTree::ReadType::Default:
|
||||
return "Default";
|
||||
case ReadFromMergeTree::ReadType::InOrder:
|
||||
return "InOrder";
|
||||
case ReadFromMergeTree::ReadType::InReverseOrder:
|
||||
return "InReverseOrder";
|
||||
}
|
||||
|
||||
// __builtin_unreachable();
|
||||
// }
|
||||
__builtin_unreachable();
|
||||
}
|
||||
|
||||
void ReadFromMergeTree::describeActions(FormatSettings & format_settings) const
|
||||
{
|
||||
auto result = selectRangesToRead(prepared_parts);
|
||||
std::string prefix(format_settings.offset, format_settings.indent_char);
|
||||
//format_settings.out << prefix << "ReadType: " << readTypeToString(read_type) << '\n';
|
||||
format_settings.out << prefix << "ReadType: " << readTypeToString(result.read_type) << '\n';
|
||||
|
||||
if (!index_stats.empty())
|
||||
if (!result.index_stats.empty())
|
||||
{
|
||||
format_settings.out << prefix << "Parts: " << index_stats.back().num_parts_after << '\n';
|
||||
format_settings.out << prefix << "Granules: " << index_stats.back().num_granules_after << '\n';
|
||||
format_settings.out << prefix << "Parts: " << result.index_stats.back().num_parts_after << '\n';
|
||||
format_settings.out << prefix << "Granules: " << result.index_stats.back().num_granules_after << '\n';
|
||||
}
|
||||
}
|
||||
|
||||
void ReadFromMergeTree::describeActions(JSONBuilder::JSONMap & map) const
|
||||
{
|
||||
//map.add("Read Type", readTypeToString(read_type));
|
||||
if (!index_stats.empty())
|
||||
auto result = selectRangesToRead(prepared_parts);
|
||||
map.add("Read Type", readTypeToString(result.read_type));
|
||||
if (!result.index_stats.empty())
|
||||
{
|
||||
map.add("Parts", index_stats.back().num_parts_after);
|
||||
map.add("Granules", index_stats.back().num_granules_after);
|
||||
map.add("Parts", result.index_stats.back().num_parts_after);
|
||||
map.add("Granules", result.index_stats.back().num_granules_after);
|
||||
}
|
||||
}
|
||||
|
||||
void ReadFromMergeTree::describeIndexes(FormatSettings & format_settings) const
|
||||
{
|
||||
auto result = selectRangesToRead(prepared_parts);
|
||||
auto index_stats = std::move(result.index_stats);
|
||||
|
||||
std::string prefix(format_settings.offset, format_settings.indent_char);
|
||||
if (!index_stats.empty())
|
||||
{
|
||||
@ -1186,6 +1222,9 @@ void ReadFromMergeTree::describeIndexes(FormatSettings & format_settings) const
|
||||
|
||||
void ReadFromMergeTree::describeIndexes(JSONBuilder::JSONMap & map) const
|
||||
{
|
||||
auto result = selectRangesToRead(prepared_parts);
|
||||
auto index_stats = std::move(result.index_stats);
|
||||
|
||||
if (!index_stats.empty())
|
||||
{
|
||||
/// Do not print anything if no indexes is applied.
|
||||
|
@ -72,7 +72,7 @@ public:
|
||||
|
||||
ReadFromMergeTree(
|
||||
const SelectQueryInfo & query_info_,
|
||||
const PartitionIdToMaxBlock * max_block_numbers_to_read_,
|
||||
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read_,
|
||||
ContextPtr context_,
|
||||
const MergeTreeData & data_,
|
||||
StorageMetadataPtr metadata_snapshot_,
|
||||
@ -97,7 +97,7 @@ public:
|
||||
|
||||
private:
|
||||
SelectQueryInfo query_info;
|
||||
const PartitionIdToMaxBlock * max_block_numbers_to_read;
|
||||
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read;
|
||||
ContextPtr context;
|
||||
const MergeTreeData & data;
|
||||
StorageMetadataPtr metadata_snapshot;
|
||||
@ -106,7 +106,6 @@ private:
|
||||
Names real_column_names;
|
||||
MergeTreeData::DataPartsVector prepared_parts;
|
||||
PrewhereInfoPtr prewhere_info;
|
||||
IndexStats index_stats;
|
||||
Names virt_column_names;
|
||||
Settings settings;
|
||||
|
||||
@ -134,6 +133,9 @@ private:
|
||||
RangesInDataParts && parts,
|
||||
const Names & column_names,
|
||||
ActionsDAGPtr & out_projection);
|
||||
|
||||
struct AnalysisResult;
|
||||
AnalysisResult selectRangesToRead(MergeTreeData::DataPartsVector parts) const;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -3818,7 +3818,7 @@ static void selectBestProjection(
|
||||
const Names & required_columns,
|
||||
ProjectionCandidate & candidate,
|
||||
ContextPtr query_context,
|
||||
const PartitionIdToMaxBlock * max_added_blocks,
|
||||
std::shared_ptr<PartitionIdToMaxBlock> max_added_blocks,
|
||||
const Settings & settings,
|
||||
const MergeTreeData::DataPartsVector & parts,
|
||||
ProjectionCandidate *& selected_candidate,
|
||||
@ -4097,11 +4097,11 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
|
||||
// First build a MergeTreeDataSelectCache to check if a projection is indeed better than base
|
||||
// query_info.merge_tree_data_select_cache = std::make_unique<MergeTreeDataSelectCache>();
|
||||
|
||||
std::unique_ptr<PartitionIdToMaxBlock> max_added_blocks;
|
||||
std::shared_ptr<PartitionIdToMaxBlock> max_added_blocks;
|
||||
if (settings.select_sequential_consistency)
|
||||
{
|
||||
if (const StorageReplicatedMergeTree * replicated = dynamic_cast<const StorageReplicatedMergeTree *>(this))
|
||||
max_added_blocks = std::make_unique<PartitionIdToMaxBlock>(replicated->getMaxAddedBlocks());
|
||||
max_added_blocks = std::make_shared<PartitionIdToMaxBlock>(replicated->getMaxAddedBlocks());
|
||||
}
|
||||
|
||||
auto parts = getDataPartsVector();
|
||||
@ -4122,7 +4122,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
|
||||
analysis_result.required_columns,
|
||||
candidate,
|
||||
query_context,
|
||||
max_added_blocks.get(),
|
||||
max_added_blocks,
|
||||
settings,
|
||||
parts,
|
||||
selected_candidate,
|
||||
@ -4143,7 +4143,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
|
||||
query_info, // TODO syntax_analysis_result set in index
|
||||
query_context,
|
||||
settings.max_threads,
|
||||
max_added_blocks.get());
|
||||
max_added_blocks);
|
||||
|
||||
// Add 1 to base sum_marks so that we prefer projections even when they have equal number of marks to read.
|
||||
// NOTE: It is not clear if we need it. E.g. projections do not support skip index for now.
|
||||
@ -4160,7 +4160,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
|
||||
analysis_result.required_columns,
|
||||
candidate,
|
||||
query_context,
|
||||
max_added_blocks.get(),
|
||||
max_added_blocks,
|
||||
settings,
|
||||
parts,
|
||||
selected_candidate,
|
||||
|
@ -139,7 +139,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
|
||||
const UInt64 max_block_size,
|
||||
const unsigned num_streams,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
const PartitionIdToMaxBlock * max_block_numbers_to_read) const
|
||||
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read) const
|
||||
{
|
||||
const auto & settings = context->getSettingsRef();
|
||||
auto parts = data.getDataPartsVector();
|
||||
@ -670,8 +670,8 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition(
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const MergeTreeData & data,
|
||||
const SelectQueryInfo & query_info,
|
||||
ContextPtr & context,
|
||||
ContextPtr & query_context,
|
||||
const ContextPtr & context,
|
||||
const ContextPtr & query_context,
|
||||
MergeTreeData::DataPartsVector & parts,
|
||||
const std::optional<std::unordered_set<String>> & part_values,
|
||||
const PartitionIdToMaxBlock * max_block_numbers_to_read,
|
||||
@ -766,7 +766,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
|
||||
MergeTreeData::DataPartsVector && parts,
|
||||
StorageMetadataPtr metadata_snapshot,
|
||||
const SelectQueryInfo & query_info,
|
||||
ContextPtr & context,
|
||||
const ContextPtr & context,
|
||||
KeyCondition & key_condition,
|
||||
const MergeTreeReaderSettings & reader_settings,
|
||||
Poco::Logger * log,
|
||||
@ -993,7 +993,10 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
|
||||
return parts_with_ranges;
|
||||
}
|
||||
|
||||
String MergeTreeDataSelectExecutor::checkLimits(const MergeTreeData & data, const RangesInDataParts & parts_with_ranges, ContextPtr & context)
|
||||
String MergeTreeDataSelectExecutor::checkLimits(
|
||||
const MergeTreeData & data,
|
||||
const RangesInDataParts & parts_with_ranges,
|
||||
const ContextPtr & context)
|
||||
{
|
||||
const auto & settings = context->getSettingsRef();
|
||||
// Check limitations. query_id is used as the quota RAII's resource key.
|
||||
@ -1092,7 +1095,7 @@ size_t MergeTreeDataSelectExecutor::estimateNumMarksToRead(
|
||||
const SelectQueryInfo & query_info,
|
||||
ContextPtr context,
|
||||
unsigned num_streams,
|
||||
const PartitionIdToMaxBlock * max_block_numbers_to_read) const
|
||||
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read) const
|
||||
{
|
||||
size_t total_parts = parts.size();
|
||||
if (total_parts == 0)
|
||||
@ -1137,7 +1140,7 @@ size_t MergeTreeDataSelectExecutor::estimateNumMarksToRead(
|
||||
ReadFromMergeTree::IndexStats index_stats;
|
||||
|
||||
filterPartsByPartition(
|
||||
metadata_snapshot_base, data, query_info, context, query_context, parts, part_values, max_block_numbers_to_read, log, index_stats);
|
||||
metadata_snapshot_base, data, query_info, context, query_context, parts, part_values, max_block_numbers_to_read.get(), log, index_stats);
|
||||
|
||||
auto sampling = MergeTreeDataSelectExecutor::getSampling(
|
||||
select, parts, metadata_snapshot, key_condition,
|
||||
@ -1173,7 +1176,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
|
||||
ContextPtr context,
|
||||
const UInt64 max_block_size,
|
||||
const unsigned num_streams,
|
||||
const PartitionIdToMaxBlock * max_block_numbers_to_read) const
|
||||
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read) const
|
||||
{
|
||||
size_t total_parts = parts.size();
|
||||
if (total_parts == 0)
|
||||
@ -1207,6 +1210,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
|
||||
.preferred_max_column_in_block_size_bytes = settings.preferred_max_column_in_block_size_bytes,
|
||||
//.min_marks_for_concurrent_read = settings.min_marks_for_concurrent_read,
|
||||
.use_uncompressed_cache = settings.use_uncompressed_cache,
|
||||
.force_primary_key = settings.force_primary_key,
|
||||
.reader_settings = reader_settings,
|
||||
.backoff_settings = MergeTreeReadPool::BackoffSettings(settings),
|
||||
};
|
||||
|
@ -53,7 +53,7 @@ public:
|
||||
UInt64 max_block_size,
|
||||
unsigned num_streams,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr) const;
|
||||
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read = nullptr) const;
|
||||
|
||||
size_t estimateNumMarksToRead(
|
||||
MergeTreeData::DataPartsVector parts,
|
||||
@ -63,7 +63,7 @@ public:
|
||||
const SelectQueryInfo & query_info,
|
||||
ContextPtr context,
|
||||
unsigned num_streams,
|
||||
const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr) const;
|
||||
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read = nullptr) const;
|
||||
|
||||
QueryPlanPtr readFromParts(
|
||||
MergeTreeData::DataPartsVector parts,
|
||||
@ -74,7 +74,7 @@ public:
|
||||
ContextPtr context,
|
||||
UInt64 max_block_size,
|
||||
unsigned num_streams,
|
||||
const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr) const;
|
||||
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read = nullptr) const;
|
||||
|
||||
private:
|
||||
const MergeTreeData & data;
|
||||
@ -196,8 +196,8 @@ public:
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const MergeTreeData & data,
|
||||
const SelectQueryInfo & query_info,
|
||||
ContextPtr & context,
|
||||
ContextPtr & query_context,
|
||||
const ContextPtr & context,
|
||||
const ContextPtr & query_context,
|
||||
MergeTreeData::DataPartsVector & parts,
|
||||
const std::optional<std::unordered_set<String>> & part_values,
|
||||
const PartitionIdToMaxBlock * max_block_numbers_to_read,
|
||||
@ -208,7 +208,7 @@ public:
|
||||
MergeTreeData::DataPartsVector && parts,
|
||||
StorageMetadataPtr metadata_snapshot,
|
||||
const SelectQueryInfo & query_info,
|
||||
ContextPtr & context,
|
||||
const ContextPtr & context,
|
||||
KeyCondition & key_condition,
|
||||
const MergeTreeReaderSettings & reader_settings,
|
||||
Poco::Logger * log,
|
||||
@ -227,7 +227,7 @@ public:
|
||||
NamesAndTypesList available_real_columns,
|
||||
ContextPtr context);
|
||||
|
||||
static String checkLimits(const MergeTreeData & data, const RangesInDataParts & parts_with_ranges, ContextPtr & context);
|
||||
static String checkLimits(const MergeTreeData & data, const RangesInDataParts & parts_with_ranges, const ContextPtr & context);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -4377,9 +4377,9 @@ void StorageReplicatedMergeTree::read(
|
||||
*/
|
||||
if (local_context->getSettingsRef().select_sequential_consistency)
|
||||
{
|
||||
auto max_added_blocks = getMaxAddedBlocks();
|
||||
auto max_added_blocks = std::make_shared<ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock>(getMaxAddedBlocks());
|
||||
if (auto plan = reader.read(
|
||||
column_names, metadata_snapshot, query_info, local_context, max_block_size, num_streams, processed_stage, &max_added_blocks))
|
||||
column_names, metadata_snapshot, query_info, local_context, max_block_size, num_streams, processed_stage, std::move(max_added_blocks)))
|
||||
query_plan = std::move(*plan);
|
||||
return;
|
||||
}
|
||||
|
@ -8,7 +8,7 @@ INSERT INTO xp SELECT '2020-01-01', number, '' FROM numbers(100000);
|
||||
|
||||
CREATE TABLE xp_d AS xp ENGINE = Distributed(test_shard_localhost, currentDatabase(), xp);
|
||||
|
||||
SELECT count(7 = (SELECT number FROM numbers(0) ORDER BY number ASC NULLS FIRST LIMIT 7)) FROM xp_d PREWHERE toYYYYMM(A) GLOBAL IN (SELECT NULL = (SELECT number FROM numbers(1) ORDER BY number DESC NULLS LAST LIMIT 1), toYYYYMM(min(A)) FROM xp_d) WHERE B > NULL; -- { serverError 20 }
|
||||
SELECT count(7 = (SELECT number FROM numbers(0) ORDER BY number ASC NULLS FIRST LIMIT 7)) FROM xp_d PREWHERE toYYYYMM(A) GLOBAL IN (SELECT NULL = (SELECT number FROM numbers(1) ORDER BY number DESC NULLS LAST LIMIT 1), toYYYYMM(min(A)) FROM xp_d) WHERE B > NULL; -- { serverError 8 }
|
||||
|
||||
SELECT count() FROM xp_d WHERE A GLOBAL IN (SELECT NULL); -- { serverError 53 }
|
||||
|
||||
|
@ -1,13 +1,10 @@
|
||||
(Expression)
|
||||
ExpressionTransform
|
||||
(SettingQuotaAndLimits)
|
||||
(Expression)
|
||||
ExpressionTransform
|
||||
(MergingFinal)
|
||||
ReplacingSorted 2 → 1
|
||||
(Expression)
|
||||
ExpressionTransform × 2
|
||||
(ReadFromMergeTree)
|
||||
ExpressionTransform
|
||||
ReplacingSorted 2 → 1
|
||||
ExpressionTransform × 2
|
||||
MergeTree × 2 0 → 1
|
||||
0 0
|
||||
1 1
|
||||
@ -19,13 +16,10 @@ ExpressionTransform
|
||||
(Expression)
|
||||
ExpressionTransform × 2
|
||||
(SettingQuotaAndLimits)
|
||||
(Expression)
|
||||
(ReadFromMergeTree)
|
||||
ExpressionTransform × 2
|
||||
(MergingFinal)
|
||||
ReplacingSorted × 2 2 → 1
|
||||
Copy × 2 1 → 2
|
||||
AddingSelector × 2
|
||||
(Expression)
|
||||
ExpressionTransform × 2
|
||||
(ReadFromMergeTree)
|
||||
MergeTree × 2 0 → 1
|
||||
|
Loading…
Reference in New Issue
Block a user