ClickHouse/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp

1628 lines
61 KiB
C++
Raw Normal View History

2017-03-12 19:18:07 +00:00
#include <boost/rational.hpp> /// For calculations related to sampling coefficients.
#include <ext/scope_guard.h>
#include <optional>
2019-01-07 12:51:14 +00:00
#include <Poco/File.h>
#include <Common/FieldVisitors.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
2019-10-01 16:50:08 +00:00
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
2019-10-01 16:50:08 +00:00
#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h>
2019-01-17 12:11:36 +00:00
#include <Storages/MergeTree/MergeTreeIndices.h>
2019-01-07 12:51:14 +00:00
#include <Storages/MergeTree/MergeTreeIndexReader.h>
#include <Storages/MergeTree/KeyCondition.h>
2019-12-10 23:18:24 +00:00
#include <Storages/ReadInOrderOptimizer.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSampleRatio.h>
2019-08-14 14:06:16 +00:00
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/Context.h>
2017-12-01 20:38:50 +00:00
/// Allow to use __uint128_t as a template parameter for boost::rational.
// https://stackoverflow.com/questions/41198673/uint128-t-not-working-with-clang-and-libstdc
2017-12-25 17:16:29 +00:00
#if !defined(__GLIBCXX_BITSIZE_INT_N_0) && defined(__SIZEOF_INT128__)
2017-12-01 20:38:50 +00:00
namespace std
{
template <>
struct numeric_limits<__uint128_t>
{
static constexpr bool is_specialized = true;
static constexpr bool is_signed = false;
static constexpr bool is_integer = true;
static constexpr int radix = 2;
2017-12-09 17:16:24 +00:00
static constexpr int digits = 128;
2020-08-07 01:27:29 +00:00
static constexpr int digits10 = 38;
2017-12-01 20:38:50 +00:00
static constexpr __uint128_t min () { return 0; } // used in boost 1.65.1+
2018-07-16 13:44:24 +00:00
static constexpr __uint128_t max () { return __uint128_t(0) - 1; } // used in boost 1.68.0+
2017-12-01 20:38:50 +00:00
};
}
#endif
2017-12-01 20:38:50 +00:00
#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeEnum.h>
2020-04-22 13:52:07 +00:00
#include <DataTypes/DataTypesNumber.h>
#include <Processors/ConcatProcessor.h>
2020-04-14 10:04:49 +00:00
#include <Processors/Merges/AggregatingSortedTransform.h>
2020-04-22 13:52:07 +00:00
#include <Processors/Merges/CollapsingSortedTransform.h>
#include <Processors/Merges/MergingSortedTransform.h>
2020-04-14 10:04:49 +00:00
#include <Processors/Merges/ReplacingSortedTransform.h>
2020-04-22 13:52:07 +00:00
#include <Processors/Merges/SummingSortedTransform.h>
2020-04-14 10:04:49 +00:00
#include <Processors/Merges/VersionedCollapsingTransform.h>
2019-10-01 16:50:08 +00:00
#include <Processors/Sources/SourceFromInputStream.h>
2020-04-22 13:52:07 +00:00
#include <Processors/Transforms/AddingConstColumnTransform.h>
#include <Processors/Transforms/AddingSelectorTransform.h>
#include <Processors/Transforms/CopyTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ReverseTransform.h>
#include <Storages/VirtualColumnUtils.h>
namespace ProfileEvents
{
extern const Event SelectedParts;
extern const Event SelectedRanges;
extern const Event SelectedMarks;
}
2014-03-13 12:48:07 +00:00
namespace DB
{
namespace ErrorCodes
{
2020-02-25 18:02:41 +00:00
extern const int LOGICAL_ERROR;
extern const int INDEX_NOT_USED;
extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
extern const int ILLEGAL_COLUMN;
extern const int ARGUMENT_OUT_OF_BOUND;
}
2018-10-17 03:13:00 +00:00
MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(const MergeTreeData & data_)
2020-05-30 21:57:37 +00:00
: data(data_), log(&Poco::Logger::get(data.getLogName() + " (SelectExecutor)"))
2014-03-13 12:48:07 +00:00
{
}
/// Construct a block consisting only of possible values of virtual columns
2016-03-05 03:17:11 +00:00
static Block getBlockWithPartColumn(const MergeTreeData::DataPartsVector & parts)
2014-07-28 10:36:11 +00:00
{
auto column = ColumnString::create();
2014-07-28 10:36:11 +00:00
for (const auto & part : parts)
column->insert(part->name);
2014-07-28 10:36:11 +00:00
return Block{ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "_part")};
2014-07-28 10:36:11 +00:00
}
size_t MergeTreeDataSelectExecutor::getApproximateTotalRowsToRead(
2020-06-17 12:39:20 +00:00
const MergeTreeData::DataPartsVector & parts,
const StorageMetadataPtr & metadata_snapshot,
const KeyCondition & key_condition,
const Settings & settings) const
{
2019-03-25 13:55:24 +00:00
size_t rows_count = 0;
/// We will find out how many rows we would have read without sampling.
2020-05-23 22:24:01 +00:00
LOG_DEBUG(log, "Preliminary index scan with condition: {}", key_condition.toString());
2020-03-09 01:59:08 +00:00
for (const auto & part : parts)
{
2020-07-20 15:09:00 +00:00
MarkRanges ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, settings, log);
/** In order to get a lower bound on the number of rows that match the condition on PK,
* consider only guaranteed full marks.
* That is, do not take into account the first and last marks, which may be incomplete.
*/
2020-03-09 01:59:08 +00:00
for (const auto & range : ranges)
if (range.end - range.begin > 2)
rows_count += part->index_granularity.getRowsCountInRange({range.begin + 1, range.end - 1});
2019-03-25 13:55:24 +00:00
}
2019-03-25 13:55:24 +00:00
return rows_count;
}
using RelativeSize = boost::rational<ASTSampleRatio::BigNum>;
2019-12-15 06:34:43 +00:00
static std::string toString(const RelativeSize & x)
{
return ASTSampleRatio::toString(x.numerator()) + "/" + ASTSampleRatio::toString(x.denominator());
}
2017-03-12 19:18:07 +00:00
/// Converts sample size to an approximate number of rows (ex. `SAMPLE 1000000`) to relative value (ex. `SAMPLE 0.1`).
static RelativeSize convertAbsoluteSampleSizeToRelative(const ASTPtr & node, size_t approx_total_rows)
{
if (approx_total_rows == 0)
return 1;
const auto & node_sample = node->as<ASTSampleRatio &>();
auto absolute_sample_size = node_sample.ratio.numerator / node_sample.ratio.denominator;
return std::min(RelativeSize(1), RelativeSize(absolute_sample_size) / RelativeSize(approx_total_rows));
}
Pipes MergeTreeDataSelectExecutor::read(
const Names & column_names_to_return,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
2019-02-10 16:55:12 +00:00
const UInt64 max_block_size,
2017-06-02 15:54:39 +00:00
const unsigned num_streams,
2018-10-10 16:20:15 +00:00
const PartitionIdToMaxBlock * max_block_numbers_to_read) const
2014-03-13 12:48:07 +00:00
{
return readFromParts(
data.getDataPartsVector(), column_names_to_return, metadata_snapshot,
query_info, context, max_block_size, num_streams,
max_block_numbers_to_read);
}
Pipes MergeTreeDataSelectExecutor::readFromParts(
MergeTreeData::DataPartsVector parts,
const Names & column_names_to_return,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
2019-02-10 16:55:12 +00:00
const UInt64 max_block_size,
const unsigned num_streams,
2018-10-10 16:20:15 +00:00
const PartitionIdToMaxBlock * max_block_numbers_to_read) const
{
/// If query contains restrictions on the virtual column `_part` or `_part_index`, select only parts suitable for it.
2017-08-29 14:08:09 +00:00
/// The virtual column `_sample_factor` (which is equal to 1 / used sample rate) can be requested in the query.
Names virt_column_names;
Names real_column_names;
bool part_column_queried = false;
bool sample_factor_column_queried = false;
Float64 used_sample_factor = 1;
for (const String & name : column_names_to_return)
{
if (name == "_part")
{
part_column_queried = true;
virt_column_names.push_back(name);
}
else if (name == "_part_index")
{
virt_column_names.push_back(name);
}
else if (name == "_partition_id")
{
virt_column_names.push_back(name);
}
else if (name == "_sample_factor")
{
sample_factor_column_queried = true;
virt_column_names.push_back(name);
}
else
{
real_column_names.push_back(name);
}
}
NamesAndTypesList available_real_columns = metadata_snapshot->getColumns().getAllPhysical();
/// If there are only virtual columns in the query, you must request at least one non-virtual one.
if (real_column_names.empty())
real_column_names.push_back(ExpressionActions::getSmallestColumn(available_real_columns));
/// If `_part` virtual column is requested, we try to use it as an index.
Block virtual_columns_block = getBlockWithPartColumn(parts);
if (part_column_queried)
VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, context);
std::multiset<String> part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
2020-06-19 17:17:13 +00:00
metadata_snapshot->check(real_column_names, data.getVirtuals(), data.getStorageID());
const Settings & settings = context.getSettingsRef();
2020-06-17 12:39:20 +00:00
const auto & primary_key = metadata_snapshot->getPrimaryKey();
2020-05-21 19:46:03 +00:00
Names primary_key_columns = primary_key.column_names;
2020-05-21 19:46:03 +00:00
KeyCondition key_condition(query_info, context, primary_key_columns, primary_key.expression);
if (settings.force_primary_key && key_condition.alwaysUnknownOrTrue())
{
std::stringstream exception_message;
exception_message << "Primary key (";
for (size_t i = 0, size = primary_key_columns.size(); i < size; ++i)
exception_message << (i == 0 ? "" : ", ") << primary_key_columns[i];
exception_message << ") is not used and setting 'force_primary_key' is set.";
throw Exception(exception_message.str(), ErrorCodes::INDEX_NOT_USED);
}
std::optional<KeyCondition> minmax_idx_condition;
if (data.minmax_idx_expr)
{
minmax_idx_condition.emplace(query_info, context, data.minmax_idx_columns, data.minmax_idx_expr);
if (settings.force_index_by_date && minmax_idx_condition->alwaysUnknownOrTrue())
{
String msg = "MinMax index by columns (";
bool first = true;
for (const String & col : data.minmax_idx_columns)
{
if (first)
first = false;
else
msg += ", ";
msg += col;
}
msg += ") is not used and setting 'force_index_by_date' is set";
throw Exception(msg, ErrorCodes::INDEX_NOT_USED);
}
}
/// Select the parts in which there can be data that satisfy `minmax_idx_condition` and that match the condition on `_part`,
/// as well as `max_block_number_to_read`.
{
auto prev_parts = parts;
parts.clear();
for (const auto & part : prev_parts)
{
if (part_values.find(part->name) == part_values.end())
continue;
2018-05-23 19:34:37 +00:00
if (part->isEmpty())
continue;
2020-03-10 14:56:55 +00:00
if (minmax_idx_condition && !minmax_idx_condition->checkInHyperrectangle(
part->minmax_idx.hyperrectangle, data.minmax_idx_column_types).can_be_true)
continue;
2018-10-10 16:20:15 +00:00
if (max_block_numbers_to_read)
2018-09-24 09:53:28 +00:00
{
2018-10-10 16:20:15 +00:00
auto blocks_iterator = max_block_numbers_to_read->find(part->info.partition_id);
if (blocks_iterator == max_block_numbers_to_read->end() || part->info.max_block > blocks_iterator->second)
2018-09-24 09:53:28 +00:00
continue;
}
parts.push_back(part);
}
}
/// Sampling.
Names column_names_to_read = real_column_names;
std::shared_ptr<ASTFunction> filter_function;
ExpressionActionsPtr filter_expression;
RelativeSize relative_sample_size = 0;
RelativeSize relative_sample_offset = 0;
const auto & select = query_info.query->as<ASTSelectQuery &>();
auto select_sample_size = select.sampleSize();
auto select_sample_offset = select.sampleOffset();
if (select_sample_size)
{
relative_sample_size.assign(
select_sample_size->as<ASTSampleRatio &>().ratio.numerator,
select_sample_size->as<ASTSampleRatio &>().ratio.denominator);
if (relative_sample_size < 0)
throw Exception("Negative sample size", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
relative_sample_offset = 0;
if (select_sample_offset)
relative_sample_offset.assign(
select_sample_offset->as<ASTSampleRatio &>().ratio.numerator,
select_sample_offset->as<ASTSampleRatio &>().ratio.denominator);
if (relative_sample_offset < 0)
throw Exception("Negative sample offset", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
/// Convert absolute value of the sampling (in form `SAMPLE 1000000` - how many rows to read) into the relative `SAMPLE 0.1` (how much data to read).
size_t approx_total_rows = 0;
if (relative_sample_size > 1 || relative_sample_offset > 1)
2020-06-17 12:39:20 +00:00
approx_total_rows = getApproximateTotalRowsToRead(parts, metadata_snapshot, key_condition, settings);
if (relative_sample_size > 1)
{
relative_sample_size = convertAbsoluteSampleSizeToRelative(select_sample_size, approx_total_rows);
2020-05-23 22:24:01 +00:00
LOG_DEBUG(log, "Selected relative sample size: {}", toString(relative_sample_size));
}
/// SAMPLE 1 is the same as the absence of SAMPLE.
if (relative_sample_size == RelativeSize(1))
relative_sample_size = 0;
if (relative_sample_offset > 0 && RelativeSize(0) == relative_sample_size)
throw Exception("Sampling offset is incorrect because no sampling", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
if (relative_sample_offset > 1)
{
relative_sample_offset = convertAbsoluteSampleSizeToRelative(select_sample_offset, approx_total_rows);
2020-05-23 22:24:01 +00:00
LOG_DEBUG(log, "Selected relative sample offset: {}", toString(relative_sample_offset));
}
}
/** Which range of sampling key values do I need to read?
* First, in the whole range ("universe") we select the interval
* of relative `relative_sample_size` size, offset from the beginning by `relative_sample_offset`.
*
* Example: SAMPLE 0.4 OFFSET 0.3
*
* [------********------]
* ^ - offset
* <------> - size
*
* If the interval passes through the end of the universe, then cut its right side.
*
* Example: SAMPLE 0.4 OFFSET 0.8
*
* [----------------****]
* ^ - offset
* <------> - size
*
* Next, if the `parallel_replicas_count`, `parallel_replica_offset` settings are set,
* then it is necessary to break the received interval into pieces of the number `parallel_replicas_count`,
* and select a piece with the number `parallel_replica_offset` (from zero).
*
* Example: SAMPLE 0.4 OFFSET 0.3, parallel_replicas_count = 2, parallel_replica_offset = 1
*
* [----------****------]
* ^ - offset
* <------> - size
* <--><--> - pieces for different `parallel_replica_offset`, select the second one.
*
* It is very important that the intervals for different `parallel_replica_offset` cover the entire range without gaps and overlaps.
* It is also important that the entire universe can be covered using SAMPLE 0.1 OFFSET 0, ... OFFSET 0.9 and similar decimals.
*/
bool use_sampling = relative_sample_size > 0 || (settings.parallel_replicas_count > 1 && data.supportsSampling());
bool no_data = false; /// There is nothing left after sampling.
if (use_sampling)
{
if (sample_factor_column_queried && relative_sample_size != RelativeSize(0))
used_sample_factor = 1.0 / boost::rational_cast<Float64>(relative_sample_size);
RelativeSize size_of_universum = 0;
const auto & sampling_key = metadata_snapshot->getSamplingKey();
2020-05-20 15:16:39 +00:00
DataTypePtr sampling_column_type = sampling_key.data_types[0];
2019-11-25 13:35:28 +00:00
if (typeid_cast<const DataTypeUInt64 *>(sampling_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt64>::max()) + RelativeSize(1);
2019-11-25 13:35:28 +00:00
else if (typeid_cast<const DataTypeUInt32 *>(sampling_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt32>::max()) + RelativeSize(1);
2019-11-25 13:35:28 +00:00
else if (typeid_cast<const DataTypeUInt16 *>(sampling_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt16>::max()) + RelativeSize(1);
2019-11-25 13:35:28 +00:00
else if (typeid_cast<const DataTypeUInt8 *>(sampling_column_type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt8>::max()) + RelativeSize(1);
else
2019-11-25 13:35:28 +00:00
throw Exception("Invalid sampling column type in storage parameters: " + sampling_column_type->getName() + ". Must be unsigned integer type.",
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
if (settings.parallel_replicas_count > 1)
{
if (relative_sample_size == RelativeSize(0))
relative_sample_size = 1;
2019-08-13 11:24:18 +00:00
relative_sample_size /= settings.parallel_replicas_count.value;
relative_sample_offset += relative_sample_size * RelativeSize(settings.parallel_replica_offset.value);
}
if (relative_sample_offset >= RelativeSize(1))
no_data = true;
/// Calculate the half-interval of `[lower, upper)` column values.
bool has_lower_limit = false;
bool has_upper_limit = false;
RelativeSize lower_limit_rational = relative_sample_offset * size_of_universum;
RelativeSize upper_limit_rational = (relative_sample_offset + relative_sample_size) * size_of_universum;
UInt64 lower = boost::rational_cast<ASTSampleRatio::BigNum>(lower_limit_rational);
UInt64 upper = boost::rational_cast<ASTSampleRatio::BigNum>(upper_limit_rational);
if (lower > 0)
has_lower_limit = true;
if (upper_limit_rational < size_of_universum)
has_upper_limit = true;
/*std::cerr << std::fixed << std::setprecision(100)
<< "relative_sample_size: " << relative_sample_size << "\n"
<< "relative_sample_offset: " << relative_sample_offset << "\n"
<< "lower_limit_float: " << lower_limit_rational << "\n"
<< "upper_limit_float: " << upper_limit_rational << "\n"
<< "lower: " << lower << "\n"
<< "upper: " << upper << "\n";*/
if ((has_upper_limit && upper == 0)
|| (has_lower_limit && has_upper_limit && lower == upper))
no_data = true;
if (no_data || (!has_lower_limit && !has_upper_limit))
{
use_sampling = false;
}
else
{
/// Let's add the conditions to cut off something else when the index is scanned again and when the request is processed.
std::shared_ptr<ASTFunction> lower_function;
std::shared_ptr<ASTFunction> upper_function;
/// If sample and final are used together no need to calculate sampling expression twice.
/// The first time it was calculated for final, because sample key is a part of the PK.
/// So, assume that we already have calculated column.
ASTPtr sampling_key_ast = metadata_snapshot->getSamplingKeyAST();
2020-05-20 15:16:39 +00:00
if (select.final())
{
2020-05-21 19:46:03 +00:00
sampling_key_ast = std::make_shared<ASTIdentifier>(sampling_key.column_names[0]);
/// We do spoil available_real_columns here, but it is not used later.
2020-05-21 19:46:03 +00:00
available_real_columns.emplace_back(sampling_key.column_names[0], std::move(sampling_column_type));
}
if (has_lower_limit)
{
2020-05-21 19:46:03 +00:00
if (!key_condition.addCondition(sampling_key.column_names[0], Range::createLeftBounded(lower, true)))
throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);
ASTPtr args = std::make_shared<ASTExpressionList>();
args->children.push_back(sampling_key_ast);
2018-02-26 03:37:08 +00:00
args->children.push_back(std::make_shared<ASTLiteral>(lower));
lower_function = std::make_shared<ASTFunction>();
lower_function->name = "greaterOrEquals";
lower_function->arguments = args;
lower_function->children.push_back(lower_function->arguments);
filter_function = lower_function;
}
if (has_upper_limit)
{
2020-05-21 19:46:03 +00:00
if (!key_condition.addCondition(sampling_key.column_names[0], Range::createRightBounded(upper, false)))
throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);
ASTPtr args = std::make_shared<ASTExpressionList>();
args->children.push_back(sampling_key_ast);
2018-02-26 03:37:08 +00:00
args->children.push_back(std::make_shared<ASTLiteral>(upper));
upper_function = std::make_shared<ASTFunction>();
upper_function->name = "less";
upper_function->arguments = args;
upper_function->children.push_back(upper_function->arguments);
filter_function = upper_function;
}
if (has_lower_limit && has_upper_limit)
{
ASTPtr args = std::make_shared<ASTExpressionList>();
args->children.push_back(lower_function);
args->children.push_back(upper_function);
filter_function = std::make_shared<ASTFunction>();
filter_function->name = "and";
filter_function->arguments = args;
filter_function->children.push_back(filter_function->arguments);
}
ASTPtr query = filter_function;
auto syntax_result = TreeRewriter(context).analyze(query, available_real_columns);
filter_expression = ExpressionAnalyzer(filter_function, syntax_result, context).getActions(false);
if (!select.final())
{
/// Add columns needed for `sample_by_ast` to `column_names_to_read`.
/// Skip this if final was used, because such columns were already added from PK.
std::vector<String> add_columns = filter_expression->getRequiredColumns();
column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
std::sort(column_names_to_read.begin(), column_names_to_read.end());
column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()),
column_names_to_read.end());
}
}
}
if (no_data)
{
2020-05-23 22:24:01 +00:00
LOG_DEBUG(log, "Sampling yields no data.");
return {};
}
2020-05-23 22:24:01 +00:00
LOG_DEBUG(log, "Key condition: {}", key_condition.toString());
if (minmax_idx_condition)
2020-05-23 22:24:01 +00:00
LOG_DEBUG(log, "MinMax index condition: {}", minmax_idx_condition->toString());
MergeTreeReaderSettings reader_settings =
{
.min_bytes_to_use_direct_io = settings.min_bytes_to_use_direct_io,
.min_bytes_to_use_mmap_io = settings.min_bytes_to_use_mmap_io,
.max_read_buffer_size = settings.max_read_buffer_size,
.save_marks_in_cache = true
};
/// PREWHERE
String prewhere_column;
if (select.prewhere())
prewhere_column = select.prewhere()->getColumnName();
2019-06-19 15:30:48 +00:00
std::vector<std::pair<MergeTreeIndexPtr, MergeTreeIndexConditionPtr>> useful_indices;
2020-05-28 12:37:05 +00:00
for (const auto & index : metadata_snapshot->getSecondaryIndices())
2019-02-05 15:22:47 +00:00
{
2020-05-28 12:37:05 +00:00
auto index_helper = MergeTreeIndexFactory::instance().get(index);
auto condition = index_helper->createIndexCondition(query_info, context);
2019-02-05 15:22:47 +00:00
if (!condition->alwaysUnknownOrTrue())
2020-05-28 12:37:05 +00:00
useful_indices.emplace_back(index_helper, condition);
2019-02-05 15:22:47 +00:00
}
2020-07-20 15:09:00 +00:00
RangesInDataParts parts_with_ranges(parts.size());
size_t sum_marks = 0;
2020-07-28 16:58:19 +00:00
std::atomic<size_t> sum_marks_pk = 0;
size_t sum_ranges = 0;
2020-07-20 15:09:00 +00:00
/// Let's find what range to read from each part.
{
auto process_part = [&](size_t part_index)
2019-11-05 17:42:35 +00:00
{
auto & part = parts[part_index];
2020-07-20 15:09:00 +00:00
RangesInDataPart ranges(part, part_index);
2020-07-20 15:09:00 +00:00
if (metadata_snapshot->hasPrimaryKey())
ranges.ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, settings, log);
else
2019-11-05 17:42:35 +00:00
{
size_t total_marks_count = part->getMarksCount();
if (total_marks_count)
{
if (part->index_granularity.hasFinalMark())
--total_marks_count;
ranges.ranges = MarkRanges{MarkRange{0, total_marks_count}};
}
2019-11-05 17:42:35 +00:00
}
2020-07-28 16:58:19 +00:00
sum_marks_pk.fetch_add(ranges.getMarksCount(), std::memory_order_relaxed);
for (const auto & index_and_condition : useful_indices)
ranges.ranges = filterMarksUsingIndex(
2020-07-20 15:09:00 +00:00
index_and_condition.first, index_and_condition.second, part, ranges.ranges, settings, reader_settings, log);
if (!ranges.ranges.empty())
parts_with_ranges[part_index] = std::move(ranges);
};
size_t num_threads = std::min(size_t(num_streams), parts.size());
if (num_threads <= 1)
{
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
process_part(part_index);
2020-07-20 15:09:00 +00:00
}
else
{
/// Parallel loading of data parts.
ThreadPool pool(num_threads);
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
pool.scheduleOrThrowOnError([&, part_index, thread_group = CurrentThread::getGroup()] {
SCOPE_EXIT(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
if (thread_group)
CurrentThread::attachTo(thread_group);
process_part(part_index);
});
pool.wait();
}
2020-07-20 15:09:00 +00:00
/// Skip empty ranges.
size_t next_part = 0;
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
{
auto & part = parts_with_ranges[part_index];
if (!part.data_part)
continue;
2020-07-20 15:09:00 +00:00
sum_ranges += part.ranges.size();
sum_marks += part.getMarksCount();
if (next_part != part_index)
std::swap(parts_with_ranges[next_part], part);
++next_part;
}
2020-07-21 14:08:18 +00:00
parts_with_ranges.resize(next_part);
}
2020-07-28 16:58:19 +00:00
LOG_DEBUG(log, "Selected {} parts by date, {} parts by key, {} marks by primary key, {} marks to read from {} ranges", parts.size(), parts_with_ranges.size(), sum_marks_pk.load(std::memory_order_relaxed), sum_marks, sum_ranges);
if (parts_with_ranges.empty())
return {};
ProfileEvents::increment(ProfileEvents::SelectedParts, parts_with_ranges.size());
ProfileEvents::increment(ProfileEvents::SelectedRanges, sum_ranges);
ProfileEvents::increment(ProfileEvents::SelectedMarks, sum_marks);
Pipes res;
2020-05-12 18:22:58 +00:00
/// Projection, that needed to drop columns, which have appeared by execution
/// of some extra expressions, and to allow execute the same expressions later.
/// NOTE: It may lead to double computation of expressions.
ExpressionActionsPtr result_projection;
if (select.final())
{
/// Add columns needed to calculate the sorting expression and the sign.
2020-06-17 11:05:11 +00:00
std::vector<String> add_columns = metadata_snapshot->getColumnsRequiredForSortingKey();
column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
if (!data.merging_params.sign_column.empty())
column_names_to_read.push_back(data.merging_params.sign_column);
if (!data.merging_params.version_column.empty())
column_names_to_read.push_back(data.merging_params.version_column);
std::sort(column_names_to_read.begin(), column_names_to_read.end());
column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end());
2017-06-02 15:54:39 +00:00
res = spreadMarkRangesAmongStreamsFinal(
2017-11-24 23:03:58 +00:00
std::move(parts_with_ranges),
2020-04-22 13:52:07 +00:00
num_streams,
column_names_to_read,
metadata_snapshot,
max_block_size,
settings.use_uncompressed_cache,
query_info,
virt_column_names,
2019-10-10 16:30:30 +00:00
settings,
2020-05-12 18:22:58 +00:00
reader_settings,
result_projection);
}
2020-05-13 13:49:10 +00:00
else if ((settings.optimize_read_in_order || settings.optimize_aggregation_in_order) && query_info.input_order_info)
2019-05-18 12:21:40 +00:00
{
2020-05-13 13:49:10 +00:00
size_t prefix_size = query_info.input_order_info->order_key_prefix_descr.size();
2020-06-17 11:05:11 +00:00
auto order_key_prefix_ast = metadata_snapshot->getSortingKey().expression_list_ast->clone();
2019-07-28 00:41:26 +00:00
order_key_prefix_ast->children.resize(prefix_size);
auto syntax_result = TreeRewriter(context).analyze(order_key_prefix_ast, metadata_snapshot->getColumns().getAllPhysical());
2019-07-28 00:41:26 +00:00
auto sorting_key_prefix_expr = ExpressionAnalyzer(order_key_prefix_ast, syntax_result, context).getActions(false);
res = spreadMarkRangesAmongStreamsWithOrder(
2019-05-18 12:21:40 +00:00
std::move(parts_with_ranges),
num_streams,
2019-05-18 12:21:40 +00:00
column_names_to_read,
metadata_snapshot,
2019-05-18 12:21:40 +00:00
max_block_size,
settings.use_uncompressed_cache,
query_info,
2019-07-28 00:41:26 +00:00
sorting_key_prefix_expr,
virt_column_names,
2019-10-10 16:30:30 +00:00
settings,
2020-05-12 18:22:58 +00:00
reader_settings,
result_projection);
}
else
{
2017-06-02 15:54:39 +00:00
res = spreadMarkRangesAmongStreams(
2017-11-24 23:03:58 +00:00
std::move(parts_with_ranges),
2017-06-02 15:54:39 +00:00
num_streams,
column_names_to_read,
metadata_snapshot,
max_block_size,
settings.use_uncompressed_cache,
query_info,
virt_column_names,
2019-10-10 16:30:30 +00:00
settings,
reader_settings);
}
if (use_sampling)
2019-10-01 16:50:08 +00:00
{
for (auto & pipe : res)
pipe.addSimpleTransform(std::make_shared<FilterTransform>(
pipe.getHeader(), filter_expression, filter_function->getColumnName(), false));
2019-10-01 16:50:08 +00:00
}
2020-05-12 18:22:58 +00:00
if (result_projection)
{
for (auto & pipe : res)
pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(
pipe.getHeader(), result_projection));
}
/// By the way, if a distributed query or query to a Merge table is made, then the `_sample_factor` column can have different values.
if (sample_factor_column_queried)
2019-10-01 16:50:08 +00:00
{
for (auto & pipe : res)
pipe.addSimpleTransform(std::make_shared<AddingConstColumnTransform<Float64>>(
pipe.getHeader(), std::make_shared<DataTypeFloat64>(), used_sample_factor, "_sample_factor"));
2019-10-01 16:50:08 +00:00
}
2018-10-04 08:58:19 +00:00
if (query_info.prewhere_info && query_info.prewhere_info->remove_columns_actions)
2019-10-01 16:50:08 +00:00
{
for (auto & pipe : res)
pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(
pipe.getHeader(), query_info.prewhere_info->remove_columns_actions));
2019-10-01 16:50:08 +00:00
}
return res;
2014-03-13 12:48:07 +00:00
}
2019-04-01 12:10:32 +00:00
namespace
{
2019-04-01 11:09:30 +00:00
size_t roundRowsOrBytesToMarks(
size_t rows_setting,
size_t bytes_setting,
2019-06-19 10:07:56 +00:00
size_t rows_granularity,
size_t bytes_granularity)
2019-04-01 11:09:30 +00:00
{
/// Marks are placed whenever threshold on rows or bytes is met.
/// So we have to return the number of marks on whatever estimate is higher - by rows or by bytes.
size_t res = (rows_setting + rows_granularity - 1) / rows_granularity;
2019-06-19 10:07:56 +00:00
if (bytes_granularity == 0)
return res;
2019-04-01 11:09:30 +00:00
else
return std::max(res, (bytes_setting + bytes_granularity - 1) / bytes_granularity);
2019-04-01 11:09:30 +00:00
}
}
Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
2017-11-24 23:03:58 +00:00
RangesInDataParts && parts,
2017-06-02 15:54:39 +00:00
size_t num_streams,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
2019-02-10 16:55:12 +00:00
UInt64 max_block_size,
bool use_uncompressed_cache,
const SelectQueryInfo & query_info,
const Names & virt_columns,
2019-10-10 16:30:30 +00:00
const Settings & settings,
const MergeTreeReaderSettings & reader_settings) const
2014-03-13 12:48:07 +00:00
{
/// Count marks for each part.
std::vector<size_t> sum_marks_in_parts(parts.size());
size_t sum_marks = 0;
2019-03-25 13:55:24 +00:00
size_t total_rows = 0;
2019-06-19 10:07:56 +00:00
2019-08-26 14:24:29 +00:00
const auto data_settings = data.getSettings();
2019-06-19 10:07:56 +00:00
size_t adaptive_parts = 0;
for (size_t i = 0; i < parts.size(); ++i)
{
2019-03-25 13:55:24 +00:00
total_rows += parts[i].getRowsCount();
sum_marks_in_parts[i] = parts[i].getMarksCount();
sum_marks += sum_marks_in_parts[i];
2019-06-19 10:07:56 +00:00
if (parts[i].data_part->index_granularity_info.is_adaptive)
++adaptive_parts;
}
2019-06-19 10:07:56 +00:00
size_t index_granularity_bytes = 0;
if (adaptive_parts > parts.size() / 2)
2019-08-13 10:29:31 +00:00
index_granularity_bytes = data_settings->index_granularity_bytes;
2019-06-19 10:07:56 +00:00
const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
settings.merge_tree_max_rows_to_use_cache,
settings.merge_tree_max_bytes_to_use_cache,
2019-08-13 10:29:31 +00:00
data_settings->index_granularity,
2019-06-19 10:07:56 +00:00
index_granularity_bytes);
const size_t min_marks_for_concurrent_read = roundRowsOrBytesToMarks(
settings.merge_tree_min_rows_for_concurrent_read,
settings.merge_tree_min_bytes_for_concurrent_read,
2019-08-13 10:29:31 +00:00
data_settings->index_granularity,
2019-06-19 10:07:56 +00:00
index_granularity_bytes);
2019-04-01 11:09:30 +00:00
if (sum_marks > max_marks_to_use_cache)
use_uncompressed_cache = false;
Pipes res;
if (0 == sum_marks)
return res;
if (num_streams > 1)
{
/// Parallel query execution.
2017-06-02 15:54:39 +00:00
/// Reduce the number of num_streams if the data is small.
if (sum_marks < num_streams * min_marks_for_concurrent_read && parts.size() < num_streams)
num_streams = std::max((sum_marks + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, parts.size());
MergeTreeReadPoolPtr pool = std::make_shared<MergeTreeReadPool>(
num_streams,
sum_marks,
min_marks_for_concurrent_read,
parts,
data,
metadata_snapshot,
query_info.prewhere_info,
true,
column_names,
MergeTreeReadPool::BackoffSettings(settings),
settings.preferred_block_size_bytes,
false);
/// Let's estimate total number of rows for progress bar.
2020-05-23 22:24:01 +00:00
LOG_TRACE(log, "Reading approx. {} rows with {} streams", total_rows, num_streams);
ColumnConst unification (#1011) * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * Fixed error in ColumnArray::replicateGeneric [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150]. * ColumnConst: unification (incomplete) [#CLICKHOUSE-3150].
2017-07-21 06:35:58 +00:00
for (size_t i = 0; i < num_streams; ++i)
{
2019-10-04 15:40:05 +00:00
auto source = std::make_shared<MergeTreeThreadSelectBlockInputProcessor>(
i, pool, min_marks_for_concurrent_read, max_block_size,
settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes,
data, metadata_snapshot, use_uncompressed_cache,
2019-12-19 13:10:57 +00:00
query_info.prewhere_info, reader_settings, virt_columns);
if (i == 0)
{
/// Set the approximate number of rows for the first source only
2019-10-04 15:40:05 +00:00
source->addTotalRowsApprox(total_rows);
}
2019-10-04 15:40:05 +00:00
res.emplace_back(std::move(source));
}
}
else
{
/// Sequential query execution.
2020-03-09 01:59:08 +00:00
for (const auto & part : parts)
{
auto source = std::make_shared<MergeTreeSelectProcessor>(
data, metadata_snapshot, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache,
query_info.prewhere_info, true, reader_settings, virt_columns, part.part_index_in_query);
res.emplace_back(std::move(source));
}
/// Use ConcatProcessor to concat sources together.
/// It is needed to read in parts order (and so in PK order) if single thread is used.
if (res.size() > 1)
{
auto concat = std::make_shared<ConcatProcessor>(res.front().getHeader(), res.size());
Pipe pipe(std::move(res), std::move(concat));
2020-03-25 12:17:11 +00:00
res = Pipes();
2020-03-25 12:15:51 +00:00
res.emplace_back(std::move(pipe));
}
}
return res;
2014-03-13 12:48:07 +00:00
}
static ExpressionActionsPtr createProjection(const Pipe & pipe, const MergeTreeData & data)
{
const auto & header = pipe.getHeader();
auto projection = std::make_shared<ExpressionActions>(header.getNamesAndTypesList(), data.global_context);
projection->add(ExpressionAction::project(header.getNames()));
return projection;
}
Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
2019-05-18 12:21:40 +00:00
RangesInDataParts && parts,
size_t num_streams,
2019-05-18 12:21:40 +00:00
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
2019-05-18 12:21:40 +00:00
UInt64 max_block_size,
bool use_uncompressed_cache,
const SelectQueryInfo & query_info,
2019-07-28 00:41:26 +00:00
const ExpressionActionsPtr & sorting_key_prefix_expr,
2019-05-18 12:21:40 +00:00
const Names & virt_columns,
2019-10-10 16:30:30 +00:00
const Settings & settings,
2020-05-12 18:22:58 +00:00
const MergeTreeReaderSettings & reader_settings,
ExpressionActionsPtr & out_projection) const
2019-05-18 12:21:40 +00:00
{
size_t sum_marks = 0;
2020-05-13 13:49:10 +00:00
const InputOrderInfoPtr & input_order_info = query_info.input_order_info;
2020-05-05 14:35:23 +00:00
size_t adaptive_parts = 0;
std::vector<size_t> sum_marks_in_parts(parts.size());
2019-08-26 14:24:29 +00:00
const auto data_settings = data.getSettings();
for (size_t i = 0; i < parts.size(); ++i)
{
sum_marks_in_parts[i] = parts[i].getMarksCount();
sum_marks += sum_marks_in_parts[i];
if (parts[i].data_part->index_granularity_info.is_adaptive)
++adaptive_parts;
}
2019-07-18 18:34:15 +00:00
size_t index_granularity_bytes = 0;
if (adaptive_parts > parts.size() / 2)
2019-08-13 10:29:31 +00:00
index_granularity_bytes = data_settings->index_granularity_bytes;
2019-07-18 18:34:15 +00:00
const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
settings.merge_tree_max_rows_to_use_cache,
settings.merge_tree_max_bytes_to_use_cache,
2019-08-13 10:29:31 +00:00
data_settings->index_granularity,
2019-07-18 18:34:15 +00:00
index_granularity_bytes);
const size_t min_marks_for_concurrent_read = roundRowsOrBytesToMarks(
settings.merge_tree_min_rows_for_concurrent_read,
settings.merge_tree_min_bytes_for_concurrent_read,
2019-08-13 10:29:31 +00:00
data_settings->index_granularity,
2019-07-18 18:34:15 +00:00
index_granularity_bytes);
if (sum_marks > max_marks_to_use_cache)
use_uncompressed_cache = false;
Pipes res;
if (sum_marks == 0)
return res;
/// Let's split ranges to avoid reading much data.
2019-08-19 08:25:07 +00:00
auto split_ranges = [rows_granularity = data_settings->index_granularity, max_block_size](const auto & ranges, int direction)
{
MarkRanges new_ranges;
const size_t max_marks_in_range = (max_block_size + rows_granularity - 1) / rows_granularity;
size_t marks_in_range = 1;
if (direction == 1)
{
/// Split first few ranges to avoid reading much data.
2020-08-08 00:47:03 +00:00
bool split = false;
for (auto range : ranges)
{
2020-08-08 00:47:03 +00:00
while (!split && range.begin + marks_in_range < range.end)
{
new_ranges.emplace_back(range.begin, range.begin + marks_in_range);
range.begin += marks_in_range;
marks_in_range *= 2;
if (marks_in_range > max_marks_in_range)
2020-08-08 00:47:03 +00:00
split = true;
}
new_ranges.emplace_back(range.begin, range.end);
}
}
else
{
/// Split all ranges to avoid reading much data, because we have to
/// store whole range in memory to reverse it.
for (auto it = ranges.rbegin(); it != ranges.rend(); ++it)
{
auto range = *it;
while (range.begin + marks_in_range < range.end)
{
new_ranges.emplace_front(range.end - marks_in_range, range.end);
range.end -= marks_in_range;
marks_in_range = std::min(marks_in_range * 2, max_marks_in_range);
}
new_ranges.emplace_front(range.begin, range.end);
}
}
return new_ranges;
};
const size_t min_marks_per_stream = (sum_marks - 1) / num_streams + 1;
bool need_preliminary_merge = (parts.size() > settings.read_in_order_two_level_merge_threshold);
for (size_t i = 0; i < num_streams && !parts.empty(); ++i)
2019-05-18 12:21:40 +00:00
{
size_t need_marks = min_marks_per_stream;
2019-05-25 11:09:23 +00:00
Pipes pipes;
2019-05-18 12:21:40 +00:00
/// Loop over parts.
/// We will iteratively take part or some subrange of a part from the back
/// and assign a stream to read from it.
while (need_marks > 0 && !parts.empty())
{
RangesInDataPart part = parts.back();
parts.pop_back();
size_t & marks_in_part = sum_marks_in_parts.back();
/// We will not take too few rows from a part.
if (marks_in_part >= min_marks_for_concurrent_read &&
need_marks < min_marks_for_concurrent_read)
need_marks = min_marks_for_concurrent_read;
/// Do not leave too few rows in the part.
if (marks_in_part > need_marks &&
marks_in_part - need_marks < min_marks_for_concurrent_read)
need_marks = marks_in_part;
2019-05-18 12:21:40 +00:00
MarkRanges ranges_to_get_from_part;
/// We take the whole part if it is small enough.
if (marks_in_part <= need_marks)
{
ranges_to_get_from_part = part.ranges;
need_marks -= marks_in_part;
sum_marks_in_parts.pop_back();
}
else
{
/// Loop through ranges in part. Take enough ranges to cover "need_marks".
while (need_marks > 0)
{
if (part.ranges.empty())
throw Exception("Unexpected end of ranges while spreading marks among streams", ErrorCodes::LOGICAL_ERROR);
MarkRange & range = part.ranges.front();
const size_t marks_in_range = range.end - range.begin;
const size_t marks_to_get_from_range = std::min(marks_in_range, need_marks);
ranges_to_get_from_part.emplace_back(range.begin, range.begin + marks_to_get_from_range);
range.begin += marks_to_get_from_range;
marks_in_part -= marks_to_get_from_range;
need_marks -= marks_to_get_from_range;
if (range.begin == range.end)
part.ranges.pop_front();
}
parts.emplace_back(part);
}
2020-05-13 13:49:10 +00:00
ranges_to_get_from_part = split_ranges(ranges_to_get_from_part, input_order_info->direction);
2020-05-13 13:49:10 +00:00
if (input_order_info->direction == 1)
{
pipes.emplace_back(std::make_shared<MergeTreeSelectProcessor>(
data,
metadata_snapshot,
part.data_part,
max_block_size,
settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes,
column_names,
ranges_to_get_from_part,
use_uncompressed_cache,
query_info.prewhere_info,
true,
reader_settings,
virt_columns,
part.part_index_in_query));
}
else
{
pipes.emplace_back(std::make_shared<MergeTreeReverseSelectProcessor>(
data,
metadata_snapshot,
part.data_part,
max_block_size,
settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes,
column_names,
ranges_to_get_from_part,
use_uncompressed_cache,
query_info.prewhere_info,
true,
reader_settings,
virt_columns,
part.part_index_in_query));
pipes.back().addSimpleTransform(std::make_shared<ReverseTransform>(pipes.back().getHeader()));
}
}
if (pipes.size() > 1 && need_preliminary_merge)
{
SortDescription sort_description;
2020-05-26 22:17:32 +00:00
for (size_t j = 0; j < input_order_info->order_key_prefix_descr.size(); ++j)
2020-06-17 11:05:11 +00:00
sort_description.emplace_back(metadata_snapshot->getSortingKey().column_names[j],
2020-05-26 22:17:32 +00:00
input_order_info->direction, 1);
2020-05-13 15:53:47 +00:00
/// Drop temporary columns, added by 'sorting_key_prefix_expr'
2020-05-12 18:22:58 +00:00
out_projection = createProjection(pipes.back(), data);
for (auto & pipe : pipes)
pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), sorting_key_prefix_expr));
auto merging_sorted = std::make_shared<MergingSortedTransform>(
pipes.back().getHeader(), pipes.size(), sort_description, max_block_size);
2020-05-12 18:22:58 +00:00
res.emplace_back(std::move(pipes), std::move(merging_sorted));
}
else
{
for (auto && pipe : pipes)
res.emplace_back(std::move(pipe));
}
2019-05-18 12:21:40 +00:00
}
return res;
2019-05-18 12:21:40 +00:00
}
Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
2017-11-24 23:03:58 +00:00
RangesInDataParts && parts,
2020-04-22 13:52:07 +00:00
size_t num_streams,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
2019-02-10 16:55:12 +00:00
UInt64 max_block_size,
bool use_uncompressed_cache,
const SelectQueryInfo & query_info,
const Names & virt_columns,
2019-10-10 16:30:30 +00:00
const Settings & settings,
2020-05-12 18:22:58 +00:00
const MergeTreeReaderSettings & reader_settings,
ExpressionActionsPtr & out_projection) const
2014-03-13 12:48:07 +00:00
{
2019-08-26 14:24:29 +00:00
const auto data_settings = data.getSettings();
2019-04-01 11:09:30 +00:00
size_t sum_marks = 0;
2019-06-19 10:07:56 +00:00
size_t adaptive_parts = 0;
2020-03-09 01:59:08 +00:00
for (const auto & part : parts)
2019-06-19 10:07:56 +00:00
{
2020-03-09 01:59:08 +00:00
for (const auto & range : part.ranges)
sum_marks += range.end - range.begin;
2020-03-09 01:59:08 +00:00
if (part.data_part->index_granularity_info.is_adaptive)
++adaptive_parts;
2019-06-19 10:07:56 +00:00
}
size_t index_granularity_bytes = 0;
if (adaptive_parts >= parts.size() / 2)
2019-08-13 10:29:31 +00:00
index_granularity_bytes = data_settings->index_granularity_bytes;
2019-06-19 10:07:56 +00:00
const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
settings.merge_tree_max_rows_to_use_cache,
settings.merge_tree_max_bytes_to_use_cache,
2019-08-13 10:29:31 +00:00
data_settings->index_granularity,
2019-06-19 10:07:56 +00:00
index_granularity_bytes);
2019-04-01 11:09:30 +00:00
if (sum_marks > max_marks_to_use_cache)
use_uncompressed_cache = false;
2019-10-01 16:50:08 +00:00
Pipes pipes;
2020-03-09 01:59:08 +00:00
for (const auto & part : parts)
{
2019-10-01 16:50:08 +00:00
auto source_processor = std::make_shared<MergeTreeSelectProcessor>(
data, metadata_snapshot, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache,
2019-10-10 16:30:30 +00:00
query_info.prewhere_info, true, reader_settings,
2017-04-05 20:34:19 +00:00
virt_columns, part.part_index_in_query);
Pipe pipe(std::move(source_processor));
2020-05-13 15:53:47 +00:00
/// Drop temporary columns, added by 'sorting_key_expr'
2020-05-12 18:22:58 +00:00
if (!out_projection)
out_projection = createProjection(pipe, data);
2020-06-17 11:05:11 +00:00
pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), metadata_snapshot->getSortingKey().expression));
2019-10-01 16:50:08 +00:00
pipes.emplace_back(std::move(pipe));
}
2020-06-17 11:05:11 +00:00
Names sort_columns = metadata_snapshot->getSortingKeyColumns();
2018-06-30 21:35:01 +00:00
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
2020-07-06 14:33:31 +00:00
Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names;
Block header = pipes.at(0).getHeader();
2018-06-30 21:35:01 +00:00
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);
2020-04-22 13:52:07 +00:00
auto get_merging_processor = [&]() -> MergingTransformPtr
2019-10-01 16:50:08 +00:00
{
2020-04-22 13:52:07 +00:00
switch (data.merging_params.mode)
{
case MergeTreeData::MergingParams::Ordinary:
{
return std::make_shared<MergingSortedTransform>(header, pipes.size(),
sort_description, max_block_size);
}
2019-10-01 16:50:08 +00:00
2020-04-22 13:52:07 +00:00
case MergeTreeData::MergingParams::Collapsing:
return std::make_shared<CollapsingSortedTransform>(header, pipes.size(),
sort_description, data.merging_params.sign_column, true, max_block_size);
2019-10-01 16:50:08 +00:00
2020-04-22 13:52:07 +00:00
case MergeTreeData::MergingParams::Summing:
return std::make_shared<SummingSortedTransform>(header, pipes.size(),
2020-07-06 14:33:31 +00:00
sort_description, data.merging_params.columns_to_sum, partition_key_columns, max_block_size);
2020-04-22 13:52:07 +00:00
case MergeTreeData::MergingParams::Aggregating:
return std::make_shared<AggregatingSortedTransform>(header, pipes.size(),
sort_description, max_block_size);
case MergeTreeData::MergingParams::Replacing:
return std::make_shared<ReplacingSortedTransform>(header, pipes.size(),
sort_description, data.merging_params.version_column, max_block_size);
2019-10-01 16:50:08 +00:00
2020-04-22 13:52:07 +00:00
case MergeTreeData::MergingParams::VersionedCollapsing:
return std::make_shared<VersionedCollapsingTransform>(header, pipes.size(),
sort_description, data.merging_params.sign_column, max_block_size);
case MergeTreeData::MergingParams::Graphite:
throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);
}
__builtin_unreachable();
2019-10-01 16:50:08 +00:00
};
2020-04-22 13:52:07 +00:00
if (num_streams > settings.max_final_threads)
num_streams = settings.max_final_threads;
2020-05-28 10:57:04 +00:00
if (num_streams <= 1 || sort_description.empty())
2020-04-22 13:52:07 +00:00
{
Pipe pipe(std::move(pipes), get_merging_processor());
pipes = Pipes();
pipes.emplace_back(std::move(pipe));
return pipes;
}
ColumnNumbers key_columns;
key_columns.reserve(sort_description.size());
for (auto & desc : sort_description)
{
if (!desc.column_name.empty())
key_columns.push_back(header.getPositionByName(desc.column_name));
else
key_columns.emplace_back(desc.column_number);
}
Processors selectors;
Processors copiers;
selectors.reserve(pipes.size());
for (auto & pipe : pipes)
{
auto selector = std::make_shared<AddingSelectorTransform>(pipe.getHeader(), num_streams, key_columns);
auto copier = std::make_shared<CopyTransform>(pipe.getHeader(), num_streams);
connect(pipe.getPort(), selector->getInputPort());
connect(selector->getOutputPort(), copier->getInputPort());
selectors.emplace_back(std::move(selector));
copiers.emplace_back(std::move(copier));
}
Processors merges;
std::vector<InputPorts::iterator> input_ports;
merges.reserve(num_streams);
input_ports.reserve(num_streams);
for (size_t i = 0; i < num_streams; ++i)
{
2020-04-22 13:52:07 +00:00
auto merge = get_merging_processor();
merge->setSelectorPosition(i);
input_ports.emplace_back(merge->getInputs().begin());
merges.emplace_back(std::move(merge));
}
/// Connect outputs of i-th splitter with i-th input port of every merge.
for (auto & resize : copiers)
{
size_t input_num = 0;
for (auto & output : resize->getOutputs())
{
2020-04-22 13:52:07 +00:00
connect(output, *input_ports[input_num]);
++input_ports[input_num];
++input_num;
}
}
2020-04-22 13:52:07 +00:00
Processors processors;
for (auto & pipe : pipes)
2020-04-14 10:04:49 +00:00
{
2020-04-22 13:52:07 +00:00
auto pipe_processors = std::move(pipe).detachProcessors();
processors.insert(processors.end(), pipe_processors.begin(), pipe_processors.end());
2020-04-14 10:04:49 +00:00
}
2020-04-22 13:52:07 +00:00
pipes.clear();
pipes.reserve(num_streams);
for (auto & merge : merges)
pipes.emplace_back(&merge->getOutputs().front());
pipes.front().addProcessors(processors);
pipes.front().addProcessors(selectors);
pipes.front().addProcessors(copiers);
pipes.front().addProcessors(merges);
2019-10-01 16:50:08 +00:00
return pipes;
2014-03-13 12:48:07 +00:00
}
2017-01-20 02:22:18 +00:00
/// Calculates a set of mark ranges, that could possibly contain keys, required by condition.
2017-03-12 19:18:07 +00:00
/// In other words, it removes subranges from whole range, that definitely could not contain required keys.
MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
2020-06-17 12:39:20 +00:00
const MergeTreeData::DataPartPtr & part,
const StorageMetadataPtr & metadata_snapshot,
const KeyCondition & key_condition,
2020-07-20 15:09:00 +00:00
const Settings & settings,
Poco::Logger * log)
2014-03-13 12:48:07 +00:00
{
MarkRanges res;
2019-03-25 13:55:24 +00:00
size_t marks_count = part->index_granularity.getMarksCount();
const auto & index = part->index;
2018-05-23 19:34:37 +00:00
if (marks_count == 0)
return res;
bool has_final_mark = part->index_granularity.hasFinalMark();
/// If index is not used.
if (key_condition.alwaysUnknownOrTrue())
{
LOG_TRACE(log, "Not using primary index on part {}", part->name);
Optimize PK lookup for queries that match exact PK range Existing code that looks up marks that match the query has a pathological case, when most of the part does in fact match the query. The code works by recursively splitting a part into ranges and then discarding the ranges that definitely do not match the query, based on primary key. The problem is that it requires visiting every mark that matches the query, making the complexity of this sort of look up O(n). For queries that match exact range on the primary key, we can find both left and right parts of the range with O(log 2) complexity. This change implements exactly that. To engage this optimization, the query must: * Have a prefix list of the primary key. * Have only range or single set element constraints for columns. * Have only AND as a boolean operator. Consider a table with `(service, timestamp)` as the primary key. The following conditions will be optimized: * `service = 'foo'` * `service = 'foo' and timestamp >= now() - 3600` * `service in ('foo')` * `service in ('foo') and timestamp >= now() - 3600 and timestamp <= now` The following will fall back to previous lookup algorithm: * `timestamp >= now() - 3600` * `service in ('foo', 'bar') and timestamp >= now() - 3600` * `service = 'foo'` Note that the optimization won't engage when PK has a range expression followed by a point expression, since in that case the range is not continuous. Trace query logging provides the following messages types of messages, each representing a different kind of PK usage for a part: ``` Used optimized inclusion search over index for part 20200711_5710108_5710108_0 with 9 steps Used generic exclusion search over index for part 20200711_5710118_5710228_5 with 1495 steps Not using index on part 20200710_5710473_5710473_0 ``` Number of steps translates to computational complexity. Here's a comparison for before and after for a query over 24h of data: ``` Read 4562944 rows, 148.05 MiB in 45.19249672 sec., 100966 rows/sec., 3.28 MiB/sec. Read 4183040 rows, 135.78 MiB in 0.196279627 sec., 21311636 rows/sec., 691.75 MiB/sec. ``` This is especially useful for queries that read data in order and terminate early to return "last X things" matching a query. See #11564 for more thoughts on this.
2020-07-07 18:10:44 +00:00
if (has_final_mark)
res.push_back(MarkRange(0, marks_count - 1));
else
res.push_back(MarkRange(0, marks_count));
Optimize PK lookup for queries that match exact PK range Existing code that looks up marks that match the query has a pathological case, when most of the part does in fact match the query. The code works by recursively splitting a part into ranges and then discarding the ranges that definitely do not match the query, based on primary key. The problem is that it requires visiting every mark that matches the query, making the complexity of this sort of look up O(n). For queries that match exact range on the primary key, we can find both left and right parts of the range with O(log 2) complexity. This change implements exactly that. To engage this optimization, the query must: * Have a prefix list of the primary key. * Have only range or single set element constraints for columns. * Have only AND as a boolean operator. Consider a table with `(service, timestamp)` as the primary key. The following conditions will be optimized: * `service = 'foo'` * `service = 'foo' and timestamp >= now() - 3600` * `service in ('foo')` * `service in ('foo') and timestamp >= now() - 3600 and timestamp <= now` The following will fall back to previous lookup algorithm: * `timestamp >= now() - 3600` * `service in ('foo', 'bar') and timestamp >= now() - 3600` * `service = 'foo'` Note that the optimization won't engage when PK has a range expression followed by a point expression, since in that case the range is not continuous. Trace query logging provides the following messages types of messages, each representing a different kind of PK usage for a part: ``` Used optimized inclusion search over index for part 20200711_5710108_5710108_0 with 9 steps Used generic exclusion search over index for part 20200711_5710118_5710228_5 with 1495 steps Not using index on part 20200710_5710473_5710473_0 ``` Number of steps translates to computational complexity. Here's a comparison for before and after for a query over 24h of data: ``` Read 4562944 rows, 148.05 MiB in 45.19249672 sec., 100966 rows/sec., 3.28 MiB/sec. Read 4183040 rows, 135.78 MiB in 0.196279627 sec., 21311636 rows/sec., 691.75 MiB/sec. ``` This is especially useful for queries that read data in order and terminate early to return "last X things" matching a query. See #11564 for more thoughts on this.
2020-07-07 18:10:44 +00:00
return res;
}
size_t used_key_size = key_condition.getMaxKeyColumn() + 1;
std::function<void(size_t, size_t, FieldRef &)> create_field_ref;
/// If there are no monotonic functions, there is no need to save block reference.
/// Passing explicit field to FieldRef allows to optimize ranges and shows better performance.
const auto & primary_key = metadata_snapshot->getPrimaryKey();
if (key_condition.hasMonotonicFunctionsChain())
{
auto index_block = std::make_shared<Block>();
for (size_t i = 0; i < used_key_size; ++i)
index_block->insert({index[i], primary_key.data_types[i], primary_key.column_names[i]});
create_field_ref = [index_block](size_t row, size_t column, FieldRef & field)
{
field = {index_block.get(), row, column};
};
}
else
{
Optimize PK lookup for queries that match exact PK range Existing code that looks up marks that match the query has a pathological case, when most of the part does in fact match the query. The code works by recursively splitting a part into ranges and then discarding the ranges that definitely do not match the query, based on primary key. The problem is that it requires visiting every mark that matches the query, making the complexity of this sort of look up O(n). For queries that match exact range on the primary key, we can find both left and right parts of the range with O(log 2) complexity. This change implements exactly that. To engage this optimization, the query must: * Have a prefix list of the primary key. * Have only range or single set element constraints for columns. * Have only AND as a boolean operator. Consider a table with `(service, timestamp)` as the primary key. The following conditions will be optimized: * `service = 'foo'` * `service = 'foo' and timestamp >= now() - 3600` * `service in ('foo')` * `service in ('foo') and timestamp >= now() - 3600 and timestamp <= now` The following will fall back to previous lookup algorithm: * `timestamp >= now() - 3600` * `service in ('foo', 'bar') and timestamp >= now() - 3600` * `service = 'foo'` Note that the optimization won't engage when PK has a range expression followed by a point expression, since in that case the range is not continuous. Trace query logging provides the following messages types of messages, each representing a different kind of PK usage for a part: ``` Used optimized inclusion search over index for part 20200711_5710108_5710108_0 with 9 steps Used generic exclusion search over index for part 20200711_5710118_5710228_5 with 1495 steps Not using index on part 20200710_5710473_5710473_0 ``` Number of steps translates to computational complexity. Here's a comparison for before and after for a query over 24h of data: ``` Read 4562944 rows, 148.05 MiB in 45.19249672 sec., 100966 rows/sec., 3.28 MiB/sec. Read 4183040 rows, 135.78 MiB in 0.196279627 sec., 21311636 rows/sec., 691.75 MiB/sec. ``` This is especially useful for queries that read data in order and terminate early to return "last X things" matching a query. See #11564 for more thoughts on this.
2020-07-07 18:10:44 +00:00
create_field_ref = [&index](size_t row, size_t column, FieldRef & field)
{
index[column]->get(row, field);
};
}
2018-05-23 19:34:37 +00:00
Optimize PK lookup for queries that match exact PK range Existing code that looks up marks that match the query has a pathological case, when most of the part does in fact match the query. The code works by recursively splitting a part into ranges and then discarding the ranges that definitely do not match the query, based on primary key. The problem is that it requires visiting every mark that matches the query, making the complexity of this sort of look up O(n). For queries that match exact range on the primary key, we can find both left and right parts of the range with O(log 2) complexity. This change implements exactly that. To engage this optimization, the query must: * Have a prefix list of the primary key. * Have only range or single set element constraints for columns. * Have only AND as a boolean operator. Consider a table with `(service, timestamp)` as the primary key. The following conditions will be optimized: * `service = 'foo'` * `service = 'foo' and timestamp >= now() - 3600` * `service in ('foo')` * `service in ('foo') and timestamp >= now() - 3600 and timestamp <= now` The following will fall back to previous lookup algorithm: * `timestamp >= now() - 3600` * `service in ('foo', 'bar') and timestamp >= now() - 3600` * `service = 'foo'` Note that the optimization won't engage when PK has a range expression followed by a point expression, since in that case the range is not continuous. Trace query logging provides the following messages types of messages, each representing a different kind of PK usage for a part: ``` Used optimized inclusion search over index for part 20200711_5710108_5710108_0 with 9 steps Used generic exclusion search over index for part 20200711_5710118_5710228_5 with 1495 steps Not using index on part 20200710_5710473_5710473_0 ``` Number of steps translates to computational complexity. Here's a comparison for before and after for a query over 24h of data: ``` Read 4562944 rows, 148.05 MiB in 45.19249672 sec., 100966 rows/sec., 3.28 MiB/sec. Read 4183040 rows, 135.78 MiB in 0.196279627 sec., 21311636 rows/sec., 691.75 MiB/sec. ``` This is especially useful for queries that read data in order and terminate early to return "last X things" matching a query. See #11564 for more thoughts on this.
2020-07-07 18:10:44 +00:00
/// NOTE Creating temporary Field objects to pass to KeyCondition.
std::vector<FieldRef> index_left(used_key_size);
std::vector<FieldRef> index_right(used_key_size);
Optimize PK lookup for queries that match exact PK range Existing code that looks up marks that match the query has a pathological case, when most of the part does in fact match the query. The code works by recursively splitting a part into ranges and then discarding the ranges that definitely do not match the query, based on primary key. The problem is that it requires visiting every mark that matches the query, making the complexity of this sort of look up O(n). For queries that match exact range on the primary key, we can find both left and right parts of the range with O(log 2) complexity. This change implements exactly that. To engage this optimization, the query must: * Have a prefix list of the primary key. * Have only range or single set element constraints for columns. * Have only AND as a boolean operator. Consider a table with `(service, timestamp)` as the primary key. The following conditions will be optimized: * `service = 'foo'` * `service = 'foo' and timestamp >= now() - 3600` * `service in ('foo')` * `service in ('foo') and timestamp >= now() - 3600 and timestamp <= now` The following will fall back to previous lookup algorithm: * `timestamp >= now() - 3600` * `service in ('foo', 'bar') and timestamp >= now() - 3600` * `service = 'foo'` Note that the optimization won't engage when PK has a range expression followed by a point expression, since in that case the range is not continuous. Trace query logging provides the following messages types of messages, each representing a different kind of PK usage for a part: ``` Used optimized inclusion search over index for part 20200711_5710108_5710108_0 with 9 steps Used generic exclusion search over index for part 20200711_5710118_5710228_5 with 1495 steps Not using index on part 20200710_5710473_5710473_0 ``` Number of steps translates to computational complexity. Here's a comparison for before and after for a query over 24h of data: ``` Read 4562944 rows, 148.05 MiB in 45.19249672 sec., 100966 rows/sec., 3.28 MiB/sec. Read 4183040 rows, 135.78 MiB in 0.196279627 sec., 21311636 rows/sec., 691.75 MiB/sec. ``` This is especially useful for queries that read data in order and terminate early to return "last X things" matching a query. See #11564 for more thoughts on this.
2020-07-07 18:10:44 +00:00
auto may_be_true_in_range = [&](MarkRange & range)
{
if (range.end == marks_count && !has_final_mark)
{
for (size_t i = 0; i < used_key_size; ++i)
Optimize PK lookup for queries that match exact PK range Existing code that looks up marks that match the query has a pathological case, when most of the part does in fact match the query. The code works by recursively splitting a part into ranges and then discarding the ranges that definitely do not match the query, based on primary key. The problem is that it requires visiting every mark that matches the query, making the complexity of this sort of look up O(n). For queries that match exact range on the primary key, we can find both left and right parts of the range with O(log 2) complexity. This change implements exactly that. To engage this optimization, the query must: * Have a prefix list of the primary key. * Have only range or single set element constraints for columns. * Have only AND as a boolean operator. Consider a table with `(service, timestamp)` as the primary key. The following conditions will be optimized: * `service = 'foo'` * `service = 'foo' and timestamp >= now() - 3600` * `service in ('foo')` * `service in ('foo') and timestamp >= now() - 3600 and timestamp <= now` The following will fall back to previous lookup algorithm: * `timestamp >= now() - 3600` * `service in ('foo', 'bar') and timestamp >= now() - 3600` * `service = 'foo'` Note that the optimization won't engage when PK has a range expression followed by a point expression, since in that case the range is not continuous. Trace query logging provides the following messages types of messages, each representing a different kind of PK usage for a part: ``` Used optimized inclusion search over index for part 20200711_5710108_5710108_0 with 9 steps Used generic exclusion search over index for part 20200711_5710118_5710228_5 with 1495 steps Not using index on part 20200710_5710473_5710473_0 ``` Number of steps translates to computational complexity. Here's a comparison for before and after for a query over 24h of data: ``` Read 4562944 rows, 148.05 MiB in 45.19249672 sec., 100966 rows/sec., 3.28 MiB/sec. Read 4183040 rows, 135.78 MiB in 0.196279627 sec., 21311636 rows/sec., 691.75 MiB/sec. ``` This is especially useful for queries that read data in order and terminate early to return "last X things" matching a query. See #11564 for more thoughts on this.
2020-07-07 18:10:44 +00:00
create_field_ref(range.begin, i, index_left[i]);
Optimize PK lookup for queries that match exact PK range Existing code that looks up marks that match the query has a pathological case, when most of the part does in fact match the query. The code works by recursively splitting a part into ranges and then discarding the ranges that definitely do not match the query, based on primary key. The problem is that it requires visiting every mark that matches the query, making the complexity of this sort of look up O(n). For queries that match exact range on the primary key, we can find both left and right parts of the range with O(log 2) complexity. This change implements exactly that. To engage this optimization, the query must: * Have a prefix list of the primary key. * Have only range or single set element constraints for columns. * Have only AND as a boolean operator. Consider a table with `(service, timestamp)` as the primary key. The following conditions will be optimized: * `service = 'foo'` * `service = 'foo' and timestamp >= now() - 3600` * `service in ('foo')` * `service in ('foo') and timestamp >= now() - 3600 and timestamp <= now` The following will fall back to previous lookup algorithm: * `timestamp >= now() - 3600` * `service in ('foo', 'bar') and timestamp >= now() - 3600` * `service = 'foo'` Note that the optimization won't engage when PK has a range expression followed by a point expression, since in that case the range is not continuous. Trace query logging provides the following messages types of messages, each representing a different kind of PK usage for a part: ``` Used optimized inclusion search over index for part 20200711_5710108_5710108_0 with 9 steps Used generic exclusion search over index for part 20200711_5710118_5710228_5 with 1495 steps Not using index on part 20200710_5710473_5710473_0 ``` Number of steps translates to computational complexity. Here's a comparison for before and after for a query over 24h of data: ``` Read 4562944 rows, 148.05 MiB in 45.19249672 sec., 100966 rows/sec., 3.28 MiB/sec. Read 4183040 rows, 135.78 MiB in 0.196279627 sec., 21311636 rows/sec., 691.75 MiB/sec. ``` This is especially useful for queries that read data in order and terminate early to return "last X things" matching a query. See #11564 for more thoughts on this.
2020-07-07 18:10:44 +00:00
return key_condition.mayBeTrueAfter(
used_key_size, index_left.data(), primary_key.data_types);
}
Optimize PK lookup for queries that match exact PK range Existing code that looks up marks that match the query has a pathological case, when most of the part does in fact match the query. The code works by recursively splitting a part into ranges and then discarding the ranges that definitely do not match the query, based on primary key. The problem is that it requires visiting every mark that matches the query, making the complexity of this sort of look up O(n). For queries that match exact range on the primary key, we can find both left and right parts of the range with O(log 2) complexity. This change implements exactly that. To engage this optimization, the query must: * Have a prefix list of the primary key. * Have only range or single set element constraints for columns. * Have only AND as a boolean operator. Consider a table with `(service, timestamp)` as the primary key. The following conditions will be optimized: * `service = 'foo'` * `service = 'foo' and timestamp >= now() - 3600` * `service in ('foo')` * `service in ('foo') and timestamp >= now() - 3600 and timestamp <= now` The following will fall back to previous lookup algorithm: * `timestamp >= now() - 3600` * `service in ('foo', 'bar') and timestamp >= now() - 3600` * `service = 'foo'` Note that the optimization won't engage when PK has a range expression followed by a point expression, since in that case the range is not continuous. Trace query logging provides the following messages types of messages, each representing a different kind of PK usage for a part: ``` Used optimized inclusion search over index for part 20200711_5710108_5710108_0 with 9 steps Used generic exclusion search over index for part 20200711_5710118_5710228_5 with 1495 steps Not using index on part 20200710_5710473_5710473_0 ``` Number of steps translates to computational complexity. Here's a comparison for before and after for a query over 24h of data: ``` Read 4562944 rows, 148.05 MiB in 45.19249672 sec., 100966 rows/sec., 3.28 MiB/sec. Read 4183040 rows, 135.78 MiB in 0.196279627 sec., 21311636 rows/sec., 691.75 MiB/sec. ``` This is especially useful for queries that read data in order and terminate early to return "last X things" matching a query. See #11564 for more thoughts on this.
2020-07-07 18:10:44 +00:00
if (has_final_mark && range.end == marks_count)
range.end -= 1; /// Remove final empty mark. It's useful only for primary key condition.
for (size_t i = 0; i < used_key_size; ++i)
{
Optimize PK lookup for queries that match exact PK range Existing code that looks up marks that match the query has a pathological case, when most of the part does in fact match the query. The code works by recursively splitting a part into ranges and then discarding the ranges that definitely do not match the query, based on primary key. The problem is that it requires visiting every mark that matches the query, making the complexity of this sort of look up O(n). For queries that match exact range on the primary key, we can find both left and right parts of the range with O(log 2) complexity. This change implements exactly that. To engage this optimization, the query must: * Have a prefix list of the primary key. * Have only range or single set element constraints for columns. * Have only AND as a boolean operator. Consider a table with `(service, timestamp)` as the primary key. The following conditions will be optimized: * `service = 'foo'` * `service = 'foo' and timestamp >= now() - 3600` * `service in ('foo')` * `service in ('foo') and timestamp >= now() - 3600 and timestamp <= now` The following will fall back to previous lookup algorithm: * `timestamp >= now() - 3600` * `service in ('foo', 'bar') and timestamp >= now() - 3600` * `service = 'foo'` Note that the optimization won't engage when PK has a range expression followed by a point expression, since in that case the range is not continuous. Trace query logging provides the following messages types of messages, each representing a different kind of PK usage for a part: ``` Used optimized inclusion search over index for part 20200711_5710108_5710108_0 with 9 steps Used generic exclusion search over index for part 20200711_5710118_5710228_5 with 1495 steps Not using index on part 20200710_5710473_5710473_0 ``` Number of steps translates to computational complexity. Here's a comparison for before and after for a query over 24h of data: ``` Read 4562944 rows, 148.05 MiB in 45.19249672 sec., 100966 rows/sec., 3.28 MiB/sec. Read 4183040 rows, 135.78 MiB in 0.196279627 sec., 21311636 rows/sec., 691.75 MiB/sec. ``` This is especially useful for queries that read data in order and terminate early to return "last X things" matching a query. See #11564 for more thoughts on this.
2020-07-07 18:10:44 +00:00
create_field_ref(range.begin, i, index_left[i]);
create_field_ref(range.end, i, index_right[i]);
}
Optimize PK lookup for queries that match exact PK range Existing code that looks up marks that match the query has a pathological case, when most of the part does in fact match the query. The code works by recursively splitting a part into ranges and then discarding the ranges that definitely do not match the query, based on primary key. The problem is that it requires visiting every mark that matches the query, making the complexity of this sort of look up O(n). For queries that match exact range on the primary key, we can find both left and right parts of the range with O(log 2) complexity. This change implements exactly that. To engage this optimization, the query must: * Have a prefix list of the primary key. * Have only range or single set element constraints for columns. * Have only AND as a boolean operator. Consider a table with `(service, timestamp)` as the primary key. The following conditions will be optimized: * `service = 'foo'` * `service = 'foo' and timestamp >= now() - 3600` * `service in ('foo')` * `service in ('foo') and timestamp >= now() - 3600 and timestamp <= now` The following will fall back to previous lookup algorithm: * `timestamp >= now() - 3600` * `service in ('foo', 'bar') and timestamp >= now() - 3600` * `service = 'foo'` Note that the optimization won't engage when PK has a range expression followed by a point expression, since in that case the range is not continuous. Trace query logging provides the following messages types of messages, each representing a different kind of PK usage for a part: ``` Used optimized inclusion search over index for part 20200711_5710108_5710108_0 with 9 steps Used generic exclusion search over index for part 20200711_5710118_5710228_5 with 1495 steps Not using index on part 20200710_5710473_5710473_0 ``` Number of steps translates to computational complexity. Here's a comparison for before and after for a query over 24h of data: ``` Read 4562944 rows, 148.05 MiB in 45.19249672 sec., 100966 rows/sec., 3.28 MiB/sec. Read 4183040 rows, 135.78 MiB in 0.196279627 sec., 21311636 rows/sec., 691.75 MiB/sec. ``` This is especially useful for queries that read data in order and terminate early to return "last X things" matching a query. See #11564 for more thoughts on this.
2020-07-07 18:10:44 +00:00
return key_condition.mayBeTrueInRange(
used_key_size, index_left.data(), index_right.data(), primary_key.data_types);
};
if (!key_condition.matchesExactContinuousRange())
{
// Do exclusion search, where we drop ranges that do not match
size_t min_marks_for_seek = roundRowsOrBytesToMarks(
settings.merge_tree_min_rows_for_seek,
settings.merge_tree_min_bytes_for_seek,
part->index_granularity_info.fixed_index_granularity,
part->index_granularity_info.index_granularity_bytes);
/** There will always be disjoint suspicious segments on the stack, the leftmost one at the top (back).
* At each step, take the left segment and check if it fits.
* If fits, split it into smaller ones and put them on the stack. If not, discard it.
* If the segment is already of one mark length, add it to response and discard it.
*/
std::vector<MarkRange> ranges_stack = { {0, marks_count} };
size_t steps = 0;
while (!ranges_stack.empty())
{
MarkRange range = ranges_stack.back();
ranges_stack.pop_back();
Optimize PK lookup for queries that match exact PK range Existing code that looks up marks that match the query has a pathological case, when most of the part does in fact match the query. The code works by recursively splitting a part into ranges and then discarding the ranges that definitely do not match the query, based on primary key. The problem is that it requires visiting every mark that matches the query, making the complexity of this sort of look up O(n). For queries that match exact range on the primary key, we can find both left and right parts of the range with O(log 2) complexity. This change implements exactly that. To engage this optimization, the query must: * Have a prefix list of the primary key. * Have only range or single set element constraints for columns. * Have only AND as a boolean operator. Consider a table with `(service, timestamp)` as the primary key. The following conditions will be optimized: * `service = 'foo'` * `service = 'foo' and timestamp >= now() - 3600` * `service in ('foo')` * `service in ('foo') and timestamp >= now() - 3600 and timestamp <= now` The following will fall back to previous lookup algorithm: * `timestamp >= now() - 3600` * `service in ('foo', 'bar') and timestamp >= now() - 3600` * `service = 'foo'` Note that the optimization won't engage when PK has a range expression followed by a point expression, since in that case the range is not continuous. Trace query logging provides the following messages types of messages, each representing a different kind of PK usage for a part: ``` Used optimized inclusion search over index for part 20200711_5710108_5710108_0 with 9 steps Used generic exclusion search over index for part 20200711_5710118_5710228_5 with 1495 steps Not using index on part 20200710_5710473_5710473_0 ``` Number of steps translates to computational complexity. Here's a comparison for before and after for a query over 24h of data: ``` Read 4562944 rows, 148.05 MiB in 45.19249672 sec., 100966 rows/sec., 3.28 MiB/sec. Read 4183040 rows, 135.78 MiB in 0.196279627 sec., 21311636 rows/sec., 691.75 MiB/sec. ``` This is especially useful for queries that read data in order and terminate early to return "last X things" matching a query. See #11564 for more thoughts on this.
2020-07-07 18:10:44 +00:00
steps++;
Optimize PK lookup for queries that match exact PK range Existing code that looks up marks that match the query has a pathological case, when most of the part does in fact match the query. The code works by recursively splitting a part into ranges and then discarding the ranges that definitely do not match the query, based on primary key. The problem is that it requires visiting every mark that matches the query, making the complexity of this sort of look up O(n). For queries that match exact range on the primary key, we can find both left and right parts of the range with O(log 2) complexity. This change implements exactly that. To engage this optimization, the query must: * Have a prefix list of the primary key. * Have only range or single set element constraints for columns. * Have only AND as a boolean operator. Consider a table with `(service, timestamp)` as the primary key. The following conditions will be optimized: * `service = 'foo'` * `service = 'foo' and timestamp >= now() - 3600` * `service in ('foo')` * `service in ('foo') and timestamp >= now() - 3600 and timestamp <= now` The following will fall back to previous lookup algorithm: * `timestamp >= now() - 3600` * `service in ('foo', 'bar') and timestamp >= now() - 3600` * `service = 'foo'` Note that the optimization won't engage when PK has a range expression followed by a point expression, since in that case the range is not continuous. Trace query logging provides the following messages types of messages, each representing a different kind of PK usage for a part: ``` Used optimized inclusion search over index for part 20200711_5710108_5710108_0 with 9 steps Used generic exclusion search over index for part 20200711_5710118_5710228_5 with 1495 steps Not using index on part 20200710_5710473_5710473_0 ``` Number of steps translates to computational complexity. Here's a comparison for before and after for a query over 24h of data: ``` Read 4562944 rows, 148.05 MiB in 45.19249672 sec., 100966 rows/sec., 3.28 MiB/sec. Read 4183040 rows, 135.78 MiB in 0.196279627 sec., 21311636 rows/sec., 691.75 MiB/sec. ``` This is especially useful for queries that read data in order and terminate early to return "last X things" matching a query. See #11564 for more thoughts on this.
2020-07-07 18:10:44 +00:00
if (!may_be_true_in_range(range))
continue;
if (range.end == range.begin + 1)
{
/// We saw a useful gap between neighboring marks. Either add it to the last range, or start a new range.
if (res.empty() || range.begin - res.back().end > min_marks_for_seek)
res.push_back(range);
else
res.back().end = range.end;
}
else
{
/// Break the segment and put the result on the stack from right to left.
size_t step = (range.end - range.begin - 1) / settings.merge_tree_coarse_index_granularity + 1;
size_t end;
for (end = range.end; end > range.begin + step; end -= step)
ranges_stack.emplace_back(end - step, end);
ranges_stack.emplace_back(range.begin, end);
}
}
Optimize PK lookup for queries that match exact PK range Existing code that looks up marks that match the query has a pathological case, when most of the part does in fact match the query. The code works by recursively splitting a part into ranges and then discarding the ranges that definitely do not match the query, based on primary key. The problem is that it requires visiting every mark that matches the query, making the complexity of this sort of look up O(n). For queries that match exact range on the primary key, we can find both left and right parts of the range with O(log 2) complexity. This change implements exactly that. To engage this optimization, the query must: * Have a prefix list of the primary key. * Have only range or single set element constraints for columns. * Have only AND as a boolean operator. Consider a table with `(service, timestamp)` as the primary key. The following conditions will be optimized: * `service = 'foo'` * `service = 'foo' and timestamp >= now() - 3600` * `service in ('foo')` * `service in ('foo') and timestamp >= now() - 3600 and timestamp <= now` The following will fall back to previous lookup algorithm: * `timestamp >= now() - 3600` * `service in ('foo', 'bar') and timestamp >= now() - 3600` * `service = 'foo'` Note that the optimization won't engage when PK has a range expression followed by a point expression, since in that case the range is not continuous. Trace query logging provides the following messages types of messages, each representing a different kind of PK usage for a part: ``` Used optimized inclusion search over index for part 20200711_5710108_5710108_0 with 9 steps Used generic exclusion search over index for part 20200711_5710118_5710228_5 with 1495 steps Not using index on part 20200710_5710473_5710473_0 ``` Number of steps translates to computational complexity. Here's a comparison for before and after for a query over 24h of data: ``` Read 4562944 rows, 148.05 MiB in 45.19249672 sec., 100966 rows/sec., 3.28 MiB/sec. Read 4183040 rows, 135.78 MiB in 0.196279627 sec., 21311636 rows/sec., 691.75 MiB/sec. ``` This is especially useful for queries that read data in order and terminate early to return "last X things" matching a query. See #11564 for more thoughts on this.
2020-07-07 18:10:44 +00:00
LOG_TRACE(log, "Used generic exclusion search over index for part {} with {} steps", part->name, steps);
}
else
{
// Do inclusion search, where we only look for one range
size_t steps = 0;
auto find_leaf = [&](bool left) -> std::optional<size_t>
{
std::vector<MarkRange> stack = {};
MarkRange range = {0, marks_count};
steps++;
if (may_be_true_in_range(range))
stack.emplace_back(range.begin, range.end);
while (!stack.empty())
{
range = stack.back();
stack.pop_back();
if (range.end == range.begin + 1)
{
if (left)
return range.begin;
else
return range.end;
}
else
{
std::vector<MarkRange> check_order = {};
MarkRange left_range = {range.begin, (range.begin + range.end) / 2};
MarkRange right_range = {(range.begin + range.end) / 2, range.end};
if (left)
{
check_order.emplace_back(left_range.begin, left_range.end);
check_order.emplace_back(right_range.begin, right_range.end);
}
else
{
check_order.emplace_back(right_range.begin, right_range.end);
check_order.emplace_back(left_range.begin, left_range.end);
}
steps++;
if (may_be_true_in_range(check_order[0]))
{
stack.emplace_back(check_order[0].begin, check_order[0].end);
continue;
}
stack.emplace_back(check_order[1].begin, check_order[1].end);
}
}
return std::nullopt;
};
auto left_leaf = find_leaf(true);
if (left_leaf)
res.emplace_back(left_leaf.value(), find_leaf(false).value());
LOG_TRACE(log, "Used optimized inclusion search over index for part {} with {} steps", part->name, steps);
}
return res;
2014-03-13 12:48:07 +00:00
}
2019-01-08 19:41:36 +00:00
MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
2020-05-28 13:45:08 +00:00
MergeTreeIndexPtr index_helper,
2019-06-19 15:30:48 +00:00
MergeTreeIndexConditionPtr condition,
2019-01-09 17:05:52 +00:00
MergeTreeData::DataPartPtr part,
const MarkRanges & ranges,
const Settings & settings,
2020-07-20 15:09:00 +00:00
const MergeTreeReaderSettings & reader_settings,
Poco::Logger * log)
2019-01-08 19:41:36 +00:00
{
2020-05-28 13:45:08 +00:00
if (!part->volume->getDisk()->exists(part->getFullRelativePath() + index_helper->getFileName() + ".idx"))
2019-01-30 13:34:28 +00:00
{
2020-05-28 13:45:08 +00:00
LOG_DEBUG(log, "File for index {} does not exist. Skipping it.", backQuote(index_helper->index.name));
2019-01-08 19:41:36 +00:00
return ranges;
2019-01-30 13:34:28 +00:00
}
2019-01-07 12:51:14 +00:00
2020-05-28 13:45:08 +00:00
auto index_granularity = index_helper->index.granularity;
2020-05-28 12:37:05 +00:00
2019-04-01 11:09:30 +00:00
const size_t min_marks_for_seek = roundRowsOrBytesToMarks(
settings.merge_tree_min_rows_for_seek,
settings.merge_tree_min_bytes_for_seek,
part->index_granularity_info.fixed_index_granularity,
part->index_granularity_info.index_granularity_bytes);
2019-01-07 12:51:14 +00:00
2019-01-30 13:34:28 +00:00
size_t granules_dropped = 0;
size_t total_granules = 0;
2019-01-30 13:34:28 +00:00
size_t marks_count = part->getMarksCount();
size_t final_mark = part->index_granularity.hasFinalMark();
2020-05-28 12:37:05 +00:00
size_t index_marks_count = (marks_count - final_mark + index_granularity - 1) / index_granularity;
2019-01-08 19:41:36 +00:00
MergeTreeIndexReader reader(
index_helper, part,
index_marks_count,
ranges,
reader_settings);
2019-01-07 12:51:14 +00:00
2019-01-08 19:41:36 +00:00
MarkRanges res;
2019-01-07 12:51:14 +00:00
2019-01-22 15:58:11 +00:00
/// Some granules can cover two or more ranges,
/// this variable is stored to avoid reading the same granule twice.
2019-01-08 19:41:36 +00:00
MergeTreeIndexGranulePtr granule = nullptr;
size_t last_index_mark = 0;
for (const auto & range : ranges)
{
MarkRange index_range(
2020-05-28 12:37:05 +00:00
range.begin / index_granularity,
(range.end + index_granularity - 1) / index_granularity);
2019-01-07 12:51:14 +00:00
2019-01-22 15:58:11 +00:00
if (last_index_mark != index_range.begin || !granule)
2019-01-08 19:41:36 +00:00
reader.seek(index_range.begin);
2019-01-07 12:51:14 +00:00
total_granules += index_range.end - index_range.begin;
2019-01-08 19:41:36 +00:00
for (size_t index_mark = index_range.begin; index_mark < index_range.end; ++index_mark)
{
if (index_mark != index_range.begin || !granule || last_index_mark != index_range.begin)
granule = reader.read();
2019-01-07 12:51:14 +00:00
2019-01-08 19:41:36 +00:00
MarkRange data_range(
2020-05-28 12:37:05 +00:00
std::max(range.begin, index_mark * index_granularity),
std::min(range.end, (index_mark + 1) * index_granularity));
2019-01-07 12:51:14 +00:00
2019-01-08 19:41:36 +00:00
if (!condition->mayBeTrueOnGranule(granule))
2019-01-09 14:15:23 +00:00
{
2019-01-30 13:34:28 +00:00
++granules_dropped;
2019-01-08 19:41:36 +00:00
continue;
2019-01-09 14:15:23 +00:00
}
2019-01-07 12:51:14 +00:00
2019-02-14 16:59:26 +00:00
if (res.empty() || res.back().end - data_range.begin > min_marks_for_seek)
2019-01-08 19:41:36 +00:00
res.push_back(data_range);
else
res.back().end = data_range.end;
2019-01-07 12:51:14 +00:00
}
2019-01-08 19:41:36 +00:00
last_index_mark = index_range.end - 1;
2019-01-07 12:51:14 +00:00
}
2019-01-30 13:34:28 +00:00
LOG_DEBUG(log, "Index {} has dropped {} / {} granules.", backQuote(index_helper->index.name), granules_dropped, total_granules);
2019-01-30 13:34:28 +00:00
2019-01-08 19:41:36 +00:00
return res;
}
2019-04-01 11:09:30 +00:00
2019-01-07 12:51:14 +00:00
2014-03-13 12:48:07 +00:00
}