Merge pull request #3 from nikvas0/nikvas0/index_read

Nikvas0/index read
This commit is contained in:
Nikita Vasilev 2019-01-07 22:29:06 +03:00 committed by GitHub
commit 3cad26dc71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 277 additions and 86 deletions

View File

@ -355,7 +355,9 @@ void MergeTreeData::setSkipIndexes(const ASTs & indexes_asts, bool only_check)
for (const auto &index_ast : indexes_asts) {
indexes.push_back(
std::move(MergeTreeIndexFactory::instance().get(
std::dynamic_pointer_cast<ASTIndexDeclaration>(index_ast))));
*this,
std::dynamic_pointer_cast<ASTIndexDeclaration>(index_ast),
global_context)));
}
}
}

View File

@ -553,18 +553,23 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
Names all_column_names = data.getColumns().getNamesOfPhysical();
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
LOG_DEBUG(log, "Before extract");
NamesAndTypesList gathering_columns, merging_columns;
Names gathering_column_names, merging_column_names;
extractMergingAndGatheringColumns(
all_columns, data.sorting_key_expr, data.indexes,
data.merging_params, gathering_columns, gathering_column_names, merging_columns, merging_column_names);
LOG_DEBUG(log, "After extract");
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(
data, future_part.name, future_part.part_info);
new_data_part->partition.assign(future_part.getPartition());
new_data_part->relative_path = TMP_PREFIX + future_part.name;
new_data_part->is_temp = true;
LOG_DEBUG(log, "New Part");
size_t sum_input_rows_upper_bound = merge_entry->total_size_marks * data.index_granularity;
MergeAlgorithm merge_alg = chooseMergeAlgorithm(parts, sum_input_rows_upper_bound, gathering_columns, deduplicate);

View File

@ -1,11 +1,15 @@
#include <boost/rational.hpp> /// For calculations related to sampling coefficients.
#include <optional>
#include <Poco/File.h>
#include <Common/FieldVisitors.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeSelectBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeIndexes.h>
#include <Storages/MergeTree/MergeTreeIndexReader.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
@ -531,9 +535,9 @@ namespace DB
/// It can be done in multiple threads (one thread for each part).
/// Maybe it should be moved to BlockInputStream, but it can cause some problems.
for (auto index : data.indexes) {
auto condition = index->createIndexConditionOnPart(query_info, context);
auto condition = index->createIndexCondition(query_info, context);
if (!condition->alwaysUnknownOrTrue()) {
ranges.ranges = condition->filterRanges(ranges.ranges);
ranges.ranges = filterMarksUsingIndex(index, condition, part, ranges.ranges, settings);
}
}
@ -930,7 +934,7 @@ namespace DB
if (range.end == range.begin + 1)
{
/// We saw a useful gap between neighboring marks. Either add it to the last range, or start a new range.
if (res.empty() || range.begin - res.back().end > min_marks_for_seek)
if (res.empty() || range.begin - res.back().end > min_marks_for_seek) // is it a bug??
res.push_back(range);
else
res.back().end = range.end;
@ -952,4 +956,59 @@ namespace DB
return res;
}
/// Narrows `ranges` (data mark ranges of `part`) with the help of a skipping index.
///
/// One index granule describes `index->granularity` consecutive data marks. For every
/// requested data range the covering index granules are read from the part's ".idx" file,
/// and the slice of the range that falls into a granule is kept only when
/// `condition->mayBeTrueOnGranule` accepts that granule.
///
/// If the part has no file for this index, `ranges` is returned unchanged.
///
/// Kept slices whose gap to the previous kept slice does not exceed `min_marks_for_seek`
/// (derived from `settings.merge_tree_min_rows_for_seek`) are glued into a single range,
/// the same way the primary-key range builder in this file treats small gaps.
MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
    MergeTreeIndexPtr index,
    IndexConditionPtr condition,
    MergeTreeData::DataPartPtr part,
    const MarkRanges & ranges,
    const Settings & settings) const
{
    if (!Poco::File(part->getFullPath() + index->getFileName() + ".idx").exists())
        return ranges;

    /// Rounded-up number of data marks corresponding to merge_tree_min_rows_for_seek rows.
    const size_t min_marks_for_seek = (settings.merge_tree_min_rows_for_seek + data.index_granularity - 1) / data.index_granularity;

    /// The index file holds one granule per `index->granularity` data marks (rounded up).
    MergeTreeIndexReader reader(
        index, part,
        ((part->marks_count + index->granularity - 1) / index->granularity),
        ranges);

    MarkRanges res;

    /// The most recently read granule and the index mark it belongs to. They are kept across
    /// ranges so that a range starting inside the same granule does not re-read it.
    MergeTreeIndexGranulePtr granule = nullptr;
    size_t last_index_mark = 0;

    for (const auto & range : ranges)
    {
        /// Index marks covering [range.begin, range.end): begin rounded down, end rounded up.
        MarkRange index_range(
            range.begin / index->granularity,
            (range.end + index->granularity - 1) / index->granularity);

        if (last_index_mark != index_range.begin || !granule)
            reader.seek(index_range.begin);

        for (size_t index_mark = index_range.begin; index_mark < index_range.end; ++index_mark)
        {
            /// Reuse the cached granule only for the first index mark of the range and only
            /// if it is exactly the granule that was read last.
            if (index_mark != index_range.begin || !granule || last_index_mark != index_range.begin)
                granule = reader.read();

            /// Part of the requested range that lies inside the current index granule.
            MarkRange data_range(
                std::max(range.begin, index_mark * index->granularity),
                std::min(range.end, (index_mark + 1) * index->granularity));

            if (!condition->mayBeTrueOnGranule(*granule))
                continue;

            /// Gap between the previous kept range and this one. The operands used to be
            /// reversed (`res.back().end - data_range.begin`), which on unsigned values is
            /// either 0 or wraps around to a huge number, so min_marks_for_seek had no effect.
            if (res.empty() || data_range.begin - res.back().end > min_marks_for_seek)
                res.push_back(data_range);
            else
                res.back().end = data_range.end;
        }

        last_index_mark = index_range.end - 1;
    }

    return res;
}
}

