Merge pull request #5 from nikvas0/nikvas0/minmax

Nikvas0/minmax
This commit is contained in:
Nikita Vasilev 2019-01-09 17:34:04 +03:00 committed by GitHub
commit 0536711337
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1162 additions and 996 deletions

View File

@ -637,19 +637,16 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
BlockInputStreamPtr stream = std::move(input);
for (const auto & index : data.indexes) {
stream = std::make_shared<ExpressionBlockInputStream>(stream, index->expr);
stream = std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(stream, index->expr));
}
if (data.hasPrimaryKey()) {
stream = std::make_shared<ExpressionBlockInputStream>(
BlockInputStreamPtr(std::move(stream)), data.sorting_key_expr);
stream = std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(stream, data.sorting_key_expr));
}
if (!data.indexes.empty() || data.hasPrimaryKey()) {
src_streams.emplace_back(std::make_shared<MaterializingBlockInputStream>(stream));
} else {
src_streams.emplace_back(stream);
}
src_streams.emplace_back(stream);
}
Names sort_columns = data.sorting_key_columns;
@ -658,6 +655,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
sort_description.reserve(sort_columns_size);
Block header = src_streams.at(0)->getHeader();
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);
@ -916,16 +914,13 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
{
/// All columns are modified, proceed to write a new part from scratch.
for (const auto & index : data.indexes) {
in = std::make_shared<ExpressionBlockInputStream>(in, index->expr);
}
for (const auto & index : data.indexes)
in = std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(in, index->expr));
if (data.hasPrimaryKey())
in = std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(in, data.primary_key_expr));
else if (!data.indexes.empty()) {
in = std::make_shared<MaterializingBlockInputStream>(in);
}
MergeTreeDataPart::MinMaxIndex minmax_idx;

File diff suppressed because it is too large Load Diff

View File

@ -216,12 +216,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
for (auto index : data.indexes)
{
auto index_columns = index->expr->getRequiredColumnsWithTypes();
for (const auto & column : index_columns)
{
if (!block.has(column.name))
block.insert(ColumnWithTypeAndName(column.type, column.name));
}
index->expr->execute(block);
}

View File

