address comments

This commit is contained in:
Han Fei 2022-12-07 23:40:52 +01:00
parent 7a3dcd196e
commit 3c11aca040
6 changed files with 49 additions and 51 deletions

View File

@ -59,17 +59,15 @@ std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper(
{
if constexpr (async_insert)
{
for (size_t i = 0; i < deduplication_path.size(); i++)
auto failed_idx = zkutil::getFailedOpIndex(Coordination::Error::ZNODEEXISTS, responses);
if (failed_idx < deduplication_path.size() * 2)
{
if (responses[i*2]->error == Coordination::Error::ZNODEEXISTS)
{
const String & failed_op_path = deduplication_path[i];
LOG_DEBUG(
&Poco::Logger::get("createEphemeralLockInZooKeeper"),
"Deduplication path already exists: deduplication_path={}",
failed_op_path);
return EphemeralLockInZooKeeper{"", nullptr, "", failed_op_path};
}
const String & failed_op_path = deduplication_path[failed_idx / 2];
LOG_DEBUG(
&Poco::Logger::get("createEphemeralLockInZooKeeper"),
"Deduplication path already exists: deduplication_path={}",
failed_op_path);
return EphemeralLockInZooKeeper{"", nullptr, "", failed_op_path};
}
}
else if (responses[0]->error == Coordination::Error::ZNODEEXISTS)

View File

@ -67,6 +67,8 @@ public:
return path;
}
// For async inserts we try to take locks for several inserts at once, so on a conflict
// the caller needs to know which insert collided. This accessor returns that conflicting path.
String getConflictPath() const
{
return conflict_path;

View File

@ -8,6 +8,7 @@
#include <DataTypes/ObjectUtils.h>
#include <Core/Block.h>
#include <IO/Operators.h>
#include <fmt/core.h>
namespace ProfileEvents
{
@ -36,7 +37,7 @@ namespace ErrorCodes
}
template<bool async_insert>
struct ReplicatedMergeTreeSink<async_insert>::DelayedChunk
struct ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk
{
struct Partition
{
@ -80,24 +81,17 @@ struct ReplicatedMergeTreeSink<async_insert>::DelayedChunk
namespace
{
/// Convert block id vector to string. Output at most 50 rows.
/// Convert block id vector to string. Output at most 50 ids.
template<typename T>
inline String toString(const std::vector<T> & vec)
{
String res = "{";
size_t size = vec.size();
if (size > 50) size = 50;
for (size_t i = 0; i < size; ++i)
{
res += DB::toString(vec[i]);
if (i + 1 < size)
res += ",";
}
return res + "}";
return fmt::format("({})", fmt::join(vec.begin(), vec.begin() + size, ","));
}
/// Remove the conflicting parts of the block so that the remainder can be written again.
void rewriteBlock(Poco::Logger * log, typename ReplicatedMergeTreeSink<true>::DelayedChunk::Partition & partition, const std::vector<String> & block_paths)
void rewriteBlock(Poco::Logger * log, typename ReplicatedMergeTreeSinkImpl<true>::DelayedChunk::Partition & partition, const std::vector<String> & block_paths)
{
std::vector<size_t> offset_idx;
for (const auto & raw_path : block_paths)
@ -164,7 +158,7 @@ namespace
{
size_t start = 0;
auto cols = block.block.getColumns();
std::vector<String> block_id_vec;
std::vector<String> block_id_vec(block.offsets->offsets.size());
for (auto offset : block.offsets->offsets)
{
SipHash hash;
@ -187,7 +181,7 @@ namespace
}
template<bool async_insert>
ReplicatedMergeTreeSink<async_insert>::ReplicatedMergeTreeSink(
ReplicatedMergeTreeSinkImpl<async_insert>::ReplicatedMergeTreeSinkImpl(
StorageReplicatedMergeTree & storage_,
const StorageMetadataPtr & metadata_snapshot_,
size_t quorum_size,
@ -217,7 +211,7 @@ ReplicatedMergeTreeSink<async_insert>::ReplicatedMergeTreeSink(
}
template<bool async_insert>
ReplicatedMergeTreeSink<async_insert>::~ReplicatedMergeTreeSink() = default;
ReplicatedMergeTreeSinkImpl<async_insert>::~ReplicatedMergeTreeSinkImpl() = default;
/// Allows verifying that the ZooKeeper session is still alive.
static void assertSessionIsNotExpired(const zkutil::ZooKeeperPtr & zookeeper)
@ -230,7 +224,7 @@ static void assertSessionIsNotExpired(const zkutil::ZooKeeperPtr & zookeeper)
}
template<bool async_insert>
size_t ReplicatedMergeTreeSink<async_insert>::checkQuorumPrecondition(const ZooKeeperWithFaultInjectionPtr & zookeeper)
size_t ReplicatedMergeTreeSinkImpl<async_insert>::checkQuorumPrecondition(const ZooKeeperWithFaultInjectionPtr & zookeeper)
{
if (!isQuorumEnabled())
return 0;
@ -299,7 +293,7 @@ size_t ReplicatedMergeTreeSink<async_insert>::checkQuorumPrecondition(const ZooK
}
template<bool async_insert>
void ReplicatedMergeTreeSink<async_insert>::consume(Chunk chunk)
void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
{
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
@ -349,7 +343,8 @@ void ReplicatedMergeTreeSink<async_insert>::consume(Chunk chunk)
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context, chunk_offsets);
using DelayedPartitions = std::vector<typename ReplicatedMergeTreeSink<async_insert>::DelayedChunk::Partition>;
using DelayedPartition = typename ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk::Partition;
using DelayedPartitions = std::vector<DelayedPartition>;
DelayedPartitions partitions;
size_t streams = 0;
@ -411,7 +406,7 @@ void ReplicatedMergeTreeSink<async_insert>::consume(Chunk chunk)
if (streams > max_insert_delayed_streams_for_parallel_write)
{
finishDelayedChunk(zookeeper);
delayed_chunk = std::make_unique<ReplicatedMergeTreeSink<async_insert>::DelayedChunk>(replicas_num);
delayed_chunk = std::make_unique<ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk>(replicas_num);
delayed_chunk->partitions = std::move(partitions);
finishDelayedChunk(zookeeper);
@ -420,7 +415,7 @@ void ReplicatedMergeTreeSink<async_insert>::consume(Chunk chunk)
partitions = DelayedPartitions{};
}
partitions.emplace_back(typename ReplicatedMergeTreeSink<async_insert>::DelayedChunk::Partition(
partitions.emplace_back(DelayedPartition(
std::move(temp_part),
elapsed_ns,
std::move(block_id),
@ -429,7 +424,7 @@ void ReplicatedMergeTreeSink<async_insert>::consume(Chunk chunk)
}
finishDelayedChunk(zookeeper);
delayed_chunk = std::make_unique<ReplicatedMergeTreeSink::DelayedChunk>();
delayed_chunk = std::make_unique<ReplicatedMergeTreeSinkImpl::DelayedChunk>();
delayed_chunk->partitions = std::move(partitions);
/// If deduplicated data should not be inserted into MV, we need to set proper
@ -441,7 +436,7 @@ void ReplicatedMergeTreeSink<async_insert>::consume(Chunk chunk)
}
template<>
void ReplicatedMergeTreeSink<false>::finishDelayedChunk(const ZooKeeperWithFaultInjectionPtr & zookeeper)
void ReplicatedMergeTreeSinkImpl<false>::finishDelayedChunk(const ZooKeeperWithFaultInjectionPtr & zookeeper)
{
if (!delayed_chunk)
return;
@ -476,7 +471,7 @@ void ReplicatedMergeTreeSink<false>::finishDelayedChunk(const ZooKeeperWithFault
}
template<>
void ReplicatedMergeTreeSink<true>::finishDelayedChunk(const ZooKeeperWithFaultInjectionPtr & zookeeper)
void ReplicatedMergeTreeSinkImpl<true>::finishDelayedChunk(const ZooKeeperWithFaultInjectionPtr & zookeeper)
{
if (!delayed_chunk)
return;
@ -504,7 +499,7 @@ void ReplicatedMergeTreeSink<true>::finishDelayedChunk(const ZooKeeperWithFaultI
}
template<bool async_insert>
void ReplicatedMergeTreeSink<async_insert>::writeExistingPart(MergeTreeData::MutableDataPartPtr & part)
void ReplicatedMergeTreeSinkImpl<async_insert>::writeExistingPart(MergeTreeData::MutableDataPartPtr & part)
{
/// NOTE: No delay in this case. That's Ok.
@ -530,7 +525,7 @@ void ReplicatedMergeTreeSink<async_insert>::writeExistingPart(MergeTreeData::Mut
}
template<bool async_insert>
std::vector<String> ReplicatedMergeTreeSink<async_insert>::commitPart(
std::vector<String> ReplicatedMergeTreeSinkImpl<async_insert>::commitPart(
const ZooKeeperWithFaultInjectionPtr & zookeeper,
MergeTreeData::MutableDataPartPtr & part,
const BlockIDsType & block_id,
@ -995,7 +990,7 @@ std::vector<String> ReplicatedMergeTreeSink<async_insert>::commitPart(
}
template<bool async_insert>
void ReplicatedMergeTreeSink<async_insert>::onStart()
void ReplicatedMergeTreeSinkImpl<async_insert>::onStart()
{
/// Only check "too many parts" before write,
/// because interrupting long-running INSERT query in the middle is not convenient for users.
@ -1003,7 +998,7 @@ void ReplicatedMergeTreeSink<async_insert>::onStart()
}
template<bool async_insert>
void ReplicatedMergeTreeSink<async_insert>::onFinish()
void ReplicatedMergeTreeSinkImpl<async_insert>::onFinish()
{
auto zookeeper = storage.getZooKeeper();
assertSessionIsNotExpired(zookeeper);
@ -1011,7 +1006,7 @@ void ReplicatedMergeTreeSink<async_insert>::onFinish()
}
template<bool async_insert>
void ReplicatedMergeTreeSink<async_insert>::waitForQuorum(
void ReplicatedMergeTreeSinkImpl<async_insert>::waitForQuorum(
const ZooKeeperWithFaultInjectionPtr & zookeeper,
const std::string & part_name,
const std::string & quorum_path,
@ -1066,7 +1061,7 @@ void ReplicatedMergeTreeSink<async_insert>::waitForQuorum(
}
template<bool async_insert>
String ReplicatedMergeTreeSink<async_insert>::quorumLogMessage(size_t replicas_num) const
String ReplicatedMergeTreeSinkImpl<async_insert>::quorumLogMessage(size_t replicas_num) const
{
if (!isQuorumEnabled())
return "";
@ -1074,7 +1069,7 @@ String ReplicatedMergeTreeSink<async_insert>::quorumLogMessage(size_t replicas_n
}
template<bool async_insert>
size_t ReplicatedMergeTreeSink<async_insert>::getQuorumSize(size_t replicas_num) const
size_t ReplicatedMergeTreeSinkImpl<async_insert>::getQuorumSize(size_t replicas_num) const
{
if (!isQuorumEnabled())
return 0;
@ -1086,12 +1081,12 @@ size_t ReplicatedMergeTreeSink<async_insert>::getQuorumSize(size_t replicas_num)
}
template<bool async_insert>
bool ReplicatedMergeTreeSink<async_insert>::isQuorumEnabled() const
bool ReplicatedMergeTreeSinkImpl<async_insert>::isQuorumEnabled() const
{
return !required_quorum_size.has_value() || required_quorum_size.value() > 1;
}
template class ReplicatedMergeTreeSink<true>;
template class ReplicatedMergeTreeSink<false>;
template class ReplicatedMergeTreeSinkImpl<true>;
template class ReplicatedMergeTreeSinkImpl<false>;
}

View File

@ -29,10 +29,10 @@ using StorageSnapshotPtr = std::shared_ptr<StorageSnapshot>;
/// identify different async inserts inside the same part. It will remove the duplicate inserts
/// when it encounters lock and retries.
template<bool async_insert>
class ReplicatedMergeTreeSink : public SinkToStorage
class ReplicatedMergeTreeSinkImpl : public SinkToStorage
{
public:
ReplicatedMergeTreeSink(
ReplicatedMergeTreeSinkImpl(
StorageReplicatedMergeTree & storage_,
const StorageMetadataPtr & metadata_snapshot_,
size_t quorum_,
@ -46,7 +46,7 @@ public:
// needed to set the special LogEntryType::ATTACH_PART
bool is_attach_ = false);
~ReplicatedMergeTreeSink() override;
~ReplicatedMergeTreeSinkImpl() override;
void onStart() override;
void consume(Chunk chunk) override;
@ -134,4 +134,7 @@ private:
void finishDelayedChunk(const ZooKeeperWithFaultInjectionPtr & zookeeper);
};
using ReplicatedMergeTreeSinkWithAsyncDeduplicate = ReplicatedMergeTreeSinkImpl<true>;
using ReplicatedMergeTreeSink = ReplicatedMergeTreeSinkImpl<false>;
}

View File

@ -4525,7 +4525,7 @@ SinkToStoragePtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, con
bool deduplicate = storage_settings_ptr->replicated_deduplication_window != 0 && query_settings.insert_deduplicate;
bool async_deduplicate = query_settings.async_insert && storage_settings_ptr->replicated_deduplication_window_for_async_inserts != 0 && query_settings.insert_deduplicate;
if (async_deduplicate)
return std::make_shared<ReplicatedMergeTreeSink<true>>(
return std::make_shared<ReplicatedMergeTreeSinkWithAsyncDeduplicate>(
*this, metadata_snapshot, query_settings.insert_quorum.valueOr(0),
query_settings.insert_quorum_timeout.totalMilliseconds(),
query_settings.max_partitions_per_insert_block,
@ -4535,7 +4535,7 @@ SinkToStoragePtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, con
local_context);
// TODO: should we also somehow pass list of columns to deduplicate on to the ReplicatedMergeTreeSink?
return std::make_shared<ReplicatedMergeTreeSink<false>>(
return std::make_shared<ReplicatedMergeTreeSink>(
*this, metadata_snapshot, query_settings.insert_quorum.valueOr(0),
query_settings.insert_quorum_timeout.totalMilliseconds(),
query_settings.max_partitions_per_insert_block,
@ -5236,7 +5236,7 @@ PartitionCommandsResultInfo StorageReplicatedMergeTree::attachPartition(
MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts);
/// TODO Allow to use quorum here.
ReplicatedMergeTreeSink<false> output(*this, metadata_snapshot, 0, 0, 0, false, false, false, query_context,
ReplicatedMergeTreeSink output(*this, metadata_snapshot, 0, 0, 0, false, false, false, query_context,
/*is_attach*/true);
for (size_t i = 0; i < loaded_parts.size(); ++i)
@ -8738,7 +8738,7 @@ void StorageReplicatedMergeTree::restoreDataFromBackup(RestorerFromBackup & rest
void StorageReplicatedMergeTree::attachRestoredParts(MutableDataPartsVector && parts)
{
auto metadata_snapshot = getInMemoryMetadataPtr();
auto sink = std::make_shared<ReplicatedMergeTreeSink<false>>(*this, metadata_snapshot, 0, 0, 0, false, false, false, getContext(), /*is_attach*/true);
auto sink = std::make_shared<ReplicatedMergeTreeSink>(*this, metadata_snapshot, 0, 0, 0, false, false, false, getContext(), /*is_attach*/true);
for (auto part : parts)
sink->writeExistingPart(part);
}
@ -8760,7 +8760,7 @@ PartsTemporaryRename renamed_parts(*this, "detached/");
MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts);
/// TODO Allow to use quorum here.
ReplicatedMergeTreeSink<false> output(*this, metadata_snapshot, 0, 0, 0, false, false, query_context,
ReplicatedMergeTreeSink output(*this, metadata_snapshot, 0, 0, 0, false, false, query_context,
/*is_attach*/true);
for (size_t i = 0; i < loaded_parts.size(); ++i)

View File

@ -357,7 +357,7 @@ private:
void clearOldPartsAndRemoveFromZK();
template<bool async_insert>
friend class ReplicatedMergeTreeSink;
friend class ReplicatedMergeTreeSinkImpl;
friend class ReplicatedMergeTreePartCheckThread;
friend class ReplicatedMergeTreeCleanupThread;
friend class ReplicatedMergeTreeAlterThread;