From 3e2f41f3010ca7b68762518738a35dee0f84f8e0 Mon Sep 17 00:00:00 2001 From: Kirill Nikiforov Date: Wed, 27 Mar 2024 18:35:44 +0300 Subject: [PATCH 01/24] support ATTACH PARTITION `ALL` FROM `TABLE` --- src/Storages/MergeTree/MergeTreeData.cpp | 4 +- src/Storages/StorageMergeTree.cpp | 19 +- src/Storages/StorageReplicatedMergeTree.cpp | 451 +++++++++--------- ...626_replace_partition_from_table.reference | 11 +- .../00626_replace_partition_from_table.sql | 6 +- ...e_partition_from_table_zookeeper.reference | 1 + ..._replace_partition_from_table_zookeeper.sh | 9 + 7 files changed, 273 insertions(+), 228 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 6cc8063d90a..7cdb18ce0b9 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -4876,7 +4876,7 @@ void MergeTreeData::checkAlterPartitionIsPossible( const auto * partition_ast = command.partition->as(); if (partition_ast && partition_ast->all) { - if (command.type != PartitionCommand::DROP_PARTITION && command.type != PartitionCommand::ATTACH_PARTITION) + if (command.type != PartitionCommand::DROP_PARTITION && command.type != PartitionCommand::ATTACH_PARTITION && (command.type == PartitionCommand::REPLACE_PARTITION && command.replace)) throw DB::Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only support DROP/DETACH/ATTACH PARTITION ALL currently"); } else @@ -5625,7 +5625,7 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr loc const auto & partition_ast = ast->as(); if (partition_ast.all) - throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only Support DETACH PARTITION ALL currently"); + throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only Support DROP/DETACH/ATTACH PARTITION ALL currently"); if (!partition_ast.value) { diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 788f250e6a9..821b02b5b5f 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -2078,9 +2078,21 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con ProfileEventsScope profile_events_scope; MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, my_metadata_snapshot); - String partition_id = getPartitionIDFromQuery(partition, local_context); + DataPartsVector src_parts; + String partition_id; + bool is_all = partition->as()->all; + if (is_all) + { + if (replace) + throw DB::Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only support DROP/DETACH/ATTACH PARTITION ALL currently"); + + src_parts = src_data.getVisibleDataPartsVector(local_context); + } else + { + partition_id = getPartitionIDFromQuery(partition, local_context); + src_parts = src_data.getVisibleDataPartsVectorInPartition(local_context, partition_id); + } - DataPartsVector src_parts = src_data.getVisibleDataPartsVectorInPartition(local_context, partition_id); MutableDataPartsVector dst_parts; std::vector dst_parts_locks; @@ -2088,6 +2100,9 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con for (const DataPartPtr & src_part : src_parts) { + if (is_all) + partition_id = src_part->partition.getID(src_data); + if (!canReplacePartition(src_part)) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot replace partition '{}' because part '{}' has inconsistent granularity with table", diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index ce6735d9176..7d7ab712163 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -7961,232 +7961,247 @@ void StorageReplicatedMergeTree::replacePartitionFrom( ProfileEventsScope profile_events_scope; MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, metadata_snapshot); - String partition_id = getPartitionIDFromQuery(partition, query_context); - /// NOTE: Some covered parts may be missing in src_all_parts if corresponding log entries are not executed yet. - DataPartsVector src_all_parts = src_data.getVisibleDataPartsVectorInPartition(query_context, partition_id); - - LOG_DEBUG(log, "Cloning {} parts", src_all_parts.size()); - - static const String TMP_PREFIX = "tmp_replace_from_"; + const String TMP_PREFIX = "tmp_replace_from_"; auto zookeeper = getZooKeeper(); - /// Retry if alter_partition_version changes - for (size_t retry = 0; retry < 1000; ++retry) + std::unordered_set partitions; + if (partition->as()->all) { - DataPartsVector src_parts; - MutableDataPartsVector dst_parts; - std::vector dst_parts_locks; - Strings block_id_paths; - Strings part_checksums; - std::vector ephemeral_locks; - String alter_partition_version_path = zookeeper_path + "/alter_partition_version"; - Coordination::Stat alter_partition_version_stat; - zookeeper->get(alter_partition_version_path, &alter_partition_version_stat); - - /// Firstly, generate last block number and compute drop_range - /// NOTE: Even if we make ATTACH PARTITION instead of REPLACE PARTITION drop_range will not be empty, it will contain a block. - /// So, such case has special meaning, if drop_range contains only one block it means that nothing to drop. - /// TODO why not to add normal DROP_RANGE entry to replication queue if `replace` is true? - MergeTreePartInfo drop_range; - std::optional delimiting_block_lock; - bool partition_was_empty = !getFakePartCoveringAllPartsInPartition(partition_id, drop_range, delimiting_block_lock, true); - if (replace && partition_was_empty) - { - /// Nothing to drop, will just attach new parts - LOG_INFO(log, "Partition {} was empty, REPLACE PARTITION will work as ATTACH PARTITION FROM", drop_range.partition_id); - replace = false; - } - - if (!replace) - { - /// It's ATTACH PARTITION FROM, not REPLACE PARTITION. We have to reset drop range - drop_range = makeDummyDropRangeForMovePartitionOrAttachPartitionFrom(partition_id); - } - - assert(replace == !LogEntry::ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range)); - - String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range); - - std::set replaced_parts; - for (const auto & src_part : src_all_parts) - { - /// We also make some kind of deduplication to avoid duplicated parts in case of ATTACH PARTITION - /// Assume that merges in the partition are quite rare - /// Save deduplication block ids with special prefix replace_partition - - if (!canReplacePartition(src_part)) - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Cannot replace partition '{}' because part '{}" - "' has inconsistent granularity with table", partition_id, src_part->name); - - String hash_hex = src_part->checksums.getTotalChecksumHex(); - const bool is_duplicated_part = replaced_parts.contains(hash_hex); - replaced_parts.insert(hash_hex); - - if (replace) - LOG_INFO(log, "Trying to replace {} with hash_hex {}", src_part->name, hash_hex); - else - LOG_INFO(log, "Trying to attach {} with hash_hex {}", src_part->name, hash_hex); - - String block_id_path = (replace || is_duplicated_part) ? "" : (fs::path(zookeeper_path) / "blocks" / (partition_id + "_replace_from_" + hash_hex)); - - auto lock = allocateBlockNumber(partition_id, zookeeper, block_id_path); - if (!lock) - { - LOG_INFO(log, "Part {} (hash {}) has been already attached", src_part->name, hash_hex); - continue; - } - - UInt64 index = lock->getNumber(); - MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level); - - bool zero_copy_enabled = storage_settings_ptr->allow_remote_fs_zero_copy_replication - || dynamic_cast(source_table.get())->getSettings()->allow_remote_fs_zero_copy_replication; - - IDataPartStorage::ClonePartParams clone_params - { - .copy_instead_of_hardlink = storage_settings_ptr->always_use_copy_instead_of_hardlinks || (zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport()), - .metadata_version_to_write = metadata_snapshot->getMetadataVersion() - }; - - auto [dst_part, part_lock] = cloneAndLoadDataPart( - src_part, - TMP_PREFIX, - dst_part_info, - metadata_snapshot, - clone_params, - query_context->getReadSettings(), - query_context->getWriteSettings()); - - dst_parts.emplace_back(std::move(dst_part)); - dst_parts_locks.emplace_back(std::move(part_lock)); - src_parts.emplace_back(src_part); - ephemeral_locks.emplace_back(std::move(*lock)); - block_id_paths.emplace_back(block_id_path); - part_checksums.emplace_back(hash_hex); - } - - ReplicatedMergeTreeLogEntryData entry; - { - auto src_table_id = src_data.getStorageID(); - entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE; - entry.source_replica = replica_name; - entry.create_time = time(nullptr); - entry.replace_range_entry = std::make_shared(); - - auto & entry_replace = *entry.replace_range_entry; - entry_replace.drop_range_part_name = drop_range_fake_part_name; - entry_replace.from_database = src_table_id.database_name; - entry_replace.from_table = src_table_id.table_name; - for (const auto & part : src_parts) - entry_replace.src_part_names.emplace_back(part->name); - for (const auto & part : dst_parts) - entry_replace.new_part_names.emplace_back(part->name); - for (const String & checksum : part_checksums) - entry_replace.part_names_checksums.emplace_back(checksum); - entry_replace.columns_version = -1; - } - if (replace) - { - /// Cancel concurrent inserts in range - clearLockedBlockNumbersInPartition(*zookeeper, drop_range.partition_id, drop_range.min_block, drop_range.max_block); - /// Remove deduplication block_ids of replacing parts - clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.min_block, drop_range.max_block); - } + throw DB::Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only support DROP/DETACH/ATTACH PARTITION ALL currently"); - Coordination::Responses op_results; - DataPartsVector parts_holder; - - try - { - Coordination::Requests ops; - for (size_t i = 0; i < dst_parts.size(); ++i) - { - getCommitPartOps(ops, dst_parts[i], block_id_paths[i]); - ephemeral_locks[i].getUnlockOp(ops); - } - - if (auto txn = query_context->getZooKeeperMetadataTransaction()) - txn->moveOpsTo(ops); - - delimiting_block_lock->getUnlockOp(ops); - /// Check and update version to avoid race with DROP_RANGE - ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", alter_partition_version_stat.version)); - /// Just update version, because merges assignment relies on it - ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1)); - ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential)); - - Transaction transaction(*this, NO_TRANSACTION_RAW); - { - auto data_parts_lock = lockParts(); - for (auto & part : dst_parts) - renameTempPartAndReplaceUnlocked(part, transaction, data_parts_lock); - } - - for (const auto & dst_part : dst_parts) - lockSharedData(*dst_part, false, /*hardlinked_files*/ {}); - - Coordination::Error code = zookeeper->tryMulti(ops, op_results); - if (code == Coordination::Error::ZOK) - delimiting_block_lock->assumeUnlocked(); - else if (code == Coordination::Error::ZBADVERSION) - { - /// Cannot retry automatically, because some zookeeper ops were lost on the first attempt. Will retry on DDLWorker-level. - if (query_context->getZooKeeperMetadataTransaction()) - throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER, - "Cannot execute alter, because alter partition version was suddenly changed due " - "to concurrent alter"); - continue; - } - else - zkutil::KeeperMultiException::check(code, ops, op_results); - - { - auto data_parts_lock = lockParts(); - transaction.commit(&data_parts_lock); - if (replace) - { - parts_holder = getDataPartsVectorInPartitionForInternalUsage(MergeTreeDataPartState::Active, drop_range.partition_id, &data_parts_lock); - /// We ignore the list of parts returned from the function below. We will remove them from zk when executing REPLACE_RANGE - removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(NO_TRANSACTION_RAW, drop_range, data_parts_lock); - } - } - - PartLog::addNewParts(getContext(), PartLog::createPartLogEntries(dst_parts, watch.elapsed(), profile_events_scope.getSnapshot())); - } - catch (...) - { - PartLog::addNewParts(getContext(), PartLog::createPartLogEntries(dst_parts, watch.elapsed()), ExecutionStatus::fromCurrentException("", true)); - for (const auto & dst_part : dst_parts) - unlockSharedData(*dst_part); - - throw; - } - - String log_znode_path = dynamic_cast(*op_results.back()).path_created; - entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - - for (auto & lock : ephemeral_locks) - lock.assumeUnlocked(); - - lock2.reset(); - lock1.reset(); - - /// We need to pull the DROP_RANGE before cleaning the replaced parts (otherwise CHeckThread may decide that parts are lost) - queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC); - parts_holder.clear(); - cleanup_thread.wakeup(); - - - waitForLogEntryToBeProcessedIfNecessary(entry, query_context); - - return; + partitions = src_data.getAllPartitionIds(); + LOG_INFO(log, "Will try to attach {} partitions", partitions.size()); + } else + { + partitions = std::unordered_set(); + partitions.emplace(getPartitionIDFromQuery(partition, query_context)); } - throw Exception( - ErrorCodes::CANNOT_ASSIGN_ALTER, "Cannot assign ALTER PARTITION, because another ALTER PARTITION query was concurrently executed"); + for (const auto & partition_id : partitions) { + auto src_all_parts = src_data.getVisibleDataPartsVectorInPartition(query_context, partition_id); + LOG_DEBUG(log, "Cloning {} parts from partition '{}'", src_all_parts.size(), partition_id); + + auto ok = false; + /// Retry if alter_partition_version changes + for (size_t retry = 0; retry < 1000; ++retry) + { + DataPartsVector src_parts; + MutableDataPartsVector dst_parts; + std::vector dst_parts_locks; + Strings block_id_paths; + Strings part_checksums; + std::vector ephemeral_locks; + String alter_partition_version_path = zookeeper_path + "/alter_partition_version"; + Coordination::Stat alter_partition_version_stat; + zookeeper->get(alter_partition_version_path, &alter_partition_version_stat); + + /// Firstly, generate last block number and compute drop_range + /// NOTE: Even if we make ATTACH PARTITION instead of REPLACE PARTITION drop_range will not be empty, it will contain a block. + /// So, such case has special meaning, if drop_range contains only one block it means that nothing to drop. + /// TODO why not to add normal DROP_RANGE entry to replication queue if `replace` is true? + MergeTreePartInfo drop_range; + std::optional delimiting_block_lock; + bool partition_was_empty = !getFakePartCoveringAllPartsInPartition(partition_id, drop_range, delimiting_block_lock, true); + if (replace && partition_was_empty) + { + /// Nothing to drop, will just attach new parts + LOG_INFO(log, "Partition {} was empty, REPLACE PARTITION will work as ATTACH PARTITION FROM", drop_range.partition_id); + replace = false; + } + + if (!replace) + { + /// It's ATTACH PARTITION FROM, not REPLACE PARTITION. We have to reset drop range + drop_range = makeDummyDropRangeForMovePartitionOrAttachPartitionFrom(partition_id); + } + + assert(replace == !LogEntry::ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range)); + + String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range); + + std::set replaced_parts; + for (const auto & src_part : src_all_parts) + { + /// We also make some kind of deduplication to avoid duplicated parts in case of ATTACH PARTITION + /// Assume that merges in the partition are quite rare + /// Save deduplication block ids with special prefix replace_partition + + if (!canReplacePartition(src_part)) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Cannot replace partition '{}' because part '{}" + "' has inconsistent granularity with table", partition_id, src_part->name); + + String hash_hex = src_part->checksums.getTotalChecksumHex(); + const bool is_duplicated_part = replaced_parts.contains(hash_hex); + replaced_parts.insert(hash_hex); + + if (replace) + LOG_INFO(log, "Trying to replace '{}' with hash_hex '{}'", src_part->name, hash_hex); + else + LOG_INFO(log, "Trying to attach '{}' with hash_hex '{}'", src_part->name, hash_hex); + + String block_id_path = (replace || is_duplicated_part) ? "" : (fs::path(zookeeper_path) / "blocks" / (partition_id + "_replace_from_" + hash_hex)); + + auto lock = allocateBlockNumber(partition_id, zookeeper, block_id_path); + if (!lock) + { + LOG_INFO(log, "Part '{}' (hash '{}') in partition '{}' has been already attached", src_part->name, hash_hex, partition_id); + continue; + } + + UInt64 index = lock->getNumber(); + MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level); + + bool zero_copy_enabled = storage_settings_ptr->allow_remote_fs_zero_copy_replication + || dynamic_cast(source_table.get())->getSettings()->allow_remote_fs_zero_copy_replication; + + IDataPartStorage::ClonePartParams clone_params + { + .copy_instead_of_hardlink = storage_settings_ptr->always_use_copy_instead_of_hardlinks || (zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport()), + .metadata_version_to_write = metadata_snapshot->getMetadataVersion() + }; + + auto [dst_part, part_lock] = cloneAndLoadDataPart( + src_part, + TMP_PREFIX, + dst_part_info, + metadata_snapshot, + clone_params, + query_context->getReadSettings(), + query_context->getWriteSettings()); + + dst_parts.emplace_back(std::move(dst_part)); + dst_parts_locks.emplace_back(std::move(part_lock)); + src_parts.emplace_back(src_part); + ephemeral_locks.emplace_back(std::move(*lock)); + block_id_paths.emplace_back(block_id_path); + part_checksums.emplace_back(hash_hex); + } + + ReplicatedMergeTreeLogEntryData entry; + { + auto src_table_id = src_data.getStorageID(); + entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE; + entry.source_replica = replica_name; + entry.create_time = time(nullptr); + entry.replace_range_entry = std::make_shared(); + + auto & entry_replace = *entry.replace_range_entry; + entry_replace.drop_range_part_name = drop_range_fake_part_name; + entry_replace.from_database = src_table_id.database_name; + entry_replace.from_table = src_table_id.table_name; + for (const auto & part : src_parts) + entry_replace.src_part_names.emplace_back(part->name); + for (const auto & part : dst_parts) + entry_replace.new_part_names.emplace_back(part->name); + for (const String & checksum : part_checksums) + entry_replace.part_names_checksums.emplace_back(checksum); + entry_replace.columns_version = -1; + } + + if (replace) + { + /// Cancel concurrent inserts in range + clearLockedBlockNumbersInPartition(*zookeeper, drop_range.partition_id, drop_range.min_block, drop_range.max_block); + /// Remove deduplication block_ids of replacing parts + clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.min_block, drop_range.max_block); + } + + Coordination::Responses op_results; + DataPartsVector parts_holder; + + try + { + Coordination::Requests ops; + for (size_t i = 0; i < dst_parts.size(); ++i) + { + getCommitPartOps(ops, dst_parts[i], block_id_paths[i]); + ephemeral_locks[i].getUnlockOp(ops); + } + + if (auto txn = query_context->getZooKeeperMetadataTransaction()) + txn->moveOpsTo(ops); + + delimiting_block_lock->getUnlockOp(ops); + /// Check and update version to avoid race with DROP_RANGE + ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", alter_partition_version_stat.version)); + /// Just update version, because merges assignment relies on it + ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1)); + ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential)); + + Transaction transaction(*this, NO_TRANSACTION_RAW); + { + auto data_parts_lock = lockParts(); + for (auto & part : dst_parts) + renameTempPartAndReplaceUnlocked(part, transaction, data_parts_lock); + } + + for (const auto & dst_part : dst_parts) + lockSharedData(*dst_part, false, /*hardlinked_files*/ {}); + + Coordination::Error code = zookeeper->tryMulti(ops, op_results); + if (code == Coordination::Error::ZOK) + delimiting_block_lock->assumeUnlocked(); + else if (code == Coordination::Error::ZBADVERSION) + { + /// Cannot retry automatically, because some zookeeper ops were lost on the first attempt. Will retry on DDLWorker-level. + if (query_context->getZooKeeperMetadataTransaction()) + throw Exception(ErrorCodes::CANNOT_ASSIGN_ALTER, + "Cannot execute alter on partition '{}', because alter partition version was suddenly changed due " + "to concurrent alter", partition_id); + continue; + } + else + zkutil::KeeperMultiException::check(code, ops, op_results); + + { + auto data_parts_lock = lockParts(); + transaction.commit(&data_parts_lock); + if (replace) + { + parts_holder = getDataPartsVectorInPartitionForInternalUsage(MergeTreeDataPartState::Active, drop_range.partition_id, &data_parts_lock); + /// We ignore the list of parts returned from the function below. We will remove them from zk when executing REPLACE_RANGE + removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(NO_TRANSACTION_RAW, drop_range, data_parts_lock); + } + } + + PartLog::addNewParts(getContext(), PartLog::createPartLogEntries(dst_parts, watch.elapsed(), profile_events_scope.getSnapshot())); + } + catch (...) + { + PartLog::addNewParts(getContext(), PartLog::createPartLogEntries(dst_parts, watch.elapsed()), ExecutionStatus::fromCurrentException("", true)); + for (const auto & dst_part : dst_parts) + unlockSharedData(*dst_part); + + throw; + } + + String log_znode_path = dynamic_cast(*op_results.back()).path_created; + entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); + + for (auto & lock : ephemeral_locks) + lock.assumeUnlocked(); + + /// We need to pull the DROP_RANGE before cleaning the replaced parts (otherwise CHeckThread may decide that parts are lost) + queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC); + parts_holder.clear(); + cleanup_thread.wakeup(); + + waitForLogEntryToBeProcessedIfNecessary(entry, query_context); + + ok = true; + break; + } + + if (!ok) + throw Exception( + ErrorCodes::CANNOT_ASSIGN_ALTER, "Cannot assign ALTER PARTITION '{}', because another ALTER PARTITION query was concurrently executed", partition_id); + } + + lock2.reset(); + lock1.reset(); } void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context) diff --git a/tests/queries/0_stateless/00626_replace_partition_from_table.reference b/tests/queries/0_stateless/00626_replace_partition_from_table.reference index 611f3a93ced..0f8ded245d0 100644 --- a/tests/queries/0_stateless/00626_replace_partition_from_table.reference +++ b/tests/queries/0_stateless/00626_replace_partition_from_table.reference @@ -10,11 +10,12 @@ REPLACE recursive 4 8 1 ATTACH FROM -5 8 +6 8 +10 12 OPTIMIZE -5 8 5 -5 8 3 +10 12 9 +10 12 5 After restart -5 8 +10 12 DETACH+ATTACH PARTITION -3 4 +7 7 diff --git a/tests/queries/0_stateless/00626_replace_partition_from_table.sql b/tests/queries/0_stateless/00626_replace_partition_from_table.sql index 7224224334e..3f712f48c06 100644 --- a/tests/queries/0_stateless/00626_replace_partition_from_table.sql +++ b/tests/queries/0_stateless/00626_replace_partition_from_table.sql @@ -53,12 +53,16 @@ DROP TABLE src; CREATE TABLE src (p UInt64, k String, d UInt64) ENGINE = MergeTree PARTITION BY p ORDER BY k; INSERT INTO src VALUES (1, '0', 1); INSERT INTO src VALUES (1, '1', 1); +INSERT INTO src VALUES (2, '2', 1); +INSERT INTO src VALUES (3, '3', 1); SYSTEM STOP MERGES dst; -INSERT INTO dst VALUES (1, '1', 2); +INSERT INTO dst VALUES (1, '1', 2), (1, '2', 0); ALTER TABLE dst ATTACH PARTITION 1 FROM src; SELECT count(), sum(d) FROM dst; +ALTER TABLE dst ATTACH PARTITION ALL FROM src; +SELECT count(), sum(d) FROM dst; SELECT 'OPTIMIZE'; SELECT count(), sum(d), uniqExact(_part) FROM dst; diff --git a/tests/queries/0_stateless/00626_replace_partition_from_table_zookeeper.reference b/tests/queries/0_stateless/00626_replace_partition_from_table_zookeeper.reference index c6208941ac6..6a7c3478f86 100644 --- a/tests/queries/0_stateless/00626_replace_partition_from_table_zookeeper.reference +++ b/tests/queries/0_stateless/00626_replace_partition_from_table_zookeeper.reference @@ -16,6 +16,7 @@ REPLACE recursive ATTACH FROM 5 8 5 8 +7 12 REPLACE with fetch 4 6 4 6 diff --git a/tests/queries/0_stateless/00626_replace_partition_from_table_zookeeper.sh b/tests/queries/0_stateless/00626_replace_partition_from_table_zookeeper.sh index ffbf4df4ba7..49976df83b7 100755 --- a/tests/queries/0_stateless/00626_replace_partition_from_table_zookeeper.sh +++ b/tests/queries/0_stateless/00626_replace_partition_from_table_zookeeper.sh @@ -82,6 +82,8 @@ $CLICKHOUSE_CLIENT --query="DROP TABLE src;" $CLICKHOUSE_CLIENT --query="CREATE TABLE src (p UInt64, k String, d UInt64) ENGINE = MergeTree PARTITION BY p ORDER BY k;" $CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (1, '0', 1);" $CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (1, '1', 1);" +$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (3, '1', 2);" +$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (4, '1', 2);" $CLICKHOUSE_CLIENT --query="INSERT INTO dst_r2 VALUES (1, '1', 2);" query_with_retry "ALTER TABLE dst_r2 ATTACH PARTITION 1 FROM src;" @@ -90,6 +92,13 @@ $CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA dst_r1;" $CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM dst_r1;" $CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM dst_r2;" +query_with_retry "ALTER TABLE dst_r2 ATTACH PARTITION ALL FROM src;" +$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA dst_r2;" +$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM dst_r2;" +query_with_retry "ALTER TABLE dst_r2 DROP PARTITION 3;" +$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA dst_r2;" +query_with_retry "ALTER TABLE dst_r2 DROP PARTITION 4;" +$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA dst_r2;" $CLICKHOUSE_CLIENT --query="SELECT 'REPLACE with fetch';" $CLICKHOUSE_CLIENT --query="DROP TABLE src;" From 0fbf612e150d83d9d0dfb7b157ea4cef6127c86a Mon Sep 17 00:00:00 2001 From: Kirill Nikiforov Date: Wed, 27 Mar 2024 18:41:24 +0300 Subject: [PATCH 02/24] support ATTACH PARTITION `ALL` FROM `TABLE` --- src/Storages/StorageReplicatedMergeTree.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 7d7ab712163..5434a8bd63e 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -7962,7 +7962,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom( MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, metadata_snapshot); - const String TMP_PREFIX = "tmp_replace_from_"; + static const String TMP_PREFIX = "tmp_replace_from_"; auto zookeeper = getZooKeeper(); std::unordered_set partitions; From 9c5e094289df6f0c0063819b68ae96a23f032d5e Mon Sep 17 00:00:00 2001 From: Kirill Nikiforov Date: Wed, 27 Mar 2024 19:57:41 +0300 Subject: [PATCH 03/24] fix cs --- src/Storages/StorageReplicatedMergeTree.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 5434a8bd63e..b4ffd5eff0a 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -7979,7 +7979,8 @@ void StorageReplicatedMergeTree::replacePartitionFrom( partitions.emplace(getPartitionIDFromQuery(partition, query_context)); } - for (const auto & partition_id : partitions) { + for (const auto & partition_id : partitions) + { auto src_all_parts = src_data.getVisibleDataPartsVectorInPartition(query_context, partition_id); LOG_DEBUG(log, "Cloning {} parts from partition '{}'", src_all_parts.size(), partition_id); From 1d4aa10099fec19f2b60f9094a9f9c004ab06421 Mon Sep 17 00:00:00 2001 From: Kirill Nikiforov Date: Sun, 31 Mar 2024 00:09:40 +0300 Subject: [PATCH 04/24] fix cs --- src/Storages/StorageMergeTree.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 821b02b5b5f..04115cf0ede 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -2087,7 +2087,8 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con throw DB::Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only support DROP/DETACH/ATTACH PARTITION ALL currently"); src_parts = src_data.getVisibleDataPartsVector(local_context); - } else + } + else { partition_id = getPartitionIDFromQuery(partition, local_context); src_parts = src_data.getVisibleDataPartsVectorInPartition(local_context, partition_id); From 115d7cfa572556f321373e441e5becf3221426ea Mon Sep 17 00:00:00 2001 From: Kirill Nikiforov Date: Fri, 26 Apr 2024 15:22:34 +0300 Subject: [PATCH 05/24] fix after merge --- src/Storages/StorageReplicatedMergeTree.cpp | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 84bb2e78eb6..b0d2faf1651 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -7982,13 +7982,6 @@ void StorageReplicatedMergeTree::replacePartitionFrom( ProfileEventsScope profile_events_scope; MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, metadata_snapshot); - String partition_id = getPartitionIDFromQuery(partition, query_context); - - /// NOTE: Some covered parts may be missing in src_all_parts if corresponding log entries are not executed yet. - DataPartsVector src_all_parts = src_data.getVisibleDataPartsVectorInPartition(query_context, partition_id); - - LOG_DEBUG(log, "Cloning {} parts", src_all_parts.size()); - static const String TMP_PREFIX = "tmp_replace_from_"; auto zookeeper = getZooKeeper(); @@ -7999,7 +7992,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom( throw DB::Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only support DROP/DETACH/ATTACH PARTITION ALL currently"); partitions = src_data.getAllPartitionIds(); - LOG_INFO(log, "Will try to attach {} partitions", partitions.size()); + LOG_INFO(log, "Will try to attach {} partitions without replace", partitions.size()); } else { partitions = std::unordered_set(); @@ -8008,6 +8001,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom( for (const auto & partition_id : partitions) { + /// NOTE: Some covered parts may be missing in src_all_parts if corresponding log entries are not executed yet. auto src_all_parts = src_data.getVisibleDataPartsVectorInPartition(query_context, partition_id); LOG_DEBUG(log, "Cloning {} parts from partition '{}'", src_all_parts.size(), partition_id); From 624afd68c86bb4fc3eee76f3fb9779ffb2319c68 Mon Sep 17 00:00:00 2001 From: Kirill Nikiforov Date: Fri, 16 Aug 2024 01:24:52 +0400 Subject: [PATCH 06/24] parallel attach from multiple partitions --- src/Storages/StorageReplicatedMergeTree.cpp | 112 ++++++++++++++------ src/Storages/StorageReplicatedMergeTree.h | 13 +++ 2 files changed, 95 insertions(+), 30 deletions(-) diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index a3c1ab7cdff..67f51b186c5 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -8030,25 +8030,84 @@ void StorageReplicatedMergeTree::replacePartitionFrom( /// First argument is true, because we possibly will add new data to current table. auto lock1 = lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef().lock_acquire_timeout); auto lock2 = source_table->lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef().lock_acquire_timeout); - auto storage_settings_ptr = getSettings(); - auto source_metadata_snapshot = source_table->getInMemoryMetadataPtr(); - auto metadata_snapshot = getInMemoryMetadataPtr(); + const auto storage_settings_ptr = getSettings(); + const auto source_metadata_snapshot = source_table->getInMemoryMetadataPtr(); + const auto metadata_snapshot = getInMemoryMetadataPtr(); + const MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, metadata_snapshot); - Stopwatch watch; + std::unordered_set partitions; + if (partition->as()->all) + { + if (replace) + throw DB::Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only support DROP/DETACH/ATTACH PARTITION ALL currently"); + + partitions = src_data.getAllPartitionIds(); + } + else + { + partitions = std::unordered_set(); + partitions.emplace(getPartitionIDFromQuery(partition, query_context)); + } + LOG_INFO(log, "Will try to attach {} partitions", partitions.size()); + + const Stopwatch watch; ProfileEventsScope profile_events_scope; + const auto zookeeper = getZooKeeper(); - MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, metadata_snapshot); - String partition_id = getPartitionIDFromQuery(partition, query_context); + const bool zero_copy_enabled = storage_settings_ptr->allow_remote_fs_zero_copy_replication + || dynamic_cast(source_table.get())->getSettings()->allow_remote_fs_zero_copy_replication; + try + { + std::unique_ptr entries[partitions.size()]; + size_t idx = 0; + for (const auto & partition_id : partitions) + { + entries[idx] = replacePartitionFromImpl(watch, + profile_events_scope, + metadata_snapshot, + src_data, + partition_id, + zookeeper, + replace, + zero_copy_enabled, + storage_settings_ptr->always_use_copy_instead_of_hardlinks, + query_context); + ++idx; + } + + for (const auto & entry : entries) + waitForLogEntryToBeProcessedIfNecessary(*entry, query_context); + + lock2.reset(); + lock1.reset(); + } + catch(...) + { + lock2.reset(); + lock1.reset(); + + throw; + } +} + +std::unique_ptr StorageReplicatedMergeTree::replacePartitionFromImpl( + const Stopwatch & watch, + ProfileEventsScope & profile_events_scope, + const StorageMetadataPtr & metadata_snapshot, + const MergeTreeData & src_data, + const String & partition_id, + const ZooKeeperPtr & zookeeper, + bool replace, + const bool & zero_copy_enabled, + const bool & always_use_copy_instead_of_hardlinks, + const ContextPtr & query_context) +{ /// NOTE: Some covered parts may be missing in src_all_parts if corresponding log entries are not executed yet. DataPartsVector src_all_parts = src_data.getVisibleDataPartsVectorInPartition(query_context, partition_id); - LOG_DEBUG(log, "Cloning {} parts", src_all_parts.size()); - static const String TMP_PREFIX = "tmp_replace_from_"; - auto zookeeper = getZooKeeper(); - /// Retry if alter_partition_version changes for (size_t retry = 0; retry < 1000; ++retry) { @@ -8133,11 +8192,9 @@ void StorageReplicatedMergeTree::replacePartitionFrom( UInt64 index = lock->getNumber(); MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level); - bool zero_copy_enabled = storage_settings_ptr->allow_remote_fs_zero_copy_replication - || dynamic_cast(source_table.get())->getSettings()->allow_remote_fs_zero_copy_replication; IDataPartStorage::ClonePartParams clone_params { - .copy_instead_of_hardlink = storage_settings_ptr->always_use_copy_instead_of_hardlinks || (zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport()), + .copy_instead_of_hardlink = always_use_copy_instead_of_hardlinks || (zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport()), .metadata_version_to_write = metadata_snapshot->getMetadataVersion() }; if (replace) @@ -8145,7 +8202,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom( /// Replace can only work on the same disk auto [dst_part, part_lock] = cloneAndLoadDataPart( src_part, - TMP_PREFIX, + TMP_PREFIX_REPLACE_PARTITION_FROM, dst_part_info, metadata_snapshot, clone_params, @@ -8160,7 +8217,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom( /// Attach can work on another disk auto [dst_part, part_lock] = cloneAndLoadDataPart( src_part, - TMP_PREFIX, + TMP_PREFIX_REPLACE_PARTITION_FROM, dst_part_info, metadata_snapshot, clone_params, @@ -8176,15 +8233,16 @@ void StorageReplicatedMergeTree::replacePartitionFrom( part_checksums.emplace_back(hash_hex); } - ReplicatedMergeTreeLogEntryData entry; + //ReplicatedMergeTreeLogEntryData entry; + auto entry = std::make_unique(); { auto src_table_id = src_data.getStorageID(); - entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE; - entry.source_replica = replica_name; - entry.create_time = time(nullptr); - entry.replace_range_entry = std::make_shared(); + entry->type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE; + entry->source_replica = replica_name; + entry->create_time = time(nullptr); + entry->replace_range_entry = std::make_shared(); - auto & entry_replace = *entry.replace_range_entry; + auto & entry_replace = *entry->replace_range_entry; entry_replace.drop_range_part_name = drop_range_fake_part_name; entry_replace.from_database = src_table_id.database_name; entry_replace.from_table = src_table_id.table_name; @@ -8225,7 +8283,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom( ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", alter_partition_version_stat.version)); /// Just update version, because merges assignment relies on it ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1)); - ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential)); + ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry->toString(), zkutil::CreateMode::PersistentSequential)); Transaction transaction(*this, NO_TRANSACTION_RAW); { @@ -8275,14 +8333,11 @@ void StorageReplicatedMergeTree::replacePartitionFrom( } String log_znode_path = dynamic_cast(*op_results.back()).path_created; - entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); + entry->znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); for (auto & lock : ephemeral_locks) lock.assumeUnlocked(); - lock2.reset(); - lock1.reset(); - /// We need to pull the REPLACE_RANGE before cleaning the replaced parts (otherwise CHeckThread may decide that parts are lost) queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC); // No need to block operations further, especially that in case we have to wait for mutation to finish, the intent would block @@ -8291,10 +8346,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom( parts_holder.clear(); cleanup_thread.wakeup(); - - waitForLogEntryToBeProcessedIfNecessary(entry, query_context); - - return; + return entry; } throw Exception( diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 2e54f17d5d5..4291aebf3e8 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -37,6 +37,7 @@ #include #include #include +#include #include #include @@ -1013,6 +1014,18 @@ private: DataPartsVector::const_iterator it; }; + const String TMP_PREFIX_REPLACE_PARTITION_FROM = "tmp_replace_from_"; + std::unique_ptr replacePartitionFromImpl( + const Stopwatch & watch, + ProfileEventsScope & profile_events_scope, + const StorageMetadataPtr & metadata_snapshot, + const MergeTreeData & src_data, + const String & partition_id, + const zkutil::ZooKeeperPtr & zookeeper, + bool replace, + const bool & zero_copy_enabled, + const bool & always_use_copy_instead_of_hardlinks, + const ContextPtr & query_context); }; String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info); From c31c4154ca47c6f1313c04974b7bb95bbcb34cff Mon Sep 17 00:00:00 2001 From: Kirill Nikiforov Date: Fri, 16 Aug 2024 01:49:54 +0400 Subject: [PATCH 07/24] fix cs --- src/Storages/StorageReplicatedMergeTree.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 67f51b186c5..38309282768 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -8083,7 +8083,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom( lock2.reset(); lock1.reset(); } - catch(...) + catch (...) { lock2.reset(); lock1.reset(); From 46f27b03f9dd2f122da6cfcb9905f5bb7ec944e7 Mon Sep 17 00:00:00 2001 From: Kirill Nikiforov Date: Fri, 16 Aug 2024 15:48:28 +0400 Subject: [PATCH 08/24] fix transaction --- src/Storages/StorageReplicatedMergeTree.cpp | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 38309282768..4915bf9f366 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -8108,6 +8108,13 @@ std::unique_ptr StorageReplicatedMergeTree::rep DataPartsVector src_all_parts = src_data.getVisibleDataPartsVectorInPartition(query_context, partition_id); LOG_DEBUG(log, "Cloning {} parts", src_all_parts.size()); + std::optional txn; + if (auto query_txn = query_context->getZooKeeperMetadataTransaction()) + txn.emplace(query_txn->getZooKeeper(), + query_txn->getDatabaseZooKeeperPath(), + query_txn->isInitialQuery(), + query_txn->getTaskZooKeeperPath()); + /// Retry if alter_partition_version changes for (size_t retry = 0; retry < 1000; ++retry) { @@ -8275,7 +8282,7 @@ std::unique_ptr StorageReplicatedMergeTree::rep ephemeral_locks[i].getUnlockOp(ops); } - if (auto txn = query_context->getZooKeeperMetadataTransaction()) + if (txn) txn->moveOpsTo(ops); delimiting_block_lock->getUnlockOp(ops); From de2f1adf9167db66469efd8e8b5d2f828f993ec1 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 22 Aug 2024 18:02:03 +0000 Subject: [PATCH 09/24] Fix mergine of aggregated data for grouping sets. --- src/Interpreters/InterpreterSelectQuery.cpp | 2 - src/Planner/Planner.cpp | 2 - .../QueryPlan/MergingAggregatedStep.cpp | 2 +- .../Transforms/MergingAggregatedTransform.cpp | 118 ++++++++++++++++-- .../Transforms/MergingAggregatedTransform.h | 10 +- .../02165_replicated_grouping_sets.reference | 54 ++++++++ .../02165_replicated_grouping_sets.sql | 5 + 7 files changed, 179 insertions(+), 14 deletions(-) diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 0c79f4310ce..9e5fffac6e4 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -2010,8 +2010,6 @@ static void executeMergeAggregatedImpl( SortDescription group_by_sort_description) { auto keys = aggregation_keys.getNames(); - if (has_grouping_sets) - keys.insert(keys.begin(), "__grouping_set"); /** There are two modes of distributed aggregation. * diff --git a/src/Planner/Planner.cpp b/src/Planner/Planner.cpp index d3d20c6fba0..c0efed8550f 100644 --- a/src/Planner/Planner.cpp +++ b/src/Planner/Planner.cpp @@ -504,8 +504,6 @@ void addMergingAggregatedStep(QueryPlan & query_plan, */ auto keys = aggregation_analysis_result.aggregation_keys; - if (!aggregation_analysis_result.grouping_sets_parameters_list.empty()) - keys.insert(keys.begin(), "__grouping_set"); Aggregator::Params params(keys, aggregation_analysis_result.aggregate_descriptions, diff --git a/src/Processors/QueryPlan/MergingAggregatedStep.cpp b/src/Processors/QueryPlan/MergingAggregatedStep.cpp index a5062ac8216..50bd1a882ef 100644 --- a/src/Processors/QueryPlan/MergingAggregatedStep.cpp +++ b/src/Processors/QueryPlan/MergingAggregatedStep.cpp @@ -48,7 +48,7 @@ MergingAggregatedStep::MergingAggregatedStep( bool memory_bound_merging_of_aggregation_results_enabled_) : ITransformingStep( input_stream_, - params_.getHeader(input_stream_.header, final_), + MergingAggregatedTransform::appendGroupingIfNeeded(input_stream_.header, params_.getHeader(input_stream_.header, final_)), getTraits(should_produce_results_in_order_of_bucket_number_)) , params(std::move(params_)) , final(final_) diff --git a/src/Processors/Transforms/MergingAggregatedTransform.cpp b/src/Processors/Transforms/MergingAggregatedTransform.cpp index 446e60a0b81..114a32b3d83 100644 --- a/src/Processors/Transforms/MergingAggregatedTransform.cpp +++ b/src/Processors/Transforms/MergingAggregatedTransform.cpp @@ -2,6 +2,7 @@ #include #include #include +#include namespace DB { @@ -10,13 +11,106 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } +Block MergingAggregatedTransform::appendGroupingIfNeeded(const Block & in_header, Block out_header) +{ + if (in_header.has("__grouping_set")) + out_header.insert(0, in_header.getByName("__grouping_set")); + + return out_header; +} + MergingAggregatedTransform::MergingAggregatedTransform( Block header_, AggregatingTransformParamsPtr params_, size_t max_threads_) - : IAccumulatingTransform(std::move(header_), params_->getHeader()) - , params(std::move(params_)), max_threads(max_threads_) + : IAccumulatingTransform(header_, appendGroupingIfNeeded(header_, params_->getHeader())) + , params(std::move(params_)), max_threads(max_threads_), has_grouping_sets(header_.has("__grouping_set")) { } +void MergingAggregatedTransform::addBlock(Block block) +{ + if (!has_grouping_sets) + { + auto & bucket_to_blocks = grouping_sets[0]; + bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block)); + return; + } + + auto grouping_position = block.getPositionByName("__grouping_set"); + auto grouping_column = block.getByPosition(grouping_position).column; + block.erase(grouping_position); + + const auto * grouping_column_typed = typeid_cast(grouping_column.get()); + if (!grouping_column_typed) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected UInt64 column for __grouping_set, got {}", grouping_column->getName()); + + const auto & grouping_data = grouping_column_typed->getData(); + std::map enumerated_groups; + IColumn::Selector selector; + + size_t num_rows = grouping_data.size(); + UInt64 last_group = grouping_data[0]; + for (size_t row = 1; row < num_rows; ++row) + { + auto group = grouping_data[row]; + if (last_group == group) + continue; + + if (enumerated_groups.empty()) + { + selector.reserve(num_rows); + enumerated_groups.emplace(last_group, enumerated_groups.size()); + } + + selector.resize_fill(row, enumerated_groups[last_group]); + enumerated_groups.emplace(last_group, enumerated_groups.size()); + } + + if (enumerated_groups.empty()) + { + auto & bucket_to_blocks = grouping_sets[last_group]; + bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block)); + return; + } + + selector.resize_fill(num_rows, enumerated_groups[last_group]); + + const size_t num_groups = enumerated_groups.size(); + Blocks splitted_blocks(num_groups); + + for (size_t group_id = 0; group_id < num_groups; ++group_id) + splitted_blocks[group_id] = block.cloneEmpty(); + + size_t columns_in_block = block.columns(); + for (size_t col_idx_in_block = 0; col_idx_in_block < columns_in_block; ++col_idx_in_block) + { + MutableColumns splitted_columns = block.getByPosition(col_idx_in_block).column->scatter(num_groups, selector); + for (size_t group_id = 0; group_id < num_groups; ++group_id) + splitted_blocks[group_id].getByPosition(col_idx_in_block).column = std::move(splitted_columns[group_id]); + } + + for (auto [group, group_id] : enumerated_groups) + { + auto & bucket_to_blocks = grouping_sets[group]; + auto & splitted_block = splitted_blocks[group_id]; + splitted_block.info = block.info; + bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(splitted_block)); + } +} + +void MergingAggregatedTransform::appendGroupingColumn(UInt64 group, BlocksList & block_list) +{ + auto grouping_position = getOutputPort().getHeader().getPositionByName("__grouping_set"); + for (auto & block : block_list) + { + auto num_rows = block.rows(); + ColumnWithTypeAndName col; + col.type = std::make_shared(); + col.name = "__grouping_set"; + col.column = ColumnUInt64::create(num_rows, group); + block.insert(grouping_position, std::move(col)); + } +} + void MergingAggregatedTransform::consume(Chunk chunk) { if (!consume_started) @@ -46,7 +140,7 @@ void MergingAggregatedTransform::consume(Chunk chunk) block.info.is_overflows = agg_info->is_overflows; block.info.bucket_num = agg_info->bucket_num; - bucket_to_blocks[agg_info->bucket_num].emplace_back(std::move(block)); + addBlock(std::move(block)); } else if (chunk.getChunkInfos().get()) { @@ -54,7 +148,7 @@ void MergingAggregatedTransform::consume(Chunk chunk) block.info.is_overflows = false; block.info.bucket_num = -1; - bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block)); + addBlock(std::move(block)); } else throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk should have AggregatedChunkInfo in MergingAggregatedTransform."); @@ -70,9 +164,19 @@ Chunk MergingAggregatedTransform::generate() /// Exception safety. Make iterator valid in case any method below throws. next_block = blocks.begin(); - /// TODO: this operation can be made async. Add async for IAccumulatingTransform. - params->aggregator.mergeBlocks(std::move(bucket_to_blocks), data_variants, max_threads, is_cancelled); - blocks = params->aggregator.convertToBlocks(data_variants, params->final, max_threads); + for (auto & [group, group_blocks] : grouping_sets) + { + /// TODO: this operation can be made async. Add async for IAccumulatingTransform. + AggregatedDataVariants data_variants; + params->aggregator.mergeBlocks(std::move(group_blocks), data_variants, max_threads, is_cancelled); + auto merged_blocks = params->aggregator.convertToBlocks(data_variants, params->final, max_threads); + + if (has_grouping_sets) + appendGroupingColumn(group, merged_blocks); + + blocks.splice(blocks.end(), std::move(merged_blocks)); + } + next_block = blocks.begin(); } diff --git a/src/Processors/Transforms/MergingAggregatedTransform.h b/src/Processors/Transforms/MergingAggregatedTransform.h index ade76b2f304..1d801f7a94d 100644 --- a/src/Processors/Transforms/MergingAggregatedTransform.h +++ b/src/Processors/Transforms/MergingAggregatedTransform.h @@ -15,6 +15,8 @@ public: MergingAggregatedTransform(Block header_, AggregatingTransformParamsPtr params_, size_t max_threads_); String getName() const override { return "MergingAggregatedTransform"; } + static Block appendGroupingIfNeeded(const Block & in_header, Block out_header); + protected: void consume(Chunk chunk) override; Chunk generate() override; @@ -24,8 +26,9 @@ private: LoggerPtr log = getLogger("MergingAggregatedTransform"); size_t max_threads; - AggregatedDataVariants data_variants; - Aggregator::BucketToBlocks bucket_to_blocks; + using GroupingSets = std::unordered_map; + GroupingSets grouping_sets; + const bool has_grouping_sets; UInt64 total_input_rows = 0; UInt64 total_input_blocks = 0; @@ -35,6 +38,9 @@ private: bool consume_started = false; bool generate_started = false; + + void addBlock(Block block); + void appendGroupingColumn(UInt64 group, BlocksList & block_list); }; } diff --git a/tests/queries/0_stateless/02165_replicated_grouping_sets.reference b/tests/queries/0_stateless/02165_replicated_grouping_sets.reference index 659cd98368d..4589dc7d7a5 100644 --- a/tests/queries/0_stateless/02165_replicated_grouping_sets.reference +++ b/tests/queries/0_stateless/02165_replicated_grouping_sets.reference @@ -11,3 +11,57 @@ 0 6 4 1 10 4 2 14 4 +['.'] +['.','.'] +['.','.','.'] +['.','.','.','.'] +['.','.','.','.','.'] +['.','.','.','.','.','.'] +['.','.','.','.','.','.','.'] +['.','.','.','.','.','.','.','.'] +['.','.','.','.','.','.','.','.','.'] +['.'] +['.'] +['.','.'] +['.','.'] +['.','.','.'] +['.','.','.'] +['.','.','.','.'] +['.','.','.','.'] +['.','.','.','.','.'] +['.','.','.','.','.'] +['.','.','.','.','.','.'] +['.','.','.','.','.','.'] +['.','.','.','.','.','.','.'] +['.','.','.','.','.','.','.'] +['.','.','.','.','.','.','.','.'] +['.','.','.','.','.','.','.','.'] +['.','.','.','.','.','.','.','.','.'] +['.','.','.','.','.','.','.','.','.'] +1 +2 +3 +4 +5 +6 +7 +8 +9 +1 +1 +2 +2 +3 +3 +4 +4 +5 +5 +6 +6 +7 +7 +8 +8 +9 +9 diff --git a/tests/queries/0_stateless/02165_replicated_grouping_sets.sql b/tests/queries/0_stateless/02165_replicated_grouping_sets.sql index d92d92c3e72..333dab79575 100644 --- a/tests/queries/0_stateless/02165_replicated_grouping_sets.sql +++ b/tests/queries/0_stateless/02165_replicated_grouping_sets.sql @@ -43,3 +43,8 @@ GROUP BY ORDER BY sum_value ASC, count_value ASC; + +SELECT arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +SELECT arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +SELECT toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +SELECT toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; From 0e136ded28dc1191dd344500d031f43d7a5750e2 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 22 Aug 2024 19:06:04 +0000 Subject: [PATCH 10/24] Fixing header. --- src/Processors/QueryPlan/MergingAggregatedStep.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Processors/QueryPlan/MergingAggregatedStep.cpp b/src/Processors/QueryPlan/MergingAggregatedStep.cpp index 50bd1a882ef..8332ad73df6 100644 --- a/src/Processors/QueryPlan/MergingAggregatedStep.cpp +++ b/src/Processors/QueryPlan/MergingAggregatedStep.cpp @@ -154,7 +154,9 @@ void MergingAggregatedStep::describeActions(JSONBuilder::JSONMap & map) const void MergingAggregatedStep::updateOutputStream() { - output_stream = createOutputStream(input_streams.front(), params.getHeader(input_streams.front().header, final), getDataStreamTraits()); + const auto & in_header = input_streams.front().header; + output_stream = createOutputStream(input_streams.front(), + MergingAggregatedTransform::appendGroupingIfNeeded(in_header, params.getHeader(in_header, final)), getDataStreamTraits()); if (is_order_overwritten) /// overwrite order again applyOrder(group_by_sort_description, overwritten_sort_scope); } From 77061db95595cea33c2e5f84804c1f9a799ec6d6 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 26 Aug 2024 14:37:02 +0000 Subject: [PATCH 11/24] Adding comments and checks. --- .../QueryPlan/MergingAggregatedStep.cpp | 13 +++++++++++++ .../Transforms/MergingAggregatedTransform.cpp | 16 +++++++++++++++- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/src/Processors/QueryPlan/MergingAggregatedStep.cpp b/src/Processors/QueryPlan/MergingAggregatedStep.cpp index 8332ad73df6..d35c38a4e32 100644 --- a/src/Processors/QueryPlan/MergingAggregatedStep.cpp +++ b/src/Processors/QueryPlan/MergingAggregatedStep.cpp @@ -10,6 +10,11 @@ namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + static bool memoryBoundMergingWillBeUsed( const DataStream & input_stream, bool memory_bound_merging_of_aggregation_results_enabled, @@ -93,6 +98,10 @@ void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, c if (memoryBoundMergingWillBeUsed()) { + if (input_streams.front().header.has("__grouping_set")) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Memory bound merging of aggregated results is not supported for grouping sets."); + auto transform = std::make_shared( pipeline.getHeader(), pipeline.getNumStreams(), @@ -123,6 +132,10 @@ void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, c if (!memory_efficient_aggregation) { + if (input_streams.front().header.has("__grouping_set")) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Memory efficient merging of aggregated results is not supported for grouping sets."); + /// We union several sources into one, paralleling the work. pipeline.resize(1); diff --git a/src/Processors/Transforms/MergingAggregatedTransform.cpp b/src/Processors/Transforms/MergingAggregatedTransform.cpp index 114a32b3d83..99fbf3bf4f0 100644 --- a/src/Processors/Transforms/MergingAggregatedTransform.cpp +++ b/src/Processors/Transforms/MergingAggregatedTransform.cpp @@ -13,6 +13,10 @@ namespace ErrorCodes Block MergingAggregatedTransform::appendGroupingIfNeeded(const Block & in_header, Block out_header) { + /// __grouping_set is neigher GROUP BY key nor an aggregate function. + /// It behaves like a GROUP BY key, but we cannot append it to keys + /// because it changes hashing method and buckets for two level aggregation. + /// Now, this column is processed "manually" by merging each group separately. if (in_header.has("__grouping_set")) out_header.insert(0, in_header.getByName("__grouping_set")); @@ -39,32 +43,41 @@ void MergingAggregatedTransform::addBlock(Block block) auto grouping_column = block.getByPosition(grouping_position).column; block.erase(grouping_position); + /// Split a block by __grouping_set values. + const auto * grouping_column_typed = typeid_cast(grouping_column.get()); if (!grouping_column_typed) throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected UInt64 column for __grouping_set, got {}", grouping_column->getName()); - const auto & grouping_data = grouping_column_typed->getData(); + /// Enumerate groups and fill the selector. std::map enumerated_groups; IColumn::Selector selector; + const auto & grouping_data = grouping_column_typed->getData(); size_t num_rows = grouping_data.size(); UInt64 last_group = grouping_data[0]; for (size_t row = 1; row < num_rows; ++row) { auto group = grouping_data[row]; + + /// Optimization for equal ranges. if (last_group == group) continue; + /// Optimization for single group. if (enumerated_groups.empty()) { selector.reserve(num_rows); enumerated_groups.emplace(last_group, enumerated_groups.size()); } + /// Fill the last equal range. selector.resize_fill(row, enumerated_groups[last_group]); + /// Enumerate new group if did not see it before. enumerated_groups.emplace(last_group, enumerated_groups.size()); } + /// Optimization for single group. if (enumerated_groups.empty()) { auto & bucket_to_blocks = grouping_sets[last_group]; @@ -72,6 +85,7 @@ void MergingAggregatedTransform::addBlock(Block block) return; } + /// Fill the last equal range. selector.resize_fill(num_rows, enumerated_groups[last_group]); const size_t num_groups = enumerated_groups.size(); From 42e7cc476e4e733839370681366ffde64185ba6c Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 26 Aug 2024 14:48:29 +0000 Subject: [PATCH 12/24] Fixing typos. --- src/Processors/Transforms/MergingAggregatedTransform.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Processors/Transforms/MergingAggregatedTransform.cpp b/src/Processors/Transforms/MergingAggregatedTransform.cpp index 99fbf3bf4f0..9b107b70075 100644 --- a/src/Processors/Transforms/MergingAggregatedTransform.cpp +++ b/src/Processors/Transforms/MergingAggregatedTransform.cpp @@ -13,7 +13,7 @@ namespace ErrorCodes Block MergingAggregatedTransform::appendGroupingIfNeeded(const Block & in_header, Block out_header) { - /// __grouping_set is neigher GROUP BY key nor an aggregate function. + /// __grouping_set is neither GROUP BY key nor an aggregate function. /// It behaves like a GROUP BY key, but we cannot append it to keys /// because it changes hashing method and buckets for two level aggregation. /// Now, this column is processed "manually" by merging each group separately. From 5f587af078eb6f9c962ee1ba0dccfefcab400f3a Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 26 Aug 2024 15:15:16 +0000 Subject: [PATCH 13/24] Review fix. --- src/Processors/Transforms/MergingAggregatedTransform.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Processors/Transforms/MergingAggregatedTransform.cpp b/src/Processors/Transforms/MergingAggregatedTransform.cpp index 9b107b70075..78fb2f340bf 100644 --- a/src/Processors/Transforms/MergingAggregatedTransform.cpp +++ b/src/Processors/Transforms/MergingAggregatedTransform.cpp @@ -74,7 +74,9 @@ void MergingAggregatedTransform::addBlock(Block block) /// Fill the last equal range. selector.resize_fill(row, enumerated_groups[last_group]); /// Enumerate new group if did not see it before. - enumerated_groups.emplace(last_group, enumerated_groups.size()); + enumerated_groups.emplace(group, enumerated_groups.size()); + + last_group = group; } /// Optimization for single group. From 9d9ef691968f4d93bc90bad9624af2b3390b98e2 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 26 Aug 2024 15:21:48 +0000 Subject: [PATCH 14/24] Fixing check. --- src/Processors/QueryPlan/MergingAggregatedStep.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/Processors/QueryPlan/MergingAggregatedStep.cpp b/src/Processors/QueryPlan/MergingAggregatedStep.cpp index d35c38a4e32..7207b5e6c7f 100644 --- a/src/Processors/QueryPlan/MergingAggregatedStep.cpp +++ b/src/Processors/QueryPlan/MergingAggregatedStep.cpp @@ -132,10 +132,6 @@ void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, c if (!memory_efficient_aggregation) { - if (input_streams.front().header.has("__grouping_set")) - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Memory efficient merging of aggregated results is not supported for grouping sets."); - /// We union several sources into one, paralleling the work. pipeline.resize(1); @@ -145,6 +141,9 @@ void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, c } else { + if (input_streams.front().header.has("__grouping_set")) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Memory efficient merging of aggregated results is not supported for grouping sets."); auto num_merge_threads = memory_efficient_merge_threads ? memory_efficient_merge_threads : max_threads; From c7d0d790e2b37bcd91f5e147d775e656bf3d22a7 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Tue, 27 Aug 2024 12:38:37 +0000 Subject: [PATCH 15/24] fix materialized views with optimize_functions_to_subcolumns --- .../Passes/FunctionToSubcolumnsPass.cpp | 10 +++-- .../0_stateless/03230_subcolumns_mv.reference | 1 + .../0_stateless/03230_subcolumns_mv.sql | 37 +++++++++++++++++++ 3 files changed, 45 insertions(+), 3 deletions(-) create mode 100644 tests/queries/0_stateless/03230_subcolumns_mv.reference create mode 100644 tests/queries/0_stateless/03230_subcolumns_mv.sql diff --git a/src/Analyzer/Passes/FunctionToSubcolumnsPass.cpp b/src/Analyzer/Passes/FunctionToSubcolumnsPass.cpp index 1fc3eec6833..6caf69e3a2c 100644 --- a/src/Analyzer/Passes/FunctionToSubcolumnsPass.cpp +++ b/src/Analyzer/Passes/FunctionToSubcolumnsPass.cpp @@ -209,7 +209,7 @@ std::map, NodeToSubcolumnTransformer> node_transfor }, }; -std::tuple getTypedNodesForOptimization(const QueryTreeNodePtr & node) +std::tuple getTypedNodesForOptimization(const QueryTreeNodePtr & node, const ContextPtr & context) { auto * function_node = node->as(); if (!function_node) @@ -232,6 +232,10 @@ std::tuple getTypedNodesForOptimizati const auto & storage_snapshot = table_node->getStorageSnapshot(); auto column = first_argument_column_node->getColumn(); + auto view_source = context->getViewSource(); + if (view_source && view_source->getStorageID().getFullNameNotQuoted() == storage->getStorageID().getFullNameNotQuoted()) + return {}; + if (!storage->supportsOptimizationToSubcolumns() || storage->isVirtualColumn(column.name, storage_snapshot->metadata)) return {}; @@ -266,7 +270,7 @@ public: return; } - auto [function_node, first_argument_node, table_node] = getTypedNodesForOptimization(node); + auto [function_node, first_argument_node, table_node] = getTypedNodesForOptimization(node, getContext()); if (function_node && first_argument_node && table_node) { enterImpl(*function_node, *first_argument_node, *table_node); @@ -416,7 +420,7 @@ public: if (!getSettings().optimize_functions_to_subcolumns) return; - auto [function_node, first_argument_column_node, table_node] = getTypedNodesForOptimization(node); + auto [function_node, first_argument_column_node, table_node] = getTypedNodesForOptimization(node, getContext()); if (!function_node || !first_argument_column_node || !table_node) return; diff --git a/tests/queries/0_stateless/03230_subcolumns_mv.reference b/tests/queries/0_stateless/03230_subcolumns_mv.reference new file mode 100644 index 00000000000..03528148b49 --- /dev/null +++ b/tests/queries/0_stateless/03230_subcolumns_mv.reference @@ -0,0 +1 @@ +['key1','key2'] ['value1','value2'] diff --git a/tests/queries/0_stateless/03230_subcolumns_mv.sql b/tests/queries/0_stateless/03230_subcolumns_mv.sql new file mode 100644 index 00000000000..e2e577f54c1 --- /dev/null +++ b/tests/queries/0_stateless/03230_subcolumns_mv.sql @@ -0,0 +1,37 @@ +DROP TABLE IF EXISTS rawtable; +DROP TABLE IF EXISTS raw_to_attributes_mv; +DROP TABLE IF EXISTS attributes; + +SET optimize_functions_to_subcolumns = 1; + +CREATE TABLE rawtable +( + `Attributes` Map(String, String), +) +ENGINE = MergeTree +ORDER BY tuple(); + +CREATE MATERIALIZED VIEW raw_to_attributes_mv TO attributes +( + `AttributeKeys` Array(String), + `AttributeValues` Array(String) +) +AS SELECT + mapKeys(Attributes) AS AttributeKeys, + mapValues(Attributes) AS AttributeValues +FROM rawtable; + +CREATE TABLE attributes +( + `AttributeKeys` Array(String), + `AttributeValues` Array(String) +) +ENGINE = ReplacingMergeTree +ORDER BY tuple(); + +INSERT INTO rawtable VALUES ({'key1': 'value1', 'key2': 'value2'}); +SELECT * FROM raw_to_attributes_mv ORDER BY AttributeKeys; + +DROP TABLE IF EXISTS rawtable; +DROP TABLE IF EXISTS raw_to_attributes_mv; +DROP TABLE IF EXISTS attributes; From 90cc6199664705c0c0214f60b4cbb246480d372d Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 27 Aug 2024 15:06:43 +0000 Subject: [PATCH 16/24] Better care about grouping keys order for GROUPING SETS. --- src/Interpreters/Aggregator.h | 12 + src/Interpreters/InterpreterSelectQuery.cpp | 51 ++-- src/Planner/Planner.cpp | 1 + src/Processors/QueryPlan/AggregatingStep.cpp | 101 ++++--- src/Processors/QueryPlan/AggregatingStep.h | 19 +- .../QueryPlan/MergingAggregatedStep.cpp | 14 +- .../QueryPlan/MergingAggregatedStep.h | 2 + .../Transforms/MergingAggregatedTransform.cpp | 196 ++++++++++--- .../Transforms/MergingAggregatedTransform.h | 26 +- .../02165_replicated_grouping_sets.reference | 266 ++++++++++++++---- .../02165_replicated_grouping_sets.sql | 23 +- 11 files changed, 517 insertions(+), 194 deletions(-) diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h index f4f1e9a1df3..2cb04fc7c51 100644 --- a/src/Interpreters/Aggregator.h +++ b/src/Interpreters/Aggregator.h @@ -59,6 +59,18 @@ class CompiledAggregateFunctionsHolder; class NativeWriter; struct OutputBlockColumns; +struct GroupingSetsParams +{ + GroupingSetsParams() = default; + + GroupingSetsParams(Names used_keys_, Names missing_keys_) : used_keys(std::move(used_keys_)), missing_keys(std::move(missing_keys_)) { } + + Names used_keys; + Names missing_keys; +}; + +using GroupingSetsParamsList = std::vector; + /** How are "total" values calculated with WITH TOTALS? * (For more details, see TotalsHavingTransform.) * diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 9e5fffac6e4..ca0e84a5267 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -347,6 +347,27 @@ bool shouldIgnoreQuotaAndLimits(const StorageID & table_id) return false; } +GroupingSetsParamsList getAggregatorGroupingSetsParams(const NamesAndTypesLists & aggregation_keys_list, const Names & all_keys) +{ + GroupingSetsParamsList result; + + for (const auto & aggregation_keys : aggregation_keys_list) + { + NameSet keys; + for (const auto & key : aggregation_keys) + keys.insert(key.name); + + Names missing_keys; + for (const auto & key : all_keys) + if (!keys.contains(key)) + missing_keys.push_back(key); + + result.emplace_back(aggregation_keys.getNames(), std::move(missing_keys)); + } + + return result; +} + } InterpreterSelectQuery::InterpreterSelectQuery( @@ -2005,6 +2026,7 @@ static void executeMergeAggregatedImpl( bool has_grouping_sets, const Settings & settings, const NamesAndTypesList & aggregation_keys, + const NamesAndTypesLists & aggregation_keys_list, const AggregateDescriptions & aggregates, bool should_produce_results_in_order_of_bucket_number, SortDescription group_by_sort_description) @@ -2027,10 +2049,12 @@ static void executeMergeAggregatedImpl( */ Aggregator::Params params(keys, aggregates, overflow_row, settings.max_threads, settings.max_block_size, settings.min_hit_rate_to_use_consecutive_keys_optimization); + auto grouping_sets_params = getAggregatorGroupingSetsParams(aggregation_keys_list, keys); auto merging_aggregated = std::make_unique( query_plan.getCurrentDataStream(), params, + grouping_sets_params, final, /// Grouping sets don't work with distributed_aggregation_memory_efficient enabled (#43989) settings.distributed_aggregation_memory_efficient && is_remote_storage && !has_grouping_sets, @@ -2651,30 +2675,6 @@ static Aggregator::Params getAggregatorParams( }; } -static GroupingSetsParamsList getAggregatorGroupingSetsParams(const SelectQueryExpressionAnalyzer & query_analyzer, const Names & all_keys) -{ - GroupingSetsParamsList result; - if (query_analyzer.useGroupingSetKey()) - { - auto const & aggregation_keys_list = query_analyzer.aggregationKeysList(); - - for (const auto & aggregation_keys : aggregation_keys_list) - { - NameSet keys; - for (const auto & key : aggregation_keys) - keys.insert(key.name); - - Names missing_keys; - for (const auto & key : all_keys) - if (!keys.contains(key)) - missing_keys.push_back(key); - - result.emplace_back(aggregation_keys.getNames(), std::move(missing_keys)); - } - } - return result; -} - void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const ActionsAndProjectInputsFlagPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info) { executeExpression(query_plan, expression, "Before GROUP BY"); @@ -2694,7 +2694,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac settings.group_by_two_level_threshold, settings.group_by_two_level_threshold_bytes); - auto grouping_sets_params = getAggregatorGroupingSetsParams(*query_analyzer, keys); + auto grouping_sets_params = getAggregatorGroupingSetsParams(query_analyzer->aggregationKeysList(), keys); SortDescription group_by_sort_description; SortDescription sort_description_for_merging; @@ -2762,6 +2762,7 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool has_grouping_sets, context->getSettingsRef(), query_analyzer->aggregationKeys(), + query_analyzer->aggregationKeysList(), query_analyzer->aggregates(), should_produce_results_in_order_of_bucket_number, std::move(group_by_sort_description)); diff --git a/src/Planner/Planner.cpp b/src/Planner/Planner.cpp index c0efed8550f..7b5101c5c7d 100644 --- a/src/Planner/Planner.cpp +++ b/src/Planner/Planner.cpp @@ -528,6 +528,7 @@ void addMergingAggregatedStep(QueryPlan & query_plan, auto merging_aggregated = std::make_unique( query_plan.getCurrentDataStream(), params, + aggregation_analysis_result.grouping_sets_parameters_list, query_analysis_result.aggregate_final, /// Grouping sets don't work with distributed_aggregation_memory_efficient enabled (#43989) settings.distributed_aggregation_memory_efficient && (is_remote_storage || parallel_replicas_from_merge_tree) && !query_analysis_result.aggregation_with_rollup_or_cube_or_grouping_sets, diff --git a/src/Processors/QueryPlan/AggregatingStep.cpp b/src/Processors/QueryPlan/AggregatingStep.cpp index 8a5ed7fde65..a4d707704b1 100644 --- a/src/Processors/QueryPlan/AggregatingStep.cpp +++ b/src/Processors/QueryPlan/AggregatingStep.cpp @@ -151,6 +151,61 @@ void AggregatingStep::applyOrder(SortDescription sort_description_for_merging_, explicit_sorting_required_for_aggregation_in_order = false; } +ActionsDAG AggregatingStep::makeCreatingMissingKeysForGroupingSetDAG( + const Block & in_header, + const Block & out_header, + const GroupingSetsParamsList & grouping_sets_params, + UInt64 group, + bool group_by_use_nulls) +{ + /// Here we create a DAG which fills missing keys and adds `__grouping_set` column + ActionsDAG dag(in_header.getColumnsWithTypeAndName()); + ActionsDAG::NodeRawConstPtrs outputs; + outputs.reserve(out_header.columns() + 1); + + auto grouping_col = ColumnConst::create(ColumnUInt64::create(1, group), 0); + const auto * grouping_node = &dag.addColumn( + {ColumnPtr(std::move(grouping_col)), std::make_shared(), "__grouping_set"}); + + grouping_node = &dag.materializeNode(*grouping_node); + outputs.push_back(grouping_node); + + const auto & missing_columns = grouping_sets_params[group].missing_keys; + const auto & used_keys = grouping_sets_params[group].used_keys; + + auto to_nullable_function = FunctionFactory::instance().get("toNullable", nullptr); + for (size_t i = 0; i < out_header.columns(); ++i) + { + const auto & col = out_header.getByPosition(i); + const auto missing_it = std::find_if( + missing_columns.begin(), missing_columns.end(), [&](const auto & missing_col) { return missing_col == col.name; }); + const auto used_it = std::find_if( + used_keys.begin(), used_keys.end(), [&](const auto & used_col) { return used_col == col.name; }); + if (missing_it != missing_columns.end()) + { + auto column_with_default = col.column->cloneEmpty(); + col.type->insertDefaultInto(*column_with_default); + column_with_default->finalize(); + + auto column = ColumnConst::create(std::move(column_with_default), 0); + const auto * node = &dag.addColumn({ColumnPtr(std::move(column)), col.type, col.name}); + node = &dag.materializeNode(*node); + outputs.push_back(node); + } + else + { + const auto * column_node = dag.getOutputs()[in_header.getPositionByName(col.name)]; + if (used_it != used_keys.end() && group_by_use_nulls && column_node->result_type->canBeInsideNullable()) + outputs.push_back(&dag.addFunction(to_nullable_function, { column_node }, col.name)); + else + outputs.push_back(column_node); + } + } + + dag.getOutputs().swap(outputs); + return dag; +} + void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) { QueryPipelineProcessorsCollector collector(pipeline, this); @@ -300,51 +355,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B { const auto & header = ports[set_counter]->getHeader(); - /// Here we create a DAG which fills missing keys and adds `__grouping_set` column - ActionsDAG dag(header.getColumnsWithTypeAndName()); - ActionsDAG::NodeRawConstPtrs outputs; - outputs.reserve(output_header.columns() + 1); - - auto grouping_col = ColumnConst::create(ColumnUInt64::create(1, set_counter), 0); - const auto * grouping_node = &dag.addColumn( - {ColumnPtr(std::move(grouping_col)), std::make_shared(), "__grouping_set"}); - - grouping_node = &dag.materializeNode(*grouping_node); - outputs.push_back(grouping_node); - - const auto & missing_columns = grouping_sets_params[set_counter].missing_keys; - const auto & used_keys = grouping_sets_params[set_counter].used_keys; - - auto to_nullable_function = FunctionFactory::instance().get("toNullable", nullptr); - for (size_t i = 0; i < output_header.columns(); ++i) - { - auto & col = output_header.getByPosition(i); - const auto missing_it = std::find_if( - missing_columns.begin(), missing_columns.end(), [&](const auto & missing_col) { return missing_col == col.name; }); - const auto used_it = std::find_if( - used_keys.begin(), used_keys.end(), [&](const auto & used_col) { return used_col == col.name; }); - if (missing_it != missing_columns.end()) - { - auto column_with_default = col.column->cloneEmpty(); - col.type->insertDefaultInto(*column_with_default); - column_with_default->finalize(); - - auto column = ColumnConst::create(std::move(column_with_default), 0); - const auto * node = &dag.addColumn({ColumnPtr(std::move(column)), col.type, col.name}); - node = &dag.materializeNode(*node); - outputs.push_back(node); - } - else - { - const auto * column_node = dag.getOutputs()[header.getPositionByName(col.name)]; - if (used_it != used_keys.end() && group_by_use_nulls && column_node->result_type->canBeInsideNullable()) - outputs.push_back(&dag.addFunction(to_nullable_function, { column_node }, col.name)); - else - outputs.push_back(column_node); - } - } - - dag.getOutputs().swap(outputs); + auto dag = makeCreatingMissingKeysForGroupingSetDAG(header, output_header, grouping_sets_params, set_counter, group_by_use_nulls); auto expression = std::make_shared(std::move(dag), settings.getActionsSettings()); auto transform = std::make_shared(header, expression); diff --git a/src/Processors/QueryPlan/AggregatingStep.h b/src/Processors/QueryPlan/AggregatingStep.h index ae43295024a..4e4078047f1 100644 --- a/src/Processors/QueryPlan/AggregatingStep.h +++ b/src/Processors/QueryPlan/AggregatingStep.h @@ -7,18 +7,6 @@ namespace DB { -struct GroupingSetsParams -{ - GroupingSetsParams() = default; - - GroupingSetsParams(Names used_keys_, Names missing_keys_) : used_keys(std::move(used_keys_)), missing_keys(std::move(missing_keys_)) { } - - Names used_keys; - Names missing_keys; -}; - -using GroupingSetsParamsList = std::vector; - Block appendGroupingSetColumn(Block header); Block generateOutputHeader(const Block & input_header, const Names & keys, bool use_nulls); @@ -77,6 +65,13 @@ public: /// Argument input_stream would be the second input (from projection). std::unique_ptr convertToAggregatingProjection(const DataStream & input_stream) const; + static ActionsDAG makeCreatingMissingKeysForGroupingSetDAG( + const Block & in_header, + const Block & out_header, + const GroupingSetsParamsList & grouping_sets_params, + UInt64 group, + bool group_by_use_nulls); + private: void updateOutputStream() override; diff --git a/src/Processors/QueryPlan/MergingAggregatedStep.cpp b/src/Processors/QueryPlan/MergingAggregatedStep.cpp index 7207b5e6c7f..f3eb352faac 100644 --- a/src/Processors/QueryPlan/MergingAggregatedStep.cpp +++ b/src/Processors/QueryPlan/MergingAggregatedStep.cpp @@ -42,6 +42,7 @@ static ITransformingStep::Traits getTraits(bool should_produce_results_in_order_ MergingAggregatedStep::MergingAggregatedStep( const DataStream & input_stream_, Aggregator::Params params_, + GroupingSetsParamsList grouping_sets_params_, bool final_, bool memory_efficient_aggregation_, size_t max_threads_, @@ -56,6 +57,7 @@ MergingAggregatedStep::MergingAggregatedStep( MergingAggregatedTransform::appendGroupingIfNeeded(input_stream_.header, params_.getHeader(input_stream_.header, final_)), getTraits(should_produce_results_in_order_of_bucket_number_)) , params(std::move(params_)) + , grouping_sets_params(std::move(grouping_sets_params_)) , final(final_) , memory_efficient_aggregation(memory_efficient_aggregation_) , max_threads(max_threads_) @@ -94,14 +96,13 @@ void MergingAggregatedStep::applyOrder(SortDescription sort_description, DataStr void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) { - auto transform_params = std::make_shared(pipeline.getHeader(), std::move(params), final); - if (memoryBoundMergingWillBeUsed()) { - if (input_streams.front().header.has("__grouping_set")) + if (input_streams.front().header.has("__grouping_set") || !grouping_sets_params.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Memory bound merging of aggregated results is not supported for grouping sets."); + auto transform_params = std::make_shared(pipeline.getHeader(), std::move(params), final); auto transform = std::make_shared( pipeline.getHeader(), pipeline.getNumStreams(), @@ -136,18 +137,19 @@ void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, c pipeline.resize(1); /// Now merge the aggregated blocks - pipeline.addSimpleTransform([&](const Block & header) - { return std::make_shared(header, transform_params, max_threads); }); + auto transform = std::make_shared(pipeline.getHeader(), params, final, grouping_sets_params, max_threads); + pipeline.addTransform(std::move(transform)); } else { - if (input_streams.front().header.has("__grouping_set")) + if (input_streams.front().header.has("__grouping_set") || !grouping_sets_params.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Memory efficient merging of aggregated results is not supported for grouping sets."); auto num_merge_threads = memory_efficient_merge_threads ? memory_efficient_merge_threads : max_threads; + auto transform_params = std::make_shared(pipeline.getHeader(), std::move(params), final); pipeline.addMergingAggregatedMemoryEfficientTransform(transform_params, num_merge_threads); } diff --git a/src/Processors/QueryPlan/MergingAggregatedStep.h b/src/Processors/QueryPlan/MergingAggregatedStep.h index 654f794d5f5..5c3842a6c33 100644 --- a/src/Processors/QueryPlan/MergingAggregatedStep.h +++ b/src/Processors/QueryPlan/MergingAggregatedStep.h @@ -16,6 +16,7 @@ public: MergingAggregatedStep( const DataStream & input_stream_, Aggregator::Params params_, + GroupingSetsParamsList grouping_sets_params_, bool final_, bool memory_efficient_aggregation_, size_t max_threads_, @@ -43,6 +44,7 @@ private: Aggregator::Params params; + GroupingSetsParamsList grouping_sets_params; bool final; bool memory_efficient_aggregation; size_t max_threads; diff --git a/src/Processors/Transforms/MergingAggregatedTransform.cpp b/src/Processors/Transforms/MergingAggregatedTransform.cpp index 78fb2f340bf..cf383cfcf9d 100644 --- a/src/Processors/Transforms/MergingAggregatedTransform.cpp +++ b/src/Processors/Transforms/MergingAggregatedTransform.cpp @@ -1,7 +1,9 @@ #include #include #include +#include #include +#include #include namespace DB @@ -23,19 +25,93 @@ Block MergingAggregatedTransform::appendGroupingIfNeeded(const Block & in_header return out_header; } +MergingAggregatedTransform::~MergingAggregatedTransform() = default; + MergingAggregatedTransform::MergingAggregatedTransform( - Block header_, AggregatingTransformParamsPtr params_, size_t max_threads_) - : IAccumulatingTransform(header_, appendGroupingIfNeeded(header_, params_->getHeader())) - , params(std::move(params_)), max_threads(max_threads_), has_grouping_sets(header_.has("__grouping_set")) + Block header_, + Aggregator::Params params, + bool final, + GroupingSetsParamsList grouping_sets_params, + size_t max_threads_) + : IAccumulatingTransform(header_, appendGroupingIfNeeded(header_, params.getHeader(header_, final))) + , max_threads(max_threads_) { + if (!grouping_sets_params.empty()) + { + if (!header_.has("__grouping_set")) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Cannot find __grouping_set column in header of MergingAggregatedTransform with grouping sets." + "Header {}", header_.dumpStructure()); + + auto in_header = header_; + in_header.erase(header_.getPositionByName("__grouping_set")); + auto out_header = params.getHeader(header_, final); + + grouping_sets.reserve(grouping_sets_params.size()); + for (const auto & grouping_set_params : grouping_sets_params) + { + size_t group = grouping_sets.size(); + + ActionsDAG reordering(in_header.getColumnsWithTypeAndName()); + auto & outputs = reordering.getOutputs(); + ActionsDAG::NodeRawConstPtrs new_outputs; + new_outputs.reserve(in_header.columns() + grouping_set_params.used_keys.size() - grouping_set_params.used_keys.size()); + + std::unordered_map index; + for (size_t pos = 0; pos < outputs.size(); ++pos) + index.emplace(outputs[pos]->result_name, pos); + + for (const auto & used_name : grouping_set_params.used_keys) + { + auto & idx = index[used_name]; + new_outputs.push_back(outputs[idx]); + } + + for (const auto & used_name : grouping_set_params.used_keys) + index[used_name] = outputs.size(); + for (const auto & missing_name : grouping_set_params.missing_keys) + index[missing_name] = outputs.size(); + + for (const auto * output : outputs) + { + if (index[output->result_name] != outputs.size()) + new_outputs.push_back(output); + } + + outputs.swap(new_outputs); + + Aggregator::Params set_params(grouping_set_params.used_keys, + params.aggregates, + params.overflow_row, + params.max_threads, + params.max_block_size, + params.min_hit_rate_to_use_consecutive_keys_optimization); + + auto transform_params = std::make_shared(reordering.updateHeader(in_header), std::move(set_params), final); + + auto creating = AggregatingStep::makeCreatingMissingKeysForGroupingSetDAG( + transform_params->getHeader(), + out_header, + grouping_sets_params, group, false); + + auto & groupiung_set = grouping_sets.emplace_back(); + groupiung_set.reordering_key_columns_actions = std::make_shared(std::move(reordering)); + groupiung_set.creating_missing_keys_actions = std::make_shared(std::move(creating)); + groupiung_set.params = std::move(transform_params); + } + } + else + { + auto & groupiung_set = grouping_sets.emplace_back(); + groupiung_set.params = std::make_shared(header_, std::move(params), final); + } } void MergingAggregatedTransform::addBlock(Block block) { - if (!has_grouping_sets) + if (grouping_sets.size() == 1) { - auto & bucket_to_blocks = grouping_sets[0]; - bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block)); + grouping_sets[0].bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block)); return; } @@ -49,13 +125,12 @@ void MergingAggregatedTransform::addBlock(Block block) if (!grouping_column_typed) throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected UInt64 column for __grouping_set, got {}", grouping_column->getName()); - /// Enumerate groups and fill the selector. - std::map enumerated_groups; IColumn::Selector selector; const auto & grouping_data = grouping_column_typed->getData(); size_t num_rows = grouping_data.size(); UInt64 last_group = grouping_data[0]; + UInt64 max_group = last_group; for (size_t row = 1; row < num_rows; ++row) { auto group = grouping_data[row]; @@ -65,32 +140,32 @@ void MergingAggregatedTransform::addBlock(Block block) continue; /// Optimization for single group. - if (enumerated_groups.empty()) - { + if (selector.empty()) selector.reserve(num_rows); - enumerated_groups.emplace(last_group, enumerated_groups.size()); - } /// Fill the last equal range. - selector.resize_fill(row, enumerated_groups[last_group]); - /// Enumerate new group if did not see it before. - enumerated_groups.emplace(group, enumerated_groups.size()); - + selector.resize_fill(row, last_group); last_group = group; + max_group = std::max(last_group, max_group); } + if (max_group >= grouping_sets.size()) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Invalid group number {}. Number of groups {}.", last_group, grouping_sets.size()); + /// Optimization for single group. - if (enumerated_groups.empty()) + if (selector.empty()) { - auto & bucket_to_blocks = grouping_sets[last_group]; - bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block)); + auto bucket = block.info.bucket_num; + grouping_sets[last_group].reordering_key_columns_actions->execute(block); + grouping_sets[last_group].bucket_to_blocks[bucket].emplace_back(std::move(block)); return; } /// Fill the last equal range. - selector.resize_fill(num_rows, enumerated_groups[last_group]); + selector.resize_fill(num_rows, last_group); - const size_t num_groups = enumerated_groups.size(); + const size_t num_groups = max_group + 1; Blocks splitted_blocks(num_groups); for (size_t group_id = 0; group_id < num_groups; ++group_id) @@ -104,28 +179,28 @@ void MergingAggregatedTransform::addBlock(Block block) splitted_blocks[group_id].getByPosition(col_idx_in_block).column = std::move(splitted_columns[group_id]); } - for (auto [group, group_id] : enumerated_groups) + for (size_t group = 0; group < num_groups; ++group) { - auto & bucket_to_blocks = grouping_sets[group]; - auto & splitted_block = splitted_blocks[group_id]; + auto & splitted_block = splitted_blocks[group]; splitted_block.info = block.info; - bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(splitted_block)); + grouping_sets[group].reordering_key_columns_actions->execute(splitted_block); + grouping_sets[group].bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(splitted_block)); } } -void MergingAggregatedTransform::appendGroupingColumn(UInt64 group, BlocksList & block_list) -{ - auto grouping_position = getOutputPort().getHeader().getPositionByName("__grouping_set"); - for (auto & block : block_list) - { - auto num_rows = block.rows(); - ColumnWithTypeAndName col; - col.type = std::make_shared(); - col.name = "__grouping_set"; - col.column = ColumnUInt64::create(num_rows, group); - block.insert(grouping_position, std::move(col)); - } -} +// void MergingAggregatedTransform::appendGroupingColumn(UInt64 group, BlocksList & block_list) +// { +// auto grouping_position = getOutputPort().getHeader().getPositionByName("__grouping_set"); +// for (auto & block : block_list) +// { +// auto num_rows = block.rows(); +// ColumnWithTypeAndName col; +// col.type = std::make_shared(); +// col.name = "__grouping_set"; +// col.column = ColumnUInt64::create(num_rows, group); +// block.insert(grouping_position, std::move(col)); +// } +// } void MergingAggregatedTransform::consume(Chunk chunk) { @@ -170,6 +245,25 @@ void MergingAggregatedTransform::consume(Chunk chunk) throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk should have AggregatedChunkInfo in MergingAggregatedTransform."); } +// static void debugBlock(const Block & block) +// { +// std::cerr << block.dumpStructure() << std::endl; +// size_t rows = block.rows(); +// for (size_t row = 0; row < rows; ++row) +// { +// for (size_t col = 0; col < block.columns(); ++col) +// { +// const auto & c = block.getByPosition(col); +// if (c.column->isNumeric()) +// std::cerr << c.column->getUInt(row) << ' '; +// else +// std::cerr << c.column->getDataAt(row).toString() << ' '; + +// } +// std::cerr << std::endl; +// } +// } + Chunk MergingAggregatedTransform::generate() { if (!generate_started) @@ -180,15 +274,31 @@ Chunk MergingAggregatedTransform::generate() /// Exception safety. Make iterator valid in case any method below throws. next_block = blocks.begin(); - for (auto & [group, group_blocks] : grouping_sets) + for (auto & grouping_set : grouping_sets) { - /// TODO: this operation can be made async. Add async for IAccumulatingTransform. + auto & params = grouping_set.params; + auto & bucket_to_blocks = grouping_set.bucket_to_blocks; AggregatedDataVariants data_variants; - params->aggregator.mergeBlocks(std::move(group_blocks), data_variants, max_threads, is_cancelled); + + // std::cerr << "== Group " << group << std::endl; + // for (const auto & [buk, lst] : bucket_to_blocks) + // { + // std::cerr << ".. buk " << buk << std::endl; + // for (const auto & b : lst) + // debugBlock(b); + // } + + /// TODO: this operation can be made async. Add async for IAccumulatingTransform. + params->aggregator.mergeBlocks(std::move(bucket_to_blocks), data_variants, max_threads, is_cancelled); auto merged_blocks = params->aggregator.convertToBlocks(data_variants, params->final, max_threads); - if (has_grouping_sets) - appendGroupingColumn(group, merged_blocks); + if (grouping_set.creating_missing_keys_actions) + for (auto & block : merged_blocks) + grouping_set.creating_missing_keys_actions->execute(block); + + // std::cerr << "== Merged " << group << std::endl; + // for (const auto & b : merged_blocks) + // debugBlock(b); blocks.splice(blocks.end(), std::move(merged_blocks)); } diff --git a/src/Processors/Transforms/MergingAggregatedTransform.h b/src/Processors/Transforms/MergingAggregatedTransform.h index 1d801f7a94d..3a043ad74b8 100644 --- a/src/Processors/Transforms/MergingAggregatedTransform.h +++ b/src/Processors/Transforms/MergingAggregatedTransform.h @@ -6,13 +6,24 @@ namespace DB { +class ExpressionActions; +using ExpressionActionsPtr = std::shared_ptr; + /** A pre-aggregate stream of blocks in which each block is already aggregated. * Aggregate functions in blocks should not be finalized so that their states can be merged. */ class MergingAggregatedTransform : public IAccumulatingTransform { public: - MergingAggregatedTransform(Block header_, AggregatingTransformParamsPtr params_, size_t max_threads_); + MergingAggregatedTransform( + Block header_, + Aggregator::Params params_, + bool final_, + GroupingSetsParamsList grouping_sets_params, + size_t max_threads_); + + ~MergingAggregatedTransform() override; + String getName() const override { return "MergingAggregatedTransform"; } static Block appendGroupingIfNeeded(const Block & in_header, Block out_header); @@ -22,13 +33,19 @@ protected: Chunk generate() override; private: - AggregatingTransformParamsPtr params; LoggerPtr log = getLogger("MergingAggregatedTransform"); size_t max_threads; - using GroupingSets = std::unordered_map; + struct GroupingSet + { + Aggregator::BucketToBlocks bucket_to_blocks; + ExpressionActionsPtr reordering_key_columns_actions; + ExpressionActionsPtr creating_missing_keys_actions; + AggregatingTransformParamsPtr params; + }; + + using GroupingSets = std::vector; GroupingSets grouping_sets; - const bool has_grouping_sets; UInt64 total_input_rows = 0; UInt64 total_input_blocks = 0; @@ -40,7 +57,6 @@ private: bool generate_started = false; void addBlock(Block block); - void appendGroupingColumn(UInt64 group, BlocksList & block_list); }; } diff --git a/tests/queries/0_stateless/02165_replicated_grouping_sets.reference b/tests/queries/0_stateless/02165_replicated_grouping_sets.reference index 4589dc7d7a5..31cbf2ad670 100644 --- a/tests/queries/0_stateless/02165_replicated_grouping_sets.reference +++ b/tests/queries/0_stateless/02165_replicated_grouping_sets.reference @@ -11,57 +11,215 @@ 0 6 4 1 10 4 2 14 4 -['.'] -['.','.'] -['.','.','.'] -['.','.','.','.'] -['.','.','.','.','.'] -['.','.','.','.','.','.'] -['.','.','.','.','.','.','.'] -['.','.','.','.','.','.','.','.'] -['.','.','.','.','.','.','.','.','.'] -['.'] -['.'] -['.','.'] -['.','.'] -['.','.','.'] -['.','.','.'] -['.','.','.','.'] -['.','.','.','.'] -['.','.','.','.','.'] -['.','.','.','.','.'] -['.','.','.','.','.','.'] -['.','.','.','.','.','.'] -['.','.','.','.','.','.','.'] -['.','.','.','.','.','.','.'] -['.','.','.','.','.','.','.','.'] -['.','.','.','.','.','.','.','.'] -['.','.','.','.','.','.','.','.','.'] -['.','.','.','.','.','.','.','.','.'] -1 -2 -3 -4 -5 -6 -7 -8 -9 -1 -1 -2 -2 -3 -3 -4 -4 -5 -5 -6 -6 -7 -7 -8 -8 -9 -9 +-- { echo On } + +SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +1 ['.'] +2 ['.','.'] +2 ['.','.','.'] +2 ['.','.','.','.'] +2 ['.','.','.','.','.'] +2 ['.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.','.','.'] +SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +1 ['.'] +1 ['.'] +2 ['.','.'] +2 ['.','.'] +2 ['.','.','.'] +2 ['.','.','.'] +2 ['.','.','.','.'] +2 ['.','.','.','.'] +2 ['.','.','.','.','.'] +2 ['.','.','.','.','.'] +2 ['.','.','.','.','.','.'] +2 ['.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.','.','.'] +SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +1 1 +2 2 +2 3 +2 4 +2 5 +2 6 +2 7 +2 8 +2 9 +SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +1 1 +1 1 +2 2 +2 2 +2 3 +2 3 +2 4 +2 4 +2 5 +2 5 +2 6 +2 6 +2 7 +2 7 +2 8 +2 8 +2 9 +2 9 +SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +1 1 +1 1 +2 2 +2 2 +2 3 +2 3 +2 4 +2 4 +2 5 +2 5 +2 6 +2 6 +2 7 +2 7 +2 8 +2 8 +2 9 +2 9 +SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +1 1 +1 1 +1 1 +2 2 +2 2 +2 2 +2 3 +2 3 +2 3 +2 4 +2 4 +2 4 +2 5 +2 5 +2 5 +2 6 +2 6 +2 6 +2 7 +2 7 +2 7 +2 8 +2 8 +2 8 +2 9 +2 9 +2 9 +SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +2 ['.'] +2 ['.','.'] +2 ['.','.','.'] +2 ['.','.','.','.'] +2 ['.','.','.','.','.'] +2 ['.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.','.','.'] +SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +2 ['.'] +2 ['.'] +2 ['.','.'] +2 ['.','.'] +2 ['.','.','.'] +2 ['.','.','.'] +2 ['.','.','.','.'] +2 ['.','.','.','.'] +2 ['.','.','.','.','.'] +2 ['.','.','.','.','.'] +2 ['.','.','.','.','.','.'] +2 ['.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.','.','.'] +2 ['.','.','.','.','.','.','.','.','.'] +SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +2 1 +2 2 +2 3 +2 4 +2 5 +2 6 +2 7 +2 8 +2 9 +SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +2 1 +2 1 +2 2 +2 2 +2 3 +2 3 +2 4 +2 4 +2 5 +2 5 +2 6 +2 6 +2 7 +2 7 +2 8 +2 8 +2 9 +2 9 +SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +2 1 +2 1 +2 2 +2 2 +2 3 +2 3 +2 4 +2 4 +2 5 +2 5 +2 6 +2 6 +2 7 +2 7 +2 8 +2 8 +2 9 +2 9 +SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +2 1 +2 1 +2 1 +2 2 +2 2 +2 2 +2 3 +2 3 +2 3 +2 4 +2 4 +2 4 +2 5 +2 5 +2 5 +2 6 +2 6 +2 6 +2 7 +2 7 +2 7 +2 8 +2 8 +2 8 +2 9 +2 9 +2 9 diff --git a/tests/queries/0_stateless/02165_replicated_grouping_sets.sql b/tests/queries/0_stateless/02165_replicated_grouping_sets.sql index 333dab79575..47d4446f348 100644 --- a/tests/queries/0_stateless/02165_replicated_grouping_sets.sql +++ b/tests/queries/0_stateless/02165_replicated_grouping_sets.sql @@ -44,7 +44,22 @@ ORDER BY sum_value ASC, count_value ASC; -SELECT arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; -SELECT arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; -SELECT toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; -SELECT toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +set prefer_localhost_replica = 1; + +-- { echo On } + +SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; + +SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; + +SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; + +SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; +SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000; From 2b495e22cdeea4c769c73b7e21f448be2c5ffcae Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 27 Aug 2024 15:38:52 +0000 Subject: [PATCH 17/24] Fixing a test. --- src/Processors/Transforms/MergingAggregatedTransform.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/Processors/Transforms/MergingAggregatedTransform.cpp b/src/Processors/Transforms/MergingAggregatedTransform.cpp index cf383cfcf9d..dd97364f879 100644 --- a/src/Processors/Transforms/MergingAggregatedTransform.cpp +++ b/src/Processors/Transforms/MergingAggregatedTransform.cpp @@ -111,7 +111,10 @@ void MergingAggregatedTransform::addBlock(Block block) { if (grouping_sets.size() == 1) { - grouping_sets[0].bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block)); + auto bucket = block.info.bucket_num; + if (grouping_sets[0].reordering_key_columns_actions) + grouping_sets[0].reordering_key_columns_actions->execute(block); + grouping_sets[0].bucket_to_blocks[bucket].emplace_back(std::move(block)); return; } From ac91471042ebac5fc5467aef9efe806124293f1a Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Tue, 27 Aug 2024 16:06:12 +0000 Subject: [PATCH 18/24] add comment --- src/Analyzer/Passes/FunctionToSubcolumnsPass.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Analyzer/Passes/FunctionToSubcolumnsPass.cpp b/src/Analyzer/Passes/FunctionToSubcolumnsPass.cpp index 6caf69e3a2c..6f1c3937880 100644 --- a/src/Analyzer/Passes/FunctionToSubcolumnsPass.cpp +++ b/src/Analyzer/Passes/FunctionToSubcolumnsPass.cpp @@ -232,6 +232,8 @@ std::tuple getTypedNodesForOptimizati const auto & storage_snapshot = table_node->getStorageSnapshot(); auto column = first_argument_column_node->getColumn(); + /// If view source is set we cannot optimize because it doesn't support moving functions to subcolumns. + /// The storage is replaced to the view source but it happens only after building a query tree and applying passes. auto view_source = context->getViewSource(); if (view_source && view_source->getStorageID().getFullNameNotQuoted() == storage->getStorageID().getFullNameNotQuoted()) return {}; From a7584bbb80c68917702a31492a5024faf7c2aaf4 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 27 Aug 2024 16:07:31 +0000 Subject: [PATCH 19/24] Remove comments. --- .../Transforms/MergingAggregatedTransform.cpp | 45 ------------------- 1 file changed, 45 deletions(-) diff --git a/src/Processors/Transforms/MergingAggregatedTransform.cpp b/src/Processors/Transforms/MergingAggregatedTransform.cpp index dd97364f879..edd544fb6af 100644 --- a/src/Processors/Transforms/MergingAggregatedTransform.cpp +++ b/src/Processors/Transforms/MergingAggregatedTransform.cpp @@ -191,20 +191,6 @@ void MergingAggregatedTransform::addBlock(Block block) } } -// void MergingAggregatedTransform::appendGroupingColumn(UInt64 group, BlocksList & block_list) -// { -// auto grouping_position = getOutputPort().getHeader().getPositionByName("__grouping_set"); -// for (auto & block : block_list) -// { -// auto num_rows = block.rows(); -// ColumnWithTypeAndName col; -// col.type = std::make_shared(); -// col.name = "__grouping_set"; -// col.column = ColumnUInt64::create(num_rows, group); -// block.insert(grouping_position, std::move(col)); -// } -// } - void MergingAggregatedTransform::consume(Chunk chunk) { if (!consume_started) @@ -248,25 +234,6 @@ void MergingAggregatedTransform::consume(Chunk chunk) throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk should have AggregatedChunkInfo in MergingAggregatedTransform."); } -// static void debugBlock(const Block & block) -// { -// std::cerr << block.dumpStructure() << std::endl; -// size_t rows = block.rows(); -// for (size_t row = 0; row < rows; ++row) -// { -// for (size_t col = 0; col < block.columns(); ++col) -// { -// const auto & c = block.getByPosition(col); -// if (c.column->isNumeric()) -// std::cerr << c.column->getUInt(row) << ' '; -// else -// std::cerr << c.column->getDataAt(row).toString() << ' '; - -// } -// std::cerr << std::endl; -// } -// } - Chunk MergingAggregatedTransform::generate() { if (!generate_started) @@ -283,14 +250,6 @@ Chunk MergingAggregatedTransform::generate() auto & bucket_to_blocks = grouping_set.bucket_to_blocks; AggregatedDataVariants data_variants; - // std::cerr << "== Group " << group << std::endl; - // for (const auto & [buk, lst] : bucket_to_blocks) - // { - // std::cerr << ".. buk " << buk << std::endl; - // for (const auto & b : lst) - // debugBlock(b); - // } - /// TODO: this operation can be made async. Add async for IAccumulatingTransform. params->aggregator.mergeBlocks(std::move(bucket_to_blocks), data_variants, max_threads, is_cancelled); auto merged_blocks = params->aggregator.convertToBlocks(data_variants, params->final, max_threads); @@ -299,10 +258,6 @@ Chunk MergingAggregatedTransform::generate() for (auto & block : merged_blocks) grouping_set.creating_missing_keys_actions->execute(block); - // std::cerr << "== Merged " << group << std::endl; - // for (const auto & b : merged_blocks) - // debugBlock(b); - blocks.splice(blocks.end(), std::move(merged_blocks)); } From a1517cb9d6598c6ae7cfef5d574702966ea244a9 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 27 Aug 2024 16:56:03 +0000 Subject: [PATCH 20/24] Refactor a bit and add a comment. --- .../Transforms/MergingAggregatedTransform.cpp | 64 +++++++++++-------- 1 file changed, 37 insertions(+), 27 deletions(-) diff --git a/src/Processors/Transforms/MergingAggregatedTransform.cpp b/src/Processors/Transforms/MergingAggregatedTransform.cpp index edd544fb6af..9b76acb8081 100644 --- a/src/Processors/Transforms/MergingAggregatedTransform.cpp +++ b/src/Processors/Transforms/MergingAggregatedTransform.cpp @@ -25,6 +25,42 @@ Block MergingAggregatedTransform::appendGroupingIfNeeded(const Block & in_header return out_header; } +/// We should keep the order for GROUPING SET keys. +/// Initiator creates a separate Aggregator for every group, so should we do here. +/// Otherwise, two-level aggregation will split the data into different buckets, +/// and the result may have duplicating rows. +static ActionsDAG makeReorderingActions(const Block & in_header, const GroupingSetsParams & params) +{ + ActionsDAG reordering(in_header.getColumnsWithTypeAndName()); + auto & outputs = reordering.getOutputs(); + ActionsDAG::NodeRawConstPtrs new_outputs; + new_outputs.reserve(in_header.columns() + params.used_keys.size() - params.used_keys.size()); + + std::unordered_map index; + for (size_t pos = 0; pos < outputs.size(); ++pos) + index.emplace(outputs[pos]->result_name, pos); + + for (const auto & used_name : params.used_keys) + { + auto & idx = index[used_name]; + new_outputs.push_back(outputs[idx]); + } + + for (const auto & used_name : params.used_keys) + index[used_name] = outputs.size(); + for (const auto & missing_name : params.missing_keys) + index[missing_name] = outputs.size(); + + for (const auto * output : outputs) + { + if (index[output->result_name] != outputs.size()) + new_outputs.push_back(output); + } + + outputs.swap(new_outputs); + return reordering; +} + MergingAggregatedTransform::~MergingAggregatedTransform() = default; MergingAggregatedTransform::MergingAggregatedTransform( @@ -52,33 +88,7 @@ MergingAggregatedTransform::MergingAggregatedTransform( { size_t group = grouping_sets.size(); - ActionsDAG reordering(in_header.getColumnsWithTypeAndName()); - auto & outputs = reordering.getOutputs(); - ActionsDAG::NodeRawConstPtrs new_outputs; - new_outputs.reserve(in_header.columns() + grouping_set_params.used_keys.size() - grouping_set_params.used_keys.size()); - - std::unordered_map index; - for (size_t pos = 0; pos < outputs.size(); ++pos) - index.emplace(outputs[pos]->result_name, pos); - - for (const auto & used_name : grouping_set_params.used_keys) - { - auto & idx = index[used_name]; - new_outputs.push_back(outputs[idx]); - } - - for (const auto & used_name : grouping_set_params.used_keys) - index[used_name] = outputs.size(); - for (const auto & missing_name : grouping_set_params.missing_keys) - index[missing_name] = outputs.size(); - - for (const auto * output : outputs) - { - if (index[output->result_name] != outputs.size()) - new_outputs.push_back(output); - } - - outputs.swap(new_outputs); + auto reordering = makeReorderingActions(in_header, grouping_set_params); Aggregator::Params set_params(grouping_set_params.used_keys, params.aggregates, From d6127d5f4966b7fad22073e26ee466d654b7529b Mon Sep 17 00:00:00 2001 From: Konstantin Smirnov <46676677+konnectr@users.noreply.github.com> Date: Wed, 28 Aug 2024 22:30:12 +0500 Subject: [PATCH 21/24] add ON CLUSTER --- docs/ru/sql-reference/statements/system.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/ru/sql-reference/statements/system.md b/docs/ru/sql-reference/statements/system.md index 3e7d67d90ff..d17e5acd693 100644 --- a/docs/ru/sql-reference/statements/system.md +++ b/docs/ru/sql-reference/statements/system.md @@ -280,7 +280,7 @@ SYSTEM START REPLICATION QUEUES [ON CLUSTER cluster_name] [[db.]replicated_merge Ждет когда таблица семейства `ReplicatedMergeTree` будет синхронизирована с другими репликами в кластере, но не более `receive_timeout` секунд: ``` sql -SYSTEM SYNC REPLICA [db.]replicated_merge_tree_family_table_name [STRICT | LIGHTWEIGHT [FROM 'srcReplica1'[, 'srcReplica2'[, ...]]] | PULL] +SYSTEM SYNC REPLICA [ON CLUSTER cluster_name] [db.]replicated_merge_tree_family_table_name [STRICT | LIGHTWEIGHT [FROM 'srcReplica1'[, 'srcReplica2'[, ...]]] | PULL] ``` После выполнения этого запроса таблица `[db.]replicated_merge_tree_family_table_name` загружает команды из общего реплицированного лога в свою собственную очередь репликации. Затем запрос ждет, пока реплика не обработает все загруженные команды. Поддерживаются следующие модификаторы: From cd9be01c65767ff88131c110b5a2daf0a663034e Mon Sep 17 00:00:00 2001 From: Aleksa Cukovic Date: Thu, 29 Aug 2024 14:40:38 +0200 Subject: [PATCH 22/24] Fix row policy documentation grammar --- .../statements/create/row-policy.md | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/docs/en/sql-reference/statements/create/row-policy.md b/docs/en/sql-reference/statements/create/row-policy.md index cd7718793bd..8be766710fd 100644 --- a/docs/en/sql-reference/statements/create/row-policy.md +++ b/docs/en/sql-reference/statements/create/row-policy.md @@ -8,7 +8,7 @@ title: "CREATE ROW POLICY" Creates a [row policy](../../../guides/sre/user-management/index.md#row-policy-management), i.e. a filter used to determine which rows a user can read from a table. :::tip -Row policies makes sense only for users with readonly access. If user can modify table or copy partitions between tables, it defeats the restrictions of row policies. +Row policies make sense only for users with readonly access. If a user can modify a table or copy partitions between tables, it defeats the restrictions of row policies. ::: Syntax: @@ -24,40 +24,40 @@ CREATE [ROW] POLICY [IF NOT EXISTS | OR REPLACE] policy_name1 [ON CLUSTER cluste ## USING Clause -Allows to specify a condition to filter rows. An user will see a row if the condition is calculated to non-zero for the row. +Allows specifying a condition to filter rows. A user will see a row if the condition is calculated to non-zero for the row. ## TO Clause -In the section `TO` you can provide a list of users and roles this policy should work for. For example, `CREATE ROW POLICY ... TO accountant, john@localhost`. +In the `TO` section you can provide a list of users and roles this policy should work for. For example, `CREATE ROW POLICY ... TO accountant, john@localhost`. -Keyword `ALL` means all the ClickHouse users including current user. Keyword `ALL EXCEPT` allow to exclude some users from the all users list, for example, `CREATE ROW POLICY ... TO ALL EXCEPT accountant, john@localhost` +Keyword `ALL` means all the ClickHouse users, including current user. Keyword `ALL EXCEPT` allows excluding some users from the all users list, for example, `CREATE ROW POLICY ... TO ALL EXCEPT accountant, john@localhost` :::note -If there are no row policies defined for a table then any user can `SELECT` all the row from the table. Defining one or more row policies for the table makes the access to the table depending on the row policies no matter if those row policies are defined for the current user or not. For example, the following policy +If there are no row policies defined for a table, then any user can `SELECT` all the rows from the table. Defining one or more row policies for the table makes access to the table dependent on the row policies, no matter if those row policies are defined for the current user or not. For example, the following policy: `CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter` -forbids the users `mira` and `peter` to see the rows with `b != 1`, and any non-mentioned user (e.g., the user `paul`) will see no rows from `mydb.table1` at all. +forbids the users `mira` and `peter` from seeing the rows with `b != 1`, and any non-mentioned user (e.g., the user `paul`) will see no rows from `mydb.table1` at all. -If that's not desirable it can't be fixed by adding one more row policy, like the following: +If that's not desirable, it can be fixed by adding one more row policy, like the following: `CREATE ROW POLICY pol2 ON mydb.table1 USING 1 TO ALL EXCEPT mira, peter` ::: ## AS Clause -It's allowed to have more than one policy enabled on the same table for the same user at the one time. So we need a way to combine the conditions from multiple policies. +It's allowed to have more than one policy enabled on the same table for the same user at one time. So we need a way to combine the conditions from multiple policies. -By default policies are combined using the boolean `OR` operator. For example, the following policies +By default, policies are combined using the boolean `OR` operator. For example, the following policies: ``` sql CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 TO peter, antonio ``` -enables the user `peter` to see rows with either `b=1` or `c=2`. +enable the user `peter` to see rows with either `b=1` or `c=2`. -The `AS` clause specifies how policies should be combined with other policies. Policies can be either permissive or restrictive. By default policies are permissive, which means they are combined using the boolean `OR` operator. +The `AS` clause specifies how policies should be combined with other policies. Policies can be either permissive or restrictive. By default, policies are permissive, which means they are combined using the boolean `OR` operator. A policy can be defined as restrictive as an alternative. Restrictive policies are combined using the boolean `AND` operator. @@ -68,25 +68,25 @@ row_is_visible = (one or more of the permissive policies' conditions are non-zer (all of the restrictive policies's conditions are non-zero) ``` -For example, the following policies +For example, the following policies: ``` sql CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 AS RESTRICTIVE TO peter, antonio ``` -enables the user `peter` to see rows only if both `b=1` AND `c=2`. +enable the user `peter` to see rows only if both `b=1` AND `c=2`. Database policies are combined with table policies. -For example, the following policies +For example, the following policies: ``` sql CREATE ROW POLICY pol1 ON mydb.* USING b=1 TO mira, peter CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 AS RESTRICTIVE TO peter, antonio ``` -enables the user `peter` to see table1 rows only if both `b=1` AND `c=2`, although +enable the user `peter` to see table1 rows only if both `b=1` AND `c=2`, although any other table in mydb would have only `b=1` policy applied for the user. From 79c01e717f56132c1af6503b93fc7094dcd47637 Mon Sep 17 00:00:00 2001 From: Kirill Nikiforov Date: Thu, 29 Aug 2024 23:15:39 +0400 Subject: [PATCH 23/24] rm comment, add docs --- docs/en/sql-reference/statements/alter/partition.md | 2 +- src/Storages/StorageReplicatedMergeTree.cpp | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/en/sql-reference/statements/alter/partition.md b/docs/en/sql-reference/statements/alter/partition.md index 1bb7817364a..11926b2aa08 100644 --- a/docs/en/sql-reference/statements/alter/partition.md +++ b/docs/en/sql-reference/statements/alter/partition.md @@ -351,7 +351,7 @@ ALTER TABLE mt DELETE IN PARTITION ID '2' WHERE p = 2; You can specify the partition expression in `ALTER ... PARTITION` queries in different ways: - As a value from the `partition` column of the `system.parts` table. For example, `ALTER TABLE visits DETACH PARTITION 201901`. -- Using the keyword `ALL`. It can be used only with DROP/DETACH/ATTACH. For example, `ALTER TABLE visits ATTACH PARTITION ALL`. +- Using the keyword `ALL`. It can be used only with DROP/DETACH/ATTACH/ATTACH FROM. For example, `ALTER TABLE visits ATTACH PARTITION ALL`. - As a tuple of expressions or constants that matches (in types) the table partitioning keys tuple. In the case of a single element partitioning key, the expression should be wrapped in the `tuple (...)` function. For example, `ALTER TABLE visits DETACH PARTITION tuple(toYYYYMM(toDate('2019-01-25')))`. - Using the partition ID. Partition ID is a string identifier of the partition (human-readable, if possible) that is used as the names of partitions in the file system and in ZooKeeper. The partition ID must be specified in the `PARTITION ID` clause, in a single quotes. For example, `ALTER TABLE visits DETACH PARTITION ID '201901'`. - In the [ALTER ATTACH PART](#attach-partitionpart) and [DROP DETACHED PART](#drop-detached-partitionpart) query, to specify the name of a part, use string literal with a value from the `name` column of the [system.detached_parts](/docs/en/operations/system-tables/detached_parts.md/#system_tables-detached_parts) table. For example, `ALTER TABLE visits ATTACH PART '201901_1_1_0'`. diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 4915bf9f366..0688b05e2ad 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -8240,7 +8240,6 @@ std::unique_ptr StorageReplicatedMergeTree::rep part_checksums.emplace_back(hash_hex); } - //ReplicatedMergeTreeLogEntryData entry; auto entry = std::make_unique(); { auto src_table_id = src_data.getStorageID(); From d65b298268c35b0dd87ddc252e5cae88644c74e4 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Mon, 2 Sep 2024 18:08:46 +0200 Subject: [PATCH 24/24] Update src/Storages/StorageReplicatedMergeTree.cpp --- src/Storages/StorageReplicatedMergeTree.cpp | 27 ++++++--------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index f884c25b071..def4d79b5c4 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -8058,13 +8058,11 @@ void StorageReplicatedMergeTree::replacePartitionFrom( const bool zero_copy_enabled = storage_settings_ptr->allow_remote_fs_zero_copy_replication || dynamic_cast(source_table.get())->getSettings()->allow_remote_fs_zero_copy_replication; - try + std::unique_ptr entries[partitions.size()]; + size_t idx = 0; + for (const auto & partition_id : partitions) { - std::unique_ptr entries[partitions.size()]; - size_t idx = 0; - for (const auto & partition_id : partitions) - { - entries[idx] = replacePartitionFromImpl(watch, + entries[idx] = replacePartitionFromImpl(watch, profile_events_scope, metadata_snapshot, src_data, @@ -8074,22 +8072,11 @@ void StorageReplicatedMergeTree::replacePartitionFrom( zero_copy_enabled, storage_settings_ptr->always_use_copy_instead_of_hardlinks, query_context); - ++idx; - } - - for (const auto & entry : entries) - waitForLogEntryToBeProcessedIfNecessary(*entry, query_context); - - lock2.reset(); - lock1.reset(); + ++idx; } - catch (...) - { - lock2.reset(); - lock1.reset(); - throw; - } + for (const auto & entry : entries) + waitForLogEntryToBeProcessedIfNecessary(*entry, query_context); } std::unique_ptr StorageReplicatedMergeTree::replacePartitionFromImpl(