try support replica recovery

This commit is contained in:
Alexander Tokmakov 2020-11-19 13:34:45 +03:00
parent b0262b3d06
commit 2283906a11
5 changed files with 253 additions and 112 deletions

View File

@ -522,6 +522,7 @@
M(553, ROCKSDB_ERROR) \ M(553, ROCKSDB_ERROR) \
M(553, LZMA_STREAM_ENCODER_FAILED) \ M(553, LZMA_STREAM_ENCODER_FAILED) \
M(554, LZMA_STREAM_DECODER_FAILED) \ M(554, LZMA_STREAM_DECODER_FAILED) \
M(554, DATABASE_REPLICATION_FAILED) \
M(999, KEEPER_EXCEPTION) \ M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \ M(1000, POCO_EXCEPTION) \
M(1001, STD_EXCEPTION) \ M(1001, STD_EXCEPTION) \

View File

@ -28,9 +28,10 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS; extern const int BAD_ARGUMENTS;
extern const int REPLICA_IS_ALREADY_EXIST; extern const int REPLICA_IS_ALREADY_EXIST;
extern const int DATABASE_REPLICATION_FAILED;
} }
constexpr const char * first_entry_name = "query-0000000000"; static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768;
zkutil::ZooKeeperPtr DatabaseReplicated::getZooKeeper() const zkutil::ZooKeeperPtr DatabaseReplicated::getZooKeeper() const
{ {
@ -42,6 +43,15 @@ static inline String getHostID(const Context & global_context)
return Cluster::Address::toString(getFQDNOrHostName(), global_context.getTCPPort()); return Cluster::Address::toString(getFQDNOrHostName(), global_context.getTCPPort());
} }
Strings DatabaseReplicated::getSnapshots(const ZooKeeperPtr & zookeeper) const
{
Strings snapshots = zookeeper->getChildren(zookeeper_path + "/snapshots");
std::sort(snapshots.begin(), snapshots.end());
if (snapshots.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No snapshots found");
return snapshots;
}
DatabaseReplicated::~DatabaseReplicated() = default; DatabaseReplicated::~DatabaseReplicated() = default;
@ -84,7 +94,7 @@ DatabaseReplicated::DatabaseReplicated(
createDatabaseNodesInZooKeeper(current_zookeeper); createDatabaseNodesInZooKeeper(current_zookeeper);
} }
replica_path = zookeeper_path + "/replicas/" + shard_name + "|" + replica_name; replica_path = zookeeper_path + "/replicas/" + shard_name + "/" + replica_name;
String replica_host_id; String replica_host_id;
if (current_zookeeper->tryGet(replica_path, replica_host_id)) if (current_zookeeper->tryGet(replica_path, replica_host_id))
@ -95,7 +105,7 @@ DatabaseReplicated::DatabaseReplicated(
"Replica {} of shard {} of replicated database at {} already exists. Replica host ID: '{}', current host ID: '{}'", "Replica {} of shard {} of replicated database at {} already exists. Replica host ID: '{}', current host ID: '{}'",
replica_name, shard_name, zookeeper_path, replica_host_id, host_id); replica_name, shard_name, zookeeper_path, replica_host_id, host_id);
log_entry_to_execute = current_zookeeper->get(replica_path + "/log_ptr"); log_entry_to_execute = parse<UInt32>(current_zookeeper->get(replica_path + "/log_ptr"));
} }
else else
{ {
@ -103,10 +113,7 @@ DatabaseReplicated::DatabaseReplicated(
createReplicaNodesInZooKeeper(current_zookeeper); createReplicaNodesInZooKeeper(current_zookeeper);
} }
assert(log_entry_to_execute.starts_with("query-")); snapshot_period = 1; //context_.getConfigRef().getInt("database_replicated_snapshot_period", 10);
snapshot_period = context_.getConfigRef().getInt("database_replicated_snapshot_period", 10);
LOG_DEBUG(log, "Snapshot period is set to {} log entries per one snapshot", snapshot_period); LOG_DEBUG(log, "Snapshot period is set to {} log entries per one snapshot", snapshot_period);
} }
@ -117,10 +124,12 @@ bool DatabaseReplicated::createDatabaseNodesInZooKeeper(const zkutil::ZooKeeperP
Coordination::Requests ops; Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path, "", zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log", "", zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/snapshots", "", zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/snapshots", "", zkutil::CreateMode::Persistent));
/// Create empty snapshot (with no tables) /// Create empty snapshot (with no tables)
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/snapshots/" + first_entry_name, "", zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/snapshots/0", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/metadata", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/metadata/0", "", zkutil::CreateMode::Persistent));
Coordination::Responses responses; Coordination::Responses responses;
auto res = current_zookeeper->tryMulti(ops, responses); auto res = current_zookeeper->tryMulti(ops, responses);
@ -137,20 +146,24 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
{ {
current_zookeeper->createAncestors(replica_path); current_zookeeper->createAncestors(replica_path);
Strings snapshots = current_zookeeper->getChildren(zookeeper_path + "/snapshots");
std::sort(snapshots.begin(), snapshots.end());
if (snapshots.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No snapshots found");
/// When creating new replica, use latest snapshot version as initial value of log_pointer /// When creating new replica, use latest snapshot version as initial value of log_pointer
log_entry_to_execute = snapshots.back(); log_entry_to_execute = parse<UInt32>(getSnapshots(current_zookeeper).back());
/// Write host name to replica_path, it will protect from multiple replicas with the same name /// Write host name to replica_path, it will protect from multiple replicas with the same name
auto host_id = getHostID(global_context); auto host_id = getHostID(global_context);
/// On replica creation add empty entry to log. Can be used to trigger some actions on other replicas (e.g. update cluster info).
DDLLogEntry entry;
entry.hosts = {};
entry.query = {};
entry.initiator = {};
recoverLostReplica(current_zookeeper, log_entry_to_execute, true);
Coordination::Requests ops; Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(replica_path, host_id, zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeCreateRequest(replica_path, host_id, zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_ptr", log_entry_to_execute , zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_ptr", toString(log_entry_to_execute), zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/query-", entry.toString(), zkutil::CreateMode::PersistentSequential));
current_zookeeper->multi(ops); current_zookeeper->multi(ops);
} }
@ -160,10 +173,13 @@ void DatabaseReplicated::loadStoredObjects(Context & context, bool has_force_res
DatabaseReplicatedExtensions ext; DatabaseReplicatedExtensions ext;
ext.database_uuid = getUUID(); ext.database_uuid = getUUID();
ext.zookeeper_path = zookeeper_path;
ext.database_name = getDatabaseName(); ext.database_name = getDatabaseName();
ext.shard_name = shard_name; ext.shard_name = shard_name;
ext.replica_name = replica_name; ext.replica_name = replica_name;
ext.first_not_executed = log_entry_to_execute; ext.first_not_executed = log_entry_to_execute;
ext.lost_callback = [this] (const String & entry_name, const ZooKeeperPtr & zookeeper) { onUnexpectedLogEntry(entry_name, zookeeper); };
ext.executed_callback = [this] (const String & entry_name, const ZooKeeperPtr & zookeeper) { onExecutedLogEntry(entry_name, zookeeper); };
/// Pool size must be 1 (to avoid reordering of log entries) /// Pool size must be 1 (to avoid reordering of log entries)
constexpr size_t pool_size = 1; constexpr size_t pool_size = 1;
@ -171,6 +187,41 @@ void DatabaseReplicated::loadStoredObjects(Context & context, bool has_force_res
std::make_optional<DatabaseReplicatedExtensions>(std::move(ext))); std::make_optional<DatabaseReplicatedExtensions>(std::move(ext)));
} }
void DatabaseReplicated::onUnexpectedLogEntry(const String & entry_name, const ZooKeeperPtr & zookeeper)
{
/// We cannot execute next entry of replication log. Possible reasons:
/// 1. Replica is staled, some entries were removed by log cleanup process.
/// In this case we should recover replica from the last snapshot.
/// 2. Replication log is broken due to manual operations with ZooKeeper or logical error.
/// In this case we just stop replication without any attempts to recover it automatically,
/// because such attempts may lead to unexpected data removal.
constexpr const char * name = "query-";
if (!startsWith(entry_name, name))
throw Exception(ErrorCodes::DATABASE_REPLICATION_FAILED, "Unexpected entry in replication log: {}", entry_name);
UInt32 entry_number;
if (!tryParse(entry_number, entry_name.substr(strlen(name))))
throw Exception(ErrorCodes::DATABASE_REPLICATION_FAILED, "Cannot parse number of replication log entry {}", entry_name);
if (entry_number < log_entry_to_execute)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Entry {} already executed, current pointer is {}", entry_number, log_entry_to_execute);
/// Entry name is valid. Let's get min snapshot version to check if replica is staled.
Strings snapshots = getSnapshots(zookeeper);
UInt32 min_snapshot = parse<UInt32>(snapshots.front());
if (log_entry_to_execute < min_snapshot)
{
recoverLostReplica(zookeeper, parse<UInt32>(snapshots.back()));
return;
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot recover replica, probably it's a bug. "
"Got log entry '{}' when expected entry number {}, "
"available snapshots: ",
entry_name, log_entry_to_execute, boost::algorithm::join(snapshots, ", "));
}
void DatabaseReplicated::removeOutdatedSnapshotsAndLog() void DatabaseReplicated::removeOutdatedSnapshotsAndLog()
{ {
@ -217,40 +268,51 @@ void DatabaseReplicated::removeOutdatedSnapshotsAndLog()
} }
} }
void DatabaseReplicated::runBackgroundLogExecutor() void DatabaseReplicated::onExecutedLogEntry(const String & entry_name, const ZooKeeperPtr & zookeeper)
{ {
if (last_executed_log_entry.empty()) assert(entry_name == DatabaseReplicatedExtensions::getLogEntryName(log_entry_to_execute));
++log_entry_to_execute;
if (snapshot_period > 0 && log_entry_to_execute % snapshot_period == 0)
{ {
loadMetadataFromSnapshot(); createSnapshot(zookeeper);
} }
auto current_zookeeper = getZooKeeper();
Strings log_entry_names = current_zookeeper->getChildren(zookeeper_path + "/log");
std::sort(log_entry_names.begin(), log_entry_names.end());
auto newest_entry_it = std::upper_bound(log_entry_names.begin(), log_entry_names.end(), last_executed_log_entry);
log_entry_names.erase(log_entry_names.begin(), newest_entry_it);
for (const String & log_entry_name : log_entry_names)
{
//executeLogName(log_entry_name);
last_executed_log_entry = log_entry_name;
writeLastExecutedToDiskAndZK();
int log_n = parse<int>(log_entry_name.substr(4));
int last_log_n = parse<int>(log_entry_names.back().substr(4));
/// The third condition gurantees at most one snapshot creation per batch
if (log_n > 0 && snapshot_period > 0 && (last_log_n - log_n) / snapshot_period == 0 && log_n % snapshot_period == 0)
{
createSnapshot();
}
}
//background_log_executor->scheduleAfter(500);
} }
//void DatabaseReplicated::runBackgroundLogExecutor()
//{
// if (last_executed_log_entry.empty())
// {
// loadMetadataFromSnapshot();
// }
//
// auto current_zookeeper = getZooKeeper();
// Strings log_entry_names = current_zookeeper->getChildren(zookeeper_path + "/log");
//
// std::sort(log_entry_names.begin(), log_entry_names.end());
// auto newest_entry_it = std::upper_bound(log_entry_names.begin(), log_entry_names.end(), last_executed_log_entry);
//
// log_entry_names.erase(log_entry_names.begin(), newest_entry_it);
//
// for (const String & log_entry_name : log_entry_names)
// {
// //executeLogName(log_entry_name);
// last_executed_log_entry = log_entry_name;
// writeLastExecutedToDiskAndZK();
//
// int log_n = parse<int>(log_entry_name.substr(4));
// int last_log_n = parse<int>(log_entry_names.back().substr(4));
//
// /// The third condition gurantees at most one snapshot creation per batch
// if (log_n > 0 && snapshot_period > 0 && (last_log_n - log_n) / snapshot_period == 0 && log_n % snapshot_period == 0)
// {
// createSnapshot();
// }
// }
//
// //background_log_executor->scheduleAfter(500);
//}
void DatabaseReplicated::writeLastExecutedToDiskAndZK() void DatabaseReplicated::writeLastExecutedToDiskAndZK()
{ {
auto current_zookeeper = getZooKeeper(); auto current_zookeeper = getZooKeeper();
@ -294,79 +356,88 @@ BlockIO DatabaseReplicated::propose(const ASTPtr & query)
//FIXME need list of all replicas, we can obtain it from zk //FIXME need list of all replicas, we can obtain it from zk
Strings hosts_to_wait; Strings hosts_to_wait;
hosts_to_wait.emplace_back(shard_name + '/' +replica_name); hosts_to_wait.emplace_back(shard_name + '|' +replica_name);
auto stream = std::make_shared<DDLQueryStatusInputStream>(node_path, entry, global_context); auto stream = std::make_shared<DDLQueryStatusInputStream>(node_path, entry, global_context);
io.in = std::move(stream); io.in = std::move(stream);
return io; return io;
} }
void DatabaseReplicated::createSnapshot() void DatabaseReplicated::createSnapshot(const ZooKeeperPtr & zookeeper)
{ {
auto current_zookeeper = getZooKeeper(); String snapshot_path = zookeeper_path + "/snapshot/" + toString(log_entry_to_execute);
String snapshot_path = zookeeper_path + "/snapshots/" + last_executed_log_entry;
if (Coordination::Error::ZNODEEXISTS == current_zookeeper->tryCreate(snapshot_path, String(), zkutil::CreateMode::Persistent)) if (zookeeper->exists(snapshot_path))
{
return; return;
}
for (auto iterator = getTablesIterator(global_context, {}); iterator->isValid(); iterator->next()) std::vector<std::pair<String, String>> create_queries;
{ {
String table_name = iterator->name(); std::lock_guard lock{mutex};
auto query = getCreateQueryFromMetadata(getObjectMetadataPath(table_name), true); create_queries.reserve(tables.size());
String statement = queryToString(query); for (const auto & table : tables)
current_zookeeper->create(snapshot_path + "/" + table_name, statement, zkutil::CreateMode::Persistent); {
const String & name = table.first;
ReadBufferFromFile in(getObjectMetadataPath(name), METADATA_FILE_BUFFER_SIZE);
String attach_query;
readStringUntilEOF(attach_query, in);
create_queries.emplace_back(escapeForFileName(name), std::move(attach_query));
}
} }
current_zookeeper->create(snapshot_path + "/.completed", String(), zkutil::CreateMode::Persistent);
removeOutdatedSnapshotsAndLog(); if (zookeeper->exists(snapshot_path))
return;
String queries_path = zookeeper_path + "/metadata/" + toString(log_entry_to_execute);
zookeeper->tryCreate(queries_path, "", zkutil::CreateMode::Persistent);
queries_path += '/';
//FIXME use tryMulti with MULTI_BATCH_SIZE
for (const auto & table : create_queries)
zookeeper->tryCreate(queries_path + table.first, table.second, zkutil::CreateMode::Persistent);
if (create_queries.size() != zookeeper->getChildren(zookeeper_path + "/metadata/" + toString(log_entry_to_execute)).size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Created invalid snapshot");
zookeeper->tryCreate(snapshot_path, String(), zkutil::CreateMode::Persistent);
} }
void DatabaseReplicated::loadMetadataFromSnapshot() void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 from_snapshot, bool create)
{ {
/// Executes the latest snapshot. LOG_WARNING(log, "Will recover replica from snapshot", from_snapshot);
/// Used by new replicas only.
auto current_zookeeper = getZooKeeper();
Strings snapshots; //FIXME drop old tables
if (current_zookeeper->tryGetChildren(zookeeper_path + "/snapshots", snapshots) != Coordination::Error::ZOK)
return;
auto latest_snapshot = std::max_element(snapshots.begin(), snapshots.end()); String snapshot_metadata_path = zookeeper_path + "/metadata/" + toString(from_snapshot);
while (snapshots.size() > 0 && !current_zookeeper->exists(zookeeper_path + "/snapshots/" + *latest_snapshot + "/.completed")) Strings tables_in_snapshot = current_zookeeper->getChildren(snapshot_metadata_path);
current_zookeeper->get(zookeeper_path + "/snapshots/" + toString(from_snapshot)); /// Assert node exists
snapshot_metadata_path += '/';
for (const auto & table_name : tables_in_snapshot)
{ {
snapshots.erase(latest_snapshot); String query_to_execute = current_zookeeper->get(snapshot_metadata_path + table_name);
latest_snapshot = std::max_element(snapshots.begin(), snapshots.end());
if (!startsWith(query_to_execute, "ATTACH "))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected query: {}", query_to_execute);
query_to_execute = "CREATE " + query_to_execute.substr(strlen("ATTACH "));
Context current_context = global_context;
current_context.getClientInfo().query_kind = ClientInfo::QueryKind::REPLICATED_LOG_QUERY;
current_context.setCurrentDatabase(database_name);
current_context.setCurrentQueryId(""); // generate random query_id
executeQuery(query_to_execute, current_context);
} }
if (snapshots.size() < 1) if (create)
{
return;
}
Strings metadatas;
if (current_zookeeper->tryGetChildren(zookeeper_path + "/snapshots/" + *latest_snapshot, metadatas) != Coordination::Error::ZOK)
return; return;
LOG_DEBUG(log, "Executing {} snapshot", *latest_snapshot); current_zookeeper->set(replica_path + "/log-ptr", toString(from_snapshot));
last_executed_log_entry = from_snapshot;
ddl_worker->setLogPointer(from_snapshot); //FIXME
for (auto t = metadatas.begin(); t != metadatas.end(); ++t) //writeLastExecutedToDiskAndZK();
{
String path = zookeeper_path + "/snapshots/" + *latest_snapshot + "/" + *t;
String query_to_execute = current_zookeeper->get(path, {}, nullptr);
auto current_context = std::make_unique<Context>(global_context);
current_context->getClientInfo().query_kind = ClientInfo::QueryKind::REPLICATED_LOG_QUERY;
current_context->setCurrentDatabase(database_name);
current_context->setCurrentQueryId(""); // generate random query_id
executeQuery(query_to_execute, *current_context);
}
last_executed_log_entry = *latest_snapshot;
writeLastExecutedToDiskAndZK();
} }
void DatabaseReplicated::drop(const Context & context_) void DatabaseReplicated::drop(const Context & context_)

