Merge pull request #6054 from CurtizJ/order-by-efficient

Optimization of ORDER BY: read data in the order of the sorting key in MergeTree tables (continuation of #5042).
Nikolai Kochetov 2019-07-27 17:05:50 +03:00 committed by GitHub
commit 3f2d857cb1
29 changed files with 1903 additions and 285 deletions

View File

@@ -322,6 +322,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, parallel_view_processing, false, "Enables pushing to attached views concurrently instead of sequentially.") \
M(SettingBool, enable_debug_queries, false, "Enables debug queries such as AST.") \
M(SettingBool, enable_unaligned_array_join, false, "Allow ARRAY JOIN with multiple arrays that have different sizes. When this setting is enabled, arrays will be resized to the longest one.") \
M(SettingBool, optimize_pk_order, true, "Enable ORDER BY optimization by reading data in primary key order.") \
M(SettingBool, low_cardinality_allow_in_native_format, true, "Use LowCardinality type in Native format. Otherwise, convert LowCardinality columns to ordinary for select query, and convert ordinary columns to required LowCardinality for insert query.") \
M(SettingBool, allow_experimental_multiple_joins_emulation, true, "Emulate multiple joins using subselects") \
M(SettingBool, allow_experimental_cross_to_join_conversion, true, "Convert CROSS JOIN to INNER JOIN if possible") \
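The M(...) rows above are an X-macro list: the same list is expanded with different definitions of M to generate member declarations, defaults, and descriptions. A minimal standalone sketch of the pattern, with toy names and a plain bool standing in for the real SettingBool machinery:

#include <iostream>

// Toy settings list; the real macro rows carry SettingBool and a description.
#define APPLY_FOR_TOY_SETTINGS(M) \
    M(bool, optimize_pk_order, true) \
    M(bool, parallel_view_processing, false)

// One expansion of the list declares the members with their defaults.
#define DECLARE_MEMBER(TYPE, NAME, DEFAULT) TYPE NAME = DEFAULT;

struct ToySettings
{
    APPLY_FOR_TOY_SETTINGS(DECLARE_MEMBER)
};

int main()
{
    ToySettings settings;
    std::cout << std::boolalpha << settings.optimize_pk_order << '\n';   // true
}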

View File

@@ -62,6 +62,7 @@ struct Less
}
};
Block FinishSortingBlockInputStream::readImpl()
{
if (limit && total_rows_processed >= limit)
@@ -98,12 +99,11 @@ Block FinishSortingBlockInputStream::readImpl()
if (description_to_sort.empty())
return block;
size_t size = block.rows();
if (size == 0)
if (block.rows() == 0)
continue;
/// We need to sort each block separately before merging.
sortBlock(block, description_to_sort);
// We need to sort each block separately before merging.
sortBlock(block, description_to_sort, limit);
removeConstantsFromBlock(block);
@@ -116,6 +116,7 @@ Block FinishSortingBlockInputStream::readImpl()
Less less(last_columns, current_columns);
size_t size = block.rows();
IColumn::Permutation perm(size);
for (size_t i = 0; i < size; ++i)
perm[i] = i;
@@ -148,8 +149,11 @@ Block FinishSortingBlockInputStream::readImpl()
blocks.push_back(block);
}
impl = std::make_unique<MergeSortingBlocksBlockInputStream>(blocks, description_to_sort, max_merged_block_size, limit);
res = impl->read();
if (!blocks.empty())
{
impl = std::make_unique<MergeSortingBlocksBlockInputStream>(blocks, description_to_sort, max_merged_block_size, limit);
res = impl->read();
}
}
if (res)

View File

@@ -123,13 +123,21 @@ void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current,
if (order >= size || &cursors[order] != current.impl)
throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR);
source_blocks[order] = new detail::SharedBlock(children[order]->read());
if (*source_blocks[order])
while (true)
{
cursors[order].reset(*source_blocks[order]);
queue.push(TSortCursor(&cursors[order]));
source_blocks[order]->all_columns = cursors[order].all_columns;
source_blocks[order]->sort_columns = cursors[order].sort_columns;
source_blocks[order] = new detail::SharedBlock(children[order]->read());
if (!*source_blocks[order])
break;
if (source_blocks[order]->rows())
{
cursors[order].reset(*source_blocks[order]);
queue.push(TSortCursor(&cursors[order]));
source_blocks[order]->all_columns = cursors[order].all_columns;
source_blocks[order]->sort_columns = cursors[order].sort_columns;
break;
}
}
}
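The loop above keeps fetching from an exhausted child until it yields a non-empty block (or ends for good), and only then re-registers the cursor in the queue. A minimal standalone sketch of the k-way merge that the cursor queue implements, with plain sorted vectors standing in for sorted block streams:

#include <functional>
#include <iostream>
#include <queue>
#include <utility>
#include <vector>

int main()
{
    // Three sorted sources, as if each were a sorted block stream.
    std::vector<std::vector<int>> sources{{1, 4, 7}, {2, 5, 8}, {0, 3, 6, 9}};

    // Cursor: (current value, (source index, position within source)).
    using Cursor = std::pair<int, std::pair<size_t, size_t>>;
    std::priority_queue<Cursor, std::vector<Cursor>, std::greater<>> queue;

    for (size_t s = 0; s < sources.size(); ++s)
        if (!sources[s].empty())
            queue.push({sources[s][0], {s, 0}});

    while (!queue.empty())
    {
        auto [value, pos] = queue.top();
        queue.pop();
        std::cout << value << ' ';
        auto [s, i] = pos;
        if (i + 1 < sources[s].size())       // advance the cursor within its source
            queue.push({sources[s][i + 1], {s, i + 1}});
    }
    std::cout << '\n';   // prints: 0 1 2 3 4 5 6 7 8 9
}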

View File

@@ -0,0 +1,40 @@
#include "ReverseBlockInputStream.h"
namespace DB
{
ReverseBlockInputStream::ReverseBlockInputStream(const BlockInputStreamPtr & input)
{
children.push_back(input);
}
String ReverseBlockInputStream::getName() const
{
return "Reverse";
}
Block ReverseBlockInputStream::getHeader() const
{
return children.at(0)->getHeader();
}
Block ReverseBlockInputStream::readImpl()
{
auto result_block = children.back()->read();
if (!result_block)
{
return Block();
}
IColumn::Permutation permutation;
size_t rows_size = result_block.rows();
for (size_t i = 0; i < rows_size; ++i)
permutation.emplace_back(rows_size - 1 - i);
for (auto & block : result_block)
block.column = block.column->permute(permutation, 0);
return result_block;
}
}
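The stream builds the permutation [n-1, ..., 0] once per block and applies it to every column. A minimal standalone sketch of the same permutation trick, with a plain vector standing in for IColumn:

#include <cstddef>
#include <iostream>
#include <vector>

int main()
{
    // A single column of one block; the real code does this for every column.
    std::vector<int> column{1, 2, 3, 4, 5};
    const size_t rows = column.size();

    // Build the reversing permutation [n-1, ..., 0] once per block.
    std::vector<size_t> permutation(rows);
    for (size_t i = 0; i < rows; ++i)
        permutation[i] = rows - 1 - i;

    // Applying it is the equivalent of IColumn::permute.
    std::vector<int> reversed(rows);
    for (size_t i = 0; i < rows; ++i)
        reversed[i] = column[permutation[i]];

    for (int x : reversed)
        std::cout << x << ' ';   // prints: 5 4 3 2 1
    std::cout << '\n';
}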

View File

@@ -0,0 +1,21 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
/// Reverses the order of rows in every block of a data stream.
class ReverseBlockInputStream : public IBlockInputStream
{
public:
ReverseBlockInputStream(const BlockInputStreamPtr & input);
String getName() const override;
Block getHeader() const override;
protected:
Block readImpl() override;
};
}

View File

@@ -1,5 +1,6 @@
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <DataStreams/FinishSortingBlockInputStream.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/LimitByBlockInputStream.h>
#include <DataStreams/PartialSortingBlockInputStream.h>
@@ -22,6 +23,7 @@
#include <DataStreams/CubeBlockInputStream.h>
#include <DataStreams/ConvertColumnLowCardinalityToFullBlockInputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/ReverseBlockInputStream.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
@@ -50,6 +52,7 @@
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Functions/IFunction.h>
#include <Core/Field.h>
#include <Core/Types.h>
#include <Columns/Collator.h>
@@ -582,6 +585,157 @@ InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage
}
static SortDescription getSortDescription(const ASTSelectQuery & query)
{
SortDescription order_descr;
order_descr.reserve(query.orderBy()->children.size());
for (const auto & elem : query.orderBy()->children)
{
String name = elem->children.front()->getColumnName();
const auto & order_by_elem = elem->as<ASTOrderByElement &>();
std::shared_ptr<Collator> collator;
if (order_by_elem.collation)
collator = std::make_shared<Collator>(order_by_elem.collation->as<ASTLiteral &>().value.get<String>());
order_descr.emplace_back(name, order_by_elem.direction, order_by_elem.nulls_direction, collator);
}
return order_descr;
}
static UInt64 getLimitUIntValue(const ASTPtr & node, const Context & context)
{
const auto & [field, type] = evaluateConstantExpression(node, context);
if (!isNativeNumber(type))
throw Exception("Illegal type " + type->getName() + " of LIMIT expression, must be numeric type", ErrorCodes::INVALID_LIMIT_EXPRESSION);
Field converted = convertFieldToType(field, DataTypeUInt64());
if (converted.isNull())
throw Exception("The value " + applyVisitor(FieldVisitorToString(), field) + " of LIMIT expression is not representable as UInt64", ErrorCodes::INVALID_LIMIT_EXPRESSION);
return converted.safeGet<UInt64>();
}
static std::pair<UInt64, UInt64> getLimitLengthAndOffset(const ASTSelectQuery & query, const Context & context)
{
UInt64 length = 0;
UInt64 offset = 0;
if (query.limitLength())
{
length = getLimitUIntValue(query.limitLength(), context);
if (query.limitOffset())
offset = getLimitUIntValue(query.limitOffset(), context);
}
return {length, offset};
}
static UInt64 getLimitForSorting(const ASTSelectQuery & query, const Context & context)
{
/// Partial sort can be done if there is LIMIT but no DISTINCT or LIMIT BY.
if (!query.distinct && !query.limitBy())
{
auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context);
return limit_length + limit_offset;
}
return 0;
}
static SortingInfoPtr optimizeSortingWithPK(const MergeTreeData & merge_tree, const ASTSelectQuery & query, const Context & context)
{
if (!merge_tree.hasSortingKey())
return {};
auto order_descr = getSortDescription(query);
SortDescription prefix_order_descr;
int read_direction = order_descr.at(0).direction;
const auto & sorting_key_columns = merge_tree.getSortingKeyColumns();
size_t prefix_size = std::min(order_descr.size(), sorting_key_columns.size());
auto order_by_expr = query.orderBy();
auto syntax_result = SyntaxAnalyzer(context).analyze(order_by_expr, merge_tree.getColumns().getAllPhysical());
for (size_t i = 0; i < prefix_size; ++i)
{
/// Read in pk order if the ORDER BY element exactly matches the sorting key column,
/// or in some simple cases when the ORDER BY element is a monotonic function of it.
int current_direction = order_descr[i].direction;
if (order_descr[i].column_name == sorting_key_columns[i] && current_direction == read_direction)
prefix_order_descr.push_back(order_descr[i]);
else
{
const auto & ast = query.orderBy()->children[i];
ExpressionActionsPtr actions;
try
{
actions = ExpressionAnalyzer(ast->children.at(0), syntax_result, context).getActions(false);
}
catch (const Exception &)
{
/// Can't analyze the ORDER BY expression at this stage.
/// Maybe some actions required for it will be executed later.
break;
}
const auto & input_columns = actions->getRequiredColumnsWithTypes();
if (input_columns.size() != 1 || input_columns.front().name != sorting_key_columns[i])
break;
bool first = true;
for (const auto & action : actions->getActions())
{
if (action.type != ExpressionAction::APPLY_FUNCTION)
continue;
if (!first)
{
current_direction = 0;
break;
}
else
first = false;
const auto & func = *action.function_base;
if (!func.hasInformationAboutMonotonicity())
{
current_direction = 0;
break;
}
auto monotonicity = func.getMonotonicityForRange(*input_columns.front().type, {}, {});
if (!monotonicity.is_monotonic)
{
current_direction = 0;
break;
}
else if (!monotonicity.is_positive)
current_direction *= -1;
}
if (!current_direction || (i > 0 && current_direction != read_direction))
break;
if (i == 0)
read_direction = current_direction;
prefix_order_descr.push_back(order_descr[i]);
}
}
if (prefix_order_descr.empty())
return {};
return std::make_shared<SortingInfo>(std::move(prefix_order_descr), read_direction);
}
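To make the loop above concrete: an ORDER BY prefix qualifies if each element either matches the corresponding sorting key column exactly or wraps it in monotonic functions; a decreasing function (such as negate) flips the required read direction, and all elements must agree on a single direction. A toy model of that decision, with a hypothetical negated flag standing in for the real monotonicity analysis:

#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

// One ORDER BY element: the underlying column, ASC/DESC direction, and a
// hypothetical "negated" flag standing in for monotonicity analysis of a
// wrapping function (negate is monotonic but decreasing).
struct OrderByElem
{
    std::string column;
    int direction;      // 1 = ASC, -1 = DESC
    bool negated;
};

int main()
{
    std::vector<std::string> sorting_key{"a", "b", "c"};

    // SELECT ... ORDER BY a DESC, -b ASC: both elements want the data
    // in reverse sorting-key order, so the prefix of length 2 qualifies.
    std::vector<OrderByElem> order_by{{"a", -1, false}, {"b", 1, true}};

    int read_direction = 0;
    size_t prefix = 0;
    for (size_t i = 0; i < order_by.size() && i < sorting_key.size(); ++i)
    {
        if (order_by[i].column != sorting_key[i])
            break;
        // A decreasing function flips the direction of its argument.
        int current = order_by[i].negated ? -order_by[i].direction : order_by[i].direction;
        if (i == 0)
            read_direction = current;
        else if (current != read_direction)
            break;
        ++prefix;
    }
    std::cout << "prefix length: " << prefix
              << ", read direction: " << read_direction << '\n';
    // prints: prefix length: 2, read direction: -1
}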
template <typename TPipeline>
void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input, bool dry_run)
{
@@ -614,14 +768,14 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
auto optimize_prewhere = [&](auto & merge_tree)
{
SelectQueryInfo query_info;
query_info.query = query_ptr;
query_info.syntax_analyzer_result = syntax_analyzer_result;
query_info.sets = query_analyzer->getPreparedSets();
SelectQueryInfo current_info;
current_info.query = query_ptr;
current_info.syntax_analyzer_result = syntax_analyzer_result;
current_info.sets = query_analyzer->getPreparedSets();
/// Try transferring some condition from WHERE to PREWHERE if enabled and viable
if (settings.optimize_move_to_prewhere && query.where() && !query.prewhere() && !query.final())
MergeTreeWhereOptimizer{query_info, context, merge_tree, query_analyzer->getRequiredSourceColumns(), log};
MergeTreeWhereOptimizer{current_info, context, merge_tree, query_analyzer->getRequiredSourceColumns(), log};
};
if (const MergeTreeData * merge_tree_data = dynamic_cast<const MergeTreeData *>(storage.get()))
@@ -639,6 +793,13 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
source_header = storage->getSampleBlockForColumns(filter_info->actions->getRequiredColumns());
}
SortingInfoPtr sorting_info;
if (settings.optimize_pk_order && storage && query.orderBy() && !query.groupBy() && !query.final())
{
if (const MergeTreeData * merge_tree_data = dynamic_cast<const MergeTreeData *>(storage.get()))
sorting_info = optimizeSortingWithPK(*merge_tree_data, query, context);
}
if (dry_run)
{
if constexpr (pipeline_with_processors)
@@ -688,7 +849,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
throw Exception("PREWHERE is not supported if the table is filtered by row-level security expression", ErrorCodes::ILLEGAL_PREWHERE);
/** Read the data from Storage. from_stage - to what stage the request was completed in Storage. */
executeFetchColumns(from_stage, pipeline, expressions.prewhere_info, expressions.columns_to_remove_after_prewhere);
executeFetchColumns(from_stage, pipeline, sorting_info, expressions.prewhere_info, expressions.columns_to_remove_after_prewhere);
LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(options.to_stage));
}
@@ -806,7 +967,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
if (!expressions.second_stage && !expressions.need_aggregate && !expressions.has_having)
{
if (expressions.has_order_by)
executeOrder(pipeline);
executeOrder(pipeline, query_info.sorting_info);
if (expressions.has_order_by && query.limitLength())
executeDistinct(pipeline, false, expressions.selected_columns);
@@ -897,10 +1058,11 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
* but there is no aggregation, then on the remote servers ORDER BY was made
* - therefore, we merge the sorted streams from remote servers.
*/
if (!expressions.first_stage && !expressions.need_aggregate && !(query.group_by_with_totals && !aggregate_final))
executeMergeSorted(pipeline);
else /// Otherwise, just sort.
executeOrder(pipeline);
executeOrder(pipeline, query_info.sorting_info);
}
/** Optimization - if there are several sources and there is LIMIT, then first apply the preliminary LIMIT,
@@ -956,51 +1118,10 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
}
static UInt64 getLimitUIntValue(const ASTPtr & node, const Context & context)
{
const auto & [field, type] = evaluateConstantExpression(node, context);
if (!isNativeNumber(type))
throw Exception("Illegal type " + type->getName() + " of LIMIT expression, must be numeric type", ErrorCodes::INVALID_LIMIT_EXPRESSION);
Field converted = convertFieldToType(field, DataTypeUInt64());
if (converted.isNull())
throw Exception("The value " + applyVisitor(FieldVisitorToString(), field) + " of LIMIT expression is not representable as UInt64", ErrorCodes::INVALID_LIMIT_EXPRESSION);
return converted.safeGet<UInt64>();
}
static std::pair<UInt64, UInt64> getLimitLengthAndOffset(const ASTSelectQuery & query, const Context & context)
{
UInt64 length = 0;
UInt64 offset = 0;
if (query.limitLength())
{
length = getLimitUIntValue(query.limitLength(), context);
if (query.limitOffset())
offset = getLimitUIntValue(query.limitOffset(), context);
}
return {length, offset};
}
static UInt64 getLimitForSorting(const ASTSelectQuery & query, const Context & context)
{
/// Partial sort can be done if there is LIMIT but no DISTINCT or LIMIT BY.
if (!query.distinct && !query.limitBy())
{
auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context);
return limit_length + limit_offset;
}
return 0;
}
template <typename TPipeline>
void InterpreterSelectQuery::executeFetchColumns(
QueryProcessingStage::Enum processing_stage, TPipeline & pipeline,
const PrewhereInfoPtr & prewhere_info, const Names & columns_to_remove_after_prewhere)
const SortingInfoPtr & sorting_info, const PrewhereInfoPtr & prewhere_info, const Names & columns_to_remove_after_prewhere)
{
constexpr bool pipeline_with_processors = std::is_same<TPipeline, QueryPipeline>::value;
@@ -1256,11 +1377,11 @@ void InterpreterSelectQuery::executeFetchColumns(
if (max_streams > 1 && !is_remote)
max_streams *= settings.max_streams_to_max_threads_ratio;
SelectQueryInfo query_info;
query_info.query = query_ptr;
query_info.syntax_analyzer_result = syntax_analyzer_result;
query_info.sets = query_analyzer->getPreparedSets();
query_info.prewhere_info = prewhere_info;
query_info.sorting_info = sorting_info;
auto streams = storage->read(required_columns, query_info, context, processing_stage, max_block_size, max_streams);
@@ -1778,59 +1899,75 @@ void InterpreterSelectQuery::executeExpression(QueryPipeline & pipeline, const E
});
}
static SortDescription getSortDescription(const ASTSelectQuery & query)
{
SortDescription order_descr;
order_descr.reserve(query.orderBy()->children.size());
for (const auto & elem : query.orderBy()->children)
{
String name = elem->children.front()->getColumnName();
const auto & order_by_elem = elem->as<ASTOrderByElement &>();
std::shared_ptr<Collator> collator;
if (order_by_elem.collation)
collator = std::make_shared<Collator>(order_by_elem.collation->as<ASTLiteral &>().value.get<String>());
order_descr.emplace_back(name, order_by_elem.direction, order_by_elem.nulls_direction, collator);
}
return order_descr;
}
void InterpreterSelectQuery::executeOrder(Pipeline & pipeline)
void InterpreterSelectQuery::executeOrder(Pipeline & pipeline, SortingInfoPtr sorting_info)
{
auto & query = getSelectQuery();
SortDescription order_descr = getSortDescription(query);
const Settings & settings = context.getSettingsRef();
UInt64 limit = getLimitForSorting(query, context);
const Settings & settings = context.getSettingsRef();
pipeline.transform([&](auto & stream)
if (sorting_info)
{
auto sorting_stream = std::make_shared<PartialSortingBlockInputStream>(stream, order_descr, limit);
/* Case of sorting with optimization using sorting key.
* We have several threads, each of which reads a batch of parts in direct
* or reverse order of the sorting key, using one input stream per part,
* and then merges them into one sorted stream.
* At this stage we merge per-thread streams into one.
*/
/// Limits on sorting
IBlockInputStream::LocalLimits limits;
limits.mode = IBlockInputStream::LIMITS_TOTAL;
limits.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode);
sorting_stream->setLimits(limits);
if (sorting_info->prefix_order_descr.size() < order_descr.size())
{
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<FinishSortingBlockInputStream>(
stream, sorting_info->prefix_order_descr,
order_descr, settings.max_block_size, limit);
});
}
stream = sorting_stream;
});
if (pipeline.hasMoreThanOneStream())
{
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<AsynchronousBlockInputStream>(stream);
});
/// If there are several streams, we merge them into one
executeUnion(pipeline);
pipeline.firstStream() = std::make_shared<MergingSortedBlockInputStream>(
pipeline.streams, order_descr,
settings.max_block_size, limit);
pipeline.streams.resize(1);
}
}
else
{
pipeline.transform([&](auto & stream)
{
auto sorting_stream = std::make_shared<PartialSortingBlockInputStream>(stream, order_descr, limit);
/// Merge the sorted blocks.
pipeline.firstStream() = std::make_shared<MergeSortingBlockInputStream>(
pipeline.firstStream(), order_descr, settings.max_block_size, limit,
settings.max_bytes_before_remerge_sort,
settings.max_bytes_before_external_sort, context.getTemporaryPath());
/// Limits on sorting
IBlockInputStream::LocalLimits limits;
limits.mode = IBlockInputStream::LIMITS_TOTAL;
limits.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode);
sorting_stream->setLimits(limits);
stream = sorting_stream;
});
/// If there are several streams, we merge them into one
executeUnion(pipeline);
/// Merge the sorted blocks.
pipeline.firstStream() = std::make_shared<MergeSortingBlockInputStream>(
pipeline.firstStream(), order_descr, settings.max_block_size, limit,
settings.max_bytes_before_remerge_sort,
settings.max_bytes_before_external_sort, context.getTemporaryPath());
}
}
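When only a prefix of the ORDER BY description matches the sorting key, FinishSortingBlockInputStream completes the sort: the input is already ordered by the prefix, so only runs of rows with equal prefix values need sorting by the remaining columns. A minimal standalone sketch of that idea on (a, b) pairs already sorted by a:

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <utility>
#include <vector>

int main()
{
    // (a, b) pairs already sorted by a, as if read in sorting-key order.
    std::vector<std::pair<int, int>> rows{{1, 9}, {1, 3}, {2, 7}, {2, 1}, {2, 5}, {3, 0}};

    // Sort each run of equal prefix values by the remaining key only.
    size_t run_begin = 0;
    for (size_t i = 1; i <= rows.size(); ++i)
    {
        if (i == rows.size() || rows[i].first != rows[run_begin].first)
        {
            std::sort(rows.begin() + run_begin, rows.begin() + i,
                      [](const auto & l, const auto & r) { return l.second < r.second; });
            run_begin = i;
        }
    }

    for (const auto & [a, b] : rows)
        std::cout << '(' << a << ", " << b << ") ";
    std::cout << '\n';   // (1, 3) (1, 9) (2, 1) (2, 5) (2, 7) (3, 0)
}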
void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline)
void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, SortingInfoPtr /* sorting_info */)
{
/// TODO: Implement optimization using sorting_info
auto & query = getSelectQuery();
SortDescription order_descr = getSortDescription(query);
UInt64 limit = getLimitForSorting(query, context);

View File

@@ -186,7 +186,8 @@ private:
template <typename TPipeline>
void executeFetchColumns(QueryProcessingStage::Enum processing_stage, TPipeline & pipeline,
const PrewhereInfoPtr & prewhere_info, const Names & columns_to_remove_after_prewhere);
const SortingInfoPtr & sorting_info, const PrewhereInfoPtr & prewhere_info,
const Names & columns_to_remove_after_prewhere);
void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter);
void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
@@ -194,7 +195,7 @@ private:
void executeTotalsAndHaving(Pipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeHaving(Pipeline & pipeline, const ExpressionActionsPtr & expression);
void executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expression);
void executeOrder(Pipeline & pipeline);
void executeOrder(Pipeline & pipeline, SortingInfoPtr sorting_info);
void executeMergeSorted(Pipeline & pipeline);
void executePreLimit(Pipeline & pipeline);
void executeUnion(Pipeline & pipeline);
@@ -211,7 +212,7 @@ private:
void executeTotalsAndHaving(QueryPipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeHaving(QueryPipeline & pipeline, const ExpressionActionsPtr & expression);
void executeExpression(QueryPipeline & pipeline, const ExpressionActionsPtr & expression);
void executeOrder(QueryPipeline & pipeline);
void executeOrder(QueryPipeline & pipeline, SortingInfoPtr sorting_info);
void executeMergeSorted(QueryPipeline & pipeline);
void executePreLimit(QueryPipeline & pipeline);
void executeLimitBy(QueryPipeline & pipeline);
@@ -248,6 +249,7 @@ private:
NamesAndTypesList source_columns;
SyntaxAnalyzerResultPtr syntax_analyzer_result;
std::unique_ptr<ExpressionAnalyzer> query_analyzer;
SelectQueryInfo query_info;
/// How many streams we ask for storage to produce, and in how many threads we will do further processing.
size_t max_streams = 1;

View File

@@ -348,6 +348,10 @@ public:
/// Returns additional columns that need to be read for FINAL to work.
virtual Names getColumnsRequiredForFinal() const { return {}; }
/// Returns names of primary key + secondary sorting columns
virtual Names getSortingKeyColumns() const { return {}; }
private:
/// You always need to take the next three locks in this order.

View File

@@ -51,23 +51,59 @@ Block MergeTreeBaseSelectBlockInputStream::readImpl()
while (!res && !isCancelled())
{
if (!task && !getNewTask())
if ((!task || task->isFinished()) && !getNewTask())
break;
res = readFromPart();
if (res)
injectVirtualColumns(res);
if (task->isFinished())
task.reset();
}
return res;
}
Block MergeTreeBaseSelectBlockInputStream::readFromPart()
void MergeTreeBaseSelectBlockInputStream::initializeRangeReaders(MergeTreeReadTask & current_task)
{
if (prewhere_info)
{
if (reader->getColumns().empty())
{
current_task.range_reader = MergeTreeRangeReader(
pre_reader.get(), nullptr,
prewhere_info->alias_actions, prewhere_info->prewhere_actions,
&prewhere_info->prewhere_column_name, &current_task.ordered_names,
current_task.should_reorder, current_task.remove_prewhere_column, true);
}
else
{
MergeTreeRangeReader * pre_reader_ptr = nullptr;
if (pre_reader != nullptr)
{
current_task.pre_range_reader = MergeTreeRangeReader(
pre_reader.get(), nullptr,
prewhere_info->alias_actions, prewhere_info->prewhere_actions,
&prewhere_info->prewhere_column_name, &current_task.ordered_names,
current_task.should_reorder, current_task.remove_prewhere_column, false);
pre_reader_ptr = &current_task.pre_range_reader;
}
current_task.range_reader = MergeTreeRangeReader(
reader.get(), pre_reader_ptr, nullptr, nullptr,
nullptr, &current_task.ordered_names, true, false, true);
}
}
else
{
current_task.range_reader = MergeTreeRangeReader(
reader.get(), nullptr, nullptr, nullptr,
nullptr, &current_task.ordered_names, current_task.should_reorder, false, true);
}
}
Block MergeTreeBaseSelectBlockInputStream::readFromPartImpl()
{
if (task->size_predictor)
task->size_predictor->startBlock();
@@ -113,44 +149,6 @@ Block MergeTreeBaseSelectBlockInputStream::readFromPart()
return index_granularity.countMarksForRows(current_reader.currentMark(), rows_to_read, current_reader.numReadRowsInCurrentGranule());
};
if (!task->range_reader.isInitialized())
{
if (prewhere_info)
{
if (reader->getColumns().empty())
{
task->range_reader = MergeTreeRangeReader(
pre_reader.get(), nullptr,
prewhere_info->alias_actions, prewhere_info->prewhere_actions,
&prewhere_info->prewhere_column_name, &task->ordered_names,
task->should_reorder, task->remove_prewhere_column, true);
}
else
{
MergeTreeRangeReader * pre_reader_ptr = nullptr;
if (pre_reader != nullptr)
{
task->pre_range_reader = MergeTreeRangeReader(
pre_reader.get(), nullptr,
prewhere_info->alias_actions, prewhere_info->prewhere_actions,
&prewhere_info->prewhere_column_name, &task->ordered_names,
task->should_reorder, task->remove_prewhere_column, false);
pre_reader_ptr = &task->pre_range_reader;
}
task->range_reader = MergeTreeRangeReader(
reader.get(), pre_reader_ptr, nullptr, nullptr,
nullptr, &task->ordered_names, true, false, true);
}
}
else
{
task->range_reader = MergeTreeRangeReader(
reader.get(), nullptr, nullptr, nullptr,
nullptr, &task->ordered_names, task->should_reorder, false, true);
}
}
UInt64 recommended_rows = estimateNumRows(*task, task->range_reader);
UInt64 rows_to_read = std::max(UInt64(1), std::min(current_max_block_size_rows, recommended_rows));
@@ -185,6 +183,15 @@ Block MergeTreeBaseSelectBlockInputStream::readFromPart()
}
Block MergeTreeBaseSelectBlockInputStream::readFromPart()
{
if (!task->range_reader.isInitialized())
initializeRangeReaders(*task);
return readFromPartImpl();
}
void MergeTreeBaseSelectBlockInputStream::injectVirtualColumns(Block & block) const
{
/// add virtual columns

View File

@@ -42,10 +42,16 @@ protected:
/// We will call progressImpl manually.
void progress(const Progress &) override {}
Block readFromPart();
virtual Block readFromPart();
Block readFromPartImpl();
void injectVirtualColumns(Block & block) const;
void initializeRangeReaders(MergeTreeReadTask & task);
size_t estimateNumRows(MergeTreeReadTask & current_task, MergeTreeRangeReader & current_reader);
protected:
const MergeTreeData & storage;

View File

@@ -193,4 +193,64 @@ void MergeTreeBlockSizePredictor::update(const Block & block, double decay)
}
}
MergeTreeReadTaskColumns getReadTaskColumns(const MergeTreeData & storage, const MergeTreeData::DataPartPtr & data_part,
const Names & required_columns, const PrewhereInfoPtr & prewhere_info, bool check_columns)
{
Names column_names = required_columns;
Names pre_column_names;
/// inject columns required for defaults evaluation
bool should_reorder = !injectRequiredColumns(storage, data_part, column_names).empty();
if (prewhere_info)
{
if (prewhere_info->alias_actions)
pre_column_names = prewhere_info->alias_actions->getRequiredColumns();
else
pre_column_names = prewhere_info->prewhere_actions->getRequiredColumns();
if (pre_column_names.empty())
pre_column_names.push_back(column_names[0]);
const auto injected_pre_columns = injectRequiredColumns(storage, data_part, pre_column_names);
if (!injected_pre_columns.empty())
should_reorder = true;
const NameSet pre_name_set(pre_column_names.begin(), pre_column_names.end());
Names post_column_names;
for (const auto & name : column_names)
if (!pre_name_set.count(name))
post_column_names.push_back(name);
column_names = post_column_names;
}
MergeTreeReadTaskColumns result;
if (check_columns)
{
/// Under owned_data_part->columns_lock we check that all requested columns are of the same type as in the table.
/// This may not be true in case of ALTER MODIFY.
if (!pre_column_names.empty())
storage.check(data_part->columns, pre_column_names);
if (!column_names.empty())
storage.check(data_part->columns, column_names);
const NamesAndTypesList & physical_columns = storage.getColumns().getAllPhysical();
result.pre_columns = physical_columns.addTypes(pre_column_names);
result.columns = physical_columns.addTypes(column_names);
}
else
{
result.pre_columns = data_part->columns.addTypes(pre_column_names);
result.columns = data_part->columns.addTypes(column_names);
}
result.should_reorder = should_reorder;
return result;
}
}
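This split is what makes PREWHERE cheap: pre_columns are read first and the PREWHERE expression is evaluated on them, while the remaining columns are read only for rows that pass the filter. A minimal standalone sketch of the column partitioning, with hypothetical column names:

#include <iostream>
#include <set>
#include <string>
#include <vector>

int main()
{
    // Columns requested by the query, and the columns its PREWHERE needs
    // (hypothetical names, e.g. PREWHERE date = today() AND tag != '').
    std::vector<std::string> required{"id", "date", "value", "tag"};
    std::vector<std::string> pre_columns{"date", "tag"};

    // Everything not consumed by PREWHERE is read in the second pass.
    std::set<std::string> pre_set(pre_columns.begin(), pre_columns.end());
    std::vector<std::string> post_columns;
    for (const auto & name : required)
        if (!pre_set.count(name))
            post_columns.push_back(name);

    std::cout << "pre: ";
    for (const auto & n : pre_columns)
        std::cout << n << ' ';
    std::cout << "\npost: ";
    for (const auto & n : post_columns)
        std::cout << n << ' ';
    std::cout << '\n';   // pre: date tag / post: id value
}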

View File

@@ -64,6 +64,18 @@ struct MergeTreeReadTask
virtual ~MergeTreeReadTask();
};
struct MergeTreeReadTaskColumns
{
/// column names to read during WHERE
NamesAndTypesList columns;
/// column names to read during PREWHERE
NamesAndTypesList pre_columns;
/// resulting block may require reordering in accordance with `ordered_names`
bool should_reorder;
};
MergeTreeReadTaskColumns getReadTaskColumns(const MergeTreeData & storage, const MergeTreeData::DataPartPtr & data_part,
const Names & required_columns, const PrewhereInfoPtr & prewhere_info, bool check_columns);
struct MergeTreeBlockSizePredictor
{

View File

@@ -328,6 +328,7 @@ public:
Names getColumnsRequiredForPrimaryKey() const override { return primary_key_expr->getRequiredColumns(); }
Names getColumnsRequiredForSampling() const override { return columns_required_for_sampling; }
Names getColumnsRequiredForFinal() const override { return sorting_key_expr->getRequiredColumns(); }
Names getSortingKeyColumns() const override { return sorting_key_columns; }
bool supportsPrewhere() const override { return true; }
bool supportsSampling() const override { return sample_by_ast != nullptr; }

View File

@@ -142,7 +142,10 @@ MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const String & na
{
}
MergeTreeDataPart::MergeTreeDataPart(const MergeTreeData & storage_, const String & name_, const MergeTreePartInfo & info_)
MergeTreeDataPart::MergeTreeDataPart(
const MergeTreeData & storage_,
const String & name_,
const MergeTreePartInfo & info_)
: storage(storage_)
, name(name_)
, info(info_)

View File

@@ -6,6 +6,7 @@
#include <Common/FieldVisitors.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeSelectBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeReverseSelectBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
@@ -40,9 +41,11 @@ namespace std
#include <DataStreams/CollapsingFinalBlockInputStream.h>
#include <DataStreams/AddingConstColumnBlockInputStream.h>
#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/SummingSortedBlockInputStream.h>
#include <DataStreams/ReplacingSortedBlockInputStream.h>
#include <DataStreams/ReverseBlockInputStream.h>
#include <DataStreams/AggregatingSortedBlockInputStream.h>
#include <DataStreams/VersionedCollapsingSortedBlockInputStream.h>
#include <DataTypes/DataTypesNumber.h>
@@ -547,8 +550,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
parts_with_ranges.push_back(ranges);
sum_ranges += ranges.ranges.size();
for (const auto & range : ranges.ranges)
sum_marks += range.end - range.begin;
sum_marks += ranges.getMarksCount();
}
}
@@ -583,7 +585,19 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
column_names_to_read,
max_block_size,
settings.use_uncompressed_cache,
query_info.prewhere_info,
query_info,
virt_column_names,
settings);
}
else if (settings.optimize_pk_order && query_info.sorting_info)
{
res = spreadMarkRangesAmongStreamsPKOrder(
std::move(parts_with_ranges),
num_streams,
column_names_to_read,
max_block_size,
settings.use_uncompressed_cache,
query_info,
virt_column_names,
settings);
}
@@ -595,7 +609,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
column_names_to_read,
max_block_size,
settings.use_uncompressed_cache,
query_info.prewhere_info,
query_info,
virt_column_names,
settings);
}
@@ -641,11 +655,10 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
const Names & column_names,
UInt64 max_block_size,
bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
const SelectQueryInfo & query_info,
const Names & virt_columns,
const Settings & settings) const
{
/// Count marks for each part.
std::vector<size_t> sum_marks_in_parts(parts.size());
size_t sum_marks = 0;
@@ -658,10 +671,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
/// Let the ranges be listed from right to left so that the leftmost range can be dropped using `pop_back()`.
std::reverse(parts[i].ranges.begin(), parts[i].ranges.end());
for (const auto & range : parts[i].ranges)
sum_marks_in_parts[i] += range.end - range.begin;
sum_marks_in_parts[i] = parts[i].getMarksCount();
sum_marks += sum_marks_in_parts[i];
if (parts[i].data_part->index_granularity_info.is_adaptive)
adaptive_parts++;
}
@@ -694,7 +706,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
num_streams = std::max((sum_marks + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, parts.size());
MergeTreeReadPoolPtr pool = std::make_shared<MergeTreeReadPool>(
num_streams, sum_marks, min_marks_for_concurrent_read, parts, data, prewhere_info, true,
num_streams, sum_marks, min_marks_for_concurrent_read, parts, data, query_info.prewhere_info, true,
column_names, MergeTreeReadPool::BackoffSettings(settings), settings.preferred_block_size_bytes, false);
/// Let's estimate total number of rows for progress bar.
@@ -705,7 +717,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
res.emplace_back(std::make_shared<MergeTreeThreadSelectBlockInputStream>(
i, pool, min_marks_for_concurrent_read, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, data, use_uncompressed_cache,
prewhere_info, settings, virt_columns));
query_info.prewhere_info, settings, virt_columns));
if (i == 0)
{
@@ -781,7 +793,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
BlockInputStreamPtr source_stream = std::make_shared<MergeTreeSelectBlockInputStream>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, ranges_to_get_from_part,
use_uncompressed_cache, prewhere_info, true, settings.min_bytes_to_use_direct_io,
use_uncompressed_cache, query_info.prewhere_info, true, settings.min_bytes_to_use_direct_io,
settings.max_read_buffer_size, true, virt_columns, part.part_index_in_query);
res.push_back(source_stream);
@@ -795,16 +807,202 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
return res;
}
BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsPKOrder(
RangesInDataParts && parts,
size_t num_streams,
const Names & column_names,
UInt64 max_block_size,
bool use_uncompressed_cache,
const SelectQueryInfo & query_info,
const Names & virt_columns,
const Settings & settings) const
{
size_t sum_marks = 0;
SortingInfoPtr sorting_info = query_info.sorting_info;
size_t adaptive_parts = 0;
std::vector<size_t> sum_marks_in_parts(parts.size());
/// In case of reverse order, split the ranges to avoid reading too much data at once.
auto split_ranges = [max_block_size](const auto & ranges, size_t rows_granularity, size_t num_marks_in_part)
{
/// These constants are just a guess.
const size_t min_rows_in_range = max_block_size;
const size_t max_num_ranges = 64;
size_t min_marks_in_range = std::max(
(min_rows_in_range + rows_granularity - 1) / rows_granularity,
(num_marks_in_part + max_num_ranges - 1) / max_num_ranges);
MarkRanges new_ranges;
for (auto range : ranges)
{
while (range.begin + min_marks_in_range < range.end)
{
new_ranges.emplace_back(range.begin, range.begin + min_marks_in_range);
range.begin += min_marks_in_range;
}
new_ranges.emplace_back(range.begin, range.end);
}
return new_ranges;
};
for (size_t i = 0; i < parts.size(); ++i)
{
sum_marks_in_parts[i] = parts[i].getMarksCount();
sum_marks += sum_marks_in_parts[i];
if (sorting_info->direction == -1)
parts[i].ranges = split_ranges(parts[i].ranges, data.settings.index_granularity, sum_marks_in_parts[i]);
/// Let the ranges be listed from right to left so that the leftmost range can be dropped using `pop_back()`.
std::reverse(parts[i].ranges.begin(), parts[i].ranges.end());
if (parts[i].data_part->index_granularity_info.is_adaptive)
adaptive_parts++;
}
size_t index_granularity_bytes = 0;
if (adaptive_parts > parts.size() / 2)
index_granularity_bytes = data.settings.index_granularity_bytes;
const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
settings.merge_tree_max_rows_to_use_cache,
settings.merge_tree_max_bytes_to_use_cache,
data.settings.index_granularity,
index_granularity_bytes);
const size_t min_marks_for_concurrent_read = roundRowsOrBytesToMarks(
settings.merge_tree_min_rows_for_concurrent_read,
settings.merge_tree_min_bytes_for_concurrent_read,
data.settings.index_granularity,
index_granularity_bytes);
if (sum_marks > max_marks_to_use_cache)
use_uncompressed_cache = false;
BlockInputStreams streams;
if (sum_marks == 0)
return streams;
const size_t min_marks_per_stream = (sum_marks - 1) / num_streams + 1;
for (size_t i = 0; i < num_streams && !parts.empty(); ++i)
{
size_t need_marks = min_marks_per_stream;
BlockInputStreams streams_per_thread;
/// Loop over parts.
/// We will iteratively take part or some subrange of a part from the back
/// and assign a stream to read from it.
while (need_marks > 0 && !parts.empty())
{
RangesInDataPart part = parts.back();
parts.pop_back();
size_t & marks_in_part = sum_marks_in_parts.back();
/// We will not take too few rows from a part.
if (marks_in_part >= min_marks_for_concurrent_read &&
need_marks < min_marks_for_concurrent_read)
need_marks = min_marks_for_concurrent_read;
/// Do not leave too few rows in the part.
if (marks_in_part > need_marks &&
marks_in_part - need_marks < min_marks_for_concurrent_read)
need_marks = marks_in_part;
MarkRanges ranges_to_get_from_part;
/// We take the whole part if it is small enough.
if (marks_in_part <= need_marks)
{
/// Restore the order of segments.
std::reverse(part.ranges.begin(), part.ranges.end());
ranges_to_get_from_part = part.ranges;
need_marks -= marks_in_part;
sum_marks_in_parts.pop_back();
}
else
{
/// Loop through ranges in part. Take enough ranges to cover "need_marks".
while (need_marks > 0)
{
if (part.ranges.empty())
throw Exception("Unexpected end of ranges while spreading marks among streams", ErrorCodes::LOGICAL_ERROR);
MarkRange & range = part.ranges.back();
const size_t marks_in_range = range.end - range.begin;
const size_t marks_to_get_from_range = std::min(marks_in_range, need_marks);
ranges_to_get_from_part.emplace_back(range.begin, range.begin + marks_to_get_from_range);
range.begin += marks_to_get_from_range;
marks_in_part -= marks_to_get_from_range;
need_marks -= marks_to_get_from_range;
if (range.begin == range.end)
part.ranges.pop_back();
}
parts.emplace_back(part);
}
BlockInputStreamPtr source_stream;
if (sorting_info->direction == 1)
{
source_stream = std::make_shared<MergeTreeSelectBlockInputStream>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, ranges_to_get_from_part,
use_uncompressed_cache, query_info.prewhere_info, true, settings.min_bytes_to_use_direct_io,
settings.max_read_buffer_size, true, virt_columns, part.part_index_in_query);
}
else
{
source_stream = std::make_shared<MergeTreeReverseSelectBlockInputStream>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, ranges_to_get_from_part,
use_uncompressed_cache, query_info.prewhere_info, true, settings.min_bytes_to_use_direct_io,
settings.max_read_buffer_size, true, virt_columns, part.part_index_in_query);
source_stream = std::make_shared<ReverseBlockInputStream>(source_stream);
}
streams_per_thread.push_back(source_stream);
}
if (streams_per_thread.size() > 1)
{
SortDescription sort_description;
for (size_t j = 0; j < query_info.sorting_info->prefix_order_descr.size(); ++j)
sort_description.emplace_back(data.sorting_key_columns[j],
sorting_info->direction, 1);
for (auto & stream : streams_per_thread)
stream = std::make_shared<ExpressionBlockInputStream>(stream, data.sorting_key_expr);
streams.push_back(std::make_shared<MergingSortedBlockInputStream>(
streams_per_thread, sort_description, max_block_size));
}
else
streams.push_back(streams_per_thread.at(0));
}
return streams;
}
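The split_ranges lambda above cuts each mark range into pieces of at least min_marks_in_range marks so that a reverse read never has to consume a huge range just to return its tail first. A standalone sketch with the same arithmetic, with MarkRange reduced to a plain struct:

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

struct MarkRange { size_t begin; size_t end; };

// Same arithmetic as the split_ranges lambda: a range is cut into pieces of
// at least min_marks_in_range marks, capped at roughly 64 pieces per part.
std::vector<MarkRange> splitRanges(
    std::vector<MarkRange> ranges, size_t max_block_size,
    size_t rows_granularity, size_t num_marks_in_part)
{
    const size_t min_rows_in_range = max_block_size;
    const size_t max_num_ranges = 64;
    const size_t min_marks_in_range = std::max(
        (min_rows_in_range + rows_granularity - 1) / rows_granularity,
        (num_marks_in_part + max_num_ranges - 1) / max_num_ranges);

    std::vector<MarkRange> new_ranges;
    for (auto range : ranges)
    {
        while (range.begin + min_marks_in_range < range.end)
        {
            new_ranges.push_back({range.begin, range.begin + min_marks_in_range});
            range.begin += min_marks_in_range;
        }
        new_ranges.push_back({range.begin, range.end});
    }
    return new_ranges;
}

int main()
{
    // One 10-mark range, 8192 rows per mark, max_block_size = 65536:
    // min_marks_in_range = max(ceil(65536 / 8192), ceil(10 / 64)) = 8.
    for (const auto & r : splitRanges({{0, 10}}, 65536, 8192, 10))
        std::cout << '[' << r.begin << ", " << r.end << ") ";
    std::cout << '\n';   // prints: [0, 8) [8, 10)
}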
BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
RangesInDataParts && parts,
const Names & column_names,
UInt64 max_block_size,
bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
const SelectQueryInfo & query_info,
const Names & virt_columns,
const Settings & settings) const
{
size_t sum_marks = 0;
size_t adaptive_parts = 0;
for (size_t i = 0; i < parts.size(); ++i)
@@ -840,13 +1038,12 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
BlockInputStreamPtr source_stream = std::make_shared<MergeTreeSelectBlockInputStream>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache,
prewhere_info, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, true,
query_info.prewhere_info, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, true,
virt_columns, part.part_index_in_query);
to_merge.emplace_back(std::make_shared<ExpressionBlockInputStream>(source_stream, data.sorting_key_expr));
}
Names sort_columns = data.sorting_key_columns;
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();

View File

@@ -52,7 +52,17 @@ private:
const Names & column_names,
UInt64 max_block_size,
bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
const SelectQueryInfo & query_info,
const Names & virt_columns,
const Settings & settings) const;
BlockInputStreams spreadMarkRangesAmongStreamsPKOrder(
RangesInDataParts && parts,
size_t num_streams,
const Names & column_names,
UInt64 max_block_size,
bool use_uncompressed_cache,
const SelectQueryInfo & query_info,
const Names & virt_columns,
const Settings & settings) const;
@@ -61,7 +71,7 @@ private:
const Names & column_names,
UInt64 max_block_size,
bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
const SelectQueryInfo & query_info,
const Names & virt_columns,
const Settings & settings) const;

View File

@@ -427,6 +427,7 @@ size_t MergeTreeRangeReader::numReadRowsInCurrentGranule() const
{
return prev_reader ? prev_reader->numReadRowsInCurrentGranule() : stream.numReadRowsInCurrentGranule();
}
size_t MergeTreeRangeReader::numPendingRowsInCurrentGranule() const
{
if (prev_reader)

View File

@@ -219,65 +219,16 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
per_part_columns_lock.emplace_back(part.data_part->columns_lock);
/// inject column names required for DEFAULT evaluation in current part
auto required_column_names = column_names;
auto [required_columns, required_pre_columns, should_reorder] =
getReadTaskColumns(data, part.data_part, column_names, prewhere_info, check_columns);
const auto injected_columns = injectRequiredColumns(data, part.data_part, required_column_names);
auto should_reoder = !injected_columns.empty();
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
const auto & required_column_names = required_columns.getNames();
per_part_column_name_set.emplace_back(required_column_names.begin(), required_column_names.end());
Names required_pre_column_names;
if (prewhere_info)
{
/// collect columns required for PREWHERE evaluation
if (prewhere_info->alias_actions)
required_pre_column_names = prewhere_info->alias_actions->getRequiredColumns();
else
required_pre_column_names = prewhere_info->prewhere_actions->getRequiredColumns();
/// there must be at least one column required for PREWHERE
if (required_pre_column_names.empty())
required_pre_column_names.push_back(required_column_names[0]);
/// PREWHERE columns may require some additional columns for DEFAULT evaluation
const auto injected_pre_columns = injectRequiredColumns(data, part.data_part, required_pre_column_names);
if (!injected_pre_columns.empty())
should_reoder = true;
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
const NameSet pre_name_set(required_pre_column_names.begin(), required_pre_column_names.end());
Names post_column_names;
for (const auto & name : required_column_names)
if (!pre_name_set.count(name))
post_column_names.push_back(name);
required_column_names = post_column_names;
}
per_part_column_name_set.emplace_back(std::begin(required_column_names), std::end(required_column_names));
if (check_columns)
{
/** Under part->columns_lock check that all requested columns in part are of same type that in table.
* This could be violated during ALTER MODIFY.
*/
if (!required_pre_column_names.empty())
data.check(part.data_part->columns, required_pre_column_names);
if (!required_column_names.empty())
data.check(part.data_part->columns, required_column_names);
const NamesAndTypesList & physical_columns = data.getColumns().getAllPhysical();
per_part_pre_columns.push_back(physical_columns.addTypes(required_pre_column_names));
per_part_columns.push_back(physical_columns.addTypes(required_column_names));
}
else
{
per_part_pre_columns.push_back(part.data_part->columns.addTypes(required_pre_column_names));
per_part_columns.push_back(part.data_part->columns.addTypes(required_column_names));
}
per_part_should_reorder.push_back(should_reoder);
per_part_pre_columns.push_back(std::move(required_pre_columns));
per_part_columns.push_back(std::move(required_columns));
per_part_should_reorder.push_back(should_reorder);
parts_with_idx.push_back({ part.data_part, part.part_index_in_query });

View File

@@ -0,0 +1,191 @@
#include <Storages/MergeTree/MergeTreeReverseSelectBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeBaseSelectBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeReader.h>
#include <Core/Defines.h>
namespace DB
{
namespace ErrorCodes
{
extern const int MEMORY_LIMIT_EXCEEDED;
}
MergeTreeReverseSelectBlockInputStream::MergeTreeReverseSelectBlockInputStream(
const MergeTreeData & storage_,
const MergeTreeData::DataPartPtr & owned_data_part_,
UInt64 max_block_size_rows_,
size_t preferred_block_size_bytes_,
size_t preferred_max_column_in_block_size_bytes_,
Names required_columns_,
const MarkRanges & mark_ranges_,
bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
bool check_columns,
size_t min_bytes_to_use_direct_io_,
size_t max_read_buffer_size_,
bool save_marks_in_cache_,
const Names & virt_column_names_,
size_t part_index_in_query_,
bool quiet)
:
MergeTreeBaseSelectBlockInputStream{storage_, prewhere_info_, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, min_bytes_to_use_direct_io_,
max_read_buffer_size_, use_uncompressed_cache_, save_marks_in_cache_, virt_column_names_},
required_columns{required_columns_},
data_part{owned_data_part_},
part_columns_lock(data_part->columns_lock),
all_mark_ranges(mark_ranges_),
part_index_in_query(part_index_in_query_),
path(data_part->getFullPath())
{
/// Let's estimate total number of rows for progress bar.
for (const auto & range : all_mark_ranges)
total_marks_count += range.end - range.begin;
size_t total_rows = data_part->index_granularity.getTotalRows();
if (!quiet)
LOG_TRACE(log, "Reading " << all_mark_ranges.size() << " ranges in reverse order from part " << data_part->name
<< ", approx. " << total_rows
<< (all_mark_ranges.size() > 1
? ", up to " + toString(data_part->index_granularity.getRowsCountInRanges(all_mark_ranges))
: "")
<< " rows starting from " << data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin));
addTotalRowsApprox(total_rows);
header = storage.getSampleBlockForColumns(required_columns);
/// Types may be different during ALTER (when this stream is used to perform an ALTER).
/// NOTE: We may use similar code to implement non-blocking ALTERs.
for (const auto & name_type : data_part->columns)
{
if (header.has(name_type.name))
{
auto & elem = header.getByName(name_type.name);
if (!elem.type->equals(*name_type.type))
{
elem.type = name_type.type;
elem.column = elem.type->createColumn();
}
}
}
executePrewhereActions(header, prewhere_info);
injectVirtualColumns(header);
ordered_names = getHeader().getNames();
task_columns = getReadTaskColumns(storage, data_part, required_columns, prewhere_info, check_columns);
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
const auto & column_names = task_columns.columns.getNames();
column_name_set = NameSet{column_names.begin(), column_names.end()};
if (use_uncompressed_cache)
owned_uncompressed_cache = storage.global_context.getUncompressedCache();
owned_mark_cache = storage.global_context.getMarkCache();
reader = std::make_unique<MergeTreeReader>(
path, data_part, task_columns.columns, owned_uncompressed_cache.get(),
owned_mark_cache.get(), save_marks_in_cache, storage,
all_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
if (prewhere_info)
pre_reader = std::make_unique<MergeTreeReader>(
path, data_part, task_columns.pre_columns, owned_uncompressed_cache.get(),
owned_mark_cache.get(), save_marks_in_cache, storage,
all_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
}
Block MergeTreeReverseSelectBlockInputStream::getHeader() const
{
return header;
}
bool MergeTreeReverseSelectBlockInputStream::getNewTask()
try
{
if ((blocks.empty() && all_mark_ranges.empty()) || total_marks_count == 0)
{
finish();
return false;
}
/// We have some blocks to return in the buffer.
/// Return true to continue reading, but actually don't create a task.
if (all_mark_ranges.empty())
return true;
/// Read ranges from right to left.
MarkRanges mark_ranges_for_task = { all_mark_ranges.back() };
all_mark_ranges.pop_back();
auto size_predictor = (preferred_block_size_bytes == 0)
? nullptr
: std::make_unique<MergeTreeBlockSizePredictor>(data_part, ordered_names, data_part->storage.getSampleBlock());
task = std::make_unique<MergeTreeReadTask>(
data_part, mark_ranges_for_task, part_index_in_query, ordered_names, column_name_set,
task_columns.columns, task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column,
task_columns.should_reorder, std::move(size_predictor));
return true;
}
catch (...)
{
/// Suspicion of a broken part. The part is added to the queue for verification.
if (getCurrentExceptionCode() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
storage.reportBrokenPart(data_part->name);
throw;
}
Block MergeTreeReverseSelectBlockInputStream::readFromPart()
{
Block res;
if (!blocks.empty())
{
res = std::move(blocks.back());
blocks.pop_back();
return res;
}
if (!task->range_reader.isInitialized())
initializeRangeReaders(*task);
while (!task->isFinished())
{
Block block = readFromPartImpl();
blocks.push_back(std::move(block));
}
if (blocks.empty())
return {};
res = std::move(blocks.back());
blocks.pop_back();
return res;
}
void MergeTreeReverseSelectBlockInputStream::finish()
{
/** Close the files (before destroying the object).
* When many sources are created but only a few of them are read simultaneously,
* buffers don't waste memory.
*/
reader.reset();
pre_reader.reset();
part_columns_lock.unlock();
data_part.reset();
}
MergeTreeReverseSelectBlockInputStream::~MergeTreeReverseSelectBlockInputStream() = default;
}
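readFromPart above works by buffering: the blocks of one mark range are read in forward order into `blocks` and handed out back to front, while ReverseBlockInputStream later reverses the rows inside each block. A minimal standalone sketch of the buffer-and-pop_back pattern:

#include <iostream>
#include <string>
#include <utility>
#include <vector>

int main()
{
    // Blocks of one mark range arrive in forward read order...
    std::vector<std::string> range_blocks{"block1", "block2", "block3"};

    std::vector<std::string> buffer;
    for (auto & block : range_blocks)
        buffer.push_back(std::move(block));

    // ...and are handed out back to front, one per readFromPart call.
    while (!buffer.empty())
    {
        std::cout << buffer.back() << ' ';   // prints: block3 block2 block1
        buffer.pop_back();
    }
    std::cout << '\n';
}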

View File

@@ -0,0 +1,81 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MarkRange.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/SelectQueryInfo.h>
namespace DB
{
/// Used to read data from a single part with a SELECT query.
/// Cares about PREWHERE, virtual columns, indexes etc.
/// To read data from multiple parts, Storage (MergeTree) creates multiple such objects.
class MergeTreeReverseSelectBlockInputStream : public MergeTreeBaseSelectBlockInputStream
{
public:
MergeTreeReverseSelectBlockInputStream(
const MergeTreeData & storage,
const MergeTreeData::DataPartPtr & owned_data_part,
UInt64 max_block_size_rows,
size_t preferred_block_size_bytes,
size_t preferred_max_column_in_block_size_bytes,
Names column_names,
const MarkRanges & mark_ranges,
bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
bool check_columns,
size_t min_bytes_to_use_direct_io,
size_t max_read_buffer_size,
bool save_marks_in_cache,
const Names & virt_column_names = {},
size_t part_index_in_query = 0,
bool quiet = false);
~MergeTreeReverseSelectBlockInputStream() override;
String getName() const override { return "MergeTreeReverse"; }
Block getHeader() const override;
/// Closes readers and unlocks part locks.
void finish();
protected:
bool getNewTask() override;
Block readFromPart() override;
private:
Block header;
/// Used by Task
Names required_columns;
/// Names from the header. Used to order columns in read blocks.
Names ordered_names;
NameSet column_name_set;
MergeTreeReadTaskColumns task_columns;
/// Data part will not be removed if the pointer owns it
MergeTreeData::DataPartPtr data_part;
/// Forbids changing the columns list of the part during reading.
std::shared_lock<std::shared_mutex> part_columns_lock;
/// Mark ranges we should read (in ascending order)
MarkRanges all_mark_ranges;
/// Total number of marks we should read
size_t total_marks_count = 0;
/// Value of _part_index virtual column (used only in SelectExecutor)
size_t part_index_in_query = 0;
String path;
Blocks blocks;
Logger * log = &Logger::get("MergeTreeReverseSelectBlockInputStream");
};
}

View File

@@ -19,7 +19,7 @@ MergeTreeSelectBlockInputStream::MergeTreeSelectBlockInputStream(
UInt64 max_block_size_rows_,
size_t preferred_block_size_bytes_,
size_t preferred_max_column_in_block_size_bytes_,
Names column_names,
Names required_columns_,
const MarkRanges & mark_ranges_,
bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
@@ -34,7 +34,7 @@ MergeTreeSelectBlockInputStream::MergeTreeSelectBlockInputStream(
MergeTreeBaseSelectBlockInputStream{storage_, prewhere_info_, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, min_bytes_to_use_direct_io_,
max_read_buffer_size_, use_uncompressed_cache_, save_marks_in_cache_, virt_column_names_},
required_columns{column_names},
required_columns{required_columns_},
data_part{owned_data_part_},
part_columns_lock(data_part->columns_lock),
all_mark_ranges(mark_ranges_),
@@ -99,57 +99,7 @@ try
}
is_first_task = false;
Names pre_column_names;
Names column_names = required_columns;
/// inject columns required for defaults evaluation
bool should_reorder = !injectRequiredColumns(storage, data_part, column_names).empty();
if (prewhere_info)
{
if (prewhere_info->alias_actions)
pre_column_names = prewhere_info->alias_actions->getRequiredColumns();
else
pre_column_names = prewhere_info->prewhere_actions->getRequiredColumns();
if (pre_column_names.empty())
pre_column_names.push_back(column_names[0]);
const auto injected_pre_columns = injectRequiredColumns(storage, data_part, pre_column_names);
if (!injected_pre_columns.empty())
should_reorder = true;
const NameSet pre_name_set(pre_column_names.begin(), pre_column_names.end());
Names post_column_names;
for (const auto & name : column_names)
if (!pre_name_set.count(name))
post_column_names.push_back(name);
column_names = post_column_names;
}
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
column_name_set = NameSet{column_names.begin(), column_names.end()};
if (check_columns)
{
/// Under owned_data_part->columns_lock we check that all requested columns are of the same type as in the table.
/// This may be not true in case of ALTER MODIFY.
if (!pre_column_names.empty())
storage.check(data_part->columns, pre_column_names);
if (!column_names.empty())
storage.check(data_part->columns, column_names);
const NamesAndTypesList & physical_columns = storage.getColumns().getAllPhysical();
pre_columns = physical_columns.addTypes(pre_column_names);
columns = physical_columns.addTypes(column_names);
}
else
{
pre_columns = data_part->columns.addTypes(pre_column_names);
columns = data_part->columns.addTypes(column_names);
}
task_columns = getReadTaskColumns(storage, data_part, required_columns, prewhere_info, check_columns);
/** @note You could simply swap `reverse` in the if and else branches of MergeTreeDataSelectExecutor,
* and remove this reverse. */
@@ -160,9 +110,14 @@ try
? nullptr
: std::make_unique<MergeTreeBlockSizePredictor>(data_part, ordered_names, data_part->storage.getSampleBlock());
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
const auto & column_names = task_columns.columns.getNames();
column_name_set = NameSet{column_names.begin(), column_names.end()};
task = std::make_unique<MergeTreeReadTask>(
data_part, remaining_mark_ranges, part_index_in_query, ordered_names, column_name_set, columns, pre_columns,
prewhere_info && prewhere_info->remove_prewhere_column, should_reorder, std::move(size_predictor));
data_part, remaining_mark_ranges, part_index_in_query, ordered_names, column_name_set, task_columns.columns,
task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column,
task_columns.should_reorder, std::move(size_predictor));
if (!reader)
{
@ -172,13 +127,13 @@ try
owned_mark_cache = storage.global_context.getMarkCache();
reader = std::make_unique<MergeTreeReader>(
path, data_part, columns, owned_uncompressed_cache.get(),
path, data_part, task_columns.columns, owned_uncompressed_cache.get(),
owned_mark_cache.get(), save_marks_in_cache, storage,
all_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
if (prewhere_info)
pre_reader = std::make_unique<MergeTreeReader>(
path, data_part, pre_columns, owned_uncompressed_cache.get(),
path, data_part, task_columns.pre_columns, owned_uncompressed_cache.get(),
owned_mark_cache.get(), save_marks_in_cache, storage,
all_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
}

View File

@ -55,8 +55,8 @@ private:
/// Names from header. Used in order to order columns in read blocks.
Names ordered_names;
NameSet column_name_set;
NamesAndTypesList columns;
NamesAndTypesList pre_columns;
MergeTreeReadTaskColumns task_columns;
/// Data part will not be removed if the pointer owns it
MergeTreeData::DataPartPtr data_part;
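
The refactor above replaces the separate `columns`/`pre_columns` members with a single `task_columns` bundle filled in by `getReadTaskColumns`. A sketch of the shape of that bundle, with `NamesAndTypesList` replaced by a simplified stand-in (the field set mirrors how `task_columns` is consumed in the hunks above; the exact definition is not shown in this diff):

#include <string>
#include <vector>

/// Simplified stand-in for NamesAndTypesList; the real type pairs names with data types.
using NamesList = std::vector<std::string>;

/// Everything a read task needs to know about its column sets, in one place.
struct MergeTreeReadTaskColumnsSketch
{
    NamesList columns;            /// columns read after PREWHERE (or all of them)
    NamesList pre_columns;        /// columns needed to evaluate the PREWHERE condition
    bool should_reorder = false;  /// true if injecting default columns changed the order
};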

View File

@ -22,6 +22,15 @@ struct RangesInDataPart
{
}
size_t getMarksCount() const
{
size_t total = 0;
for (const auto & range : ranges)
total += range.end - range.begin;
return total;
}
size_t getRowsCount() const
{
return data_part->index_granularity.getRowsCountInRanges(ranges);
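
A tiny standalone illustration of what `getMarksCount()` computes, with `MarkRange` assumed to be the usual half-open `[begin, end)` pair of mark indices:

#include <cassert>
#include <cstddef>
#include <vector>

struct MarkRange { size_t begin = 0; size_t end = 0; };  /// half-open [begin, end)

size_t marksInRanges(const std::vector<MarkRange> & ranges)
{
    size_t total = 0;
    for (const auto & range : ranges)
        total += range.end - range.begin;  /// same summation as getMarksCount()
    return total;
}

int main()
{
    /// Two ranges covering marks [0, 3) and [7, 10): 6 marks in total.
    assert(marksInRanges({{0, 3}, {7, 10}}) == 6);
}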

View File

@ -1,6 +1,7 @@
#pragma once
#include <Interpreters/PreparedSets.h>
#include <Core/SortDescription.h>
#include <memory>
namespace DB
@ -33,8 +34,18 @@ struct FilterInfo
bool do_remove_column = false;
};
struct SortingInfo
{
SortDescription prefix_order_descr;
int direction;
SortingInfo(const SortDescription & prefix_order_descr_, int direction_)
: prefix_order_descr(prefix_order_descr_), direction(direction_) {}
};
using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;
using FilterInfoPtr = std::shared_ptr<FilterInfo>;
using SortingInfoPtr = std::shared_ptr<SortingInfo>;
struct SyntaxAnalyzerResult;
using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;
@ -51,6 +62,8 @@ struct SelectQueryInfo
PrewhereInfoPtr prewhere_info;
SortingInfoPtr sorting_info;
/// Prepared sets are used for indices by storage engine.
/// Example: x IN (1, 2, 3)
PreparedSets sets;
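
A hedged sketch of how a `SortingInfo` could be built once the analyzer finds that the query's ORDER BY is a prefix of the table's sorting key. The column names, the simplified stand-ins for `SortColumnDescription`/`SortDescription`, and the two-column example are illustrative assumptions, not code from this change:

#include <memory>
#include <string>
#include <vector>

struct SortColumnDescriptionSketch
{
    std::string column_name;
    int direction;        /// 1 = ASC, -1 = DESC
    int nulls_direction;  /// where NULLs go relative to the direction
};
using SortDescriptionSketch = std::vector<SortColumnDescriptionSketch>;

struct SortingInfoSketch
{
    SortDescriptionSketch prefix_order_descr;
    int direction;
    SortingInfoSketch(const SortDescriptionSketch & descr, int direction_)
        : prefix_order_descr(descr), direction(direction_) {}
};

int main()
{
    /// ORDER BY CounterID DESC, EventDate DESC over a (CounterID, EventDate) key:
    /// the whole prefix matches and every column goes the same way, so the
    /// parts can be read back to front (direction = -1).
    SortDescriptionSketch prefix{{"CounterID", -1, 1}, {"EventDate", -1, 1}};
    auto info = std::make_shared<SortingInfoSketch>(prefix, -1);
    (void)info;
}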

View File

@ -0,0 +1,34 @@
<test>
<name>order_by_pk_order</name>
<type>loop</type>
<stop_conditions>
<all_of>
<iterations>5</iterations>
<min_time_not_changing_for_ms>10000</min_time_not_changing_for_ms>
</all_of>
<any_of>
<iterations>500</iterations>
<total_time_ms>60000</total_time_ms>
</any_of>
</stop_conditions>
<metrics>
<max_rows_per_second />
<max_bytes_per_second />
<avg_rows_per_second />
<avg_bytes_per_second />
<min_time />
</metrics>
<main_metric>
<min_time />
</main_metric>
<preconditions>
<table_exists>test.hits</table_exists>
</preconditions>
<query>SELECT CounterID FROM test.hits ORDER BY CounterID, EventDate DESC LIMIT 50</query>
</test>

View File

@ -0,0 +1,200 @@
1
2
3
4
5
6
1
1
2
3
4
1
1
1
1
1
1
2
2
2
2
2
1 1
1 2
1 3
1 4
1 5
1 6
2 1
2 1
2 2
2 3
2 4
2 1
2 1
2 2
2 3
2 4
1 1
1 2
1 3
1 4
1 5
1 6
1 6
1 5
1 4
1 3
1 2
1 1
2 4
2 3
2 2
2 1
2 1
2 4
2 3
2 2
2 1
2 1
1 6
1 5
1 4
1 3
1 2
1 1
2
2
2
2
2
1
1
1
1
1
1
1 1 101
1 2 102
1 3 103
1 4 104
1 5 104
1 6 105
2 1 106
2 1 107
2 2 107
2 3 108
2 4 109
2 1 106
2 1 107
2 2 107
2 3 108
2 4 109
1 1 101
1 2 102
1 3 103
1 4 104
1 5 104
1 6 105
1 6 105
1 5 104
1 4 104
1 3 103
1 2 102
1 1 101
2 4 109
2 3 108
2 2 107
2 1 106
2 1 107
1 1 101
1 2 102
1 3 103
1 4 104
1 5 104
1 6 105
2 1 107
2 1 106
2 2 107
2 3 108
2 4 109
2 4 109
2 3 108
2 2 107
2 1 106
2 1 107
1 6 105
1 5 104
1 4 104
1 3 103
1 2 102
1 1 101
2 1 107
2 1 106
2 2 107
2 3 108
2 4 109
1 1 101
1 2 102
1 3 103
1 4 104
1 5 104
1 6 105
1 6 105
1 5 104
1 4 104
1 3 103
1 2 102
1 1 101
2 4 109
2 3 108
2 2 107
2 1 107
2 1 106
2 4 109
2 3 108
2 2 107
2 1 107
2 1 106
1 6 105
1 5 104
1 4 104
1 3 103
1 2 102
1 1 101
2019-05-05 00:00:00
2019-05-05 00:00:00
2019-05-05 00:00:00
2019-05-05 00:00:00
2019-05-05 00:00:00
2019-05-05 00:00:00 -1249512288
2019-05-05 00:00:00 -916059969
2019-05-05 00:00:00 -859523951
2019-05-05 00:00:00 -45363190
2019-05-05 00:00:00 345522721
2019-05-14 00:00:00 99
2019-05-14 00:00:00 89
2019-05-14 00:00:00 79
2019-05-14 00:00:00 69
2019-05-14 00:00:00 59
2019-05-14 00:00:00 99
2019-05-14 00:00:00 89
2019-05-14 00:00:00 79
2019-05-14 00:00:00 69
2019-05-14 00:00:00 59
2019-05-14 00:00:00 99
2019-05-14 00:00:00 89
2019-05-14 00:00:00 79
2019-05-14 00:00:00 69
2019-05-14 00:00:00 59
2019-05-05 00:00:00
2019-05-05 00:00:00
2019-05-05 00:00:00
2019-05-05 00:00:00
2019-05-05 00:00:00
1 5
1 5
1 5
1 3
1 3

View File

@ -0,0 +1,56 @@
CREATE DATABASE IF NOT EXISTS test;
DROP TABLE IF EXISTS test.pk_order;
SET optimize_pk_order = 1;
CREATE TABLE test.pk_order(a UInt64, b UInt64, c UInt64, d UInt64) ENGINE=MergeTree() ORDER BY (a, b);
INSERT INTO test.pk_order(a, b, c, d) VALUES (1, 1, 101, 1), (1, 2, 102, 1), (1, 3, 103, 1), (1, 4, 104, 1);
INSERT INTO test.pk_order(a, b, c, d) VALUES (1, 5, 104, 1), (1, 6, 105, 1), (2, 1, 106, 2), (2, 1, 107, 2);
INSERT INTO test.pk_order(a, b, c, d) VALUES (2, 2, 107, 2), (2, 3, 108, 2), (2, 4, 109, 2);
SELECT b FROM test.pk_order ORDER BY a, b;
SELECT a FROM test.pk_order ORDER BY a, b;
SELECT a, b FROM test.pk_order ORDER BY a, b;
SELECT a, b FROM test.pk_order ORDER BY a DESC, b;
SELECT a, b FROM test.pk_order ORDER BY a, b DESC;
SELECT a, b FROM test.pk_order ORDER BY a DESC, b DESC;
SELECT a FROM test.pk_order ORDER BY a DESC;
SELECT a, b, c FROM test.pk_order ORDER BY a, b, c;
SELECT a, b, c FROM test.pk_order ORDER BY a DESC, b, c;
SELECT a, b, c FROM test.pk_order ORDER BY a, b DESC, c;
SELECT a, b, c FROM test.pk_order ORDER BY a, b, c DESC;
SELECT a, b, c FROM test.pk_order ORDER BY a DESC, b DESC, c;
SELECT a, b, c FROM test.pk_order ORDER BY a DESC, b, c DESC;
SELECT a, b, c FROM test.pk_order ORDER BY a, b DESC, c DESC;
SELECT a, b, c FROM test.pk_order ORDER BY a DESC, b DESC, c DESC;
DROP TABLE IF EXISTS test.pk_order;
CREATE TABLE pk_order (d DateTime, a Int32, b Int32) ENGINE = MergeTree ORDER BY (d, a)
PARTITION BY toDate(d) SETTINGS index_granularity=1;
INSERT INTO pk_order
SELECT toDateTime('2019-05-05 00:00:00') + INTERVAL number % 10 DAY, number, intHash32(number) FROM numbers(100);
SET max_block_size = 1;
-- Currently, checking the number of rows read while reading in PK order is not precise. TODO: fix it.
-- SET max_rows_to_read = 10;
SELECT d FROM pk_order ORDER BY d LIMIT 5;
SELECT d, b FROM pk_order ORDER BY d, b LIMIT 5;
SELECT d, a FROM pk_order ORDER BY d DESC, a DESC LIMIT 5;
SELECT d, a FROM pk_order ORDER BY d DESC, -a LIMIT 5;
SELECT d, a FROM pk_order ORDER BY d DESC, a DESC LIMIT 5;
SELECT toStartOfHour(d) as d1 FROM pk_order ORDER BY d1 LIMIT 5;
DROP TABLE pk_order;
CREATE TABLE pk_order (a Int, b Int) ENGINE = MergeTree ORDER BY (a / b);
INSERT INTO pk_order SELECT number % 10 + 1, number % 6 + 1 FROM numbers(100);
SELECT * FROM pk_order ORDER BY (a / b), a LIMIT 5;

View File

@ -0,0 +1,600 @@
33554106
33554106
33553911
33553911
33553911
33553911
33553911
33553828
33553772
33553718
33553718
33553673
33553673
33553673
33553673
33553362
33553353
33553353
33553353
33553353
33553353
33553353
33553004
33553004
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
2014-03-17
2014-03-17
2014-03-17
2014-03-17
2014-03-17
2014-03-17
2014-03-17
2014-03-17
2014-03-17
2014-03-17
2014-03-17
2014-03-17
2014-03-17
2014-03-17
2014-03-17
2014-03-17
2014-03-17
2014-03-17
2014-03-17
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-18
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
57
33554106
33554106
33553911
33553911
33553911
33553911
33553911
33553828
33553772
33553718
33553718
33553673
33553673
33553673
33553673
33553362
33553353
33553353
33553353
33553353
33553353
33553353
33553004
33553004
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
33552322
2014-03-21
2014-03-21
2014-03-23
2014-03-23
2014-03-20
2014-03-20
2014-03-20
2014-03-17
2014-03-19
2014-03-23
2014-03-23
2014-03-20
2014-03-20
2014-03-18
2014-03-18
2014-03-17
2014-03-21
2014-03-21
2014-03-21
2014-03-21
2014-03-21
2014-03-21
2014-03-17
2014-03-17
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-23
2014-03-22
57 2014-03-17
57 2014-03-17
57 2014-03-17
57 2014-03-17
57 2014-03-17
57 2014-03-17
57 2014-03-17
57 2014-03-17
57 2014-03-17
57 2014-03-17
57 2014-03-17
57 2014-03-17
57 2014-03-17
57 2014-03-17
57 2014-03-17
57 2014-03-17
57 2014-03-17
57 2014-03-17
57 2014-03-17
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-18
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
57 2014-03-23
33554106 2014-03-21
33554106 2014-03-21
33553911 2014-03-20
33553911 2014-03-20
33553911 2014-03-20
33553911 2014-03-23
33553911 2014-03-23
33553828 2014-03-17
33553772 2014-03-19
33553718 2014-03-23
33553718 2014-03-23
33553673 2014-03-18
33553673 2014-03-18
33553673 2014-03-20
33553673 2014-03-20
33553362 2014-03-17
33553353 2014-03-21
33553353 2014-03-21
33553353 2014-03-21
33553353 2014-03-21
33553353 2014-03-21
33553353 2014-03-21
33553004 2014-03-17
33553004 2014-03-17
33552322 2014-03-17
33552322 2014-03-17
33552322 2014-03-17
33552322 2014-03-17
33552322 2014-03-17
33552322 2014-03-18
33552322 2014-03-18
33552322 2014-03-18
33552322 2014-03-18
33552322 2014-03-18
33552322 2014-03-18
33552322 2014-03-18
33552322 2014-03-18
33552322 2014-03-18
33552322 2014-03-18
33552322 2014-03-18
33552322 2014-03-18
33552322 2014-03-18
33552322 2014-03-18
33552322 2014-03-18
33552322 2014-03-18
33552322 2014-03-18
33552322 2014-03-18
33552322 2014-03-18
33552322 2014-03-18
33552322 2014-03-18
33554106 2014-03-21
33554106 2014-03-21
33553911 2014-03-23
33553911 2014-03-23
33553911 2014-03-20
33553911 2014-03-20
33553911 2014-03-20
33553828 2014-03-17
33553772 2014-03-19
33553718 2014-03-23
33553718 2014-03-23
33553673 2014-03-20
33553673 2014-03-20
33553673 2014-03-18
33553673 2014-03-18
33553362 2014-03-17
33553353 2014-03-21
33553353 2014-03-21
33553353 2014-03-21
33553353 2014-03-21
33553353 2014-03-21
33553353 2014-03-21
33553004 2014-03-17
33553004 2014-03-17
33552322 2014-03-23
33552322 2014-03-23
33552322 2014-03-23
33552322 2014-03-23
33552322 2014-03-23
33552322 2014-03-23
33552322 2014-03-23
33552322 2014-03-23
33552322 2014-03-23
33552322 2014-03-23
33552322 2014-03-23
33552322 2014-03-23
33552322 2014-03-23
33552322 2014-03-23
33552322 2014-03-23
33552322 2014-03-23
33552322 2014-03-23
33552322 2014-03-23
33552322 2014-03-23
33552322 2014-03-23
33552322 2014-03-23
33552322 2014-03-23
33552322 2014-03-23
33552322 2014-03-23
33552322 2014-03-23
33552322 2014-03-22

View File

@ -0,0 +1,14 @@
SET optimize_pk_order = 1;
SELECT CounterID FROM test.hits ORDER BY CounterID DESC LIMIT 50;
SELECT CounterID FROM test.hits ORDER BY CounterID LIMIT 50;
SELECT CounterID FROM test.hits ORDER BY CounterID, EventDate LIMIT 50;
SELECT EventDate FROM test.hits ORDER BY CounterID, EventDate LIMIT 50;
SELECT EventDate FROM test.hits ORDER BY CounterID, EventDate DESC LIMIT 50;
SELECT CounterID FROM test.hits ORDER BY CounterID, EventDate DESC LIMIT 50;
SELECT CounterID FROM test.hits ORDER BY CounterID DESC, EventDate DESC LIMIT 50;
SELECT EventDate FROM test.hits ORDER BY CounterID DESC, EventDate DESC LIMIT 50;
SELECT CounterID, EventDate FROM test.hits ORDER BY CounterID, EventDate LIMIT 50;
SELECT CounterID, EventDate FROM test.hits ORDER BY CounterID, EventDate DESC LIMIT 50;
SELECT CounterID, EventDate FROM test.hits ORDER BY CounterID DESC, EventDate LIMIT 50;
SELECT CounterID, EventDate FROM test.hits ORDER BY CounterID DESC, EventDate DESC LIMIT 50;