Merge branch 'ClickHouse:master' into master

This commit is contained in:
Roman Antonov 2024-10-28 17:32:08 +03:00 committed by GitHub
commit febfe64321
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
31 changed files with 1310 additions and 592 deletions

2
contrib/numactl vendored

@ -1 +1 @@
Subproject commit 8d13d63a05f0c3cd88bf777cbb61541202b7da08
Subproject commit ff32c618d63ca7ac48cce366c5a04bb3563683a0

View File

@ -2394,14 +2394,23 @@ try
if (has_zookeeper && config().has("distributed_ddl"))
{
/// DDL worker should be started after all tables were loaded
String ddl_zookeeper_path = config().getString("distributed_ddl.path", "/clickhouse/task_queue/ddl/");
String ddl_queue_path = config().getString("distributed_ddl.path", "/clickhouse/task_queue/ddl/");
String ddl_replicas_path = config().getString("distributed_ddl.replicas_path", "/clickhouse/task_queue/replicas/");
int pool_size = config().getInt("distributed_ddl.pool_size", 1);
if (pool_size < 1)
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "distributed_ddl.pool_size should be greater then 0");
global_context->setDDLWorker(std::make_unique<DDLWorker>(pool_size, ddl_zookeeper_path, global_context, &config(),
"distributed_ddl", "DDLWorker",
&CurrentMetrics::MaxDDLEntryID, &CurrentMetrics::MaxPushedDDLEntryID),
load_metadata_tasks);
global_context->setDDLWorker(
std::make_unique<DDLWorker>(
pool_size,
ddl_queue_path,
ddl_replicas_path,
global_context,
&config(),
"distributed_ddl",
"DDLWorker",
&CurrentMetrics::MaxDDLEntryID,
&CurrentMetrics::MaxPushedDDLEntryID),
load_metadata_tasks);
}
/// Do not keep tasks in server, they should be kept inside databases. Used here to make dependent tasks only.
@ -3024,7 +3033,7 @@ void Server::updateServers(
for (auto * server : all_servers)
{
if (!server->isStopping())
if (server->supportsRuntimeReconfiguration() && !server->isStopping())
{
std::string port_name = server->getPortName();
bool has_host = false;

View File

@ -1450,6 +1450,8 @@
<distributed_ddl>
<!-- Path in ZooKeeper to queue with DDL queries -->
<path>/clickhouse/task_queue/ddl</path>
<!-- Path in ZooKeeper to store running DDL hosts -->
<replicas_path>/clickhouse/task_queue/replicas</replicas_path>
<!-- Settings from this profile will be used to execute DDL queries -->
<!-- <profile>default</profile> -->

View File

@ -59,13 +59,13 @@ constexpr size_t group_array_sorted_sort_strategy_max_elements_threshold = 10000
template <typename T, GroupArraySortedStrategy strategy>
struct GroupArraySortedData
{
static constexpr bool is_value_generic_field = std::is_same_v<T, Field>;
using Allocator = MixedAlignedArenaAllocator<alignof(T), 4096>;
using Array = PODArray<T, 32, Allocator>;
using Array = typename std::conditional_t<is_value_generic_field, std::vector<T>, PODArray<T, 32, Allocator>>;
static constexpr size_t partial_sort_max_elements_factor = 2;
static constexpr bool is_value_generic_field = std::is_same_v<T, Field>;
Array values;
static bool compare(const T & lhs, const T & rhs)
@ -144,7 +144,7 @@ struct GroupArraySortedData
}
if (values.size() > max_elements)
values.resize(max_elements, arena);
resize(max_elements, arena);
}
ALWAYS_INLINE void partialSortAndLimitIfNeeded(size_t max_elements, Arena * arena)
@ -153,7 +153,23 @@ struct GroupArraySortedData
return;
::nth_element(values.begin(), values.begin() + max_elements, values.end(), Comparator());
values.resize(max_elements, arena);
resize(max_elements, arena);
}
ALWAYS_INLINE void resize(size_t n, Arena * arena)
{
if constexpr (is_value_generic_field)
values.resize(n);
else
values.resize(n, arena);
}
ALWAYS_INLINE void push_back(T && element, Arena * arena)
{
if constexpr (is_value_generic_field)
values.push_back(element);
else
values.push_back(element, arena);
}
ALWAYS_INLINE void addElement(T && element, size_t max_elements, Arena * arena)
@ -171,12 +187,12 @@ struct GroupArraySortedData
return;
}
values.push_back(std::move(element), arena);
push_back(std::move(element), arena);
std::push_heap(values.begin(), values.end(), Comparator());
}
else
{
values.push_back(std::move(element), arena);
push_back(std::move(element), arena);
partialSortAndLimitIfNeeded(max_elements, arena);
}
}
@ -210,14 +226,6 @@ struct GroupArraySortedData
result_array_data[result_array_data_insert_begin + i] = values[i];
}
}
~GroupArraySortedData()
{
for (auto & value : values)
{
value.~T();
}
}
};
template <typename T>
@ -313,14 +321,12 @@ public:
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Too large array size, it should not exceed {}", max_elements);
auto & values = this->data(place).values;
values.resize_exact(size, arena);
if constexpr (std::is_same_v<T, Field>)
if constexpr (Data::is_value_generic_field)
{
values.resize(size);
for (Field & element : values)
{
/// We must initialize the Field type since some internal functions (like operator=) use them
new (&element) Field;
bool has_value = false;
readBinary(has_value, buf);
if (has_value)
@ -329,6 +335,7 @@ public:
}
else
{
values.resize_exact(size, arena);
if constexpr (std::endian::native == std::endian::little)
{
buf.readStrict(reinterpret_cast<char *>(values.data()), size * sizeof(values[0]));

View File

@ -4,47 +4,49 @@
#include <Backups/IRestoreCoordination.h>
#include <Backups/RestorerFromBackup.h>
#include <Core/ServerSettings.h>
#include <Core/Settings.h>
#include <Databases/DDLDependencyVisitor.h>
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseReplicated.h>
#include <Databases/DatabaseReplicatedWorker.h>
#include <Databases/TablesDependencyGraph.h>
#include <Databases/enableAllExperimentalSettings.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/SharedThreadPools.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/ReplicatedDatabaseQueryStatusSource.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/executeQuery.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTDeleteQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Processors/Sinks/EmptySink.h>
#include <Storages/AlterCommands.h>
#include <Storages/StorageKeeperMap.h>
#include <base/chrono_io.h>
#include <base/getFQDNOrHostName.h>
#include <Common/Exception.h>
#include <Common/Macros.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Common/PoolId.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/PoolId.h>
#include <Core/ServerSettings.h>
#include <Core/Settings.h>
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseReplicated.h>
#include <Databases/DatabaseReplicatedWorker.h>
#include <Databases/DDLDependencyVisitor.h>
#include <Databases/TablesDependencyGraph.h>
#include <Databases/enableAllExperimentalSettings.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/SharedThreadPools.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTDeleteQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/queryToString.h>
#include <Storages/StorageKeeperMap.h>
#include <Storages/AlterCommands.h>
namespace DB
{
@ -55,6 +57,8 @@ namespace Setting
extern const SettingsUInt64 max_parser_backtracks;
extern const SettingsUInt64 max_parser_depth;
extern const SettingsUInt64 max_query_size;
extern const SettingsDistributedDDLOutputMode distributed_ddl_output_mode;
extern const SettingsInt64 distributed_ddl_task_timeout;
extern const SettingsBool throw_on_unsupported_query_inside_transaction;
}
@ -443,7 +447,6 @@ void DatabaseReplicated::fillClusterAuthInfo(String collection_name, const Poco:
cluster_auth_info.cluster_secure_connection = config_ref.getBool(config_prefix + ".cluster_secure_connection", false);
}
void DatabaseReplicated::tryConnectToZooKeeperAndInitDatabase(LoadingStrictnessLevel mode)
{
try
@ -1096,7 +1099,8 @@ BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, Contex
hosts_to_wait.push_back(unfiltered_hosts[i]);
}
return getDistributedDDLStatus(node_path, entry, query_context, &hosts_to_wait);
return getQueryStatus(node_path, fs::path(zookeeper_path) / "replicas", query_context, hosts_to_wait);
}
static UUID getTableUUIDIfReplicated(const String & metadata, ContextPtr context)
@ -2040,4 +2044,21 @@ void registerDatabaseReplicated(DatabaseFactory & factory)
};
factory.registerDatabase("Replicated", create_fn, {.supports_arguments = true, .supports_settings = true});
}
BlockIO DatabaseReplicated::getQueryStatus(
const String & node_path, const String & replicas_path, ContextPtr context_, const Strings & hosts_to_wait)
{
BlockIO io;
if (context_->getSettingsRef()[Setting::distributed_ddl_task_timeout] == 0)
return io;
auto source = std::make_shared<ReplicatedDatabaseQueryStatusSource>(node_path, replicas_path, context_, hosts_to_wait);
io.pipeline = QueryPipeline(std::move(source));
if (context_->getSettingsRef()[Setting::distributed_ddl_output_mode] == DistributedDDLOutputMode::NONE
|| context_->getSettingsRef()[Setting::distributed_ddl_output_mode] == DistributedDDLOutputMode::NONE_ONLY_ACTIVE)
io.pipeline.complete(std::make_shared<EmptySink>(io.pipeline.getHeader()));
return io;
}
}

