diff --git a/src/Storages/KeyDescription.cpp b/src/Storages/KeyDescription.cpp index be327313b4d..de4fda36622 100644 --- a/src/Storages/KeyDescription.cpp +++ b/src/Storages/KeyDescription.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -86,6 +87,27 @@ KeyDescription KeyDescription::getKeyFromAST( return getSortingKeyFromAST(definition_ast, columns, context, {}); } +static void moduloToModuloLegacyRecursive(ASTPtr node_expr) +{ + if (!node_expr) + return; + + auto * function_expr = node_expr->as(); + if (function_expr) + { + if (function_expr->name == "modulo") + { + function_expr->name = "moduloLegacy"; + } + if (function_expr->arguments) + { + auto children = function_expr->arguments->children; + for (const auto & child : children) + moduloToModuloLegacyRecursive(child); + } + } +} + KeyDescription KeyDescription::getSortingKeyFromAST( const ASTPtr & definition_ast, const ColumnsDescription & columns, @@ -93,8 +115,11 @@ KeyDescription KeyDescription::getSortingKeyFromAST( const std::optional & additional_column) { KeyDescription result; - result.definition_ast = definition_ast; - result.expression_list_ast = extractKeyExpressionList(definition_ast); + ASTPtr adjusted_ast = definition_ast; + moduloToModuloLegacyRecursive(adjusted_ast); + + result.definition_ast = adjusted_ast; + result.expression_list_ast = extractKeyExpressionList(adjusted_ast); if (additional_column) { diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp index 8992cb19d16..d5f4d91850c 100644 --- a/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/src/Storages/MergeTree/DataPartsExchange.cpp @@ -379,7 +379,6 @@ MergeTreeData::DataPartPtr Service::findPart(const String & name) MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( const StorageMetadataPtr & metadata_snapshot, - ContextPtr context, const String & part_name, const String & replica_path, const String & host, @@ -494,7 +493,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( if (e.code() != ErrorCodes::S3_ERROR) throw; /// Try again but without S3 copy - return fetchPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts, + return fetchPart(metadata_snapshot, part_name, replica_path, host, port, timeouts, user, password, interserver_scheme, to_detached, tmp_prefix_, nullptr, false); } } @@ -558,7 +557,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( MergeTreeData::DataPart::Checksums checksums; return part_type == "InMemory" - ? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, std::move(reservation), in, projections) + ? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, std::move(reservation), in, projections) : downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, reservation->getDisk(), in, projections, checksums); } @@ -566,7 +565,6 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory( const String & part_name, const UUID & part_uuid, const StorageMetadataPtr & metadata_snapshot, - ContextPtr context, ReservationPtr reservation, PooledReadWriteBufferFromHTTP & in, size_t projections) @@ -621,7 +619,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory( new_data_part->is_temp = true; new_data_part->setColumns(block.getNamesAndTypesList()); new_data_part->minmax_idx.update(block, data.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey())); - new_data_part->partition.create(metadata_snapshot, block, 0, context); + new_data_part->partition.create(metadata_snapshot, block, 0); MergedBlockOutputStream part_out( new_data_part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {})); diff --git a/src/Storages/MergeTree/DataPartsExchange.h b/src/Storages/MergeTree/DataPartsExchange.h index f59942ef7f4..07b60fde1f1 100644 --- a/src/Storages/MergeTree/DataPartsExchange.h +++ b/src/Storages/MergeTree/DataPartsExchange.h @@ -65,7 +65,6 @@ public: /// Downloads a part to tmp_directory. If to_detached - downloads to the `detached` directory. MergeTreeData::MutableDataPartPtr fetchPart( const StorageMetadataPtr & metadata_snapshot, - ContextPtr context, const String & part_name, const String & replica_path, const String & host, @@ -107,7 +106,6 @@ private: const String & part_name, const UUID & part_uuid, const StorageMetadataPtr & metadata_snapshot, - ContextPtr context, ReservationPtr reservation, PooledReadWriteBufferFromHTTP & in, size_t projections); diff --git a/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp b/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp index 11feb905bb6..79f935b5f9f 100644 --- a/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp @@ -23,7 +23,7 @@ void MergeTreeBlockOutputStream::writePrefix() void MergeTreeBlockOutputStream::write(const Block & block) { - auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context); + auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot); for (auto & current_block : part_blocks) { Stopwatch watch; diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 01b23b8ccda..c6e40be08ae 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -772,7 +772,7 @@ std::optional MergeTreeData::totalRowsByPartitionPredicateImpl( // Generate valid expressions for filtering bool valid = VirtualColumnUtils::prepareFilterBlockWithQuery(query_info.query, local_context, virtual_columns_block, expression_ast); - PartitionPruner partition_pruner(metadata_snapshot, query_info, local_context, true /* strict */); + PartitionPruner partition_pruner(metadata_snapshot->getPartitionKey(), query_info, local_context, true /* strict */); if (partition_pruner.isUseless() && !valid) return {}; @@ -876,13 +876,13 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) { /// Create and correctly initialize global WAL object write_ahead_log = std::make_shared(*this, disk_ptr, it->name()); - for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext())) + for (auto && part : write_ahead_log->restore(metadata_snapshot)) parts_from_wal.push_back(std::move(part)); } else if (settings->in_memory_parts_enable_wal) { MergeTreeWriteAheadLog wal(*this, disk_ptr, it->name()); - for (auto && part : wal.restore(metadata_snapshot, getContext())) + for (auto && part : wal.restore(metadata_snapshot)) parts_from_wal.push_back(std::move(part)); } } @@ -2350,7 +2350,7 @@ MergeTreeData::DataPartsVector MergeTreeData::removePartsInRangeFromWorkingSet(c DataPartsVector parts_to_remove; if (drop_range.min_block > drop_range.max_block) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid drop range: {}", drop_range.getPartName()); + return parts_to_remove; auto partition_range = getDataPartsPartitionRange(drop_range.partition_id); diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 5f51358a7d2..0f5e69448d2 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -503,7 +503,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts( minmax_idx_condition.emplace( query_info, context, minmax_columns_names, data.getMinMaxExpr(partition_key, ExpressionActionsSettings::fromContext(context))); - partition_pruner.emplace(metadata_snapshot_base, query_info, context, false /* strict */); + partition_pruner.emplace(metadata_snapshot_base->getPartitionKey(), query_info, context, false /* strict */); if (settings.force_index_by_date && (minmax_idx_condition->alwaysUnknownOrTrue() && partition_pruner->isUseless())) { diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp index 387200f7a41..2a85faa7083 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -140,8 +140,7 @@ void updateTTL( } -BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts( - const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) +BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot) { BlocksWithPartition result; if (!block || !block.rows()) @@ -156,12 +155,12 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts( } Block block_copy = block; - /// After expression execution partition key columns will be added to block_copy with names regarding partition function. - auto partition_key_sample_block = MergeTreePartition::executePartitionByExpression(metadata_snapshot, block_copy, context); + const auto & partition_key = metadata_snapshot->getPartitionKey(); + partition_key.expression->execute(block_copy); ColumnRawPtrs partition_columns; - partition_columns.reserve(partition_key_sample_block.columns()); - for (const ColumnWithTypeAndName & element : partition_key_sample_block) + partition_columns.reserve(partition_key.sample_block.columns()); + for (const ColumnWithTypeAndName & element : partition_key.sample_block) partition_columns.emplace_back(block_copy.getByName(element.name).column.get()); PODArray partition_num_to_first_row; diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.h b/src/Storages/MergeTree/MergeTreeDataWriter.h index bea5e05591e..4c5b75657ee 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.h +++ b/src/Storages/MergeTree/MergeTreeDataWriter.h @@ -40,7 +40,7 @@ public: * (split rows by partition) * Works deterministically: if same block was passed, function will return same result in same order. */ - static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context); + static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot); /** All rows must correspond to same partition. * Returns part with unique name starting with 'tmp_', yet not added to MergeTreeData. diff --git a/src/Storages/MergeTree/MergeTreePartition.cpp b/src/Storages/MergeTree/MergeTreePartition.cpp index 471905c581d..61cc7d17545 100644 --- a/src/Storages/MergeTree/MergeTreePartition.cpp +++ b/src/Storages/MergeTree/MergeTreePartition.cpp @@ -169,79 +169,22 @@ void MergeTreePartition::store(const Block & partition_key_sample, const DiskPtr out->finalize(); } -void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row, ContextPtr context) +void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row) { if (!metadata_snapshot->hasPartitionKey()) return; - auto partition_key_sample_block = executePartitionByExpression(metadata_snapshot, block, context); - size_t partition_columns_num = partition_key_sample_block.columns(); + const auto & partition_key = metadata_snapshot->getPartitionKey(); + partition_key.expression->execute(block); + size_t partition_columns_num = partition_key.sample_block.columns(); value.resize(partition_columns_num); - const String modulo_legacy_function_name = "moduloLegacy"; for (size_t i = 0; i < partition_columns_num; ++i) { - const auto & column_name = partition_key_sample_block.getByPosition(i).name; - auto & partition_column = block.getByName(column_name); - - /// Executing partition_by expression adds new columns to passed block according to partition functions. - /// The block is passed by reference and is used afterwards. `moduloLegacy` needs to be substituted back - /// with just `modulo`, because it was a temporary substitution. - if (column_name.starts_with(modulo_legacy_function_name)) - partition_column.name = "modulo" + partition_column.name.substr(modulo_legacy_function_name.size()); - - partition_column.column->get(row, value[i]); + const auto & column_name = partition_key.sample_block.getByPosition(i).name; + auto & partition_column = block.getByName(column_name).column; + partition_column->get(row, value[i]); } } -static bool moduloToModuloLegacyRecursive(ASTPtr node_expr) -{ - if (!node_expr) - return false; - - auto * function_expr = node_expr->as(); - bool modulo_in_partition_key = false; - if (function_expr) - { - if (function_expr->name == "modulo") - { - function_expr->name = "moduloLegacy"; - modulo_in_partition_key = true; - } - if (function_expr->arguments) - { - auto children = function_expr->arguments->children; - for (const auto & child : children) - modulo_in_partition_key |= moduloToModuloLegacyRecursive(child); - } - } - return modulo_in_partition_key; -} - -Block MergeTreePartition::executePartitionByExpression(const StorageMetadataPtr & metadata_snapshot, Block & block, ContextPtr context) -{ - auto adjusted_partition_key = adjustPartitionKey(metadata_snapshot, context); - adjusted_partition_key.expression->execute(block); - return adjusted_partition_key.sample_block; -} - -KeyDescription MergeTreePartition::adjustPartitionKey(const StorageMetadataPtr & metadata_snapshot, ContextPtr context) -{ - const auto & partition_key = metadata_snapshot->getPartitionKey(); - if (!partition_key.definition_ast) - return partition_key; - - ASTPtr ast_copy = partition_key.definition_ast->clone(); - - /// Implementation of modulo function was changed from 8bit result type to 16bit. For backward compatibility partition by expression is always - /// calculated according to previous version - `moduloLegacy`. - if (moduloToModuloLegacyRecursive(ast_copy)) - { - auto adjusted_partition_key = KeyDescription::getKeyFromAST(ast_copy, metadata_snapshot->columns, context); - return adjusted_partition_key; - } - - return partition_key; -} - } diff --git a/src/Storages/MergeTree/MergeTreePartition.h b/src/Storages/MergeTree/MergeTreePartition.h index 5ee20718c16..a5c27cef18a 100644 --- a/src/Storages/MergeTree/MergeTreePartition.h +++ b/src/Storages/MergeTree/MergeTreePartition.h @@ -41,13 +41,7 @@ public: void assign(const MergeTreePartition & other) { value.assign(other.value); } - void create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row, ContextPtr context); - - /// Adjust partition key and execute its expression on block. Return sample block according to used expression. - static Block executePartitionByExpression(const StorageMetadataPtr & metadata_snapshot, Block & block, ContextPtr context); - - /// Make a modified partition key with substitution from modulo to moduloLegacy. Used in paritionPruner. - static KeyDescription adjustPartitionKey(const StorageMetadataPtr & partition_key, ContextPtr context); + void create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row); }; } diff --git a/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp b/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp index ba6a53026fb..4c92d4f6136 100644 --- a/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp +++ b/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp @@ -111,7 +111,7 @@ void MergeTreeWriteAheadLog::rotate(const std::unique_lock &) init(); } -MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot, ContextPtr context) +MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot) { std::unique_lock lock(write_mutex); @@ -192,7 +192,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor MergedBlockOutputStream part_out(part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {})); part->minmax_idx.update(block, storage.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey())); - part->partition.create(metadata_snapshot, block, 0, context); + part->partition.create(metadata_snapshot, block, 0); if (metadata_snapshot->hasSortingKey()) metadata_snapshot->getSortingKey().expression->execute(block); diff --git a/src/Storages/MergeTree/MergeTreeWriteAheadLog.h b/src/Storages/MergeTree/MergeTreeWriteAheadLog.h index e01911aa8b8..3b8dc5befc0 100644 --- a/src/Storages/MergeTree/MergeTreeWriteAheadLog.h +++ b/src/Storages/MergeTree/MergeTreeWriteAheadLog.h @@ -62,7 +62,7 @@ public: void addPart(DataPartInMemoryPtr & part); void dropPart(const String & part_name); - std::vector restore(const StorageMetadataPtr & metadata_snapshot, ContextPtr context); + std::vector restore(const StorageMetadataPtr & metadata_snapshot); using MinMaxBlockNumber = std::pair; static std::optional tryParseMinMaxBlockNumber(const String & filename); diff --git a/src/Storages/MergeTree/PartitionPruner.h b/src/Storages/MergeTree/PartitionPruner.h index fbed0e6ab99..98c947bb0ca 100644 --- a/src/Storages/MergeTree/PartitionPruner.h +++ b/src/Storages/MergeTree/PartitionPruner.h @@ -14,18 +14,15 @@ class PartitionPruner { private: std::unordered_map partition_filter_map; - - /// partition_key is adjusted here (with substitution from modulo to moduloLegacy). - KeyDescription partition_key; - + const KeyDescription & partition_key; KeyCondition partition_condition; bool useless; using DataPart = IMergeTreeDataPart; using DataPartPtr = std::shared_ptr; public: - PartitionPruner(const StorageMetadataPtr & metadata, const SelectQueryInfo & query_info, ContextPtr context, bool strict) - : partition_key(MergeTreePartition::adjustPartitionKey(metadata, context)) + PartitionPruner(const KeyDescription & partition_key_, const SelectQueryInfo & query_info, ContextPtr context, bool strict) + : partition_key(partition_key_) , partition_condition( query_info, context, partition_key.column_names, partition_key.expression, true /* single_point */, strict) , useless(strict ? partition_condition.anyUnknownOrAlwaysTrue() : partition_condition.alwaysUnknownOrTrue()) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp index 7f167f929e5..51a24606dde 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp @@ -136,7 +136,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block) if (quorum) checkQuorumPrecondition(zookeeper); - auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context); + auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot); for (auto & current_block : part_blocks) { diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeTableMetadata.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeTableMetadata.cpp index db1c2bc89af..28e8b07ab6c 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeTableMetadata.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeTableMetadata.cpp @@ -206,16 +206,18 @@ void ReplicatedMergeTreeTableMetadata::checkImmutableFieldsEquals(const Replicat ", local: " + DB::toString(data_format_version.toUnderType()), ErrorCodes::METADATA_MISMATCH); - if (partition_key != from_zk.partition_key) - throw Exception( - "Existing table metadata in ZooKeeper differs in partition key expression." - " Stored in ZooKeeper: " + from_zk.partition_key + ", local: " + partition_key, - ErrorCodes::METADATA_MISMATCH); + //if (partition_key != from_zk.partition_key) + // throw Exception( + // "Existing table metadata in ZooKeeper differs in partition key expression." + // " Stored in ZooKeeper: " + from_zk.partition_key + ", local: " + partition_key, + // ErrorCodes::METADATA_MISMATCH); } void ReplicatedMergeTreeTableMetadata::checkEquals(const ReplicatedMergeTreeTableMetadata & from_zk, const ColumnsDescription & columns, ContextPtr context) const { + std::cerr << "\n\nKSSENII partitition key: " << partition_key << ", in zk: " << from_zk.partition_key << std::endl; + //String parsed = IndicesDescription::parse(from_zk.partition_key, columns, context).toString(); checkImmutableFieldsEquals(from_zk); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 74c86e00112..d04dc46ea83 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -2466,7 +2466,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry) throw Exception("Interserver schemas are different '" + interserver_scheme + "' != '" + address.scheme + "', can't fetch part from " + address.host, ErrorCodes::LOGICAL_ERROR); part_desc->res_part = fetcher.fetchPart( - metadata_snapshot, getContext(), part_desc->found_new_part_name, source_replica_path, + metadata_snapshot, part_desc->found_new_part_name, source_replica_path, address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, false, TMP_PREFIX + "fetch_"); /// TODO: check columns_version of fetched part @@ -3929,7 +3929,6 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora return fetcher.fetchPart( metadata_snapshot, - getContext(), part_name, source_replica_path, address.host, @@ -4085,7 +4084,7 @@ bool StorageReplicatedMergeTree::fetchExistsPart(const String & part_name, const ErrorCodes::INTERSERVER_SCHEME_DOESNT_MATCH); return fetcher.fetchPart( - metadata_snapshot, getContext(), part_name, source_replica_path, + metadata_snapshot, part_name, source_replica_path, address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, false, "", nullptr, true, replaced_disk);