write mutations to ZK

This commit is contained in:
Alexey Zatelepin 2018-04-19 13:33:16 +03:00
parent 44c091669a
commit 21e03324af
7 changed files with 245 additions and 8 deletions

View File

@ -271,6 +271,20 @@ public:
return path;
}
void remove()
{
try
{
zookeeper.tryRemove(path);
path.clear();
}
catch (...)
{
ProfileEvents::increment(ProfileEvents::CannotRemoveEphemeralNode);
throw;
}
}
void getRemoveOps(zkutil::Requests & ops) const
{
ops.emplace_back(zkutil::makeRemoveRequest(path, -1));
@ -303,11 +317,10 @@ public:
try
{
zookeeper.tryRemove(path);
remove();
}
catch (...)
{
ProfileEvents::increment(ProfileEvents::CannotRemoveEphemeralNode);
DB::tryLogCurrentException(__PRETTY_FUNCTION__);
}
}

View File

@ -0,0 +1,48 @@
#include <Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
namespace DB
{
void ReplicatedMergeTreeMutationEntry::writeText(WriteBuffer & out) const
{
out << "create time: " << LocalDateTime(create_time ? create_time : time(nullptr)) << "\n"
<< "source replica: " << source_replica << "\n"
<< "block number: " << block_number << "\n";
commands.writeText(out);
}
void ReplicatedMergeTreeMutationEntry::readText(ReadBuffer & in)
{
LocalDateTime create_time_dt;
in >> "create time: " >> create_time_dt >> "\n";
create_time = create_time_dt;
in >> "source replica: " >> source_replica >> "\n";
in >> "block number: " >> block_number >> "\n";
commands.readText(in);
}
String ReplicatedMergeTreeMutationEntry::toString() const
{
WriteBufferFromOwnString out;
writeText(out);
return out.str();
}
ReplicatedMergeTreeMutationEntry ReplicatedMergeTreeMutationEntry::parse(const String & str)
{
ReadBufferFromString in(str);
ReplicatedMergeTreeMutationEntry res;
res.readText(in);
assertEOF(in);
return res;
}
}

View File

@ -0,0 +1,32 @@
#pragma once
#include <Common/Exception.h>
#include <Core/Types.h>
#include <IO/WriteHelpers.h>
#include <Storages/MutationCommands.h>
namespace DB
{
class ReadBuffer;
class WriteBuffer;
struct ReplicatedMergeTreeMutationEntry
{
void writeText(WriteBuffer & out) const;
void readText(ReadBuffer & in);
String toString() const;
static ReplicatedMergeTreeMutationEntry parse(const String & str);
String znode_name;
time_t create_time = 0;
String source_replica;
Int64 block_number = 0;
MutationCommands commands;
};
}

View File

@ -0,0 +1,85 @@
#include <Storages/MutationCommands.h>
#include <IO/Operators.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
namespace DB
{
static String typeToString(MutationCommand::Type type)
{
switch (type)
{
case MutationCommand::DELETE: return "DELETE";
default:
throw Exception("Unknown mutation type: " + toString<int>(type), ErrorCodes::LOGICAL_ERROR);
}
}
void MutationCommand::writeText(WriteBuffer & out) const
{
out << typeToString(type) << "\n";
switch (type)
{
case MutationCommand::DELETE:
{
std::stringstream ss;
formatAST(*predicate, ss, /* hilite = */ false, /* one_line = */ true);
out << "predicate: " << ss.str() << "\n";
break;
}
default:
break;
}
}
void MutationCommand::readText(ReadBuffer & in)
{
String type_str;
in >> type_str >> "\n";
if (type_str == "DELETE")
{
type = DELETE;
String predicate_str;
in >> "predicate: " >> predicate_str >> "\n";
ParserExpressionWithOptionalAlias p_expr(false);
predicate = parseQuery(
p_expr, predicate_str.data(), predicate_str.data() + predicate_str.length(), "mutation predicate", 0);
}
else
{
/// We wouldn't know how to execute the mutation but at least we will be able to load it
/// and know that it is there (important for selection of merges).
type = UNKNOWN;
}
}
void MutationCommands::writeText(WriteBuffer & out) const
{
out << "mutation commands count: " << commands.size() << "\n";
for (const MutationCommand & command : commands)
{
command.writeText(out);
}
}
void MutationCommands::readText(ReadBuffer & in)
{
size_t count;
in >> "mutation commands count: " >> count >> "\n";
for (size_t i = 0; i < count; ++i)
{
MutationCommand command;
command.readText(in);
commands.push_back(std::move(command));
}
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <Parsers/IAST.h>
#include <IO/WriteHelpers.h>
namespace DB
@ -10,10 +11,11 @@ struct MutationCommand
{
enum Type
{
UNKNOWN,
DELETE,
};
Type type;
Type type = UNKNOWN;
ASTPtr predicate;
@ -24,11 +26,17 @@ struct MutationCommand
res.predicate = predicate;
return res;
}
void writeText(WriteBuffer & out) const;
void readText(ReadBuffer & in);
};
struct MutationCommands
{
std::vector<MutationCommand> commands;
void writeText(WriteBuffer & out) const;
void readText(ReadBuffer & in);
};
}