View File

@ -81,6 +81,13 @@ private:
const MergeTreeData::DataPart::Index & index,
const KeyCondition & key_condition,
const Settings & settings) const;
MarkRanges filterMarksUsingIndex(
MergeTreeIndexPtr index,
IndexConditionPtr condition,
MergeTreeData::DataPartPtr part,
const MarkRanges & ranges,
const Settings & settings) const;
};
}

View File

@ -214,6 +214,17 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
NamesAndTypesList columns = data.getColumns().getAllPhysical().filter(block.getNames());
MergedBlockOutputStream out(data, new_data_part->getFullPath(), columns, compression_codec);
for (auto index : data.indexes)
{
auto index_columns = index->expr->getRequiredColumnsWithTypes();
for (const auto & column : index_columns)
{
if (!block.has(column.name))
block.insert(ColumnWithTypeAndName(column.type, column.name));
}
index->expr->execute(block);
}
out.writePrefix();
out.writeWithPermutation(block, perm_ptr);
out.writeSuffixAndFinalizePart(new_data_part);

View File

@ -0,0 +1,26 @@
#include <Storages/MergeTree/MergeTreeIndexReader.h>
namespace DB {
/// Opens the index file of `part` (path: part directory + index file name, extension ".idx")
/// as a MergeTreeReader stream with `marks_count` marks and positions it at the beginning.
/// No caches and no profile callback are passed to the stream.
MergeTreeIndexReader::MergeTreeIndexReader(
    MergeTreeIndexPtr index,
    MergeTreeData::DataPartPtr part,
    size_t marks_count,
    const MarkRanges & all_mark_ranges)
    : index(index)
    , stream(
        part->getFullPath() + index->getFileName(), ".idx", marks_count,
        all_mark_ranges, nullptr, false, nullptr, 0, DBMS_DEFAULT_BUFFER_SIZE,
        ReadBufferFromFileBase::ProfileCallback{}, CLOCK_MONOTONIC_COARSE)
{
    stream.seekToStart();
}
/// Moves the underlying stream to index mark number `mark`,
/// so that the next read() starts from that mark.
void MergeTreeIndexReader::seek(size_t mark)
{
    stream.seekToMark(mark);
}
/// Creates a fresh granule of the index's type and fills it by deserializing
/// from the current position of the stream's data buffer.
MergeTreeIndexGranulePtr MergeTreeIndexReader::read()
{
    MergeTreeIndexGranulePtr result = index->createIndexGranule();
    result->deserializeBinary(*stream.data_buffer);
    return result;
}
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <Storages/MergeTree/MergeTreeReader.h>
#include <Storages/MergeTree/MergeTreeIndexes.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeData.h>
namespace DB {
/// Reads serialized granules of one skipping index from the ".idx" file of a data part.
class MergeTreeIndexReader
{
public:
    /// `marks_count` is the number of marks in the index file;
    /// `all_mark_ranges` is forwarded to the underlying stream.
    MergeTreeIndexReader(
        MergeTreeIndexPtr index,
        MergeTreeData::DataPartPtr part,
        size_t marks_count,
        const MarkRanges & all_mark_ranges);

