Compile partition key

This commit is contained in:
alesapin 2020-05-20 15:16:55 +03:00
parent 2673c985bd
commit 8bc527eecd
10 changed files with 105 additions and 47 deletions

View File

@ -221,9 +221,8 @@ static NameSet getKeyColumns(const StoragePtr & storage)
NameSet key_columns;
if (merge_tree_data->partition_key_expr)
for (const String & col : merge_tree_data->partition_key_expr->getRequiredColumns())
key_columns.insert(col);
for (const String & col : merge_tree_data->getColumnsRequiredForPartitionKey())
key_columns.insert(col);
auto sorting_key_expr = merge_tree_data->sorting_key_expr;
if (sorting_key_expr)

View File

@ -9,6 +9,7 @@
#include <Interpreters/Context.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/quoteString.h>
#include <Interpreters/ExpressionActions.h>
#include <Processors/Executors/TreeExecutorBlockInputStream.h>
@ -433,4 +434,26 @@ NamesAndTypesList IStorage::getVirtuals() const
return {};
}
const StorageMetadataKeyField & IStorage::getPartitionKey() const
{
return partition_key;
}
void IStorage::setPartitionKey(const StorageMetadataKeyField & partition_key_)
{
partition_key = partition_key_;
}
Names IStorage::getColumnsRequiredForPartitionKey() const
{
if (partition_key.expressions)
return partition_key.expressions->getRequiredColumns();
return {};
}
bool IStorage::hasPartitionKey() const
{
return partition_key.expressions != nullptr;
}
}

View File

@ -183,6 +183,9 @@ public: /// thread-unsafe part. lockStructure must be acquired
///
/// By default return empty list of columns.
virtual NamesAndTypesList getVirtuals() const;
const StorageMetadataKeyField & getPartitionKey() const;
void setPartitionKey(const StorageMetadataKeyField & partition_key_);
bool hasPartitionKey() const;
protected: /// still thread-unsafe part.
void setIndices(IndicesDescription indices_);
@ -195,10 +198,21 @@ protected: /// still thread-unsafe part.
private:
StorageID storage_id;
mutable std::mutex id_mutex;
ColumnsDescription columns;
IndicesDescription indices;
ConstraintsDescription constraints;
StorageMetadataKeyField partition_key;
//StorageMetadataKeyField primary_key;
//StorageMetadataKeyField sorting_key;
//StorageMetadataKeyField sample_key;
//StorageMetadataField rows_ttl_entry;
//std::vector<StorageMetadataField> column_ttl_entries;
//std::vector<StorageMetadataField> move_ttl_entries;
private:
RWLockImpl::LockHolder tryLockTimed(
const RWLock & rwlock, RWLockImpl::Type type, const String & query_id, const SettingSeconds & acquire_timeout) const;
@ -441,7 +455,7 @@ public:
virtual Strings getDataPaths() const { return {}; }
/// Returns ASTExpressionList of partition key expression for storage or nullptr if there is none.
virtual ASTPtr getPartitionKeyAST() const { return nullptr; }
virtual ASTPtr getPartitionKeyAST() const { return partition_key.definition_ast; }
/// Returns ASTExpressionList of sorting key expression for storage or nullptr if there is none.
virtual ASTPtr getSortingKeyAST() const { return nullptr; }
@ -453,7 +467,7 @@ public:
virtual ASTPtr getSamplingKeyAST() const { return nullptr; }
/// Returns column names that need to be read to calculate partition key.
virtual Names getColumnsRequiredForPartitionKey() const { return {}; }
virtual Names getColumnsRequiredForPartitionKey() const;
/// Returns column names that need to be read to calculate sorting key.
virtual Names getColumnsRequiredForSortingKey() const { return {}; }

View File