View File

@ -151,6 +151,9 @@ private:
void waitDatabaseStarted() const override;
void stopLoading() override;
static BlockIO
getQueryStatus(const String & node_path, const String & replicas_path, ContextPtr context, const Strings & hosts_to_wait);
String zookeeper_path;
String shard_name;
String replica_name;

View File

@ -39,7 +39,14 @@ namespace ErrorCodes
static constexpr const char * FORCE_AUTO_RECOVERY_DIGEST = "42";
DatabaseReplicatedDDLWorker::DatabaseReplicatedDDLWorker(DatabaseReplicated * db, ContextPtr context_)
: DDLWorker(/* pool_size */ 1, db->zookeeper_path + "/log", context_, nullptr, {}, fmt::format("DDLWorker({})", db->getDatabaseName()))
: DDLWorker(
/* pool_size */ 1,
db->zookeeper_path + "/log",
db->zookeeper_path + "/replicas",
context_,
nullptr,
{},
fmt::format("DDLWorker({})", db->getDatabaseName()))
, database(db)
{
/// Pool size must be 1 to avoid reordering of log entries.

View File

@ -38,9 +38,14 @@ public:
UInt32 getLogPointer() const;
UInt64 getCurrentInitializationDurationMs() const;
private:
bool initializeMainThread() override;
void initializeReplication();
void initializeReplication() override;
void createReplicaDirs(const ZooKeeperPtr &, const NameSet &) override { }
void markReplicasActive(bool) override { }
void initializeLogPointer(const String & processed_entry_name);
DDLTaskPtr initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper, bool dry_run) override;

View File

@ -0,0 +1,157 @@
#include <unordered_set>
#include <Core/Settings.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Databases/DatabaseReplicated.h>
#include <Interpreters/DDLOnClusterQueryStatusSource.h>
#include <Common/DNSResolver.h>
#include <Common/isLocalAddress.h>
namespace DB
{
namespace Setting
{
extern const SettingsDistributedDDLOutputMode distributed_ddl_output_mode;
}
namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
}
DDLOnClusterQueryStatusSource::DDLOnClusterQueryStatusSource(
const String & zk_node_path, const String & zk_replicas_path, ContextPtr context_, const Strings & hosts_to_wait)
: DistributedQueryStatusSource(
zk_node_path, zk_replicas_path, getSampleBlock(context_), context_, hosts_to_wait, "DDLOnClusterQueryStatusSource")
{
}
ExecutionStatus DDLOnClusterQueryStatusSource::checkStatus(const String & host_id)
{
fs::path status_path = fs::path(node_path) / "finished" / host_id;
return getExecutionStatus(status_path);
}
Chunk DDLOnClusterQueryStatusSource::generateChunkWithUnfinishedHosts() const
{
NameSet unfinished_hosts = waiting_hosts;
for (const auto & host_id : finished_hosts)
unfinished_hosts.erase(host_id);
NameSet active_hosts_set = NameSet{current_active_hosts.begin(), current_active_hosts.end()};
/// Query is not finished on the rest hosts, so fill the corresponding rows with NULLs.
MutableColumns columns = output.getHeader().cloneEmptyColumns();
for (const String & host_id : unfinished_hosts)
{
size_t num = 0;
auto [host, port] = parseHostAndPort(host_id);
columns[num++]->insert(host);
columns[num++]->insert(port);
columns[num++]->insert(Field{});
columns[num++]->insert(Field{});
columns[num++]->insert(unfinished_hosts.size());
columns[num++]->insert(current_active_hosts.size());
}
return Chunk(std::move(columns), unfinished_hosts.size());
}
Strings DDLOnClusterQueryStatusSource::getNodesToWait()
{
return {String(fs::path(node_path) / "finished"), String(fs::path(node_path) / "active")};
}
Chunk DDLOnClusterQueryStatusSource::handleTimeoutExceeded()
{
timeout_exceeded = true;
size_t num_unfinished_hosts = waiting_hosts.size() - num_hosts_finished;
size_t num_active_hosts = current_active_hosts.size();
constexpr auto msg_format = "Distributed DDL task {} is not finished on {} of {} hosts "
"({} of them are currently executing the task, {} are inactive). "
"They are going to execute the query in background. Was waiting for {} seconds{}";
if (throw_on_timeout || (throw_on_timeout_only_active && !stop_waiting_offline_hosts))
{
if (!first_exception)
first_exception = std::make_unique<Exception>(Exception(
ErrorCodes::TIMEOUT_EXCEEDED,
msg_format,
node_path,
num_unfinished_hosts,
waiting_hosts.size(),
num_active_hosts,
offline_hosts.size(),
watch.elapsedSeconds(),
stop_waiting_offline_hosts ? "" : ", which is longer than distributed_ddl_task_timeout"));
return {};
}
LOG_INFO(
log,
msg_format,
node_path,
num_unfinished_hosts,
waiting_hosts.size(),
num_active_hosts,
offline_hosts.size(),
watch.elapsedSeconds(),
stop_waiting_offline_hosts ? "" : "which is longer than distributed_ddl_task_timeout");
return generateChunkWithUnfinishedHosts();
}
Chunk DDLOnClusterQueryStatusSource::stopWaitingOfflineHosts()
{
// Same logic as timeout exceeded
return handleTimeoutExceeded();
}
void DDLOnClusterQueryStatusSource::handleNonZeroStatusCode(const ExecutionStatus & status, const String & host_id)
{
assert(status.code != 0);
if (!first_exception && context->getSettingsRef()[Setting::distributed_ddl_output_mode] != DistributedDDLOutputMode::NEVER_THROW)
{
auto [host, port] = parseHostAndPort(host_id);
first_exception
= std::make_unique<Exception>(Exception(status.code, "There was an error on [{}:{}]: {}", host, port, status.message));
}
}
void DDLOnClusterQueryStatusSource::fillHostStatus(const String & host_id, const ExecutionStatus & status, MutableColumns & columns)
{
size_t num = 0;
auto [host, port] = parseHostAndPort(host_id);
columns[num++]->insert(host);
columns[num++]->insert(port);
columns[num++]->insert(status.code);
columns[num++]->insert(status.message);
columns[num++]->insert(waiting_hosts.size() - num_hosts_finished);
columns[num++]->insert(current_active_hosts.size());
}
Block DDLOnClusterQueryStatusSource::getSampleBlock(ContextPtr context_)
{
auto output_mode = context_->getSettingsRef()[Setting::distributed_ddl_output_mode];
auto maybe_make_nullable = [&](const DataTypePtr & type) -> DataTypePtr
{
if (output_mode == DistributedDDLOutputMode::THROW || output_mode == DistributedDDLOutputMode::NONE
|| output_mode == DistributedDDLOutputMode::NONE_ONLY_ACTIVE)
return type;
return std::make_shared<DataTypeNullable>(type);
};
return Block{
{std::make_shared<DataTypeString>(), "host"},
{std::make_shared<DataTypeUInt16>(), "port"},
{maybe_make_nullable(std::make_shared<DataTypeInt64>()), "status"},
{maybe_make_nullable(std::make_shared<DataTypeString>()), "error"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
};
}
}

View File

@ -0,0 +1,30 @@
#pragma once
#include <Interpreters/Context_fwd.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/DistributedQueryStatusSource.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
namespace DB
{
class DDLOnClusterQueryStatusSource final : public DistributedQueryStatusSource
{
public:
DDLOnClusterQueryStatusSource(
const String & zk_node_path, const String & zk_replicas_path, ContextPtr context_, const Strings & hosts_to_wait);
String getName() const override { return "DDLOnClusterQueryStatus"; }
protected:
ExecutionStatus checkStatus(const String & host_id) override;
Chunk generateChunkWithUnfinishedHosts() const override;
Strings getNodesToWait() override;
Chunk handleTimeoutExceeded() override;
Chunk stopWaitingOfflineHosts() override;
void handleNonZeroStatusCode(const ExecutionStatus & status, const String & host_id) override;
void fillHostStatus(const String & host_id, const ExecutionStatus & status, MutableColumns & columns) override;
private:
static Block getSampleBlock(ContextPtr context_);
};
}

View File