    /// Positions the stream at index mark `mark`.
    void seek(size_t mark);

    /// Deserializes and returns the granule at the current stream position.
    MergeTreeIndexGranulePtr read();

private:
    /// Index whose granules are being read; used to create empty granules.
    MergeTreeIndexPtr index;

    /// Stream over the index file.
    MergeTreeReader::Stream stream;
};
}

View File

@ -18,36 +18,6 @@ namespace ErrorCodes
}
void MergeTreeIndexes::writeText(DB::WriteBuffer &ostr) const
{
writeString("indexes format version: 1\n", ostr);
DB::writeText(size(), ostr);
writeString(" indexes:\n", ostr);
for (auto index : *this) {
index->writeText(ostr);
writeChar('\n', ostr);
}
}
void MergeTreeIndexes::readText(DB::ReadBuffer &istr)
{
const MergeTreeIndexFactory & factory = MergeTreeIndexFactory::instance();
assertString("indexes format version: 1\n", istr);
size_t count;
DB::readText(count, istr);
assertString(" indexes:\n", istr);
reserve(count);
for (size_t i = 0; i < count; ++i) {
String index_descr;
readString(index_descr, istr);
emplace_back(factory.get(index_descr));
assertChar('\n', istr);
}
}
void MergeTreeIndexFactory::registerIndex(const std::string &name, Creator creator)
{
if (!indexes.emplace(name, std::move(creator)).second)
@ -55,7 +25,10 @@ void MergeTreeIndexFactory::registerIndex(const std::string &name, Creator creat
ErrorCodes::LOGICAL_ERROR);
}
std::unique_ptr<MergeTreeIndex> MergeTreeIndexFactory::get(std::shared_ptr<ASTIndexDeclaration> node) const
std::unique_ptr<MergeTreeIndex> MergeTreeIndexFactory::get(
const MergeTreeData & data,
std::shared_ptr<ASTIndexDeclaration> node,
const Context & context) const
{
if (!node->type)
throw Exception(
@ -74,14 +47,7 @@ std::unique_ptr<MergeTreeIndex> MergeTreeIndexFactory::get(std::shared_ptr<ASTIn
}
}),
ErrorCodes::INCORRECT_QUERY);
return it->second(node);
}
std::unique_ptr<MergeTreeIndex> MergeTreeIndexFactory::get(const String & description) const
{
ParserIndexDeclaration parser;
ASTPtr ast = parseQuery(parser, description.data(), description.data() + description.size(), "index factory", 0);
return get(std::dynamic_pointer_cast<ASTIndexDeclaration>(ast));
return it->second(data, node, context);
}
}

View File

@ -17,60 +17,45 @@ constexpr auto INDEX_FILE_PREFIX = "skp_idx_";
namespace DB
{
class MergeTreeData;
class MergeTreeIndex;
using MergeTreeIndexPtr = std::shared_ptr<const MergeTreeIndex>;
using MutableMergeTreeIndexPtr = std::shared_ptr<MergeTreeIndex>;
/// Condition on the index.
/// It works only with one indexPart (MergeTreeDataPart).
class IndexCondition {
friend MergeTreeIndex;
public:
virtual ~IndexCondition() = default;
/// Checks if this index is useful for query.
virtual bool alwaysUnknownOrTrue() const = 0;
/// Drops out ranges where query is false
virtual MarkRanges filterRanges(const MarkRanges & ranges) const = 0;
protected:
IndexCondition() = default;
public:
MergeTreeIndexPtr index;
};
using IndexConditionPtr = std::shared_ptr<IndexCondition>;
struct MergeTreeIndexGranule
{
friend MergeTreeIndex;
virtual ~MergeTreeIndexGranule();
virtual ~MergeTreeIndexGranule() = default;
virtual void serializeBinary(WriteBuffer & ostr) const = 0;
virtual void deserializeBinary(ReadBuffer & istr) const = 0;
virtual void deserializeBinary(ReadBuffer & istr) = 0;
virtual bool empty() const = 0;
virtual void update(const Block & block, size_t * pos, size_t limit) = 0;
};
using MergeTreeIndexGranulePtr = std::shared_ptr<MergeTreeIndexGranule>;
using MergeTreeIndexGranules = std::vector<MergeTreeIndexGranulePtr>;
class MergeTreeIndexReader {
/// A query condition expressed in terms of a skipping index.
/// Implementations decide, granule by granule, whether the query may match.
class IndexCondition
{
public:
    virtual ~IndexCondition() = default;
    IndexCondition() = default;