@ -6,6 +6,8 @@
#include <numeric>
#include <boost/algorithm/string.hpp>
namespace DB
{
@ -17,7 +19,6 @@ namespace ErrorCodes
extern const int UNKNOWN_EXCEPTION;
}
void MergeTreeIndexFactory::registerIndex(const std::string &name, Creator creator)
{
if (!indexes.emplace(name, std::move(creator)).second)
@ -32,8 +33,12 @@ std::unique_ptr<MergeTreeIndex> MergeTreeIndexFactory::get(
{
if (!node->type)
throw Exception(
"for INDEX TYPE is required",
ErrorCodes::INCORRECT_QUERY);
"for index TYPE is required", ErrorCodes::INCORRECT_QUERY);
if (node->type->parameters && !node->type->parameters->children.empty())
throw Exception(
"Index type can not have parameters", ErrorCodes::INCORRECT_QUERY);
boost::algorithm::to_lower(node->type->name);
auto it = indexes.find(node->type->name);
if (it == indexes.end())
throw Exception(

View File

@ -31,6 +31,7 @@ struct MergeTreeIndexGranule
virtual void serializeBinary(WriteBuffer & ostr) const = 0;
virtual void deserializeBinary(ReadBuffer & istr) = 0;
virtual String toString() const = 0;
virtual bool empty() const = 0;
virtual void update(const Block & block, size_t * pos, size_t limit) = 0;
@ -43,15 +44,11 @@ using MergeTreeIndexGranules = std::vector<MergeTreeIndexGranulePtr>;
/// Condition on the index.
class IndexCondition {
public:
IndexCondition() = default;
virtual ~IndexCondition() = default;
/// Checks if this index is useful for query.
virtual bool alwaysUnknownOrTrue() const = 0;
virtual bool mayBeTrueOnGranule(const MergeTreeIndexGranule & granule) const = 0;
MergeTreeIndexPtr index;
virtual bool mayBeTrueOnGranule(MergeTreeIndexGranulePtr granule) const = 0;
};
using IndexConditionPtr = std::shared_ptr<IndexCondition>;
@ -61,12 +58,10 @@ using IndexConditionPtr = std::shared_ptr<IndexCondition>;
class MergeTreeIndex
{
public:
MergeTreeIndex(String name, ExpressionActionsPtr expr, size_t granularity, Block key)
: name(name), expr(expr), granularity(granularity), sample(key) {}
MergeTreeIndex(String name, ExpressionActionsPtr expr, size_t granularity)
: name(name), expr(expr), granularity(granularity) {}
virtual ~MergeTreeIndex() {};
virtual String indexType() const { return "UNKNOWN"; };
virtual ~MergeTreeIndex() = default;
/// gets filename without extension
String getFileName() const { return INDEX_FILE_PREFIX + name; };
@ -81,7 +76,6 @@ public:
size_t granularity;
Names columns;
DataTypes data_types;
Block sample;
};

View File

@ -0,0 +1,186 @@
#include <Storages/MergeTree/MergeTreeMinMaxIndex.h>
#include <Poco/Logger.h>
namespace DB
{
MergeTreeMinMaxGranule::MergeTreeMinMaxGranule(const MergeTreeMinMaxIndex & index)
: MergeTreeIndexGranule(), index(index), parallelogram()
{
}
void MergeTreeMinMaxGranule::serializeBinary(WriteBuffer & ostr) const
{
if (empty())
throw Exception(
"Attempt to write empty minmax index `" + index.name + "`", ErrorCodes::LOGICAL_ERROR);
Poco::Logger * log = &Poco::Logger::get("minmax_idx");
LOG_DEBUG(log, "serializeBinary Granule");
for (size_t i = 0; i < index.columns.size(); ++i)
{
const DataTypePtr & type = index.data_types[i];
LOG_DEBUG(log, "parallel " << i << " :: "
<< applyVisitor(FieldVisitorToString(), parallelogram[i].left) << " "
<< applyVisitor(FieldVisitorToString(), parallelogram[i].right));
type->serializeBinary(parallelogram[i].left, ostr);
type->serializeBinary(parallelogram[i].right, ostr);
}
}
void MergeTreeMinMaxGranule::deserializeBinary(ReadBuffer & istr)
{
Poco::Logger * log = &Poco::Logger::get("minmax_idx");
LOG_DEBUG(log, "deserializeBinary Granule");
parallelogram.clear();
for (size_t i = 0; i < index.columns.size(); ++i)
{
const DataTypePtr & type = index.data_types[i];
Field min_val;
type->deserializeBinary(min_val, istr);
Field max_val;
type->deserializeBinary(max_val, istr);
LOG_DEBUG(log, "parallel " << i << " :: "
<< applyVisitor(FieldVisitorToString(), min_val) << " "
<< applyVisitor(FieldVisitorToString(), max_val));
parallelogram.emplace_back(min_val, true, max_val, true);
}
}
String MergeTreeMinMaxGranule::toString() const
{
String res = "minmax granule: ";
for (size_t i = 0; i < parallelogram.size(); ++i)
{
res += "["
+ applyVisitor(FieldVisitorToString(), parallelogram[i].left) + ", "
+ applyVisitor(FieldVisitorToString(), parallelogram[i].right) + "]";
}
return res;
}
void MergeTreeMinMaxGranule::update(const Block & block, size_t * pos, size_t limit)
{
Poco::Logger * log = &Poco::Logger::get("minmax_idx");
LOG_DEBUG(log, "update Granule " << parallelogram.size()
<< " pos: "<< *pos << " limit: " << limit << " rows: " << block.rows());
size_t rows_read = 0;
for (size_t i = 0; i < index.columns.size(); ++i)
{
LOG_DEBUG(log, "granule column: " << index.columns[i]);
auto column = block.getByName(index.columns[i]).column;
size_t cur;
/// TODO: more effective (index + getExtremes??)
for (cur = 0; cur < limit && cur + *pos < column->size(); ++cur)
{
Field field;
column->get(cur + *pos, field);
LOG_DEBUG(log, "upd:: " << applyVisitor(FieldVisitorToString(), field));
if (parallelogram.size() <= i)
{
LOG_DEBUG(log, "emplaced");
parallelogram.emplace_back(field, true, field, true);
}
else
{
parallelogram[i].left = std::min(parallelogram[i].left, field);
parallelogram[i].right = std::max(parallelogram[i].right, field);
}
}
LOG_DEBUG(log, "res:: ["
<< applyVisitor(FieldVisitorToString(), parallelogram[i].left) << ", "
<< applyVisitor(FieldVisitorToString(), parallelogram[i].right) << "]");
rows_read = cur;
}
LOG_DEBUG(log, "updated rows_read: " << rows_read);
*pos += rows_read;
};
MinMaxCondition::MinMaxCondition(
const SelectQueryInfo &query,
const Context &context,
const MergeTreeMinMaxIndex &index)
: IndexCondition(), index(index), condition(query, context, index.columns, index.expr) {};
bool MinMaxCondition::alwaysUnknownOrTrue() const
{
return condition.alwaysUnknownOrTrue();
}
bool MinMaxCondition::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule) const
{
std::shared_ptr<MergeTreeMinMaxGranule> granule
= std::dynamic_pointer_cast<MergeTreeMinMaxGranule>(idx_granule);
if (!granule) {
throw Exception(
"Minmax index condition got wrong granule", ErrorCodes::LOGICAL_ERROR);
}
return condition.mayBeTrueInParallelogram(granule->parallelogram, index.data_types);
}
MergeTreeIndexGranulePtr MergeTreeMinMaxIndex::createIndexGranule() const
{
return std::make_shared<MergeTreeMinMaxGranule>(*this);
}
IndexConditionPtr MergeTreeMinMaxIndex::createIndexCondition(
const SelectQueryInfo & query, const Context & context) const
{
return std::make_shared<MinMaxCondition>(query, context, *this);
};
std::unique_ptr<MergeTreeIndex> MergeTreeMinMaxIndexCreator(
const MergeTreeData & data,
std::shared_ptr<ASTIndexDeclaration> node,
const Context & context)
{
if (node->name.empty())
throw Exception("Index must have unique name", ErrorCodes::INCORRECT_QUERY);
if (node->type->arguments)
throw Exception("Minmax index have not any arguments", ErrorCodes::INCORRECT_QUERY);
ASTPtr expr_list = MergeTreeData::extractKeyExpressionList(node->expr->clone());
auto syntax = SyntaxAnalyzer(context, {}).analyze(
expr_list, data.getColumns().getAllPhysical());
auto minmax_expr = ExpressionAnalyzer(expr_list, syntax, context).getActions(false);
auto minmax = std::make_unique<MergeTreeMinMaxIndex>(
node->name, std::move(minmax_expr), node->granularity.get<size_t>());
auto sample = ExpressionAnalyzer(expr_list, syntax, context)
.getActions(true)->getSampleBlock();
Poco::Logger * log = &Poco::Logger::get("minmax_idx");
LOG_DEBUG(log, "new minmax index");
for (size_t i = 0; i < expr_list->children.size(); ++i)
{
const auto & column = sample.getByPosition(i);
minmax->columns.emplace_back(column.name);
minmax->data_types.emplace_back(column.type);
LOG_DEBUG(log, ">" << column.name << " " << column.type->getName());
}
return minmax;
}
}

View File

@ -0,0 +1,81 @@
#pragma once
#include <Storages/MergeTree/MergeTreeIndexes.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/SyntaxAnalyzer.h>
#include <memory>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INCORRECT_QUERY;
}
class MergeTreeMinMaxIndex;
struct MergeTreeMinMaxGranule : public MergeTreeIndexGranule
{
explicit MergeTreeMinMaxGranule(const MergeTreeMinMaxIndex & index);
void serializeBinary(WriteBuffer & ostr) const override;
void deserializeBinary(ReadBuffer & istr) override;
String toString() const override;
bool empty() const override { return parallelogram.empty(); }
void update(const Block & block, size_t * pos, size_t limit) override;
~MergeTreeMinMaxGranule() override = default;
const MergeTreeMinMaxIndex & index;
std::vector<Range> parallelogram;
};
class MinMaxCondition : public IndexCondition
{
public:
MinMaxCondition(
const SelectQueryInfo & query,
const Context & context,
const MergeTreeMinMaxIndex & index);
bool alwaysUnknownOrTrue() const override;
bool mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule) const override;
~MinMaxCondition() override = default;
private:
const MergeTreeMinMaxIndex & index;
KeyCondition condition;
};
class MergeTreeMinMaxIndex : public MergeTreeIndex
{
public:
MergeTreeMinMaxIndex(String name, ExpressionActionsPtr expr, size_t granularity)
: MergeTreeIndex(name, expr, granularity) {}
~MergeTreeMinMaxIndex() override = default;
MergeTreeIndexGranulePtr createIndexGranule() const override;
IndexConditionPtr createIndexCondition(
const SelectQueryInfo & query, const Context & context) const override;
};
std::unique_ptr<MergeTreeIndex> MergeTreeMinMaxIndexCreator(
const MergeTreeData & data, std::shared_ptr<ASTIndexDeclaration> node, const Context & context);
}

