added indexes in selectexecutor

This commit is contained in:
Nikita Vasilev 2018-12-29 16:02:57 +03:00
parent 5079330bdc
commit f90cdca498
2 changed files with 784 additions and 772 deletions

View File

@ -58,37 +58,37 @@ namespace ProfileEvents
namespace DB
{
/// Error codes referenced by this translation unit.
/// They are only declared here (`extern`); their definitions are not visible in this file.
namespace ErrorCodes
{
namespace ErrorCodes
{
extern const int INDEX_NOT_USED;
extern const int SAMPLING_NOT_SUPPORTED;
extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
extern const int ILLEGAL_COLUMN;
extern const int ARGUMENT_OUT_OF_BOUND;
}
}
/// Initializes `data` from `data_` and obtains a logger whose name is
/// `data.getLogName()` followed by the suffix " (SelectExecutor)".
MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(const MergeTreeData & data_)
MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(const MergeTreeData & data_)
: data(data_), log(&Logger::get(data.getLogName() + " (SelectExecutor)"))
{
}
{
}
/// Construct a block consisting only of possible values of virtual columns.
/// The block holds a single String column named "_part" with one row per
/// element of `parts`, each row being that part's `name`.
static Block getBlockWithPartColumn(const MergeTreeData::DataPartsVector & parts)
{
static Block getBlockWithPartColumn(const MergeTreeData::DataPartsVector & parts)
{
auto column = ColumnString::create();
/// One row per part, in the order the parts are given.
for (const auto & part : parts)
column->insert(part->name);
return Block{ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "_part")};
}
}
size_t MergeTreeDataSelectExecutor::getApproximateTotalRowsToRead(
size_t MergeTreeDataSelectExecutor::getApproximateTotalRowsToRead(
const MergeTreeData::DataPartsVector & parts, const KeyCondition & key_condition, const Settings & settings) const
{
{
size_t full_marks_count = 0;
/// We will find out how many rows we would have read without sampling.
@ -109,19 +109,19 @@ size_t MergeTreeDataSelectExecutor::getApproximateTotalRowsToRead(
}
return full_marks_count * data.index_granularity;
}
}
/// Sample size expressed as a rational number (numerator/denominator) over ASTSampleRatio::BigNum.
using RelativeSize = boost::rational<ASTSampleRatio::BigNum>;
using RelativeSize = boost::rational<ASTSampleRatio::BigNum>;
/// Formats a RelativeSize as "<numerator>/<denominator>", converting each
/// component with ASTSampleRatio::toString.
std::string toString(const RelativeSize & x)
{
std::string toString(const RelativeSize & x)
{
return ASTSampleRatio::toString(x.numerator()) + "/" + ASTSampleRatio::toString(x.denominator());
}
}
/// Converts an absolute sample size, given as an approximate number of rows (e.g. `SAMPLE 1000000`), to a relative value (e.g. `SAMPLE 0.1`).
static RelativeSize convertAbsoluteSampleSizeToRelative(const ASTPtr & node, size_t approx_total_rows)
{
static RelativeSize convertAbsoluteSampleSizeToRelative(const ASTPtr & node, size_t approx_total_rows)
{
if (approx_total_rows == 0)
return 1;
@ -129,23 +129,23 @@ static RelativeSize convertAbsoluteSampleSizeToRelative(const ASTPtr & node, siz
auto absolute_sample_size = node_sample.ratio.numerator / node_sample.ratio.denominator;
return std::min(RelativeSize(1), RelativeSize(absolute_sample_size) / RelativeSize(approx_total_rows));
}
}
/// Reads from the parts returned by `data.getDataPartsVector()`.
/// All other arguments are forwarded unchanged to readFromParts, whose result is returned.
BlockInputStreams MergeTreeDataSelectExecutor::read(
BlockInputStreams MergeTreeDataSelectExecutor::read(
const Names & column_names_to_return,
const SelectQueryInfo & query_info,
const Context & context,
const size_t max_block_size,
const unsigned num_streams,
const PartitionIdToMaxBlock * max_block_numbers_to_read) const
{
{
return readFromParts(
data.getDataPartsVector(), column_names_to_return, query_info, context,
max_block_size, num_streams, max_block_numbers_to_read);
}
}
BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
MergeTreeData::DataPartsVector parts,
const Names & column_names_to_return,
const SelectQueryInfo & query_info,
@ -153,7 +153,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
const size_t max_block_size,
const unsigned num_streams,
const PartitionIdToMaxBlock * max_block_numbers_to_read) const
{
{
size_t part_index = 0;
/// If query contains restrictions on the virtual column `_part` or `_part_index`, select only parts suitable for it.
@ -528,6 +528,16 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
else
ranges.ranges = MarkRanges{MarkRange{0, part->marks_count}};
/// Narrow the mark ranges of this part using every index part attached to it.
/// A condition reporting alwaysUnknownOrTrue() is skipped without filtering.
/// Elements are bound by const reference so the pointer is not copied on each iteration.
for (const auto & index_part : part->index_parts)
{
    const auto condition = index_part->createIndexConditionOnPart(
        query_info, context, index_part->index->sample.getNames(), index_part->index->expr);
    if (!condition->alwaysUnknownOrTrue())
        ranges.ranges = condition->filterRanges(ranges.ranges);
}
if (!ranges.ranges.empty())
{
parts_with_ranges.push_back(ranges);
@ -601,10 +611,10 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
stream = std::make_shared<ExpressionBlockInputStream>(stream, query_info.prewhere_info->remove_columns_actions);
return res;
}
}
BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
RangesInDataParts && parts,
size_t num_streams,
const Names & column_names,
@ -613,7 +623,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
const PrewhereInfoPtr & prewhere_info,
const Names & virt_columns,
const Settings & settings) const
{
{
const size_t min_marks_for_concurrent_read =
(settings.merge_tree_min_rows_for_concurrent_read + data.index_granularity - 1) / data.index_granularity;
const size_t max_marks_to_use_cache =
@ -745,9 +755,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
}
return res;
}
}
BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
RangesInDataParts && parts,
const Names & column_names,
size_t max_block_size,
@ -755,7 +765,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
const PrewhereInfoPtr & prewhere_info,
const Names & virt_columns,
const Settings & settings) const
{
{
const size_t max_marks_to_use_cache =
(settings.merge_tree_max_rows_to_use_cache + data.index_granularity - 1) / data.index_granularity;
@ -829,12 +839,12 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
}
return {merged};
}
}
void MergeTreeDataSelectExecutor::createPositiveSignCondition(
void MergeTreeDataSelectExecutor::createPositiveSignCondition(
ExpressionActionsPtr & out_expression, String & out_column, const Context & context) const
{
{
auto function = std::make_shared<ASTFunction>();
auto arguments = std::make_shared<ASTExpressionList>();
auto sign = std::make_shared<ASTIdentifier>(data.merging_params.sign_column);
@ -851,14 +861,14 @@ void MergeTreeDataSelectExecutor::createPositiveSignCondition(
auto syntax_result = SyntaxAnalyzer(context, {}).analyze(query, data.getColumns().getAllPhysical());
out_expression = ExpressionAnalyzer(query, syntax_result, context).getActions(false);
out_column = function->getColumnName();
}
}
/// Calculates the set of mark ranges that could possibly contain keys required by the condition.
/// In other words, it removes from the whole range those subranges that definitely cannot contain the required keys.
MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
const MergeTreeData::DataPart::Index & index, const KeyCondition & key_condition, const Settings & settings) const
{
{
MarkRanges res;
size_t marks_count = index.at(0)->size();
@ -941,6 +951,6 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
}
return res;
}
}
}

View File

@ -49,6 +49,7 @@ public:
protected:
IndexCondition() = default;
public:
MergeTreeIndexPartPtr part;
};
@ -80,6 +81,7 @@ protected:
virtual void updateImpl(const Block & block, const Names & column_names) = 0;
virtual void mergeImpl(const MergeTreeIndexPart & other) = 0;
public:
MergeTreeIndexPtr index;
};