    /// Checks if this index is useful for query.
    virtual bool alwaysUnknownOrTrue() const = 0;

    /// Returns false when the query cannot be true for any row described by `granule`.
    virtual bool mayBeTrueOnGranule(const MergeTreeIndexGranule & granule) const = 0;

    /// The index this condition was built for.
    MergeTreeIndexPtr index;
};
using IndexConditionPtr = std::shared_ptr<IndexCondition>;
/// Structure for storing basic index info like columns, expression, arguments, ...
class MergeTreeIndex
@ -84,11 +69,11 @@ public:
virtual String indexType() const { return "UNKNOWN"; };
/// gets filename without extension
virtual String getFileName() const = 0;
String getFileName() const { return INDEX_FILE_PREFIX + name; };
virtual MergeTreeIndexGranulePtr createIndexGranule() const = 0;
virtual IndexConditionPtr createIndexConditionOnPart(
virtual IndexConditionPtr createIndexCondition(
const SelectQueryInfo & query_info, const Context & context) const = 0;
virtual void writeText(WriteBuffer & ostr) const = 0;
@ -102,12 +87,7 @@ public:
};
class MergeTreeIndexes : public std::vector<MutableMergeTreeIndexPtr>
{
public:
void writeText(WriteBuffer & ostr) const;
void readText(ReadBuffer & istr);
};
using MergeTreeIndexes = std::vector<MutableMergeTreeIndexPtr>;
class MergeTreeIndexFactory : public ext::singleton<MergeTreeIndexFactory>
@ -115,10 +95,16 @@ class MergeTreeIndexFactory : public ext::singleton<MergeTreeIndexFactory>
friend class ext::singleton<MergeTreeIndexFactory>;
public:
using Creator = std::function<std::unique_ptr<MergeTreeIndex>(std::shared_ptr<ASTIndexDeclaration> node)>;
using Creator = std::function<
std::unique_ptr<MergeTreeIndex>(
const MergeTreeData & data,
std::shared_ptr<ASTIndexDeclaration> node,
const Context & context)>;
std::unique_ptr<MergeTreeIndex> get(std::shared_ptr<ASTIndexDeclaration> node) const;
std::unique_ptr<MergeTreeIndex> get(const String & description) const;
std::unique_ptr<MergeTreeIndex> get(
const MergeTreeData & data,
std::shared_ptr<ASTIndexDeclaration> node,
const Context & context) const;
void registerIndex(const std::string & name, Creator creator);

View File

@ -4,6 +4,7 @@
#include <Storages/MergeTree/MarkRange.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeRangeReader.h>
#include <Compression/CachedCompressedReadBuffer.h>
#include <Compression/CompressedReadBufferFromFile.h>
#include <Core/NamesAndTypes.h>
#include <port/clock.h>
@ -56,7 +57,6 @@ public:
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark
size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res);
private:
class Stream
{
public:
@ -94,6 +94,7 @@ private:
std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer;
};
private:
using FileStreams = std::map<std::string, std::unique_ptr<Stream>>;
/// avg_value_size_hints are used to reduce the number of reallocations when creating columns of variable size.

View File

@ -0,0 +1,2 @@
#include <Storages/MergeTree/MergeTreeTestIndex.h>

View File

@ -0,0 +1,89 @@
#pragma once
#include <Storages/MergeTree/MergeTreeIndexes.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <iostream>
#include <random>
namespace DB {
class MergeTreeTestIndex;
/// Granule of the "TEST" index. It carries a single Int32 value that is
/// overwritten with rand() on every update; it exists to exercise index (de)serialization.
struct MergeTreeTestGranule : public MergeTreeIndexGranule
{
    ~MergeTreeTestGranule() override = default;

