allow using Cluster from Replicated db

Alexander Tokmakov 2021-03-09 20:05:24 +03:00
parent 00b939e5a3
commit 83b3e4e0f5
14 changed files with 87 additions and 60 deletions


@@ -215,7 +215,7 @@ class IColumn;
     \
     M(Bool, insert_distributed_sync, false, "If setting is enabled, insert query into distributed waits until data will be sent to all nodes in cluster.", 0) \
     M(UInt64, insert_distributed_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.", 0) \
-    M(Int64, distributed_ddl_task_timeout, 180, "Timeout for DDL query responses from all hosts in cluster. If a ddl request has not been performed on all hosts, a response will contain a timeout error and a request will be executed in an async mode. Negative value means infinite.", 0) \
+    M(Int64, distributed_ddl_task_timeout, 180, "Timeout for DDL query responses from all hosts in cluster. If a ddl request has not been performed on all hosts, a response will contain a timeout error and a request will be executed in an async mode. Negative value means infinite. Zero means async mode.", 0) \
     M(Milliseconds, stream_flush_interval_ms, 7500, "Timeout for flushing data from streaming storages.", 0) \
     M(Milliseconds, stream_poll_timeout_ms, 500, "Timeout for polling data from/to streaming storages.", 0) \
     \
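Note: the reworded description documents a behavior change, not just editing. distributed_ddl_task_timeout now has three regimes: a positive value bounds the wait, zero returns immediately and leaves the DDL running asynchronously, and a negative value waits indefinitely (this is what the executeDDLQueryOnCluster change further below implements). A minimal sketch of the intended semantics, using a standalone helper that is not the actual ClickHouse code:

    #include <chrono>

    // Illustrative helper (not ClickHouse code): returns true once the wait
    // for distributed DDL responses should give up.
    //   timeout_seconds > 0  -> bounded wait
    //   timeout_seconds == 0 -> expires almost immediately (async mode)
    //   timeout_seconds < 0  -> never expires (infinite wait)
    bool waitExpired(std::chrono::steady_clock::time_point start, long long timeout_seconds)
    {
        if (timeout_seconds < 0)
            return false; // negative: wait forever
        std::chrono::duration<double> elapsed = std::chrono::steady_clock::now() - start;
        return elapsed.count() > timeout_seconds;
    }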


@@ -106,7 +106,27 @@ std::pair<String, String> DatabaseReplicated::parseFullReplicaName(const String
 
 ClusterPtr DatabaseReplicated::getCluster() const
 {
-    /// TODO Maintain up-to-date Cluster and allow to use it in Distributed tables
+    {
+        std::lock_guard lock{mutex};
+        if (cluster)
+            return cluster;
+    }
+
+    ClusterPtr new_cluster = getClusterImpl();
+    std::lock_guard lock{mutex};
+    if (!cluster)
+        cluster = std::move(new_cluster);
+    return cluster;
+}
+
+void DatabaseReplicated::setCluster(ClusterPtr && new_cluster)
+{
+    std::lock_guard lock{mutex};
+    cluster = std::move(new_cluster);
+}
+
+ClusterPtr DatabaseReplicated::getClusterImpl() const
+{
     Strings hosts;
     Strings host_ids;
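getCluster is now a cached accessor: the first caller builds the Cluster via getClusterImpl, later callers reuse it, and setCluster replaces the cached value when membership changes. The double lock acquisition keeps the (possibly slow) construction outside the critical section, while the re-check prevents a racing thread's result from being overwritten. A self-contained sketch of the same pattern, with simplified stand-in types rather than the actual classes:

    #include <memory>
    #include <mutex>

    struct Cluster {};
    using ClusterPtr = std::shared_ptr<Cluster>;

    class ClusterCache
    {
    public:
        ClusterPtr get() const
        {
            {
                std::lock_guard lock{mutex};
                if (cluster)
                    return cluster; // fast path: reuse the cached value
            }
            // Build outside the lock: construction may be slow (e.g. ZooKeeper reads).
            ClusterPtr fresh = std::make_shared<Cluster>();
            std::lock_guard lock{mutex};
            if (!cluster) // another thread may have installed a value meanwhile
                cluster = std::move(fresh);
            return cluster;
        }

        void set(ClusterPtr && fresh) // analogue of setCluster: invalidate/replace
        {
            std::lock_guard lock{mutex};
            cluster = std::move(fresh);
        }

    private:
        mutable std::mutex mutex;
        mutable ClusterPtr cluster;
    };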
@@ -254,11 +274,8 @@ bool DatabaseReplicated::createDatabaseNodesInZooKeeper(const zkutil::ZooKeeperP
     __builtin_unreachable();
 }
 
-void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPtr & current_zookeeper)
+void DatabaseReplicated::createEmptyLogEntry(Coordination::Requests & ops, const ZooKeeperPtr & current_zookeeper)
 {
-    /// Write host name to replica_path, it will protect from multiple replicas with the same name
-    auto host_id = getHostID(global_context, db_uuid);
-
     /// On replica creation add empty entry to log. Can be used to trigger some actions on other replicas (e.g. update cluster info).
     DDLLogEntry entry{};
@@ -267,11 +284,20 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
     String counter_path = current_zookeeper->create(counter_prefix, "", zkutil::CreateMode::EphemeralSequential);
     String query_path = query_path_prefix + counter_path.substr(counter_prefix.size());
 
+    ops.emplace_back(zkutil::makeCreateRequest(query_path, entry.toString(), zkutil::CreateMode::Persistent));
+    ops.emplace_back(zkutil::makeCreateRequest(query_path + "/committed", getFullReplicaName(), zkutil::CreateMode::Persistent));
+    ops.emplace_back(zkutil::makeRemoveRequest(counter_path, -1));
+}
+
+void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPtr & current_zookeeper)
+{
+    /// Write host name to replica_path, it will protect from multiple replicas with the same name
+    auto host_id = getHostID(global_context, db_uuid);
+
     Coordination::Requests ops;
     ops.emplace_back(zkutil::makeCreateRequest(replica_path, host_id, zkutil::CreateMode::Persistent));
     ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_ptr", "0", zkutil::CreateMode::Persistent));
-    ops.emplace_back(zkutil::makeCreateRequest(query_path, entry.toString(), zkutil::CreateMode::Persistent));
-    ops.emplace_back(zkutil::makeRemoveRequest(counter_path, -1));
+    createEmptyLogEntry(ops, current_zookeeper);
     current_zookeeper->multi(ops);
 }
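createEmptyLogEntry appends its requests to a caller-supplied Coordination::Requests batch instead of committing them itself, so replica registration (and drop() below) can create its own nodes and the empty log entry in a single atomic multi(): other replicas never observe one without the other. A toy illustration of that batching pattern, with stand-in types rather than the real zkutil API:

    #include <iostream>
    #include <string>
    #include <vector>

    // Stand-ins for Coordination::Requests and ZooKeeper's multi() (toy example).
    using Requests = std::vector<std::string>;

    // Analogue of createEmptyLogEntry: appends to the caller's batch.
    void appendEmptyLogEntry(Requests & ops)
    {
        ops.push_back("create /log/query-<seq> (empty entry)");
        ops.push_back("create /log/query-<seq>/committed");
    }

    // Analogue of zookeeper->multi(ops): all requests succeed or fail together.
    void commitAtomically(const Requests & ops)
    {
        for (const auto & op : ops)
            std::cout << op << '\n';
    }

    int main()
    {
        Requests ops;
        ops.push_back("create /replicas/<shard>|<replica>");
        ops.push_back("create /replicas/<shard>|<replica>/log_ptr");
        appendEmptyLogEntry(ops); // piggy-backs on the same atomic batch
        commitAtomically(ops);
    }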
@@ -580,8 +606,13 @@ ASTPtr DatabaseReplicated::parseQueryFromMetadataInZooKeeper(const String & node
 void DatabaseReplicated::drop(const Context & context_)
 {
     auto current_zookeeper = getZooKeeper();
-    current_zookeeper->set(replica_path, DROPPED_MARK);
+    Coordination::Requests ops;
+    ops.emplace_back(zkutil::makeSetRequest(replica_path, DROPPED_MARK, -1));
+    createEmptyLogEntry(ops, current_zookeeper);
+    current_zookeeper->multi(ops);
+
     DatabaseAtomic::drop(context_);
+
     current_zookeeper->tryRemoveRecursive(replica_path);
     /// TODO it may leave garbage in ZooKeeper if the last node lost connection here
     if (current_zookeeper->tryRemove(zookeeper_path + "/replicas") == Coordination::Error::ZOK)


@@ -77,6 +77,11 @@ private:
     ASTPtr parseQueryFromMetadataInZooKeeper(const String & node_name, const String & query);
     String readMetadataFile(const String & table_name) const;
 
+    ClusterPtr getClusterImpl() const;
+    void setCluster(ClusterPtr && new_cluster);
+
+    void createEmptyLogEntry(Coordination::Requests & ops, const ZooKeeperPtr & current_zookeeper);
+
     String zookeeper_path;
     String shard_name;
     String replica_name;
@@ -87,6 +92,8 @@ private:
     std::atomic_bool is_readonly = true;
     std::unique_ptr<DatabaseReplicatedDDLWorker> ddl_worker;
 
+    mutable ClusterPtr cluster;
+
 };
 
 }


@@ -237,6 +237,8 @@ DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_na
 
     if (task->entry.query.empty())
     {
+        /// Some replica is added or removed, let's update cached cluster
+        database->setCluster(database->getClusterImpl());
         out_reason = fmt::format("Entry {} is a dummy task", entry_name);
         return {};
     }
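The empty query is the dummy entry written by createEmptyLogEntry above: it carries no DDL to execute, but its arrival tells every replica that cluster membership changed, so the worker eagerly rebuilds the cached Cluster and then skips the entry. A sketch of that convention, with illustrative stand-in types:

    #include <optional>
    #include <string>

    struct LogEntry { std::string query; };

    struct Database
    {
        void refreshClusterCache() { /* rebuild from ZooKeeper and swap in */ }
    };

    // Returns the query to execute, or nothing for a dummy "membership changed" entry.
    std::optional<std::string> checkTask(const LogEntry & entry, Database & db)
    {
        if (entry.query.empty())
        {
            db.refreshClusterCache(); // analogue of setCluster(getClusterImpl())
            return std::nullopt;      // nothing to execute
        }
        return entry.query;
    }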


@@ -115,7 +115,9 @@ Cluster::Address::Address(
     const String & password_,
     UInt16 clickhouse_port,
     bool secure_,
-    Int64 priority_)
+    Int64 priority_,
+    UInt32 shard_index_,
+    UInt32 replica_index_)
     : user(user_)
     , password(password_)
 {
@@ -125,6 +127,8 @@ Cluster::Address::Address(
     secure = secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable;
     priority = priority_;
     is_local = isLocal(clickhouse_port);
+    shard_index = shard_index_;
+    replica_index = replica_index_;
 }
@@ -465,7 +469,7 @@ Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String
 {
     Addresses current;
     for (const auto & replica : shard)
-        current.emplace_back(replica, username, password, clickhouse_port, secure, priority);
+        current.emplace_back(replica, username, password, clickhouse_port, secure, priority, current_shard_num, current.size() + 1);
 
     addresses_with_failover.emplace_back(current);
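Addresses built from plain host lists (the constructor DatabaseReplicated::getClusterImpl relies on) now record their position in the cluster: shard and replica indexes are 1-based, and 0 (the default, see Cluster.h below) keeps meaning "not set". A minimal sketch of the numbering, assuming simplified types:

    #include <cstdint>
    #include <string>
    #include <vector>

    // Illustrative types only; 0 means "index not set".
    struct Address
    {
        std::string host;
        uint32_t shard_index = 0;   // 1-based shard number within the cluster
        uint32_t replica_index = 0; // 1-based replica number within the shard
    };

    std::vector<std::vector<Address>> buildAddresses(
        const std::vector<std::vector<std::string>> & hosts_per_shard)
    {
        std::vector<std::vector<Address>> result;
        uint32_t shard_num = 1;
        for (const auto & shard : hosts_per_shard)
        {
            std::vector<Address> current;
            for (const auto & host : shard)
                current.push_back({host, shard_num, static_cast<uint32_t>(current.size() + 1)});
            result.push_back(std::move(current));
            ++shard_num;
        }
        return result;
    }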


@@ -110,7 +110,9 @@ public:
             const String & password_,
             UInt16 clickhouse_port,
             bool secure_ = false,
-            Int64 priority_ = 1);
+            Int64 priority_ = 1,
+            UInt32 shard_index_ = 0,
+            UInt32 replica_index_ = 0);
 
         /// Returns 'escaped_host_name:port'
         String toString() const;


@@ -50,6 +50,7 @@
 #include <Interpreters/SystemLog.h>
 #include <Interpreters/Context.h>
 #include <Interpreters/DDLWorker.h>
+#include <Interpreters/DDLTask.h>
 #include <IO/ReadBufferFromFile.h>
 #include <IO/UncompressedCache.h>
 #include <Parsers/ASTCreateQuery.h>
@@ -1772,11 +1773,14 @@ std::optional<UInt16> Context::getTCPPortSecure() const
 
 std::shared_ptr<Cluster> Context::getCluster(const std::string & cluster_name) const
 {
     auto res = getClusters().getCluster(cluster_name);
-
-    if (!res)
-        throw Exception("Requested cluster '" + cluster_name + "' not found", ErrorCodes::BAD_GET);
+    if (res)
+        return res;
 
-    return res;
+    res = tryGetReplicatedDatabaseCluster(cluster_name);
+    if (res)
+        return res;
+
+    throw Exception("Requested cluster '" + cluster_name + "' not found", ErrorCodes::BAD_GET);
 }
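Context::getCluster now falls back to the Replicated databases after checking the statically configured <remote_servers> section, which is what lets a database name such as testdb act as a cluster name in ON CLUSTER queries and in the Distributed engine (see the test change below). A self-contained sketch of the lookup order, with toy registries standing in for the real catalogs:

    #include <map>
    #include <memory>
    #include <stdexcept>
    #include <string>

    struct Cluster {};
    using ClusterPtr = std::shared_ptr<Cluster>;

    // Toy registries (illustration only).
    std::map<std::string, ClusterPtr> configured_clusters;  // <remote_servers> entries
    std::map<std::string, ClusterPtr> replicated_databases; // Replicated DBs by name

    // Resolution order after this commit: static config first, then a
    // Replicated database with a matching name, otherwise an error.
    ClusterPtr getCluster(const std::string & name)
    {
        if (auto it = configured_clusters.find(name); it != configured_clusters.end())
            return it->second;
        if (auto it = replicated_databases.find(name); it != replicated_databases.end())
            return it->second;
        throw std::runtime_error("Requested cluster '" + name + "' not found");
    }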


@@ -396,4 +396,11 @@ void ZooKeeperMetadataTransaction::commit()
     state = COMMITTED;
 }
 
+ClusterPtr tryGetReplicatedDatabaseCluster(const String & cluster_name)
+{
+    if (const auto * replicated_db = dynamic_cast<const DatabaseReplicated *>(DatabaseCatalog::instance().tryGetDatabase(cluster_name).get()))
+        return replicated_db->getCluster();
+    return {};
+}
+
 }


@@ -18,6 +18,7 @@ namespace DB
 
 class ASTQueryWithOnCluster;
 using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>;
 using ClusterPtr = std::shared_ptr<Cluster>;
+class DatabaseReplicated;
 
 class ZooKeeperMetadataTransaction;
@@ -193,4 +194,6 @@ public:
     ~ZooKeeperMetadataTransaction() { assert(isExecuted() || std::uncaught_exceptions()); }
 };
 
+ClusterPtr tryGetReplicatedDatabaseCluster(const String & cluster_name);
+
 }


@@ -238,8 +238,6 @@ DDLQueryStatusInputStream::DDLQueryStatusInputStream(const String & zk_node_path
     addTotalRowsApprox(waiting_hosts.size());
 
     timeout_seconds = context.getSettingsRef().distributed_ddl_task_timeout;
-    /// There is not sense to check query status with zero timeout.
-    assert(timeout_seconds >= 0);
 }
 
 std::pair<String, UInt16> DDLQueryStatusInputStream::parseHostAndPort(const String & host_id) const
@@ -284,7 +282,7 @@ Block DDLQueryStatusInputStream::readImpl()
         return res;
     }
 
-    if (watch.elapsedSeconds() > timeout_seconds)
+    if (timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds)
     {
         size_t num_unfinished_hosts = waiting_hosts.size() - num_hosts_finished;
         size_t num_active_hosts = current_active_hosts.size();


@@ -1,34 +1,3 @@
 <yandex>
     <database_atomic_delay_before_drop_table_sec>10</database_atomic_delay_before_drop_table_sec>
-    <remote_servers>
-        <cluster>
-            <shard>
-                <internal_replication>true</internal_replication>
-                <replica>
-                    <host>main_node</host>
-                    <port>9000</port>
-                </replica>
-                <replica>
-                    <host>dummy_node</host>
-                    <port>9000</port>
-                </replica>
-                <replica>
-                    <host>competing_node</host>
-                    <port>9000</port>
-                </replica>
-            </shard>
-            <shard>
-                <internal_replication>true</internal_replication>
-                <replica>
-                    <host>snapshotting_node</host>
-                    <port>9000</port>
-                </replica>
-                <replica>
-                    <host>snapshot_recovering_node</host>
-                    <port>9000</port>
-                </replica>
-            </shard>
-        </cluster>
-    </remote_servers>
 </yandex>


@@ -99,7 +99,7 @@ def test_alters_from_different_replicas(started_cluster):
                       "(CounterID UInt32, StartDate Date, UserID UInt32, VisitID UInt32, NestedColumn Nested(A UInt8, S String), ToDrop UInt32) "
                       "ENGINE = MergeTree(StartDate, intHash32(UserID), (CounterID, StartDate, intHash32(UserID), VisitID), 8192);")
 
-    main_node.query("CREATE TABLE testdb.dist AS testdb.concurrent_test ENGINE = Distributed(cluster, testdb, concurrent_test, CounterID)")
+    main_node.query("CREATE TABLE testdb.dist AS testdb.concurrent_test ENGINE = Distributed(testdb, testdb, concurrent_test, CounterID)")
 
     dummy_node.stop_clickhouse(kill=True)


@@ -11,7 +11,7 @@ $CLICKHOUSE_CLIENT -q "drop table if exists never_throw;"
 
 CLICKHOUSE_CLIENT_OPT=$(echo ${CLICKHOUSE_CLIENT_OPT} | sed 's/'"--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}"'/--send_logs_level=fatal/g')
 
-CLIENT="$CLICKHOUSE_CLIENT_BINARY $CLICKHOUSE_CLIENT_OPT --distributed_ddl_task_timeout=5 --distributed_ddl_output_mode=none"
+CLIENT="$CLICKHOUSE_CLIENT_BINARY $CLICKHOUSE_CLIENT_OPT --distributed_ddl_task_timeout=8 --distributed_ddl_output_mode=none"
 $CLIENT -q "select value from system.settings where name='distributed_ddl_output_mode';"
 # Ok
 $CLIENT -q "create table throw on cluster test_shard_localhost (n int) engine=Memory;"
@@ -20,19 +20,19 @@ $CLIENT -q "create table throw on cluster test_shard_localhost (n int) engine=Me
 # Timeout
 $CLIENT -q "drop table throw on cluster test_unavailable_shard;" 2>&1| grep -Fv "@ 0x" | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//" | sed "s/Watching task .* is executing longer/Watching task <task> is executing longer/" | sed "s/background. /background./"
 
-CLIENT="$CLICKHOUSE_CLIENT_BINARY $CLICKHOUSE_CLIENT_OPT --distributed_ddl_task_timeout=5 --distributed_ddl_output_mode=throw"
+CLIENT="$CLICKHOUSE_CLIENT_BINARY $CLICKHOUSE_CLIENT_OPT --distributed_ddl_task_timeout=8 --distributed_ddl_output_mode=throw"
 $CLIENT -q "select value from system.settings where name='distributed_ddl_output_mode';"
 $CLIENT -q "create table throw on cluster test_shard_localhost (n int) engine=Memory;"
 $CLIENT -q "create table throw on cluster test_shard_localhost (n int) engine=Memory;" 2>&1| grep -Fv "@ 0x" | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//" | sed "s/exists.. /exists/"
 $CLIENT -q "drop table throw on cluster test_unavailable_shard;" 2>&1| grep -Fv "@ 0x" | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//" | sed "s/Watching task .* is executing longer/Watching task <task> is executing longer/" | sed "s/background. /background./"
 
-CLIENT="$CLICKHOUSE_CLIENT_BINARY $CLICKHOUSE_CLIENT_OPT --distributed_ddl_task_timeout=5 --distributed_ddl_output_mode=null_status_on_timeout"
+CLIENT="$CLICKHOUSE_CLIENT_BINARY $CLICKHOUSE_CLIENT_OPT --distributed_ddl_task_timeout=8 --distributed_ddl_output_mode=null_status_on_timeout"
 $CLIENT -q "select value from system.settings where name='distributed_ddl_output_mode';"
 $CLIENT -q "create table null_status on cluster test_shard_localhost (n int) engine=Memory;"
 $CLIENT -q "create table null_status on cluster test_shard_localhost (n int) engine=Memory;" 2>&1| grep -Fv "@ 0x" | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//" | sed "s/exists.. /exists/"
 $CLIENT -q "drop table null_status on cluster test_unavailable_shard;"
 
-CLIENT="$CLICKHOUSE_CLIENT_BINARY $CLICKHOUSE_CLIENT_OPT --distributed_ddl_task_timeout=5 --distributed_ddl_output_mode=never_throw"
+CLIENT="$CLICKHOUSE_CLIENT_BINARY $CLICKHOUSE_CLIENT_OPT --distributed_ddl_task_timeout=8 --distributed_ddl_output_mode=never_throw"
 $CLIENT -q "select value from system.settings where name='distributed_ddl_output_mode';"
 $CLIENT -q "create table never_throw on cluster test_shard_localhost (n int) engine=Memory;"
 $CLIENT -q "create table never_throw on cluster test_shard_localhost (n int) engine=Memory;" 2>&1| sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//"


@@ -114,13 +114,11 @@
"01415_sticking_mutations",
"00980_zookeeper_merge_tree_alter_settings",
"01148_zookeeper_path_macros_unfolding",
"01019_alter_materialized_view_atomic",
"01019_alter_materialized_view_consistent",
"01294_system_distributed_on_cluster",
"01269_create_with_null",
/// grep -c
"01018_ddl_dictionaries_bad_queries",
"00955_test_final_mark",
"00180_attach_materialized_view",
"01294_system_distributed_on_cluster",
"01294_system_distributed_on_cluster",
"00908_bloom_filter_index",
/// Unsupported type of ALTER query
"01650_fetch_patition_with_macro_in_zk_path",
"01451_detach_drop_part",
@@ -131,6 +129,8 @@
 "01060_shutdown_table_after_detach",
 "01021_only_tuple_columns",
 "01015_attach_part",
+"00955_test_final_mark",
+"00753_alter_attach",
 "00626_replace_partition_from_table_zookeeper",
 "00626_replace_partition_from_table",
 "00152_insert_different_granularity",