This commit is contained in:
Nikita Vasilev 2019-01-07 15:51:14 +03:00
parent 17f6618fa3
commit 11d37efa74
7 changed files with 151 additions and 33 deletions

View File

@ -1,11 +1,15 @@
#include <boost/rational.hpp> /// For calculations related to sampling coefficients. #include <boost/rational.hpp> /// For calculations related to sampling coefficients.
#include <optional> #include <optional>
#include <Poco/File.h>
#include <Common/FieldVisitors.h> #include <Common/FieldVisitors.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h> #include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeSelectBlockInputStream.h> #include <Storages/MergeTree/MergeTreeSelectBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeReadPool.h> #include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputStream.h> #include <Storages/MergeTree/MergeTreeThreadSelectBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeIndexes.h>
#include <Storages/MergeTree/MergeTreeIndexReader.h>
#include <Storages/MergeTree/KeyCondition.h> #include <Storages/MergeTree/KeyCondition.h>
#include <Parsers/ASTIdentifier.h> #include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h> #include <Parsers/ASTLiteral.h>
@ -531,10 +535,12 @@ namespace DB
/// It can be done in multiple threads (one thread for each part). /// It can be done in multiple threads (one thread for each part).
/// Maybe it should be moved to BlockInputStream, but it can cause some problems. /// Maybe it should be moved to BlockInputStream, but it can cause some problems.
for (auto index : data.indexes) { for (auto index : data.indexes) {
auto condition = index->createIndexConditionOnPart(query_info, context); auto condition = index->createIndexCondition(query_info, context);
if (!condition->alwaysUnknownOrTrue()) { if (condition->alwaysUnknownOrTrue()) {
ranges.ranges = condition->filterRanges(ranges.ranges); continue;
} }
ranges.ranges = filterMarksUsingIndex(index, condition, part, ranges.ranges, settings);
} }
if (!ranges.ranges.empty()) if (!ranges.ranges.empty())
@ -930,7 +936,7 @@ namespace DB
if (range.end == range.begin + 1) if (range.end == range.begin + 1)
{ {
/// We saw a useful gap between neighboring marks. Either add it to the last range, or start a new range. /// We saw a useful gap between neighboring marks. Either add it to the last range, or start a new range.
if (res.empty() || range.begin - res.back().end > min_marks_for_seek) if (res.empty() || range.begin - res.back().end > min_marks_for_seek) // is it a bug??
res.push_back(range); res.push_back(range);
else else
res.back().end = range.end; res.back().end = range.end;
@ -952,4 +958,58 @@ namespace DB
return res; return res;
} }
/// Narrows the data mark ranges of `part` using one skip index.
///
/// For every index granule that overlaps `ranges`, the granule is read from the part's
/// `<index file name>.idx` file and checked with `condition->mayBeTrueOnGranule`.
/// Data marks covered by granules on which the condition cannot be true are dropped.
///
/// @param index      Skip index description; `index->granularity` is the number of data marks per index granule.
/// @param condition  Query condition evaluated against each index granule.
/// @param part       Data part whose index file is read.
/// @param ranges     Data mark ranges to filter (expected to be sorted and non-overlapping, as in the
///                   sibling range-building code of this class).
/// @param settings   Provides `merge_tree_min_rows_for_seek`.
/// @return `ranges` unchanged if the part has no file for this index; otherwise the surviving ranges,
///         where survivors separated by at most `min_marks_for_seek` marks are merged into one range.
MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
    MergeTreeIndexPtr index,
    IndexConditionPtr condition,
    MergeTreeData::DataPartPtr part,
    const MarkRanges & ranges,
    const Settings & settings) const
{
    /// No index file in this part: there is nothing to filter with.
    if (!Poco::File(part->getFullPath() + index->getFileName() + ".idx").exists())
        return ranges;

    const size_t min_marks_for_seek = (settings.merge_tree_min_rows_for_seek + data.index_granularity - 1) / data.index_granularity;
    const size_t granularity = index->granularity;

    MergeTreeIndexReader reader(
        index, part,
        (part->marks_count + granularity - 1) / granularity,
        ranges);

    MarkRanges res;

    /// Last granule read and its index mark: consecutive data ranges may start inside the same
    /// index granule, in which case it is reused instead of being re-read.
    MergeTreeIndexGranulePtr granule = nullptr;
    size_t last_index_mark = 0;

    for (const auto & range : ranges)
    {
        /// An empty range selects nothing; skipping it also keeps `index_range.end - 1` below from underflowing.
        if (range.begin >= range.end)
            continue;

        /// The end is rounded UP: a data range ending inside an index granule still needs that granule,
        /// otherwise the trailing marks would be silently dropped from the result.
        MarkRange index_range(
            range.begin / granularity,
            (range.end + granularity - 1) / granularity);

        const bool reuse_last_granule = granule && last_index_mark == index_range.begin;
        if (!reuse_last_granule)
            reader.seek(index_range.begin);

        for (size_t index_mark = index_range.begin; index_mark < index_range.end; ++index_mark)
        {
            /// Only the first granule of the range can be the one already held from the previous range.
            if (index_mark != index_range.begin || !reuse_last_granule)
                granule = reader.read();

            /// Part of the requested data range covered by this index granule.
            MarkRange data_range(
                std::max(range.begin, index_mark * granularity),
                std::min(range.end, (index_mark + 1) * granularity));

            if (!condition->mayBeTrueOnGranule(*granule))
                continue;

            /// Start a new range only if the gap to the previous surviving range is worth a seek.
            /// The subtraction is ordered so that it does not wrap around in unsigned arithmetic.
            if (res.empty() || data_range.begin - res.back().end > min_marks_for_seek)
                res.push_back(data_range);
            else
                res.back().end = data_range.end;
        }

        last_index_mark = index_range.end - 1;
    }

    return res;
}
} }

