Merge pull request #23673 from amosbird/partitionvalue

Add _partition_value virtual column
This commit is contained in:
Kruglov Pavel 2021-05-11 11:23:00 +03:00 committed by GitHub
commit 49e7ed6e60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 171 additions and 55 deletions

View File

@ -755,6 +755,7 @@ void TreeRewriterResult::collectUsedColumns(const ASTPtr & query, bool is_select
partition_source_columns.push_back("_part");
partition_source_columns.push_back("_partition_id");
partition_source_columns.push_back("_part_uuid");
partition_source_columns.push_back("_partition_value");
optimize_trivial_count = true;
for (const auto & required_column : required)
{

View File

@ -26,6 +26,7 @@ ReadFromMergeTree::ReadFromMergeTree(
: ISourceStep(DataStream{.header = MergeTreeBaseSelectProcessor::transformHeader(
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()),
prewhere_info_,
storage_.getPartitionValueType(),
virt_column_names_)})
, storage(storage_)
, metadata_snapshot(std::move(metadata_snapshot_))

View File

@ -30,7 +30,7 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
const MergeTreeReaderSettings & reader_settings_,
bool use_uncompressed_cache_,
const Names & virt_column_names_)
: SourceWithProgress(transformHeader(std::move(header), prewhere_info_, virt_column_names_))
: SourceWithProgress(transformHeader(std::move(header), prewhere_info_, storage_.getPartitionValueType(), virt_column_names_))
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, prewhere_info(prewhere_info_)
@ -40,6 +40,7 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
, reader_settings(reader_settings_)
, use_uncompressed_cache(use_uncompressed_cache_)
, virt_column_names(virt_column_names_)
, partition_value_type(storage.getPartitionValueType())
{
header_without_virtual_columns = getPort().getHeader();
@ -60,7 +61,7 @@ Chunk MergeTreeBaseSelectProcessor::generate()
if (res.hasRows())
{
injectVirtualColumns(res, task.get(), virt_column_names);
injectVirtualColumns(res, task.get(), partition_value_type, virt_column_names);
return res;
}
}
@ -207,11 +208,18 @@ namespace
virtual void insertStringColumn(const ColumnPtr & column, const String & name) = 0;
virtual void insertUInt64Column(const ColumnPtr & column, const String & name) = 0;
virtual void insertUUIDColumn(const ColumnPtr & column, const String & name) = 0;
virtual void
insertPartitionValueColumn(size_t rows, const Row & partition_value, const DataTypePtr & partition_value_type, const String & name)
= 0;
};
}
static void injectVirtualColumnsImpl(size_t rows, VirtualColumnsInserter & inserter,
MergeTreeReadTask * task, const Names & virtual_columns)
static void injectVirtualColumnsImpl(
size_t rows,
VirtualColumnsInserter & inserter,
MergeTreeReadTask * task,
const DataTypePtr & partition_value_type,
const Names & virtual_columns)
{
/// add virtual columns
/// Except _sample_factor, which is added from the outside.
@ -263,6 +271,13 @@ static void injectVirtualColumnsImpl(size_t rows, VirtualColumnsInserter & inser
inserter.insertStringColumn(column, virtual_column_name);
}
else if (virtual_column_name == "_partition_value")
{
if (rows)
inserter.insertPartitionValueColumn(rows, task->data_part->partition.value, partition_value_type, virtual_column_name);
else
inserter.insertPartitionValueColumn(rows, {}, partition_value_type, virtual_column_name);
}
}
}
}
@ -288,6 +303,19 @@ namespace
block.insert({column, std::make_shared<DataTypeUUID>(), name});
}
void insertPartitionValueColumn(
size_t rows, const Row & partition_value, const DataTypePtr & partition_value_type, const String & name) final
{
ColumnPtr column;
if (rows)
column = partition_value_type->createColumnConst(rows, Tuple(partition_value.begin(), partition_value.end()))
->convertToFullColumnIfConst();
else
column = partition_value_type->createColumn();
block.insert({column, partition_value_type, name});
}
Block & block;
};
@ -309,23 +337,38 @@ namespace
{
columns.push_back(column);
}
void
insertPartitionValueColumn(size_t rows, const Row & partition_value, const DataTypePtr & partition_value_type, const String &) final
{
ColumnPtr column;
if (rows)
column = partition_value_type->createColumnConst(rows, Tuple(partition_value.begin(), partition_value.end()))
->convertToFullColumnIfConst();
else
column = partition_value_type->createColumn();
columns.push_back(column);
}
Columns & columns;
};
}
void MergeTreeBaseSelectProcessor::injectVirtualColumns(Block & block, MergeTreeReadTask * task, const Names & virtual_columns)
void MergeTreeBaseSelectProcessor::injectVirtualColumns(
Block & block, MergeTreeReadTask * task, const DataTypePtr & partition_value_type, const Names & virtual_columns)
{
VirtualColumnsInserterIntoBlock inserter { block };
injectVirtualColumnsImpl(block.rows(), inserter, task, virtual_columns);
VirtualColumnsInserterIntoBlock inserter{block};
injectVirtualColumnsImpl(block.rows(), inserter, task, partition_value_type, virtual_columns);
}
void MergeTreeBaseSelectProcessor::injectVirtualColumns(Chunk & chunk, MergeTreeReadTask * task, const Names & virtual_columns)
void MergeTreeBaseSelectProcessor::injectVirtualColumns(
Chunk & chunk, MergeTreeReadTask * task, const DataTypePtr & partition_value_type, const Names & virtual_columns)
{
UInt64 num_rows = chunk.getNumRows();
auto columns = chunk.detachColumns();
VirtualColumnsInserterIntoColumns inserter { columns };
injectVirtualColumnsImpl(num_rows, inserter, task, virtual_columns);
VirtualColumnsInserterIntoColumns inserter{columns};
injectVirtualColumnsImpl(num_rows, inserter, task, partition_value_type, virtual_columns);
chunk.setColumns(columns, num_rows);
}
@ -371,10 +414,10 @@ void MergeTreeBaseSelectProcessor::executePrewhereActions(Block & block, const P
}
Block MergeTreeBaseSelectProcessor::transformHeader(
Block block, const PrewhereInfoPtr & prewhere_info, const Names & virtual_columns)
Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns)
{
executePrewhereActions(block, prewhere_info);
injectVirtualColumns(block, nullptr, virtual_columns);
injectVirtualColumns(block, nullptr, partition_value_type, virtual_columns);
return block;
}

