mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 18:12:02 +00:00
address comments
This commit is contained in:
parent
74c9ba8df0
commit
b80a2d6c89
@ -113,6 +113,7 @@ private:
|
||||
|
||||
using Chunks = std::vector<Chunk>;
|
||||
|
||||
/// ChunkOffsets marks offsets of different sub-chunks, which will be used by async inserts.
|
||||
class ChunkOffsets : public ChunkInfo
|
||||
{
|
||||
public:
|
||||
|
@ -13,16 +13,19 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
EphemeralLockInZooKeeper::EphemeralLockInZooKeeper(const String & path_prefix_, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & path_)
|
||||
: zookeeper(zookeeper_), path_prefix(path_prefix_), path(path_)
|
||||
EphemeralLockInZooKeeper::EphemeralLockInZooKeeper(const String & path_prefix_, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & path_, const String & conflict_path_)
|
||||
: zookeeper(zookeeper_), path_prefix(path_prefix_), path(path_), conflict_path(conflict_path_)
|
||||
{
|
||||
if (path.size() <= path_prefix.size())
|
||||
if (conflict_path.empty() && path.size() <= path_prefix.size())
|
||||
throw Exception("Logical error: name of the main node is shorter than prefix.", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper(
|
||||
const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & deduplication_path)
|
||||
const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const T & deduplication_path)
|
||||
{
|
||||
constexpr bool async_insert = std::is_same_v<T, std::vector<String>>;
|
||||
|
||||
String path;
|
||||
|
||||
if (deduplication_path.empty())
|
||||
@ -36,14 +39,40 @@ std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper(
|
||||
|
||||
/// Check for duplicates in advance, to avoid superfluous block numbers allocation
|
||||
Coordination::Requests ops;
|
||||
ops.emplace_back(zkutil::makeCreateRequest(deduplication_path, "", zkutil::CreateMode::Persistent));
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(deduplication_path, -1));
|
||||
if constexpr (async_insert)
|
||||
{
|
||||
for (const auto & single_dedup_path : deduplication_path)
|
||||
{
|
||||
ops.emplace_back(zkutil::makeCreateRequest(single_dedup_path, "", zkutil::CreateMode::Persistent));
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(single_dedup_path, -1));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ops.emplace_back(zkutil::makeCreateRequest(deduplication_path, "", zkutil::CreateMode::Persistent));
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(deduplication_path, -1));
|
||||
}
|
||||
ops.emplace_back(zkutil::makeCreateRequest(path_prefix_, holder_path, zkutil::CreateMode::EphemeralSequential));
|
||||
Coordination::Responses responses;
|
||||
Coordination::Error e = zookeeper_->tryMulti(ops, responses);
|
||||
if (e != Coordination::Error::ZOK)
|
||||
{
|
||||
if (responses[0]->error == Coordination::Error::ZNODEEXISTS)
|
||||
if constexpr (async_insert)
|
||||
{
|
||||
for (size_t i = 0; i < deduplication_path.size(); i++)
|
||||
{
|
||||
if (responses[i*2]->error == Coordination::Error::ZNODEEXISTS)
|
||||
{
|
||||
const String & failed_op_path = deduplication_path[i];
|
||||
LOG_DEBUG(
|
||||
&Poco::Logger::get("createEphemeralLockInZooKeeper"),
|
||||
"Deduplication path already exists: deduplication_path={}",
|
||||
failed_op_path);
|
||||
return EphemeralLockInZooKeeper{"", nullptr, "", failed_op_path};
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (responses[0]->error == Coordination::Error::ZNODEEXISTS)
|
||||
{
|
||||
LOG_DEBUG(
|
||||
&Poco::Logger::get("createEphemeralLockInZooKeeper"),
|
||||
@ -51,11 +80,8 @@ std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper(
|
||||
deduplication_path);
|
||||
return {};
|
||||
}
|
||||
else
|
||||
{
|
||||
zkutil::KeeperMultiException::check(e, ops, responses); // This should always throw the proper exception
|
||||
throw Exception("Unable to handle error {} when acquiring ephemeral lock in ZK", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
zkutil::KeeperMultiException::check(e, ops, responses); // This should always throw the proper exception
|
||||
throw Exception("Unable to handle error {} when acquiring ephemeral lock in ZK", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
path = dynamic_cast<const Coordination::CreateResponse *>(responses.back().get())->path_created;
|
||||
@ -64,43 +90,6 @@ std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper(
|
||||
return EphemeralLockInZooKeeper{path_prefix_, zookeeper_, path};
|
||||
}
|
||||
|
||||
std::tuple<std::optional<EphemeralLockInZooKeeper>, std::vector<String>> createEphemeralLockInZooKeeper(
|
||||
const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const std::vector<String> & deduplication_paths)
|
||||
{
|
||||
/// The /abandonable_lock- name is for backward compatibility.
|
||||
String holder_path_prefix = temp_path + "/abandonable_lock-";
|
||||
String holder_path;
|
||||
|
||||
/// Check for duplicates in advance, to avoid superfluous block numbers allocation
|
||||
Coordination::Requests ops;
|
||||
for (const auto & deduplication_path : deduplication_paths)
|
||||
{
|
||||
ops.emplace_back(zkutil::makeCreateRequest(deduplication_path, "", zkutil::CreateMode::Persistent));
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(deduplication_path, -1));
|
||||
}
|
||||
ops.emplace_back(zkutil::makeCreateRequest(holder_path_prefix, "", zkutil::CreateMode::EphemeralSequential));
|
||||
Coordination::Responses responses;
|
||||
Coordination::Error e = zookeeper_->tryMulti(ops, responses);
|
||||
if (e != Coordination::Error::ZOK)
|
||||
{
|
||||
/// TODO we should use some cache to check the conflict in advance.
|
||||
for (const auto & response: responses)
|
||||
{
|
||||
if (response->error == Coordination::Error::ZNODEEXISTS)
|
||||
{
|
||||
String failed_op_path = zkutil::KeeperMultiException(e, ops, responses).getPathForFirstFailedOp();
|
||||
return std::make_pair(std::nullopt, std::vector<String>({failed_op_path}));
|
||||
}
|
||||
}
|
||||
zkutil::KeeperMultiException::check(e, ops, responses); // This should always throw the proper exception
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unable to handle error {} when acquiring ephemeral lock in ZK", toString(e));
|
||||
}
|
||||
|
||||
holder_path = dynamic_cast<const Coordination::CreateResponse *>(responses.back().get())->path_created;
|
||||
|
||||
return std::make_pair(EphemeralLockInZooKeeper{path_prefix_, zookeeper_, holder_path}, std::vector<String>());
|
||||
}
|
||||
|
||||
void EphemeralLockInZooKeeper::unlock()
|
||||
{
|
||||
Coordination::Requests ops;
|
||||
@ -230,4 +219,10 @@ EphemeralLocksInAllPartitions::~EphemeralLocksInAllPartitions()
|
||||
}
|
||||
}
|
||||
|
||||
template std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper<String>(
|
||||
const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & deduplication_path);
|
||||
|
||||
template std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper<std::vector<String>>(
|
||||
const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const std::vector<String> & deduplication_path);
|
||||
|
||||
}
|
||||
|
@ -26,13 +26,12 @@ namespace ErrorCodes
|
||||
/// Since 22.11 it creates single ephemeral node with `path_prefix` that references persistent fake "secondary node".
|
||||
class EphemeralLockInZooKeeper : public boost::noncopyable
|
||||
{
|
||||
template<typename T>
|
||||
friend std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper(
|
||||
const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & deduplication_path);
|
||||
friend std::tuple<std::optional<EphemeralLockInZooKeeper>, std::vector<String>> createEphemeralLockInZooKeeper(
|
||||
const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const std::vector<String> & deduplication_path);
|
||||
const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const T & deduplication_path);
|
||||
|
||||
protected:
|
||||
EphemeralLockInZooKeeper(const String & path_prefix_, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & path_);
|
||||
EphemeralLockInZooKeeper(const String & path_prefix_, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & path_, const String & conflict_path_ = "");
|
||||
|
||||
public:
|
||||
EphemeralLockInZooKeeper() = delete;
|
||||
@ -53,6 +52,7 @@ public:
|
||||
rhs.zookeeper = nullptr;
|
||||
path_prefix = std::move(rhs.path_prefix);
|
||||
path = std::move(rhs.path);
|
||||
conflict_path = std::move(rhs.conflict_path);
|
||||
return *this;
|
||||
}
|
||||
|
||||
@ -67,6 +67,11 @@ public:
|
||||
return path;
|
||||
}
|
||||
|
||||
String getConflictPath() const
|
||||
{
|
||||
return conflict_path;
|
||||
}
|
||||
|
||||
/// Parse the number at the end of the path.
|
||||
UInt64 getNumber() const
|
||||
{
|
||||
@ -99,13 +104,12 @@ private:
|
||||
ZooKeeperWithFaultInjectionPtr zookeeper;
|
||||
String path_prefix;
|
||||
String path;
|
||||
String conflict_path;
|
||||
};
|
||||
|
||||
template<typename T>
|
||||
std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper(
|
||||
const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const String & deduplication_path);
|
||||
|
||||
std::tuple<std::optional<EphemeralLockInZooKeeper>, std::vector<String>> createEphemeralLockInZooKeeper(
|
||||
const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const std::vector<String> & deduplication_path);
|
||||
const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const T & deduplication_path);
|
||||
|
||||
/// Acquires block number locks in all partitions.
|
||||
class EphemeralLocksInAllPartitions : public boost::noncopyable
|
||||
|
@ -43,7 +43,7 @@ struct ReplicatedMergeTreeSink<async_insert>::DelayedChunk
|
||||
MergeTreeDataWriter::TemporaryPart temp_part;
|
||||
UInt64 elapsed_ns;
|
||||
BlockIDsType block_id;
|
||||
BlockWithPartition block;
|
||||
BlockWithPartition block_with_partition;
|
||||
std::unordered_map<String, size_t> block_id_to_offset_idx;
|
||||
|
||||
Partition() = default;
|
||||
@ -51,7 +51,7 @@ struct ReplicatedMergeTreeSink<async_insert>::DelayedChunk
|
||||
: temp_part(std::move(temp_part_)),
|
||||
elapsed_ns(elapsed_ns_),
|
||||
block_id(std::move(block_id_)),
|
||||
block(std::move(block_))
|
||||
block_with_partition(std::move(block_))
|
||||
{
|
||||
initBlockIDMap();
|
||||
}
|
||||
@ -78,77 +78,105 @@ struct ReplicatedMergeTreeSink<async_insert>::DelayedChunk
|
||||
std::vector<Partition> partitions;
|
||||
};
|
||||
|
||||
template<typename T>
|
||||
inline String toString(const std::vector<T> & vec)
|
||||
namespace
|
||||
{
|
||||
String res = "{";
|
||||
for (const auto & item : vec)
|
||||
res += toString(item) + ",";
|
||||
return res + "}";
|
||||
}
|
||||
|
||||
/// remove the conflict parts of block for rewriting again.
|
||||
void rewriteBlock(Poco::Logger * log, typename ReplicatedMergeTreeSink<true>::DelayedChunk::Partition & partition, const std::vector<String> & block_paths)
|
||||
{
|
||||
std::vector<size_t> offset_idx;
|
||||
for (const auto & raw_path : block_paths)
|
||||
template<typename T>
|
||||
inline String toString(const std::vector<T> & vec)
|
||||
{
|
||||
std::filesystem::path p(raw_path);
|
||||
String conflict_block_id = p.filename();
|
||||
auto it = partition.block_id_to_offset_idx.find(conflict_block_id);
|
||||
if (it == partition.block_id_to_offset_idx.end())
|
||||
throw Exception("unknown conflict path " + conflict_block_id, ErrorCodes::LOGICAL_ERROR);
|
||||
offset_idx.push_back(it->second);
|
||||
String res = "{";
|
||||
for (const auto & item : vec)
|
||||
res += DB::toString(item) + ",";
|
||||
return res + "}";
|
||||
}
|
||||
std::sort(offset_idx.begin(), offset_idx.end());
|
||||
|
||||
auto & offsets = partition.block.offsets->offsets;
|
||||
size_t idx = 0, remove_count = 0;
|
||||
auto it = offset_idx.begin();
|
||||
std::vector<size_t> new_offsets;
|
||||
std::vector<String> new_block_ids;
|
||||
|
||||
/// construct filter
|
||||
size_t rows = partition.block.block.rows();
|
||||
auto filter_col = ColumnUInt8::create(rows, 1u);
|
||||
ColumnUInt8::Container & vec = filter_col->getData();
|
||||
UInt8 * pos = vec.data();
|
||||
for (auto & offset : offsets)
|
||||
/// remove the conflict parts of block for rewriting again.
|
||||
void rewriteBlock(Poco::Logger * log, typename ReplicatedMergeTreeSink<true>::DelayedChunk::Partition & partition, const std::vector<String> & block_paths)
|
||||
{
|
||||
if (it != offset_idx.end() && *it == idx)
|
||||
std::vector<size_t> offset_idx;
|
||||
for (const auto & raw_path : block_paths)
|
||||
{
|
||||
size_t start_pos = idx > 0 ? offsets[idx - 1] : 0;
|
||||
size_t end_pos = offset;
|
||||
remove_count += end_pos - start_pos;
|
||||
while (start_pos < end_pos)
|
||||
std::filesystem::path p(raw_path);
|
||||
String conflict_block_id = p.filename();
|
||||
auto it = partition.block_id_to_offset_idx.find(conflict_block_id);
|
||||
if (it == partition.block_id_to_offset_idx.end())
|
||||
throw Exception("Unknown conflict path " + conflict_block_id, ErrorCodes::LOGICAL_ERROR);
|
||||
offset_idx.push_back(it->second);
|
||||
}
|
||||
std::sort(offset_idx.begin(), offset_idx.end());
|
||||
|
||||
auto & offsets = partition.block_with_partition.offsets->offsets;
|
||||
size_t idx = 0, remove_count = 0;
|
||||
auto it = offset_idx.begin();
|
||||
std::vector<size_t> new_offsets;
|
||||
std::vector<String> new_block_ids;
|
||||
|
||||
/// construct filter
|
||||
size_t rows = partition.block_with_partition.block.rows();
|
||||
auto filter_col = ColumnUInt8::create(rows, 1u);
|
||||
ColumnUInt8::Container & vec = filter_col->getData();
|
||||
UInt8 * pos = vec.data();
|
||||
for (auto & offset : offsets)
|
||||
{
|
||||
if (it != offset_idx.end() && *it == idx)
|
||||
{
|
||||
*(pos + start_pos) = 0;
|
||||
start_pos ++;
|
||||
size_t start_pos = idx > 0 ? offsets[idx - 1] : 0;
|
||||
size_t end_pos = offset;
|
||||
remove_count += end_pos - start_pos;
|
||||
while (start_pos < end_pos)
|
||||
{
|
||||
*(pos + start_pos) = 0;
|
||||
start_pos ++;
|
||||
}
|
||||
it++;
|
||||
}
|
||||
it++;
|
||||
else
|
||||
{
|
||||
new_offsets.push_back(offset - remove_count);
|
||||
new_block_ids.push_back(partition.block_id[idx]);
|
||||
}
|
||||
idx++;
|
||||
}
|
||||
else
|
||||
|
||||
LOG_TRACE(log, "New block IDs: {}, new offsets: {}", toString(new_block_ids), toString(new_offsets));
|
||||
|
||||
offsets = std::move(new_offsets);
|
||||
partition.block_id = std::move(new_block_ids);
|
||||
auto cols = partition.block_with_partition.block.getColumns();
|
||||
for (auto & col : cols)
|
||||
{
|
||||
new_offsets.push_back(offset - remove_count);
|
||||
new_block_ids.push_back(partition.block_id[idx]);
|
||||
col = col -> filter(vec, rows - remove_count);
|
||||
}
|
||||
idx++;
|
||||
partition.block_with_partition.block.setColumns(cols);
|
||||
|
||||
LOG_TRACE(log, "New block rows {}", partition.block_with_partition.block.rows());
|
||||
|
||||
partition.initBlockIDMap();
|
||||
}
|
||||
|
||||
LOG_TRACE(log, "New block IDs: {}, new offsets: {}", toString(new_block_ids), toString(new_offsets));
|
||||
|
||||
offsets = std::move(new_offsets);
|
||||
partition.block_id = std::move(new_block_ids);
|
||||
auto cols = partition.block.block.getColumns();
|
||||
for (auto & col : cols)
|
||||
std::vector<String> getHashesForBlocks(BlockWithPartition & block, String partition_id)
|
||||
{
|
||||
col = col -> filter(vec, rows - remove_count);
|
||||
size_t start = 0;
|
||||
auto cols = block.block.getColumns();
|
||||
std::vector<String> block_id_vec;
|
||||
for (auto offset : block.offsets->offsets)
|
||||
{
|
||||
SipHash hash;
|
||||
for (size_t i = start; i < offset; ++i)
|
||||
for (const auto & col : cols)
|
||||
col->updateHashWithValue(i, hash);
|
||||
union
|
||||
{
|
||||
char bytes[16];
|
||||
UInt64 words[2];
|
||||
} hash_value;
|
||||
hash.get128(hash_value.bytes);
|
||||
|
||||
block_id_vec.push_back(partition_id + "_" + DB::toString(hash_value.words[0]) + "_" + DB::toString(hash_value.words[1]));
|
||||
|
||||
start = offset;
|
||||
}
|
||||
return block_id_vec;
|
||||
}
|
||||
partition.block.block.setColumns(cols);
|
||||
|
||||
LOG_TRACE(log, "New block rows {}", partition.block.block.rows());
|
||||
|
||||
partition.initBlockIDMap();
|
||||
}
|
||||
|
||||
template<bool async_insert>
|
||||
@ -263,31 +291,6 @@ size_t ReplicatedMergeTreeSink<async_insert>::checkQuorumPrecondition(const ZooK
|
||||
return replicas_number;
|
||||
}
|
||||
|
||||
std::vector<String> getHashesForBlocks(BlockWithPartition & block, String partition_id)
|
||||
{
|
||||
size_t start = 0;
|
||||
auto cols = block.block.getColumns();
|
||||
std::vector<String> block_id_vec;
|
||||
for (auto offset : block.offsets->offsets)
|
||||
{
|
||||
SipHash hash;
|
||||
for (size_t i = start; i < offset; ++i)
|
||||
for (const auto & col : cols)
|
||||
col->updateHashWithValue(i, hash);
|
||||
union
|
||||
{
|
||||
char bytes[16];
|
||||
UInt64 words[2];
|
||||
} hash_value;
|
||||
hash.get128(hash_value.bytes);
|
||||
|
||||
block_id_vec.push_back(partition_id + "_" + toString(hash_value.words[0]) + "_" + toString(hash_value.words[1]));
|
||||
|
||||
start = offset;
|
||||
}
|
||||
return block_id_vec;
|
||||
}
|
||||
|
||||
template<bool async_insert>
|
||||
void ReplicatedMergeTreeSink<async_insert>::consume(Chunk chunk)
|
||||
{
|
||||
@ -363,7 +366,7 @@ void ReplicatedMergeTreeSink<async_insert>::consume(Chunk chunk)
|
||||
if constexpr (async_insert)
|
||||
{
|
||||
/// TODO consider insert_deduplication_token
|
||||
block_id = getBlockIDVec(current_block, temp_part.part->info.partition_id);
|
||||
block_id = getHashesForBlocks(current_block, temp_part.part->info.partition_id);
|
||||
LOG_TRACE(log, "async insert part, part id {}, block id {}, offsets {}, size {}", temp_part.part->info.partition_id, toString(block_id), toString(current_block.offsets->offsets), current_block.offsets->offsets.size());
|
||||
}
|
||||
else if (deduplicate)
|
||||
@ -484,8 +487,8 @@ void ReplicatedMergeTreeSink<true>::finishDelayedChunk(const ZooKeeperWithFaultI
|
||||
rewriteBlock(log, partition, conflict_block_ids);
|
||||
if (partition.block_id.empty())
|
||||
break;
|
||||
partition.block.partition = std::move(partition.temp_part.part->partition.value);
|
||||
partition.temp_part = storage.writer.writeTempPart(partition.block, metadata_snapshot, context);
|
||||
partition.block_with_partition.partition = std::move(partition.temp_part.part->partition.value);
|
||||
partition.temp_part = storage.writer.writeTempPart(partition.block_with_partition, metadata_snapshot, context);
|
||||
}
|
||||
}
|
||||
|
||||
@ -605,17 +608,11 @@ std::vector<String> ReplicatedMergeTreeSink<async_insert>::commitPart(
|
||||
if constexpr (async_insert)
|
||||
{
|
||||
for (const auto & single_block_id : block_id)
|
||||
{
|
||||
block_id_path.push_back(storage.zookeeper_path + "/blocks/" + single_block_id);
|
||||
}
|
||||
}
|
||||
else if (deduplicate_block)
|
||||
block_id_path = storage.zookeeper_path + "/blocks/" + block_id;
|
||||
std::optional<EphemeralLockInZooKeeper> block_number_lock;
|
||||
if constexpr (async_insert)
|
||||
std::tie(block_number_lock, conflict_block_ids) = storage.allocateBlockNumber(part->info.partition_id, zookeeper, block_id_path);
|
||||
else
|
||||
block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zookeeper, block_id_path);
|
||||
auto block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zookeeper, block_id_path);
|
||||
ThreadFuzzer::maybeInjectSleep();
|
||||
|
||||
/// Prepare transaction to ZooKeeper
|
||||
@ -627,6 +624,20 @@ std::vector<String> ReplicatedMergeTreeSink<async_insert>::commitPart(
|
||||
String existing_part_name;
|
||||
if (block_number_lock)
|
||||
{
|
||||
if constexpr (async_insert)
|
||||
{
|
||||
/// The truth is that we always get only one path from block_number_lock.
|
||||
/// This is a restriction of Keeper. Here I would like to use vector because
|
||||
/// I wanna keep extensibility for future optimization, for instance, using
|
||||
/// cache to resolve conflicts in advance.
|
||||
String conflict_path = block_number_lock->getConflictPath();
|
||||
if (!conflict_path.empty())
|
||||
{
|
||||
LOG_TRACE(log, "Cannot get lock, the conflict path is {}", conflict_path);
|
||||
conflict_block_ids.push_back(conflict_path);
|
||||
return;
|
||||
}
|
||||
}
|
||||
is_already_existing_part = false;
|
||||
block_number = block_number_lock->getNumber();
|
||||
|
||||
@ -715,14 +726,8 @@ std::vector<String> ReplicatedMergeTreeSink<async_insert>::commitPart(
|
||||
quorum_info.host_node_version));
|
||||
}
|
||||
}
|
||||
else if constexpr (async_insert)
|
||||
{
|
||||
LOG_TRACE(log, "cannot get lock, the conflict block ids are {}", toString(conflict_block_ids));
|
||||
if (conflict_block_ids.empty())
|
||||
throw Exception("conflict block ids and block number lock should not be empty at the same time", ErrorCodes::LOGICAL_ERROR);
|
||||
return;
|
||||
}
|
||||
else
|
||||
/// async_insert will never return null lock, because they need the conflict path.
|
||||
else if constexpr (!async_insert)
|
||||
{
|
||||
is_already_existing_part = true;
|
||||
|
||||
@ -776,6 +781,8 @@ std::vector<String> ReplicatedMergeTreeSink<async_insert>::commitPart(
|
||||
/// Do not check for duplicate on commit to ZK.
|
||||
block_id_path.clear();
|
||||
}
|
||||
else
|
||||
throw Exception("Conflict block ids and block number lock should not be empty at the same time for async inserts", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
/// Information about the part.
|
||||
storage.getCommitPartOps(ops, part, block_id_path);
|
||||
|
@ -5313,37 +5313,6 @@ bool StorageReplicatedMergeTree::existsNodeCached(const ZooKeeperWithFaultInject
|
||||
return res;
|
||||
}
|
||||
|
||||
std::tuple<std::optional<EphemeralLockInZooKeeper>, std::vector<String>>
|
||||
StorageReplicatedMergeTree::allocateBlockNumber(
|
||||
const String & partition_id,
|
||||
const ZooKeeperWithFaultInjectionPtr & zookeeper,
|
||||
const std::vector<String> & zookeeper_block_id_paths) const
|
||||
{
|
||||
String zookeeper_table_path = zookeeper_path;
|
||||
|
||||
String block_numbers_path = fs::path(zookeeper_table_path) / "block_numbers";
|
||||
String partition_path = fs::path(block_numbers_path) / partition_id;
|
||||
|
||||
if (!existsNodeCached(zookeeper, partition_path))
|
||||
{
|
||||
Coordination::Requests ops;
|
||||
ops.push_back(zkutil::makeCreateRequest(partition_path, "", zkutil::CreateMode::Persistent));
|
||||
/// We increment data version of the block_numbers node so that it becomes possible
|
||||
/// to check in a ZK transaction that the set of partitions didn't change
|
||||
/// (unfortunately there is no CheckChildren op).
|
||||
ops.push_back(zkutil::makeSetRequest(block_numbers_path, "", -1));
|
||||
|
||||
Coordination::Responses responses;
|
||||
Coordination::Error code = zookeeper->tryMulti(ops, responses);
|
||||
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNODEEXISTS)
|
||||
zkutil::KeeperMultiException::check(code, ops, responses);
|
||||
}
|
||||
|
||||
return createEphemeralLockInZooKeeper(
|
||||
fs::path(partition_path) / "block-", fs::path(zookeeper_table_path) / "temp", zookeeper, zookeeper_block_id_paths);
|
||||
}
|
||||
|
||||
|
||||
std::optional<EphemeralLockInZooKeeper> StorageReplicatedMergeTree::allocateBlockNumber(
|
||||
const String & partition_id,
|
||||
const zkutil::ZooKeeperPtr & zookeeper,
|
||||
@ -5354,11 +5323,11 @@ std::optional<EphemeralLockInZooKeeper> StorageReplicatedMergeTree::allocateBloc
|
||||
partition_id, std::make_shared<ZooKeeperWithFaultInjection>(zookeeper), zookeeper_block_id_path, zookeeper_path_prefix);
|
||||
}
|
||||
|
||||
|
||||
template<typename T>
|
||||
std::optional<EphemeralLockInZooKeeper> StorageReplicatedMergeTree::allocateBlockNumber(
|
||||
const String & partition_id,
|
||||
const ZooKeeperWithFaultInjectionPtr & zookeeper,
|
||||
const String & zookeeper_block_id_path,
|
||||
const T & zookeeper_block_id_path,
|
||||
const String & zookeeper_path_prefix) const
|
||||
{
|
||||
String zookeeper_table_path;
|
||||
@ -8789,6 +8758,18 @@ void StorageReplicatedMergeTree::attachRestoredParts(MutableDataPartsVector && p
|
||||
sink->writeExistingPart(part);
|
||||
}
|
||||
|
||||
template std::optional<EphemeralLockInZooKeeper> StorageReplicatedMergeTree::allocateBlockNumber<String>(
|
||||
const String & partition_id,
|
||||
const ZooKeeperWithFaultInjectionPtr & zookeeper,
|
||||
const String & zookeeper_block_id_path,
|
||||
const String & zookeeper_path_prefix) const;
|
||||
|
||||
template std::optional<EphemeralLockInZooKeeper> StorageReplicatedMergeTree::allocateBlockNumber<std::vector<String>>(
|
||||
const String & partition_id,
|
||||
const ZooKeeperWithFaultInjectionPtr & zookeeper,
|
||||
const std::vector<String> & zookeeper_block_id_path,
|
||||
const String & zookeeper_path_prefix) const;
|
||||
|
||||
#if 0
|
||||
PartsTemporaryRename renamed_parts(*this, "detached/");
|
||||
MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts);
|
||||
|
@ -732,17 +732,14 @@ private:
|
||||
std::optional<EphemeralLockInZooKeeper> allocateBlockNumber(
|
||||
const String & partition_id, const zkutil::ZooKeeperPtr & zookeeper,
|
||||
const String & zookeeper_block_id_path = "", const String & zookeeper_path_prefix = "") const;
|
||||
|
||||
template<typename T>
|
||||
std::optional<EphemeralLockInZooKeeper> allocateBlockNumber(
|
||||
const String & partition_id,
|
||||
const ZooKeeperWithFaultInjectionPtr & zookeeper,
|
||||
const String & zookeeper_block_id_path = "",
|
||||
const T & zookeeper_block_id_path,
|
||||
const String & zookeeper_path_prefix = "") const;
|
||||
|
||||
std::tuple<std::optional<EphemeralLockInZooKeeper>, std::vector<String>> allocateBlockNumber(
|
||||
const String & partition_id,
|
||||
const ZooKeeperWithFaultInjectionPtr & zookeeper,
|
||||
const std::vector<String> & zookeeper_block_id_paths) const;
|
||||
|
||||
/** Wait until all replicas, including this, execute the specified action from the log.
|
||||
* If replicas are added at the same time, it can not wait the added replica.
|
||||
*
|
||||
|
@ -65,7 +65,7 @@ client.query('''
|
||||
CREATE TABLE t_async_insert_dedup (
|
||||
EventDate DateTime,
|
||||
KeyID UInt32
|
||||
) Engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/t_async_insert_dedup', '{replica}')
|
||||
) Engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/t_async_insert_dedup', '{replica}')
|
||||
PARTITION BY toYYYYMM(EventDate)
|
||||
ORDER BY (KeyID, EventDate)
|
||||
''')
|
||||
|
Loading…
Reference in New Issue
Block a user