View File

@ -81,6 +81,13 @@ private:
const MergeTreeData::DataPart::Index & index, const MergeTreeData::DataPart::Index & index,
const KeyCondition & key_condition, const KeyCondition & key_condition,
const Settings & settings) const; const Settings & settings) const;
/// Filters the data mark ranges of `part` with a skip index: reads the index granules covering `ranges`
/// and keeps only the marks of granules for which `condition->mayBeTrueOnGranule` returns true.
/// Returns `ranges` unchanged when the part has no `.idx` file for this index.
MarkRanges filterMarksUsingIndex(
MergeTreeIndexPtr index,
IndexConditionPtr condition,
MergeTreeData::DataPartPtr part,
const MarkRanges & ranges,
const Settings & settings) const;
}; };
} }

View File

@ -0,0 +1,26 @@
#include <Storages/MergeTree/MergeTreeIndexReader.h>
namespace DB {
/// Opens the `.idx` file of `index` located in `part` as a read stream and positions it at the start.
/// `marks_count` and `all_mark_ranges` are forwarded to the stream as-is.
MergeTreeIndexReader::MergeTreeIndexReader(
    MergeTreeIndexPtr index,
    MergeTreeData::DataPartPtr part,
    size_t marks_count,
    const MarkRanges & all_mark_ranges)
    : index(index)
    , stream(
        part->getFullPath() + index->getFileName(), ".idx", marks_count,
        all_mark_ranges, nullptr, false, nullptr, 0, DBMS_DEFAULT_BUFFER_SIZE,
        ReadBufferFromFileBase::ProfileCallback{}, CLOCK_MONOTONIC_COARSE)
{
    stream.seekToStart();
}
/// Repositions the underlying index stream at mark `mark`; the next read() starts from there.
void MergeTreeIndexReader::seek(size_t mark)
{
stream.seekToMark(mark);
}
/// Creates an empty granule for this index and fills it by deserializing
/// from the stream's data buffer at its current position.
MergeTreeIndexGranulePtr MergeTreeIndexReader::read()
{
    MergeTreeIndexGranulePtr result = index->createIndexGranule();
    result->deserializeBinary(*stream.data_buffer);
    return result;
}
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <Storages/MergeTree/MergeTreeReader.h>
#include <Storages/MergeTree/MergeTreeIndexes.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeData.h>
namespace DB {
/// Reads granules of one skip index from the index file of a data part.
class MergeTreeIndexReader
{
public:
    /// Opens the index file of `index` inside `part`.
    /// `marks_count` and `all_mark_ranges` are passed through to the underlying stream.
    MergeTreeIndexReader(
        MergeTreeIndexPtr index,
        MergeTreeData::DataPartPtr part,
        size_t marks_count,
        const MarkRanges & all_mark_ranges);

    /// Moves the read position to the given mark of the index file.
    void seek(size_t mark);

