diff --git a/src/Processors/Chunk.h b/src/Processors/Chunk.h index 398b401c0ff..15d91431b68 100644 --- a/src/Processors/Chunk.h +++ b/src/Processors/Chunk.h @@ -113,6 +113,7 @@ private: using Chunks = std::vector; +/// ChunkOffsets marks offsets of different sub-chunks, which will be used by async inserts. class ChunkOffsets : public ChunkInfo { public: diff --git a/src/Storages/MergeTree/EphemeralLockInZooKeeper.cpp b/src/Storages/MergeTree/EphemeralLockInZooKeeper.cpp index 9f3927c0fa9..f0474e48a75 100644 --- a/src/Storages/MergeTree/EphemeralLockInZooKeeper.cpp +++ b/src/Storages/MergeTree/EphemeralLockInZooKeeper.cpp @@ -13,16 +13,19 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -EphemeralLockInZooKeeper::EphemeralLockInZooKeeper(const String & path_prefix_, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & path_) - : zookeeper(zookeeper_), path_prefix(path_prefix_), path(path_) +EphemeralLockInZooKeeper::EphemeralLockInZooKeeper(const String & path_prefix_, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & path_, const String & conflict_path_) + : zookeeper(zookeeper_), path_prefix(path_prefix_), path(path_), conflict_path(conflict_path_) { - if (path.size() <= path_prefix.size()) + if (conflict_path.empty() && path.size() <= path_prefix.size()) throw Exception("Logical error: name of the main node is shorter than prefix.", ErrorCodes::LOGICAL_ERROR); } +template std::optional createEphemeralLockInZooKeeper( - const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & deduplication_path) + const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const T & deduplication_path) { + constexpr bool async_insert = std::is_same_v>; + String path; if (deduplication_path.empty()) @@ -36,14 +39,40 @@ std::optional createEphemeralLockInZooKeeper( /// Check for duplicates in advance, to avoid superfluous block numbers allocation Coordination::Requests ops; - ops.emplace_back(zkutil::makeCreateRequest(deduplication_path, "", zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeRemoveRequest(deduplication_path, -1)); + if constexpr (async_insert) + { + for (const auto & single_dedup_path : deduplication_path) + { + ops.emplace_back(zkutil::makeCreateRequest(single_dedup_path, "", zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeRemoveRequest(single_dedup_path, -1)); + } + } + else + { + ops.emplace_back(zkutil::makeCreateRequest(deduplication_path, "", zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeRemoveRequest(deduplication_path, -1)); + } ops.emplace_back(zkutil::makeCreateRequest(path_prefix_, holder_path, zkutil::CreateMode::EphemeralSequential)); Coordination::Responses responses; Coordination::Error e = zookeeper_->tryMulti(ops, responses); if (e != Coordination::Error::ZOK) { - if (responses[0]->error == Coordination::Error::ZNODEEXISTS) + if constexpr (async_insert) + { + for (size_t i = 0; i < deduplication_path.size(); i++) + { + if (responses[i*2]->error == Coordination::Error::ZNODEEXISTS) + { + const String & failed_op_path = deduplication_path[i]; + LOG_DEBUG( + &Poco::Logger::get("createEphemeralLockInZooKeeper"), + "Deduplication path already exists: deduplication_path={}", + failed_op_path); + return EphemeralLockInZooKeeper{"", nullptr, "", failed_op_path}; + } + } + } + else if (responses[0]->error == Coordination::Error::ZNODEEXISTS) { LOG_DEBUG( &Poco::Logger::get("createEphemeralLockInZooKeeper"), @@ -51,11 +80,8 @@ std::optional createEphemeralLockInZooKeeper( deduplication_path); return {}; } - else - { - zkutil::KeeperMultiException::check(e, ops, responses); // This should always throw the proper exception - throw Exception("Unable to handle error {} when acquiring ephemeral lock in ZK", ErrorCodes::LOGICAL_ERROR); - } + zkutil::KeeperMultiException::check(e, ops, responses); // This should always throw the proper exception + throw Exception("Unable to handle error {} when acquiring ephemeral lock in ZK", ErrorCodes::LOGICAL_ERROR); } path = dynamic_cast(responses.back().get())->path_created; @@ -64,43 +90,6 @@ std::optional createEphemeralLockInZooKeeper( return EphemeralLockInZooKeeper{path_prefix_, zookeeper_, path}; } -std::tuple, std::vector> createEphemeralLockInZooKeeper( - const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const std::vector & deduplication_paths) -{ - /// The /abandonable_lock- name is for backward compatibility. - String holder_path_prefix = temp_path + "/abandonable_lock-"; - String holder_path; - - /// Check for duplicates in advance, to avoid superfluous block numbers allocation - Coordination::Requests ops; - for (const auto & deduplication_path : deduplication_paths) - { - ops.emplace_back(zkutil::makeCreateRequest(deduplication_path, "", zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeRemoveRequest(deduplication_path, -1)); - } - ops.emplace_back(zkutil::makeCreateRequest(holder_path_prefix, "", zkutil::CreateMode::EphemeralSequential)); - Coordination::Responses responses; - Coordination::Error e = zookeeper_->tryMulti(ops, responses); - if (e != Coordination::Error::ZOK) - { - /// TODO we should use some cache to check the conflict in advance. - for (const auto & response: responses) - { - if (response->error == Coordination::Error::ZNODEEXISTS) - { - String failed_op_path = zkutil::KeeperMultiException(e, ops, responses).getPathForFirstFailedOp(); - return std::make_pair(std::nullopt, std::vector({failed_op_path})); - } - } - zkutil::KeeperMultiException::check(e, ops, responses); // This should always throw the proper exception - throw Exception(ErrorCodes::LOGICAL_ERROR, "Unable to handle error {} when acquiring ephemeral lock in ZK", toString(e)); - } - - holder_path = dynamic_cast(responses.back().get())->path_created; - - return std::make_pair(EphemeralLockInZooKeeper{path_prefix_, zookeeper_, holder_path}, std::vector()); -} - void EphemeralLockInZooKeeper::unlock() { Coordination::Requests ops; @@ -230,4 +219,10 @@ EphemeralLocksInAllPartitions::~EphemeralLocksInAllPartitions() } } +template std::optional createEphemeralLockInZooKeeper( + const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & deduplication_path); + +template std::optional createEphemeralLockInZooKeeper>( + const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const std::vector & deduplication_path); + } diff --git a/src/Storages/MergeTree/EphemeralLockInZooKeeper.h b/src/Storages/MergeTree/EphemeralLockInZooKeeper.h index 7e972e387e4..eef0366dc8b 100644 --- a/src/Storages/MergeTree/EphemeralLockInZooKeeper.h +++ b/src/Storages/MergeTree/EphemeralLockInZooKeeper.h @@ -26,13 +26,12 @@ namespace ErrorCodes /// Since 22.11 it creates single ephemeral node with `path_prefix` that references persistent fake "secondary node". class EphemeralLockInZooKeeper : public boost::noncopyable { + template friend std::optional createEphemeralLockInZooKeeper( - const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & deduplication_path); - friend std::tuple, std::vector> createEphemeralLockInZooKeeper( - const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const std::vector & deduplication_path); + const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const T & deduplication_path); protected: - EphemeralLockInZooKeeper(const String & path_prefix_, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & path_); + EphemeralLockInZooKeeper(const String & path_prefix_, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & path_, const String & conflict_path_ = ""); public: EphemeralLockInZooKeeper() = delete; @@ -53,6 +52,7 @@ public: rhs.zookeeper = nullptr; path_prefix = std::move(rhs.path_prefix); path = std::move(rhs.path); + conflict_path = std::move(rhs.conflict_path); return *this; } @@ -67,6 +67,11 @@ public: return path; } + String getConflictPath() const + { + return conflict_path; + } + /// Parse the number at the end of the path. UInt64 getNumber() const { @@ -99,13 +104,12 @@ private: ZooKeeperWithFaultInjectionPtr zookeeper; String path_prefix; String path; + String conflict_path; }; +template std::optional createEphemeralLockInZooKeeper( - const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & deduplication_path); - -std::tuple, std::vector> createEphemeralLockInZooKeeper( - const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const std::vector & deduplication_path); + const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const T & deduplication_path); /// Acquires block number locks in all partitions. class EphemeralLocksInAllPartitions : public boost::noncopyable diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index 2f1630950f6..99017392f8f 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -43,7 +43,7 @@ struct ReplicatedMergeTreeSink::DelayedChunk MergeTreeDataWriter::TemporaryPart temp_part; UInt64 elapsed_ns; BlockIDsType block_id; - BlockWithPartition block; + BlockWithPartition block_with_partition; std::unordered_map block_id_to_offset_idx; Partition() = default; @@ -51,7 +51,7 @@ struct ReplicatedMergeTreeSink::DelayedChunk : temp_part(std::move(temp_part_)), elapsed_ns(elapsed_ns_), block_id(std::move(block_id_)), - block(std::move(block_)) + block_with_partition(std::move(block_)) { initBlockIDMap(); } @@ -78,77 +78,105 @@ struct ReplicatedMergeTreeSink::DelayedChunk std::vector partitions; }; -template -inline String toString(const std::vector & vec) +namespace { - String res = "{"; - for (const auto & item : vec) - res += toString(item) + ","; - return res + "}"; -} - -/// remove the conflict parts of block for rewriting again. -void rewriteBlock(Poco::Logger * log, typename ReplicatedMergeTreeSink::DelayedChunk::Partition & partition, const std::vector & block_paths) -{ - std::vector offset_idx; - for (const auto & raw_path : block_paths) + template + inline String toString(const std::vector & vec) { - std::filesystem::path p(raw_path); - String conflict_block_id = p.filename(); - auto it = partition.block_id_to_offset_idx.find(conflict_block_id); - if (it == partition.block_id_to_offset_idx.end()) - throw Exception("unknown conflict path " + conflict_block_id, ErrorCodes::LOGICAL_ERROR); - offset_idx.push_back(it->second); + String res = "{"; + for (const auto & item : vec) + res += DB::toString(item) + ","; + return res + "}"; } - std::sort(offset_idx.begin(), offset_idx.end()); - auto & offsets = partition.block.offsets->offsets; - size_t idx = 0, remove_count = 0; - auto it = offset_idx.begin(); - std::vector new_offsets; - std::vector new_block_ids; - - /// construct filter - size_t rows = partition.block.block.rows(); - auto filter_col = ColumnUInt8::create(rows, 1u); - ColumnUInt8::Container & vec = filter_col->getData(); - UInt8 * pos = vec.data(); - for (auto & offset : offsets) + /// remove the conflict parts of block for rewriting again. + void rewriteBlock(Poco::Logger * log, typename ReplicatedMergeTreeSink::DelayedChunk::Partition & partition, const std::vector & block_paths) { - if (it != offset_idx.end() && *it == idx) + std::vector offset_idx; + for (const auto & raw_path : block_paths) { - size_t start_pos = idx > 0 ? offsets[idx - 1] : 0; - size_t end_pos = offset; - remove_count += end_pos - start_pos; - while (start_pos < end_pos) + std::filesystem::path p(raw_path); + String conflict_block_id = p.filename(); + auto it = partition.block_id_to_offset_idx.find(conflict_block_id); + if (it == partition.block_id_to_offset_idx.end()) + throw Exception("Unknown conflict path " + conflict_block_id, ErrorCodes::LOGICAL_ERROR); + offset_idx.push_back(it->second); + } + std::sort(offset_idx.begin(), offset_idx.end()); + + auto & offsets = partition.block_with_partition.offsets->offsets; + size_t idx = 0, remove_count = 0; + auto it = offset_idx.begin(); + std::vector new_offsets; + std::vector new_block_ids; + + /// construct filter + size_t rows = partition.block_with_partition.block.rows(); + auto filter_col = ColumnUInt8::create(rows, 1u); + ColumnUInt8::Container & vec = filter_col->getData(); + UInt8 * pos = vec.data(); + for (auto & offset : offsets) + { + if (it != offset_idx.end() && *it == idx) { - *(pos + start_pos) = 0; - start_pos ++; + size_t start_pos = idx > 0 ? offsets[idx - 1] : 0; + size_t end_pos = offset; + remove_count += end_pos - start_pos; + while (start_pos < end_pos) + { + *(pos + start_pos) = 0; + start_pos ++; + } + it++; } - it++; + else + { + new_offsets.push_back(offset - remove_count); + new_block_ids.push_back(partition.block_id[idx]); + } + idx++; } - else + + LOG_TRACE(log, "New block IDs: {}, new offsets: {}", toString(new_block_ids), toString(new_offsets)); + + offsets = std::move(new_offsets); + partition.block_id = std::move(new_block_ids); + auto cols = partition.block_with_partition.block.getColumns(); + for (auto & col : cols) { - new_offsets.push_back(offset - remove_count); - new_block_ids.push_back(partition.block_id[idx]); + col = col -> filter(vec, rows - remove_count); } - idx++; + partition.block_with_partition.block.setColumns(cols); + + LOG_TRACE(log, "New block rows {}", partition.block_with_partition.block.rows()); + + partition.initBlockIDMap(); } - LOG_TRACE(log, "New block IDs: {}, new offsets: {}", toString(new_block_ids), toString(new_offsets)); - - offsets = std::move(new_offsets); - partition.block_id = std::move(new_block_ids); - auto cols = partition.block.block.getColumns(); - for (auto & col : cols) + std::vector getHashesForBlocks(BlockWithPartition & block, String partition_id) { - col = col -> filter(vec, rows - remove_count); + size_t start = 0; + auto cols = block.block.getColumns(); + std::vector block_id_vec; + for (auto offset : block.offsets->offsets) + { + SipHash hash; + for (size_t i = start; i < offset; ++i) + for (const auto & col : cols) + col->updateHashWithValue(i, hash); + union + { + char bytes[16]; + UInt64 words[2]; + } hash_value; + hash.get128(hash_value.bytes); + + block_id_vec.push_back(partition_id + "_" + DB::toString(hash_value.words[0]) + "_" + DB::toString(hash_value.words[1])); + + start = offset; + } + return block_id_vec; } - partition.block.block.setColumns(cols); - - LOG_TRACE(log, "New block rows {}", partition.block.block.rows()); - - partition.initBlockIDMap(); } template @@ -263,31 +291,6 @@ size_t ReplicatedMergeTreeSink::checkQuorumPrecondition(const ZooK return replicas_number; } -std::vector getHashesForBlocks(BlockWithPartition & block, String partition_id) -{ - size_t start = 0; - auto cols = block.block.getColumns(); - std::vector block_id_vec; - for (auto offset : block.offsets->offsets) - { - SipHash hash; - for (size_t i = start; i < offset; ++i) - for (const auto & col : cols) - col->updateHashWithValue(i, hash); - union - { - char bytes[16]; - UInt64 words[2]; - } hash_value; - hash.get128(hash_value.bytes); - - block_id_vec.push_back(partition_id + "_" + toString(hash_value.words[0]) + "_" + toString(hash_value.words[1])); - - start = offset; - } - return block_id_vec; -} - template void ReplicatedMergeTreeSink::consume(Chunk chunk) { @@ -363,7 +366,7 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk) if constexpr (async_insert) { /// TODO consider insert_deduplication_token - block_id = getBlockIDVec(current_block, temp_part.part->info.partition_id); + block_id = getHashesForBlocks(current_block, temp_part.part->info.partition_id); LOG_TRACE(log, "async insert part, part id {}, block id {}, offsets {}, size {}", temp_part.part->info.partition_id, toString(block_id), toString(current_block.offsets->offsets), current_block.offsets->offsets.size()); } else if (deduplicate) @@ -484,8 +487,8 @@ void ReplicatedMergeTreeSink::finishDelayedChunk(const ZooKeeperWithFaultI rewriteBlock(log, partition, conflict_block_ids); if (partition.block_id.empty()) break; - partition.block.partition = std::move(partition.temp_part.part->partition.value); - partition.temp_part = storage.writer.writeTempPart(partition.block, metadata_snapshot, context); + partition.block_with_partition.partition = std::move(partition.temp_part.part->partition.value); + partition.temp_part = storage.writer.writeTempPart(partition.block_with_partition, metadata_snapshot, context); } } @@ -605,17 +608,11 @@ std::vector ReplicatedMergeTreeSink::commitPart( if constexpr (async_insert) { for (const auto & single_block_id : block_id) - { block_id_path.push_back(storage.zookeeper_path + "/blocks/" + single_block_id); - } } else if (deduplicate_block) block_id_path = storage.zookeeper_path + "/blocks/" + block_id; - std::optional block_number_lock; - if constexpr (async_insert) - std::tie(block_number_lock, conflict_block_ids) = storage.allocateBlockNumber(part->info.partition_id, zookeeper, block_id_path); - else - block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zookeeper, block_id_path); + auto block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zookeeper, block_id_path); ThreadFuzzer::maybeInjectSleep(); /// Prepare transaction to ZooKeeper @@ -627,6 +624,20 @@ std::vector ReplicatedMergeTreeSink::commitPart( String existing_part_name; if (block_number_lock) { + if constexpr (async_insert) + { + /// The truth is that we always get only one path from block_number_lock. + /// This is a restriction of Keeper. Here I would like to use vector because + /// I wanna keep extensibility for future optimization, for instance, using + /// cache to resolve conflicts in advance. + String conflict_path = block_number_lock->getConflictPath(); + if (!conflict_path.empty()) + { + LOG_TRACE(log, "Cannot get lock, the conflict path is {}", conflict_path); + conflict_block_ids.push_back(conflict_path); + return; + } + } is_already_existing_part = false; block_number = block_number_lock->getNumber(); @@ -715,14 +726,8 @@ std::vector ReplicatedMergeTreeSink::commitPart( quorum_info.host_node_version)); } } - else if constexpr (async_insert) - { - LOG_TRACE(log, "cannot get lock, the conflict block ids are {}", toString(conflict_block_ids)); - if (conflict_block_ids.empty()) - throw Exception("conflict block ids and block number lock should not be empty at the same time", ErrorCodes::LOGICAL_ERROR); - return; - } - else + /// async_insert will never return null lock, because they need the conflict path. + else if constexpr (!async_insert) { is_already_existing_part = true; @@ -776,6 +781,8 @@ std::vector ReplicatedMergeTreeSink::commitPart( /// Do not check for duplicate on commit to ZK. block_id_path.clear(); } + else + throw Exception("Conflict block ids and block number lock should not be empty at the same time for async inserts", ErrorCodes::LOGICAL_ERROR); /// Information about the part. storage.getCommitPartOps(ops, part, block_id_path); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 143634ba7ad..505e0406a5d 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -5313,37 +5313,6 @@ bool StorageReplicatedMergeTree::existsNodeCached(const ZooKeeperWithFaultInject return res; } -std::tuple, std::vector> -StorageReplicatedMergeTree::allocateBlockNumber( - const String & partition_id, - const ZooKeeperWithFaultInjectionPtr & zookeeper, - const std::vector & zookeeper_block_id_paths) const -{ - String zookeeper_table_path = zookeeper_path; - - String block_numbers_path = fs::path(zookeeper_table_path) / "block_numbers"; - String partition_path = fs::path(block_numbers_path) / partition_id; - - if (!existsNodeCached(zookeeper, partition_path)) - { - Coordination::Requests ops; - ops.push_back(zkutil::makeCreateRequest(partition_path, "", zkutil::CreateMode::Persistent)); - /// We increment data version of the block_numbers node so that it becomes possible - /// to check in a ZK transaction that the set of partitions didn't change - /// (unfortunately there is no CheckChildren op). - ops.push_back(zkutil::makeSetRequest(block_numbers_path, "", -1)); - - Coordination::Responses responses; - Coordination::Error code = zookeeper->tryMulti(ops, responses); - if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNODEEXISTS) - zkutil::KeeperMultiException::check(code, ops, responses); - } - - return createEphemeralLockInZooKeeper( - fs::path(partition_path) / "block-", fs::path(zookeeper_table_path) / "temp", zookeeper, zookeeper_block_id_paths); -} - - std::optional StorageReplicatedMergeTree::allocateBlockNumber( const String & partition_id, const zkutil::ZooKeeperPtr & zookeeper, @@ -5354,11 +5323,11 @@ std::optional StorageReplicatedMergeTree::allocateBloc partition_id, std::make_shared(zookeeper), zookeeper_block_id_path, zookeeper_path_prefix); } - +template std::optional StorageReplicatedMergeTree::allocateBlockNumber( const String & partition_id, const ZooKeeperWithFaultInjectionPtr & zookeeper, - const String & zookeeper_block_id_path, + const T & zookeeper_block_id_path, const String & zookeeper_path_prefix) const { String zookeeper_table_path; @@ -8789,6 +8758,18 @@ void StorageReplicatedMergeTree::attachRestoredParts(MutableDataPartsVector && p sink->writeExistingPart(part); } +template std::optional StorageReplicatedMergeTree::allocateBlockNumber( + const String & partition_id, + const ZooKeeperWithFaultInjectionPtr & zookeeper, + const String & zookeeper_block_id_path, + const String & zookeeper_path_prefix) const; + +template std::optional StorageReplicatedMergeTree::allocateBlockNumber>( + const String & partition_id, + const ZooKeeperWithFaultInjectionPtr & zookeeper, + const std::vector & zookeeper_block_id_path, + const String & zookeeper_path_prefix) const; + #if 0 PartsTemporaryRename renamed_parts(*this, "detached/"); MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts); diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 9dc2f29ea9b..e564a31901e 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -732,17 +732,14 @@ private: std::optional allocateBlockNumber( const String & partition_id, const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_block_id_path = "", const String & zookeeper_path_prefix = "") const; + + template std::optional allocateBlockNumber( const String & partition_id, const ZooKeeperWithFaultInjectionPtr & zookeeper, - const String & zookeeper_block_id_path = "", + const T & zookeeper_block_id_path, const String & zookeeper_path_prefix = "") const; - std::tuple, std::vector> allocateBlockNumber( - const String & partition_id, - const ZooKeeperWithFaultInjectionPtr & zookeeper, - const std::vector & zookeeper_block_id_paths) const; - /** Wait until all replicas, including this, execute the specified action from the log. * If replicas are added at the same time, it can not wait the added replica. * diff --git a/tests/queries/0_stateless/02481_async_insert_dedup.python b/tests/queries/0_stateless/02481_async_insert_dedup.python index 5465e7b988c..d2fcc0fabe2 100644 --- a/tests/queries/0_stateless/02481_async_insert_dedup.python +++ b/tests/queries/0_stateless/02481_async_insert_dedup.python @@ -65,7 +65,7 @@ client.query(''' CREATE TABLE t_async_insert_dedup ( EventDate DateTime, KeyID UInt32 -) Engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/t_async_insert_dedup', '{replica}') +) Engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/t_async_insert_dedup', '{replica}') PARTITION BY toYYYYMM(EventDate) ORDER BY (KeyID, EventDate) ''')