review suggestions

This commit is contained in:
Alexander Tokmakov 2021-02-20 02:41:58 +03:00
parent 8097532cb7
commit 2a36d6cb55
24 changed files with 232 additions and 124 deletions

View File

@ -315,7 +315,7 @@ public:
return std::make_shared<EphemeralNodeHolder>(path, zookeeper, false, false, "");
}
void reset()
void setAlreadyRemoved()
{
need_remove = false;
}

View File

@ -115,11 +115,14 @@ void DatabaseAtomic::dropTable(const Context & context, const String & table_nam
std::unique_lock lock(mutex);
table = getTableUnlocked(table_name, lock);
table_metadata_path_drop = DatabaseCatalog::instance().getPathForDroppedMetadata(table->getStorageID());
auto txn = context.getMetadataTransaction();
auto txn = context.getZooKeeperMetadataTransaction();
if (txn && !context.isInternalSubquery())
txn->commit(); /// Commit point (a sort of) for Replicated database
/// NOTE: replica will be lost if server crashes before the following rename
/// We apply changes in ZooKeeper before applying changes in local metadata file
/// to reduce probability of failures between these operations
/// (it's more likely to lose connection, than to fail before applying local changes).
/// TODO better detection and recovery
Poco::File(table_metadata_path).renameTo(table_metadata_path_drop); /// Mark table as dropped
@ -241,7 +244,7 @@ void DatabaseAtomic::renameTable(const Context & context, const String & table_n
}
/// Table renaming actually begins here
auto txn = context.getMetadataTransaction();
auto txn = context.getZooKeeperMetadataTransaction();
if (txn && !context.isInternalSubquery())
txn->commit(); /// Commit point (a sort of) for Replicated database
@ -302,7 +305,7 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora
DatabaseCatalog::instance().addUUIDMapping(query.uuid);
locked_uuid = true;
auto txn = query_context.getMetadataTransaction();
auto txn = query_context.getZooKeeperMetadataTransaction();
if (txn && !query_context.isInternalSubquery())
txn->commit(); /// Commit point (a sort of) for Replicated database
@ -337,7 +340,7 @@ void DatabaseAtomic::commitAlterTable(const StorageID & table_id, const String &
if (table_id.uuid != actual_table_id.uuid)
throw Exception("Cannot alter table because it was renamed", ErrorCodes::CANNOT_ASSIGN_ALTER);
auto txn = query_context.getMetadataTransaction();
auto txn = query_context.getZooKeeperMetadataTransaction();
if (txn && !query_context.isInternalSubquery())
txn->commit(); /// Commit point (a sort of) for Replicated database

View File

@ -103,8 +103,11 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
if (engine_define->engine->arguments && !engine_may_have_arguments)
throw Exception("Database engine " + engine_name + " cannot have arguments", ErrorCodes::BAD_ARGUMENTS);
if (engine_define->engine->parameters || engine_define->partition_by || engine_define->primary_key || engine_define->order_by ||
engine_define->sample_by || (!endsWith(engine_name, "MySQL") && engine_define->settings))
bool has_unexpected_element = engine_define->engine->parameters || engine_define->partition_by ||
engine_define->primary_key || engine_define->order_by ||
engine_define->sample_by;
bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated";
if (has_unexpected_element || (!may_have_settings && engine_define->settings))
throw Exception("Database engine " + engine_name + " cannot have parameters, primary_key, order_by, sample_by, settings",
ErrorCodes::UNKNOWN_ELEMENT_IN_AST);
@ -205,7 +208,13 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
shard_name = context.getMacros()->expand(shard_name);
replica_name = context.getMacros()->expand(replica_name);
return std::make_shared<DatabaseReplicated>(database_name, metadata_path, uuid, zookeeper_path, shard_name, replica_name, context);
DatabaseReplicatedSettings database_replicated_settings{};
if (engine_define->settings)
database_replicated_settings.loadFromQuery(*engine_define);
return std::make_shared<DatabaseReplicated>(database_name, metadata_path, uuid,
zookeeper_path, shard_name, replica_name,
std::move(database_replicated_settings), context);
}
#if USE_LIBPQXX

View File

@ -63,11 +63,13 @@ DatabaseReplicated::DatabaseReplicated(
const String & zookeeper_path_,
const String & shard_name_,
const String & replica_name_,
DatabaseReplicatedSettings db_settings_,
const Context & context_)
: DatabaseAtomic(name_, metadata_path_, uuid, "DatabaseReplicated (" + name_ + ")", context_)
, zookeeper_path(zookeeper_path_)
, shard_name(shard_name_)
, replica_name(replica_name_)
, db_settings(std::move(db_settings_))
{
if (zookeeper_path.empty() || shard_name.empty() || replica_name.empty())
throw Exception("ZooKeeper path, shard and replica names must be non-empty", ErrorCodes::BAD_ARGUMENTS);
@ -141,7 +143,8 @@ ClusterPtr DatabaseReplicated::getCluster() const
break;
}
if (!success)
throw Exception(ErrorCodes::ALL_CONNECTION_TRIES_FAILED, "Cannot get consistent cluster snapshot");
/// Adjacent string literals are concatenated as-is, so the separating space has to be written explicitly.
throw Exception(ErrorCodes::ALL_CONNECTION_TRIES_FAILED, "Cannot get consistent cluster snapshot, "
"because replicas are created or removed concurrently");
assert(!hosts.empty());
assert(hosts.size() == host_ids.size());
@ -172,7 +175,7 @@ ClusterPtr DatabaseReplicated::getCluster() const
return std::make_shared<Cluster>(global_context.getSettingsRef(), shards, username, password, global_context.getTCPPort(), false);
}
void DatabaseReplicated::tryConnectToZooKeeper(bool force_attach)
void DatabaseReplicated::tryConnectToZooKeeperAndInitDatabase(bool force_attach)
{
try
{
@ -228,6 +231,9 @@ bool DatabaseReplicated::createDatabaseNodesInZooKeeper(const zkutil::ZooKeeperP
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/counter", "", zkutil::CreateMode::Persistent));
/// We create and remove counter/cnt- node to increment sequential number of counter/ node and make log entry numbers start from 1.
/// New replicas are created with log pointer equal to 0 and log pointer is a number of the last executed entry.
/// It means that we cannot have log entry with number 0.
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/counter/cnt-", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/counter/cnt-", -1));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/metadata", "", zkutil::CreateMode::Persistent));
@ -253,10 +259,7 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
auto host_id = getHostID(global_context, db_uuid);
/// On replica creation add empty entry to log. Can be used to trigger some actions on other replicas (e.g. update cluster info).
DDLLogEntry entry;
entry.hosts = {};
entry.query = {};
entry.initiator = {};
DDLLogEntry entry{};
String query_path_prefix = zookeeper_path + "/log/query-";
String counter_prefix = zookeeper_path + "/counter/cnt-";
@ -273,7 +276,7 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
void DatabaseReplicated::loadStoredObjects(Context & context, bool has_force_restore_data_flag, bool force_attach)
{
tryConnectToZooKeeper(force_attach);
tryConnectToZooKeeperAndInitDatabase(force_attach);
DatabaseAtomic::loadStoredObjects(context, has_force_restore_data_flag, force_attach);
@ -281,7 +284,7 @@ void DatabaseReplicated::loadStoredObjects(Context & context, bool has_force_res
ddl_worker->startup();
}
BlockIO DatabaseReplicated::propose(const ASTPtr & query, const Context & query_context)
BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, const Context & query_context)
{
if (is_readonly)
throw Exception(ErrorCodes::NO_ZOOKEEPER, "Database is in readonly mode, because it cannot connect to ZooKeeper");
@ -405,7 +408,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
String db_name = getDatabaseName();
String to_db_name = getDatabaseName() + BROKEN_TABLES_SUFFIX;
if (total_tables < tables_to_detach.size() * 2)
if (total_tables * db_settings.max_broken_tables_ratio < tables_to_detach.size())
throw Exception(ErrorCodes::DATABASE_REPLICATION_FAILED, "Too many tables to recreate: {} of {}", tables_to_detach.size(), total_tables);
else if (!tables_to_detach.empty())
{
@ -594,12 +597,12 @@ void DatabaseReplicated::shutdown()
void DatabaseReplicated::dropTable(const Context & context, const String & table_name, bool no_delay)
{
auto txn = context.getMetadataTransaction();
auto txn = context.getZooKeeperMetadataTransaction();
assert(!ddl_worker->isCurrentlyActive() || txn);
if (txn && txn->is_initial_query)
if (txn && txn->isInitialQuery())
{
String metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(table_name);
txn->ops.emplace_back(zkutil::makeRemoveRequest(metadata_zk_path, -1));
txn->addOp(zkutil::makeRemoveRequest(metadata_zk_path, -1));
}
DatabaseAtomic::dropTable(context, table_name, no_delay);
}
@ -607,10 +610,10 @@ void DatabaseReplicated::dropTable(const Context & context, const String & table
void DatabaseReplicated::renameTable(const Context & context, const String & table_name, IDatabase & to_database,
const String & to_table_name, bool exchange, bool dictionary)
{
auto txn = context.getMetadataTransaction();
auto txn = context.getZooKeeperMetadataTransaction();
assert(txn);
if (txn->is_initial_query)
if (txn->isInitialQuery())
{
if (this != &to_database)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Moving tables between databases is not supported for Replicated engine");
@ -622,16 +625,16 @@ void DatabaseReplicated::renameTable(const Context & context, const String & tab
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {} does not exist", to_table_name);
String statement = readMetadataFile(table_name);
String metadata_zk_path = txn->zookeeper_path + "/metadata/" + escapeForFileName(table_name);
String metadata_zk_path_to = txn->zookeeper_path + "/metadata/" + escapeForFileName(to_table_name);
txn->ops.emplace_back(zkutil::makeRemoveRequest(metadata_zk_path, -1));
String metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(table_name);
String metadata_zk_path_to = zookeeper_path + "/metadata/" + escapeForFileName(to_table_name);
txn->addOp(zkutil::makeRemoveRequest(metadata_zk_path, -1));
if (exchange)
{
String statement_to = readMetadataFile(to_table_name);
txn->ops.emplace_back(zkutil::makeRemoveRequest(metadata_zk_path_to, -1));
txn->ops.emplace_back(zkutil::makeCreateRequest(metadata_zk_path, statement_to, zkutil::CreateMode::Persistent));
txn->addOp(zkutil::makeRemoveRequest(metadata_zk_path_to, -1));
txn->addOp(zkutil::makeCreateRequest(metadata_zk_path, statement_to, zkutil::CreateMode::Persistent));
}
txn->ops.emplace_back(zkutil::makeCreateRequest(metadata_zk_path_to, statement, zkutil::CreateMode::Persistent));
txn->addOp(zkutil::makeCreateRequest(metadata_zk_path_to, statement, zkutil::CreateMode::Persistent));
}
DatabaseAtomic::renameTable(context, table_name, to_database, to_table_name, exchange, dictionary);
@ -641,14 +644,14 @@ void DatabaseReplicated::commitCreateTable(const ASTCreateQuery & query, const S
const String & table_metadata_tmp_path, const String & table_metadata_path,
const Context & query_context)
{
auto txn = query_context.getMetadataTransaction();
auto txn = query_context.getZooKeeperMetadataTransaction();
assert(!ddl_worker->isCurrentlyActive() || txn);
if (txn && txn->is_initial_query)
if (txn && txn->isInitialQuery())
{
String metadata_zk_path = txn->zookeeper_path + "/metadata/" + escapeForFileName(query.table);
String metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(query.table);
String statement = getObjectDefinitionFromCreateQuery(query.clone());
/// zk::multi(...) will throw if `metadata_zk_path` exists
txn->ops.emplace_back(zkutil::makeCreateRequest(metadata_zk_path, statement, zkutil::CreateMode::Persistent));
txn->addOp(zkutil::makeCreateRequest(metadata_zk_path, statement, zkutil::CreateMode::Persistent));
}
DatabaseAtomic::commitCreateTable(query, table, table_metadata_tmp_path, table_metadata_path, query_context);
}
@ -657,11 +660,11 @@ void DatabaseReplicated::commitAlterTable(const StorageID & table_id,
const String & table_metadata_tmp_path, const String & table_metadata_path,
const String & statement, const Context & query_context)
{
auto txn = query_context.getMetadataTransaction();
if (txn && txn->is_initial_query)
auto txn = query_context.getZooKeeperMetadataTransaction();
if (txn && txn->isInitialQuery())
{
String metadata_zk_path = txn->zookeeper_path + "/metadata/" + escapeForFileName(table_id.table_name);
txn->ops.emplace_back(zkutil::makeSetRequest(metadata_zk_path, statement, -1));
String metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(table_id.table_name);
txn->addOp(zkutil::makeSetRequest(metadata_zk_path, statement, -1));
}
DatabaseAtomic::commitAlterTable(table_id, table_metadata_tmp_path, table_metadata_path, statement, query_context);
}
@ -670,37 +673,37 @@ void DatabaseReplicated::createDictionary(const Context & context,
const String & dictionary_name,
const ASTPtr & query)
{
auto txn = context.getMetadataTransaction();
auto txn = context.getZooKeeperMetadataTransaction();
assert(!ddl_worker->isCurrentlyActive() || txn);
if (txn && txn->is_initial_query)
if (txn && txn->isInitialQuery())
{
String metadata_zk_path = txn->zookeeper_path + "/metadata/" + escapeForFileName(dictionary_name);
String metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(dictionary_name);
String statement = getObjectDefinitionFromCreateQuery(query->clone());
txn->ops.emplace_back(zkutil::makeCreateRequest(metadata_zk_path, statement, zkutil::CreateMode::Persistent));
txn->addOp(zkutil::makeCreateRequest(metadata_zk_path, statement, zkutil::CreateMode::Persistent));
}
DatabaseAtomic::createDictionary(context, dictionary_name, query);
}
void DatabaseReplicated::removeDictionary(const Context & context, const String & dictionary_name)
{
auto txn = context.getMetadataTransaction();
auto txn = context.getZooKeeperMetadataTransaction();
assert(!ddl_worker->isCurrentlyActive() || txn);
if (txn && txn->is_initial_query)
if (txn && txn->isInitialQuery())
{
String metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(dictionary_name);
txn->ops.emplace_back(zkutil::makeRemoveRequest(metadata_zk_path, -1));
txn->addOp(zkutil::makeRemoveRequest(metadata_zk_path, -1));
}
DatabaseAtomic::removeDictionary(context, dictionary_name);
}
void DatabaseReplicated::detachTablePermanently(const Context & context, const String & table_name)
{
auto txn = context.getMetadataTransaction();
auto txn = context.getZooKeeperMetadataTransaction();
assert(!ddl_worker->isCurrentlyActive() || txn);
if (txn && txn->is_initial_query)
if (txn && txn->isInitialQuery())
{
String metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(table_name);
txn->ops.emplace_back(zkutil::makeRemoveRequest(metadata_zk_path, -1));
txn->addOp(zkutil::makeRemoveRequest(metadata_zk_path, -1));
}
DatabaseAtomic::detachTablePermanently(context, table_name);
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <Databases/DatabaseAtomic.h>
#include <Databases/DatabaseReplicatedSettings.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Core/BackgroundSchedulePool.h>
#include <DataStreams/BlockIO.h>
@ -22,13 +23,14 @@ class DatabaseReplicated : public DatabaseAtomic
public:
DatabaseReplicated(const String & name_, const String & metadata_path_, UUID uuid,
const String & zookeeper_path_, const String & shard_name_, const String & replica_name_,
DatabaseReplicatedSettings db_settings_,
const Context & context);
~DatabaseReplicated() override;
String getEngineName() const override { return "Replicated"; }
/// If current query is initial, then the following methods add metadata updating ZooKeeper operations to current MetadataTransaction.
/// If current query is initial, then the following methods add metadata updating ZooKeeper operations to current ZooKeeperMetadataTransaction.
void dropTable(const Context &, const String & table_name, bool no_delay) override;
void renameTable(const Context & context, const String & table_name, IDatabase & to_database,
const String & to_table_name, bool exchange, bool dictionary) override;
@ -46,7 +48,7 @@ public:
/// Try to execute DDL query on current host as initial query. If query succeeds,
/// then it will be executed on all replicas.
BlockIO propose(const ASTPtr & query, const Context & query_context);
BlockIO tryEnqueueReplicatedDDL(const ASTPtr & query, const Context & query_context);
void stopReplication();
@ -64,7 +66,7 @@ public:
friend struct DatabaseReplicatedTask;
friend class DatabaseReplicatedDDLWorker;
private:
void tryConnectToZooKeeper(bool force_attach);
void tryConnectToZooKeeperAndInitDatabase(bool force_attach);
bool createDatabaseNodesInZooKeeper(const ZooKeeperPtr & current_zookeeper);
void createReplicaNodesInZooKeeper(const ZooKeeperPtr & current_zookeeper);
@ -78,6 +80,7 @@ private:
String shard_name;
String replica_name;
String replica_path;
DatabaseReplicatedSettings db_settings;
zkutil::ZooKeeperPtr getZooKeeper() const;

View File

@ -0,0 +1,23 @@
#include <Databases/DatabaseReplicatedSettings.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTCreateQuery.h>
namespace DB
{
IMPLEMENT_SETTINGS_TRAITS(DatabaseReplicatedSettingsTraits, LIST_OF_DATABASE_REPLICATED_SETTINGS)
/// Loads settings from the SETTINGS clause of `storage_def`.
/// If the clause is present, its changes are applied to this object.
/// Otherwise an empty, non-standalone ASTSetQuery is attached to `storage_def` as its settings node.
void DatabaseReplicatedSettings::loadFromQuery(ASTStorage & storage_def)
{
    if (!storage_def.settings)
    {
        /// No SETTINGS clause in the query: attach an empty one to the storage definition.
        auto empty_settings = std::make_shared<ASTSetQuery>();
        empty_settings->is_standalone = false;
        storage_def.set(storage_def.settings, empty_settings);
        return;
    }

    applyChanges(storage_def.settings->changes);
}
}

View File

@ -0,0 +1,26 @@
#pragma once
#include <Core/Defines.h>
#include <Core/BaseSettings.h>
namespace DB
{
class ASTStorage;
#define LIST_OF_DATABASE_REPLICATED_SETTINGS(M) \
M(Float, max_broken_tables_ratio, 0.5, "Do not recover replica automatically if the ratio of staled tables to all tables is greater", 0) \
M(UInt64, max_replication_lag_to_enqueue, 10, "Replica will throw exception on attempt to execute query if its replication lag greater", 0) \
M(UInt64, wait_entry_commited_timeout_sec, 3600, "Replicas will try to cancel query if timeout exceed, but initiator host has not executed it yet", 0) \
DECLARE_SETTINGS_TRAITS(DatabaseReplicatedSettingsTraits, LIST_OF_DATABASE_REPLICATED_SETTINGS)
/** Settings for the Replicated database engine.
* Could be loaded from a CREATE DATABASE query (SETTINGS clause).
*/
struct DatabaseReplicatedSettings : public BaseSettings<DatabaseReplicatedSettingsTraits>
{
/// Loads settings from the SETTINGS clause of the given storage definition.
void loadFromQuery(ASTStorage & storage_def);
};
}

View File

@ -30,7 +30,7 @@ void DatabaseReplicatedDDLWorker::initializeMainThread()
{
auto zookeeper = getAndSetZooKeeper();
if (database->is_readonly)
database->tryConnectToZooKeeper(false);
database->tryConnectToZooKeeperAndInitDatabase(false);
initializeReplication();
initialized = true;
return;
@ -98,8 +98,7 @@ String DatabaseReplicatedDDLWorker::tryEnqueueAndExecuteEntry(DDLLogEntry & entr
UInt32 our_log_ptr = parse<UInt32>(zookeeper->get(database->replica_path + "/log_ptr"));
UInt32 max_log_ptr = parse<UInt32>(zookeeper->get(database->zookeeper_path + "/max_log_ptr"));
assert(our_log_ptr <= max_log_ptr);
constexpr UInt32 max_replication_lag = 16;
if (max_replication_lag < max_log_ptr - our_log_ptr)
if (database->db_settings.max_replication_lag_to_enqueue < max_log_ptr - our_log_ptr)
throw Exception(ErrorCodes::NOT_A_LEADER, "Cannot enqueue query on this replica, "
"because it has replication lag of {} queries. Try other replica.", max_log_ptr - our_log_ptr);
@ -131,7 +130,7 @@ String DatabaseReplicatedDDLWorker::tryEnqueueAndExecuteEntry(DDLLogEntry & entr
if (zookeeper->expired() || stop_flag)
throw Exception(ErrorCodes::DATABASE_REPLICATION_FAILED, "ZooKeeper session expired or replication stopped, try again");
processTask(*task);
processTask(*task, zookeeper);
if (!task->was_executed)
{
@ -139,7 +138,7 @@ String DatabaseReplicatedDDLWorker::tryEnqueueAndExecuteEntry(DDLLogEntry & entr
task->execution_status.code, task->execution_status.message);
}
try_node->reset();
try_node->setAlreadyRemoved();
return entry_path;
}
@ -178,7 +177,7 @@ DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_na
/// Query is not committed yet. We cannot just skip it and execute next one, because reordering may break replication.
LOG_TRACE(log, "Waiting for initiator {} to commit or rollback entry {}", initiator_name, entry_path);
constexpr size_t wait_time_ms = 1000;
constexpr size_t max_iterations = 3600;
size_t max_iterations = database->db_settings.wait_entry_commited_timeout_sec;
size_t iteration = 0;
while (!wait_committed_or_failed->tryWait(wait_time_ms))
@ -194,7 +193,7 @@ DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_na
if (max_iterations <= ++iteration)
{
/// What can we do if initiator hangs for some reason? Seems like we can remove /try node.
/// Initiator will fail to commit entry to ZK (including ops for replicated table) if /try does not exist.
/// Initiator will fail to commit ZooKeeperMetadataTransaction (including ops for replicated table) if /try does not exist.
/// But it's questionable.
/// We use tryRemove(...) because multiple hosts (including initiator) may try to do it concurrently.

View File

@ -6,6 +6,18 @@ namespace DB
class DatabaseReplicated;
/// It's similar to DDLWorker, but has the following differences:
/// 1. DDL queue in ZooKeeper is not shared between multiple clusters and databases,
/// each DatabaseReplicated has its own queue in ZooKeeper and DatabaseReplicatedDDLWorker object.
/// 2. Shards and replicas are identified by shard_name and replica_name arguments of database engine,
/// not by address:port pairs. Cluster (of multiple database replicas) is identified by its zookeeper_path.
/// 3. After creation of an entry in DDL queue initiator tries to execute the entry locally
/// and other hosts wait for query to finish on initiator host.
/// If query succeeds on initiator, then all hosts must execute it, so they will retry until query succeeds.
/// We assume that cluster is homogeneous, so if replicas are in consistent state and query succeeds on one host,
/// then all hosts can execute it (maybe after several retries).
/// 4. Each database replica stores its log pointer in ZooKeeper. Cleanup thread removes old entry
/// if its number < max_log_ptr - logs_to_keep.
class DatabaseReplicatedDDLWorker : public DDLWorker
{
public:

View File

@ -194,7 +194,7 @@ void DatabaseWithDictionaries::createDictionary(const Context & context, const S
detachDictionary(dictionary_name);
});
auto txn = context.getMetadataTransaction();
auto txn = context.getZooKeeperMetadataTransaction();
if (txn && !context.isInternalSubquery())
txn->commit(); /// Commit point (a sort of) for Replicated database
@ -219,7 +219,7 @@ void DatabaseWithDictionaries::removeDictionary(const Context & context, const S
{
String dictionary_metadata_path = getObjectMetadataPath(dictionary_name);
auto txn = context.getMetadataTransaction();
auto txn = context.getZooKeeperMetadataTransaction();
if (txn && !context.isInternalSubquery())
txn->commit(); /// Commit point (a sort of) for Replicated database

View File

@ -17,6 +17,7 @@ SRCS(
DatabaseOnDisk.cpp
DatabaseOrdinary.cpp
DatabaseReplicated.cpp
DatabaseReplicatedSettings.cpp
DatabaseReplicatedWorker.cpp
DatabaseWithDictionaries.cpp
DatabasesCommon.cpp

View File

@ -2553,14 +2553,14 @@ StorageID Context::resolveStorageIDImpl(StorageID storage_id, StorageNamespace w
return StorageID::createEmpty();
}
void Context::initMetadataTransaction(MetadataTransactionPtr txn, [[maybe_unused]] bool attach_existing)
void Context::initZooKeeperMetadataTransaction(ZooKeeperMetadataTransactionPtr txn, [[maybe_unused]] bool attach_existing)
{
assert(!metadata_transaction);
assert(attach_existing || query_context == this);
metadata_transaction = std::move(txn);
}
MetadataTransactionPtr Context::getMetadataTransaction() const
ZooKeeperMetadataTransactionPtr Context::getZooKeeperMetadataTransaction() const
{
assert(!metadata_transaction || hasQueryContext());
return metadata_transaction;

View File

@ -117,8 +117,8 @@ using VolumePtr = std::shared_ptr<IVolume>;
struct NamedSession;
struct BackgroundTaskSchedulingSettings;
struct MetadataTransaction;
using MetadataTransactionPtr = std::shared_ptr<MetadataTransaction>;
class ZooKeeperMetadataTransaction;
using ZooKeeperMetadataTransactionPtr = std::shared_ptr<ZooKeeperMetadataTransaction>;
#if USE_EMBEDDED_COMPILER
class CompiledExpressionCache;
@ -281,7 +281,7 @@ private:
/// to be customized in HTTP and TCP servers by overloading the customizeContext(DB::Context&)
/// methods.
MetadataTransactionPtr metadata_transaction; /// Distributed DDL context. I'm not sure if it's a suitable place for this,
ZooKeeperMetadataTransactionPtr metadata_transaction; /// Distributed DDL context. I'm not sure if it's a suitable place for this,
/// but it's the easiest way to pass this through the whole stack from executeQuery(...)
/// to DatabaseOnDisk::commitCreateTable(...) or IStorage::alter(...) without changing
/// thousands of signatures.
@ -746,8 +746,10 @@ public:
IHostContextPtr & getHostContext();
const IHostContextPtr & getHostContext() const;
void initMetadataTransaction(MetadataTransactionPtr txn, bool attach_existing = false);
MetadataTransactionPtr getMetadataTransaction() const;
/// Initialize context of distributed DDL query with Replicated database.
void initZooKeeperMetadataTransaction(ZooKeeperMetadataTransactionPtr txn, bool attach_existing = false);
/// Returns context of current distributed DDL query or nullptr.
ZooKeeperMetadataTransactionPtr getZooKeeperMetadataTransaction() const;
struct MySQLWireContext
{

View File

@ -96,7 +96,7 @@ void DDLTaskBase::parseQueryFromEntry(const Context & context)
query = parseQuery(parser_query, begin, end, description, 0, context.getSettingsRef().max_parser_depth);
}
std::unique_ptr<Context> DDLTaskBase::makeQueryContext(Context & from_context)
std::unique_ptr<Context> DDLTaskBase::makeQueryContext(Context & from_context, const ZooKeeperPtr & /*zookeeper*/)
{
auto query_context = std::make_unique<Context>(from_context);
query_context->makeQueryContext();
@ -293,28 +293,26 @@ String DatabaseReplicatedTask::getShardID() const
return database->shard_name;
}
std::unique_ptr<Context> DatabaseReplicatedTask::makeQueryContext(Context & from_context)
std::unique_ptr<Context> DatabaseReplicatedTask::makeQueryContext(Context & from_context, const ZooKeeperPtr & zookeeper)
{
auto query_context = DDLTaskBase::makeQueryContext(from_context);
auto query_context = DDLTaskBase::makeQueryContext(from_context, zookeeper);
query_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
query_context->setCurrentDatabase(database->getDatabaseName());
auto txn = std::make_shared<MetadataTransaction>();
query_context->initMetadataTransaction(txn);
txn->current_zookeeper = from_context.getZooKeeper();
txn->zookeeper_path = database->zookeeper_path;
txn->is_initial_query = is_initial_query;
auto txn = std::make_shared<ZooKeeperMetadataTransaction>(zookeeper, database->zookeeper_path, is_initial_query);
query_context->initZooKeeperMetadataTransaction(txn);
if (is_initial_query)
{
txn->ops.emplace_back(zkutil::makeRemoveRequest(entry_path + "/try", -1));
txn->ops.emplace_back(zkutil::makeCreateRequest(entry_path + "/committed", host_id_str, zkutil::CreateMode::Persistent));
txn->ops.emplace_back(zkutil::makeSetRequest(database->zookeeper_path + "/max_log_ptr", toString(getLogEntryNumber(entry_name)), -1));
txn->addOp(zkutil::makeRemoveRequest(entry_path + "/try", -1));
txn->addOp(zkutil::makeCreateRequest(entry_path + "/committed", host_id_str, zkutil::CreateMode::Persistent));
txn->addOp(zkutil::makeSetRequest(database->zookeeper_path + "/max_log_ptr", toString(getLogEntryNumber(entry_name)), -1));
}
txn->ops.emplace_back(zkutil::makeSetRequest(database->replica_path + "/log_ptr", toString(getLogEntryNumber(entry_name)), -1));
txn->addOp(zkutil::makeSetRequest(database->replica_path + "/log_ptr", toString(getLogEntryNumber(entry_name)), -1));
std::move(ops.begin(), ops.end(), std::back_inserter(txn->ops));
for (auto & op : ops)
txn->addOp(std::move(op));
ops.clear();
return query_context;
@ -335,7 +333,7 @@ UInt32 DDLTaskBase::getLogEntryNumber(const String & log_entry_name)
return parse<UInt32>(log_entry_name.substr(strlen(name)));
}
void MetadataTransaction::commit()
void ZooKeeperMetadataTransaction::commit()
{
assert(state == CREATED);
state = FAILED;

View File

@ -20,8 +20,8 @@ class ASTQueryWithOnCluster;
using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>;
class DatabaseReplicated;
struct MetadataTransaction;
using MetadataTransactionPtr = std::shared_ptr<MetadataTransaction>;
class ZooKeeperMetadataTransaction;
using ZooKeeperMetadataTransactionPtr = std::shared_ptr<ZooKeeperMetadataTransaction>;
struct HostID
{
@ -95,7 +95,7 @@ struct DDLTaskBase
virtual String getShardID() const = 0;
virtual std::unique_ptr<Context> makeQueryContext(Context & from_context);
virtual std::unique_ptr<Context> makeQueryContext(Context & from_context, const ZooKeeperPtr & zookeeper);
inline String getActiveNodePath() const { return entry_path + "/active/" + host_id_str; }
inline String getFinishedNodePath() const { return entry_path + "/finished/" + host_id_str; }
@ -132,13 +132,19 @@ struct DatabaseReplicatedTask : public DDLTaskBase
DatabaseReplicatedTask(const String & name, const String & path, DatabaseReplicated * database_);
String getShardID() const override;
std::unique_ptr<Context> makeQueryContext(Context & from_context) override;
std::unique_ptr<Context> makeQueryContext(Context & from_context, const ZooKeeperPtr & zookeeper) override;
DatabaseReplicated * database;
};
struct MetadataTransaction
/// The main purpose of ZooKeeperMetadataTransaction is to execute all ZooKeeper operations related to a query
/// in a single transaction when we have performed all required checks and are ready to "commit" changes.
/// For example, create ALTER_METADATA entry in ReplicatedMergeTree log,
/// create path/to/entry/finished/host_id node in distributed DDL queue to mark query as executed and
/// update metadata in path/to/replicated_database/metadata/table_name
/// It's used for DatabaseReplicated.
/// TODO we can also use it for ordinary ON CLUSTER queries
class ZooKeeperMetadataTransaction
{
enum State
{
@ -153,8 +159,29 @@ struct MetadataTransaction
bool is_initial_query;
Coordination::Requests ops;
public:
/// Stores the ZooKeeper session, the database path in ZooKeeper and the initial-query flag.
/// No ZooKeeper requests are added here; they are appended later via addOp().
ZooKeeperMetadataTransaction(const ZooKeeperPtr & current_zookeeper_, const String & zookeeper_path_, bool is_initial_query_)
: current_zookeeper(current_zookeeper_)
, zookeeper_path(zookeeper_path_)
, is_initial_query(is_initial_query_)
{
}
/// Returns the flag passed to the constructor.
bool isInitialQuery() const { return is_initial_query; }
/// Returns true once the transaction has left the CREATED state.
bool isExecuted() const { return state != CREATED; }
/// Returns a copy of the database path in ZooKeeper passed to the constructor.
String getDatabaseZooKeeperPath() const { return zookeeper_path; }
/// Appends one more ZooKeeper request to this transaction.
/// Must only be called while the transaction has not been executed yet (checked by assert).
void addOp(Coordination::RequestPtr && op)
{
    assert(!isExecuted());
    /// `op` is a named rvalue reference, i.e. an lvalue inside this function:
    /// move it explicitly so the request handle is moved into `ops` instead of being copied.
    ops.emplace_back(std::move(op));
}
void moveOpsTo(Coordination::Requests & other_ops)
{
assert(!isExecuted());
std::move(ops.begin(), ops.end(), std::back_inserter(other_ops));
ops.clear();
state = COMMITTED;
@ -162,7 +189,7 @@ struct MetadataTransaction
void commit();
~MetadataTransaction() { assert(state != CREATED || std::uncaught_exception()); }
~ZooKeeperMetadataTransaction() { assert(isExecuted() || std::uncaught_exception()); }
};
}

View File

@ -195,16 +195,15 @@ void DDLWorker::startup()
void DDLWorker::shutdown()
{
stop_flag = true;
queue_updated_event->set();
cleanup_event->set();
if (main_thread.joinable())
bool prev_stop_flag = stop_flag.exchange(true);
if (!prev_stop_flag)
{
queue_updated_event->set();
cleanup_event->set();
main_thread.join();
if (cleanup_thread.joinable())
cleanup_thread.join();
worker_pool.reset();
worker_pool.reset();
}
}
DDLWorker::~DDLWorker()
@ -267,6 +266,8 @@ DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_r
}
/// Stage 2: resolve host_id and check if we should execute query or not
/// Multiple clusters can use a single DDL queue path in ZooKeeper,
/// so we should skip the task if we cannot find the current host in the cluster's host list.
if (!task->findCurrentHostID(context, log))
{
out_reason = "There is no a local address in host list";
@ -317,7 +318,7 @@ void DDLWorker::scheduleTasks()
bool status_written = zookeeper->exists(task->getFinishedNodePath());
if (task->was_executed && !status_written && task_still_exists)
{
processTask(*task);
processTask(*task, zookeeper);
}
}
@ -364,15 +365,15 @@ void DDLWorker::scheduleTasks()
if (worker_pool)
{
worker_pool->scheduleOrThrowOnError([this, &saved_task]()
worker_pool->scheduleOrThrowOnError([this, &saved_task, &zookeeper]()
{
setThreadName("DDLWorkerExec");
processTask(saved_task);
processTask(saved_task, zookeeper);
});
}
else
{
processTask(saved_task);
processTask(saved_task, zookeeper);
}
}
}
@ -385,7 +386,7 @@ DDLTaskBase & DDLWorker::saveTask(DDLTaskPtr && task)
return *current_tasks.back();
}
bool DDLWorker::tryExecuteQuery(const String & query, DDLTaskBase & task)
bool DDLWorker::tryExecuteQuery(const String & query, DDLTaskBase & task, const ZooKeeperPtr & zookeeper)
{
/// Add special comment at the start of query to easily identify DDL-produced queries in query_log
String query_prefix = "/* ddl_entry=" + task.entry_name + " */ ";
@ -398,14 +399,16 @@ bool DDLWorker::tryExecuteQuery(const String & query, DDLTaskBase & task)
try
{
auto query_context = task.makeQueryContext(context);
auto query_context = task.makeQueryContext(context, zookeeper);
if (!task.is_initial_query)
query_scope.emplace(*query_context);
executeQuery(istr, ostr, !task.is_initial_query, *query_context, {});
if (auto txn = query_context->getMetadataTransaction())
if (auto txn = query_context->getZooKeeperMetadataTransaction())
{
if (txn->state == MetadataTransaction::CREATED)
/// Most queries commit changes to ZooKeeper right before applying local changes,
/// but some queries do not support it, so we have to do it here.
if (!txn->isExecuted())
txn->commit();
}
}
@ -463,10 +466,8 @@ void DDLWorker::updateMaxDDLEntryID(const String & entry_name)
}
}
void DDLWorker::processTask(DDLTaskBase & task)
void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper)
{
auto zookeeper = tryGetZooKeeper();
LOG_DEBUG(log, "Processing task {} ({})", task.entry_name, task.entry.query);
String active_node_path = task.getActiveNodePath();
@ -541,7 +542,7 @@ void DDLWorker::processTask(DDLTaskBase & task)
else
{
storage.reset();
tryExecuteQuery(rewritten_query, task);
tryExecuteQuery(rewritten_query, task, zookeeper);
}
}
catch (const Coordination::Exception &)
@ -565,7 +566,7 @@ void DDLWorker::processTask(DDLTaskBase & task)
}
else
{
/// task.ops where not executed by table or database engine, se DDLWorker is responsible for
/// task.ops were not executed by table or database engine, so DDLWorker is responsible for
/// writing query execution status into ZooKeeper.
task.ops.emplace_back(zkutil::makeSetRequest(finished_node_path, task.execution_status.serializeText(), -1));
}
@ -589,7 +590,7 @@ void DDLWorker::processTask(DDLTaskBase & task)
}
/// Active node was removed in multi ops
active_node->reset();
active_node->setAlreadyRemoved();
task.completely_processed = true;
}
@ -712,7 +713,7 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
/// If the leader unexpectedly changes, this method will return false,
/// and on the next iteration the new leader will take the lock
if (tryExecuteQuery(rewritten_query, task))
if (tryExecuteQuery(rewritten_query, task, zookeeper))
{
executed_by_us = true;
break;

View File

@ -77,7 +77,7 @@ protected:
/// Returns non-empty DDLTaskPtr if entry parsed and the check is passed
virtual DDLTaskPtr initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper);
void processTask(DDLTaskBase & task);
void processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper);
void updateMaxDDLEntryID(const String & entry_name);
/// Check that query should be executed on leader replica only
@ -95,7 +95,7 @@ protected:
const String & node_path,
const ZooKeeperPtr & zookeeper);
bool tryExecuteQuery(const String & query, DDLTaskBase & task);
bool tryExecuteQuery(const String & query, DDLTaskBase & task, const ZooKeeperPtr & zookeeper);
/// Checks and cleanups queue's nodes
void cleanupQueue(Int64 current_time_seconds, const ZooKeeperPtr & zookeeper);

View File

@ -54,7 +54,7 @@ BlockIO InterpreterAlterQuery::execute()
{
auto guard = DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name);
guard->releaseTableLock();
return typeid_cast<DatabaseReplicated *>(database.get())->propose(query_ptr, context);
return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, context);
}
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, context);

View File

@ -880,7 +880,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
{
assertOrSetUUID(create, database);
guard->releaseTableLock();
return typeid_cast<DatabaseReplicated *>(database.get())->propose(query_ptr, context);
return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, context);
}
}
@ -1092,7 +1092,7 @@ BlockIO InterpreterCreateQuery::createDictionary(ASTCreateQuery & create)
if (!create.attach)
assertOrSetUUID(create, database);
guard->releaseTableLock();
return typeid_cast<DatabaseReplicated *>(database.get())->propose(query_ptr, context);
return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, context);
}
if (database->isDictionaryExist(dictionary_name))

