mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-17 13:13:36 +00:00
Partition key in StorageInMemoryMetadata
This commit is contained in:
parent
62f2c17a66
commit
ab61abccc1
@ -214,7 +214,7 @@ MutationsInterpreter::MutationsInterpreter(
|
||||
select_interpreter = std::make_unique<InterpreterSelectQuery>(mutation_ast, context, storage, limits);
|
||||
}
|
||||
|
||||
static NameSet getKeyColumns(const StoragePtr & storage)
|
||||
static NameSet getKeyColumns(const StoragePtr & storage, const StorageMetadataPtr & metadata_snapshot)
|
||||
{
|
||||
const MergeTreeData * merge_tree_data = dynamic_cast<const MergeTreeData *>(storage.get());
|
||||
if (!merge_tree_data)
|
||||
@ -222,7 +222,7 @@ static NameSet getKeyColumns(const StoragePtr & storage)
|
||||
|
||||
NameSet key_columns;
|
||||
|
||||
for (const String & col : merge_tree_data->getColumnsRequiredForPartitionKey())
|
||||
for (const String & col : metadata_snapshot->getColumnsRequiredForPartitionKey())
|
||||
key_columns.insert(col);
|
||||
|
||||
for (const String & col : merge_tree_data->getColumnsRequiredForSortingKey())
|
||||
@ -239,15 +239,16 @@ static NameSet getKeyColumns(const StoragePtr & storage)
|
||||
}
|
||||
|
||||
static void validateUpdateColumns(
|
||||
const StoragePtr & storage, const NameSet & updated_columns,
|
||||
const StoragePtr & storage,
|
||||
const StorageMetadataPtr & metadata_snapshot, const NameSet & updated_columns,
|
||||
const std::unordered_map<String, Names> & column_to_affected_materialized)
|
||||
{
|
||||
NameSet key_columns = getKeyColumns(storage);
|
||||
NameSet key_columns = getKeyColumns(storage, metadata_snapshot);
|
||||
|
||||
for (const String & column_name : updated_columns)
|
||||
{
|
||||
auto found = false;
|
||||
for (const auto & col : storage->getColumns().getOrdinary())
|
||||
for (const auto & col : metadata_snapshot->getColumns().getOrdinary())
|
||||
{
|
||||
if (col.name == column_name)
|
||||
{
|
||||
@ -258,7 +259,7 @@ static void validateUpdateColumns(
|
||||
|
||||
if (!found)
|
||||
{
|
||||
for (const auto & col : storage->getColumns().getMaterialized())
|
||||
for (const auto & col : metadata_snapshot->getColumns().getMaterialized())
|
||||
{
|
||||
if (col.name == column_name)
|
||||
throw Exception("Cannot UPDATE materialized column " + backQuote(column_name), ErrorCodes::CANNOT_UPDATE_COLUMN);
|
||||
@ -326,7 +327,7 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
|
||||
}
|
||||
}
|
||||
|
||||
validateUpdateColumns(storage, updated_columns, column_to_affected_materialized);
|
||||
validateUpdateColumns(storage, metadata_snapshot, updated_columns, column_to_affected_materialized);
|
||||
}
|
||||
|
||||
/// Columns, that we need to read for calculation of skip indices or TTL expressions.
|
||||
|
@ -319,28 +319,6 @@ NamesAndTypesList IStorage::getVirtuals() const
|
||||
return {};
|
||||
}
|
||||
|
||||
const KeyDescription & IStorage::getPartitionKey() const
|
||||
{
|
||||
return metadata->partition_key;
|
||||
}
|
||||
|
||||
bool IStorage::isPartitionKeyDefined() const
|
||||
{
|
||||
return metadata->partition_key.definition_ast != nullptr;
|
||||
}
|
||||
|
||||
bool IStorage::hasPartitionKey() const
|
||||
{
|
||||
return !metadata->partition_key.column_names.empty();
|
||||
}
|
||||
|
||||
Names IStorage::getColumnsRequiredForPartitionKey() const
|
||||
{
|
||||
if (hasPartitionKey())
|
||||
return metadata->partition_key.expression->getRequiredColumns();
|
||||
return {};
|
||||
}
|
||||
|
||||
const KeyDescription & IStorage::getSortingKey() const
|
||||
{
|
||||
return metadata->sorting_key;
|
||||
|
@ -427,18 +427,6 @@ public:
|
||||
/// Returns data paths if storage supports it, empty vector otherwise.
|
||||
virtual Strings getDataPaths() const { return {}; }
|
||||
|
||||
/// Returns structure with partition key.
|
||||
const KeyDescription & getPartitionKey() const;
|
||||
/// Returns ASTExpressionList of partition key expression for storage or nullptr if there is none.
|
||||
ASTPtr getPartitionKeyAST() const { return metadata->partition_key.definition_ast; }
|
||||
/// Storage has user-defined (in CREATE query) partition key.
|
||||
bool isPartitionKeyDefined() const;
|
||||
/// Storage has partition key.
|
||||
bool hasPartitionKey() const;
|
||||
/// Returns column names that need to be read to calculate partition key.
|
||||
Names getColumnsRequiredForPartitionKey() const;
|
||||
|
||||
|
||||
/// Returns structure with sorting key.
|
||||
const KeyDescription & getSortingKey() const;
|
||||
/// Returns ASTExpressionList of sorting key expression for storage or nullptr if there is none.
|
||||
|
@ -496,7 +496,8 @@ void IMergeTreeDataPart::loadPartitionAndMinMaxIndex()
|
||||
minmax_idx.load(storage, volume->getDisk(), path);
|
||||
}
|
||||
|
||||
String calculated_partition_id = partition.getID(storage.getPartitionKey().sample_block);
|
||||
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
|
||||
String calculated_partition_id = partition.getID(metadata_snapshot->getPartitionKey().sample_block);
|
||||
if (calculated_partition_id != info.partition_id)
|
||||
throw Exception(
|
||||
"While loading part " + getFullPath() + ": calculated partition ID: " + calculated_partition_id
|
||||
@ -840,6 +841,7 @@ void IMergeTreeDataPart::checkConsistencyBase() const
|
||||
{
|
||||
String path = getFullRelativePath();
|
||||
|
||||
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
|
||||
const auto & pk = storage.getPrimaryKey();
|
||||
if (!checksums.empty())
|
||||
{
|
||||
@ -851,7 +853,7 @@ void IMergeTreeDataPart::checkConsistencyBase() const
|
||||
if (!checksums.files.count("count.txt"))
|
||||
throw Exception("No checksum for count.txt", ErrorCodes::NO_FILE_IN_DATA_PART);
|
||||
|
||||
if (storage.hasPartitionKey() && !checksums.files.count("partition.dat"))
|
||||
if (metadata_snapshot->hasPartitionKey() && !checksums.files.count("partition.dat"))
|
||||
throw Exception("No checksum for partition.dat", ErrorCodes::NO_FILE_IN_DATA_PART);
|
||||
|
||||
if (!isEmpty())
|
||||
@ -884,7 +886,7 @@ void IMergeTreeDataPart::checkConsistencyBase() const
|
||||
{
|
||||
check_file_not_empty(volume->getDisk(), path + "count.txt");
|
||||
|
||||
if (storage.hasPartitionKey())
|
||||
if (metadata_snapshot->hasPartitionKey())
|
||||
check_file_not_empty(volume->getDisk(), path + "partition.dat");
|
||||
|
||||
for (const String & col_name : storage.minmax_idx_columns)
|
||||
|
@ -16,7 +16,7 @@ void MergeTreeBlockOutputStream::write(const Block & block)
|
||||
{
|
||||
storage.delayInsertOrThrowIfNeeded();
|
||||
|
||||
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block);
|
||||
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot);
|
||||
for (auto & current_block : part_blocks)
|
||||
{
|
||||
Stopwatch watch;
|
||||
|
@ -169,7 +169,7 @@ MergeTreeData::MergeTreeData(
|
||||
min_format_version = MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING;
|
||||
}
|
||||
|
||||
setProperties(metadata_, attach);
|
||||
setProperties(metadata_, metadata_, attach);
|
||||
const auto settings = getSettings();
|
||||
|
||||
/// NOTE: using the same columns list as is read when performing actual merges.
|
||||
@ -184,7 +184,7 @@ MergeTreeData::MergeTreeData(
|
||||
}
|
||||
|
||||
|
||||
setTTLExpressions(metadata_);
|
||||
setTTLExpressions(metadata_, metadata_);
|
||||
|
||||
/// format_file always contained on any data path
|
||||
PathWithDisk version_file;
|
||||
@ -274,7 +274,7 @@ static void checkKeyExpression(const ExpressionActions & expr, const Block & sam
|
||||
}
|
||||
}
|
||||
|
||||
void MergeTreeData::checkProperties(const StorageInMemoryMetadata & new_metadata, bool attach) const
|
||||
void MergeTreeData::checkProperties(const StorageInMemoryMetadata & new_metadata, const StorageInMemoryMetadata & /*old_metadata*/, bool attach) const
|
||||
{
|
||||
if (!new_metadata.sorting_key.definition_ast)
|
||||
throw Exception("ORDER BY cannot be empty", ErrorCodes::BAD_ARGUMENTS);
|
||||
@ -381,9 +381,9 @@ void MergeTreeData::checkProperties(const StorageInMemoryMetadata & new_metadata
|
||||
|
||||
}
|
||||
|
||||
void MergeTreeData::setProperties(const StorageInMemoryMetadata & new_metadata, bool attach)
|
||||
void MergeTreeData::setProperties(const StorageInMemoryMetadata & new_metadata, const StorageInMemoryMetadata & old_metadata, bool attach)
|
||||
{
|
||||
checkProperties(new_metadata, attach);
|
||||
checkProperties(new_metadata, old_metadata, attach);
|
||||
setInMemoryMetadata(new_metadata);
|
||||
}
|
||||
|
||||
@ -475,7 +475,7 @@ void MergeTreeData::initPartitionKey(const KeyDescription & new_partition_key)
|
||||
}
|
||||
|
||||
|
||||
void MergeTreeData::checkTTLExpressions(const StorageInMemoryMetadata & new_metadata) const
|
||||
void MergeTreeData::checkTTLExpressions(const StorageInMemoryMetadata & new_metadata, const StorageInMemoryMetadata & old_metadata) const
|
||||
{
|
||||
auto new_column_ttls = new_metadata.column_ttls_by_name;
|
||||
|
||||
@ -483,8 +483,8 @@ void MergeTreeData::checkTTLExpressions(const StorageInMemoryMetadata & new_meta
|
||||
{
|
||||
NameSet columns_ttl_forbidden;
|
||||
|
||||
if (hasPartitionKey())
|
||||
for (const auto & col : getColumnsRequiredForPartitionKey())
|
||||
if (old_metadata.hasPartitionKey())
|
||||
for (const auto & col : old_metadata.getColumnsRequiredForPartitionKey())
|
||||
columns_ttl_forbidden.insert(col);
|
||||
|
||||
if (hasSortingKey())
|
||||
@ -517,9 +517,9 @@ void MergeTreeData::checkTTLExpressions(const StorageInMemoryMetadata & new_meta
|
||||
}
|
||||
|
||||
/// Todo replace columns with TTL for columns
|
||||
void MergeTreeData::setTTLExpressions(const StorageInMemoryMetadata & new_metadata)
|
||||
void MergeTreeData::setTTLExpressions(const StorageInMemoryMetadata & new_metadata, const StorageInMemoryMetadata & old_metadata)
|
||||
{
|
||||
checkTTLExpressions(new_metadata);
|
||||
checkTTLExpressions(new_metadata, old_metadata);
|
||||
//setColumnTTLs(new_metadata.column_ttls_by_name);
|
||||
//setTableTTLs(new_metadata.table_ttl);
|
||||
}
|
||||
@ -1251,12 +1251,12 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, const S
|
||||
/// (and not as a part of some expression) and if the ALTER only affects column metadata.
|
||||
NameSet columns_alter_type_metadata_only;
|
||||
|
||||
if (hasPartitionKey())
|
||||
if (old_metadata.hasPartitionKey())
|
||||
{
|
||||
/// Forbid altering partition key columns because it can change partition ID format.
|
||||
/// TODO: in some cases (e.g. adding an Enum value) a partition key column can still be ALTERed.
|
||||
/// We should allow it.
|
||||
for (const String & col : getColumnsRequiredForPartitionKey())
|
||||
for (const String & col : old_metadata.getColumnsRequiredForPartitionKey())
|
||||
columns_alter_type_forbidden.insert(col);
|
||||
}
|
||||
|
||||
@ -1284,7 +1284,7 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, const S
|
||||
columns_alter_type_forbidden.insert(merging_params.sign_column);
|
||||
|
||||
std::map<String, const IDataType *> old_types;
|
||||
for (const auto & column : getColumns().getAllPhysical())
|
||||
for (const auto & column : old_metadata.getColumns().getAllPhysical())
|
||||
old_types.emplace(column.name, column.type.get());
|
||||
|
||||
|
||||
@ -1329,9 +1329,9 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, const S
|
||||
}
|
||||
}
|
||||
|
||||
checkProperties(new_metadata);
|
||||
checkProperties(new_metadata, old_metadata);
|
||||
|
||||
checkTTLExpressions(new_metadata);
|
||||
checkTTLExpressions(new_metadata, old_metadata);
|
||||
|
||||
if (hasSettingsChanges())
|
||||
{
|
||||
@ -2450,7 +2450,8 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context
|
||||
|
||||
/// Re-parse partition key fields using the information about expected field types.
|
||||
|
||||
size_t fields_count = getPartitionKey().sample_block.columns();
|
||||
auto metadata_snapshot = getInMemoryMetadataPtr();
|
||||
size_t fields_count = metadata_snapshot->getPartitionKey().sample_block.columns();
|
||||
if (partition_ast.fields_count != fields_count)
|
||||
throw Exception(
|
||||
"Wrong number of fields in the partition expression: " + toString(partition_ast.fields_count) +
|
||||
@ -2467,7 +2468,7 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context
|
||||
ReadBufferFromMemory right_paren_buf(")", 1);
|
||||
ConcatReadBuffer buf({&left_paren_buf, &fields_buf, &right_paren_buf});
|
||||
|
||||
auto input_stream = FormatFactory::instance().getInput("Values", buf, getPartitionKey().sample_block, context, context.getSettingsRef().max_block_size);
|
||||
auto input_stream = FormatFactory::instance().getInput("Values", buf, metadata_snapshot->getPartitionKey().sample_block, context, context.getSettingsRef().max_block_size);
|
||||
|
||||
auto block = input_stream->read();
|
||||
if (!block || !block.rows())
|
||||
@ -2964,7 +2965,7 @@ bool MergeTreeData::mayBenefitFromIndexForIn(
|
||||
}
|
||||
}
|
||||
|
||||
MergeTreeData & MergeTreeData::checkStructureAndGetMergeTreeData(IStorage & source_table) const
|
||||
MergeTreeData & MergeTreeData::checkStructureAndGetMergeTreeData(IStorage & source_table, const StorageMetadataPtr & src_snapshot, const StorageMetadataPtr & my_snapshot) const
|
||||
{
|
||||
MergeTreeData * src_data = dynamic_cast<MergeTreeData *>(&source_table);
|
||||
if (!src_data)
|
||||
@ -2972,7 +2973,7 @@ MergeTreeData & MergeTreeData::checkStructureAndGetMergeTreeData(IStorage & sour
|
||||
" supports attachPartitionFrom only for MergeTree family of table engines."
|
||||
" Got " + source_table.getName(), ErrorCodes::NOT_IMPLEMENTED);
|
||||
|
||||
if (getColumns().getAllPhysical().sizeOfDifference(src_data->getColumns().getAllPhysical()))
|
||||
if (my_snapshot->getColumns().getAllPhysical().sizeOfDifference(src_snapshot->getColumns().getAllPhysical()))
|
||||
throw Exception("Tables have different structure", ErrorCodes::INCOMPATIBLE_COLUMNS);
|
||||
|
||||
auto query_to_string = [] (const ASTPtr & ast)
|
||||
@ -2983,7 +2984,7 @@ MergeTreeData & MergeTreeData::checkStructureAndGetMergeTreeData(IStorage & sour
|
||||
if (query_to_string(getSortingKeyAST()) != query_to_string(src_data->getSortingKeyAST()))
|
||||
throw Exception("Tables have different ordering", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
if (query_to_string(getPartitionKeyAST()) != query_to_string(src_data->getPartitionKeyAST()))
|
||||
if (query_to_string(my_snapshot->getPartitionKeyAST()) != query_to_string(src_snapshot->getPartitionKeyAST()))
|
||||
throw Exception("Tables have different partition key", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
if (format_version != src_data->format_version)
|
||||
@ -2992,9 +2993,10 @@ MergeTreeData & MergeTreeData::checkStructureAndGetMergeTreeData(IStorage & sour
|
||||
return *src_data;
|
||||
}
|
||||
|
||||
MergeTreeData & MergeTreeData::checkStructureAndGetMergeTreeData(const StoragePtr & source_table) const
|
||||
MergeTreeData & MergeTreeData::checkStructureAndGetMergeTreeData(
|
||||
const StoragePtr & source_table, const StorageMetadataPtr & src_snapshot, const StorageMetadataPtr & my_snapshot) const
|
||||
{
|
||||
return checkStructureAndGetMergeTreeData(*source_table);
|
||||
return checkStructureAndGetMergeTreeData(*source_table, src_snapshot, my_snapshot);
|
||||
}
|
||||
|
||||
MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(const MergeTreeData::DataPartPtr & src_part,
|
||||
|
@ -556,8 +556,8 @@ public:
|
||||
/// Extracts MergeTreeData of other *MergeTree* storage
|
||||
/// and checks that their structure suitable for ALTER TABLE ATTACH PARTITION FROM
|
||||
/// Tables structure should be locked.
|
||||
MergeTreeData & checkStructureAndGetMergeTreeData(const StoragePtr & source_table) const;
|
||||
MergeTreeData & checkStructureAndGetMergeTreeData(IStorage & source_table) const;
|
||||
MergeTreeData & checkStructureAndGetMergeTreeData(const StoragePtr & source_table, const StorageMetadataPtr & src_snapshot, const StorageMetadataPtr & my_snapshot) const;
|
||||
MergeTreeData & checkStructureAndGetMergeTreeData(IStorage & source_table, const StorageMetadataPtr & src_snapshot, const StorageMetadataPtr & my_snapshot) const;
|
||||
|
||||
MergeTreeData::MutableDataPartPtr cloneAndLoadDataPartOnSameDisk(
|
||||
const MergeTreeData::DataPartPtr & src_part, const String & tmp_part_prefix, const MergeTreePartInfo & dst_part_info);
|
||||
@ -781,14 +781,14 @@ protected:
|
||||
/// The same for clearOldTemporaryDirectories.
|
||||
std::mutex clear_old_temporary_directories_mutex;
|
||||
|
||||
void checkProperties(const StorageInMemoryMetadata & new_metadata, bool attach = false) const;
|
||||
void checkProperties(const StorageInMemoryMetadata & new_metadata, const StorageInMemoryMetadata & old_metadata, bool attach = false) const;
|
||||
|
||||
void setProperties(const StorageInMemoryMetadata & new_metadata, bool attach = false);
|
||||
void setProperties(const StorageInMemoryMetadata & new_metadata, const StorageInMemoryMetadata & old_metadata, bool attach = false);
|
||||
|
||||
void initPartitionKey(const KeyDescription & new_partition_key);
|
||||
|
||||
void checkTTLExpressions(const StorageInMemoryMetadata & new_metadata) const;
|
||||
void setTTLExpressions(const StorageInMemoryMetadata & new_metadata);
|
||||
void checkTTLExpressions(const StorageInMemoryMetadata & new_metadata, const StorageInMemoryMetadata & old_metadata) const;
|
||||
void setTTLExpressions(const StorageInMemoryMetadata & new_metadata, const StorageInMemoryMetadata & old_metadata);
|
||||
|
||||
void checkStoragePolicy(const StoragePolicyPtr & new_storage_policy) const;
|
||||
|
||||
|
@ -132,7 +132,7 @@ void updateTTL(
|
||||
|
||||
}
|
||||
|
||||
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block, size_t max_parts)
|
||||
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot)
|
||||
{
|
||||
BlocksWithPartition result;
|
||||
if (!block || !block.rows())
|
||||
@ -140,14 +140,14 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block
|
||||
|
||||
data.check(block, true);
|
||||
|
||||
if (!data.hasPartitionKey()) /// Table is not partitioned.
|
||||
if (!metadata_snapshot->hasPartitionKey()) /// Table is not partitioned.
|
||||
{
|
||||
result.emplace_back(Block(block), Row());
|
||||
return result;
|
||||
}
|
||||
|
||||
Block block_copy = block;
|
||||
const auto & partition_key = data.getPartitionKey();
|
||||
const auto & partition_key = metadata_snapshot->getPartitionKey();
|
||||
partition_key.expression->execute(block_copy);
|
||||
|
||||
ColumnRawPtrs partition_columns;
|
||||
@ -206,7 +206,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
|
||||
|
||||
MergeTreePartition partition(std::move(block_with_partition.partition));
|
||||
|
||||
MergeTreePartInfo new_part_info(partition.getID(data.getPartitionKey().sample_block), temp_index, temp_index, 0);
|
||||
MergeTreePartInfo new_part_info(partition.getID(metadata_snapshot->getPartitionKey().sample_block), temp_index, temp_index, 0);
|
||||
String part_name;
|
||||
if (data.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
|
||||
{
|
||||
|
@ -40,7 +40,7 @@ public:
|
||||
* (split rows by partition)
|
||||
* Works deterministically: if same block was passed, function will return same result in same order.
|
||||
*/
|
||||
BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts);
|
||||
BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot);
|
||||
|
||||
/** All rows must correspond to same partition.
|
||||
* Returns part with unique name starting with 'tmp_', yet not added to MergeTreeData.
|
||||
|
@ -26,7 +26,7 @@ static std::unique_ptr<ReadBufferFromFileBase> openForReading(const DiskPtr & di
|
||||
|
||||
String MergeTreePartition::getID(const MergeTreeData & storage) const
|
||||
{
|
||||
return getID(storage.getPartitionKey().sample_block);
|
||||
return getID(storage.getInMemoryMetadataPtr()->getPartitionKey().sample_block);
|
||||
}
|
||||
|
||||
/// NOTE: This ID is used to create part names which are then persisted in ZK and as directory names on the file system.
|
||||
@ -89,7 +89,7 @@ String MergeTreePartition::getID(const Block & partition_key_sample) const
|
||||
|
||||
void MergeTreePartition::serializeText(const MergeTreeData & storage, WriteBuffer & out, const FormatSettings & format_settings) const
|
||||
{
|
||||
const auto & partition_key_sample = storage.getPartitionKey().sample_block;
|
||||
const auto & partition_key_sample = storage.getInMemoryMetadataPtr()->getPartitionKey().sample_block;
|
||||
size_t key_size = partition_key_sample.columns();
|
||||
|
||||
if (key_size == 0)
|
||||
@ -124,10 +124,11 @@ void MergeTreePartition::serializeText(const MergeTreeData & storage, WriteBuffe
|
||||
|
||||
void MergeTreePartition::load(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path)
|
||||
{
|
||||
if (!storage.hasPartitionKey())
|
||||
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
|
||||
if (!metadata_snapshot->hasPartitionKey())
|
||||
return;
|
||||
|
||||
const auto & partition_key_sample = storage.getPartitionKey().sample_block;
|
||||
const auto & partition_key_sample = metadata_snapshot->getPartitionKey().sample_block;
|
||||
auto partition_file_path = part_path + "partition.dat";
|
||||
auto file = openForReading(disk, partition_file_path);
|
||||
value.resize(partition_key_sample.columns());
|
||||
@ -137,7 +138,7 @@ void MergeTreePartition::load(const MergeTreeData & storage, const DiskPtr & dis
|
||||
|
||||
void MergeTreePartition::store(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const
|
||||
{
|
||||
store(storage.getPartitionKey().sample_block, disk, part_path, checksums);
|
||||
store(storage.getInMemoryMetadataPtr()->getPartitionKey().sample_block, disk, part_path, checksums);
|
||||
}
|
||||
|
||||
void MergeTreePartition::store(const Block & partition_key_sample, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const
|
||||
|
@ -130,7 +130,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
|
||||
if (quorum)
|
||||
checkQuorumPrecondition(zookeeper);
|
||||
|
||||
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block);
|
||||
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot);
|
||||
|
||||
for (auto & current_block : part_blocks)
|
||||
{
|
||||
|
@ -51,7 +51,7 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
|
||||
data_format_version = data.format_version;
|
||||
|
||||
if (data.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
|
||||
partition_key = formattedAST(data.getPartitionKey().expression_list_ast);
|
||||
partition_key = formattedAST(metadata_snapshot->getPartitionKey().expression_list_ast);
|
||||
|
||||
ttl_table = formattedAST(metadata_snapshot->getTableTTLs().definition_ast);
|
||||
|
||||
|
@ -291,4 +291,25 @@ Block StorageInMemoryMetadata::getSampleBlockForColumns(const Names & column_nam
|
||||
return res;
|
||||
}
|
||||
|
||||
const KeyDescription & StorageInMemoryMetadata::getPartitionKey() const
|
||||
{
|
||||
return partition_key;
|
||||
}
|
||||
|
||||
bool StorageInMemoryMetadata::isPartitionKeyDefined() const
|
||||
{
|
||||
return partition_key.definition_ast != nullptr;
|
||||
}
|
||||
|
||||
bool StorageInMemoryMetadata::hasPartitionKey() const
|
||||
{
|
||||
return !partition_key.column_names.empty();
|
||||
}
|
||||
|
||||
Names StorageInMemoryMetadata::getColumnsRequiredForPartitionKey() const
|
||||
{
|
||||
if (hasPartitionKey())
|
||||
return partition_key.expression->getRequiredColumns();
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
@ -112,6 +112,17 @@ struct StorageInMemoryMetadata
|
||||
Block getSampleBlockWithVirtuals(const NamesAndTypesList & virtuals) const; /// ordinary + materialized + virtuals.
|
||||
Block getSampleBlockForColumns(
|
||||
const Names & column_names, const NamesAndTypesList & virtuals) const; /// ordinary + materialized + aliases + virtuals.
|
||||
|
||||
/// Returns structure with partition key.
|
||||
const KeyDescription & getPartitionKey() const;
|
||||
/// Returns ASTExpressionList of partition key expression for storage or nullptr if there is none.
|
||||
ASTPtr getPartitionKeyAST() const { return partition_key.definition_ast; }
|
||||
/// Storage has user-defined (in CREATE query) partition key.
|
||||
bool isPartitionKeyDefined() const;
|
||||
/// Storage has partition key.
|
||||
bool hasPartitionKey() const;
|
||||
/// Returns column names that need to be read to calculate partition key.
|
||||
Names getColumnsRequiredForPartitionKey() const;
|
||||
};
|
||||
|
||||
using StorageMetadataPtr = std::shared_ptr<StorageInMemoryMetadata>;
|
||||
|
@ -259,6 +259,7 @@ void StorageMergeTree::alter(
|
||||
auto table_id = getStorageID();
|
||||
|
||||
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
|
||||
StorageInMemoryMetadata old_metadata = getInMemoryMetadata();
|
||||
auto maybe_mutation_commands = commands.getMutationCommands(new_metadata, context.getSettingsRef().materialize_ttl_after_modify, context);
|
||||
String mutation_file_name;
|
||||
Int64 mutation_version = -1;
|
||||
@ -282,8 +283,8 @@ void StorageMergeTree::alter(
|
||||
|
||||
changeSettings(new_metadata.settings_changes, table_lock_holder);
|
||||
/// Reinitialize primary key because primary key column types might have changed.
|
||||
setProperties(new_metadata);
|
||||
setTTLExpressions(new_metadata);
|
||||
setProperties(new_metadata, old_metadata);
|
||||
setTTLExpressions(new_metadata, old_metadata);
|
||||
|
||||
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(context, table_id, new_metadata);
|
||||
|
||||
@ -1151,9 +1152,11 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
|
||||
{
|
||||
auto lock1 = lockStructureForShare(false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
|
||||
auto lock2 = source_table->lockStructureForShare(false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
|
||||
auto source_metadata_snapshot = source_table->getInMemoryMetadataPtr();
|
||||
auto my_metadata_snapshot = getInMemoryMetadataPtr();
|
||||
|
||||
Stopwatch watch;
|
||||
MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table);
|
||||
MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, my_metadata_snapshot);
|
||||
String partition_id = getPartitionIDFromQuery(partition, context);
|
||||
|
||||
DataPartsVector src_parts = src_data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
|
||||
@ -1232,9 +1235,12 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
|
||||
" should have the same storage policy of source table " + getStorageID().getNameForLogs() + ". " +
|
||||
getStorageID().getNameForLogs() + ": " + this->getStoragePolicy()->getName() + ", " +
|
||||
dest_table_storage->getStorageID().getNameForLogs() + ": " + dest_table_storage->getStoragePolicy()->getName(), ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
auto dest_metadata_snapshot = dest_table->getInMemoryMetadataPtr();
|
||||
auto metadata_snapshot = getInMemoryMetadataPtr();
|
||||
Stopwatch watch;
|
||||
|
||||
MergeTreeData & src_data = dest_table_storage->checkStructureAndGetMergeTreeData(*this);
|
||||
MergeTreeData & src_data = dest_table_storage->checkStructureAndGetMergeTreeData(*this, metadata_snapshot, dest_metadata_snapshot);
|
||||
String partition_id = getPartitionIDFromQuery(partition, context);
|
||||
|
||||
DataPartsVector src_parts = src_data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
|
||||
|
@ -758,6 +758,7 @@ void StorageReplicatedMergeTree::checkTableStructure(const String & zookeeper_pr
|
||||
void StorageReplicatedMergeTree::setTableStructure(ColumnsDescription new_columns, const ReplicatedMergeTreeTableMetadata::Diff & metadata_diff)
|
||||
{
|
||||
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
|
||||
StorageInMemoryMetadata old_metadata = getInMemoryMetadata();
|
||||
if (new_columns != new_metadata.columns)
|
||||
{
|
||||
new_metadata.columns = new_columns;
|
||||
@ -820,8 +821,8 @@ void StorageReplicatedMergeTree::setTableStructure(ColumnsDescription new_column
|
||||
|
||||
/// Even if the primary/sorting keys didn't change we must reinitialize it
|
||||
/// because primary key column types might have changed.
|
||||
setProperties(new_metadata);
|
||||
setTTLExpressions(new_metadata);
|
||||
setProperties(new_metadata, old_metadata);
|
||||
setTTLExpressions(new_metadata, old_metadata);
|
||||
}
|
||||
|
||||
|
||||
@ -1794,6 +1795,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
||||
|
||||
auto table_lock_holder_dst_table = lockStructureForShare(
|
||||
false, RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
|
||||
auto dst_metadata_snapshot = getInMemoryMetadataPtr();
|
||||
|
||||
for (size_t i = 0; i < entry_replace.new_part_names.size(); ++i)
|
||||
{
|
||||
@ -1843,10 +1845,11 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
||||
return 0;
|
||||
}
|
||||
|
||||
auto src_metadata_snapshot = source_table->getInMemoryMetadataPtr();
|
||||
MergeTreeData * src_data = nullptr;
|
||||
try
|
||||
{
|
||||
src_data = &checkStructureAndGetMergeTreeData(source_table);
|
||||
src_data = &checkStructureAndGetMergeTreeData(source_table, src_metadata_snapshot, dst_metadata_snapshot);
|
||||
}
|
||||
catch (Exception &)
|
||||
{
|
||||
@ -5212,8 +5215,11 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
|
||||
auto lock1 = lockStructureForShare(true, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
|
||||
auto lock2 = source_table->lockStructureForShare(false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
|
||||
|
||||
auto source_metadata_snapshot = source_table->getInMemoryMetadataPtr();
|
||||
auto metadata_snapshot = getInMemoryMetadataPtr();
|
||||
|
||||
Stopwatch watch;
|
||||
MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table);
|
||||
MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, metadata_snapshot);
|
||||
String partition_id = getPartitionIDFromQuery(partition, context);
|
||||
|
||||
DataPartsVector src_all_parts = src_data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
|
||||
@ -5405,8 +5411,11 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
|
||||
getStorageID().getNameForLogs() + ": " + this->getStoragePolicy()->getName() + ", " +
|
||||
getStorageID().getNameForLogs() + ": " + dest_table_storage->getStoragePolicy()->getName(), ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
auto dest_metadata_snapshot = dest_table->getInMemoryMetadataPtr();
|
||||
auto metadata_snapshot = getInMemoryMetadataPtr();
|
||||
|
||||
Stopwatch watch;
|
||||
MergeTreeData & src_data = dest_table_storage->checkStructureAndGetMergeTreeData(*this);
|
||||
MergeTreeData & src_data = dest_table_storage->checkStructureAndGetMergeTreeData(*this, metadata_snapshot, dest_metadata_snapshot);
|
||||
auto src_data_id = src_data.getStorageID();
|
||||
String partition_id = getPartitionIDFromQuery(partition, query_context);
|
||||
|
||||
|
@ -122,13 +122,13 @@ protected:
|
||||
throw;
|
||||
}
|
||||
|
||||
columns = storage->getColumns();
|
||||
auto metadadata_snapshot = storage->getInMemoryMetadataPtr();
|
||||
columns = metadadata_snapshot->getColumns();
|
||||
|
||||
cols_required_for_partition_key = storage->getColumnsRequiredForPartitionKey();
|
||||
cols_required_for_partition_key = metadadata_snapshot->getColumnsRequiredForPartitionKey();
|
||||
cols_required_for_sorting_key = storage->getColumnsRequiredForSortingKey();
|
||||
cols_required_for_primary_key = storage->getColumnsRequiredForPrimaryKey();
|
||||
cols_required_for_sampling = storage->getColumnsRequiredForSampling();
|
||||
|
||||
column_sizes = storage->getColumnSizes();
|
||||
}
|
||||
|
||||
|
@ -267,6 +267,7 @@ protected:
|
||||
throw;
|
||||
}
|
||||
}
|
||||
auto metadata_snapshot = table->getInMemoryMetadataPtr();
|
||||
|
||||
++rows_count;
|
||||
|
||||
@ -365,7 +366,7 @@ protected:
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
assert(table != nullptr);
|
||||
if ((expression_ptr = table->getPartitionKeyAST()))
|
||||
if ((expression_ptr = metadata_snapshot->getPartitionKeyAST()))
|
||||
res_columns[res_index++]->insert(queryToString(expression_ptr));
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
Loading…
Reference in New Issue
Block a user