This commit is contained in:
Nikita Vasilev 2019-03-09 17:03:41 +03:00
parent 8e65c9a094
commit b9d258dc99
2 changed files with 117 additions and 27 deletions

View File

@ -22,11 +22,15 @@ const Field UNKNOWN_FIELD(3u);
MergeTreeSetIndexGranule::MergeTreeSetIndexGranule(const MergeTreeSetSkippingIndex & index) MergeTreeSetIndexGranule::MergeTreeSetIndexGranule(const MergeTreeSetSkippingIndex & index)
: IMergeTreeIndexGranule(), index(index), block(index.header.cloneEmpty()) {} : IMergeTreeIndexGranule()
, index(index)
, block(index.header.cloneEmpty()) {}
MergeTreeSetIndexGranule::MergeTreeSetIndexGranule( MergeTreeSetIndexGranule::MergeTreeSetIndexGranule(
const MergeTreeSetSkippingIndex & index, const Columns & columns) const MergeTreeSetSkippingIndex & index, MutableColumns && mutable_columns)
: IMergeTreeIndexGranule(), index(index), block(index.header.cloneWithColumns(columns)) {} : IMergeTreeIndexGranule()
, index(index)
, block(index.header.cloneWithColumns(std::move(mutable_columns))) {}
void MergeTreeSetIndexGranule::serializeBinary(WriteBuffer & ostr) const void MergeTreeSetIndexGranule::serializeBinary(WriteBuffer & ostr) const
{ {
@ -91,19 +95,30 @@ void MergeTreeSetIndexGranule::deserializeBinary(ReadBuffer & istr)
MergeTreeSetIndexAggregator::MergeTreeSetIndexAggregator(const MergeTreeSetSkippingIndex & index) MergeTreeSetIndexAggregator::MergeTreeSetIndexAggregator(const MergeTreeSetSkippingIndex & index)
: index(index), set(new Set(SizeLimits{}, true)) : index(index), columns(index.header.cloneEmptyColumns())
{ {
set->setHeader(index.header); ColumnRawPtrs column_ptrs;
column_ptrs.reserve(index.columns.size());
Columns materialized_columns;
for (const auto & column : index.header.getColumns())
{
materialized_columns.emplace_back(column->convertToFullColumnIfConst()->convertToFullColumnIfLowCardinality());
column_ptrs.emplace_back(materialized_columns.back().get());
}
data.init(ClearableSetVariants::chooseMethod(column_ptrs, key_sizes));
columns = index.header.cloneEmptyColumns();
} }
void MergeTreeSetIndexAggregator::update(const Block & new_block, size_t * pos, size_t limit) void MergeTreeSetIndexAggregator::update(const Block & block, size_t * pos, size_t limit)
{ {
if (*pos >= new_block.rows()) if (*pos >= block.rows())
throw Exception( throw Exception(
"The provided position is not less than the number of block rows. Position: " "The provided position is not less than the number of block rows. Position: "
+ toString(*pos) + ", Block rows: " + toString(new_block.rows()) + ".", ErrorCodes::LOGICAL_ERROR); + toString(*pos) + ", Block rows: " + toString(block.rows()) + ".", ErrorCodes::LOGICAL_ERROR);
size_t rows_read = std::min(limit, new_block.rows() - *pos); size_t rows_read = std::min(limit, block.rows() - *pos);
if (size() > index.max_rows) if (size() > index.max_rows)
{ {
@ -111,30 +126,90 @@ void MergeTreeSetIndexAggregator::update(const Block & new_block, size_t * pos,
return; return;
} }
Block key_block; ColumnRawPtrs index_column_ptrs;
for (size_t i = 0; i < index.columns.size(); ++i) index_column_ptrs.reserve(index.columns.size());
Columns materialized_columns;
for (const auto & column_name : index.columns)
{ {
const auto & name = index.columns[i]; materialized_columns.emplace_back(
const auto & type = index.data_types[i]; block.getByName(column_name).column->convertToFullColumnIfConst()->convertToFullColumnIfLowCardinality());
key_block.insert( index_column_ptrs.emplace_back(materialized_columns.back().get());
ColumnWithTypeAndName(
new_block.getByName(name).column->cut(*pos, rows_read),
type,
name));
} }
set->insertFromBlock(key_block); IColumn::Filter filter(block.rows(), 0);
bool has_new_data = false;
switch (data.type)
{
case ClearableSetVariants::Type::EMPTY:
break;
#define M(NAME) \
case ClearableSetVariants::Type::NAME: \
has_new_data = buildFilter(*data.NAME, index_column_ptrs, filter, *pos, rows_read, data); \
break;
APPLY_FOR_SET_VARIANTS(M)
#undef M
}
if (!has_new_data)
{
*pos += rows_read;
return;
}
for (size_t i = 0; i < columns.size(); ++i)
{
auto filtered_column = block.getByName(index.columns[i]).column->filter(filter, block.rows());
columns[i]->insertRangeFrom(*filtered_column, 0, filtered_column->size());
}
*pos += rows_read; *pos += rows_read;
} }
/// Emplaces the keys of rows [pos, pos + limit) into `method.data` and records in `filter`,
/// for each of those rows, whether its key was newly inserted
/// (same approach as DistinctSortedBlockInputStream).
///
/// method      - set variant whose `data` receives the keys
/// key_columns - columns handed to Method::State to build the per-row keys
/// filter      - written only at indices [pos, pos + limit); other entries are left untouched
/// pos, limit  - first row and number of rows to process
/// variants    - provides the string_pool passed to emplaceKey
///
/// Returns true if at least one key was newly inserted.
template <typename Method>
bool MergeTreeSetIndexAggregator::buildFilter(
    Method & method,
    const ColumnRawPtrs & key_columns,
    IColumn::Filter & filter,
    size_t pos,
    size_t limit,
    ClearableSetVariants & variants) const
{
    /// The parameter is named key_columns (as in the declaration) so that it does not
    /// shadow the `columns` member of the aggregator.
    typename Method::State state(key_columns, key_sizes, nullptr);

    bool has_new_data = false;
    for (size_t i = 0; i < limit; ++i)
    {
        const size_t row = pos + i;
        auto emplace_result = state.emplaceKey(method.data, row, variants.string_pool);

        const bool inserted = emplace_result.isInserted();
        if (inserted)
            has_new_data = true;

        /// Emit the record if there is no such key in the current set yet.
        /// Skip it otherwise.
        filter[row] = inserted;
    }
    return has_new_data;
}
MergeTreeIndexGranulePtr MergeTreeSetIndexAggregator::getGranuleAndReset() MergeTreeIndexGranulePtr MergeTreeSetIndexAggregator::getGranuleAndReset()
{ {
auto granule = std::make_shared<MergeTreeSetIndexGranule>(index, set->getSetElements()); auto granule = std::make_shared<MergeTreeSetIndexGranule>(index, std::move(columns));
auto new_set = std::make_unique<Set>(SizeLimits{}, true); switch (data.type)
new_set->setHeader(index.header); {
set.swap(new_set); case ClearableSetVariants::Type::EMPTY:
break;
#define M(NAME) \
case ClearableSetVariants::Type::NAME: \
data.NAME->data.clear(); \
break;
APPLY_FOR_SET_VARIANTS(M)
#undef M
}
columns = index.header.cloneEmptyColumns();
return granule; return granule;
} }

View File

@ -3,7 +3,7 @@
#include <Storages/MergeTree/MergeTreeIndices.h> #include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreeData.h> #include <Storages/MergeTree/MergeTreeData.h>
#include <Interpreters/Set.h> #include <Interpreters/SetVariants.h>
#include <memory> #include <memory>
#include <set> #include <set>
@ -17,7 +17,7 @@ class MergeTreeSetSkippingIndex;
struct MergeTreeSetIndexGranule : public IMergeTreeIndexGranule struct MergeTreeSetIndexGranule : public IMergeTreeIndexGranule
{ {
explicit MergeTreeSetIndexGranule(const MergeTreeSetSkippingIndex & index); explicit MergeTreeSetIndexGranule(const MergeTreeSetSkippingIndex & index);
MergeTreeSetIndexGranule(const MergeTreeSetSkippingIndex & index, const Columns & columns); MergeTreeSetIndexGranule(const MergeTreeSetSkippingIndex & index, MutableColumns && columns);
void serializeBinary(WriteBuffer & ostr) const override; void serializeBinary(WriteBuffer & ostr) const override;
void deserializeBinary(ReadBuffer & istr) override; void deserializeBinary(ReadBuffer & istr) override;
@ -37,13 +37,28 @@ struct MergeTreeSetIndexAggregator : IMergeTreeIndexAggregator
explicit MergeTreeSetIndexAggregator(const MergeTreeSetSkippingIndex & index); explicit MergeTreeSetIndexAggregator(const MergeTreeSetSkippingIndex & index);
~MergeTreeSetIndexAggregator() override = default; ~MergeTreeSetIndexAggregator() override = default;
size_t size() const { return set->getTotalRowCount(); } size_t size() const { return data.getTotalRowCount(); }
bool empty() const override { return !size(); } bool empty() const override { return !size(); }
MergeTreeIndexGranulePtr getGranuleAndReset() override; MergeTreeIndexGranulePtr getGranuleAndReset() override;
void update(const Block & block, size_t * pos, size_t limit) override; void update(const Block & block, size_t * pos, size_t limit) override;
private:
/// Emplaces the keys of rows [pos, pos + limit) built from key_columns into method.data
/// and stores in filter[row], for each of those rows, whether the key was newly inserted.
/// Entries of filter outside [pos, pos + limit) are not written.
/// Returns true if at least one key was newly inserted, i.e. there is new data.
template <typename Method>
bool buildFilter(
Method & method,
const ColumnRawPtrs & key_columns,
IColumn::Filter & filter,
size_t pos,
size_t limit,
ClearableSetVariants & variants) const;
const MergeTreeSetSkippingIndex & index; const MergeTreeSetSkippingIndex & index;
std::unique_ptr<Set> set; ClearableSetVariants data;
Sizes key_sizes;
MutableColumns columns;
}; };