View File

@ -33,7 +33,8 @@ public:
~MergeTreeBaseSelectProcessor() override;
static Block transformHeader(Block block, const PrewhereInfoPtr & prewhere_info, const Names & virtual_columns);
static Block transformHeader(
Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns);
static void executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info);
@ -48,8 +49,10 @@ protected:
Chunk readFromPartImpl();
/// Two versions for header and chunk.
static void injectVirtualColumns(Block & block, MergeTreeReadTask * task, const Names & virtual_columns);
static void injectVirtualColumns(Chunk & chunk, MergeTreeReadTask * task, const Names & virtual_columns);
static void
injectVirtualColumns(Block & block, MergeTreeReadTask * task, const DataTypePtr & partition_value_type, const Names & virtual_columns);
static void
injectVirtualColumns(Chunk & chunk, MergeTreeReadTask * task, const DataTypePtr & partition_value_type, const Names & virtual_columns);
void initializeRangeReaders(MergeTreeReadTask & task);
@ -68,6 +71,9 @@ protected:
bool use_uncompressed_cache;
Names virt_column_names;
DataTypePtr partition_value_type;
/// This header is used for chunks from readFromPart().
Block header_without_virtual_columns;

View File

@ -7,6 +7,7 @@
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/NestedUtils.h>
#include <Formats/FormatFactory.h>
#include <Functions/FunctionFactory.h>
@ -685,6 +686,62 @@ void MergeTreeData::MergingParams::check(const StorageInMemoryMetadata & metadat
}
DataTypePtr MergeTreeData::getPartitionValueType() const
{
DataTypePtr partition_value_type;
auto partition_types = getInMemoryMetadataPtr()->partition_key.sample_block.getDataTypes();
if (partition_types.empty())
partition_value_type = std::make_shared<DataTypeUInt8>();
else
partition_value_type = std::make_shared<DataTypeTuple>(std::move(partition_types));
return partition_value_type;
}
Block MergeTreeData::getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool one_part) const
{
DataTypePtr partition_value_type = getPartitionValueType();
bool has_partition_value = typeid_cast<const DataTypeTuple *>(partition_value_type.get());
Block block{
ColumnWithTypeAndName(ColumnString::create(), std::make_shared<DataTypeString>(), "_part"),
ColumnWithTypeAndName(ColumnString::create(), std::make_shared<DataTypeString>(), "_partition_id"),
ColumnWithTypeAndName(ColumnUUID::create(), std::make_shared<DataTypeUUID>(), "_part_uuid"),
ColumnWithTypeAndName(partition_value_type->createColumn(), partition_value_type, "_partition_value")};
MutableColumns columns = block.mutateColumns();
auto & part_column = columns[0];
auto & partition_id_column = columns[1];
auto & part_uuid_column = columns[2];
auto & partition_value_column = columns[3];
for (const auto & part : parts)
{
part_column->insert(part->name);
partition_id_column->insert(part->info.partition_id);
part_uuid_column->insert(part->uuid);
Tuple tuple(part->partition.value.begin(), part->partition.value.end());
if (has_partition_value)
partition_value_column->insert(tuple);
if (one_part)
{
part_column = ColumnConst::create(std::move(part_column), 1);
partition_id_column = ColumnConst::create(std::move(partition_id_column), 1);
part_uuid_column = ColumnConst::create(std::move(part_uuid_column), 1);
if (has_partition_value)
partition_value_column = ColumnConst::create(std::move(partition_value_column), 1);
break;
}
}
block.setColumns(std::move(columns));
if (!has_partition_value)
block.erase("_partition_value");
return block;
}
std::optional<UInt64> MergeTreeData::totalRowsByPartitionPredicateImpl(
const SelectQueryInfo & query_info, ContextPtr local_context, const DataPartsVector & parts) const
{
@ -692,7 +749,7 @@ std::optional<UInt64> MergeTreeData::totalRowsByPartitionPredicateImpl(
return 0u;
auto metadata_snapshot = getInMemoryMetadataPtr();
ASTPtr expression_ast;
Block virtual_columns_block = MergeTreeDataSelectExecutor::getBlockWithVirtualPartColumns(parts, true /* one_part */);
Block virtual_columns_block = getBlockWithVirtualPartColumns(parts, true /* one_part */);
// Generate valid expressions for filtering
bool valid = VirtualColumnUtils::prepareFilterBlockWithQuery(query_info.query, local_context, virtual_columns_block, expression_ast);
@ -704,7 +761,7 @@ std::optional<UInt64> MergeTreeData::totalRowsByPartitionPredicateImpl(
std::unordered_set<String> part_values;
if (valid && expression_ast)
{
virtual_columns_block = MergeTreeDataSelectExecutor::getBlockWithVirtualPartColumns(parts, false /* one_part */);
virtual_columns_block = getBlockWithVirtualPartColumns(parts, false /* one_part */);
VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, local_context, expression_ast);
part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
if (part_values.empty())
@ -4268,6 +4325,7 @@ NamesAndTypesList MergeTreeData::getVirtuals() const
NameAndTypePair("_part_index", std::make_shared<DataTypeUInt64>()),
NameAndTypePair("_part_uuid", std::make_shared<DataTypeUUID>()),
NameAndTypePair("_partition_id", std::make_shared<DataTypeString>()),
NameAndTypePair("_partition_value", getPartitionValueType()),
NameAndTypePair("_sample_factor", std::make_shared<DataTypeFloat64>()),
};
}

View File

@ -762,6 +762,13 @@ public:
/// Remove current query id after query finished.
void removeQueryId(const String & query_id) const;
/// Return the partition expression types as a Tuple type. Return DataTypeUInt8 if partition expression is empty.
DataTypePtr getPartitionValueType() const;
/// Construct a block consisting only of possible virtual columns for part pruning.
/// If one_part is true, fill in at most one part.
Block getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool one_part) const;
/// Limiting parallel sends per one table, used in DataPartsExchange
std::atomic_uint current_table_sends {0};