View File

@ -13,6 +13,7 @@ namespace DB
{ {
class DDLWorker; class DDLWorker;
using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>;
/** DatabaseReplicated engine /** DatabaseReplicated engine
* supports replication of metadata * supports replication of metadata
@ -56,22 +57,29 @@ public:
void loadStoredObjects(Context & context, bool has_force_restore_data_flag, bool force_attach = false) override; void loadStoredObjects(Context & context, bool has_force_restore_data_flag, bool force_attach = false) override;
private: private:
bool createDatabaseNodesInZooKeeper(const zkutil::ZooKeeperPtr & current_zookeeper); bool createDatabaseNodesInZooKeeper(const ZooKeeperPtr & current_zookeeper);
void createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPtr & current_zookeeper); void createReplicaNodesInZooKeeper(const ZooKeeperPtr & current_zookeeper);
void runBackgroundLogExecutor(); //void runBackgroundLogExecutor();
void writeLastExecutedToDiskAndZK(); void writeLastExecutedToDiskAndZK();
void loadMetadataFromSnapshot(); //void loadMetadataFromSnapshot();
void createSnapshot(); void createSnapshot(const ZooKeeperPtr & zookeeper);
void removeOutdatedSnapshotsAndLog(); void removeOutdatedSnapshotsAndLog();
Strings getSnapshots(const ZooKeeperPtr & zookeeper) const;
void onUnexpectedLogEntry(const String & entry_name, const ZooKeeperPtr & zookeeper);
void recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 from_snapshot, bool create = false);
void onExecutedLogEntry(const String & entry_name, const ZooKeeperPtr & zookeeper);
String zookeeper_path; String zookeeper_path;
String shard_name; String shard_name;
String replica_name; String replica_name;
String replica_path; String replica_path;
String log_entry_to_execute; UInt32 log_entry_to_execute;
std::mutex log_name_mutex; std::mutex log_name_mutex;
String log_name_to_exec_with_result; String log_name_to_exec_with_result;
@ -84,6 +92,8 @@ private:
std::unique_ptr<DDLWorker> ddl_worker; std::unique_ptr<DDLWorker> ddl_worker;
}; };
} }

View File

@ -142,6 +142,22 @@ std::unique_ptr<ZooKeeperLock> createSimpleZooKeeperLock(
} }
String DatabaseReplicatedExtensions::getLogEntryName(UInt32 log_entry_number)
{
constexpr size_t seq_node_digits = 10;
String number = toString(log_entry_number);
String name = "query-" + String(seq_node_digits - number.size(), '0') + number;
return name;
}
UInt32 DatabaseReplicatedExtensions::getLogEntryNumber(const String & log_entry_name)
{
constexpr const char * name = "query-";
assert(startsWith(log_entry_name, name));
return parse<UInt32>(log_entry_name.substr(strlen(name)));
}
DDLWorker::DDLWorker(int pool_size_, const std::string & zk_root_dir, const Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix, DDLWorker::DDLWorker(int pool_size_, const std::string & zk_root_dir, const Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix,
std::optional<DatabaseReplicatedExtensions> database_replicated_ext_) std::optional<DatabaseReplicatedExtensions> database_replicated_ext_)
: context(context_) : context(context_)
@ -236,8 +252,21 @@ DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_r
String node_data; String node_data;
String entry_path = queue_dir + "/" + entry_name; String entry_path = queue_dir + "/" + entry_name;
if (database_replicated_ext)
{
auto expected_log_entry = DatabaseReplicatedExtensions::getLogEntryName(database_replicated_ext->first_not_executed);
if (entry_name != expected_log_entry)
{
database_replicated_ext->lost_callback(entry_name, zookeeper);
out_reason = "DatabaseReplicated: expected " + expected_log_entry + " got " + entry_name;
return {};
}
}
if (!zookeeper->tryGet(entry_path, node_data)) if (!zookeeper->tryGet(entry_path, node_data))
{ {
if (database_replicated_ext)
database_replicated_ext->lost_callback(entry_name, zookeeper);
/// It is Ok that node could be deleted just now. It means that there are no current host in node's host list. /// It is Ok that node could be deleted just now. It means that there are no current host in node's host list.
out_reason = "The task was deleted"; out_reason = "The task was deleted";
return {}; return {};
@ -339,7 +368,7 @@ void DDLWorker::scheduleTasks()
? queue_nodes.begin() ? queue_nodes.begin()
: std::upper_bound(queue_nodes.begin(), queue_nodes.end(), last_tasks.back()); : std::upper_bound(queue_nodes.begin(), queue_nodes.end(), last_tasks.back());
for (auto it = begin_node; it != queue_nodes.end(); ++it) for (auto it = begin_node; it != queue_nodes.end() && !stop_flag; ++it)
{ {
String entry_name = *it; String entry_name = *it;
@ -362,11 +391,17 @@ void DDLWorker::scheduleTasks()
if (!already_processed) if (!already_processed)
{ {
worker_pool.scheduleOrThrowOnError([this, task_ptr = task.release()]() if (database_replicated_ext)
{ {
setThreadName("DDLWorkerExec"); enqueueTask(DDLTaskPtr(task.release()));
enqueueTask(DDLTaskPtr(task_ptr)); }
}); else
{
worker_pool.scheduleOrThrowOnError([this, task_ptr = task.release()]() {
setThreadName("DDLWorkerExec");
enqueueTask(DDLTaskPtr(task_ptr));
});
}
} }
else else
{ {
@ -374,9 +409,6 @@ void DDLWorker::scheduleTasks()
} }
saveTask(entry_name); saveTask(entry_name);
if (stop_flag)
break;
} }
} }
@ -599,6 +631,7 @@ void DDLWorker::enqueueTask(DDLTaskPtr task_ptr)
} }
} }
} }
void DDLWorker::processTask(DDLTask & task) void DDLWorker::processTask(DDLTask & task)
{ {
auto zookeeper = tryGetZooKeeper(); auto zookeeper = tryGetZooKeeper();
@ -626,7 +659,9 @@ void DDLWorker::processTask(DDLTask & task)
else else
throw Coordination::Exception(code, active_node_path); throw Coordination::Exception(code, active_node_path);
if (!task.was_executed) //FIXME
bool is_dummy_query = database_replicated_ext && task.entry.query.empty();
if (!task.was_executed && !is_dummy_query)
{ {
try try
{ {
@ -675,7 +710,19 @@ void DDLWorker::processTask(DDLTask & task)
Coordination::Requests ops; Coordination::Requests ops;
ops.emplace_back(zkutil::makeRemoveRequest(active_node_path, -1)); ops.emplace_back(zkutil::makeRemoveRequest(active_node_path, -1));
ops.emplace_back(zkutil::makeCreateRequest(finished_node_path, task.execution_status.serializeText(), zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeCreateRequest(finished_node_path, task.execution_status.serializeText(), zkutil::CreateMode::Persistent));
if (database_replicated_ext)
{
assert(DatabaseReplicatedExtensions::getLogEntryName(database_replicated_ext->first_not_executed) == task.entry_name);
ops.emplace_back(zkutil::makeSetRequest(database_replicated_ext->getReplicaPath() + "/log_ptr", toString(database_replicated_ext->first_not_executed), -1));
}
zookeeper->multi(ops); zookeeper->multi(ops);
if (database_replicated_ext)
{
database_replicated_ext->executed_callback(task.entry_name, zookeeper);
++(database_replicated_ext->first_not_executed);
}
} }

View File

@ -37,16 +37,25 @@ using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>;
struct DatabaseReplicatedExtensions struct DatabaseReplicatedExtensions
{ {
UUID database_uuid; UUID database_uuid;
String zookeeper_path;
String database_name; String database_name;
String shard_name; String shard_name;
String replica_name; String replica_name;
String first_not_executed; UInt32 first_not_executed;
using NewEntryCallback = std::function<void(const String & entry_name, const ZooKeeperPtr)>; using EntryLostCallback = std::function<void(const String & entry_name, const ZooKeeperPtr)>;
using EntryExecutedCallback = std::function<void(const String & entry_name, const ZooKeeperPtr)>; using EntryExecutedCallback = std::function<void(const String & entry_name, const ZooKeeperPtr)>;
using EntryErrorCallback = std::function<void(const String & entry_name, const ZooKeeperPtr, const std::exception_ptr &)>; using EntryErrorCallback = std::function<void(const String & entry_name, const ZooKeeperPtr, const std::exception_ptr &)>;
NewEntryCallback before_execution_callback; EntryLostCallback lost_callback;
EntryExecutedCallback executed_callback; EntryExecutedCallback executed_callback;
EntryErrorCallback error_callback; EntryErrorCallback error_callback;
String getReplicaPath() const
{
return zookeeper_path + "/replicas/" + shard_name + "/" + replica_name;
}
static String getLogEntryName(UInt32 log_entry_number);
static UInt32 getLogEntryNumber(const String & log_entry_name);
}; };
@ -69,6 +78,9 @@ public:
void shutdown(); void shutdown();
//FIXME get rid of this method
void setLogPointer(UInt32 log_pointer) { database_replicated_ext->first_not_executed = log_pointer; }
private: private:
/// Returns cached ZooKeeper session (possibly expired). /// Returns cached ZooKeeper session (possibly expired).