diff --git a/src/Storages/KeyDescription.cpp b/src/Storages/KeyDescription.cpp index 721663d3ca5..534195dd0f0 100644 --- a/src/Storages/KeyDescription.cpp +++ b/src/Storages/KeyDescription.cpp @@ -2,7 +2,6 @@ #include #include -#include #include #include #include @@ -87,25 +86,28 @@ KeyDescription KeyDescription::getKeyFromAST( return getSortingKeyFromAST(definition_ast, columns, context, {}); } -void KeyDescription::moduloToModuloLegacyRecursive(ASTPtr node_expr) +bool KeyDescription::moduloToModuloLegacyRecursive(ASTPtr node_expr) { if (!node_expr) return; auto * function_expr = node_expr->as(); + bool modulo_in_ast = false; if (function_expr) { if (function_expr->name == "modulo") { function_expr->name = "moduloLegacy"; + modulo_in_ast = true; } if (function_expr->arguments) { auto children = function_expr->arguments->children; for (const auto & child : children) - moduloToModuloLegacyRecursive(child); + modulo_in_ast |= moduloToModuloLegacyRecursive(child); } } + return modulo_in_ast; } KeyDescription KeyDescription::getSortingKeyFromAST( @@ -115,11 +117,8 @@ KeyDescription KeyDescription::getSortingKeyFromAST( const std::optional & additional_column) { KeyDescription result; - ASTPtr adjusted_ast = definition_ast; - moduloToModuloLegacyRecursive(adjusted_ast); - - result.definition_ast = adjusted_ast; - result.expression_list_ast = extractKeyExpressionList(adjusted_ast); + result.definition_ast = definition_ast; + result.expression_list_ast = extractKeyExpressionList(definition_ast); if (additional_column) { diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp index d5f4d91850c..8992cb19d16 100644 --- a/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/src/Storages/MergeTree/DataPartsExchange.cpp @@ -379,6 +379,7 @@ MergeTreeData::DataPartPtr Service::findPart(const String & name) MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( const StorageMetadataPtr & metadata_snapshot, + ContextPtr context, const String & part_name, const String & replica_path, const String & host, @@ -493,7 +494,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( if (e.code() != ErrorCodes::S3_ERROR) throw; /// Try again but without S3 copy - return fetchPart(metadata_snapshot, part_name, replica_path, host, port, timeouts, + return fetchPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts, user, password, interserver_scheme, to_detached, tmp_prefix_, nullptr, false); } } @@ -557,7 +558,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( MergeTreeData::DataPart::Checksums checksums; return part_type == "InMemory" - ? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, std::move(reservation), in, projections) + ? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, std::move(reservation), in, projections) : downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, reservation->getDisk(), in, projections, checksums); } @@ -565,6 +566,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory( const String & part_name, const UUID & part_uuid, const StorageMetadataPtr & metadata_snapshot, + ContextPtr context, ReservationPtr reservation, PooledReadWriteBufferFromHTTP & in, size_t projections) @@ -619,7 +621,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory( new_data_part->is_temp = true; new_data_part->setColumns(block.getNamesAndTypesList()); new_data_part->minmax_idx.update(block, data.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey())); - new_data_part->partition.create(metadata_snapshot, block, 0); + new_data_part->partition.create(metadata_snapshot, block, 0, context); MergedBlockOutputStream part_out( new_data_part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {})); diff --git a/src/Storages/MergeTree/DataPartsExchange.h b/src/Storages/MergeTree/DataPartsExchange.h index 07b60fde1f1..f59942ef7f4 100644 --- a/src/Storages/MergeTree/DataPartsExchange.h +++ b/src/Storages/MergeTree/DataPartsExchange.h @@ -65,6 +65,7 @@ public: /// Downloads a part to tmp_directory. If to_detached - downloads to the `detached` directory. MergeTreeData::MutableDataPartPtr fetchPart( const StorageMetadataPtr & metadata_snapshot, + ContextPtr context, const String & part_name, const String & replica_path, const String & host, @@ -106,6 +107,7 @@ private: const String & part_name, const UUID & part_uuid, const StorageMetadataPtr & metadata_snapshot, + ContextPtr context, ReservationPtr reservation, PooledReadWriteBufferFromHTTP & in, size_t projections); diff --git a/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp b/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp index 79f935b5f9f..11feb905bb6 100644 --- a/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp @@ -23,7 +23,7 @@ void MergeTreeBlockOutputStream::writePrefix() void MergeTreeBlockOutputStream::write(const Block & block) { - auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot); + auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context); for (auto & current_block : part_blocks) { Stopwatch watch; diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 51077996212..01b23b8ccda 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -772,7 +772,7 @@ std::optional MergeTreeData::totalRowsByPartitionPredicateImpl( // Generate valid expressions for filtering bool valid = VirtualColumnUtils::prepareFilterBlockWithQuery(query_info.query, local_context, virtual_columns_block, expression_ast); - PartitionPruner partition_pruner(metadata_snapshot->getPartitionKey(), query_info, local_context, true /* strict */); + PartitionPruner partition_pruner(metadata_snapshot, query_info, local_context, true /* strict */); if (partition_pruner.isUseless() && !valid) return {}; @@ -876,13 +876,13 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) { /// Create and correctly initialize global WAL object write_ahead_log = std::make_shared(*this, disk_ptr, it->name()); - for (auto && part : write_ahead_log->restore(metadata_snapshot)) + for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext())) parts_from_wal.push_back(std::move(part)); } else if (settings->in_memory_parts_enable_wal) { MergeTreeWriteAheadLog wal(*this, disk_ptr, it->name()); - for (auto && part : wal.restore(metadata_snapshot)) + for (auto && part : wal.restore(metadata_snapshot, getContext())) parts_from_wal.push_back(std::move(part)); } } diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 0f5e69448d2..5f51358a7d2 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -503,7 +503,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts( minmax_idx_condition.emplace( query_info, context, minmax_columns_names, data.getMinMaxExpr(partition_key, ExpressionActionsSettings::fromContext(context))); - partition_pruner.emplace(metadata_snapshot_base->getPartitionKey(), query_info, context, false /* strict */); + partition_pruner.emplace(metadata_snapshot_base, query_info, context, false /* strict */); if (settings.force_index_by_date && (minmax_idx_condition->alwaysUnknownOrTrue() && partition_pruner->isUseless())) { diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp index 2a85faa7083..387200f7a41 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -140,7 +140,8 @@ void updateTTL( } -BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot) +BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts( + const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) { BlocksWithPartition result; if (!block || !block.rows()) @@ -155,12 +156,12 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block } Block block_copy = block; - const auto & partition_key = metadata_snapshot->getPartitionKey(); - partition_key.expression->execute(block_copy); + /// After expression execution partition key columns will be added to block_copy with names regarding partition function. + auto partition_key_sample_block = MergeTreePartition::executePartitionByExpression(metadata_snapshot, block_copy, context); ColumnRawPtrs partition_columns; - partition_columns.reserve(partition_key.sample_block.columns()); - for (const ColumnWithTypeAndName & element : partition_key.sample_block) + partition_columns.reserve(partition_key_sample_block.columns()); + for (const ColumnWithTypeAndName & element : partition_key_sample_block) partition_columns.emplace_back(block_copy.getByName(element.name).column.get()); PODArray partition_num_to_first_row; diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.h b/src/Storages/MergeTree/MergeTreeDataWriter.h index 4c5b75657ee..bea5e05591e 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.h +++ b/src/Storages/MergeTree/MergeTreeDataWriter.h @@ -40,7 +40,7 @@ public: * (split rows by partition) * Works deterministically: if same block was passed, function will return same result in same order. */ - static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot); + static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context); /** All rows must correspond to same partition. * Returns part with unique name starting with 'tmp_', yet not added to MergeTreeData. diff --git a/src/Storages/MergeTree/MergeTreePartition.cpp b/src/Storages/MergeTree/MergeTreePartition.cpp index 848c676c6b1..8c21e5ca411 100644 --- a/src/Storages/MergeTree/MergeTreePartition.cpp +++ b/src/Storages/MergeTree/MergeTreePartition.cpp @@ -169,22 +169,55 @@ void MergeTreePartition::store(const Block & partition_key_sample, const DiskPtr out->finalize(); } -void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row) +void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row, ContextPtr context) { if (!metadata_snapshot->hasPartitionKey()) return; - const auto & partition_key = metadata_snapshot->getPartitionKey(); - partition_key.expression->execute(block); - size_t partition_columns_num = partition_key.sample_block.columns(); + auto partition_key_sample_block = executePartitionByExpression(metadata_snapshot, block, context); + size_t partition_columns_num = partition_key_sample_block.columns(); value.resize(partition_columns_num); + const String modulo_legacy_function_name = "moduloLegacy"; for (size_t i = 0; i < partition_columns_num; ++i) { - const auto & column_name = partition_key.sample_block.getByPosition(i).name; - const auto & partition_column = block.getByName(column_name).column; - partition_column->get(row, value[i]); + const auto & column_name = partition_key_sample_block.getByPosition(i).name; + auto & partition_column = block.getByName(column_name); + + /// Executing partition_by expression adds new columns to passed block according to partition functions. + /// The block is passed by reference and is used afterwards. `moduloLegacy` needs to be substituted back + /// with just `modulo`, because it was a temporary substitution. + if (column_name.starts_with(modulo_legacy_function_name)) + partition_column.name = "modulo" + partition_column.name.substr(modulo_legacy_function_name.size()); + + partition_column.column->get(row, value[i]); } } +Block MergeTreePartition::executePartitionByExpression(const StorageMetadataPtr & metadata_snapshot, Block & block, ContextPtr context) +{ + auto adjusted_partition_key = adjustPartitionKey(metadata_snapshot, context); + adjusted_partition_key.expression->execute(block); + return adjusted_partition_key.sample_block; +} + +KeyDescription MergeTreePartition::adjustPartitionKey(const StorageMetadataPtr & metadata_snapshot, ContextPtr context) +{ + const auto & partition_key = metadata_snapshot->getPartitionKey(); + if (!partition_key.definition_ast) + return partition_key; + + ASTPtr ast_copy = partition_key.definition_ast->clone(); + + /// Implementation of modulo function was changed from 8bit result type to 16bit. For backward compatibility partition by expression is always + /// calculated according to previous version - `moduloLegacy`. + if (KeyDescription::moduloToModuloLegacyRecursive(ast_copy)) + { + auto adjusted_partition_key = KeyDescription::getKeyFromAST(ast_copy, metadata_snapshot->columns, context); + return adjusted_partition_key; + } + + return partition_key; +} + } diff --git a/src/Storages/MergeTree/MergeTreePartition.h b/src/Storages/MergeTree/MergeTreePartition.h index a5c27cef18a..5ee20718c16 100644 --- a/src/Storages/MergeTree/MergeTreePartition.h +++ b/src/Storages/MergeTree/MergeTreePartition.h @@ -41,7 +41,13 @@ public: void assign(const MergeTreePartition & other) { value.assign(other.value); } - void create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row); + void create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row, ContextPtr context); + + /// Adjust partition key and execute its expression on block. Return sample block according to used expression. + static Block executePartitionByExpression(const StorageMetadataPtr & metadata_snapshot, Block & block, ContextPtr context); + + /// Make a modified partition key with substitution from modulo to moduloLegacy. Used in paritionPruner. + static KeyDescription adjustPartitionKey(const StorageMetadataPtr & partition_key, ContextPtr context); }; } diff --git a/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp b/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp index 4c92d4f6136..ba6a53026fb 100644 --- a/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp +++ b/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp @@ -111,7 +111,7 @@ void MergeTreeWriteAheadLog::rotate(const std::unique_lock &) init(); } -MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot) +MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot, ContextPtr context) { std::unique_lock lock(write_mutex); @@ -192,7 +192,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor MergedBlockOutputStream part_out(part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {})); part->minmax_idx.update(block, storage.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey())); - part->partition.create(metadata_snapshot, block, 0); + part->partition.create(metadata_snapshot, block, 0, context); if (metadata_snapshot->hasSortingKey()) metadata_snapshot->getSortingKey().expression->execute(block); diff --git a/src/Storages/MergeTree/MergeTreeWriteAheadLog.h b/src/Storages/MergeTree/MergeTreeWriteAheadLog.h index 3b8dc5befc0..e01911aa8b8 100644 --- a/src/Storages/MergeTree/MergeTreeWriteAheadLog.h +++ b/src/Storages/MergeTree/MergeTreeWriteAheadLog.h @@ -62,7 +62,7 @@ public: void addPart(DataPartInMemoryPtr & part); void dropPart(const String & part_name); - std::vector restore(const StorageMetadataPtr & metadata_snapshot); + std::vector restore(const StorageMetadataPtr & metadata_snapshot, ContextPtr context); using MinMaxBlockNumber = std::pair; static std::optional tryParseMinMaxBlockNumber(const String & filename); diff --git a/src/Storages/MergeTree/PartitionPruner.h b/src/Storages/MergeTree/PartitionPruner.h index 98c947bb0ca..fbed0e6ab99 100644 --- a/src/Storages/MergeTree/PartitionPruner.h +++ b/src/Storages/MergeTree/PartitionPruner.h @@ -14,15 +14,18 @@ class PartitionPruner { private: std::unordered_map partition_filter_map; - const KeyDescription & partition_key; + + /// partition_key is adjusted here (with substitution from modulo to moduloLegacy). + KeyDescription partition_key; + KeyCondition partition_condition; bool useless; using DataPart = IMergeTreeDataPart; using DataPartPtr = std::shared_ptr; public: - PartitionPruner(const KeyDescription & partition_key_, const SelectQueryInfo & query_info, ContextPtr context, bool strict) - : partition_key(partition_key_) + PartitionPruner(const StorageMetadataPtr & metadata, const SelectQueryInfo & query_info, ContextPtr context, bool strict) + : partition_key(MergeTreePartition::adjustPartitionKey(metadata, context)) , partition_condition( query_info, context, partition_key.column_names, partition_key.expression, true /* single_point */, strict) , useless(strict ? partition_condition.anyUnknownOrAlwaysTrue() : partition_condition.alwaysUnknownOrTrue()) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp index 51a24606dde..7f167f929e5 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp @@ -136,7 +136,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block) if (quorum) checkQuorumPrecondition(zookeeper); - auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot); + auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context); for (auto & current_block : part_blocks) { diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeTableMetadata.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeTableMetadata.cpp index 28e8b07ab6c..db1c2bc89af 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeTableMetadata.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeTableMetadata.cpp @@ -206,18 +206,16 @@ void ReplicatedMergeTreeTableMetadata::checkImmutableFieldsEquals(const Replicat ", local: " + DB::toString(data_format_version.toUnderType()), ErrorCodes::METADATA_MISMATCH); - //if (partition_key != from_zk.partition_key) - // throw Exception( - // "Existing table metadata in ZooKeeper differs in partition key expression." - // " Stored in ZooKeeper: " + from_zk.partition_key + ", local: " + partition_key, - // ErrorCodes::METADATA_MISMATCH); + if (partition_key != from_zk.partition_key) + throw Exception( + "Existing table metadata in ZooKeeper differs in partition key expression." + " Stored in ZooKeeper: " + from_zk.partition_key + ", local: " + partition_key, + ErrorCodes::METADATA_MISMATCH); } void ReplicatedMergeTreeTableMetadata::checkEquals(const ReplicatedMergeTreeTableMetadata & from_zk, const ColumnsDescription & columns, ContextPtr context) const { - std::cerr << "\n\nKSSENII partitition key: " << partition_key << ", in zk: " << from_zk.partition_key << std::endl; - //String parsed = IndicesDescription::parse(from_zk.partition_key, columns, context).toString(); checkImmutableFieldsEquals(from_zk); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index d04dc46ea83..74c86e00112 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -2466,7 +2466,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry) throw Exception("Interserver schemas are different '" + interserver_scheme + "' != '" + address.scheme + "', can't fetch part from " + address.host, ErrorCodes::LOGICAL_ERROR); part_desc->res_part = fetcher.fetchPart( - metadata_snapshot, part_desc->found_new_part_name, source_replica_path, + metadata_snapshot, getContext(), part_desc->found_new_part_name, source_replica_path, address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, false, TMP_PREFIX + "fetch_"); /// TODO: check columns_version of fetched part @@ -3929,6 +3929,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora return fetcher.fetchPart( metadata_snapshot, + getContext(), part_name, source_replica_path, address.host, @@ -4084,7 +4085,7 @@ bool StorageReplicatedMergeTree::fetchExistsPart(const String & part_name, const ErrorCodes::INTERSERVER_SCHEME_DOESNT_MATCH); return fetcher.fetchPart( - metadata_snapshot, part_name, source_replica_path, + metadata_snapshot, getContext(), part_name, source_replica_path, address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, false, "", nullptr, true, replaced_disk);