mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-30 05:30:51 +00:00
working copy (with some debug info)
This commit is contained in:
parent
e89a56969f
commit
8263e62298
@ -60,7 +60,6 @@ public:
|
||||
|
||||
using Type = MergeTreeDataPartType;
|
||||
|
||||
|
||||
IMergeTreeDataPart(
|
||||
const MergeTreeData & storage_,
|
||||
const String & name_,
|
||||
@ -349,6 +348,8 @@ public:
|
||||
/// part creation (using alter query with materialize_ttl setting).
|
||||
bool checkAllTTLCalculated(const StorageMetadataPtr & metadata_snapshot) const;
|
||||
|
||||
std::optional<String> block_id;
|
||||
|
||||
protected:
|
||||
|
||||
/// Total size of all columns, calculated once in calcuateColumnSizesOnDisk
|
||||
|
@ -31,10 +31,12 @@ struct ReplicatedMergeTreeBlockEntry
|
||||
|
||||
void writeText(WriteBuffer & out) const
|
||||
{
|
||||
out << part_name << "\n";
|
||||
out << part_name;
|
||||
|
||||
if (quorum_status)
|
||||
if (quorum_status) {
|
||||
out << "\n";
|
||||
quorum_status->writeText(out);
|
||||
}
|
||||
}
|
||||
|
||||
void readText(ReadBuffer & in)
|
||||
|
@ -101,7 +101,7 @@ void ReplicatedMergeTreeBlockOutputStream::checkQuorumPrecondition(zkutil::ZooKe
|
||||
*/
|
||||
|
||||
auto quorum_status = quorum_status_future.get();
|
||||
if (quorum_status.error != Coordination::Error::ZNONODE)
|
||||
if (quorum_status.error != Coordination::Error::ZNONODE && !quorum_parallel)
|
||||
throw Exception("Quorum for previous write has not been satisfied yet. Status: " + quorum_status.data,
|
||||
ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);
|
||||
|
||||
@ -172,6 +172,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
|
||||
|
||||
try
|
||||
{
|
||||
LOG_ERROR(log, "need to send here block_id somehow");
|
||||
commitPart(zookeeper, part, block_id);
|
||||
|
||||
/// Set a special error code if the block is duplicate
|
||||
@ -342,8 +343,9 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
|
||||
/// Note: race condition with DROP PARTITION operation is possible. User will get "No node" exception and it is Ok.
|
||||
existing_part_name = zookeeper->get(storage.zookeeper_path + "/blocks/" + block_id);
|
||||
|
||||
block_entry.fromString(existing_part_name);
|
||||
/// If it exists on our replica, ignore it.
|
||||
if (storage.getActiveContainingPart(existing_part_name))
|
||||
if (storage.getActiveContainingPart(block_entry.part_name))
|
||||
{
|
||||
LOG_INFO(log, "Block with ID {} already exists locally as part {}; ignoring it.", block_id, existing_part_name);
|
||||
part->is_duplicate = true;
|
||||
@ -370,6 +372,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
|
||||
}
|
||||
|
||||
/// Information about the part.
|
||||
LOG_INFO(log, "getCommitPartOps from ...OutputStream");
|
||||
storage.getCommitPartOps(ops, part, block_id_path, block_entry);
|
||||
|
||||
MergeTreeData::Transaction transaction(storage); /// If you can not add a part to ZK, we'll remove it back from the working set.
|
||||
@ -484,9 +487,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
|
||||
if (is_already_existing_part)
|
||||
{
|
||||
/// We get duplicate part without fetch
|
||||
/// ALEXELEXA
|
||||
/// should reset here something, after thinking in TODO
|
||||
storage.updateQuorum(part->name, part->info.block_id);
|
||||
storage.updateQuorum(part->name);
|
||||
}
|
||||
|
||||
/// We are waiting for quorum to be satisfied.
|
||||
@ -511,7 +512,8 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
|
||||
if (quorum_parallel)
|
||||
{
|
||||
block_entry.fromString(value);
|
||||
if (block_entry.part_name != part->name)
|
||||
// quorum_status empty if quorum reached
|
||||
if (block_entry.part_name != part->name || !block_entry.quorum_status)
|
||||
break;
|
||||
}
|
||||
else
|
||||
|
@ -342,13 +342,25 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
|
||||
timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime);
|
||||
auto first_outdated_block = std::min(first_outdated_block_fixed_threshold, first_outdated_block_time_threshold);
|
||||
|
||||
zkutil::AsyncResponses<Coordination::GetResponse> try_get_futures;
|
||||
zkutil::AsyncResponses<Coordination::RemoveResponse> try_remove_futures;
|
||||
for (auto it = first_outdated_block; it != timed_blocks.end(); ++it)
|
||||
{
|
||||
String path = storage.zookeeper_path + "/blocks/" + it->node;
|
||||
/// ALEXELEXA
|
||||
/// should check somehow is it quorum block
|
||||
try_remove_futures.emplace_back(path, zookeeper->asyncTryRemove(path));
|
||||
try_get_futures.emplace_back(path, zookeeper->asyncTryGet(path));
|
||||
}
|
||||
|
||||
/// Don't delete blocks that are still reaching quorum
|
||||
for (auto & pair : try_get_futures)
|
||||
{
|
||||
const String & path = pair.first;
|
||||
auto response = pair.second.get();
|
||||
if (response.error == Coordination::Error::ZOK)
|
||||
{
|
||||
ReplicatedMergeTreeBlockEntry block(response.data);
|
||||
if (!block.quorum_status)
|
||||
try_remove_futures.emplace_back(path, zookeeper->asyncTryRemove(path));
|
||||
}
|
||||
}
|
||||
|
||||
for (auto & pair : try_remove_futures)
|
||||
|
@ -51,7 +51,11 @@ void ReplicatedMergeTreePartHeader::read(ReadBuffer & in)
|
||||
checksums.deserializeWithoutHeader(in);
|
||||
|
||||
if (!in.eof())
|
||||
in >> "block_id: " >> block_id.value() >> "\n";
|
||||
{
|
||||
String block_id_;
|
||||
in >> "\nblock_id: " >> block_id_;
|
||||
block_id = block_id_;
|
||||
}
|
||||
}
|
||||
|
||||
ReplicatedMergeTreePartHeader ReplicatedMergeTreePartHeader::fromString(const String & str)
|
||||
@ -68,7 +72,7 @@ void ReplicatedMergeTreePartHeader::write(WriteBuffer & out) const
|
||||
out.write(columns_hash.data(), columns_hash.size());
|
||||
checksums.serializeWithoutHeader(out);
|
||||
if (block_id)
|
||||
out << "block_id " << block_id.value() << "\n";
|
||||
out << "\nblock_id: " << block_id.value();
|
||||
}
|
||||
|
||||
String ReplicatedMergeTreePartHeader::toString() const
|
||||
|
@ -40,6 +40,7 @@ public:
|
||||
|
||||
const std::array<char, 16> & getColumnsHash() const { return columns_hash; }
|
||||
const MinimalisticDataPartChecksums & getChecksums() const { return checksums; }
|
||||
const std::optional<String> & getBlockID() const { return block_id; }
|
||||
|
||||
private:
|
||||
ReplicatedMergeTreePartHeader(std::array<char, 16> columns_hash_, MinimalisticDataPartChecksums checksums_,
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include <Storages/MergeTree/IMergeTreeDataPart.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
|
||||
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
|
||||
#include <Storages/MergeTree/ReplicatedMergeTreePartHeader.h>
|
||||
#include <Common/StringUtils/StringUtils.h>
|
||||
|
||||
|
||||
@ -1697,9 +1698,6 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
|
||||
prev_virtual_parts = queue.virtual_parts;
|
||||
}
|
||||
|
||||
/// Load current quorum status.
|
||||
auto quorum_status_future = zookeeper->asyncTryGet(queue.zookeeper_path + "/quorum/status");
|
||||
|
||||
/// Load current inserts
|
||||
std::unordered_set<String> lock_holder_paths;
|
||||
for (const String & entry : zookeeper->getChildren(queue.zookeeper_path + "/temp"))
|
||||
@ -1751,15 +1749,32 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
|
||||
|
||||
merges_version = queue_.pullLogsToQueue(zookeeper);
|
||||
|
||||
/// Load current quorum status.
|
||||
auto quorum_status_future = zookeeper->asyncTryGet(queue.zookeeper_path + "/quorum/status");
|
||||
Coordination::GetResponse quorum_status_response = quorum_status_future.get();
|
||||
if (quorum_status_response.error == Coordination::Error::ZOK)
|
||||
{
|
||||
ReplicatedMergeTreeQuorumEntry quorum_status;
|
||||
quorum_status.fromString(quorum_status_response.data);
|
||||
inprogress_quorum_part = quorum_status.part_name;
|
||||
inprogress_quorum_parts.insert(quorum_status.part_name);
|
||||
}
|
||||
else
|
||||
inprogress_quorum_part.clear();
|
||||
inprogress_quorum_parts.clear();
|
||||
|
||||
Strings partitions = zookeeper->getChildren(queue.replica_path + "/parts");
|
||||
for (const String & partition : partitions)
|
||||
{
|
||||
auto header = ReplicatedMergeTreePartHeader::fromString(zookeeper->get(queue.replica_path + "/parts/" + partition));
|
||||
if (header.getBlockID())
|
||||
{
|
||||
ReplicatedMergeTreeBlockEntry block(zookeeper->get(queue.zookeeper_path + "/blocks/" + *header.getBlockID()));
|
||||
if (partition != block.part_name)
|
||||
throw Exception("partition " + partition + " contain block_id " + *header.getBlockID() +
|
||||
" and block_id contain another partition " + block.part_name, ErrorCodes::LOGICAL_ERROR);
|
||||
if (block.quorum_status)
|
||||
inprogress_quorum_parts.insert(block.part_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool ReplicatedMergeTreeMergePredicate::operator()(
|
||||
@ -1821,7 +1836,7 @@ bool ReplicatedMergeTreeMergePredicate::canMergeTwoParts(
|
||||
|
||||
for (const MergeTreeData::DataPartPtr & part : {left, right})
|
||||
{
|
||||
if (part->name == inprogress_quorum_part)
|
||||
if (inprogress_quorum_parts.count(part->name))
|
||||
{
|
||||
if (out_reason)
|
||||
*out_reason = "Quorum insert for part " + part->name + " is currently in progress";
|
||||
@ -1916,7 +1931,7 @@ bool ReplicatedMergeTreeMergePredicate::canMergeSinglePart(
|
||||
const MergeTreeData::DataPartPtr & part,
|
||||
String * out_reason) const
|
||||
{
|
||||
if (part->name == inprogress_quorum_part)
|
||||
if (inprogress_quorum_parts.count(part->name))
|
||||
{
|
||||
if (out_reason)
|
||||
*out_reason = "Quorum insert for part " + part->name + " is currently in progress";
|
||||
@ -1957,7 +1972,7 @@ std::optional<std::pair<Int64, int>> ReplicatedMergeTreeMergePredicate::getDesir
|
||||
|
||||
/// We cannot mutate part if it's being inserted with quorum and it's not
|
||||
/// already reached.
|
||||
if (part->name == inprogress_quorum_part)
|
||||
if (inprogress_quorum_parts.count(part->name))
|
||||
return {};
|
||||
|
||||
std::lock_guard lock(queue.state_mutex);
|
||||
|
@ -475,7 +475,7 @@ private:
|
||||
std::unordered_map<String, std::set<Int64>> committing_blocks;
|
||||
|
||||
/// Quorum state taken at some later time than prev_virtual_parts.
|
||||
String inprogress_quorum_part;
|
||||
std::unordered_set<String> inprogress_quorum_parts;
|
||||
|
||||
int32_t merges_version = -1;
|
||||
};
|
||||
|
@ -69,7 +69,7 @@ struct ReplicatedMergeTreeQuorumStatusEntry
|
||||
|
||||
bool isQuorumReached()
|
||||
{
|
||||
return required_number_of_replicas <= replicas.size();
|
||||
return required_number_of_replicas && required_number_of_replicas <= replicas.size();
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -2,6 +2,8 @@
|
||||
#include <Storages/StorageReplicatedMergeTree.h>
|
||||
#include <Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h>
|
||||
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
|
||||
#include <Storages/MergeTree/ReplicatedMergeTreeBlockEntry.h>
|
||||
#include <Storages/MergeTree/ReplicatedMergeTreePartHeader.h>
|
||||
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Common/ZooKeeper/KeeperException.h>
|
||||
@ -236,6 +238,34 @@ void ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart()
|
||||
storage.updateQuorum(quorum_entry.part_name);
|
||||
}
|
||||
}
|
||||
|
||||
Strings partitions;
|
||||
if (zookeeper->tryGetChildren(storage.replica_path + "/parts/", partitions) == Coordination::Error::ZOK)
|
||||
{
|
||||
for (auto & partition : partitions)
|
||||
{
|
||||
String part_str, block_str;
|
||||
if (!zookeeper->tryGet(storage.replica_path + "/parts/" + partition, part_str))
|
||||
continue;
|
||||
|
||||
auto header = ReplicatedMergeTreePartHeader::fromString(part_str);
|
||||
if (!header.getBlockID() || !zookeeper->tryGet(storage.zookeeper_path + "/blocks/" + *header.getBlockID(), block_str))
|
||||
continue;
|
||||
|
||||
ReplicatedMergeTreeBlockEntry block_entry(block_str);
|
||||
if (block_entry.part_name != partition)
|
||||
{
|
||||
LOG_WARNING(log, "partition {} contain block_id {} and block_id contain another partition {}", partition, *header.getBlockID(), block_entry.part_name);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (block_entry.quorum_status && !block_entry.quorum_status->replicas.count(storage.replica_name))
|
||||
{
|
||||
LOG_WARNING(log, "We have part {} but we is not in quorum. Updating quorum. This shouldn't happen often.", block_entry.part_name);
|
||||
storage.updateQuorum(block_entry.part_name, header.getBlockID());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -19,6 +19,7 @@
|
||||
#include <Storages/MergeTree/MergeList.h>
|
||||
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
|
||||
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
|
||||
#include <Storages/MergeTree/ReplicatedMergeTreeBlockEntry.h>
|
||||
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
|
||||
#include <Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h>
|
||||
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
|
||||
@ -1180,6 +1181,7 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil:
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_DEBUG(log, "creating empty! (2)");
|
||||
ops.emplace_back(zkutil::makeCreateRequest(
|
||||
part_path, "", zkutil::CreateMode::Persistent));
|
||||
ops.emplace_back(zkutil::makeCreateRequest(
|
||||
@ -1700,30 +1702,37 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
|
||||
|
||||
if (replica.empty())
|
||||
{
|
||||
Coordination::Stat quorum_stat;
|
||||
Coordination::Stat quorum_stat, block_stat;
|
||||
String quorum_path = zookeeper_path + "/quorum/status";
|
||||
String quorum_str = zookeeper->get(quorum_path, &quorum_stat);
|
||||
String block_path = entry.block_id.empty() ? "" : zookeeper_path + "/blocks/" + entry.block_id;
|
||||
String quorum_str, parallel_quorum_str;
|
||||
ReplicatedMergeTreeQuorumEntry quorum_entry;
|
||||
quorum_entry.fromString(quorum_str);
|
||||
ReplicatedMergeTreeBlockEntry block_entry;
|
||||
|
||||
if (quorum_entry.part_name == entry.new_part_name)
|
||||
if (zookeeper->tryGet(quorum_path, quorum_str, &quorum_stat))
|
||||
quorum_entry.fromString(quorum_str);
|
||||
if (!block_path.empty() && zookeeper->tryGet(block_path, parallel_quorum_str, &block_stat))
|
||||
block_entry.fromString(parallel_quorum_str);
|
||||
bool is_parallel_quorum = (block_entry.part_name == entry.new_part_name);
|
||||
|
||||
if (quorum_entry.part_name == entry.new_part_name || is_parallel_quorum)
|
||||
{
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(quorum_path, quorum_stat.version));
|
||||
|
||||
auto part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version);
|
||||
|
||||
if (part_info.min_block != part_info.max_block)
|
||||
throw Exception("Logical error: log entry with quorum for part covering more than one block number",
|
||||
ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
if (!is_parallel_quorum)
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(quorum_path, quorum_stat.version));
|
||||
|
||||
ops.emplace_back(zkutil::makeCreateRequest(
|
||||
zookeeper_path + "/quorum/failed_parts/" + entry.new_part_name,
|
||||
"",
|
||||
zkutil::CreateMode::Persistent));
|
||||
|
||||
/// Deleting from `blocks`.
|
||||
if (!entry.block_id.empty() && zookeeper->exists(zookeeper_path + "/blocks/" + entry.block_id))
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/blocks/" + entry.block_id, -1));
|
||||
if (!block_path.empty() && zookeeper->exists(block_path))
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(block_path, -1));
|
||||
|
||||
Coordination::Responses responses;
|
||||
auto code = zookeeper->tryMulti(ops, responses);
|
||||
@ -1757,8 +1766,12 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
|
||||
|
||||
try
|
||||
{
|
||||
std::optional<String> block_id;
|
||||
if (!entry.block_id.empty())
|
||||
block_id = entry.block_id;
|
||||
String part_name = entry.actual_new_part_name.empty() ? entry.new_part_name : entry.actual_new_part_name;
|
||||
if (!fetchPart(part_name, metadata_snapshot, zookeeper_path + "/replicas/" + replica, false, entry.quorum))
|
||||
LOG_ERROR(log, "if (!fetchPart(part_name, metadata_snapshot... ");
|
||||
if (!fetchPart(part_name, metadata_snapshot, zookeeper_path + "/replicas/" + replica, false, entry.quorum, nullptr, block_id))
|
||||
return false;
|
||||
}
|
||||
catch (Exception & e)
|
||||
@ -2111,6 +2124,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
||||
if (interserver_scheme != address.scheme)
|
||||
throw Exception("Interserver schemas are different '" + interserver_scheme + "' != '" + address.scheme + "', can't fetch part from " + address.host, ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
LOG_ERROR(log, "WOW: part_desc->res_part = fetcher.fetchPart(");
|
||||
part_desc->res_part = fetcher.fetchPart(
|
||||
metadata_snapshot, part_desc->found_new_part_name, source_replica_path,
|
||||
address.host, address.replication_port, timeouts, user, password, interserver_scheme, false, TMP_PREFIX + "fetch_");
|
||||
@ -2142,6 +2156,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
||||
for (PartDescriptionPtr & part_desc : final_parts)
|
||||
{
|
||||
renameTempPartAndReplace(part_desc->res_part, nullptr, &transaction);
|
||||
LOG_INFO(log, "getCommitPartOps from Storage...::executeReplaceRange");
|
||||
getCommitPartOps(ops, part_desc->res_part);
|
||||
|
||||
if (ops.size() > zkutil::MULTI_BATCH_SIZE)
|
||||
@ -3039,6 +3054,8 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name, const st
|
||||
const String quorum_last_part_path = zookeeper_path + "/quorum/last_part";
|
||||
const String quorum_parallel_status_path = block_id ? zookeeper_path + "/blocks/" + *block_id : "";
|
||||
|
||||
LOG_ERROR(log, "updating quorum! {}", quorum_parallel_status_path);
|
||||
|
||||
/// If there is no node, then all quorum INSERTs have already reached the quorum, and nothing is needed.
|
||||
String value;
|
||||
Coordination::Stat stat;
|
||||
@ -3048,13 +3065,18 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name, const st
|
||||
ReplicatedMergeTreeBlockEntry block_entry;
|
||||
ReplicatedMergeTreeQuorumEntry quorum_entry;
|
||||
|
||||
LOG_ERROR(log, "trying to get something");
|
||||
if (zookeeper->tryGet(quorum_status_path, value, &stat))
|
||||
{
|
||||
LOG_ERROR(log, "got /quorum/status");
|
||||
LOG_ERROR(log, "got from /quorum/status: {}", value);
|
||||
quorum_entry.fromString(value);
|
||||
quorum_entry.status.replicas.insert(replica_name);
|
||||
}
|
||||
else if (block_id && zookeeper->tryGet(quorum_parallel_status_path, value, &stat))
|
||||
{
|
||||
LOG_ERROR(log, "got from {}", quorum_parallel_status_path);
|
||||
LOG_ERROR(log, "got from block: {}", value);
|
||||
block_entry.fromString(value);
|
||||
// The quorum has already been achieved
|
||||
if (!block_entry.quorum_status)
|
||||
@ -3062,20 +3084,24 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name, const st
|
||||
|
||||
is_parallel = true;
|
||||
block_entry.quorum_status->replicas.insert(replica_name);
|
||||
LOG_ERROR(log, "block data became: {}", block_entry.toString());
|
||||
if (block_entry.quorum_status->isQuorumReached())
|
||||
block_entry.quorum_status.reset();
|
||||
LOG_ERROR(log, "AFTER checkReached: block data became: {}", block_entry.toString());
|
||||
}
|
||||
else
|
||||
break;
|
||||
|
||||
if (quorum_entry.part_name != part_name && block_entry.part_name != part_name)
|
||||
{
|
||||
LOG_ERROR(log, "another part_names {} {} {}", quorum_entry.part_name, block_entry.part_name, part_name);
|
||||
/// The quorum has already been achieved. Moreover, another INSERT with a quorum has already started.
|
||||
break;
|
||||
}
|
||||
|
||||
if (quorum_entry.status.isQuorumReached())
|
||||
{
|
||||
LOG_ERROR(log, "normal quorum reached");
|
||||
/// The quorum is reached. Delete the node, and update information about the last part that was successfully written with quorum.
|
||||
|
||||
Coordination::Requests ops;
|
||||
@ -3119,12 +3145,15 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name, const st
|
||||
else
|
||||
{
|
||||
Coordination::Error code;
|
||||
if (is_parallel)
|
||||
LOG_ERROR(log, "else");
|
||||
if (!is_parallel)
|
||||
{
|
||||
LOG_ERROR(log, "!is_parallel");
|
||||
/// We update the node, registering there one more replica.
|
||||
code = zookeeper->trySet(quorum_status_path, quorum_entry.toString(), stat.version);
|
||||
}
|
||||
else {
|
||||
LOG_ERROR(log, "updating {} : {}", quorum_parallel_status_path, block_entry.toString());
|
||||
/// Update parallel quorum. It also might be reached here.
|
||||
code = zookeeper->trySet(quorum_parallel_status_path, block_entry.toString(), stat.version);
|
||||
}
|
||||
@ -3203,7 +3232,7 @@ void StorageReplicatedMergeTree::cleanLastPartNode(const String & partition_id)
|
||||
|
||||
|
||||
bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const StorageMetadataPtr & metadata_snapshot,
|
||||
const String & source_replica_path, bool to_detached, size_t quorum, zkutil::ZooKeeper::Ptr zookeeper_)
|
||||
const String & source_replica_path, bool to_detached, size_t quorum, zkutil::ZooKeeper::Ptr zookeeper_, const std::optional<String> & block_id)
|
||||
{
|
||||
auto zookeeper = zookeeper_ ? zookeeper_ : getZooKeeper();
|
||||
const auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
|
||||
@ -3306,6 +3335,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
|
||||
+ "' != '" + address.scheme + "', can't fetch part from " + address.host,
|
||||
ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
LOG_ERROR(log, "WOW: return fetcher.fetchPart(");
|
||||
return fetcher.fetchPart(
|
||||
metadata_snapshot, part_name, source_replica_path,
|
||||
address.host, address.replication_port,
|
||||
@ -3333,7 +3363,10 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
|
||||
* If you do not have time, in case of losing the session, when you restart the server - see the `ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart` method.
|
||||
*/
|
||||
if (quorum)
|
||||
updateQuorum(part_name);
|
||||
{
|
||||
LOG_ERROR(log, "updateQuorum from Storage...::3349");
|
||||
updateQuorum(part_name, block_id);
|
||||
}
|
||||
|
||||
merge_selecting_task->schedule();
|
||||
|
||||
@ -4830,6 +4863,7 @@ void StorageReplicatedMergeTree::fetchPartition(
|
||||
|
||||
try
|
||||
{
|
||||
LOG_ERROR(log, "OH:fetched = fetchPart(part, ...");
|
||||
fetched = fetchPart(part, metadata_snapshot, best_replica_path, true, 0, zookeeper);
|
||||
}
|
||||
catch (const DB::Exception & e)
|
||||
@ -5272,11 +5306,11 @@ void StorageReplicatedMergeTree::clearBlocksInPartition(
|
||||
continue;
|
||||
|
||||
/// ALEXELEXA
|
||||
/// should parse it here using another way
|
||||
ReadBufferFromString buf(result.data);
|
||||
ReplicatedMergeTreeBlockEntry block(result.data);
|
||||
ReadBufferFromString buf(block.part_name);
|
||||
Int64 block_num = 0;
|
||||
bool parsed = tryReadIntText(block_num, buf) && buf.eof();
|
||||
if (!parsed || (min_block_num <= block_num && block_num <= max_block_num))
|
||||
if ((!parsed || (min_block_num <= block_num && block_num <= max_block_num)) && !block.quorum_status)
|
||||
to_delete_futures.emplace_back(path, zookeeper.asyncTryRemove(path));
|
||||
}
|
||||
|
||||
@ -5422,6 +5456,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
|
||||
Coordination::Requests ops;
|
||||
for (size_t i = 0; i < dst_parts.size(); ++i)
|
||||
{
|
||||
LOG_INFO(log, "getCommitPartOps from Storage...::replacePartitionFrom");
|
||||
getCommitPartOps(ops, dst_parts[i], block_id_paths[i]);
|
||||
ephemeral_locks[i].getUnlockOps(ops);
|
||||
|
||||
@ -5612,6 +5647,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
|
||||
Coordination::Requests ops;
|
||||
for (size_t i = 0; i < dst_parts.size(); ++i)
|
||||
{
|
||||
LOG_INFO(log, "getCommitPartOps from Storage...::movePartitionToTable");
|
||||
dest_table_storage->getCommitPartOps(ops, dst_parts[i], block_id_paths[i]);
|
||||
ephemeral_locks[i].getUnlockOps(ops);
|
||||
|
||||
@ -5709,20 +5745,27 @@ void StorageReplicatedMergeTree::getCommitPartOps(
|
||||
}
|
||||
|
||||
/// Information about the part, in the replica
|
||||
String part_str, block_id = !block_id_path.empty() && block_entry.quorum_status ? block_id_path.substr(block_id_path.find_last_of('/') + 1) : "";
|
||||
if (storage_settings_ptr->use_minimalistic_part_header_in_zookeeper)
|
||||
{
|
||||
if (!block_id.empty())
|
||||
part_str = ReplicatedMergeTreePartHeader::fromColumnsChecksumsBlockID(part->getColumns(), part->checksums, block_id).toString();
|
||||
else
|
||||
part_str = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(part->getColumns(), part->checksums).toString();
|
||||
|
||||
ops.emplace_back(zkutil::makeCreateRequest(
|
||||
replica_path + "/parts/" + part->name,
|
||||
ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(part->getColumns(), part->checksums).toString(),
|
||||
part_str,
|
||||
zkutil::CreateMode::Persistent));
|
||||
LOG_DEBUG(log, "ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(part->getColumns(), part->checksums).toString(): {}",
|
||||
ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(part->getColumns(), part->checksums).toString());
|
||||
LOG_DEBUG(log, "ReplicatedMergeTreePartHeader::..: {}", part_str);
|
||||
}
|
||||
else
|
||||
{
|
||||
/// ALEXELEXA think!
|
||||
LOG_DEBUG(log, "creating empty!!!");
|
||||
ops.emplace_back(zkutil::makeCreateRequest(
|
||||
replica_path + "/parts/" + part->name,
|
||||
"",
|
||||
"", /// how it reads?
|
||||
zkutil::CreateMode::Persistent));
|
||||
ops.emplace_back(zkutil::makeCreateRequest(
|
||||
replica_path + "/parts/" + part->name + "/columns",
|
||||
|
@ -486,7 +486,8 @@ private:
|
||||
const String & replica_path,
|
||||
bool to_detached,
|
||||
size_t quorum,
|
||||
zkutil::ZooKeeper::Ptr zookeeper_ = nullptr);
|
||||
zkutil::ZooKeeper::Ptr zookeeper_ = nullptr,
|
||||
const std::optional<String> & block_id = std::nullopt);
|
||||
|
||||
/// Required only to avoid races between executeLogEntry and fetchPartition
|
||||
std::unordered_set<String> currently_fetching_parts;
|
||||
|
Loading…
Reference in New Issue
Block a user