first part

This commit is contained in:
Alexandra Latysheva 2020-09-30 23:16:27 +00:00
parent bbbe51033d
commit e89a56969f
13 changed files with 283 additions and 60 deletions

View File

@ -158,6 +158,7 @@ class IColumn;
\ \
M(UInt64, insert_quorum, 0, "For INSERT queries in the replicated table, wait writing for the specified number of replicas and linearize the addition of the data. 0 - disabled.", 0) \ M(UInt64, insert_quorum, 0, "For INSERT queries in the replicated table, wait writing for the specified number of replicas and linearize the addition of the data. 0 - disabled.", 0) \
M(Milliseconds, insert_quorum_timeout, 600000, "", 0) \ M(Milliseconds, insert_quorum_timeout, 600000, "", 0) \
M(Bool, insert_quorum_parallel, false, "For quorum INSERT queries - enable to make parallel inserts without linearizability", 0) \
M(UInt64, select_sequential_consistency, 0, "For SELECT queries from the replicated table, throw an exception if the replica does not have a chunk written with the quorum; do not read the parts that have not yet been written with the quorum.", 0) \ M(UInt64, select_sequential_consistency, 0, "For SELECT queries from the replicated table, throw an exception if the replica does not have a chunk written with the quorum; do not read the parts that have not yet been written with the quorum.", 0) \
M(UInt64, table_function_remote_max_addresses, 1000, "The maximum number of different shards and the maximum number of replicas of one shard in the `remote` function.", 0) \ M(UInt64, table_function_remote_max_addresses, 1000, "The maximum number of different shards and the maximum number of replicas of one shard in the `remote` function.", 0) \
M(Milliseconds, read_backoff_min_latency_ms, 1000, "Setting to reduce the number of threads in case of slow reads. Pay attention only to reads that took at least that much time.", 0) \ M(Milliseconds, read_backoff_min_latency_ms, 1000, "Setting to reduce the number of threads in case of slow reads. Pay attention only to reads that took at least that much time.", 0) \

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <tuple> #include <tuple>
#include <optional>
#include <common/types.h> #include <common/types.h>
#include <common/DayNum.h> #include <common/DayNum.h>
#include <Storages/MergeTree/MergeTreeDataFormatVersion.h> #include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
@ -18,6 +19,8 @@ struct MergeTreePartInfo
Int64 max_block = 0; Int64 max_block = 0;
UInt32 level = 0; UInt32 level = 0;
Int64 mutation = 0; /// If the part has been mutated or contains mutated parts, is equal to mutation version number. Int64 mutation = 0; /// If the part has been mutated or contains mutated parts, is equal to mutation version number.
std::optional<String> block_id; /// hex if write with quorum and min_block == max_block
/// shouldn't be here...
MergeTreePartInfo() = default; MergeTreePartInfo() = default;

View File

@ -0,0 +1,66 @@
#pragma once
#include <set>
#include <common/types.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumStatusEntry.h>
namespace DB
{
/** To implement the functionality of the "quorum write".
* Information about which replicas the inserted part of data appeared on,
* and on how many replicas it should be.
*/
struct ReplicatedMergeTreeBlockEntry
{
String part_name;
std::optional<ReplicatedMergeTreeQuorumStatusEntry> quorum_status;
ReplicatedMergeTreeBlockEntry() {}
ReplicatedMergeTreeBlockEntry(const String & str)
{
fromString(str);
}
void writeText(WriteBuffer & out) const
{
out << part_name << "\n";
if (quorum_status)
quorum_status->writeText(out);
}
void readText(ReadBuffer & in)
{
in >> part_name;
if (!in.eof())
{
in >> "\n";
quorum_status = ReplicatedMergeTreeQuorumStatusEntry();
quorum_status->readText(in);
}
}
String toString() const
{
WriteBufferFromOwnString out;
writeText(out);
return out.str();
}
void fromString(const String & str)
{
ReadBufferFromString in(str);
readText(in);
}
};
}

View File

