diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 405b8c60af8..1981dea5cb9 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -522,6 +522,7 @@ M(553, ROCKSDB_ERROR) \ M(553, LZMA_STREAM_ENCODER_FAILED) \ M(554, LZMA_STREAM_DECODER_FAILED) \ + M(554, DATABASE_REPLICATION_FAILED) \ M(999, KEEPER_EXCEPTION) \ M(1000, POCO_EXCEPTION) \ M(1001, STD_EXCEPTION) \ diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp index c4bffd8fd5d..7b6d98f992a 100644 --- a/src/Databases/DatabaseReplicated.cpp +++ b/src/Databases/DatabaseReplicated.cpp @@ -28,9 +28,10 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; extern const int BAD_ARGUMENTS; extern const int REPLICA_IS_ALREADY_EXIST; + extern const int DATABASE_REPLICATION_FAILED; } -constexpr const char * first_entry_name = "query-0000000000"; +static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768; zkutil::ZooKeeperPtr DatabaseReplicated::getZooKeeper() const { @@ -42,6 +43,15 @@ static inline String getHostID(const Context & global_context) return Cluster::Address::toString(getFQDNOrHostName(), global_context.getTCPPort()); } +Strings DatabaseReplicated::getSnapshots(const ZooKeeperPtr & zookeeper) const +{ + Strings snapshots = zookeeper->getChildren(zookeeper_path + "/snapshots"); + std::sort(snapshots.begin(), snapshots.end()); + if (snapshots.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "No snapshots found"); + return snapshots; +} + DatabaseReplicated::~DatabaseReplicated() = default; @@ -84,7 +94,7 @@ DatabaseReplicated::DatabaseReplicated( createDatabaseNodesInZooKeeper(current_zookeeper); } - replica_path = zookeeper_path + "/replicas/" + shard_name + "|" + replica_name; + replica_path = zookeeper_path + "/replicas/" + shard_name + "/" + replica_name; String replica_host_id; if (current_zookeeper->tryGet(replica_path, replica_host_id)) @@ -95,7 +105,7 @@ DatabaseReplicated::DatabaseReplicated( "Replica {} of shard {} of replicated database at {} already exists. Replica host ID: '{}', current host ID: '{}'", replica_name, shard_name, zookeeper_path, replica_host_id, host_id); - log_entry_to_execute = current_zookeeper->get(replica_path + "/log_ptr"); + log_entry_to_execute = parse(current_zookeeper->get(replica_path + "/log_ptr")); } else { @@ -103,10 +113,7 @@ DatabaseReplicated::DatabaseReplicated( createReplicaNodesInZooKeeper(current_zookeeper); } - assert(log_entry_to_execute.starts_with("query-")); - - - snapshot_period = context_.getConfigRef().getInt("database_replicated_snapshot_period", 10); + snapshot_period = 1; //context_.getConfigRef().getInt("database_replicated_snapshot_period", 10); LOG_DEBUG(log, "Snapshot period is set to {} log entries per one snapshot", snapshot_period); } @@ -117,10 +124,12 @@ bool DatabaseReplicated::createDatabaseNodesInZooKeeper(const zkutil::ZooKeeperP Coordination::Requests ops; ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path, "", zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log", "", zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/snapshots", "", zkutil::CreateMode::Persistent)); /// Create empty snapshot (with no tables) - ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/snapshots/" + first_entry_name, "", zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/snapshots/0", "", zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/metadata", "", zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/metadata/0", "", zkutil::CreateMode::Persistent)); Coordination::Responses responses; auto res = current_zookeeper->tryMulti(ops, responses); @@ -137,20 +146,24 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt { current_zookeeper->createAncestors(replica_path); - Strings snapshots = current_zookeeper->getChildren(zookeeper_path + "/snapshots"); - std::sort(snapshots.begin(), snapshots.end()); - if (snapshots.empty()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "No snapshots found"); - /// When creating new replica, use latest snapshot version as initial value of log_pointer - log_entry_to_execute = snapshots.back(); + log_entry_to_execute = parse(getSnapshots(current_zookeeper).back()); /// Write host name to replica_path, it will protect from multiple replicas with the same name auto host_id = getHostID(global_context); + /// On replica creation add empty entry to log. Can be used to trigger some actions on other replicas (e.g. update cluster info). + DDLLogEntry entry; + entry.hosts = {}; + entry.query = {}; + entry.initiator = {}; + + recoverLostReplica(current_zookeeper, log_entry_to_execute, true); + Coordination::Requests ops; ops.emplace_back(zkutil::makeCreateRequest(replica_path, host_id, zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_ptr", log_entry_to_execute , zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_ptr", toString(log_entry_to_execute), zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/query-", entry.toString(), zkutil::CreateMode::PersistentSequential)); current_zookeeper->multi(ops); } @@ -160,10 +173,13 @@ void DatabaseReplicated::loadStoredObjects(Context & context, bool has_force_res DatabaseReplicatedExtensions ext; ext.database_uuid = getUUID(); + ext.zookeeper_path = zookeeper_path; ext.database_name = getDatabaseName(); ext.shard_name = shard_name; ext.replica_name = replica_name; ext.first_not_executed = log_entry_to_execute; + ext.lost_callback = [this] (const String & entry_name, const ZooKeeperPtr & zookeeper) { onUnexpectedLogEntry(entry_name, zookeeper); }; + ext.executed_callback = [this] (const String & entry_name, const ZooKeeperPtr & zookeeper) { onExecutedLogEntry(entry_name, zookeeper); }; /// Pool size must be 1 (to avoid reordering of log entries) constexpr size_t pool_size = 1; @@ -171,6 +187,41 @@ void DatabaseReplicated::loadStoredObjects(Context & context, bool has_force_res std::make_optional(std::move(ext))); } +void DatabaseReplicated::onUnexpectedLogEntry(const String & entry_name, const ZooKeeperPtr & zookeeper) +{ + /// We cannot execute next entry of replication log. Possible reasons: + /// 1. Replica is staled, some entries were removed by log cleanup process. + /// In this case we should recover replica from the last snapshot. + /// 2. Replication log is broken due to manual operations with ZooKeeper or logical error. + /// In this case we just stop replication without any attempts to recover it automatically, + /// because such attempts may lead to unexpected data removal. + + constexpr const char * name = "query-"; + if (!startsWith(entry_name, name)) + throw Exception(ErrorCodes::DATABASE_REPLICATION_FAILED, "Unexpected entry in replication log: {}", entry_name); + + UInt32 entry_number; + if (!tryParse(entry_number, entry_name.substr(strlen(name)))) + throw Exception(ErrorCodes::DATABASE_REPLICATION_FAILED, "Cannot parse number of replication log entry {}", entry_name); + + if (entry_number < log_entry_to_execute) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Entry {} already executed, current pointer is {}", entry_number, log_entry_to_execute); + + /// Entry name is valid. Let's get min snapshot version to check if replica is staled. + Strings snapshots = getSnapshots(zookeeper); + UInt32 min_snapshot = parse(snapshots.front()); + + if (log_entry_to_execute < min_snapshot) + { + recoverLostReplica(zookeeper, parse(snapshots.back())); + return; + } + + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot recover replica, probably it's a bug. " + "Got log entry '{}' when expected entry number {}, " + "available snapshots: ", + entry_name, log_entry_to_execute, boost::algorithm::join(snapshots, ", ")); +} void DatabaseReplicated::removeOutdatedSnapshotsAndLog() { @@ -217,40 +268,51 @@ void DatabaseReplicated::removeOutdatedSnapshotsAndLog() } } -void DatabaseReplicated::runBackgroundLogExecutor() +void DatabaseReplicated::onExecutedLogEntry(const String & entry_name, const ZooKeeperPtr & zookeeper) { - if (last_executed_log_entry.empty()) + assert(entry_name == DatabaseReplicatedExtensions::getLogEntryName(log_entry_to_execute)); + ++log_entry_to_execute; + + if (snapshot_period > 0 && log_entry_to_execute % snapshot_period == 0) { - loadMetadataFromSnapshot(); + createSnapshot(zookeeper); } - - auto current_zookeeper = getZooKeeper(); - Strings log_entry_names = current_zookeeper->getChildren(zookeeper_path + "/log"); - - std::sort(log_entry_names.begin(), log_entry_names.end()); - auto newest_entry_it = std::upper_bound(log_entry_names.begin(), log_entry_names.end(), last_executed_log_entry); - - log_entry_names.erase(log_entry_names.begin(), newest_entry_it); - - for (const String & log_entry_name : log_entry_names) - { - //executeLogName(log_entry_name); - last_executed_log_entry = log_entry_name; - writeLastExecutedToDiskAndZK(); - - int log_n = parse(log_entry_name.substr(4)); - int last_log_n = parse(log_entry_names.back().substr(4)); - - /// The third condition gurantees at most one snapshot creation per batch - if (log_n > 0 && snapshot_period > 0 && (last_log_n - log_n) / snapshot_period == 0 && log_n % snapshot_period == 0) - { - createSnapshot(); - } - } - - //background_log_executor->scheduleAfter(500); } +//void DatabaseReplicated::runBackgroundLogExecutor() +//{ +// if (last_executed_log_entry.empty()) +// { +// loadMetadataFromSnapshot(); +// } +// +// auto current_zookeeper = getZooKeeper(); +// Strings log_entry_names = current_zookeeper->getChildren(zookeeper_path + "/log"); +// +// std::sort(log_entry_names.begin(), log_entry_names.end()); +// auto newest_entry_it = std::upper_bound(log_entry_names.begin(), log_entry_names.end(), last_executed_log_entry); +// +// log_entry_names.erase(log_entry_names.begin(), newest_entry_it); +// +// for (const String & log_entry_name : log_entry_names) +// { +// //executeLogName(log_entry_name); +// last_executed_log_entry = log_entry_name; +// writeLastExecutedToDiskAndZK(); +// +// int log_n = parse(log_entry_name.substr(4)); +// int last_log_n = parse(log_entry_names.back().substr(4)); +// +// /// The third condition gurantees at most one snapshot creation per batch +// if (log_n > 0 && snapshot_period > 0 && (last_log_n - log_n) / snapshot_period == 0 && log_n % snapshot_period == 0) +// { +// createSnapshot(); +// } +// } +// +// //background_log_executor->scheduleAfter(500); +//} + void DatabaseReplicated::writeLastExecutedToDiskAndZK() { auto current_zookeeper = getZooKeeper(); @@ -294,79 +356,88 @@ BlockIO DatabaseReplicated::propose(const ASTPtr & query) //FIXME need list of all replicas, we can obtain it from zk Strings hosts_to_wait; - hosts_to_wait.emplace_back(shard_name + '/' +replica_name); + hosts_to_wait.emplace_back(shard_name + '|' +replica_name); auto stream = std::make_shared(node_path, entry, global_context); io.in = std::move(stream); return io; } -void DatabaseReplicated::createSnapshot() +void DatabaseReplicated::createSnapshot(const ZooKeeperPtr & zookeeper) { - auto current_zookeeper = getZooKeeper(); - String snapshot_path = zookeeper_path + "/snapshots/" + last_executed_log_entry; + String snapshot_path = zookeeper_path + "/snapshot/" + toString(log_entry_to_execute); - if (Coordination::Error::ZNODEEXISTS == current_zookeeper->tryCreate(snapshot_path, String(), zkutil::CreateMode::Persistent)) - { + if (zookeeper->exists(snapshot_path)) return; - } - for (auto iterator = getTablesIterator(global_context, {}); iterator->isValid(); iterator->next()) + std::vector> create_queries; { - String table_name = iterator->name(); - auto query = getCreateQueryFromMetadata(getObjectMetadataPath(table_name), true); - String statement = queryToString(query); - current_zookeeper->create(snapshot_path + "/" + table_name, statement, zkutil::CreateMode::Persistent); + std::lock_guard lock{mutex}; + create_queries.reserve(tables.size()); + for (const auto & table : tables) + { + const String & name = table.first; + ReadBufferFromFile in(getObjectMetadataPath(name), METADATA_FILE_BUFFER_SIZE); + String attach_query; + readStringUntilEOF(attach_query, in); + create_queries.emplace_back(escapeForFileName(name), std::move(attach_query)); + } } - current_zookeeper->create(snapshot_path + "/.completed", String(), zkutil::CreateMode::Persistent); - removeOutdatedSnapshotsAndLog(); + if (zookeeper->exists(snapshot_path)) + return; + + String queries_path = zookeeper_path + "/metadata/" + toString(log_entry_to_execute); + zookeeper->tryCreate(queries_path, "", zkutil::CreateMode::Persistent); + queries_path += '/'; + + //FIXME use tryMulti with MULTI_BATCH_SIZE + + for (const auto & table : create_queries) + zookeeper->tryCreate(queries_path + table.first, table.second, zkutil::CreateMode::Persistent); + + if (create_queries.size() != zookeeper->getChildren(zookeeper_path + "/metadata/" + toString(log_entry_to_execute)).size()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Created invalid snapshot"); + + zookeeper->tryCreate(snapshot_path, String(), zkutil::CreateMode::Persistent); } -void DatabaseReplicated::loadMetadataFromSnapshot() +void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 from_snapshot, bool create) { - /// Executes the latest snapshot. - /// Used by new replicas only. - auto current_zookeeper = getZooKeeper(); + LOG_WARNING(log, "Will recover replica from snapshot", from_snapshot); - Strings snapshots; - if (current_zookeeper->tryGetChildren(zookeeper_path + "/snapshots", snapshots) != Coordination::Error::ZOK) - return; + //FIXME drop old tables - auto latest_snapshot = std::max_element(snapshots.begin(), snapshots.end()); - while (snapshots.size() > 0 && !current_zookeeper->exists(zookeeper_path + "/snapshots/" + *latest_snapshot + "/.completed")) + String snapshot_metadata_path = zookeeper_path + "/metadata/" + toString(from_snapshot); + Strings tables_in_snapshot = current_zookeeper->getChildren(snapshot_metadata_path); + current_zookeeper->get(zookeeper_path + "/snapshots/" + toString(from_snapshot)); /// Assert node exists + snapshot_metadata_path += '/'; + + for (const auto & table_name : tables_in_snapshot) { - snapshots.erase(latest_snapshot); - latest_snapshot = std::max_element(snapshots.begin(), snapshots.end()); + String query_to_execute = current_zookeeper->get(snapshot_metadata_path + table_name); + + + if (!startsWith(query_to_execute, "ATTACH ")) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected query: {}", query_to_execute); + query_to_execute = "CREATE " + query_to_execute.substr(strlen("ATTACH ")); + + Context current_context = global_context; + current_context.getClientInfo().query_kind = ClientInfo::QueryKind::REPLICATED_LOG_QUERY; + current_context.setCurrentDatabase(database_name); + current_context.setCurrentQueryId(""); // generate random query_id + + executeQuery(query_to_execute, current_context); } - if (snapshots.size() < 1) - { - return; - } - - Strings metadatas; - if (current_zookeeper->tryGetChildren(zookeeper_path + "/snapshots/" + *latest_snapshot, metadatas) != Coordination::Error::ZOK) + if (create) return; - LOG_DEBUG(log, "Executing {} snapshot", *latest_snapshot); + current_zookeeper->set(replica_path + "/log-ptr", toString(from_snapshot)); + last_executed_log_entry = from_snapshot; + ddl_worker->setLogPointer(from_snapshot); //FIXME - for (auto t = metadatas.begin(); t != metadatas.end(); ++t) - { - String path = zookeeper_path + "/snapshots/" + *latest_snapshot + "/" + *t; - - String query_to_execute = current_zookeeper->get(path, {}, nullptr); - - auto current_context = std::make_unique(global_context); - current_context->getClientInfo().query_kind = ClientInfo::QueryKind::REPLICATED_LOG_QUERY; - current_context->setCurrentDatabase(database_name); - current_context->setCurrentQueryId(""); // generate random query_id - - executeQuery(query_to_execute, *current_context); - } - - last_executed_log_entry = *latest_snapshot; - writeLastExecutedToDiskAndZK(); + //writeLastExecutedToDiskAndZK(); } void DatabaseReplicated::drop(const Context & context_) diff --git a/src/Databases/DatabaseReplicated.h b/src/Databases/DatabaseReplicated.h index 219779d602d..3f5bd4608f1 100644 --- a/src/Databases/DatabaseReplicated.h +++ b/src/Databases/DatabaseReplicated.h @@ -13,6 +13,7 @@ namespace DB { class DDLWorker; +using ZooKeeperPtr = std::shared_ptr; /** DatabaseReplicated engine * supports replication of metadata @@ -56,22 +57,29 @@ public: void loadStoredObjects(Context & context, bool has_force_restore_data_flag, bool force_attach = false) override; private: - bool createDatabaseNodesInZooKeeper(const zkutil::ZooKeeperPtr & current_zookeeper); - void createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPtr & current_zookeeper); + bool createDatabaseNodesInZooKeeper(const ZooKeeperPtr & current_zookeeper); + void createReplicaNodesInZooKeeper(const ZooKeeperPtr & current_zookeeper); - void runBackgroundLogExecutor(); + //void runBackgroundLogExecutor(); void writeLastExecutedToDiskAndZK(); - void loadMetadataFromSnapshot(); - void createSnapshot(); + //void loadMetadataFromSnapshot(); + void createSnapshot(const ZooKeeperPtr & zookeeper); void removeOutdatedSnapshotsAndLog(); + Strings getSnapshots(const ZooKeeperPtr & zookeeper) const; + + void onUnexpectedLogEntry(const String & entry_name, const ZooKeeperPtr & zookeeper); + void recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 from_snapshot, bool create = false); + + void onExecutedLogEntry(const String & entry_name, const ZooKeeperPtr & zookeeper); + String zookeeper_path; String shard_name; String replica_name; String replica_path; - String log_entry_to_execute; + UInt32 log_entry_to_execute; std::mutex log_name_mutex; String log_name_to_exec_with_result; @@ -84,6 +92,8 @@ private: std::unique_ptr ddl_worker; + + }; } diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp index 7d947a264a6..51f0e1b45a9 100644 --- a/src/Interpreters/DDLWorker.cpp +++ b/src/Interpreters/DDLWorker.cpp @@ -142,6 +142,22 @@ std::unique_ptr createSimpleZooKeeperLock( } +String DatabaseReplicatedExtensions::getLogEntryName(UInt32 log_entry_number) +{ + constexpr size_t seq_node_digits = 10; + String number = toString(log_entry_number); + String name = "query-" + String(seq_node_digits - number.size(), '0') + number; + return name; +} + +UInt32 DatabaseReplicatedExtensions::getLogEntryNumber(const String & log_entry_name) +{ + constexpr const char * name = "query-"; + assert(startsWith(log_entry_name, name)); + return parse(log_entry_name.substr(strlen(name))); +} + + DDLWorker::DDLWorker(int pool_size_, const std::string & zk_root_dir, const Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix, std::optional database_replicated_ext_) : context(context_) @@ -236,8 +252,21 @@ DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_r String node_data; String entry_path = queue_dir + "/" + entry_name; + if (database_replicated_ext) + { + auto expected_log_entry = DatabaseReplicatedExtensions::getLogEntryName(database_replicated_ext->first_not_executed); + if (entry_name != expected_log_entry) + { + database_replicated_ext->lost_callback(entry_name, zookeeper); + out_reason = "DatabaseReplicated: expected " + expected_log_entry + " got " + entry_name; + return {}; + } + } + if (!zookeeper->tryGet(entry_path, node_data)) { + if (database_replicated_ext) + database_replicated_ext->lost_callback(entry_name, zookeeper); /// It is Ok that node could be deleted just now. It means that there are no current host in node's host list. out_reason = "The task was deleted"; return {}; @@ -339,7 +368,7 @@ void DDLWorker::scheduleTasks() ? queue_nodes.begin() : std::upper_bound(queue_nodes.begin(), queue_nodes.end(), last_tasks.back()); - for (auto it = begin_node; it != queue_nodes.end(); ++it) + for (auto it = begin_node; it != queue_nodes.end() && !stop_flag; ++it) { String entry_name = *it; @@ -362,11 +391,17 @@ void DDLWorker::scheduleTasks() if (!already_processed) { - worker_pool.scheduleOrThrowOnError([this, task_ptr = task.release()]() + if (database_replicated_ext) { - setThreadName("DDLWorkerExec"); - enqueueTask(DDLTaskPtr(task_ptr)); - }); + enqueueTask(DDLTaskPtr(task.release())); + } + else + { + worker_pool.scheduleOrThrowOnError([this, task_ptr = task.release()]() { + setThreadName("DDLWorkerExec"); + enqueueTask(DDLTaskPtr(task_ptr)); + }); + } } else { @@ -374,9 +409,6 @@ void DDLWorker::scheduleTasks() } saveTask(entry_name); - - if (stop_flag) - break; } } @@ -599,6 +631,7 @@ void DDLWorker::enqueueTask(DDLTaskPtr task_ptr) } } } + void DDLWorker::processTask(DDLTask & task) { auto zookeeper = tryGetZooKeeper(); @@ -626,7 +659,9 @@ void DDLWorker::processTask(DDLTask & task) else throw Coordination::Exception(code, active_node_path); - if (!task.was_executed) + //FIXME + bool is_dummy_query = database_replicated_ext && task.entry.query.empty(); + if (!task.was_executed && !is_dummy_query) { try { @@ -675,7 +710,19 @@ void DDLWorker::processTask(DDLTask & task) Coordination::Requests ops; ops.emplace_back(zkutil::makeRemoveRequest(active_node_path, -1)); ops.emplace_back(zkutil::makeCreateRequest(finished_node_path, task.execution_status.serializeText(), zkutil::CreateMode::Persistent)); + if (database_replicated_ext) + { + assert(DatabaseReplicatedExtensions::getLogEntryName(database_replicated_ext->first_not_executed) == task.entry_name); + ops.emplace_back(zkutil::makeSetRequest(database_replicated_ext->getReplicaPath() + "/log_ptr", toString(database_replicated_ext->first_not_executed), -1)); + } + zookeeper->multi(ops); + + if (database_replicated_ext) + { + database_replicated_ext->executed_callback(task.entry_name, zookeeper); + ++(database_replicated_ext->first_not_executed); + } } diff --git a/src/Interpreters/DDLWorker.h b/src/Interpreters/DDLWorker.h index f38d41df503..08bf641264e 100644 --- a/src/Interpreters/DDLWorker.h +++ b/src/Interpreters/DDLWorker.h @@ -37,16 +37,25 @@ using ZooKeeperPtr = std::shared_ptr; struct DatabaseReplicatedExtensions { UUID database_uuid; + String zookeeper_path; String database_name; String shard_name; String replica_name; - String first_not_executed; - using NewEntryCallback = std::function; + UInt32 first_not_executed; + using EntryLostCallback = std::function; using EntryExecutedCallback = std::function; using EntryErrorCallback = std::function; - NewEntryCallback before_execution_callback; + EntryLostCallback lost_callback; EntryExecutedCallback executed_callback; EntryErrorCallback error_callback; + + String getReplicaPath() const + { + return zookeeper_path + "/replicas/" + shard_name + "/" + replica_name; + } + + static String getLogEntryName(UInt32 log_entry_number); + static UInt32 getLogEntryNumber(const String & log_entry_name); }; @@ -69,6 +78,9 @@ public: void shutdown(); + //FIXME get rid of this method + void setLogPointer(UInt32 log_pointer) { database_replicated_ext->first_not_executed = log_pointer; } + private: /// Returns cached ZooKeeper session (possibly expired).