From 11d37efa74fb23bd5759ce04e90b506292303d48 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Mon, 7 Jan 2019 15:51:14 +0300 Subject: [PATCH] reading --- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 68 +++++++++++++++++-- .../MergeTree/MergeTreeDataSelectExecutor.h | 7 ++ .../MergeTree/MergeTreeIndexReader.cpp | 26 +++++++ .../Storages/MergeTree/MergeTreeIndexReader.h | 27 ++++++++ .../src/Storages/MergeTree/MergeTreeIndexes.h | 47 ++++++------- dbms/src/Storages/MergeTree/MergeTreeReader.h | 3 +- .../MergeTree/registerStorageMergeTree.cpp | 6 ++ 7 files changed, 151 insertions(+), 33 deletions(-) create mode 100644 dbms/src/Storages/MergeTree/MergeTreeIndexReader.cpp create mode 100644 dbms/src/Storages/MergeTree/MergeTreeIndexReader.h diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index e38b92e1807..c951ccd951a 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -1,11 +1,15 @@ #include /// For calculations related to sampling coefficients. #include +#include + #include #include #include #include #include +#include +#include #include #include #include @@ -531,10 +535,12 @@ namespace DB /// It can be done in multiple threads (one thread for each part). /// Maybe it should be moved to BlockInputStream, but it can cause some problems. for (auto index : data.indexes) { - auto condition = index->createIndexConditionOnPart(query_info, context); - if (!condition->alwaysUnknownOrTrue()) { - ranges.ranges = condition->filterRanges(ranges.ranges); + auto condition = index->createIndexCondition(query_info, context); + if (condition->alwaysUnknownOrTrue()) { + continue; } + + ranges.ranges = filterMarksUsingIndex(index, condition, part, ranges.ranges, settings); } if (!ranges.ranges.empty()) @@ -930,7 +936,7 @@ namespace DB if (range.end == range.begin + 1) { /// We saw a useful gap between neighboring marks. Either add it to the last range, or start a new range. - if (res.empty() || range.begin - res.back().end > min_marks_for_seek) + if (res.empty() || range.begin - res.back().end > min_marks_for_seek) // is it a bug?? res.push_back(range); else res.back().end = range.end; @@ -952,4 +958,58 @@ namespace DB return res; } + MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex( + MergeTreeIndexPtr index, + IndexConditionPtr condition, + MergeTreeData::DataPartPtr part, + const MarkRanges & ranges, + const Settings & settings) const + { + if (!Poco::File(part->getFullPath() + index->getFileName() + ".idx").exists()) { + return ranges; + } + + const size_t min_marks_for_seek = (settings.merge_tree_min_rows_for_seek + data.index_granularity - 1) / data.index_granularity; + + MergeTreeIndexReader reader( + index, part, + ((part->marks_count + index->granularity - 1) / index->granularity), + ranges); + + MarkRanges res; + + MergeTreeIndexGranulePtr granule = nullptr; + size_t last_index_mark = 0; + for (const auto & range : ranges) + { + MarkRange index_range( + range.begin / index->granularity, range.end / index->granularity); + + if (last_index_mark != index_range.begin || !granule) { + reader.seek(index_range.begin); + } + + for (size_t index_mark = index_range.begin; index_mark < index_range.end; ++index_mark) + { + if (index_mark != index_range.begin || !granule || last_index_mark != index_range.begin) + granule = reader.read(); + + MarkRange data_range( + std::max(range.begin, index_mark * index->granularity), + std::min(range.end, (index_mark + 1) * index->granularity)); + + if (!condition->mayBeTrueOnGranule(*granule)) + continue; + + if (res.empty() || res.back().end - data_range.begin >= min_marks_for_seek) + res.push_back(data_range); + else + res.back().end = data_range.end; + } + + last_index_mark = index_range.end - 1; + } + return res; + } + } \ No newline at end of file diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h index 576b88f2e41..8010cc9c889 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h @@ -81,6 +81,13 @@ private: const MergeTreeData::DataPart::Index & index, const KeyCondition & key_condition, const Settings & settings) const; + + MarkRanges filterMarksUsingIndex( + MergeTreeIndexPtr index, + IndexConditionPtr condition, + MergeTreeData::DataPartPtr part, + const MarkRanges & ranges, + const Settings & settings) const; }; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeIndexReader.cpp b/dbms/src/Storages/MergeTree/MergeTreeIndexReader.cpp new file mode 100644 index 00000000000..f81f325b065 --- /dev/null +++ b/dbms/src/Storages/MergeTree/MergeTreeIndexReader.cpp @@ -0,0 +1,26 @@ +#include + + +namespace DB { + +MergeTreeIndexReader::MergeTreeIndexReader( + MergeTreeIndexPtr index, MergeTreeData::DataPartPtr part, size_t marks_count, const MarkRanges & all_mark_ranges) + : index(index), stream( + part->getFullPath() + index->getFileName(), ".idx", marks_count, + all_mark_ranges, nullptr, false, nullptr, 0, DBMS_DEFAULT_BUFFER_SIZE, + ReadBufferFromFileBase::ProfileCallback{}, CLOCK_MONOTONIC_COARSE) { + stream.seekToStart(); +} + +void MergeTreeIndexReader::seek(size_t mark) +{ + stream.seekToMark(mark); +} + +MergeTreeIndexGranulePtr MergeTreeIndexReader::read() { + auto granule = index->createIndexGranule(); + granule->deserializeBinary(*stream.data_buffer); + return granule; +} + +} \ No newline at end of file diff --git a/dbms/src/Storages/MergeTree/MergeTreeIndexReader.h b/dbms/src/Storages/MergeTree/MergeTreeIndexReader.h new file mode 100644 index 00000000000..32275f7f3b2 --- /dev/null +++ b/dbms/src/Storages/MergeTree/MergeTreeIndexReader.h @@ -0,0 +1,27 @@ +#pragma once + +#include +#include +#include +#include + +namespace DB { + +class MergeTreeIndexReader { +public: + MergeTreeIndexReader( + MergeTreeIndexPtr index, + MergeTreeData::DataPartPtr part, + size_t marks_count, + const MarkRanges & all_mark_ranges); + + void seek(size_t mark); + + MergeTreeIndexGranulePtr read(); + +private: + MergeTreeIndexPtr index; + MergeTreeReader::Stream stream; +}; + +} \ No newline at end of file diff --git a/dbms/src/Storages/MergeTree/MergeTreeIndexes.h b/dbms/src/Storages/MergeTree/MergeTreeIndexes.h index 221893d81a3..9475ffe2b5e 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeIndexes.h +++ b/dbms/src/Storages/MergeTree/MergeTreeIndexes.h @@ -17,37 +17,12 @@ constexpr auto INDEX_FILE_PREFIX = "skp_idx_"; namespace DB { - class MergeTreeIndex; using MergeTreeIndexPtr = std::shared_ptr; using MutableMergeTreeIndexPtr = std::shared_ptr; -/// Condition on the index. -/// It works only with one indexPart (MergeTreeDataPart). -class IndexCondition { - friend MergeTreeIndex; - -public: - virtual ~IndexCondition() = default; - - /// Checks if this index is useful for query. - virtual bool alwaysUnknownOrTrue() const = 0; - - /// Drops out ranges where query is false - virtual MarkRanges filterRanges(const MarkRanges & ranges) const = 0; - -protected: - IndexCondition() = default; - -public: - MergeTreeIndexPtr index; -}; - -using IndexConditionPtr = std::shared_ptr; - - struct MergeTreeIndexGranule { friend MergeTreeIndex; @@ -62,15 +37,31 @@ struct MergeTreeIndexGranule virtual void update(const Block & block, size_t * pos, size_t limit) = 0; }; + using MergeTreeIndexGranulePtr = std::shared_ptr; using MergeTreeIndexGranules = std::vector; +/// Condition on the index. +class IndexCondition { + friend MergeTreeIndex; + +public: + virtual ~IndexCondition() = default; + + /// Checks if this index is useful for query. + virtual bool alwaysUnknownOrTrue() const = 0; + + virtual bool mayBeTrueOnGranule(const MergeTreeIndexGranule & granule); + +protected: + IndexCondition() = default; -class MergeTreeIndexReader { public: MergeTreeIndexPtr index; }; +using IndexConditionPtr = std::shared_ptr; + /// Structure for storing basic index info like columns, expression, arguments, ... class MergeTreeIndex @@ -84,11 +75,11 @@ public: virtual String indexType() const { return "UNKNOWN"; }; /// gets filename without extension - virtual String getFileName() const = 0; + virtual String getFileName() const { return INDEX_FILE_PREFIX + name; }; virtual MergeTreeIndexGranulePtr createIndexGranule() const = 0; - virtual IndexConditionPtr createIndexConditionOnPart( + virtual IndexConditionPtr createIndexCondition( const SelectQueryInfo & query_info, const Context & context) const = 0; virtual void writeText(WriteBuffer & ostr) const = 0; diff --git a/dbms/src/Storages/MergeTree/MergeTreeReader.h b/dbms/src/Storages/MergeTree/MergeTreeReader.h index ac5d46fb664..744f1c0dbe4 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReader.h +++ b/dbms/src/Storages/MergeTree/MergeTreeReader.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -56,7 +57,6 @@ public: /// If continue_reading is true, continue reading from last state, otherwise seek to from_mark size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res); -private: class Stream { public: @@ -94,6 +94,7 @@ private: std::unique_ptr non_cached_buffer; }; +private: using FileStreams = std::map>; /// avg_value_size_hints are used to reduce the number of reallocations when creating columns of variable size. diff --git a/dbms/src/Storages/MergeTree/registerStorageMergeTree.cpp b/dbms/src/Storages/MergeTree/registerStorageMergeTree.cpp index 1030b6c8cac..837f90396bf 100644 --- a/dbms/src/Storages/MergeTree/registerStorageMergeTree.cpp +++ b/dbms/src/Storages/MergeTree/registerStorageMergeTree.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include @@ -181,6 +182,11 @@ static void setGraphitePatternsFromConfig(const Context & context, } +static void registerMergeTreeSkipIndexes() { + +} + + static String getMergeTreeVerboseHelp(bool is_extended_syntax) { using namespace std::string_literals;