@ -1,4 +1,5 @@
#include <Storages/StorageReplicatedMergeTree.h> #include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h> #include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h> #include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Interpreters/PartLog.h> #include <Interpreters/PartLog.h>
@ -39,12 +40,14 @@ ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
size_t quorum_, size_t quorum_,
size_t quorum_timeout_ms_, size_t quorum_timeout_ms_,
size_t max_parts_per_block_, size_t max_parts_per_block_,
bool quorum_parallel_,
bool deduplicate_) bool deduplicate_)
: storage(storage_) : storage(storage_)
, metadata_snapshot(metadata_snapshot_) , metadata_snapshot(metadata_snapshot_)
, quorum(quorum_) , quorum(quorum_)
, quorum_timeout_ms(quorum_timeout_ms_) , quorum_timeout_ms(quorum_timeout_ms_)
, max_parts_per_block(max_parts_per_block_) , max_parts_per_block(max_parts_per_block_)
, quorum_parallel(quorum_parallel_)
, deduplicate(deduplicate_) , deduplicate(deduplicate_)
, log(&Poco::Logger::get(storage.getLogName() + " (Replicated OutputStream)")) , log(&Poco::Logger::get(storage.getLogName() + " (Replicated OutputStream)"))
{ {
@ -243,15 +246,21 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
Int64 block_number = 0; Int64 block_number = 0;
String existing_part_name; String existing_part_name;
ReplicatedMergeTreeBlockEntry block_entry;
if (block_number_lock) if (block_number_lock)
{ {
is_already_existing_part = false; is_already_existing_part = false;
block_number = block_number_lock->getNumber(); block_number = block_number_lock->getNumber();
block_entry.part_name = part->name;
/// Set part attributes according to part_number. Prepare an entry for log. /// Set part attributes according to part_number. Prepare an entry for log.
part->info.min_block = block_number; part->info.min_block = block_number;
part->info.max_block = block_number; part->info.max_block = block_number;
/// ALEXELEXA
/// somehow need to send this block_if to part node. TODO
part->info.block_id = block_id;
part->info.level = 0; part->info.level = 0;
part->name = part->getNewName(part->info); part->name = part->getNewName(part->info);
@ -282,10 +291,9 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
*/ */
if (quorum) if (quorum)
{ {
ReplicatedMergeTreeQuorumEntry quorum_entry; ReplicatedMergeTreeQuorumStatusEntry status_entry;
quorum_entry.part_name = part->name; status_entry.required_number_of_replicas = quorum;
quorum_entry.required_number_of_replicas = quorum; status_entry.replicas.insert(storage.replica_name);
quorum_entry.replicas.insert(storage.replica_name);
/** At this point, this node will contain information that the current replica received a part. /** At this point, this node will contain information that the current replica received a part.
* When other replicas will receive this part (in the usual way, processing the replication log), * When other replicas will receive this part (in the usual way, processing the replication log),
@ -294,12 +302,22 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
* which indicates that the quorum has been reached. * which indicates that the quorum has been reached.
*/ */
if (!quorum_parallel)
{
ReplicatedMergeTreeQuorumEntry quorum_entry;
quorum_entry.part_name = part->name;
quorum_entry.status = status_entry;
ops.emplace_back( ops.emplace_back(
zkutil::makeCreateRequest( zkutil::makeCreateRequest(
quorum_info.status_path, quorum_info.status_path,
quorum_entry.toString(), quorum_entry.toString(),
zkutil::CreateMode::Persistent)); zkutil::CreateMode::Persistent));
}
else
block_entry.quorum_status = status_entry;
/// Make sure that during the insertion time, the replica was not reinitialized or disabled (when the server is finished). /// Make sure that during the insertion time, the replica was not reinitialized or disabled (when the server is finished).
ops.emplace_back( ops.emplace_back(
zkutil::makeCheckRequest( zkutil::makeCheckRequest(
@ -352,7 +370,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
} }
/// Information about the part. /// Information about the part.
storage.getCommitPartOps(ops, part, block_id_path); storage.getCommitPartOps(ops, part, block_id_path, block_entry);
MergeTreeData::Transaction transaction(storage); /// If you can not add a part to ZK, we'll remove it back from the working set. MergeTreeData::Transaction transaction(storage); /// If you can not add a part to ZK, we'll remove it back from the working set.
bool renamed = false; bool renamed = false;
@ -466,13 +484,15 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
if (is_already_existing_part) if (is_already_existing_part)
{ {
/// We get duplicate part without fetch /// We get duplicate part without fetch
storage.updateQuorum(part->name); /// ALEXELEXA
/// should reset here something, after thinking in TODO
storage.updateQuorum(part->name, part->info.block_id);
} }
/// We are waiting for quorum to be satisfied. /// We are waiting for quorum to be satisfied.
LOG_TRACE(log, "Waiting for quorum"); LOG_TRACE(log, "Waiting for quorum");
String quorum_status_path = storage.zookeeper_path + "/quorum/status"; String quorum_status_path = quorum_parallel ? storage.zookeeper_path + "/blocks/" + block_id : storage.zookeeper_path + "/quorum/status";
try try
{ {
@ -481,15 +501,25 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
zkutil::EventPtr event = std::make_shared<Poco::Event>(); zkutil::EventPtr event = std::make_shared<Poco::Event>();
std::string value; std::string value;
ReplicatedMergeTreeQuorumEntry quorum_entry;
ReplicatedMergeTreeBlockEntry block_entry;
/// `get` instead of `exists` so that `watch` does not leak if the node is no longer there. /// `get` instead of `exists` so that `watch` does not leak if the node is no longer there.
if (!zookeeper->tryGet(quorum_status_path, value, nullptr, event)) if (!zookeeper->tryGet(quorum_status_path, value, nullptr, event))
break; break;
ReplicatedMergeTreeQuorumEntry quorum_entry(value);
/// If the node has time to disappear, and then appear again for the next insert. /// If the node has time to disappear, and then appear again for the next insert.
if (quorum_parallel)
{
block_entry.fromString(value);
if (block_entry.part_name != part->name)
break;
}
else
{
quorum_entry.fromString(value);
if (quorum_entry.part_name != part->name) if (quorum_entry.part_name != part->name)
break; break;
}
if (!event->tryWait(quorum_timeout_ms)) if (!event->tryWait(quorum_timeout_ms))
throw Exception("Timeout while waiting for quorum", ErrorCodes::TIMEOUT_EXCEEDED); throw Exception("Timeout while waiting for quorum", ErrorCodes::TIMEOUT_EXCEEDED);

View File

@ -28,6 +28,7 @@ public:
size_t quorum_, size_t quorum_,
size_t quorum_timeout_ms_, size_t quorum_timeout_ms_,
size_t max_parts_per_block_, size_t max_parts_per_block_,
bool quorum_parallel_,
bool deduplicate_); bool deduplicate_);
Block getHeader() const override; Block getHeader() const override;
@ -64,6 +65,7 @@ private:
size_t quorum_timeout_ms; size_t quorum_timeout_ms;
size_t max_parts_per_block; size_t max_parts_per_block;
bool quorum_parallel = false;
bool deduplicate = true; bool deduplicate = true;
bool last_block_is_duplicate = false; bool last_block_is_duplicate = false;

View File

@ -70,6 +70,8 @@ void ReplicatedMergeTreeCleanupThread::iterate()
if (storage.is_leader) if (storage.is_leader)
{ {
clearOldLogs(); clearOldLogs();
/// ALEXELEXA
/// may be just remove it?
clearOldBlocks(); clearOldBlocks();
clearOldMutations(); clearOldMutations();
} }
@ -344,6 +346,8 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
for (auto it = first_outdated_block; it != timed_blocks.end(); ++it) for (auto it = first_outdated_block; it != timed_blocks.end(); ++it)
{ {
String path = storage.zookeeper_path + "/blocks/" + it->node; String path = storage.zookeeper_path + "/blocks/" + it->node;
/// ALEXELEXA
/// should check somehow is it quorum block
try_remove_futures.emplace_back(path, zookeeper->asyncTryRemove(path)); try_remove_futures.emplace_back(path, zookeeper->asyncTryRemove(path));
} }

View File

@ -34,11 +34,24 @@ ReplicatedMergeTreePartHeader ReplicatedMergeTreePartHeader::fromColumnsAndCheck
return ReplicatedMergeTreePartHeader(getSipHash(columns.toString()), std::move(checksums)); return ReplicatedMergeTreePartHeader(getSipHash(columns.toString()), std::move(checksums));
} }
ReplicatedMergeTreePartHeader ReplicatedMergeTreePartHeader::fromColumnsChecksumsBlockID(
const NamesAndTypesList & columns,
const MergeTreeDataPartChecksums & full_checksums,
const String & block_id_)
{
MinimalisticDataPartChecksums checksums;
checksums.computeTotalChecksums(full_checksums);
return ReplicatedMergeTreePartHeader(getSipHash(columns.toString()), std::move(checksums), block_id_);
}
void ReplicatedMergeTreePartHeader::read(ReadBuffer & in) void ReplicatedMergeTreePartHeader::read(ReadBuffer & in)
{ {
in >> "part header format version: 1\n"; in >> "part header format version: 1\n";
in.readStrict(columns_hash.data(), columns_hash.size()); in.readStrict(columns_hash.data(), columns_hash.size());
checksums.deserializeWithoutHeader(in); checksums.deserializeWithoutHeader(in);
if (!in.eof())
in >> "block_id: " >> block_id.value() >> "\n";
} }
ReplicatedMergeTreePartHeader ReplicatedMergeTreePartHeader::fromString(const String & str) ReplicatedMergeTreePartHeader ReplicatedMergeTreePartHeader::fromString(const String & str)
@ -54,6 +67,8 @@ void ReplicatedMergeTreePartHeader::write(WriteBuffer & out) const
writeString("part header format version: 1\n", out); writeString("part header format version: 1\n", out);
out.write(columns_hash.data(), columns_hash.size()); out.write(columns_hash.data(), columns_hash.size());
checksums.serializeWithoutHeader(out); checksums.serializeWithoutHeader(out);
if (block_id)
out << "block_id " << block_id.value() << "\n";
} }
String ReplicatedMergeTreePartHeader::toString() const String ReplicatedMergeTreePartHeader::toString() const

View File

@ -6,6 +6,7 @@
#include <IO/ReadBuffer.h> #include <IO/ReadBuffer.h>
#include <IO/Operators.h> #include <IO/Operators.h>
#include <array> #include <array>
#include <optional>
namespace DB namespace DB
@ -28,6 +29,9 @@ public:
static ReplicatedMergeTreePartHeader fromColumnsAndChecksums( static ReplicatedMergeTreePartHeader fromColumnsAndChecksums(
const NamesAndTypesList & columns, const MergeTreeDataPartChecksums & full_checksums); const NamesAndTypesList & columns, const MergeTreeDataPartChecksums & full_checksums);
static ReplicatedMergeTreePartHeader fromColumnsChecksumsBlockID(
const NamesAndTypesList & columns, const MergeTreeDataPartChecksums & full_checksums, const String & block_id_);
void read(ReadBuffer & in); void read(ReadBuffer & in);
static ReplicatedMergeTreePartHeader fromString(const String & str); static ReplicatedMergeTreePartHeader fromString(const String & str);
@ -38,13 +42,15 @@ public:
const MinimalisticDataPartChecksums & getChecksums() const { return checksums; } const MinimalisticDataPartChecksums & getChecksums() const { return checksums; }
private: private:
ReplicatedMergeTreePartHeader(std::array<char, 16> columns_hash_, MinimalisticDataPartChecksums checksums_) ReplicatedMergeTreePartHeader(std::array<char, 16> columns_hash_, MinimalisticDataPartChecksums checksums_,
: columns_hash(std::move(columns_hash_)), checksums(std::move(checksums_)) std::optional<String> block_id_ = std::nullopt)
: columns_hash(std::move(columns_hash_)), checksums(std::move(checksums_)), block_id(std::move(block_id_))
{ {
} }
std::array<char, 16> columns_hash; std::array<char, 16> columns_hash;
MinimalisticDataPartChecksums checksums; MinimalisticDataPartChecksums checksums;
std::optional<String> block_id;
}; };
} }