View File

@ -36,6 +36,7 @@
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <Storages/VirtualColumnUtils.h>
@ -62,6 +63,7 @@ namespace ErrorCodes
extern const int CANNOT_PARSE_TEXT;
extern const int TOO_MANY_PARTITIONS;
extern const int DUPLICATED_PART_UUIDS;
extern const int NO_SUCH_COLUMN_IN_TABLE;
}
@ -70,38 +72,6 @@ MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(const MergeTreeData & d
{
}
Block MergeTreeDataSelectExecutor::getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool one_part)
{
Block block(std::initializer_list<ColumnWithTypeAndName>{
ColumnWithTypeAndName(ColumnString::create(), std::make_shared<DataTypeString>(), "_part"),
ColumnWithTypeAndName(ColumnString::create(), std::make_shared<DataTypeString>(), "_partition_id"),
ColumnWithTypeAndName(ColumnUUID::create(), std::make_shared<DataTypeUUID>(), "_part_uuid")});
MutableColumns columns = block.mutateColumns();
auto & part_column = columns[0];
auto & partition_id_column = columns[1];
auto & part_uuid_column = columns[2];
for (const auto & part : parts)
{
part_column->insert(part->name);
partition_id_column->insert(part->info.partition_id);
part_uuid_column->insert(part->uuid);
if (one_part)
{
part_column = ColumnConst::create(std::move(part_column), 1);
partition_id_column = ColumnConst::create(std::move(partition_id_column), 1);
part_uuid_column = ColumnConst::create(std::move(part_uuid_column), 1);
break;
}
}
block.setColumns(std::move(columns));
return block;
}
size_t MergeTreeDataSelectExecutor::getApproximateTotalRowsToRead(
const MergeTreeData::DataPartsVector & parts,
const StorageMetadataPtr & metadata_snapshot,
@ -206,6 +176,18 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
{
virt_column_names.push_back(name);
}
else if (name == "_partition_value")
{
if (!typeid_cast<const DataTypeTuple *>(data.getPartitionValueType().get()))
{
throw Exception(
ErrorCodes::NO_SUCH_COLUMN_IN_TABLE,
"Missing column `_partition_value` because there is no partition column in table {}",
data.getStorageID().getTableName());
}
virt_column_names.push_back(name);
}
else if (name == "_sample_factor")
{
sample_factor_column_queried = true;
@ -225,7 +207,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
std::unordered_set<String> part_values;
ASTPtr expression_ast;
auto virtual_columns_block = getBlockWithVirtualPartColumns(parts, true /* one_part */);
auto virtual_columns_block = data.getBlockWithVirtualPartColumns(parts, true /* one_part */);
// Generate valid expressions for filtering
VirtualColumnUtils::prepareFilterBlockWithQuery(query_info.query, context, virtual_columns_block, expression_ast);
@ -233,7 +215,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
// If there is still something left, fill the virtual block and do the filtering.
if (expression_ast)
{
virtual_columns_block = getBlockWithVirtualPartColumns(parts, false /* one_part */);
virtual_columns_block = data.getBlockWithVirtualPartColumns(parts, false /* one_part */);
VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, context, expression_ast);
part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
if (part_values.empty())

View File

@ -45,10 +45,6 @@ public:
unsigned num_streams,
const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr) const;
/// Construct a block consisting only of possible virtual columns for part pruning.
/// If one_part is true, fill in at most one part.
static Block getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool one_part);
private:
const MergeTreeData & data;