    /// Deserializes one granule from the current read position and returns it.
    MergeTreeIndexGranulePtr read();

private:
    /// Index description; used to create empty granules before deserialization.
    MergeTreeIndexPtr index;
    /// Stream over the index file. Declared after `index`, so it is initialized after it.
    MergeTreeReader::Stream stream;
};
}

View File

@ -17,37 +17,12 @@ constexpr auto INDEX_FILE_PREFIX = "skp_idx_";
namespace DB namespace DB
{ {
class MergeTreeIndex; class MergeTreeIndex;
using MergeTreeIndexPtr = std::shared_ptr<const MergeTreeIndex>; using MergeTreeIndexPtr = std::shared_ptr<const MergeTreeIndex>;
using MutableMergeTreeIndexPtr = std::shared_ptr<MergeTreeIndex>; using MutableMergeTreeIndexPtr = std::shared_ptr<MergeTreeIndex>;
/// Condition on the index.
/// It works only with one indexPart (MergeTreeDataPart).
class IndexCondition {
friend MergeTreeIndex;
public:
virtual ~IndexCondition() = default;
/// Checks if this index is useful for query.
virtual bool alwaysUnknownOrTrue() const = 0;
/// Drops out ranges where query is false
virtual MarkRanges filterRanges(const MarkRanges & ranges) const = 0;
protected:
IndexCondition() = default;
public:
MergeTreeIndexPtr index;
};
using IndexConditionPtr = std::shared_ptr<IndexCondition>;
struct MergeTreeIndexGranule struct MergeTreeIndexGranule
{ {
friend MergeTreeIndex; friend MergeTreeIndex;
@ -62,15 +37,31 @@ struct MergeTreeIndexGranule
virtual void update(const Block & block, size_t * pos, size_t limit) = 0; virtual void update(const Block & block, size_t * pos, size_t limit) = 0;
}; };
using MergeTreeIndexGranulePtr = std::shared_ptr<MergeTreeIndexGranule>; using MergeTreeIndexGranulePtr = std::shared_ptr<MergeTreeIndexGranule>;
using MergeTreeIndexGranules = std::vector<MergeTreeIndexGranulePtr>; using MergeTreeIndexGranules = std::vector<MergeTreeIndexGranulePtr>;
/// Condition on the index.
class IndexCondition {
friend MergeTreeIndex;
public:
virtual ~IndexCondition() = default;
/// Checks if this index is useful for query.
virtual bool alwaysUnknownOrTrue() const = 0;
virtual bool mayBeTrueOnGranule(const MergeTreeIndexGranule & granule);
protected:
IndexCondition() = default;
class MergeTreeIndexReader {
public: public:
MergeTreeIndexPtr index; MergeTreeIndexPtr index;
}; };
using IndexConditionPtr = std::shared_ptr<IndexCondition>;
/// Structure for storing basic index info like columns, expression, arguments, ... /// Structure for storing basic index info like columns, expression, arguments, ...
class MergeTreeIndex class MergeTreeIndex
@ -84,11 +75,11 @@ public:
virtual String indexType() const { return "UNKNOWN"; }; virtual String indexType() const { return "UNKNOWN"; };
/// gets filename without extension /// gets filename without extension
virtual String getFileName() const = 0; virtual String getFileName() const { return INDEX_FILE_PREFIX + name; };
virtual MergeTreeIndexGranulePtr createIndexGranule() const = 0; virtual MergeTreeIndexGranulePtr createIndexGranule() const = 0;
virtual IndexConditionPtr createIndexConditionOnPart( virtual IndexConditionPtr createIndexCondition(
const SelectQueryInfo & query_info, const Context & context) const = 0; const SelectQueryInfo & query_info, const Context & context) const = 0;
virtual void writeText(WriteBuffer & ostr) const = 0; virtual void writeText(WriteBuffer & ostr) const = 0;

View File

@ -4,6 +4,7 @@
#include <Storages/MergeTree/MarkRange.h> #include <Storages/MergeTree/MarkRange.h>
#include <Storages/MergeTree/MergeTreeData.h> #include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeRangeReader.h> #include <Storages/MergeTree/MergeTreeRangeReader.h>
#include <Compression/CachedCompressedReadBuffer.h>
#include <Compression/CompressedReadBufferFromFile.h> #include <Compression/CompressedReadBufferFromFile.h>
#include <Core/NamesAndTypes.h> #include <Core/NamesAndTypes.h>
#include <port/clock.h> #include <port/clock.h>
@ -56,7 +57,6 @@ public:
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark /// If continue_reading is true, continue reading from last state, otherwise seek to from_mark
size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res); size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res);
private:
class Stream class Stream
{ {
public: public:
@ -94,6 +94,7 @@ private:
std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer; std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer;
}; };
private:
using FileStreams = std::map<std::string, std::unique_ptr<Stream>>; using FileStreams = std::map<std::string, std::unique_ptr<Stream>>;
/// avg_value_size_hints are used to reduce the number of reallocations when creating columns of variable size. /// avg_value_size_hints are used to reduce the number of reallocations when creating columns of variable size.

View File

@ -1,5 +1,6 @@
#include <Storages/StorageFactory.h> #include <Storages/StorageFactory.h>
#include <Storages/StorageMergeTree.h> #include <Storages/StorageMergeTree.h>
#include <Storages/StorageMergeTreeIndexes.h>
#include <Storages/StorageReplicatedMergeTree.h> #include <Storages/StorageReplicatedMergeTree.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
@ -181,6 +182,11 @@ static void setGraphitePatternsFromConfig(const Context & context,
} }
/// Placeholder: the body is currently empty, so calling it has no effect.
static void registerMergeTreeSkipIndexes()
{
}
static String getMergeTreeVerboseHelp(bool is_extended_syntax) static String getMergeTreeVerboseHelp(bool is_extended_syntax)
{ {
using namespace std::string_literals; using namespace std::string_literals;