View File

@ -8,6 +8,7 @@
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
#include <IO/Operators.h> #include <IO/Operators.h>
#include <Common/ZooKeeper/ZooKeeper.h> #include <Common/ZooKeeper/ZooKeeper.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumStatusEntry.h>
namespace DB namespace DB
@ -20,8 +21,7 @@ namespace DB
struct ReplicatedMergeTreeQuorumEntry struct ReplicatedMergeTreeQuorumEntry
{ {
String part_name; String part_name;
size_t required_number_of_replicas{}; ReplicatedMergeTreeQuorumStatusEntry status;
std::set<String> replicas;
ReplicatedMergeTreeQuorumEntry() {} ReplicatedMergeTreeQuorumEntry() {}
ReplicatedMergeTreeQuorumEntry(const String & str) ReplicatedMergeTreeQuorumEntry(const String & str)
@ -32,31 +32,15 @@ struct ReplicatedMergeTreeQuorumEntry
void writeText(WriteBuffer & out) const void writeText(WriteBuffer & out) const
{ {
out << "version: 1\n" out << "version: 1\n"
<< "part_name: " << part_name << "\n" << "part_name: " << part_name << "\n";
<< "required_number_of_replicas: " << required_number_of_replicas << "\n" status.writeText(out);
<< "actual_number_of_replicas: " << replicas.size() << "\n"
<< "replicas:\n";
for (const auto & replica : replicas)
out << escape << replica << "\n";
} }
void readText(ReadBuffer & in) void readText(ReadBuffer & in)
{ {
size_t actual_number_of_replicas = 0;
in >> "version: 1\n" in >> "version: 1\n"
>> "part_name: " >> part_name >> "\n" >> "part_name: " >> part_name >> "\n";
>> "required_number_of_replicas: " >> required_number_of_replicas >> "\n" status.readText(in);
>> "actual_number_of_replicas: " >> actual_number_of_replicas >> "\n"
>> "replicas:\n";
for (size_t i = 0; i < actual_number_of_replicas; ++i)
{
String replica;
in >> escape >> replica >> "\n";
replicas.insert(replica);
}
} }
String toString() const String toString() const

View File

@ -0,0 +1,76 @@
#pragma once
#include <set>
#include <common/types.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Common/ZooKeeper/ZooKeeper.h>
namespace DB
{
/** To implement the functionality of the "quorum write".
* Information about which replicas the inserted part of data appeared on,
* and on how many replicas it should be.
*/
struct ReplicatedMergeTreeQuorumStatusEntry
{
size_t required_number_of_replicas{};
std::set<String> replicas;
ReplicatedMergeTreeQuorumStatusEntry() {}
ReplicatedMergeTreeQuorumStatusEntry(const String & str)
{
fromString(str);
}
void writeText(WriteBuffer & out) const
{
out << "required_number_of_replicas: " << required_number_of_replicas << "\n"
<< "actual_number_of_replicas: " << replicas.size() << "\n"
<< "replicas:\n";
for (const auto & replica : replicas)
out << escape << replica << "\n";
}
void readText(ReadBuffer & in)
{
size_t actual_number_of_replicas = 0;
in >> "required_number_of_replicas: " >> required_number_of_replicas >> "\n"
>> "actual_number_of_replicas: " >> actual_number_of_replicas >> "\n"
>> "replicas:\n";
for (size_t i = 0; i < actual_number_of_replicas; ++i)
{
String replica;
in >> escape >> replica >> "\n";
replicas.insert(replica);
}
}
String toString() const
{
WriteBufferFromOwnString out;
writeText(out);
return out.str();
}
void fromString(const String & str)
{
ReadBufferFromString in(str);
readText(in);
}
bool isQuorumReached()
{
return required_number_of_replicas <= replicas.size();
}
};
}

View File

@ -227,9 +227,9 @@ void ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart()
if (zookeeper->tryGet(storage.zookeeper_path + "/quorum/status", quorum_str)) if (zookeeper->tryGet(storage.zookeeper_path + "/quorum/status", quorum_str))
{ {
ReplicatedMergeTreeQuorumEntry quorum_entry; ReplicatedMergeTreeQuorumEntry quorum_entry;
quorum_entry.fromString(quorum_str); quorum_entry.status.fromString(quorum_str);
if (!quorum_entry.replicas.count(storage.replica_name) if (!quorum_entry.status.replicas.count(storage.replica_name)
&& zookeeper->exists(storage.replica_path + "/parts/" + quorum_entry.part_name)) && zookeeper->exists(storage.replica_path + "/parts/" + quorum_entry.part_name))
{ {
LOG_WARNING(log, "We have part {} but we is not in quorum. Updating quorum. This shouldn't happen often.", quorum_entry.part_name); LOG_WARNING(log, "We have part {} but we is not in quorum. Updating quorum. This shouldn't happen often.", quorum_entry.part_name);

View File

@ -1176,6 +1176,7 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil:
{ {
ops.emplace_back(zkutil::makeCreateRequest( ops.emplace_back(zkutil::makeCreateRequest(
part_path, local_part_header.toString(), zkutil::CreateMode::Persistent)); part_path, local_part_header.toString(), zkutil::CreateMode::Persistent));
LOG_DEBUG(log, "local_part_header.toString(): {}", local_part_header.toString());
} }
else else
{ {
@ -3028,7 +3029,7 @@ String StorageReplicatedMergeTree::findReplicaHavingCoveringPart(
/** If a quorum is tracked for a part, update information about it in ZK. /** If a quorum is tracked for a part, update information about it in ZK.
*/ */
void StorageReplicatedMergeTree::updateQuorum(const String & part_name) void StorageReplicatedMergeTree::updateQuorum(const String & part_name, const std::optional<String> & block_id)
{ {
auto zookeeper = getZooKeeper(); auto zookeeper = getZooKeeper();
@ -3036,25 +3037,44 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name)
const String quorum_status_path = zookeeper_path + "/quorum/status"; const String quorum_status_path = zookeeper_path + "/quorum/status";
/// The name of the previous part for which the quorum was reached. /// The name of the previous part for which the quorum was reached.
const String quorum_last_part_path = zookeeper_path + "/quorum/last_part"; const String quorum_last_part_path = zookeeper_path + "/quorum/last_part";
const String quorum_parallel_status_path = block_id ? zookeeper_path + "/blocks/" + *block_id : "";
String value;
Coordination::Stat stat;
/// If there is no node, then all quorum INSERTs have already reached the quorum, and nothing is needed. /// If there is no node, then all quorum INSERTs have already reached the quorum, and nothing is needed.
while (zookeeper->tryGet(quorum_status_path, value, &stat)) String value;
Coordination::Stat stat;
while (true)
{ {
bool is_parallel = false;
ReplicatedMergeTreeBlockEntry block_entry;
ReplicatedMergeTreeQuorumEntry quorum_entry; ReplicatedMergeTreeQuorumEntry quorum_entry;
quorum_entry.fromString(value);
if (quorum_entry.part_name != part_name) if (zookeeper->tryGet(quorum_status_path, value, &stat))
{
quorum_entry.fromString(value);
quorum_entry.status.replicas.insert(replica_name);
}
else if (block_id && zookeeper->tryGet(quorum_parallel_status_path, value, &stat))
{
block_entry.fromString(value);
// The quorum has already been achieved
if (!block_entry.quorum_status)
break;
is_parallel = true;
block_entry.quorum_status->replicas.insert(replica_name);
if (block_entry.quorum_status->isQuorumReached())
block_entry.quorum_status.reset();
}
else
break;
if (quorum_entry.part_name != part_name && block_entry.part_name != part_name)
{ {
/// The quorum has already been achieved. Moreover, another INSERT with a quorum has already started. /// The quorum has already been achieved. Moreover, another INSERT with a quorum has already started.
break; break;
} }
quorum_entry.replicas.insert(replica_name); if (quorum_entry.status.isQuorumReached())
if (quorum_entry.replicas.size() >= quorum_entry.required_number_of_replicas)
{ {
/// The quorum is reached. Delete the node, and update information about the last part that was successfully written with quorum. /// The quorum is reached. Delete the node, and update information about the last part that was successfully written with quorum.
@ -3097,9 +3117,17 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name)
throw Coordination::Exception(code, quorum_status_path); throw Coordination::Exception(code, quorum_status_path);
} }
else else
{
Coordination::Error code;
if (is_parallel)
{ {
/// We update the node, registering there one more replica. /// We update the node, registering there one more replica.
auto code = zookeeper->trySet(quorum_status_path, quorum_entry.toString(), stat.version); code = zookeeper->trySet(quorum_status_path, quorum_entry.toString(), stat.version);
}
else {
/// Update parallel quorum. It also might be reached here.
code = zookeeper->trySet(quorum_parallel_status_path, block_entry.toString(), stat.version);
}
if (code == Coordination::Error::ZOK) if (code == Coordination::Error::ZOK)
{ {
@ -3588,6 +3616,7 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/,
*this, metadata_snapshot, query_settings.insert_quorum, *this, metadata_snapshot, query_settings.insert_quorum,
query_settings.insert_quorum_timeout.totalMilliseconds(), query_settings.insert_quorum_timeout.totalMilliseconds(),
query_settings.max_partitions_per_insert_block, query_settings.max_partitions_per_insert_block,
query_settings.insert_quorum_parallel,
deduplicate); deduplicate);
} }
@ -4164,7 +4193,7 @@ PartitionCommandsResultInfo StorageReplicatedMergeTree::attachPartition(
PartsTemporaryRename renamed_parts(*this, "detached/"); PartsTemporaryRename renamed_parts(*this, "detached/");
MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts); MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts);
ReplicatedMergeTreeBlockOutputStream output(*this, metadata_snapshot, 0, 0, 0, false); /// TODO Allow to use quorum here. ReplicatedMergeTreeBlockOutputStream output(*this, metadata_snapshot, 0, 0, 0, false, false); /// TODO Allow to use quorum here.
for (size_t i = 0; i < loaded_parts.size(); ++i) for (size_t i = 0; i < loaded_parts.size(); ++i)
{ {
String old_name = loaded_parts[i]->name; String old_name = loaded_parts[i]->name;
@ -5242,6 +5271,8 @@ void StorageReplicatedMergeTree::clearBlocksInPartition(
if (result.error == Coordination::Error::ZNONODE) if (result.error == Coordination::Error::ZNONODE)
continue; continue;
/// ALEXELEXA
/// should parse it here using another way
ReadBufferFromString buf(result.data); ReadBufferFromString buf(result.data);
Int64 block_num = 0; Int64 block_num = 0;
bool parsed = tryReadIntText(block_num, buf) && buf.eof(); bool parsed = tryReadIntText(block_num, buf) && buf.eof();
@ -5661,18 +5692,19 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
void StorageReplicatedMergeTree::getCommitPartOps( void StorageReplicatedMergeTree::getCommitPartOps(
Coordination::Requests & ops, Coordination::Requests & ops,
MutableDataPartPtr & part, MutableDataPartPtr & part,
const String & block_id_path) const const String & block_id_path,
ReplicatedMergeTreeBlockEntry block_entry) const
{ {
const String & part_name = part->name;
const auto storage_settings_ptr = getSettings(); const auto storage_settings_ptr = getSettings();
if (!block_id_path.empty()) if (!block_id_path.empty())
{ {
/// Make final duplicate check and commit block_id /// Make final duplicate check and commit block_id
block_entry.part_name = part->name;
ops.emplace_back( ops.emplace_back(
zkutil::makeCreateRequest( zkutil::makeCreateRequest(
block_id_path, block_id_path,
part_name, /// We will be able to know original part number for duplicate blocks, if we want. block_entry.toString(), /// We will be able to know original part number for duplicate blocks, if we want.
zkutil::CreateMode::Persistent)); zkutil::CreateMode::Persistent));
} }
@ -5683,6 +5715,8 @@ void StorageReplicatedMergeTree::getCommitPartOps(
replica_path + "/parts/" + part->name, replica_path + "/parts/" + part->name,
ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(part->getColumns(), part->checksums).toString(), ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(part->getColumns(), part->checksums).toString(),
zkutil::CreateMode::Persistent)); zkutil::CreateMode::Persistent));
LOG_DEBUG(log, "ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(part->getColumns(), part->checksums).toString(): {}",
ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(part->getColumns(), part->checksums).toString());
} }
else else
{ {

View File

@ -10,6 +10,7 @@
#include <Storages/MergeTree/MergeTreeDataWriter.h> #include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h> #include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h> #include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h> #include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h> #include <Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h> #include <Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h>
@ -350,7 +351,8 @@ private:
bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override; bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override;
void getCommitPartOps(Coordination::Requests & ops, MutableDataPartPtr & part, const String & block_id_path = "") const; void getCommitPartOps(Coordination::Requests & ops, MutableDataPartPtr & part, const String & block_id_path = "",
ReplicatedMergeTreeBlockEntry block_entry = ReplicatedMergeTreeBlockEntry()) const;
/// Adds actions to `ops` that remove a part from ZooKeeper. /// Adds actions to `ops` that remove a part from ZooKeeper.
/// Set has_children to true for "old-style" parts (those with /columns and /checksums child znodes). /// Set has_children to true for "old-style" parts (those with /columns and /checksums child znodes).
@ -492,7 +494,7 @@ private:
/// With the quorum being tracked, add a replica to the quorum for the part. /// With the quorum being tracked, add a replica to the quorum for the part.
void updateQuorum(const String & part_name); void updateQuorum(const String & part_name, const std::optional<String> & block_id = std::nullopt);
/// Deletes info from quorum/last_part node for particular partition_id. /// Deletes info from quorum/last_part node for particular partition_id.
void cleanLastPartNode(const String & partition_id); void cleanLastPartNode(const String & partition_id);