View File

@ -146,7 +146,7 @@ BlockIO InterpreterDropQuery::executeToTableImpl(const ASTDropQuery & query, Dat
ddl_guard->releaseTableLock();
table.reset();
return typeid_cast<DatabaseReplicated *>(database.get())->propose(query.clone(), context);
return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query.clone(), context);
}
if (query.kind == ASTDropQuery::Kind::Detach)
@ -231,7 +231,7 @@ BlockIO InterpreterDropQuery::executeToDictionary(
context.checkAccess(AccessType::DROP_DICTIONARY, database_name, dictionary_name);
ddl_guard->releaseTableLock();
return typeid_cast<DatabaseReplicated *>(database.get())->propose(query_ptr, context);
return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, context);
}
if (!database || !database->isDictionaryExist(dictionary_name))

View File

@ -90,7 +90,7 @@ BlockIO InterpreterRenameQuery::executeToTables(const ASTRenameQuery & rename, c
UniqueTableName to(elem.to_database_name, elem.to_table_name);
ddl_guards[from]->releaseTableLock();
ddl_guards[to]->releaseTableLock();
return typeid_cast<DatabaseReplicated *>(database.get())->propose(query_ptr, context);
return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, context);
}
else
{

View File

@ -212,11 +212,11 @@ static void executeDropQuery(ASTDropQuery::Kind kind, const Context & global_con
/// looks like expected behaviour and we have tests for it.
auto drop_context = Context(global_context);
drop_context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
if (auto txn = current_context.getMetadataTransaction())
if (auto txn = current_context.getZooKeeperMetadataTransaction())
{
/// For Replicated database
drop_context.setQueryContext(const_cast<Context &>(current_context));
drop_context.initMetadataTransaction(txn, true);
drop_context.initZooKeeperMetadataTransaction(txn, true);
}
InterpreterDropQuery drop_interpreter(ast_drop_query, drop_context);
drop_interpreter.execute();

View File

@ -4282,12 +4282,12 @@ void StorageReplicatedMergeTree::alter(
zkutil::makeCreateRequest(mutations_path + "/", mutation_entry.toString(), zkutil::CreateMode::PersistentSequential));
}
if (auto txn = query_context.getMetadataTransaction())
if (auto txn = query_context.getZooKeeperMetadataTransaction())
{
txn->moveOpsTo(ops);
/// NOTE: IDatabase::alterTable(...) is called when executing ALTER_METADATA queue entry without query context,
/// so we have to update metadata of DatabaseReplicated here.
String metadata_zk_path = txn->zookeeper_path + "/metadata/" + escapeForFileName(table_id.table_name);
String metadata_zk_path = txn->getDatabaseZooKeeperPath() + "/metadata/" + escapeForFileName(table_id.table_name);
auto ast = DatabaseCatalog::instance().getDatabase(table_id.database_name)->getCreateTableQuery(table_id.table_name, query_context);
applyMetadataChangesToCreateQuery(ast, future_metadata);
ops.emplace_back(zkutil::makeSetRequest(metadata_zk_path, getObjectDefinitionFromCreateQuery(ast), -1));
@ -5262,7 +5262,7 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const
requests.emplace_back(zkutil::makeCreateRequest(
mutations_path + "/", mutation_entry.toString(), zkutil::CreateMode::PersistentSequential));
if (auto txn = query_context.getMetadataTransaction())
if (auto txn = query_context.getZooKeeperMetadataTransaction())
txn->moveOpsTo(requests);
Coordination::Responses responses;
@ -5766,7 +5766,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
}
}
if (auto txn = context.getMetadataTransaction())
if (auto txn = context.getZooKeeperMetadataTransaction())
txn->moveOpsTo(ops);
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1)); /// Just update version
@ -6269,7 +6269,7 @@ bool StorageReplicatedMergeTree::dropAllPartsInPartition(
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential));
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1)); /// Just update version.
if (auto txn = query_context.getMetadataTransaction())
if (auto txn = query_context.getZooKeeperMetadataTransaction())
txn->moveOpsTo(ops);
Coordination::Responses responses = zookeeper.multi(ops);

View File

@ -108,6 +108,7 @@
"memory_tracking",
"memory_usage",
"live_view",
"00825_protobuf_format_map",
"00152_insert_different_granularity",
"01715_background_checker_blather_zookeeper",
"01714_alter_drop_version",