add timeouts

This commit is contained in:
Alexander Tokmakov 2021-02-04 22:41:44 +03:00
parent 066fb4c82b
commit 18f6b5bbad
15 changed files with 139 additions and 59 deletions

View File

@ -35,6 +35,7 @@ namespace ErrorCodes
extern const int DATABASE_REPLICATION_FAILED;
extern const int UNKNOWN_DATABASE;
extern const int NOT_IMPLEMENTED;
extern const int INCORRECT_QUERY;
}
zkutil::ZooKeeperPtr DatabaseReplicated::getZooKeeper() const
@ -121,8 +122,8 @@ bool DatabaseReplicated::createDatabaseNodesInZooKeeper(const zkutil::ZooKeeperP
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/counter/cnt-", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/counter/cnt-", -1));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/metadata", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/min_log_ptr", "1", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/max_log_ptr", "1", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/logs_to_keep", "1000", zkutil::CreateMode::Persistent));
Coordination::Responses responses;
auto res = current_zookeeper->tryMulti(ops, responses);
@ -194,7 +195,7 @@ void DatabaseReplicated::onUnexpectedLogEntry(const String & entry_name, const Z
throw Exception(ErrorCodes::LOGICAL_ERROR, "Entry {} already executed, current pointer is {}", entry_number, log_entry_to_execute);
/// Entry name is valid. Let's get min log pointer to check if replica is staled.
UInt32 min_snapshot = parse<UInt32>(zookeeper->get(zookeeper_path + "/min_log_ptr"));
UInt32 min_snapshot = parse<UInt32>(zookeeper->get(zookeeper_path + "/min_log_ptr")); // FIXME
if (log_entry_to_execute < min_snapshot)
{
@ -207,13 +208,15 @@ void DatabaseReplicated::onUnexpectedLogEntry(const String & entry_name, const Z
}
BlockIO DatabaseReplicated::propose(const ASTPtr & query)
BlockIO DatabaseReplicated::propose(const ASTPtr & query, const Context & query_context)
{
if (query_context.getClientInfo().query_kind != ClientInfo::QueryKind::INITIAL_QUERY)
throw Exception(ErrorCodes::INCORRECT_QUERY, "It's not initial query. ON CLUSTER is not allowed for Replicated database.");
if (const auto * query_alter = query->as<ASTAlterQuery>())
{
for (const auto & command : query_alter->command_list->children)
{
//FIXME allow all types of queries (maybe we should execute ATTACH an similar queries on leader)
if (!isSupportedAlterType(command->as<ASTAlterCommand&>().type))
throw Exception("Unsupported type of ALTER query", ErrorCodes::NOT_IMPLEMENTED);
}
@ -225,17 +228,16 @@ BlockIO DatabaseReplicated::propose(const ASTPtr & query)
DDLLogEntry entry;
entry.query = queryToString(query);
entry.initiator = ddl_worker->getCommonHostID();
String node_path = ddl_worker->tryEnqueueAndExecuteEntry(entry);
String node_path = ddl_worker->tryEnqueueAndExecuteEntry(entry, query_context);
BlockIO io;
//FIXME use query context
if (global_context.getSettingsRef().distributed_ddl_task_timeout == 0)
if (query_context.getSettingsRef().distributed_ddl_task_timeout == 0)
return io;
//FIXME need list of all replicas, we can obtain it from zk
Strings hosts_to_wait;
hosts_to_wait.emplace_back(getFullReplicaName());
auto stream = std::make_shared<DDLQueryStatusInputStream>(node_path, entry, global_context, hosts_to_wait);
auto stream = std::make_shared<DDLQueryStatusInputStream>(node_path, entry, query_context, hosts_to_wait);
io.in = std::move(stream);
return io;
}
@ -295,17 +297,20 @@ void DatabaseReplicated::drop(const Context & context_)
{
auto current_zookeeper = getZooKeeper();
current_zookeeper->set(replica_path, "DROPPED");
current_zookeeper->tryRemoveRecursive(replica_path);
DatabaseAtomic::drop(context_);
current_zookeeper->tryRemoveRecursive(replica_path);
}
void DatabaseReplicated::stopReplication()
{
if (ddl_worker)
ddl_worker->shutdown();
}
void DatabaseReplicated::shutdown()
{
if (ddl_worker)
{
ddl_worker->shutdown();
ddl_worker = nullptr;
}
stopReplication();
ddl_worker = nullptr;
DatabaseAtomic::shutdown();
}
@ -330,10 +335,15 @@ void DatabaseReplicated::renameTable(const Context & context, const String & tab
if (txn->is_initial_query)
{
if (!isTableExist(table_name, context))
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {} does not exist", table_name);
if (exchange && !to_database.isTableExist(to_table_name, context))
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {} does not exist", to_table_name);
String statement;
String statement_to;
{
//FIXME It's not atomic (however we have only one thread)
/// NOTE It's not atomic (however, we have only one thread)
ReadBufferFromFile in(getObjectMetadataPath(table_name), 4096);
readStringUntilEOF(statement, in);
if (exchange)

View File

@ -60,8 +60,9 @@ public:
String getEngineName() const override { return "Replicated"; }
BlockIO propose(const ASTPtr & query);
BlockIO propose(const ASTPtr & query, const Context & query_context);
void stopReplication();
void shutdown() override;
void loadStoredObjects(Context & context, bool has_force_restore_data_flag, bool force_attach) override;

View File

@ -9,6 +9,8 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int DATABASE_REPLICATION_FAILED;
extern const int NOT_A_LEADER;
extern const int UNFINISHED;
}
DatabaseReplicatedDDLWorker::DatabaseReplicatedDDLWorker(DatabaseReplicated * db, const Context & context_)
@ -22,7 +24,7 @@ DatabaseReplicatedDDLWorker::DatabaseReplicatedDDLWorker(DatabaseReplicated * db
void DatabaseReplicatedDDLWorker::initializeMainThread()
{
do
while (!initialized && !stop_flag)
{
try
{
@ -36,17 +38,17 @@ void DatabaseReplicatedDDLWorker::initializeMainThread()
sleepForSeconds(5);
}
}
while (!initialized && !stop_flag);
}
void DatabaseReplicatedDDLWorker::initializeReplication()
{
/// Check if we need to recover replica.
/// Invariant: replica is lost if it's log_ptr value is less then min_log_ptr value.
/// Invariant: replica is lost if it's log_ptr value is less then max_log_ptr - logs_to_keep.
UInt32 our_log_ptr = parse<UInt32>(current_zookeeper->get(database->replica_path + "/log_ptr"));
UInt32 min_log_ptr = parse<UInt32>(current_zookeeper->get(database->zookeeper_path + "/min_log_ptr"));
if (our_log_ptr < min_log_ptr)
UInt32 max_log_ptr = parse<UInt32>(current_zookeeper->get(database->zookeeper_path + "/max_log_ptr"));
UInt32 logs_to_keep = parse<UInt32>(current_zookeeper->get(database->zookeeper_path + "/logs_to_keep"));
if (our_log_ptr + logs_to_keep < max_log_ptr)
database->recoverLostReplica(current_zookeeper, 0);
}
@ -75,10 +77,19 @@ String DatabaseReplicatedDDLWorker::enqueueQuery(DDLLogEntry & entry)
return node_path;
}
String DatabaseReplicatedDDLWorker::tryEnqueueAndExecuteEntry(DDLLogEntry & entry)
String DatabaseReplicatedDDLWorker::tryEnqueueAndExecuteEntry(DDLLogEntry & entry, const Context & query_context)
{
/// NOTE Possibly it would be better to execute initial query on the most up-to-date node,
/// but it requires more complex logic around /try node.
auto zookeeper = getAndSetZooKeeper();
// TODO do not enqueue query if we have big replication lag
UInt32 our_log_ptr = parse<UInt32>(zookeeper->get(database->replica_path + "/log_ptr"));
UInt32 max_log_ptr = parse<UInt32>(zookeeper->get(database->zookeeper_path + "/max_log_ptr"));
assert(our_log_ptr <= max_log_ptr);
constexpr UInt32 max_replication_lag = 16;
if (max_replication_lag < max_log_ptr - our_log_ptr)
throw Exception(ErrorCodes::NOT_A_LEADER, "Cannot enqueue query on this replica, "
"because it has replication lag of {} queries. Try other replica.", max_log_ptr - our_log_ptr);
String entry_path = enqueueQuery(entry);
auto try_node = zkutil::EphemeralNodeHolder::existing(entry_path + "/try", *zookeeper);
@ -91,9 +102,18 @@ String DatabaseReplicatedDDLWorker::tryEnqueueAndExecuteEntry(DDLLogEntry & entr
task->is_initial_query = true;
LOG_DEBUG(log, "Waiting for worker thread to process all entries before {}", entry_name);
UInt64 timeout = query_context.getSettingsRef().distributed_ddl_task_timeout;
{
std::unique_lock lock{mutex};
wait_current_task_change.wait(lock, [&]() { assert(zookeeper->expired() || current_task <= entry_name); return zookeeper->expired() || current_task == entry_name; });
bool processed = wait_current_task_change.wait_for(lock, std::chrono::seconds(timeout), [&]()
{
assert(zookeeper->expired() || current_task <= entry_name);
return zookeeper->expired() || current_task == entry_name || stop_flag;
});
if (!processed)
throw Exception(ErrorCodes::UNFINISHED, "Timeout: Cannot enqueue query on this replica,"
"most likely because replica is busy with previous queue entries");
}
if (zookeeper->expired())
@ -116,8 +136,11 @@ DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_na
{
{
std::lock_guard lock{mutex};
current_task = entry_name;
wait_current_task_change.notify_all();
if (current_task < entry_name)
{
current_task = entry_name;
wait_current_task_change.notify_all();
}
}
UInt32 our_log_ptr = parse<UInt32>(current_zookeeper->get(database->replica_path + "/log_ptr"));
@ -135,18 +158,50 @@ DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_na
String initiator_name;
zkutil::EventPtr wait_committed_or_failed = std::make_shared<Poco::Event>();
if (zookeeper->tryGet(entry_path + "/try", initiator_name, nullptr, wait_committed_or_failed))
String try_node_path = entry_path + "/try";
if (zookeeper->tryGet(try_node_path, initiator_name, nullptr, wait_committed_or_failed))
{
task->is_initial_query = initiator_name == task->host_id_str;
/// Query is not committed yet. We cannot just skip it and execute next one, because reordering may break replication.
//FIXME add some timeouts
LOG_TRACE(log, "Waiting for initiator {} to commit or rollback entry {}", initiator_name, entry_path);
wait_committed_or_failed->wait();
constexpr size_t wait_time_ms = 1000;
constexpr size_t max_iterations = 3600;
size_t iteration = 0;
while (!wait_committed_or_failed->tryWait(wait_time_ms))
{
if (stop_flag)
{
/// We cannot return task to process and we cannot return nullptr too,
/// because nullptr means "task should not be executed".
/// We can only exit by exception.
throw Exception(ErrorCodes::UNFINISHED, "Replication was stopped");
}
if (max_iterations <= ++iteration)
{
/// What can we do if initiator hangs for some reason? Seems like we can remove /try node.
/// Initiator will fail to commit entry to ZK (including ops for replicated table) if /try does not exist.
/// But it's questionable.
/// We use tryRemove(...) because multiple hosts (including initiator) may try to do it concurrently.
auto code = zookeeper->tryRemove(try_node_path);
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
throw Coordination::Exception(code, try_node_path);
if (!zookeeper->exists(entry_path + "/committed"))
{
out_reason = fmt::format("Entry {} was forcefully cancelled due to timeout", entry_name);
return {};
}
}
}
}
if (!zookeeper->exists(entry_path + "/committed"))
{
out_reason = "Entry " + entry_name + " hasn't been committed";
out_reason = fmt::format("Entry {} hasn't been committed", entry_name);
return {};
}
@ -154,7 +209,7 @@ DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_na
{
assert(!zookeeper->exists(entry_path + "/try"));
assert(zookeeper->exists(entry_path + "/committed") == (zookeeper->get(task->getFinishedNodePath()) == "0"));
out_reason = "Entry " + entry_name + " has been executed as initial query";
out_reason = fmt::format("Entry {} has been executed as initial query", entry_name);
return {};
}
@ -169,8 +224,7 @@ DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_na
if (task->entry.query.empty())
{
//TODO better way to determine special entries
out_reason = "It's dummy task";
out_reason = fmt::format("Entry {} is a dummy task", entry_name);
return {};
}
@ -178,7 +232,7 @@ DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_na
if (zookeeper->exists(task->getFinishedNodePath()))
{
out_reason = "Task has been already processed";
out_reason = fmt::format("Task {} has been already processed", entry_name);
return {};
}

View File

@ -13,7 +13,7 @@ public:
String enqueueQuery(DDLLogEntry & entry) override;
String tryEnqueueAndExecuteEntry(DDLLogEntry & entry);
String tryEnqueueAndExecuteEntry(DDLLogEntry & entry, const Context & query_context);
private:
void initializeMainThread() override;

View File

@ -309,13 +309,9 @@ std::unique_ptr<Context> DatabaseReplicatedTask::makeQueryContext(Context & from
{
txn->ops.emplace_back(zkutil::makeRemoveRequest(entry_path + "/try", -1));
txn->ops.emplace_back(zkutil::makeCreateRequest(entry_path + "/committed", host_id_str, zkutil::CreateMode::Persistent));
//txn->ops.emplace_back(zkutil::makeRemoveRequest(getActiveNodePath(), -1));
txn->ops.emplace_back(zkutil::makeSetRequest(database->zookeeper_path + "/max_log_ptr", toString(getLogEntryNumber(entry_name)), -1));
}
//if (execute_on_leader)
// txn->ops.emplace_back(zkutil::makeCreateRequest(getShardNodePath() + "/executed", host_id_str, zkutil::CreateMode::Persistent));
//txn->ops.emplace_back(zkutil::makeCreateRequest(getFinishedNodePath(), execution_status.serializeText(), zkutil::CreateMode::Persistent));
txn->ops.emplace_back(zkutil::makeSetRequest(database->replica_path + "/log_ptr", toString(getLogEntryNumber(entry_name)), -1));
std::move(ops.begin(), ops.end(), std::back_inserter(txn->ops));

View File

@ -81,7 +81,6 @@ struct DDLTaskBase
bool is_circular_replicated = false;
bool execute_on_leader = false;
//MetadataTransactionPtr txn;
Coordination::Requests ops;
ExecutionStatus execution_status;
bool was_executed = false;
@ -163,6 +162,7 @@ struct MetadataTransaction
void commit();
~MetadataTransaction() { assert(state != CREATED || std::uncaught_exception()); }
};
}

View File

@ -341,7 +341,8 @@ void DDLWorker::scheduleTasks()
{
/// We will recheck status of last executed tasks. It's useful if main thread was just restarted.
auto & min_task = *std::min_element(current_tasks.begin(), current_tasks.end());
begin_node = std::upper_bound(queue_nodes.begin(), queue_nodes.end(), min_task->entry_name);
String min_entry_name = last_skipped_entry_name ? std::min(min_task->entry_name, *last_skipped_entry_name) : min_task->entry_name;
begin_node = std::upper_bound(queue_nodes.begin(), queue_nodes.end(), min_entry_name);
current_tasks.clear();
}
@ -358,6 +359,7 @@ void DDLWorker::scheduleTasks()
{
LOG_DEBUG(log, "Will not execute task {}: {}", entry_name, reason);
updateMaxDDLEntryID(entry_name);
last_skipped_entry_name.emplace(entry_name);
continue;
}
@ -500,10 +502,7 @@ void DDLWorker::processTask(DDLTaskBase & task)
{
/// It's not CREATE DATABASE
auto table_id = context.tryResolveStorageID(*query_with_table, Context::ResolveOrdinary);
DatabasePtr database;
std::tie(database, storage) = DatabaseCatalog::instance().tryGetDatabaseAndTable(table_id, context);
if (database && database->getEngineName() == "Replicated" && !typeid_cast<const DatabaseReplicatedTask *>(&task))
throw Exception(ErrorCodes::INCORRECT_QUERY, "ON CLUSTER queries are not allowed for Replicated databases");
storage = DatabaseCatalog::instance().tryGetTable(table_id, context);
}
task.execute_on_leader = storage && taskShouldBeExecutedOnLeader(task.query, storage) && !task.is_circular_replicated;
@ -553,7 +552,8 @@ void DDLWorker::processTask(DDLTaskBase & task)
updateMaxDDLEntryID(task.entry_name);
/// FIXME: if server fails right here, the task will be executed twice. We need WAL here.
/// If ZooKeeper connection is lost here, we will try again to write query status.
/// NOTE: If ZooKeeper connection is lost here, we will try again to write query status.
/// NOTE: If both table and database are replicated, task is executed in single ZK transaction.
bool status_written = task.ops.empty();
if (!status_written)
@ -959,12 +959,6 @@ void DDLWorker::runMainThread()
initialized = false;
LOG_INFO(log, "Lost ZooKeeper connection, will try to connect again: {}", getCurrentExceptionMessage(true));
}
else if (e.code == Coordination::Error::ZNONODE)
{
// TODO add comment: when it happens and why it's expected?
// maybe because cleanup thread may remove nodes inside queue entry which are currently processed
LOG_ERROR(log, "ZooKeeper error: {}", getCurrentExceptionMessage(true));
}
else
{
LOG_ERROR(log, "Unexpected ZooKeeper error, will try to restart main thread: {}", getCurrentExceptionMessage(true));

View File

@ -115,7 +115,7 @@ protected:
ZooKeeperPtr current_zookeeper;
/// Save state of executed task to avoid duplicate execution on ZK error
//std::optional<String> last_entry_name;
std::optional<String> last_skipped_entry_name;
std::list<DDLTaskPtr> current_tasks;
std::shared_ptr<Poco::Event> queue_updated_event = std::make_shared<Poco::Event>();

View File

@ -609,7 +609,7 @@ DatabaseCatalog::updateDependency(const StorageID & old_from, const StorageID &
view_dependencies[{new_from.getDatabaseName(), new_from.getTableName()}].insert(new_where);
}
std::unique_ptr<DDLGuard> DatabaseCatalog::getDDLGuard(const String & database, const String & table)
DDLGuardPtr DatabaseCatalog::getDDLGuard(const String & database, const String & table)
{
std::unique_lock lock(ddl_guards_mutex);
auto db_guard_iter = ddl_guards.try_emplace(database).first;

View File

@ -67,6 +67,8 @@ private:
bool is_database_guard = false;
};
using DDLGuardPtr = std::unique_ptr<DDLGuard>;
/// Creates temporary table in `_temporary_and_external_tables` with randomly generated unique StorageID.
/// Such table can be accessed from everywhere by its ID.
@ -120,7 +122,7 @@ public:
void loadDatabases();
/// Get an object that protects the table from concurrently executing multiple DDL operations.
std::unique_ptr<DDLGuard> getDDLGuard(const String & database, const String & table);
DDLGuardPtr getDDLGuard(const String & database, const String & table);
/// Get an object that protects the database from concurrent DDL queries all tables in the database
std::unique_lock<std::shared_mutex> getExclusiveDDLGuardForDatabase(const String & database);

View File

@ -53,7 +53,7 @@ BlockIO InterpreterAlterQuery::execute()
{
auto guard = DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name);
guard->releaseTableLock();
return typeid_cast<DatabaseReplicated *>(database.get())->propose(query_ptr);
return typeid_cast<DatabaseReplicated *>(database.get())->propose(query_ptr, context);
}
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, context);

View File

@ -886,7 +886,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
{
assertOrSetUUID(create, database);
guard->releaseTableLock();
return typeid_cast<DatabaseReplicated *>(database.get())->propose(query_ptr);
return typeid_cast<DatabaseReplicated *>(database.get())->propose(query_ptr, context);
}
}

View File

@ -139,7 +139,7 @@ BlockIO InterpreterDropQuery::executeToTableImpl(const ASTDropQuery & query, Dat
ddl_guard->releaseTableLock();
table.reset();
return typeid_cast<DatabaseReplicated *>(database.get())->propose(query.clone());
return typeid_cast<DatabaseReplicated *>(database.get())->propose(query.clone(), context);
}
if (query.kind == ASTDropQuery::Kind::Detach)
@ -325,6 +325,8 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
if (database->getEngineName() == "MaterializeMySQL")
stopDatabaseSynchronization(database);
#endif
if (auto * replicated = typeid_cast<DatabaseReplicated *>(database.get()))
replicated->stopReplication();
if (database->shouldBeEmptyOnDetach())
{

View File

@ -90,7 +90,7 @@ BlockIO InterpreterRenameQuery::executeToTables(const ASTRenameQuery & rename, c
UniqueTableName to(elem.to_database_name, elem.to_table_name);
ddl_guards[from]->releaseTableLock();
ddl_guards[to]->releaseTableLock();
return typeid_cast<DatabaseReplicated *>(database.get())->propose(query_ptr);
return typeid_cast<DatabaseReplicated *>(database.get())->propose(query_ptr, context);
}
else
{

View File

@ -103,6 +103,27 @@
"memory_tracking", /// FIXME remove it before merge
"memory_tracking",
"memory_usage",
"01533_multiple_nested",
"01575_disable_detach_table_of_dictionary",
"01457_create_as_table_function_structure",
"01415_inconsistent_merge_tree_settings",
"01413_allow_non_metadata_alters",
"01378_alter_rename_with_ttl_zookeeper",
"01349_mutation_datetime_key",
"01325_freeze_mutation_stuck",
"01272_suspicious_codecs",
"01181_db_atomic_drop_on_cluster",
"00957_delta_diff_bug",
"00910_zookeeper_custom_compression_codecs_replicated",
"00899_long_attach_memory_limit",
"00804_test_custom_compression_codes_log_storages",
"00804_test_alter_compression_codecs",
"00804_test_delta_codec_no_type_alter",
"00804_test_custom_compression_codecs",
"00753_alter_attach",
"00715_fetch_merged_or_mutated_part_zookeeper",
"00688_low_cardinality_serialization",
"01575_disable_detach_table_of_dictionary",
"00738_lock_for_inner_table",
"01666_blns",
"01652_ignore_and_low_cardinality",