View File

@ -0,0 +1,4 @@
1
4
4
1

View File

@ -0,0 +1,18 @@
drop table if exists tbl;
drop table if exists tbl2;
create table tbl(dt DateTime, i int, j String, v Float64) engine MergeTree partition by (toDate(dt), i % 2, length(j)) order by i settings index_granularity = 1;
insert into tbl values ('2021-04-01 00:01:02', 1, '123', 4), ('2021-04-01 01:01:02', 1, '12', 4), ('2021-04-01 02:11:02', 2, '345', 4), ('2021-04-01 04:31:02', 2, '2', 4), ('2021-04-02 00:01:02', 1, '1234', 4), ('2021-04-02 00:01:02', 2, '123', 4), ('2021-04-02 00:01:02', 3, '12', 4), ('2021-04-02 00:01:02', 4, '1', 4);
select count() from tbl where _partition_value = ('2021-04-01', 1, 2) settings max_rows_to_read = 1;
select count() from tbl where _partition_value.1 = '2021-04-01' settings max_rows_to_read = 4;
select count() from tbl where _partition_value.2 = 0 settings max_rows_to_read = 4;
select count() from tbl where _partition_value.3 = 4 settings max_rows_to_read = 1;
create table tbl2(i int) engine MergeTree order by i;
insert into tbl2 values (1);
select _partition_value from tbl2; -- { serverError 16 }
drop table tbl;
drop table tbl2;