View File

@ -8,6 +8,7 @@
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
@ -278,13 +279,17 @@ void StorageReplicatedMergeTree::createNewZooKeeperNodes()
auto zookeeper = getZooKeeper();
/// Working with quorum.
zookeeper->createIfNotExists(zookeeper_path + "/quorum", "");
zookeeper->createIfNotExists(zookeeper_path + "/quorum/last_part", "");
zookeeper->createIfNotExists(zookeeper_path + "/quorum/failed_parts", "");
zookeeper->createIfNotExists(zookeeper_path + "/quorum", String());
zookeeper->createIfNotExists(zookeeper_path + "/quorum/last_part", String());
zookeeper->createIfNotExists(zookeeper_path + "/quorum/failed_parts", String());
/// Tracking lag of replicas.
zookeeper->createIfNotExists(replica_path + "/min_unprocessed_insert_time", "");
zookeeper->createIfNotExists(replica_path + "/max_processed_insert_time", "");
zookeeper->createIfNotExists(replica_path + "/min_unprocessed_insert_time", String());
zookeeper->createIfNotExists(replica_path + "/max_processed_insert_time", String());
/// Mutations
zookeeper->createIfNotExists(zookeeper_path + "/mutations", "0");
zookeeper->createIfNotExists(replica_path + "/mutation_pointer", String());
}
@ -3538,6 +3543,50 @@ void StorageReplicatedMergeTree::freezePartition(const ASTPtr & partition, const
}
void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const Context &)
{
ReplicatedMergeTreeMutationEntry entry;
entry.source_replica = replica_name;
entry.commands = commands;
String mutations_path = zookeeper_path + "/mutations";
/// Store next mutation number in the mutations_path node and check its version to ensure that
/// nodes for mutations are created in the same order as the corresponding block numbers.
/// Should work well if the number of concurrent mutation requests is small.
while (true)
{
auto zookeeper = getZooKeeper();
zkutil::Stat mutations_stat;
String prev_mutation_number = zookeeper->get(mutations_path, &mutations_stat);
String mutation_number = padIndex(parse<Int64>(prev_mutation_number) + 1);
auto number_and_node = allocateBlockNumber(*zookeeper);
entry.create_time = time(nullptr);
entry.block_number = number_and_node.first;
zkutil::Requests requests;
requests.emplace_back(zkutil::makeSetRequest(mutations_path, mutation_number, mutations_stat.version));
requests.emplace_back(zkutil::makeCreateRequest(
mutations_path + "/" + mutation_number, entry.toString(), zkutil::CreateMode::Persistent));
zkutil::Responses responses;
int32_t rc = zookeeper->tryMulti(requests, responses);
if (rc == ZooKeeperImpl::ZooKeeper::ZOK)
break;
else if (rc == ZooKeeperImpl::ZooKeeper::ZBADVERSION)
{
LOG_TRACE(log, "Version conflict when trying to create a mutation node, retrying...");
continue;
}
else
throw zkutil::KeeperException("Unable to create a mutation znode", rc);
}
}
void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
{
/// Critical section is not required (since grabOldParts() returns unique part set on each call)

View File

@ -118,6 +118,8 @@ public:
void fetchPartition(const ASTPtr & partition, const String & from, const Context & context) override;
void freezePartition(const ASTPtr & partition, const String & with_name, const Context & context) override;
void mutate(const MutationCommands & commands, const Context & context) override;
/** Removes a replica from ZooKeeper. If there are no other replicas, it deletes the entire table from ZooKeeper.
*/
void drop() override;