@ -494,7 +494,7 @@ void IMergeTreeDataPart::loadPartitionAndMinMaxIndex()
minmax_idx.load(storage, volume->getDisk(), path);
}
String calculated_partition_id = partition.getID(storage.partition_key_sample);
String calculated_partition_id = partition.getID(storage.getPartitionKey().sample_block);
if (calculated_partition_id != info.partition_id)
throw Exception(
"While loading part " + getFullPath() + ": calculated partition ID: " + calculated_partition_id
@ -852,7 +852,7 @@ void IMergeTreeDataPart::checkConsistencyBase() const
if (!checksums.files.count("count.txt"))
throw Exception("No checksum for count.txt", ErrorCodes::NO_FILE_IN_DATA_PART);
if (storage.partition_key_expr && !checksums.files.count("partition.dat"))
if (storage.hasPartitionKey() && !checksums.files.count("partition.dat"))
throw Exception("No checksum for partition.dat", ErrorCodes::NO_FILE_IN_DATA_PART);
if (!isEmpty())
@ -885,7 +885,7 @@ void IMergeTreeDataPart::checkConsistencyBase() const
{
check_file_not_empty(volume->getDisk(), path + "count.txt");
if (storage.partition_key_expr)
if (storage.hasPartitionKey())
check_file_not_empty(volume->getDisk(), path + "partition.dat");
for (const String & col_name : storage.minmax_idx_columns)

View File

@ -130,7 +130,6 @@ MergeTreeData::MergeTreeData(
: IStorage(table_id_)
, global_context(context_)
, merging_params(merging_params_)
, partition_by_ast(metadata.partition_by_ast)
, sample_by_ast(metadata.sample_by_ast)
, settings_ast(metadata.settings_ast)
, require_part_metadata(require_part_metadata_)
@ -169,8 +168,8 @@ MergeTreeData::MergeTreeData(
{
try
{
partition_by_ast = makeASTFunction("toYYYYMM", std::make_shared<ASTIdentifier>(date_column_name));
initPartitionKey();
auto partition_by_ast = makeASTFunction("toYYYYMM", std::make_shared<ASTIdentifier>(date_column_name));
initPartitionKey(partition_by_ast);
if (minmax_idx_date_column_pos == -1)
throw Exception("Could not find Date column", ErrorCodes::BAD_TYPE_OF_FIELD);
@ -185,7 +184,7 @@ MergeTreeData::MergeTreeData(
else
{
is_custom_partitioned = true;
initPartitionKey();
initPartitionKey(metadata.partition_by_ast);
min_format_version = MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING;
}
@ -252,8 +251,8 @@ StorageInMemoryMetadata MergeTreeData::getInMemoryMetadata() const
{
StorageInMemoryMetadata metadata(getColumns(), getIndices(), getConstraints());
if (partition_by_ast)
metadata.partition_by_ast = partition_by_ast->clone();
if (hasPartitionKey())
metadata.partition_by_ast = getPartitionKey().definition_ast->clone();
if (order_by_ast)
metadata.order_by_ast = order_by_ast->clone();
@ -511,28 +510,31 @@ ASTPtr MergeTreeData::extractKeyExpressionList(const ASTPtr & node)
}
void MergeTreeData::initPartitionKey()
void MergeTreeData::initPartitionKey(ASTPtr partition_by_ast)
{
ASTPtr partition_key_expr_list = extractKeyExpressionList(partition_by_ast);
StorageMetadataKeyField new_partition_key;
new_partition_key.definition_ast = partition_by_ast;
new_partition_key.expression_ast = partition_key_expr_list;
if (partition_key_expr_list->children.empty())
return;
{
auto syntax_result = SyntaxAnalyzer(global_context).analyze(partition_key_expr_list, getColumns().getAllPhysical());
partition_key_expr = ExpressionAnalyzer(partition_key_expr_list, syntax_result, global_context).getActions(false);
new_partition_key.expressions = ExpressionAnalyzer(partition_key_expr_list, syntax_result, global_context).getActions(false);
}
for (const ASTPtr & ast : partition_key_expr_list->children)
{
String col_name = ast->getColumnName();
partition_key_sample.insert(partition_key_expr->getSampleBlock().getByName(col_name));
new_partition_key.sample_block.insert(new_partition_key.expressions->getSampleBlock().getByName(col_name));
}
checkKeyExpression(*partition_key_expr, partition_key_sample, "Partition");
checkKeyExpression(*new_partition_key.expressions, new_partition_key.sample_block, "Partition");
/// Add all columns used in the partition key to the min-max index.
const NamesAndTypesList & minmax_idx_columns_with_types = partition_key_expr->getRequiredColumnsWithTypes();
const NamesAndTypesList & minmax_idx_columns_with_types = new_partition_key.expressions->getRequiredColumnsWithTypes();
minmax_idx_expr = std::make_shared<ExpressionActions>(minmax_idx_columns_with_types, global_context);
for (const NameAndTypePair & column : minmax_idx_columns_with_types)
{
@ -577,6 +579,7 @@ void MergeTreeData::initPartitionKey()
}
}
}
setPartitionKey(new_partition_key);
}
namespace
@ -631,8 +634,8 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns,
{
NameSet columns_ttl_forbidden;
if (partition_key_expr)
for (const auto & col : partition_key_expr->getRequiredColumns())
if (hasPartitionKey())
for (const auto & col : getPartitionKey().expressions->getRequiredColumns())
columns_ttl_forbidden.insert(col);
if (sorting_key_expr)
@ -1419,12 +1422,12 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, const S
/// (and not as a part of some expression) and if the ALTER only affects column metadata.
NameSet columns_alter_type_metadata_only;
if (partition_key_expr)
if (hasPartitionKey())
{
/// Forbid altering partition key columns because it can change partition ID format.
/// TODO: in some cases (e.g. adding an Enum value) a partition key column can still be ALTERed.
/// We should allow it.
for (const String & col : partition_key_expr->getRequiredColumns())
for (const String & col : getColumnsRequiredForPartitionKey())
columns_alter_type_forbidden.insert(col);
}
@ -2621,7 +2624,7 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context
/// Re-parse partition key fields using the information about expected field types.
size_t fields_count = partition_key_sample.columns();
size_t fields_count = getPartitionKey().sample_block.columns();
if (partition_ast.fields_count != fields_count)
throw Exception(
"Wrong number of fields in the partition expression: " + toString(partition_ast.fields_count) +
@ -2638,7 +2641,7 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context
ReadBufferFromMemory right_paren_buf(")", 1);
ConcatReadBuffer buf({&left_paren_buf, &fields_buf, &right_paren_buf});
auto input_stream = FormatFactory::instance().getInput("Values", buf, partition_key_sample, context, context.getSettingsRef().max_block_size);
auto input_stream = FormatFactory::instance().getInput("Values", buf, getPartitionKey().sample_block, context, context.getSettingsRef().max_block_size);
auto block = input_stream->read();
if (!block || !block.rows())
@ -3161,7 +3164,7 @@ MergeTreeData & MergeTreeData::checkStructureAndGetMergeTreeData(IStorage & sour
if (query_to_string(order_by_ast) != query_to_string(src_data->order_by_ast))
throw Exception("Tables have different ordering", ErrorCodes::BAD_ARGUMENTS);
if (query_to_string(partition_by_ast) != query_to_string(src_data->partition_by_ast))
if (query_to_string(getPartitionKey().definition_ast) != query_to_string(src_data->getPartitionKey().definition_ast))
throw Exception("Tables have different partition key", ErrorCodes::BAD_ARGUMENTS);
if (format_version != src_data->format_version)

View File

@ -334,12 +334,11 @@ public:
/// See comments about methods below in IStorage interface
StorageInMemoryMetadata getInMemoryMetadata() const override;
ASTPtr getPartitionKeyAST() const override { return partition_by_ast; }
ASTPtr getSortingKeyAST() const override { return sorting_key_expr_ast; }
ASTPtr getPrimaryKeyAST() const override { return primary_key_expr_ast; }
ASTPtr getSamplingKeyAST() const override { return sample_by_ast; }
Names getColumnsRequiredForPartitionKey() const override { return (partition_key_expr ? partition_key_expr->getRequiredColumns() : Names{}); }
//Names getColumnsRequiredForPartitionKey() const override { return (partition_key_expr ? partition_key_expr->getRequiredColumns() : Names{}); }
Names getColumnsRequiredForSortingKey() const override { return sorting_key_expr->getRequiredColumns(); }
Names getColumnsRequiredForPrimaryKey() const override { return primary_key_expr->getRequiredColumns(); }
Names getColumnsRequiredForSampling() const override { return columns_required_for_sampling; }
@ -648,8 +647,6 @@ public:
const MergingParams merging_params;
bool is_custom_partitioned = false;
ExpressionActionsPtr partition_key_expr;
Block partition_key_sample;
ExpressionActionsPtr minmax_idx_expr;
Names minmax_idx_columns;
@ -738,7 +735,6 @@ protected:
friend struct ReplicatedMergeTreeTableMetadata;
friend class StorageReplicatedMergeTree;
ASTPtr partition_by_ast;
ASTPtr order_by_ast;
ASTPtr primary_key_ast;
ASTPtr sample_by_ast;
@ -853,7 +849,7 @@ protected:
void setProperties(const StorageInMemoryMetadata & metadata, bool only_check = false, bool attach = false);
void initPartitionKey();
void initPartitionKey(ASTPtr partition_by_ast);
void setTTLExpressions(const ColumnsDescription & columns,
const ASTPtr & new_ttl_table_ast, bool only_check = false);

View File

@ -138,18 +138,19 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block
data.check(block, true);
block.checkNumberOfRows();
if (!data.partition_key_expr) /// Table is not partitioned.
if (!data.hasPartitionKey()) /// Table is not partitioned.
{
result.emplace_back(Block(block), Row());
return result;
}
Block block_copy = block;
data.partition_key_expr->execute(block_copy);
const auto & partition_key = data.getPartitionKey();
partition_key.expressions->execute(block_copy);
ColumnRawPtrs partition_columns;
partition_columns.reserve(data.partition_key_sample.columns());
for (const ColumnWithTypeAndName & element : data.partition_key_sample)
partition_columns.reserve(partition_key.sample_block.columns());
for (const ColumnWithTypeAndName & element : partition_key.sample_block)
partition_columns.emplace_back(block_copy.getByName(element.name).column.get());
PODArray<size_t> partition_num_to_first_row;
@ -203,7 +204,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
MergeTreePartition partition(std::move(block_with_partition.partition));
MergeTreePartInfo new_part_info(partition.getID(data.partition_key_sample), temp_index, temp_index, 0);
MergeTreePartInfo new_part_info(partition.getID(data.getPartitionKey().sample_block), temp_index, temp_index, 0);
String part_name;
if (data.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{

View File

@ -26,7 +26,7 @@ static std::unique_ptr<ReadBufferFromFileBase> openForReading(const DiskPtr & di
String MergeTreePartition::getID(const MergeTreeData & storage) const
{
return getID(storage.partition_key_sample);
return getID(storage.getPartitionKey().sample_block);
}
/// NOTE: This ID is used to create part names which are then persisted in ZK and as directory names on the file system.
@ -89,7 +89,8 @@ String MergeTreePartition::getID(const Block & partition_key_sample) const
void MergeTreePartition::serializeText(const MergeTreeData & storage, WriteBuffer & out, const FormatSettings & format_settings) const
{
size_t key_size = storage.partition_key_sample.columns();
const auto & partition_key_sample = storage.getPartitionKey().sample_block;
size_t key_size = partition_key_sample.columns();
if (key_size == 0)
{
@ -97,7 +98,7 @@ void MergeTreePartition::serializeText(const MergeTreeData & storage, WriteBuffe
}
else if (key_size == 1)
{
const DataTypePtr & type = storage.partition_key_sample.getByPosition(0).type;
const DataTypePtr & type = partition_key_sample.getByPosition(0).type;
auto column = type->createColumn();
column->insert(value[0]);
type->serializeAsText(*column, 0, out, format_settings);
@ -108,7 +109,7 @@ void MergeTreePartition::serializeText(const MergeTreeData & storage, WriteBuffe
Columns columns;
for (size_t i = 0; i < key_size; ++i)
{
const auto & type = storage.partition_key_sample.getByPosition(i).type;
const auto & type = partition_key_sample.getByPosition(i).type;
types.push_back(type);
auto column = type->createColumn();
column->insert(value[i]);
@ -123,19 +124,20 @@ void MergeTreePartition::serializeText(const MergeTreeData & storage, WriteBuffe
void MergeTreePartition::load(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path)
{
if (!storage.partition_key_expr)
if (!storage.hasPartitionKey())
return;
const auto & partition_key_sample = storage.getPartitionKey().sample_block;
auto partition_file_path = part_path + "partition.dat";
auto file = openForReading(disk, partition_file_path);
value.resize(storage.partition_key_sample.columns());
for (size_t i = 0; i < storage.partition_key_sample.columns(); ++i)
storage.partition_key_sample.getByPosition(i).type->deserializeBinary(value[i], *file);
value.resize(partition_key_sample.columns());
for (size_t i = 0; i < partition_key_sample.columns(); ++i)
partition_key_sample.getByPosition(i).type->deserializeBinary(value[i], *file);
}
void MergeTreePartition::store(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const
{
store(storage.partition_key_sample, disk, part_path, checksums);
store(storage.getPartitionKey().sample_block, disk, part_path, checksums);
}
void MergeTreePartition::store(const Block & partition_key_sample, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const

View File

@ -51,7 +51,7 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
data_format_version = data.format_version;
if (data.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
partition_key = formattedAST(MergeTreeData::extractKeyExpressionList(data.partition_by_ast));
partition_key = formattedAST(data.getPartitionKey().expression_ast);
ttl_table = formattedAST(data.ttl_table_ast);

View File

@ -43,4 +43,24 @@ struct StorageInMemoryMetadata
StorageInMemoryMetadata & operator=(const StorageInMemoryMetadata & other);
};
struct StorageMetadataField
{
ASTPtr ast;
ExpressionActionsPtr actions;
};
struct StorageMetadataKeyField
{
ASTPtr definition_ast;
ASTPtr expression_ast;
ExpressionActionsPtr expressions;
Names expression_columns;
Block sample_block;
DataTypes data_types;
StorageMetadataKeyField & operator=(const StorageMetadataKeyField & other) = default;
};
}