View File

@ -1,2 +0,0 @@
#include <Storages/MergeTree/MergeTreeTestIndex.h>

View File

@ -1,95 +0,0 @@
#pragma once
#include <Storages/MergeTree/MergeTreeIndexes.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <iostream>
#include <random>
namespace DB {
namespace ErrorCodes
{
extern const int FILE_DOESNT_EXIST;
}
class MergeTreeTestIndex;
struct MergeTreeTestGranule : public MergeTreeIndexGranule {
~MergeTreeTestGranule() override {};
void serializeBinary(WriteBuffer &ostr) const override {
//std::cerr << "TESTINDEX: written " << emp << "\n";
writeIntBinary(emp, ostr);
}
void deserializeBinary(ReadBuffer &istr) override {
readIntBinary(emp, istr);
if (emp != 10) {
throw Exception("kek bad read", ErrorCodes::FILE_DOESNT_EXIST);
}
//std::cerr << "TESTINDEX: read " << emp << "\n";
}
bool empty() const override {
return emp == 0;
}
void update(const Block &block, size_t *pos, size_t limit) override {
*pos += std::min(limit, block.rows() - *pos);
emp = 10;
};
Int32 emp = 0;
};
class IndexTestCondition : public IndexCondition{
public:
IndexTestCondition() = default;
~IndexTestCondition() override {};
/// Checks if this index is useful for query.
bool alwaysUnknownOrTrue() const override { return false; };
bool mayBeTrueOnGranule(const MergeTreeIndexGranule &) const override {
return true;
}
};
class MergeTreeTestIndex : public MergeTreeIndex
{
public:
MergeTreeTestIndex(String name, ExpressionActionsPtr expr, size_t granularity, Block key)
: MergeTreeIndex(name, expr, granularity, key) {}
~MergeTreeTestIndex() override {}
String indexType() const override { return "TEST"; }
/// gets filename without extension
MergeTreeIndexGranulePtr createIndexGranule() const override {
return std::make_shared<MergeTreeTestGranule>();
}
IndexConditionPtr createIndexCondition(
const SelectQueryInfo & , const Context & ) const override {
return std::make_shared<IndexTestCondition>();
};
};
std::unique_ptr<MergeTreeIndex> MTItestCreator(
const MergeTreeData & data, std::shared_ptr<ASTIndexDeclaration> node, const Context & ) {
return std::make_unique<MergeTreeTestIndex>(
node->name, data.primary_key_expr, node->granularity.get<size_t>(), Block{});
}
}

View File

@ -2,7 +2,7 @@
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeIndexes.h>
#include <Storages/MergeTree/MergeTreeTestIndex.h>
#include <Storages/MergeTree/MergeTreeMinMaxIndex.h>
#include <Common/typeid_cast.h>
#include <Common/OptimizedRegularExpression.h>
@ -637,7 +637,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
static void registerMergeTreeSkipIndexes() {
auto & factory = MergeTreeIndexFactory::instance();
factory.registerIndex("test", MTItestCreator);
factory.registerIndex("minmax", MergeTreeMinMaxIndexCreator);
}