Fix ReplicatedMergeTreeSink, add more info to logs

This commit is contained in:
vdimir 2022-09-06 12:09:03 +00:00
parent 000ba8a60d
commit 6234d564d1
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
3 changed files with 80 additions and 57 deletions

View File

@ -41,42 +41,45 @@ struct ReplicatedMergeTreeSink::DelayedChunk
String block_id; String block_id;
}; };
DelayedChunk() = default;
explicit DelayedChunk(size_t replicas_num_) : replicas_num(replicas_num_) {}
size_t replicas_num = 0;
std::vector<Partition> partitions; std::vector<Partition> partitions;
}; };
ReplicatedMergeTreeSink::ReplicatedMergeTreeSink( ReplicatedMergeTreeSink::ReplicatedMergeTreeSink(
StorageReplicatedMergeTree & storage_, StorageReplicatedMergeTree & storage_,
const StorageMetadataPtr & metadata_snapshot_, const StorageMetadataPtr & metadata_snapshot_,
size_t quorum_, size_t quorum_size,
size_t quorum_timeout_ms_, size_t quorum_timeout_ms_,
size_t max_parts_per_block_, size_t max_parts_per_block_,
bool quorum_parallel_, bool quorum_parallel_,
bool deduplicate_, bool deduplicate_,
bool majority_quorum_, bool majority_quorum,
ContextPtr context_, ContextPtr context_,
bool is_attach_) bool is_attach_)
: SinkToStorage(metadata_snapshot_->getSampleBlock()) : SinkToStorage(metadata_snapshot_->getSampleBlock())
, storage(storage_) , storage(storage_)
, metadata_snapshot(metadata_snapshot_) , metadata_snapshot(metadata_snapshot_)
, quorum(quorum_) , required_quorum_size(majority_quorum ? std::nullopt : std::make_optional<size_t>(quorum_size))
, quorum_timeout_ms(quorum_timeout_ms_) , quorum_timeout_ms(quorum_timeout_ms_)
, max_parts_per_block(max_parts_per_block_) , max_parts_per_block(max_parts_per_block_)
, is_attach(is_attach_) , is_attach(is_attach_)
, quorum_parallel(quorum_parallel_) , quorum_parallel(quorum_parallel_)
, deduplicate(deduplicate_) , deduplicate(deduplicate_)
, majority_quorum(majority_quorum_)
, log(&Poco::Logger::get(storage.getLogName() + " (Replicated OutputStream)")) , log(&Poco::Logger::get(storage.getLogName() + " (Replicated OutputStream)"))
, context(context_) , context(context_)
, storage_snapshot(storage.getStorageSnapshot(metadata_snapshot, context)) , storage_snapshot(storage.getStorageSnapshot(metadata_snapshot, context))
{ {
/// The quorum value `1` has the same meaning as if it is disabled. /// The quorum value `1` has the same meaning as if it is disabled.
if (quorum == 1) if (required_quorum_size == 1)
quorum = 0; required_quorum_size = 0;
} }
ReplicatedMergeTreeSink::~ReplicatedMergeTreeSink() = default; ReplicatedMergeTreeSink::~ReplicatedMergeTreeSink() = default;
/// Allow to verify that the session in ZooKeeper is still alive. /// Allow to verify that the session in ZooKeeper is still alive.
static void assertSessionIsNotExpired(zkutil::ZooKeeperPtr & zookeeper) static void assertSessionIsNotExpired(zkutil::ZooKeeperPtr & zookeeper)
{ {
@ -87,9 +90,11 @@ static void assertSessionIsNotExpired(zkutil::ZooKeeperPtr & zookeeper)
throw Exception("ZooKeeper session has been expired.", ErrorCodes::NO_ZOOKEEPER); throw Exception("ZooKeeper session has been expired.", ErrorCodes::NO_ZOOKEEPER);
} }
size_t ReplicatedMergeTreeSink::checkQuorumPrecondition(zkutil::ZooKeeperPtr & zookeeper)
void ReplicatedMergeTreeSink::setMajorityQuorumAndCheckQuorum(zkutil::ZooKeeperPtr & zookeeper)
{ {
if (!isQuorumEnabled())
return 0;
quorum_info.status_path = storage.zookeeper_path + "/quorum/status"; quorum_info.status_path = storage.zookeeper_path + "/quorum/status";
Strings replicas = zookeeper->getChildren(fs::path(storage.zookeeper_path) / "replicas"); Strings replicas = zookeeper->getChildren(fs::path(storage.zookeeper_path) / "replicas");
@ -107,12 +112,12 @@ void ReplicatedMergeTreeSink::setMajorityQuorumAndCheckQuorum(zkutil::ZooKeeperP
if (status.get().error == Coordination::Error::ZOK) if (status.get().error == Coordination::Error::ZOK)
++active_replicas; ++active_replicas;
if (majority_quorum) size_t replicas_number = replicas.size();
quorum = std::max(quorum, replicas.size() / 2 + 1); size_t quorum_size = getQuorumSize(replicas_number);
if (active_replicas < quorum) if (active_replicas < quorum_size)
throw Exception(ErrorCodes::TOO_FEW_LIVE_REPLICAS, "Number of alive replicas ({}) is less than requested quorum ({}).", throw Exception(ErrorCodes::TOO_FEW_LIVE_REPLICAS, "Number of alive replicas ({}) is less than requested quorum ({}/{}).",
active_replicas, quorum); active_replicas, quorum_size, replicas_number);
/** Is there a quorum for the last part for which a quorum is needed? /** Is there a quorum for the last part for which a quorum is needed?
* Write of all the parts with the included quorum is linearly ordered. * Write of all the parts with the included quorum is linearly ordered.
@ -138,8 +143,9 @@ void ReplicatedMergeTreeSink::setMajorityQuorumAndCheckQuorum(zkutil::ZooKeeperP
quorum_info.is_active_node_value = is_active.data; quorum_info.is_active_node_value = is_active.data;
quorum_info.is_active_node_version = is_active.stat.version; quorum_info.is_active_node_version = is_active.stat.version;
quorum_info.host_node_version = host.stat.version; quorum_info.host_node_version = host.stat.version;
}
return replicas_number;
}
void ReplicatedMergeTreeSink::consume(Chunk chunk) void ReplicatedMergeTreeSink::consume(Chunk chunk)
{ {
@ -153,8 +159,7 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
* And also check that during the insertion, the replica was not reinitialized or disabled (by the value of `is_active` node). * And also check that during the insertion, the replica was not reinitialized or disabled (by the value of `is_active` node).
* TODO Too complex logic, you can do better. * TODO Too complex logic, you can do better.
*/ */
if (quorumEnabled()) size_t replicas_num = checkQuorumPrecondition(zookeeper);
setMajorityQuorumAndCheckQuorum(zookeeper);
storage.writer.deduceTypesOfObjectColumns(storage_snapshot, block); storage.writer.deduceTypesOfObjectColumns(storage_snapshot, block);
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context); auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
@ -198,11 +203,11 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
} }
block_id = temp_part.part->getZeroLevelPartBlockID(block_dedup_token); block_id = temp_part.part->getZeroLevelPartBlockID(block_dedup_token);
LOG_DEBUG(log, "Wrote block with ID '{}', {} rows", block_id, current_block.block.rows()); LOG_DEBUG(log, "Wrote block with ID '{}', {} rows on {} replicas", block_id, current_block.block.rows(), replicas_num);
} }
else else
{ {
LOG_DEBUG(log, "Wrote block with {} rows", current_block.block.rows()); LOG_DEBUG(log, "Wrote block with {} rows on {} replicas", current_block.block.rows(), replicas_num);
} }
UInt64 elapsed_ns = watch.elapsed(); UInt64 elapsed_ns = watch.elapsed();
@ -216,7 +221,7 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
if (streams > max_insert_delayed_streams_for_parallel_write) if (streams > max_insert_delayed_streams_for_parallel_write)
{ {
finishDelayedChunk(zookeeper); finishDelayedChunk(zookeeper);
delayed_chunk = std::make_unique<ReplicatedMergeTreeSink::DelayedChunk>(); delayed_chunk = std::make_unique<ReplicatedMergeTreeSink::DelayedChunk>(replicas_num);
delayed_chunk->partitions = std::move(partitions); delayed_chunk->partitions = std::move(partitions);
finishDelayedChunk(zookeeper); finishDelayedChunk(zookeeper);
@ -259,7 +264,7 @@ void ReplicatedMergeTreeSink::finishDelayedChunk(zkutil::ZooKeeperPtr & zookeepe
try try
{ {
commitPart(zookeeper, part, partition.block_id, partition.temp_part.builder); commitPart(zookeeper, part, partition.block_id, partition.temp_part.builder, delayed_chunk->replicas_num);
last_block_is_duplicate = last_block_is_duplicate || part->is_duplicate; last_block_is_duplicate = last_block_is_duplicate || part->is_duplicate;
@ -278,7 +283,6 @@ void ReplicatedMergeTreeSink::finishDelayedChunk(zkutil::ZooKeeperPtr & zookeepe
delayed_chunk.reset(); delayed_chunk.reset();
} }
void ReplicatedMergeTreeSink::writeExistingPart(MergeTreeData::MutableDataPartPtr & part) void ReplicatedMergeTreeSink::writeExistingPart(MergeTreeData::MutableDataPartPtr & part)
{ {
/// NOTE: No delay in this case. That's Ok. /// NOTE: No delay in this case. That's Ok.
@ -286,15 +290,14 @@ void ReplicatedMergeTreeSink::writeExistingPart(MergeTreeData::MutableDataPartPt
auto zookeeper = storage.getZooKeeper(); auto zookeeper = storage.getZooKeeper();
assertSessionIsNotExpired(zookeeper); assertSessionIsNotExpired(zookeeper);
if (quorumEnabled()) size_t replicas_num = checkQuorumPrecondition(zookeeper);
setMajorityQuorumAndCheckQuorum(zookeeper);
Stopwatch watch; Stopwatch watch;
try try
{ {
part->version.setCreationTID(Tx::PrehistoricTID, nullptr); part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
commitPart(zookeeper, part, "", part->data_part_storage->getBuilder()); commitPart(zookeeper, part, "", part->data_part_storage->getBuilder(), replicas_num);
PartLog::addNewPart(storage.getContext(), part, watch.elapsed()); PartLog::addNewPart(storage.getContext(), part, watch.elapsed());
} }
catch (...) catch (...)
@ -304,12 +307,12 @@ void ReplicatedMergeTreeSink::writeExistingPart(MergeTreeData::MutableDataPartPt
} }
} }
void ReplicatedMergeTreeSink::commitPart( void ReplicatedMergeTreeSink::commitPart(
zkutil::ZooKeeperPtr & zookeeper, zkutil::ZooKeeperPtr & zookeeper,
MergeTreeData::MutableDataPartPtr & part, MergeTreeData::MutableDataPartPtr & part,
const String & block_id, const String & block_id,
DataPartStorageBuilderPtr builder) DataPartStorageBuilderPtr builder,
size_t replicas_num)
{ {
metadata_snapshot->check(part->getColumns()); metadata_snapshot->check(part->getColumns());
assertSessionIsNotExpired(zookeeper); assertSessionIsNotExpired(zookeeper);
@ -372,7 +375,7 @@ void ReplicatedMergeTreeSink::commitPart(
log_entry.source_replica = storage.replica_name; log_entry.source_replica = storage.replica_name;
log_entry.new_part_name = part->name; log_entry.new_part_name = part->name;
/// TODO maybe add UUID here as well? /// TODO maybe add UUID here as well?
log_entry.quorum = quorum; log_entry.quorum = getQuorumSize(replicas_num);
log_entry.block_id = block_id; log_entry.block_id = block_id;
log_entry.new_part_type = part->getType(); log_entry.new_part_type = part->getType();
@ -389,11 +392,11 @@ void ReplicatedMergeTreeSink::commitPart(
* but for it the quorum has not yet been reached. * but for it the quorum has not yet been reached.
* You can not do the next quorum record at this time.) * You can not do the next quorum record at this time.)
*/ */
if (quorumEnabled()) if (isQuorumEnabled())
{ {
ReplicatedMergeTreeQuorumEntry quorum_entry; ReplicatedMergeTreeQuorumEntry quorum_entry;
quorum_entry.part_name = part->name; quorum_entry.part_name = part->name;
quorum_entry.required_number_of_replicas = quorum; quorum_entry.required_number_of_replicas = getQuorumSize(replicas_num);
quorum_entry.replicas.insert(storage.replica_name); quorum_entry.replicas.insert(storage.replica_name);
/** At this point, this node will contain information that the current replica received a part. /** At this point, this node will contain information that the current replica received a part.
@ -441,7 +444,7 @@ void ReplicatedMergeTreeSink::commitPart(
{ {
part->is_duplicate = true; part->is_duplicate = true;
ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks); ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
if (quorumEnabled()) if (isQuorumEnabled())
{ {
LOG_INFO(log, "Block with ID {} already exists locally as part {}; ignoring it, but checking quorum.", block_id, existing_part_name); LOG_INFO(log, "Block with ID {} already exists locally as part {}; ignoring it, but checking quorum.", block_id, existing_part_name);
@ -451,7 +454,7 @@ void ReplicatedMergeTreeSink::commitPart(
else else
quorum_path = storage.zookeeper_path + "/quorum/status"; quorum_path = storage.zookeeper_path + "/quorum/status";
waitForQuorum(zookeeper, existing_part_name, quorum_path, quorum_info.is_active_node_value); waitForQuorum(zookeeper, existing_part_name, quorum_path, quorum_info.is_active_node_value, replicas_num);
} }
else else
{ {
@ -598,7 +601,7 @@ void ReplicatedMergeTreeSink::commitPart(
break; break;
} }
if (quorumEnabled()) if (isQuorumEnabled())
{ {
if (is_already_existing_part) if (is_already_existing_part)
{ {
@ -610,7 +613,7 @@ void ReplicatedMergeTreeSink::commitPart(
storage.updateQuorum(part->name, false); storage.updateQuorum(part->name, false);
} }
waitForQuorum(zookeeper, part->name, quorum_info.status_path, quorum_info.is_active_node_value); waitForQuorum(zookeeper, part->name, quorum_info.status_path, quorum_info.is_active_node_value, replicas_num);
} }
} }
@ -632,10 +635,11 @@ void ReplicatedMergeTreeSink::waitForQuorum(
zkutil::ZooKeeperPtr & zookeeper, zkutil::ZooKeeperPtr & zookeeper,
const std::string & part_name, const std::string & part_name,
const std::string & quorum_path, const std::string & quorum_path,
const std::string & is_active_node_value) const const std::string & is_active_node_value,
size_t replicas_num) const
{ {
/// We are waiting for quorum to be satisfied. /// We are waiting for quorum to be satisfied.
LOG_TRACE(log, "Waiting for quorum"); LOG_TRACE(log, "Waiting for quorum '{}' for part {} on {} replicas", quorum_path, part_name, replicas_num);
try try
{ {
@ -659,7 +663,7 @@ void ReplicatedMergeTreeSink::waitForQuorum(
if (!event->tryWait(quorum_timeout_ms)) if (!event->tryWait(quorum_timeout_ms))
throw Exception("Timeout while waiting for quorum", ErrorCodes::TIMEOUT_EXCEEDED); throw Exception("Timeout while waiting for quorum", ErrorCodes::TIMEOUT_EXCEEDED);
LOG_TRACE(log, "Quorum {} updated, will check quorum node still exists", quorum_path); LOG_TRACE(log, "Quorum {} for part {} updated, will check quorum node still exists", quorum_path, part_name);
} }
/// And what if it is possible that the current replica at this time has ceased to be active /// And what if it is possible that the current replica at this time has ceased to be active
@ -677,8 +681,23 @@ void ReplicatedMergeTreeSink::waitForQuorum(
ErrorCodes::UNKNOWN_STATUS_OF_INSERT); ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
} }
LOG_TRACE(log, "Quorum satisfied"); LOG_TRACE(log, "Quorum '{}' for part {} satisfied", quorum_path, part_name);
} }
size_t ReplicatedMergeTreeSink::getQuorumSize(size_t replicas_num) const
{
if (!isQuorumEnabled())
return 0;
if (required_quorum_size)
return required_quorum_size.value();
return replicas_num / 2 + 1;
}
bool ReplicatedMergeTreeSink::isQuorumEnabled() const
{
return !required_quorum_size.has_value() || required_quorum_size.value() > 1;
}
} }

View File

@ -69,25 +69,34 @@ private:
}; };
QuorumInfo quorum_info; QuorumInfo quorum_info;
/// set quorum if majority_quorum is true and checks active replicas
void setMajorityQuorumAndCheckQuorum(zkutil::ZooKeeperPtr & zookeeper); /// Checks active replicas.
/// Returns total number of replicas.
size_t checkQuorumPrecondition(zkutil::ZooKeeperPtr & zookeeper);
/// Rename temporary part and commit to ZooKeeper. /// Rename temporary part and commit to ZooKeeper.
void commitPart( void commitPart(
zkutil::ZooKeeperPtr & zookeeper, zkutil::ZooKeeperPtr & zookeeper,
MergeTreeData::MutableDataPartPtr & part, MergeTreeData::MutableDataPartPtr & part,
const String & block_id, const String & block_id,
DataPartStorageBuilderPtr part_builder); DataPartStorageBuilderPtr part_builder,
size_t replicas_num);
/// Wait for quorum to be satisfied on path (quorum_path) form part (part_name) /// Wait for quorum to be satisfied on path (quorum_path) form part (part_name)
/// Also checks that replica still alive. /// Also checks that replica still alive.
void waitForQuorum( void waitForQuorum(
zkutil::ZooKeeperPtr & zookeeper, const std::string & part_name, zkutil::ZooKeeperPtr & zookeeper, const std::string & part_name,
const std::string & quorum_path, const std::string & is_active_node_value) const; const std::string & quorum_path, const std::string & is_active_node_value, size_t replicas_num) const;
StorageReplicatedMergeTree & storage; StorageReplicatedMergeTree & storage;
StorageMetadataPtr metadata_snapshot; StorageMetadataPtr metadata_snapshot;
size_t quorum;
/// Empty means use majority quorum.
std::optional<size_t> required_quorum_size;
size_t getQuorumSize(size_t replicas_num) const;
bool isQuorumEnabled() const;
size_t quorum_timeout_ms; size_t quorum_timeout_ms;
size_t max_parts_per_block; size_t max_parts_per_block;
@ -95,7 +104,6 @@ private:
bool quorum_parallel = false; bool quorum_parallel = false;
const bool deduplicate = true; const bool deduplicate = true;
bool last_block_is_duplicate = false; bool last_block_is_duplicate = false;
bool majority_quorum = false;
using Logger = Poco::Logger; using Logger = Poco::Logger;
Poco::Logger * log; Poco::Logger * log;
@ -110,10 +118,6 @@ private:
std::unique_ptr<DelayedChunk> delayed_chunk; std::unique_ptr<DelayedChunk> delayed_chunk;
void finishDelayedChunk(zkutil::ZooKeeperPtr & zookeeper); void finishDelayedChunk(zkutil::ZooKeeperPtr & zookeeper);
bool quorumEnabled() const
{
return majority_quorum || quorum != 0;
}
}; };
} }

View File

@ -3641,8 +3641,8 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name, bool is_
if (quorum_entry.replicas.size() >= quorum_entry.required_number_of_replicas) if (quorum_entry.replicas.size() >= quorum_entry.required_number_of_replicas)
{ {
/// The quorum is reached. Delete the node, and update information about the last part that was successfully written with quorum. /// The quorum is reached. Delete the node, and update information about the last part that was successfully written with quorum.
LOG_TRACE(log, "Got {} replicas confirmed quorum {}, going to remove node", LOG_TRACE(log, "Got {} (of {}) replicas confirmed quorum {}, going to remove node",
quorum_entry.replicas.size(), quorum_status_path); quorum_entry.replicas.size(), quorum_entry.required_number_of_replicas, quorum_status_path);
Coordination::Requests ops; Coordination::Requests ops;
Coordination::Responses responses; Coordination::Responses responses;
@ -3690,8 +3690,8 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name, bool is_
} }
else else
{ {
LOG_TRACE(log, "Quorum {} still not satisfied (have only {} replicas), updating node", LOG_TRACE(log, "Quorum {} still not satisfied (have only {} of {} replicas), updating node",
quorum_status_path, quorum_entry.replicas.size()); quorum_status_path, quorum_entry.replicas.size(), quorum_entry.required_number_of_replicas);
/// We update the node, registering there one more replica. /// We update the node, registering there one more replica.
auto code = zookeeper->trySet(quorum_status_path, quorum_entry.toString(), stat.version); auto code = zookeeper->trySet(quorum_status_path, quorum_entry.toString(), stat.version);
@ -4304,12 +4304,12 @@ ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock StorageReplicatedMerg
auto added_parts = part_with_quorum.added_parts; auto added_parts = part_with_quorum.added_parts;
for (const auto & added_part : added_parts) for (const auto & added_part : added_parts)
{
if (!getActiveContainingPart(added_part.second)) if (!getActiveContainingPart(added_part.second))
throw Exception( throw Exception(ErrorCodes::REPLICA_IS_NOT_IN_QUORUM,
"Replica doesn't have part " + added_part.second "Replica doesn't have part '{}' which was successfully written to quorum of other replicas. "
+ " which was successfully written to quorum of other replicas." "Send query to another replica or disable 'select_sequential_consistency' setting", added_part.second);
" Send query to another replica or disable 'select_sequential_consistency' setting.", }
ErrorCodes::REPLICA_IS_NOT_IN_QUORUM);
for (const auto & max_block : part_with_quorum.getMaxInsertedBlocks()) for (const auto & max_block : part_with_quorum.getMaxInsertedBlocks())
max_added_blocks[max_block.first] = max_block.second; max_added_blocks[max_block.first] = max_block.second;