From c7f51fe60eb211351b961c587268ba3c03135d03 Mon Sep 17 00:00:00 2001 From: kssenii Date: Sat, 15 May 2021 18:32:20 +0000 Subject: [PATCH] Modulo adjustment for partition key --- src/Functions/modulo.cpp | 2 +- src/Storages/MergeTree/DataPartsExchange.cpp | 8 ++- src/Storages/MergeTree/DataPartsExchange.h | 2 + .../MergeTree/MergeTreeBlockOutputStream.cpp | 2 +- src/Storages/MergeTree/MergeTreeData.cpp | 4 +- .../MergeTree/MergeTreeDataWriter.cpp | 11 +-- src/Storages/MergeTree/MergeTreeDataWriter.h | 2 +- src/Storages/MergeTree/MergeTreePartition.cpp | 69 +++++++++++++++++-- src/Storages/MergeTree/MergeTreePartition.h | 5 +- .../MergeTree/MergeTreeWriteAheadLog.cpp | 4 +- .../MergeTree/MergeTreeWriteAheadLog.h | 2 +- .../ReplicatedMergeTreeBlockOutputStream.cpp | 2 +- src/Storages/StorageReplicatedMergeTree.cpp | 5 +- .../01869_function_modulo_legacy.reference | 1 - .../01869_function_modulo_legacy.sql | 1 - .../01870_modulo_partition_key.reference | 63 +++++++++++++++++ .../01870_modulo_partition_key.sql | 21 ++++++ 17 files changed, 175 insertions(+), 29 deletions(-) create mode 100644 tests/queries/0_stateless/01870_modulo_partition_key.reference create mode 100644 tests/queries/0_stateless/01870_modulo_partition_key.sql diff --git a/src/Functions/modulo.cpp b/src/Functions/modulo.cpp index 5610354d5e5..115f7a23443 100644 --- a/src/Functions/modulo.cpp +++ b/src/Functions/modulo.cpp @@ -140,7 +140,7 @@ void registerFunctionModulo(FunctionFactory & factory) } struct NameModuloLegacy { static constexpr auto name = "moduloLegacy"; }; -using FunctionModuloLegacy = BinaryArithmeticOverloadResolver; +using FunctionModuloLegacy = BinaryArithmeticOverloadResolver; void registerFunctionModuloLegacy(FunctionFactory & factory) { diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp index d5f4d91850c..8992cb19d16 100644 --- a/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/src/Storages/MergeTree/DataPartsExchange.cpp @@ -379,6 +379,7 @@ MergeTreeData::DataPartPtr Service::findPart(const String & name) MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( const StorageMetadataPtr & metadata_snapshot, + ContextPtr context, const String & part_name, const String & replica_path, const String & host, @@ -493,7 +494,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( if (e.code() != ErrorCodes::S3_ERROR) throw; /// Try again but without S3 copy - return fetchPart(metadata_snapshot, part_name, replica_path, host, port, timeouts, + return fetchPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts, user, password, interserver_scheme, to_detached, tmp_prefix_, nullptr, false); } } @@ -557,7 +558,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( MergeTreeData::DataPart::Checksums checksums; return part_type == "InMemory" - ? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, std::move(reservation), in, projections) + ? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, std::move(reservation), in, projections) : downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, reservation->getDisk(), in, projections, checksums); } @@ -565,6 +566,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory( const String & part_name, const UUID & part_uuid, const StorageMetadataPtr & metadata_snapshot, + ContextPtr context, ReservationPtr reservation, PooledReadWriteBufferFromHTTP & in, size_t projections) @@ -619,7 +621,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory( new_data_part->is_temp = true; new_data_part->setColumns(block.getNamesAndTypesList()); new_data_part->minmax_idx.update(block, data.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey())); - new_data_part->partition.create(metadata_snapshot, block, 0); + new_data_part->partition.create(metadata_snapshot, block, 0, context); MergedBlockOutputStream part_out( new_data_part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {})); diff --git a/src/Storages/MergeTree/DataPartsExchange.h b/src/Storages/MergeTree/DataPartsExchange.h index 07b60fde1f1..f59942ef7f4 100644 --- a/src/Storages/MergeTree/DataPartsExchange.h +++ b/src/Storages/MergeTree/DataPartsExchange.h @@ -65,6 +65,7 @@ public: /// Downloads a part to tmp_directory. If to_detached - downloads to the `detached` directory. MergeTreeData::MutableDataPartPtr fetchPart( const StorageMetadataPtr & metadata_snapshot, + ContextPtr context, const String & part_name, const String & replica_path, const String & host, @@ -106,6 +107,7 @@ private: const String & part_name, const UUID & part_uuid, const StorageMetadataPtr & metadata_snapshot, + ContextPtr context, ReservationPtr reservation, PooledReadWriteBufferFromHTTP & in, size_t projections); diff --git a/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp b/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp index 79f935b5f9f..11feb905bb6 100644 --- a/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp @@ -23,7 +23,7 @@ void MergeTreeBlockOutputStream::writePrefix() void MergeTreeBlockOutputStream::write(const Block & block) { - auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot); + auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context); for (auto & current_block : part_blocks) { Stopwatch watch; diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 22fe540222e..a4e99ca864a 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -876,13 +876,13 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) { /// Create and correctly initialize global WAL object write_ahead_log = std::make_shared(*this, disk_ptr, it->name()); - for (auto && part : write_ahead_log->restore(metadata_snapshot)) + for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext())) parts_from_wal.push_back(std::move(part)); } else if (settings->in_memory_parts_enable_wal) { MergeTreeWriteAheadLog wal(*this, disk_ptr, it->name()); - for (auto && part : wal.restore(metadata_snapshot)) + for (auto && part : wal.restore(metadata_snapshot, getContext())) parts_from_wal.push_back(std::move(part)); } } diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp index 2a85faa7083..387200f7a41 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -140,7 +140,8 @@ void updateTTL( } -BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot) +BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts( + const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) { BlocksWithPartition result; if (!block || !block.rows()) @@ -155,12 +156,12 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block } Block block_copy = block; - const auto & partition_key = metadata_snapshot->getPartitionKey(); - partition_key.expression->execute(block_copy); + /// After expression execution partition key columns will be added to block_copy with names regarding partition function. + auto partition_key_sample_block = MergeTreePartition::executePartitionByExpression(metadata_snapshot, block_copy, context); ColumnRawPtrs partition_columns; - partition_columns.reserve(partition_key.sample_block.columns()); - for (const ColumnWithTypeAndName & element : partition_key.sample_block) + partition_columns.reserve(partition_key_sample_block.columns()); + for (const ColumnWithTypeAndName & element : partition_key_sample_block) partition_columns.emplace_back(block_copy.getByName(element.name).column.get()); PODArray partition_num_to_first_row; diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.h b/src/Storages/MergeTree/MergeTreeDataWriter.h index 4c5b75657ee..bea5e05591e 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.h +++ b/src/Storages/MergeTree/MergeTreeDataWriter.h @@ -40,7 +40,7 @@ public: * (split rows by partition) * Works deterministically: if same block was passed, function will return same result in same order. */ - static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot); + static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context); /** All rows must correspond to same partition. * Returns part with unique name starting with 'tmp_', yet not added to MergeTreeData. diff --git a/src/Storages/MergeTree/MergeTreePartition.cpp b/src/Storages/MergeTree/MergeTreePartition.cpp index 897b868db25..65227e927e1 100644 --- a/src/Storages/MergeTree/MergeTreePartition.cpp +++ b/src/Storages/MergeTree/MergeTreePartition.cpp @@ -134,7 +134,16 @@ void MergeTreePartition::load(const MergeTreeData & storage, const DiskPtr & dis auto file = openForReading(disk, partition_file_path); value.resize(partition_key_sample.columns()); for (size_t i = 0; i < partition_key_sample.columns(); ++i) - partition_key_sample.getByPosition(i).type->getDefaultSerialization()->deserializeBinary(value[i], *file); + { + if (partition_key_sample.getByPosition(i).name.starts_with("modulo")) + { + SerializationNumber().deserializeBinary(value[i], *file); + } + else + { + partition_key_sample.getByPosition(i).type->getDefaultSerialization()->deserializeBinary(value[i], *file); + } + } } void MergeTreePartition::store(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const @@ -152,29 +161,75 @@ void MergeTreePartition::store(const Block & partition_key_sample, const DiskPtr auto out = disk->writeFile(part_path + "partition.dat"); HashingWriteBuffer out_hashing(*out); for (size_t i = 0; i < value.size(); ++i) - partition_key_sample.getByPosition(i).type->getDefaultSerialization()->serializeBinary(value[i], out_hashing); + { + if (partition_key_sample.getByPosition(i).name.starts_with("modulo")) + { + SerializationNumber().serializeBinary(value[i], out_hashing); + } + else + { + partition_key_sample.getByPosition(i).type->getDefaultSerialization()->serializeBinary(value[i], out_hashing); + } + } + out_hashing.next(); checksums.files["partition.dat"].file_size = out_hashing.count(); checksums.files["partition.dat"].file_hash = out_hashing.getHash(); out->finalize(); } -void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row) +void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row, ContextPtr context) { if (!metadata_snapshot->hasPartitionKey()) return; - const auto & partition_key = metadata_snapshot->getPartitionKey(); - partition_key.expression->execute(block); - size_t partition_columns_num = partition_key.sample_block.columns(); + auto partition_key_sample_block = executePartitionByExpression(metadata_snapshot, block, context); + size_t partition_columns_num = partition_key_sample_block.columns(); value.resize(partition_columns_num); for (size_t i = 0; i < partition_columns_num; ++i) { - const auto & column_name = partition_key.sample_block.getByPosition(i).name; + const auto & column_name = partition_key_sample_block.getByPosition(i).name; const auto & partition_column = block.getByName(column_name).column; partition_column->get(row, value[i]); } } +Block MergeTreePartition::executePartitionByExpression(const StorageMetadataPtr & metadata_snapshot, Block & block, ContextPtr context) +{ + const auto & partition_key = metadata_snapshot->getPartitionKey(); + bool adjusted_expression = false; + auto modulo_to_modulo_legacy = [&](ASTFunction * function_ast) + { + if (function_ast->name == "modulo") + { + function_ast->name = "moduloLegacy"; + adjusted_expression = true; + } + }; + ASTPtr new_ast = partition_key.definition_ast->clone(); + if (auto * function_ast = typeid_cast(new_ast.get())) + { + if (function_ast->name == "tuple") + { + auto children = function_ast->arguments->children; + for (auto child : children) + { + if (auto * child_function_ast = typeid_cast(child.get())) + modulo_to_modulo_legacy(child_function_ast); + } + } + else + modulo_to_modulo_legacy(function_ast); + + if (adjusted_expression) + { + auto adjusted_partition_key = KeyDescription::getKeyFromAST(new_ast, metadata_snapshot->columns, context); + adjusted_partition_key.expression->execute(block); + return adjusted_partition_key.sample_block; + } + } + partition_key.expression->execute(block); + return partition_key.sample_block; +} } diff --git a/src/Storages/MergeTree/MergeTreePartition.h b/src/Storages/MergeTree/MergeTreePartition.h index f89b6f22d4f..6a6225a31ff 100644 --- a/src/Storages/MergeTree/MergeTreePartition.h +++ b/src/Storages/MergeTree/MergeTreePartition.h @@ -40,7 +40,10 @@ public: void assign(const MergeTreePartition & other) { value.assign(other.value); } - void create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row); + void create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row, ContextPtr context); + + /// Adjust partition key and execute its expression on block. Return sample block according to used expression. + static Block executePartitionByExpression(const StorageMetadataPtr & metadata_snapshot, Block & block, ContextPtr context); }; } diff --git a/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp b/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp index 4c92d4f6136..ba6a53026fb 100644 --- a/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp +++ b/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp @@ -111,7 +111,7 @@ void MergeTreeWriteAheadLog::rotate(const std::unique_lock &) init(); } -MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot) +MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot, ContextPtr context) { std::unique_lock lock(write_mutex); @@ -192,7 +192,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor MergedBlockOutputStream part_out(part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {})); part->minmax_idx.update(block, storage.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey())); - part->partition.create(metadata_snapshot, block, 0); + part->partition.create(metadata_snapshot, block, 0, context); if (metadata_snapshot->hasSortingKey()) metadata_snapshot->getSortingKey().expression->execute(block); diff --git a/src/Storages/MergeTree/MergeTreeWriteAheadLog.h b/src/Storages/MergeTree/MergeTreeWriteAheadLog.h index 3b8dc5befc0..e01911aa8b8 100644 --- a/src/Storages/MergeTree/MergeTreeWriteAheadLog.h +++ b/src/Storages/MergeTree/MergeTreeWriteAheadLog.h @@ -62,7 +62,7 @@ public: void addPart(DataPartInMemoryPtr & part); void dropPart(const String & part_name); - std::vector restore(const StorageMetadataPtr & metadata_snapshot); + std::vector restore(const StorageMetadataPtr & metadata_snapshot, ContextPtr context); using MinMaxBlockNumber = std::pair; static std::optional tryParseMinMaxBlockNumber(const String & filename); diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp index 51a24606dde..7f167f929e5 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp @@ -136,7 +136,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block) if (quorum) checkQuorumPrecondition(zookeeper); - auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot); + auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context); for (auto & current_block : part_blocks) { diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 973fc291064..546fce0aeea 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -2427,7 +2427,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry) throw Exception("Interserver schemas are different '" + interserver_scheme + "' != '" + address.scheme + "', can't fetch part from " + address.host, ErrorCodes::LOGICAL_ERROR); part_desc->res_part = fetcher.fetchPart( - metadata_snapshot, part_desc->found_new_part_name, source_replica_path, + metadata_snapshot, getContext(), part_desc->found_new_part_name, source_replica_path, address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, false, TMP_PREFIX + "fetch_"); /// TODO: check columns_version of fetched part @@ -3880,6 +3880,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora return fetcher.fetchPart( metadata_snapshot, + getContext(), part_name, source_replica_path, address.host, @@ -4035,7 +4036,7 @@ bool StorageReplicatedMergeTree::fetchExistsPart(const String & part_name, const ErrorCodes::INTERSERVER_SCHEME_DOESNT_MATCH); return fetcher.fetchPart( - metadata_snapshot, part_name, source_replica_path, + metadata_snapshot, getContext(), part_name, source_replica_path, address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, false, "", nullptr, true, replaced_disk); diff --git a/tests/queries/0_stateless/01869_function_modulo_legacy.reference b/tests/queries/0_stateless/01869_function_modulo_legacy.reference index 014b45dae5e..f6c9a31de37 100644 --- a/tests/queries/0_stateless/01869_function_modulo_legacy.reference +++ b/tests/queries/0_stateless/01869_function_modulo_legacy.reference @@ -1,3 +1,2 @@ 199 -99 57 diff --git a/tests/queries/0_stateless/01869_function_modulo_legacy.sql b/tests/queries/0_stateless/01869_function_modulo_legacy.sql index 6609b545e48..66e2edddd0c 100644 --- a/tests/queries/0_stateless/01869_function_modulo_legacy.sql +++ b/tests/queries/0_stateless/01869_function_modulo_legacy.sql @@ -1,3 +1,2 @@ SELECT moduloLegacy(199, 200); -SELECT moduloLegacy(299, 200); SELECT moduloLegacy(-199, 200); diff --git a/tests/queries/0_stateless/01870_modulo_partition_key.reference b/tests/queries/0_stateless/01870_modulo_partition_key.reference new file mode 100644 index 00000000000..6011efb10e8 --- /dev/null +++ b/tests/queries/0_stateless/01870_modulo_partition_key.reference @@ -0,0 +1,63 @@ +simple partition key: +-61 +-60 +-59 +-58 +-57 +-5 +-4 +-3 +-2 +-1 +0 +0 +1 +2 +3 +4 +57 +58 +59 +60 +complex partition key: +(-1,-1) +(-1,0) +(-2,-2) +(-2,-3) +(-2,59) +(-2,60) +(0,-4) +(0,-5) +(0,-57) +(0,-58) +(0,4) +(0,57) +(0,58) +(1,-61) +(1,0) +(1,1) +(2,-59) +(2,-60) +(2,2) +(2,3) +comparison: +0 -205 -5 -5 +1 -204 -4 -4 +2 -203 -3 -3 +3 -202 -2 -2 +4 -201 -1 -1 +5 -200 0 0 +6 -199 57 -199 +7 -198 58 -198 +8 -197 59 -197 +9 -196 60 -196 +400 195 -61 195 +401 196 -60 196 +402 197 -59 197 +403 198 -58 198 +404 199 -57 199 +405 200 0 0 +406 201 1 1 +407 202 2 2 +408 203 3 3 +409 204 4 4 diff --git a/tests/queries/0_stateless/01870_modulo_partition_key.sql b/tests/queries/0_stateless/01870_modulo_partition_key.sql new file mode 100644 index 00000000000..271151e5184 --- /dev/null +++ b/tests/queries/0_stateless/01870_modulo_partition_key.sql @@ -0,0 +1,21 @@ +SELECT 'simple partition key:'; +DROP TABLE IF EXISTS table1; +CREATE TABLE table1 (id Int64, v UInt64) +ENGINE = ReplicatedReplacingMergeTree('/clickhouse/test/tables/table', '1', v) +PARTITION BY id % 200 ORDER BY id; +INSERT INTO table1 SELECT number-205, number FROM numbers(10); +INSERT INTO table1 SELECT number-205, number FROM numbers(400, 10); +SELECT toInt64(partition) as p FROM system.parts WHERE table='table1' ORDER BY p; + +SELECT 'complex partition key:'; +DROP TABLE IF EXISTS table2; +CREATE TABLE table2 (id Int64, v UInt64) +ENGINE = MergeTree() +PARTITION BY (toInt32(id / 2) % 3, id % 200) ORDER BY id; +INSERT INTO table2 SELECT number-205, number FROM numbers(10); +INSERT INTO table2 SELECT number-205, number FROM numbers(400, 10); +SELECT partition as p FROM system.parts WHERE table='table2' ORDER BY p; + +SELECT 'comparison:'; +SELECT v, v-205 as vv, moduloLegacy(vv, 200), modulo(vv, 200) FROM table1 ORDER BY v; +