@ -1,48 +1,47 @@
#include <filesystem>
#include <Interpreters/DDLWorker.h>
#include <Core/ServerUUID.h>
#include <Core/Settings.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/DDLWorker.h>
#include <Interpreters/ZooKeeperLog.h>
#include <Interpreters/executeQuery.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTCreateIndexQuery.h>
#include <Parsers/ASTDropIndexQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTOptimizeQuery.h>
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Parsers/ASTCreateIndexQuery.h>
#include <Parsers/ASTDropIndexQuery.h>
#include <Parsers/ParserQuery.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <Storages/IStorage.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Common/setThreadName.h>
#include <Common/randomSeed.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/ZooKeeperLock.h>
#include <Common/isLocalAddress.h>
#include <Core/ServerUUID.h>
#include <Core/Settings.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Poco/Timestamp.h>
#include <base/sleep.h>
#include <base/getFQDNOrHostName.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Common/ThreadPool.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/ZooKeeperLock.h>
#include <Common/isLocalAddress.h>
#include <Common/logger_useful.h>
#include <Common/randomSeed.h>
#include <Common/scope_guard_safe.h>
#include <Common/setThreadName.h>
#include <base/getFQDNOrHostName.h>
#include <base/sleep.h>
#include <base/sort.h>
#include <memory>
#include <random>
#include <pcg_random.hpp>
#include <Common/scope_guard_safe.h>
#include <Common/ThreadPool.h>
#include <Interpreters/ZooKeeperLog.h>
namespace fs = std::filesystem;
namespace CurrentMetrics
{
extern const Metric DDLWorkerThreads;
@ -78,7 +77,8 @@ constexpr const char * TASK_PROCESSED_OUT_REASON = "Task has been already proces
DDLWorker::DDLWorker(
int pool_size_,
const std::string & zk_root_dir,
const std::string & zk_queue_dir,
const std::string & zk_replicas_dir,
ContextPtr context_,
const Poco::Util::AbstractConfiguration * config,
const String & prefix,
@ -104,10 +104,15 @@ DDLWorker::DDLWorker(
worker_pool = std::make_unique<ThreadPool>(CurrentMetrics::DDLWorkerThreads, CurrentMetrics::DDLWorkerThreadsActive, CurrentMetrics::DDLWorkerThreadsScheduled, pool_size);
}
queue_dir = zk_root_dir;
queue_dir = zk_queue_dir;
if (queue_dir.back() == '/')
queue_dir.resize(queue_dir.size() - 1);
replicas_dir = zk_replicas_dir;
if (replicas_dir.back() == '/')
replicas_dir.resize(replicas_dir.size() - 1);
if (config)
{
task_max_lifetime = config->getUInt64(prefix + ".task_max_lifetime", static_cast<UInt64>(task_max_lifetime));
@ -1058,6 +1063,11 @@ String DDLWorker::enqueueQuery(DDLLogEntry & entry)
String query_path_prefix = fs::path(queue_dir) / "query-";
zookeeper->createAncestors(query_path_prefix);
NameSet host_ids;
for (const HostID & host : entry.hosts)
host_ids.emplace(host.toString());
createReplicaDirs(zookeeper, host_ids);
String node_path = zookeeper->create(query_path_prefix, entry.toString(), zkutil::CreateMode::PersistentSequential);
if (max_pushed_entry_metric)
{
@ -1097,6 +1107,7 @@ bool DDLWorker::initializeMainThread()
{
auto zookeeper = getAndSetZooKeeper();
zookeeper->createAncestors(fs::path(queue_dir) / "");
initializeReplication();
initialized = true;
return true;
}
@ -1158,6 +1169,14 @@ void DDLWorker::runMainThread()
}
cleanup_event->set();
try
{
markReplicasActive(reinitialized);
}
catch (...)
{
tryLogCurrentException(log, "An error occurred when markReplicasActive: ");
}
scheduleTasks(reinitialized);
subsequent_errors_count = 0;
@ -1215,6 +1234,97 @@ void DDLWorker::runMainThread()
}
void DDLWorker::initializeReplication()
{
auto zookeeper = getAndSetZooKeeper();
zookeeper->createAncestors(fs::path(replicas_dir) / "");
NameSet host_id_set;
for (const auto & it : context->getClusters())
{
auto cluster = it.second;
for (const auto & host_ids : cluster->getHostIDs())
for (const auto & host_id : host_ids)
host_id_set.emplace(host_id);
}
createReplicaDirs(zookeeper, host_id_set);
}
void DDLWorker::createReplicaDirs(const ZooKeeperPtr & zookeeper, const NameSet & host_ids)
{
for (const auto & host_id : host_ids)
zookeeper->createAncestors(fs::path(replicas_dir) / host_id / "");
}
void DDLWorker::markReplicasActive(bool reinitialized)
{
auto zookeeper = getAndSetZooKeeper();
if (reinitialized)
{
// Reset all active_node_holders
for (auto & it : active_node_holders)
{
auto & active_node_holder = it.second.second;
if (active_node_holder)
active_node_holder->setAlreadyRemoved();
active_node_holder.reset();
}
active_node_holders.clear();
}
const auto maybe_secure_port = context->getTCPPortSecure();
const auto port = context->getTCPPort();
Coordination::Stat replicas_stat;
Strings host_ids = zookeeper->getChildren(replicas_dir, &replicas_stat);
NameSet local_host_ids;
for (const auto & host_id : host_ids)
{
if (active_node_holders.contains(host_id))
continue;
try
{
HostID host = HostID::fromString(host_id);
/// The port is considered local if it matches TCP or TCP secure port that the server is listening.
bool is_local_host = (maybe_secure_port && host.isLocalAddress(*maybe_secure_port)) || host.isLocalAddress(port);
if (is_local_host)
local_host_ids.emplace(host_id);
}
catch (const Exception & e)
{
LOG_WARNING(log, "Unable to check if host {} is a local address, exception: {}", host_id, e.displayText());
continue;
}
}
for (const auto & host_id : local_host_ids)
{
auto it = active_node_holders.find(host_id);
if (it != active_node_holders.end())
{
continue;
}
String active_path = fs::path(replicas_dir) / host_id / "active";
if (zookeeper->exists(active_path))
continue;
String active_id = toString(ServerUUID::get());
LOG_TRACE(log, "Trying to mark a replica active: active_path={}, active_id={}", active_path, active_id);
zookeeper->create(active_path, active_id, zkutil::CreateMode::Ephemeral);
auto active_node_holder_zookeeper = zookeeper;
auto active_node_holder = zkutil::EphemeralNodeHolder::existing(active_path, *active_node_holder_zookeeper);
active_node_holders[host_id] = {active_node_holder_zookeeper, active_node_holder};
}
}
void DDLWorker::runCleanupThread()
{
setThreadName("DDLWorkerClnr");

View File

@ -1,24 +1,24 @@
#pragma once
#include <Common/CurrentThread.h>
#include <Interpreters/Context.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <Common/CurrentMetrics.h>
#include <Common/CurrentThread.h>
#include <Common/DNSResolver.h>
#include <Common/ThreadPool_fwd.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Storages/IStorage_fwd.h>
#include <Parsers/IAST_fwd.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Interpreters/Context_fwd.h>
#include <Poco/Event.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <list>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <unordered_set>
namespace zkutil
{
class ZooKeeper;
@ -52,8 +52,16 @@ class AccessRightsElements;
class DDLWorker
{
public:
DDLWorker(int pool_size_, const std::string & zk_root_dir, ContextPtr context_, const Poco::Util::AbstractConfiguration * config, const String & prefix,
const String & logger_name = "DDLWorker", const CurrentMetrics::Metric * max_entry_metric_ = nullptr, const CurrentMetrics::Metric * max_pushed_entry_metric_ = nullptr);
DDLWorker(
int pool_size_,
const std::string & zk_queue_dir,
const std::string & zk_replicas_dir,
ContextPtr context_,
const Poco::Util::AbstractConfiguration * config,
const String & prefix,
const String & logger_name = "DDLWorker",
const CurrentMetrics::Metric * max_entry_metric_ = nullptr,
const CurrentMetrics::Metric * max_pushed_entry_metric_ = nullptr);
virtual ~DDLWorker();
/// Pushes query into DDL queue, returns path to created node
@ -71,6 +79,8 @@ public:
return queue_dir;
}
std::string getReplicasDir() const { return replicas_dir; }
void startup();
virtual void shutdown();
@ -149,6 +159,10 @@ protected:
/// Return false if the worker was stopped (stop_flag = true)
virtual bool initializeMainThread();
virtual void initializeReplication();
virtual void createReplicaDirs(const ZooKeeperPtr & zookeeper, const NameSet & host_ids);
virtual void markReplicasActive(bool reinitialized);
void runMainThread();
void runCleanupThread();
@ -160,7 +174,8 @@ protected:
std::string host_fqdn; /// current host domain name
std::string host_fqdn_id; /// host_name:port
std::string queue_dir; /// dir with queue of queries
std::string queue_dir; /// dir with queue of queries
std::string replicas_dir;
mutable std::mutex zookeeper_mutex;
ZooKeeperPtr current_zookeeper TSA_GUARDED_BY(zookeeper_mutex);
@ -202,6 +217,8 @@ protected:
const CurrentMetrics::Metric * max_entry_metric;
const CurrentMetrics::Metric * max_pushed_entry_metric;
std::unordered_map<String, std::pair<ZooKeeperPtr, zkutil::EphemeralNodeHolderPtr>> active_node_holders;
};

View File

@ -0,0 +1,270 @@
#include <Core/Block.h>
#include <Core/Settings.h>
#include <Core/SettingsEnums.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h>
#include <Interpreters/DistributedQueryStatusSource.h>
#include <Common/Exception.h>
#include <Common/ZooKeeper/ZooKeeper.h>
namespace DB
{
namespace Setting
{
extern const SettingsDistributedDDLOutputMode distributed_ddl_output_mode;
extern const SettingsInt64 distributed_ddl_task_timeout;
}
namespace ErrorCodes
{
extern const int UNFINISHED;
}
DistributedQueryStatusSource::DistributedQueryStatusSource(
const String & zk_node_path,
const String & zk_replicas_path,
Block block,
ContextPtr context_,
const Strings & hosts_to_wait,
const char * logger_name)
: ISource(block)
, node_path(zk_node_path)
, replicas_path(zk_replicas_path)
, context(context_)
, watch(CLOCK_MONOTONIC_COARSE)
, log(getLogger(logger_name))
{
auto output_mode = context->getSettingsRef()[Setting::distributed_ddl_output_mode];
throw_on_timeout = output_mode == DistributedDDLOutputMode::THROW || output_mode == DistributedDDLOutputMode::NONE;
throw_on_timeout_only_active
= output_mode == DistributedDDLOutputMode::THROW_ONLY_ACTIVE || output_mode == DistributedDDLOutputMode::NONE_ONLY_ACTIVE;
waiting_hosts = NameSet(hosts_to_wait.begin(), hosts_to_wait.end());
only_running_hosts = output_mode == DistributedDDLOutputMode::THROW_ONLY_ACTIVE
|| output_mode == DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT_ONLY_ACTIVE
|| output_mode == DistributedDDLOutputMode::NONE_ONLY_ACTIVE;
addTotalRowsApprox(waiting_hosts.size());
timeout_seconds = context->getSettingsRef()[Setting::distributed_ddl_task_timeout];
}
IProcessor::Status DistributedQueryStatusSource::prepare()
{
/// This method is overloaded to throw exception after all data is read.
/// Exception is pushed into pipe (instead of simply being thrown) to ensure the order of data processing and exception.
if (finished)
{
if (first_exception)
{
if (!output.canPush())
return Status::PortFull;
output.pushException(std::make_exception_ptr(*first_exception));
}
output.finish();
return Status::Finished;
}
else
return ISource::prepare();
}
NameSet DistributedQueryStatusSource::getOfflineHosts(const NameSet & hosts_to_wait, const ZooKeeperPtr & zookeeper)
{
Strings paths;
Strings hosts_array;
for (const auto & host : hosts_to_wait)
{
hosts_array.push_back(host);
paths.push_back(fs::path(replicas_path) / host / "active");
}
NameSet offline;
auto res = zookeeper->tryGet(paths);
for (size_t i = 0; i < res.size(); ++i)
if (res[i].error == Coordination::Error::ZNONODE)
offline.insert(hosts_array[i]);
if (offline.size() == hosts_to_wait.size())
{
/// Avoid reporting that all hosts are offline
LOG_WARNING(log, "Did not find active hosts, will wait for all {} hosts. This should not happen often", offline.size());
return {};
}
return offline;
}
Strings DistributedQueryStatusSource::getNewAndUpdate(const Strings & current_finished_hosts)
{
Strings diff;
for (const String & host : current_finished_hosts)
{
if (!waiting_hosts.contains(host))
{
if (!ignoring_hosts.contains(host))
{
ignoring_hosts.emplace(host);
LOG_INFO(log, "Unexpected host {} appeared in task {}", host, node_path);
}
continue;
}
if (!finished_hosts.contains(host))
{
diff.emplace_back(host);
finished_hosts.emplace(host);
}
}
return diff;
}
ExecutionStatus DistributedQueryStatusSource::getExecutionStatus(const fs::path & status_path)
{
ExecutionStatus status(-1, "Cannot obtain error message");
String status_data;
bool finished_exists = false;
auto retries_ctl = ZooKeeperRetriesControl(
"executeDDLQueryOnCluster", getLogger("DDLQueryStatusSource"), getRetriesInfo(), context->getProcessListElement());
retries_ctl.retryLoop([&]() { finished_exists = context->getZooKeeper()->tryGet(status_path, status_data); });
if (finished_exists)
status.tryDeserializeText(status_data);
return status;
}
ZooKeeperRetriesInfo DistributedQueryStatusSource::getRetriesInfo()
{
const auto & config_ref = Context::getGlobalContextInstance()->getConfigRef();
return ZooKeeperRetriesInfo(
config_ref.getInt("distributed_ddl_keeper_max_retries", 5),
config_ref.getInt("distributed_ddl_keeper_initial_backoff_ms", 100),
config_ref.getInt("distributed_ddl_keeper_max_backoff_ms", 5000));
}
std::pair<String, UInt16> DistributedQueryStatusSource::parseHostAndPort(const String & host_id)
{
String host = host_id;
UInt16 port = 0;
auto host_and_port = Cluster::Address::fromString(host_id);
host = host_and_port.first;
port = host_and_port.second;
return {host, port};
}
Chunk DistributedQueryStatusSource::generate()
{
bool all_hosts_finished = num_hosts_finished >= waiting_hosts.size();
/// Seems like num_hosts_finished cannot be strictly greater than waiting_hosts.size()
assert(num_hosts_finished <= waiting_hosts.size());
if (all_hosts_finished || timeout_exceeded)
return {};
size_t try_number = 0;
while (true)
{
if (isCancelled())
return {};
if (stop_waiting_offline_hosts)
{
return stopWaitingOfflineHosts();
}
if ((timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds))
{
return handleTimeoutExceeded();
}
sleepForMilliseconds(std::min<size_t>(1000, 50 * try_number));
bool node_exists = false;
Strings tmp_hosts;
Strings tmp_active_hosts;
{
auto retries_ctl = ZooKeeperRetriesControl(
"executeDistributedQueryOnCluster", getLogger(getName()), getRetriesInfo(), context->getProcessListElement());
retries_ctl.retryLoop(
[&]()
{
auto zookeeper = context->getZooKeeper();
Strings paths = getNodesToWait();
auto res = zookeeper->tryGetChildren(paths);
for (size_t i = 0; i < res.size(); ++i)
if (res[i].error != Coordination::Error::ZOK && res[i].error != Coordination::Error::ZNONODE)
throw Coordination::Exception::fromPath(res[i].error, paths[i]);
if (res[0].error == Coordination::Error::ZNONODE)
node_exists = zookeeper->exists(node_path);
else
node_exists = true;
tmp_hosts = res[0].names;
tmp_active_hosts = res[1].names;
if (only_running_hosts)
offline_hosts = getOfflineHosts(waiting_hosts, zookeeper);
});
}
if (!node_exists)
{
/// Paradoxically, this exception will be throw even in case of "never_throw" mode.
if (!first_exception)
first_exception = std::make_unique<Exception>(Exception(
ErrorCodes::UNFINISHED,
"Cannot provide query execution status. The query's node {} has been deleted by the cleaner"
" since it was finished (or its lifetime is expired)",
node_path));
return {};
}
Strings new_hosts = getNewAndUpdate(tmp_hosts);
++try_number;
if (only_running_hosts)
{
size_t num_finished_or_offline = 0;
for (const auto & host : waiting_hosts)
num_finished_or_offline += finished_hosts.contains(host) || offline_hosts.contains(host);
if (num_finished_or_offline == waiting_hosts.size())
stop_waiting_offline_hosts = true;
}
if (new_hosts.empty())
continue;
current_active_hosts = std::move(tmp_active_hosts);
MutableColumns columns = output.getHeader().cloneEmptyColumns();
for (const String & host_id : new_hosts)
{
ExecutionStatus status = checkStatus(host_id);
if (status.code != 0)
{
handleNonZeroStatusCode(status, host_id);
}
++num_hosts_finished;
fillHostStatus(host_id, status, columns);
}
return Chunk(std::move(columns), new_hosts.size());
}
}
}

View File

@ -0,0 +1,68 @@
#pragma once
#include <filesystem>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/DDLTask.h>
#include <Processors/ISource.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
namespace fs = std::filesystem;
namespace DB
{
class DistributedQueryStatusSource : public ISource
{
public:
DistributedQueryStatusSource(
const String & zk_node_path,
const String & zk_replicas_path,
Block block,
ContextPtr context_,
const Strings & hosts_to_wait,
const char * logger_name);
Chunk generate() override;
Status prepare() override;
protected:
virtual ExecutionStatus checkStatus(const String & host_id) = 0;
virtual Chunk generateChunkWithUnfinishedHosts() const = 0;
virtual Strings getNodesToWait() = 0;
virtual Chunk handleTimeoutExceeded() = 0;
virtual Chunk stopWaitingOfflineHosts() = 0;
virtual void handleNonZeroStatusCode(const ExecutionStatus & status, const String & host_id) = 0;
virtual void fillHostStatus(const String & host_id, const ExecutionStatus & status, MutableColumns & columns) = 0;
virtual NameSet getOfflineHosts(const NameSet & hosts_to_wait, const ZooKeeperPtr & zookeeper);
Strings getNewAndUpdate(const Strings & current_finished_hosts);
ExecutionStatus getExecutionStatus(const fs::path & status_path);
static ZooKeeperRetriesInfo getRetriesInfo();
static std::pair<String, UInt16> parseHostAndPort(const String & host_id);
String node_path;
String replicas_path;
ContextPtr context;
Stopwatch watch;
LoggerPtr log;
NameSet waiting_hosts; /// hosts from task host list
NameSet finished_hosts; /// finished hosts from host list
NameSet ignoring_hosts; /// appeared hosts that are not in hosts list
Strings current_active_hosts; /// Hosts that are currently executing the task
NameSet offline_hosts; /// Hosts that are not currently running
size_t num_hosts_finished = 0;
/// Save the first detected error and throw it at the end of execution
std::unique_ptr<Exception> first_exception;
Int64 timeout_seconds = 120;
bool throw_on_timeout = true;
bool throw_on_timeout_only_active = false;
bool only_running_hosts = false;
bool timeout_exceeded = false;
bool stop_waiting_offline_hosts = false;
};
}

View File

@ -1987,6 +1987,12 @@ BlockIO InterpreterCreateQuery::doCreateOrReplaceTable(ASTCreateQuery & create,
UInt16 hashed_zk_path = sipHash64(txn->getTaskZooKeeperPath());
random_suffix = getHexUIntLowercase(hashed_zk_path);
}
else if (!current_context->getCurrentQueryId().empty())
{
random_suffix = getRandomASCIIString(/*length=*/2);
UInt8 hashed_query_id = sipHash64(current_context->getCurrentQueryId());
random_suffix += getHexUIntLowercase(hashed_query_id);
}
else
{
random_suffix = getRandomASCIIString(/*length=*/4);

View File

@ -0,0 +1,170 @@
#include <Core/Settings.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Databases/DatabaseReplicated.h>
#include <Interpreters/ReplicatedDatabaseQueryStatusSource.h>
namespace DB
{
namespace Setting
{
extern const SettingsBool database_replicated_enforce_synchronous_settings;
extern const SettingsDistributedDDLOutputMode distributed_ddl_output_mode;
}
namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
extern const int LOGICAL_ERROR;
}
ReplicatedDatabaseQueryStatusSource::ReplicatedDatabaseQueryStatusSource(
const String & zk_node_path, const String & zk_replicas_path, ContextPtr context_, const Strings & hosts_to_wait)
: DistributedQueryStatusSource(
zk_node_path, zk_replicas_path, getSampleBlock(), context_, hosts_to_wait, "ReplicatedDatabaseQueryStatusSource")
{
}
ExecutionStatus ReplicatedDatabaseQueryStatusSource::checkStatus([[maybe_unused]] const String & host_id)
{
/// Replicated database retries in case of error, it should not write error status.
#ifdef DEBUG_OR_SANITIZER_BUILD
fs::path status_path = fs::path(node_path) / "finished" / host_id;
return getExecutionStatus(status_path);
#else
return ExecutionStatus{0};
#endif
}
Chunk ReplicatedDatabaseQueryStatusSource::generateChunkWithUnfinishedHosts() const
{
NameSet unfinished_hosts = waiting_hosts;
for (const auto & host_id : finished_hosts)
unfinished_hosts.erase(host_id);
NameSet active_hosts_set = NameSet{current_active_hosts.begin(), current_active_hosts.end()};
/// Query is not finished on the rest hosts, so fill the corresponding rows with NULLs.
MutableColumns columns = output.getHeader().cloneEmptyColumns();
for (const String & host_id : unfinished_hosts)
{
size_t num = 0;
auto [shard, replica] = DatabaseReplicated::parseFullReplicaName(host_id);
columns[num++]->insert(shard);
columns[num++]->insert(replica);
if (active_hosts_set.contains(host_id))
columns[num++]->insert(IN_PROGRESS);
else
columns[num++]->insert(QUEUED);
columns[num++]->insert(unfinished_hosts.size());
columns[num++]->insert(current_active_hosts.size());
}
return Chunk(std::move(columns), unfinished_hosts.size());
}
Strings ReplicatedDatabaseQueryStatusSource::getNodesToWait()
{
String node_to_wait = "finished";
if (context->getSettingsRef()[Setting::database_replicated_enforce_synchronous_settings])
{
node_to_wait = "synced";
}
return {String(fs::path(node_path) / node_to_wait), String(fs::path(node_path) / "active")};
}
Chunk ReplicatedDatabaseQueryStatusSource::handleTimeoutExceeded()
{
timeout_exceeded = true;
size_t num_unfinished_hosts = waiting_hosts.size() - num_hosts_finished;
size_t num_active_hosts = current_active_hosts.size();
constexpr auto msg_format = "ReplicatedDatabase DDL task {} is not finished on {} of {} hosts "
"({} of them are currently executing the task, {} are inactive). "
"They are going to execute the query in background. Was waiting for {} seconds{}";
if (throw_on_timeout || (throw_on_timeout_only_active && !stop_waiting_offline_hosts))
{
if (!first_exception)
first_exception = std::make_unique<Exception>(Exception(
ErrorCodes::TIMEOUT_EXCEEDED,
msg_format,
node_path,
num_unfinished_hosts,
waiting_hosts.size(),
num_active_hosts,
offline_hosts.size(),
watch.elapsedSeconds(),
stop_waiting_offline_hosts ? "" : ", which is longer than distributed_ddl_task_timeout"));
/// For Replicated database print a list of unfinished hosts as well. Will return empty block on next iteration.
return generateChunkWithUnfinishedHosts();
}
LOG_INFO(
log,
msg_format,
node_path,
num_unfinished_hosts,
waiting_hosts.size(),
num_active_hosts,
offline_hosts.size(),
watch.elapsedSeconds(),
stop_waiting_offline_hosts ? "" : "which is longer than distributed_ddl_task_timeout");
return generateChunkWithUnfinishedHosts();
}
Chunk ReplicatedDatabaseQueryStatusSource::stopWaitingOfflineHosts()
{
// Same logic as timeout exceeded
return handleTimeoutExceeded();
}
void ReplicatedDatabaseQueryStatusSource::handleNonZeroStatusCode(const ExecutionStatus & status, const String & host_id)
{
assert(status.code != 0);
if (!first_exception && context->getSettingsRef()[Setting::distributed_ddl_output_mode] != DistributedDDLOutputMode::NEVER_THROW)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "There was an error on {}: {} (probably it's a bug)", host_id, status.message);
}
}
void ReplicatedDatabaseQueryStatusSource::fillHostStatus(const String & host_id, const ExecutionStatus & status, MutableColumns & columns)
{
size_t num = 0;
if (status.code != 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There was an error on {}: {} (probably it's a bug)", host_id, status.message);
auto [shard, replica] = DatabaseReplicated::parseFullReplicaName(host_id);
columns[num++]->insert(shard);
columns[num++]->insert(replica);
columns[num++]->insert(OK);
columns[num++]->insert(waiting_hosts.size() - num_hosts_finished);
columns[num++]->insert(current_active_hosts.size());
}
Block ReplicatedDatabaseQueryStatusSource::getSampleBlock()
{
auto get_status_enum = []()
{
return std::make_shared<DataTypeEnum8>(DataTypeEnum8::Values{
{"OK", static_cast<Int8>(OK)},
{"IN_PROGRESS", static_cast<Int8>(IN_PROGRESS)},
{"QUEUED", static_cast<Int8>(QUEUED)},
});
};
return Block{
{std::make_shared<DataTypeString>(), "shard"},
{std::make_shared<DataTypeString>(), "replica"},
{get_status_enum(), "status"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
};
}
}

View File

@ -0,0 +1,40 @@
#pragma once
#include <Interpreters/Context_fwd.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/DistributedQueryStatusSource.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
namespace DB
{
class ReplicatedDatabaseQueryStatusSource final : public DistributedQueryStatusSource
{
public:
ReplicatedDatabaseQueryStatusSource(
const String & zk_node_path, const String & zk_replicas_path, ContextPtr context_, const Strings & hosts_to_wait);
String getName() const override { return "ReplicatedDatabaseQueryStatus"; }
protected:
ExecutionStatus checkStatus(const String & host_id) override;
Chunk generateChunkWithUnfinishedHosts() const override;
Strings getNodesToWait() override;
Chunk handleTimeoutExceeded() override;
Chunk stopWaitingOfflineHosts() override;
void handleNonZeroStatusCode(const ExecutionStatus & status, const String & host_id) override;
void fillHostStatus(const String & host_id, const ExecutionStatus & status, MutableColumns & columns) override;
private:
static Block getSampleBlock();
enum ReplicatedDatabaseQueryStatus
{
/// Query is (successfully) finished
OK = 0,
/// Query is not finished yet, but replica is currently executing it
IN_PROGRESS = 1,
/// Replica is not available or busy with previous queries. It will process query asynchronously
QUEUED = 2,
};
};
}

View File

@ -1,33 +1,32 @@
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/DDLWorker.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/queryToString.h>
#include <filesystem>
#include <Access/Common/AccessRightsElement.h>
#include <Access/ContextAccess.h>
#include <Core/Settings.h>
#include <Common/Macros.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include "Parsers/ASTSystemQuery.h"
#include <Databases/DatabaseReplicated.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Databases/DatabaseReplicated.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLOnClusterQueryStatusSource.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/DDLWorker.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTSystemQuery.h>
#include <Parsers/queryToString.h>
#include <Processors/Sinks/EmptySink.h>
#include <QueryPipeline/Pipe.h>
#include <filesystem>
#include <base/sort.h>
#include <Common/Macros.h>
#include <Common/ZooKeeper/ZooKeeper.h>
namespace fs = std::filesystem;
namespace DB
{
namespace Setting
@ -41,21 +40,11 @@ namespace Setting
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int TIMEOUT_EXCEEDED;
extern const int UNFINISHED;
extern const int QUERY_IS_PROHIBITED;
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
extern const int QUERY_IS_PROHIBITED;
extern const int LOGICAL_ERROR;
}
static ZooKeeperRetriesInfo getRetriesInfo()
{
const auto & config_ref = Context::getGlobalContextInstance()->getConfigRef();
return ZooKeeperRetriesInfo(
config_ref.getInt("distributed_ddl_keeper_max_retries", 5),
config_ref.getInt("distributed_ddl_keeper_initial_backoff_ms", 100),
config_ref.getInt("distributed_ddl_keeper_max_backoff_ms", 5000));
}
bool isSupportedAlterTypeForOnClusterDDLQuery(int type)
{
@ -202,72 +191,19 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, ContextPtr context,
entry.initial_query_id = context->getClientInfo().initial_query_id;
String node_path = ddl_worker.enqueueQuery(entry);
return getDistributedDDLStatus(node_path, entry, context, /* hosts_to_wait */ nullptr);
return getDDLOnClusterStatus(node_path, ddl_worker.getReplicasDir(), entry, context);
}
class DDLQueryStatusSource final : public ISource
{
public:
DDLQueryStatusSource(
const String & zk_node_path, const DDLLogEntry & entry, ContextPtr context_, const Strings * hosts_to_wait);
String getName() const override { return "DDLQueryStatus"; }
Chunk generate() override;
Status prepare() override;
private:
static Block getSampleBlock(ContextPtr context_, bool hosts_to_wait);
Strings getNewAndUpdate(const Strings & current_list_of_finished_hosts);
std::pair<String, UInt16> parseHostAndPort(const String & host_id) const;
Chunk generateChunkWithUnfinishedHosts() const;
enum ReplicatedDatabaseQueryStatus
{
/// Query is (successfully) finished
OK = 0,
/// Query is not finished yet, but replica is currently executing it
IN_PROGRESS = 1,
/// Replica is not available or busy with previous queries. It will process query asynchronously
QUEUED = 2,
};
String node_path;
ContextPtr context;
Stopwatch watch;
LoggerPtr log;
NameSet waiting_hosts; /// hosts from task host list
NameSet finished_hosts; /// finished hosts from host list
NameSet ignoring_hosts; /// appeared hosts that are not in hosts list
Strings current_active_hosts; /// Hosts that are currently executing the task
NameSet offline_hosts; /// Hosts that are not currently running
size_t num_hosts_finished = 0;
/// Save the first detected error and throw it at the end of execution
std::unique_ptr<Exception> first_exception;
Int64 timeout_seconds = 120;
bool is_replicated_database = false;
bool throw_on_timeout = true;
bool throw_on_timeout_only_active = false;
bool only_running_hosts = false;
bool timeout_exceeded = false;
bool stop_waiting_offline_hosts = false;
};
BlockIO getDistributedDDLStatus(const String & node_path, const DDLLogEntry & entry, ContextPtr context, const Strings * hosts_to_wait)
BlockIO getDDLOnClusterStatus(const String & node_path, const String & replicas_path, const DDLLogEntry & entry, ContextPtr context)
{
BlockIO io;
if (context->getSettingsRef()[Setting::distributed_ddl_task_timeout] == 0)
return io;
Strings hosts_to_wait;
for (const HostID & host : entry.hosts)
hosts_to_wait.push_back(host.toString());
auto source = std::make_shared<DDLQueryStatusSource>(node_path, entry, context, hosts_to_wait);
auto source = std::make_shared<DDLOnClusterQueryStatusSource>(node_path, replicas_path, context, hosts_to_wait);
io.pipeline = QueryPipeline(std::move(source));
if (context->getSettingsRef()[Setting::distributed_ddl_output_mode] == DistributedDDLOutputMode::NONE
@ -277,394 +213,6 @@ BlockIO getDistributedDDLStatus(const String & node_path, const DDLLogEntry & en
return io;
}
Block DDLQueryStatusSource::getSampleBlock(ContextPtr context_, bool hosts_to_wait)
{
auto output_mode = context_->getSettingsRef()[Setting::distributed_ddl_output_mode];
auto maybe_make_nullable = [&](const DataTypePtr & type) -> DataTypePtr
{
if (output_mode == DistributedDDLOutputMode::THROW ||
output_mode == DistributedDDLOutputMode::NONE ||
output_mode == DistributedDDLOutputMode::NONE_ONLY_ACTIVE)
return type;
return std::make_shared<DataTypeNullable>(type);
};
auto get_status_enum = []()
{
return std::make_shared<DataTypeEnum8>(
DataTypeEnum8::Values
{
{"OK", static_cast<Int8>(OK)},
{"IN_PROGRESS", static_cast<Int8>(IN_PROGRESS)},
{"QUEUED", static_cast<Int8>(QUEUED)},
});
};
if (hosts_to_wait)
{
return Block{
{std::make_shared<DataTypeString>(), "shard"},
{std::make_shared<DataTypeString>(), "replica"},
{get_status_enum(), "status"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
};
}
return Block{
{std::make_shared<DataTypeString>(), "host"},
{std::make_shared<DataTypeUInt16>(), "port"},
{maybe_make_nullable(std::make_shared<DataTypeInt64>()), "status"},
{maybe_make_nullable(std::make_shared<DataTypeString>()), "error"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
};
}
DDLQueryStatusSource::DDLQueryStatusSource(
const String & zk_node_path, const DDLLogEntry & entry, ContextPtr context_, const Strings * hosts_to_wait)
: ISource(getSampleBlock(context_, static_cast<bool>(hosts_to_wait)))
, node_path(zk_node_path)
, context(context_)
, watch(CLOCK_MONOTONIC_COARSE)
, log(getLogger("DDLQueryStatusSource"))
{
auto output_mode = context->getSettingsRef()[Setting::distributed_ddl_output_mode];
throw_on_timeout = output_mode == DistributedDDLOutputMode::THROW || output_mode == DistributedDDLOutputMode::NONE;
throw_on_timeout_only_active = output_mode == DistributedDDLOutputMode::THROW_ONLY_ACTIVE || output_mode == DistributedDDLOutputMode::NONE_ONLY_ACTIVE;
if (hosts_to_wait)
{
waiting_hosts = NameSet(hosts_to_wait->begin(), hosts_to_wait->end());
is_replicated_database = true;
only_running_hosts = output_mode == DistributedDDLOutputMode::THROW_ONLY_ACTIVE ||
output_mode == DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT_ONLY_ACTIVE ||
output_mode == DistributedDDLOutputMode::NONE_ONLY_ACTIVE;
}
else
{
for (const HostID & host : entry.hosts)
waiting_hosts.emplace(host.toString());
}
addTotalRowsApprox(waiting_hosts.size());
timeout_seconds = context->getSettingsRef()[Setting::distributed_ddl_task_timeout];
}
std::pair<String, UInt16> DDLQueryStatusSource::parseHostAndPort(const String & host_id) const
{
String host = host_id;
UInt16 port = 0;
if (!is_replicated_database)
{
auto host_and_port = Cluster::Address::fromString(host_id);
host = host_and_port.first;
port = host_and_port.second;
}
return {host, port};
}
Chunk DDLQueryStatusSource::generateChunkWithUnfinishedHosts() const
{
NameSet unfinished_hosts = waiting_hosts;
for (const auto & host_id : finished_hosts)
unfinished_hosts.erase(host_id);
NameSet active_hosts_set = NameSet{current_active_hosts.begin(), current_active_hosts.end()};
/// Query is not finished on the rest hosts, so fill the corresponding rows with NULLs.
MutableColumns columns = output.getHeader().cloneEmptyColumns();
for (const String & host_id : unfinished_hosts)
{
size_t num = 0;
if (is_replicated_database)
{
auto [shard, replica] = DatabaseReplicated::parseFullReplicaName(host_id);
columns[num++]->insert(shard);
columns[num++]->insert(replica);
if (active_hosts_set.contains(host_id))
columns[num++]->insert(IN_PROGRESS);
else
columns[num++]->insert(QUEUED);
}
else
{
auto [host, port] = parseHostAndPort(host_id);
columns[num++]->insert(host);
columns[num++]->insert(port);
columns[num++]->insert(Field{});
columns[num++]->insert(Field{});
}
columns[num++]->insert(unfinished_hosts.size());
columns[num++]->insert(current_active_hosts.size());
}
return Chunk(std::move(columns), unfinished_hosts.size());
}
static NameSet getOfflineHosts(const String & node_path, const NameSet & hosts_to_wait, const ZooKeeperPtr & zookeeper, LoggerPtr log)
{
fs::path replicas_path;
if (node_path.ends_with('/'))
replicas_path = fs::path(node_path).parent_path().parent_path().parent_path() / "replicas";
else
replicas_path = fs::path(node_path).parent_path().parent_path() / "replicas";
Strings paths;
Strings hosts_array;
for (const auto & host : hosts_to_wait)
{
hosts_array.push_back(host);
paths.push_back(replicas_path / host / "active");
}
NameSet offline;
auto res = zookeeper->tryGet(paths);
for (size_t i = 0; i < res.size(); ++i)
if (res[i].error == Coordination::Error::ZNONODE)
offline.insert(hosts_array[i]);
if (offline.size() == hosts_to_wait.size())
{
/// Avoid reporting that all hosts are offline
LOG_WARNING(log, "Did not find active hosts, will wait for all {} hosts. This should not happen often", offline.size());
return {};
}
return offline;
}
Chunk DDLQueryStatusSource::generate()
{
bool all_hosts_finished = num_hosts_finished >= waiting_hosts.size();
/// Seems like num_hosts_finished cannot be strictly greater than waiting_hosts.size()
assert(num_hosts_finished <= waiting_hosts.size());
if (all_hosts_finished || timeout_exceeded)
return {};
String node_to_wait = "finished";
if (is_replicated_database && context->getSettingsRef()[Setting::database_replicated_enforce_synchronous_settings])
node_to_wait = "synced";
size_t try_number = 0;
while (true)
{
if (isCancelled())
return {};
if (stop_waiting_offline_hosts || (timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds))
{
timeout_exceeded = true;
size_t num_unfinished_hosts = waiting_hosts.size() - num_hosts_finished;
size_t num_active_hosts = current_active_hosts.size();
constexpr auto msg_format = "Distributed DDL task {} is not finished on {} of {} hosts "
"({} of them are currently executing the task, {} are inactive). "
"They are going to execute the query in background. Was waiting for {} seconds{}";
if (throw_on_timeout || (throw_on_timeout_only_active && !stop_waiting_offline_hosts))
{
if (!first_exception)
first_exception = std::make_unique<Exception>(Exception(ErrorCodes::TIMEOUT_EXCEEDED,
msg_format, node_path, num_unfinished_hosts, waiting_hosts.size(), num_active_hosts, offline_hosts.size(),
watch.elapsedSeconds(), stop_waiting_offline_hosts ? "" : ", which is longer than distributed_ddl_task_timeout"));
/// For Replicated database print a list of unfinished hosts as well. Will return empty block on next iteration.
if (is_replicated_database)
return generateChunkWithUnfinishedHosts();
return {};
}
LOG_INFO(log, msg_format, node_path, num_unfinished_hosts, waiting_hosts.size(), num_active_hosts, offline_hosts.size(),
watch.elapsedSeconds(), stop_waiting_offline_hosts ? "" : "which is longer than distributed_ddl_task_timeout");
return generateChunkWithUnfinishedHosts();
}
sleepForMilliseconds(std::min<size_t>(1000, 50 * try_number));
bool node_exists = false;
Strings tmp_hosts;
Strings tmp_active_hosts;
{
auto retries_ctl = ZooKeeperRetriesControl(
"executeDDLQueryOnCluster", getLogger("DDLQueryStatusSource"), getRetriesInfo(), context->getProcessListElement());
retries_ctl.retryLoop([&]()
{
auto zookeeper = context->getZooKeeper();
Strings paths = {String(fs::path(node_path) / node_to_wait), String(fs::path(node_path) / "active")};
auto res = zookeeper->tryGetChildren(paths);
for (size_t i = 0; i < res.size(); ++i)
if (res[i].error != Coordination::Error::ZOK && res[i].error != Coordination::Error::ZNONODE)
throw Coordination::Exception::fromPath(res[i].error, paths[i]);
if (res[0].error == Coordination::Error::ZNONODE)
node_exists = zookeeper->exists(node_path);
else
node_exists = true;
tmp_hosts = res[0].names;
tmp_active_hosts = res[1].names;
if (only_running_hosts)
offline_hosts = getOfflineHosts(node_path, waiting_hosts, zookeeper, log);
});
}
if (!node_exists)
{
/// Paradoxically, this exception will be throw even in case of "never_throw" mode.
if (!first_exception)
first_exception = std::make_unique<Exception>(Exception(ErrorCodes::UNFINISHED,
"Cannot provide query execution status. The query's node {} has been deleted by the cleaner"
" since it was finished (or its lifetime is expired)",
node_path));
return {};
}
Strings new_hosts = getNewAndUpdate(tmp_hosts);
++try_number;
if (only_running_hosts)
{
size_t num_finished_or_offline = 0;
for (const auto & host : waiting_hosts)
num_finished_or_offline += finished_hosts.contains(host) || offline_hosts.contains(host);
if (num_finished_or_offline == waiting_hosts.size())
stop_waiting_offline_hosts = true;
}
if (new_hosts.empty())
continue;
current_active_hosts = std::move(tmp_active_hosts);
MutableColumns columns = output.getHeader().cloneEmptyColumns();
for (const String & host_id : new_hosts)
{
ExecutionStatus status(-1, "Cannot obtain error message");
/// Replicated database retries in case of error, it should not write error status.
#ifdef DEBUG_OR_SANITIZER_BUILD
bool need_check_status = true;
#else
bool need_check_status = !is_replicated_database;
#endif
if (need_check_status)
{
String status_data;
bool finished_exists = false;
auto retries_ctl = ZooKeeperRetriesControl(
"executeDDLQueryOnCluster",
getLogger("DDLQueryStatusSource"),
getRetriesInfo(),
context->getProcessListElement());
retries_ctl.retryLoop([&]()
{
finished_exists = context->getZooKeeper()->tryGet(fs::path(node_path) / "finished" / host_id, status_data);
});
if (finished_exists)
status.tryDeserializeText(status_data);
}
else
{
status = ExecutionStatus{0};
}
if (status.code != 0 && !first_exception
&& context->getSettingsRef()[Setting::distributed_ddl_output_mode] != DistributedDDLOutputMode::NEVER_THROW)
{
if (is_replicated_database)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There was an error on {}: {} (probably it's a bug)", host_id, status.message);
auto [host, port] = parseHostAndPort(host_id);
first_exception = std::make_unique<Exception>(Exception(status.code,
"There was an error on [{}:{}]: {}", host, port, status.message));
}
++num_hosts_finished;
size_t num = 0;
if (is_replicated_database)
{
if (status.code != 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There was an error on {}: {} (probably it's a bug)", host_id, status.message);
auto [shard, replica] = DatabaseReplicated::parseFullReplicaName(host_id);
columns[num++]->insert(shard);
columns[num++]->insert(replica);
columns[num++]->insert(OK);
}
else
{
auto [host, port] = parseHostAndPort(host_id);
columns[num++]->insert(host);
columns[num++]->insert(port);
columns[num++]->insert(status.code);
columns[num++]->insert(status.message);
}
columns[num++]->insert(waiting_hosts.size() - num_hosts_finished);
columns[num++]->insert(current_active_hosts.size());
}
return Chunk(std::move(columns), new_hosts.size());
}
}
IProcessor::Status DDLQueryStatusSource::prepare()
{
/// This method is overloaded to throw exception after all data is read.
/// Exception is pushed into pipe (instead of simply being thrown) to ensure the order of data processing and exception.
if (finished)
{
if (first_exception)
{
if (!output.canPush())
return Status::PortFull;
output.pushException(std::make_exception_ptr(*first_exception));
}
output.finish();
return Status::Finished;
}
return ISource::prepare();
}
Strings DDLQueryStatusSource::getNewAndUpdate(const Strings & current_list_of_finished_hosts)
{
Strings diff;
for (const String & host : current_list_of_finished_hosts)
{
if (!waiting_hosts.contains(host))
{
if (!ignoring_hosts.contains(host))
{
ignoring_hosts.emplace(host);
LOG_INFO(log, "Unexpected host {} appeared in task {}", host, node_path);
}
continue;
}
if (!finished_hosts.contains(host))
{
diff.emplace_back(host);
finished_hosts.emplace(host);
}
}
return diff;
}
bool maybeRemoveOnCluster(const ASTPtr & query_ptr, ContextPtr context)
{
const auto * query = dynamic_cast<const ASTQueryWithTableAndOutput *>(query_ptr.get());

View File

@ -43,7 +43,7 @@ struct DDLQueryOnClusterParams
/// Returns DDLQueryStatusSource, which reads results of query execution on each host in the cluster.
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, ContextPtr context, const DDLQueryOnClusterParams & params = {});
BlockIO getDistributedDDLStatus(const String & node_path, const DDLLogEntry & entry, ContextPtr context, const Strings * hosts_to_wait);
BlockIO getDDLOnClusterStatus(const String & node_path, const String & replicas_path, const DDLLogEntry & entry, ContextPtr context);
bool maybeRemoveOnCluster(const ASTPtr & query_ptr, ContextPtr context);

View File

@ -30,11 +30,13 @@ ProtocolServerAdapter::ProtocolServerAdapter(
const std::string & listen_host_,
const char * port_name_,
const std::string & description_,
std::unique_ptr<TCPServer> tcp_server_)
std::unique_ptr<TCPServer> tcp_server_,
bool supports_runtime_reconfiguration_)
: listen_host(listen_host_)
, port_name(port_name_)
, description(description_)
, impl(std::make_unique<TCPServerAdapterImpl>(std::move(tcp_server_)))
, supports_runtime_reconfiguration(supports_runtime_reconfiguration_)
{
}
@ -66,11 +68,13 @@ ProtocolServerAdapter::ProtocolServerAdapter(
const std::string & listen_host_,
const char * port_name_,
const std::string & description_,
std::unique_ptr<GRPCServer> grpc_server_)
std::unique_ptr<GRPCServer> grpc_server_,
bool supports_runtime_reconfiguration_)
: listen_host(listen_host_)
, port_name(port_name_)
, description(description_)
, impl(std::make_unique<GRPCServerAdapterImpl>(std::move(grpc_server_)))
, supports_runtime_reconfiguration(supports_runtime_reconfiguration_)
{
}
#endif

View File

@ -21,10 +21,20 @@ class ProtocolServerAdapter
public:
ProtocolServerAdapter(ProtocolServerAdapter && src) = default;
ProtocolServerAdapter & operator =(ProtocolServerAdapter && src) = default;
ProtocolServerAdapter(const std::string & listen_host_, const char * port_name_, const std::string & description_, std::unique_ptr<TCPServer> tcp_server_);
ProtocolServerAdapter(
const std::string & listen_host_,
const char * port_name_,
const std::string & description_,
std::unique_ptr<TCPServer> tcp_server_,
bool supports_runtime_reconfiguration_ = true);
#if USE_GRPC
ProtocolServerAdapter(const std::string & listen_host_, const char * port_name_, const std::string & description_, std::unique_ptr<GRPCServer> grpc_server_);
ProtocolServerAdapter(
const std::string & listen_host_,
const char * port_name_,
const std::string & description_,
std::unique_ptr<GRPCServer> grpc_server_,
bool supports_runtime_reconfiguration_ = true);
#endif
/// Starts the server. A new thread will be created that waits for and accepts incoming connections.
@ -46,6 +56,8 @@ public:
/// Returns the port this server is listening to.
UInt16 portNumber() const { return impl->portNumber(); }
bool supportsRuntimeReconfiguration() const { return supports_runtime_reconfiguration; }
const std::string & getListenHost() const { return listen_host; }
const std::string & getPortName() const { return port_name; }
@ -72,6 +84,7 @@ private:
std::string port_name;
std::string description;
std::unique_ptr<Impl> impl;
bool supports_runtime_reconfiguration = true;
};
}

View File

@ -3944,11 +3944,11 @@ class ClickHouseInstance:
)
logging.info(f"PS RESULT:\n{ps_clickhouse}")
pid = self.get_process_pid("clickhouse")
if pid is not None:
self.exec_in_container(
["bash", "-c", f"gdb -batch -ex 'thread apply all bt full' -p {pid}"],
user="root",
)
# if pid is not None:
# self.exec_in_container(
# ["bash", "-c", f"gdb -batch -ex 'thread apply all bt full' -p {pid}"],
# user="root",
# )
if last_err is not None:
raise last_err

View File

@ -291,6 +291,8 @@
<distributed_ddl>
<!-- Path in ZooKeeper to queue with DDL queries -->
<path>/clickhouse/task_queue/ddl</path>
<!-- Path in ZooKeeper to store running DDL hosts -->
<replicas_path>/clickhouse/task_queue/replicas</replicas_path>
<!-- Settings from this profile will be used to execute DDL queries -->
<!-- <profile>default</profile> -->

View File

@ -849,6 +849,8 @@
<distributed_ddl>
<!-- Path in ZooKeeper to queue with DDL queries -->
<path>/clickhouse/task_queue/ddl</path>
<!-- Path in ZooKeeper to store running DDL hosts -->
<replicas_path>/clickhouse/task_queue/replicas</replicas_path>
<!-- Settings from this profile will be used to execute DDL queries -->
<!-- <profile>default</profile> -->

View File

@ -0,0 +1,30 @@
<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
<replica>
<host>node4</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
<allow_zookeeper_write>1</allow_zookeeper_write>
</clickhouse>

View File

@ -0,0 +1,88 @@
import time
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
main_configs=["configs/remote_servers.xml"],
with_zookeeper=True,
stay_alive=True,
)
node2 = cluster.add_instance(
"node2", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
node3 = cluster.add_instance(
"node3", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
node4 = cluster.add_instance(
"node4",
main_configs=["configs/remote_servers.xml"],
with_zookeeper=True,
stay_alive=True,
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_stop_waiting_for_offline_hosts(started_cluster):
timeout = 10
settings = {"distributed_ddl_task_timeout": timeout}
node1.query(
"DROP TABLE IF EXISTS test_table ON CLUSTER test_cluster SYNC",
settings=settings,
)
node1.query(
"CREATE TABLE test_table ON CLUSTER test_cluster (x Int) Engine=Memory",
settings=settings,
)
try:
node4.stop_clickhouse()
start = time.time()
assert "Code: 159. DB::Exception" in node1.query_and_get_error(
"DROP TABLE IF EXISTS test_table ON CLUSTER test_cluster SYNC",
settings=settings,
)
assert time.time() - start >= timeout
start = time.time()
assert "Code: 159. DB::Exception" in node1.query_and_get_error(
"CREATE TABLE test_table ON CLUSTER test_cluster (x Int) Engine=Memory",
settings=settings,
)
assert time.time() - start >= timeout
# set `distributed_ddl_output_mode` = `throw_only_active``
settings = {
"distributed_ddl_task_timeout": timeout,
"distributed_ddl_output_mode": "throw_only_active",
}
start = time.time()
node1.query(
"DROP TABLE IF EXISTS test_table ON CLUSTER test_cluster SYNC",
settings=settings,
)
start = time.time()
node1.query(
"CREATE TABLE test_table ON CLUSTER test_cluster (x Int) Engine=Memory",
settings=settings,
)
finally:
node4.start_clickhouse()

View File

@ -0,0 +1,30 @@
<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
<replica>
<host>node4</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
<allow_zookeeper_write>1</allow_zookeeper_write>
</clickhouse>

View File

@ -0,0 +1,77 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
main_configs=["configs/remote_servers.xml"],
with_zookeeper=True,
stay_alive=True,
)
node2 = cluster.add_instance(
"node2", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
node3 = cluster.add_instance(
"node3", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
node4 = cluster.add_instance(
"node4",
main_configs=["configs/remote_servers.xml"],
with_zookeeper=True,
stay_alive=True,
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_ddl_worker_replicas(started_cluster):
for replica in ["node1:9000", "node2:9000", "node3:9000", "node4:9000"]:
# wait until the replicas path is created
node1.query_with_retry(
sql=f"SELECT count() FROM system.zookeeper WHERE path='/clickhouse/task_queue/replicas/{replica}'",
check_callback=lambda result: result == 1,
)
result = node1.query(
f"SELECT name, value, ephemeralOwner FROM system.zookeeper WHERE path='/clickhouse/task_queue/replicas/{replica}'"
).strip()
print(f"result: {replica} {result}")
lines = list(result.split("\n"))
assert len(lines) == 1
parts = list(lines[0].split("\t"))
assert len(parts) == 3
assert parts[0] == "active"
assert len(parts[1]) != 0
assert len(parts[2]) != 0
try:
node4.stop_clickhouse()
# wait for node4 active path is removed
node1.query_with_retry(
sql=f"SELECT count() FROM system.zookeeper WHERE path='/clickhouse/task_queue/replicas/node4:9000'",
check_callback=lambda result: result == 0,
)
result = node1.query_with_retry(
f"SELECT name, value, ephemeralOwner FROM system.zookeeper WHERE path='/clickhouse/task_queue/replicas/node4:9000'"
).strip()
print(f"result: {replica} {result}")
lines = list(result.split("\n"))
assert len(lines) == 1
assert len(lines[0]) == 0
finally:
node4.start_clickhouse()

View File

@ -256,6 +256,8 @@
<distributed_ddl>
<!-- Path in ZooKeeper to queue with DDL queries -->
<path>/clickhouse/task_queue/ddl</path>
<!-- Path in ZooKeeper to store running DDL hosts -->
<replicas_path>/clickhouse/task_queue/replicas</replicas_path>
<!-- Settings from this profile will be used to execute DDL queries -->
<!-- <profile>default</profile> -->