    /// Writes the stored value in binary form.
    void serializeBinary(WriteBuffer & ostr) const override
    {
        writeIntBinary(emp, ostr);
    }

    /// Reads the stored value back in binary form.
    void deserializeBinary(ReadBuffer & istr) override
    {
        readIntBinary(emp, istr);
    }

    /// True while the stored value is non-zero.
    bool empty() const override
    {
        return emp != 0;
    }

    /// Advances `*pos` by at most `limit` rows (never past the end of `block`)
    /// and replaces the stored value with a random number.
    void update(const Block & block, size_t * pos, size_t limit) override
    {
        const size_t rows_left = block.rows() - *pos;
        *pos += std::min(limit, rows_left);
        emp = rand();
    }

    Int32 emp = true;
};
/// Condition for the "TEST" index: it never rejects a granule.
class IndexTestCondition : public IndexCondition
{
public:
    IndexTestCondition() = default;
    ~IndexTestCondition() override = default;

    /// Always reports the index as useful for the query.
    bool alwaysUnknownOrTrue() const override
    {
        return false;
    }

    /// Every granule is considered a possible match.
    bool mayBeTrueOnGranule(const MergeTreeIndexGranule & /*granule*/) const override
    {
        return true;
    }
};
/// Skipping index of type "TEST": produces MergeTreeTestGranule granules
/// and IndexTestCondition conditions.
class MergeTreeTestIndex : public MergeTreeIndex
{
public:
    MergeTreeTestIndex(String name, ExpressionActionsPtr expr, size_t granularity, Block key)
        : MergeTreeIndex(name, expr, granularity, key)
    {
    }

    ~MergeTreeTestIndex() override = default;

    String indexType() const override
    {
        return "TEST";
    }

    /// Creates an empty granule of this index.
    MergeTreeIndexGranulePtr createIndexGranule() const override
    {
        return std::make_shared<MergeTreeTestGranule>();
    }

    /// The query and the context are ignored: the returned condition is the same for any query.
    IndexConditionPtr createIndexCondition(
        const SelectQueryInfo & /*query_info*/, const Context & /*context*/) const override
    {
        return std::make_shared<IndexTestCondition>();
    }

    /// Writes the constant 10 as text.
    void writeText(WriteBuffer & ostr) const override
    {
        DB::writeText(10, ostr);
    }
};
/// Factory callback that builds a MergeTreeTestIndex from its AST declaration.
/// The index takes its name and granularity from `node` and uses the table's
/// primary key expression; the context is not used.
///
/// Marked `inline` because this function is defined in a header that is included from
/// more than one translation unit; a non-inline definition there violates the
/// one-definition rule and can fail at link time with a multiple-definition error.
inline std::unique_ptr<MergeTreeIndex> MTItestCreator(
    const MergeTreeData & data, std::shared_ptr<ASTIndexDeclaration> node, const Context & /*context*/)
{
    return std::make_unique<MergeTreeTestIndex>(
        node->name, data.primary_key_expr, node->granularity.get<size_t>(), Block{});
}
}

View File

@ -1,6 +1,8 @@
#include <Storages/StorageFactory.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeIndexes.h>
#include <Storages/MergeTree/MergeTreeTestIndex.h>
#include <Common/typeid_cast.h>
#include <Common/OptimizedRegularExpression.h>
@ -633,6 +635,12 @@ static StoragePtr create(const StorageFactory::Arguments & args)
}
/// Registers the available skipping index types in the global index factory.
/// Currently this is only the "test" index.
static void registerMergeTreeSkipIndexes()
{
    MergeTreeIndexFactory::instance().registerIndex("test", MTItestCreator);
}
void registerStorageMergeTree(StorageFactory & factory)
{
factory.registerStorage("MergeTree", create);
@ -650,6 +658,8 @@ void registerStorageMergeTree(StorageFactory & factory)
factory.registerStorage("ReplicatedSummingMergeTree", create);
factory.registerStorage("ReplicatedGraphiteMergeTree", create);
factory.registerStorage("ReplicatedVersionedCollapsingMergeTree", create);
registerMergeTreeSkipIndexes();
}
}