Merge branch 'master' into retry-more-error-codes-from-s3

This commit is contained in:
Alexey Milovidov 2024-10-29 02:55:06 +01:00
commit 52482a2880
48 changed files with 1514 additions and 651 deletions

View File

@ -25,9 +25,10 @@
// We don't have libc struct available here.
// Compute aux vector manually (from /proc/self/auxv).
//
// Right now there is only 51 AT_* constants,
// so 64 should be enough until this implementation will be replaced with musl.
static unsigned long __auxv_procfs[64];
// Right now there are 51 AT_* constants. Custom kernels have been encountered
// making use of up to 71. 128 should be enough until this implementation is
// replaced with musl.
static unsigned long __auxv_procfs[128];
static unsigned long __auxv_secure = 0;
// Common
static unsigned long * __auxv_environ = NULL;
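
A minimal sketch of the approach the comment describes (hypothetical helper, not the actual implementation): each auxv entry in `/proc/self/auxv` is a pair of `unsigned long`s (type, value), terminated by `AT_NULL`, and the array is indexed by the `AT_*` type — which is why its size must exceed the largest constant encountered (hence the bump from 64 to 128).

```c
#include <elf.h>    /* AT_NULL */
#include <fcntl.h>
#include <unistd.h>

static unsigned long auxv_by_type[128];

static void read_auxv_procfs(void)
{
    int fd = open("/proc/self/auxv", O_RDONLY);
    if (fd < 0)
        return;
    unsigned long pair[2]; /* pair[0] = AT_* type, pair[1] = value */
    while (read(fd, pair, sizeof(pair)) == (ssize_t)sizeof(pair) && pair[0] != AT_NULL)
    {
        /* Index by AT_* type; skip custom-kernel types beyond the array. */
        if (pair[0] < sizeof(auxv_by_type) / sizeof(auxv_by_type[0]))
            auxv_by_type[pair[0]] = pair[1];
    }
    close(fd);
}
```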

contrib/numactl vendored

@ -1 +1 @@
Subproject commit 8d13d63a05f0c3cd88bf777cbb61541202b7da08
Subproject commit ff32c618d63ca7ac48cce366c5a04bb3563683a0

View File

@ -1975,6 +1975,22 @@ The default is `false`.
<async_load_databases>true</async_load_databases>
```
## async_load_system_database {#async_load_system_database}
Asynchronous loading of system tables. Helpful if there is a large number of log tables and parts in the `system` database. Independent of the `async_load_databases` setting.
If set to `true`, all system databases with `Ordinary`, `Atomic`, and `Replicated` engines will be loaded asynchronously after the ClickHouse server starts. See the `system.asynchronous_loader` table and the `tables_loader_background_pool_size` and `tables_loader_foreground_pool_size` server settings. Any query that tries to access a system table that is not yet loaded will wait for exactly that table to be started up. A table that at least one query is waiting for is loaded with higher priority. Also consider setting the `max_waiting_queries` setting to limit the total number of waiting queries.
If set to `false`, the system database is loaded before the server starts.
The default is `false`.
**Example**
``` xml
<async_load_system_database>true</async_load_system_database>
```
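While tables are still loading, progress can be inspected through the `system.asynchronous_loader` table referenced above; a minimal probe (the column set is whatever that table provides):
```sql
SELECT * FROM system.asynchronous_loader LIMIT 10
```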
## tables_loader_foreground_pool_size {#tables_loader_foreground_pool_size}
Sets the number of threads performing load jobs in the foreground pool. The foreground pool is used for loading tables synchronously before the server starts listening on a port and for loading tables that are waited for. The foreground pool has a higher priority than the background pool: no job starts in the background pool while there are jobs running in the foreground pool.
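For instance, keeping the default of `0` (use all CPUs) explicit in the server config — a sketch of the syntax rather than a tuning recommendation:
``` xml
<tables_loader_foreground_pool_size>0</tables_loader_foreground_pool_size>
```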

View File

@ -185,6 +185,7 @@ Examples:
- `CREATE USER name1 VALID UNTIL '2025-01-01'`
- `CREATE USER name1 VALID UNTIL '2025-01-01 12:00:00 UTC'`
- `CREATE USER name1 VALID UNTIL 'infinity'`
- ```CREATE USER name1 VALID UNTIL '2025-01-01 12:00:00 `Asia/Tokyo`'```
## GRANTEES Clause

View File

@ -93,7 +93,6 @@ LIMIT 5;
ClickHouse can also determine the compression method of the file. For example, if the file was zipped up with a `.csv.gz` extension, ClickHouse would decompress the file automatically.
:::
## Usage
Suppose that we have several files with the following URIs on S3:
@ -248,6 +247,25 @@ FROM s3(
LIMIT 5;
```
## Using S3 credentials (ClickHouse Cloud)
For non-public buckets, users can pass an `aws_access_key_id` and `aws_secret_access_key` to the function. For example:
```sql
SELECT count() FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/mta/*.tsv', '<KEY>', '<SECRET>','TSVWithNames')
```
This is appropriate for one-off access or in cases where credentials can easily be rotated. However, it is not recommended as a long-term solution for repeated access or where credentials are sensitive. In such cases, we recommend that users rely on role-based access.
Role-based access for S3 in ClickHouse Cloud is documented [here](/docs/en/cloud/security/secure-s3#access-your-s3-bucket-with-the-clickhouseaccess-role).
Once configured, a `roleARN` can be passed to the s3 function via an `extra_credentials` parameter. For example:
```sql
SELECT count() FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/mta/*.tsv','CSVWithNames',extra_credentials(role_arn = 'arn:aws:iam::111111111111:role/ClickHouseAccessRole-001'))
```
Further examples can be found [here](/docs/en/cloud/security/secure-s3#access-your-s3-bucket-with-the-clickhouseaccess-role).
## Working with archives

View File

@ -70,10 +70,15 @@ SELECT count(*) FROM s3Cluster(
)
```
## Accessing private and public buckets
Users can use the same approaches as documented for the s3 function [here](/docs/en/sql-reference/table-functions/s3#accessing-public-buckets).
## Optimizing performance
For details on optimizing the performance of the s3 function, see [our detailed guide](/docs/en/integrations/s3/performance).
**See Also**
- [S3 engine](../../engines/table-engines/integrations/s3.md)

View File

@ -821,11 +821,11 @@ void LocalServer::processConfig()
status.emplace(fs::path(path) / "status", StatusFile::write_full_info);
LOG_DEBUG(log, "Loading metadata from {}", path);
auto startup_system_tasks = loadMetadataSystem(global_context);
auto load_system_metadata_tasks = loadMetadataSystem(global_context);
attachSystemTablesServer(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE), false);
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
waitLoad(TablesLoaderForegroundPoolId, startup_system_tasks);
waitLoad(TablesLoaderForegroundPoolId, load_system_metadata_tasks);
if (!getClientConfiguration().has("only-system-tables"))
{

View File

@ -171,6 +171,7 @@ namespace ServerSetting
extern const ServerSettingsBool async_insert_queue_flush_on_shutdown;
extern const ServerSettingsUInt64 async_insert_threads;
extern const ServerSettingsBool async_load_databases;
extern const ServerSettingsBool async_load_system_database;
extern const ServerSettingsUInt64 background_buffer_flush_schedule_pool_size;
extern const ServerSettingsUInt64 background_common_pool_size;
extern const ServerSettingsUInt64 background_distributed_schedule_pool_size;
@ -2199,6 +2200,7 @@ try
LOG_INFO(log, "Loading metadata from {}", path_str);
LoadTaskPtrs load_system_metadata_tasks;
LoadTaskPtrs load_metadata_tasks;
// Make sure that if an exception is thrown during async startup, new async loading jobs are not going to be scheduled.
@ -2222,12 +2224,8 @@ try
auto & database_catalog = DatabaseCatalog::instance();
/// We load temporary database first, because projections need it.
database_catalog.initializeAndLoadTemporaryDatabase();
auto system_startup_tasks = loadMetadataSystem(global_context);
maybeConvertSystemDatabase(global_context, system_startup_tasks);
/// This has to be done before the initialization of system logs,
/// otherwise there is a race condition between the system database initialization
/// and creation of new tables in the database.
waitLoad(TablesLoaderForegroundPoolId, system_startup_tasks);
load_system_metadata_tasks = loadMetadataSystem(global_context, server_settings[ServerSetting::async_load_system_database]);
maybeConvertSystemDatabase(global_context, load_system_metadata_tasks);
/// Startup scripts can depend on the system log tables.
if (config().has("startup_scripts") && !server_settings[ServerSetting::prepare_system_log_tables_on_startup].changed)
@ -2393,17 +2391,28 @@ try
if (has_zookeeper && config().has("distributed_ddl"))
{
/// DDL worker should be started after all tables were loaded
String ddl_zookeeper_path = config().getString("distributed_ddl.path", "/clickhouse/task_queue/ddl/");
String ddl_queue_path = config().getString("distributed_ddl.path", "/clickhouse/task_queue/ddl/");
String ddl_replicas_path = config().getString("distributed_ddl.replicas_path", "/clickhouse/task_queue/replicas/");
int pool_size = config().getInt("distributed_ddl.pool_size", 1);
if (pool_size < 1)
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "distributed_ddl.pool_size should be greater than 0");
global_context->setDDLWorker(std::make_unique<DDLWorker>(pool_size, ddl_zookeeper_path, global_context, &config(),
"distributed_ddl", "DDLWorker",
&CurrentMetrics::MaxDDLEntryID, &CurrentMetrics::MaxPushedDDLEntryID),
load_metadata_tasks);
global_context->setDDLWorker(
std::make_unique<DDLWorker>(
pool_size,
ddl_queue_path,
ddl_replicas_path,
global_context,
&config(),
"distributed_ddl",
"DDLWorker",
&CurrentMetrics::MaxDDLEntryID,
&CurrentMetrics::MaxPushedDDLEntryID),
joinTasks(load_system_metadata_tasks, load_metadata_tasks));
}
/// Do not keep tasks in the server, they should be kept inside databases. Used here only to create dependent tasks.
load_system_metadata_tasks.clear();
load_system_metadata_tasks.shrink_to_fit();
load_metadata_tasks.clear();
load_metadata_tasks.shrink_to_fit();
@ -3023,7 +3032,7 @@ void Server::updateServers(
for (auto * server : all_servers)
{
if (!server->isStopping())
if (server->supportsRuntimeReconfiguration() && !server->isStopping())
{
std::string port_name = server->getPortName();
bool has_host = false;

View File

@ -1450,6 +1450,8 @@
<distributed_ddl>
<!-- Path in ZooKeeper to queue with DDL queries -->
<path>/clickhouse/task_queue/ddl</path>
<!-- Path in ZooKeeper to store running DDL hosts -->
<replicas_path>/clickhouse/task_queue/replicas</replicas_path>
<!-- Settings from this profile will be used to execute DDL queries -->
<!-- <profile>default</profile> -->

View File

@ -59,13 +59,13 @@ constexpr size_t group_array_sorted_sort_strategy_max_elements_threshold = 10000
template <typename T, GroupArraySortedStrategy strategy>
struct GroupArraySortedData
{
static constexpr bool is_value_generic_field = std::is_same_v<T, Field>;
using Allocator = MixedAlignedArenaAllocator<alignof(T), 4096>;
using Array = PODArray<T, 32, Allocator>;
using Array = typename std::conditional_t<is_value_generic_field, std::vector<T>, PODArray<T, 32, Allocator>>;
static constexpr size_t partial_sort_max_elements_factor = 2;
static constexpr bool is_value_generic_field = std::is_same_v<T, Field>;
Array values;
static bool compare(const T & lhs, const T & rhs)
@ -144,7 +144,7 @@ struct GroupArraySortedData
}
if (values.size() > max_elements)
values.resize(max_elements, arena);
resize(max_elements, arena);
}
ALWAYS_INLINE void partialSortAndLimitIfNeeded(size_t max_elements, Arena * arena)
@ -153,7 +153,23 @@ struct GroupArraySortedData
return;
::nth_element(values.begin(), values.begin() + max_elements, values.end(), Comparator());
values.resize(max_elements, arena);
resize(max_elements, arena);
}
ALWAYS_INLINE void resize(size_t n, Arena * arena)
{
if constexpr (is_value_generic_field)
values.resize(n);
else
values.resize(n, arena);
}
ALWAYS_INLINE void push_back(T && element, Arena * arena)
{
if constexpr (is_value_generic_field)
values.push_back(element);
else
values.push_back(element, arena);
}
ALWAYS_INLINE void addElement(T && element, size_t max_elements, Arena * arena)
@ -171,12 +187,12 @@ struct GroupArraySortedData
return;
}
values.push_back(std::move(element), arena);
push_back(std::move(element), arena);
std::push_heap(values.begin(), values.end(), Comparator());
}
else
{
values.push_back(std::move(element), arena);
push_back(std::move(element), arena);
partialSortAndLimitIfNeeded(max_elements, arena);
}
}
@ -210,14 +226,6 @@ struct GroupArraySortedData
result_array_data[result_array_data_insert_begin + i] = values[i];
}
}
~GroupArraySortedData()
{
for (auto & value : values)
{
value.~T();
}
}
};
template <typename T>
@ -313,14 +321,12 @@ public:
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Too large array size, it should not exceed {}", max_elements);
auto & values = this->data(place).values;
values.resize_exact(size, arena);
if constexpr (std::is_same_v<T, Field>)
if constexpr (Data::is_value_generic_field)
{
values.resize(size);
for (Field & element : values)
{
/// We must initialize the Field type since some internal functions (like operator=) use them
new (&element) Field;
bool has_value = false;
readBinary(has_value, buf);
if (has_value)
@ -329,6 +335,7 @@ public:
}
else
{
values.resize_exact(size, arena);
if constexpr (std::endian::native == std::endian::little)
{
buf.readStrict(reinterpret_cast<char *>(values.data()), size * sizeof(values[0]));
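
The hunk above selects the container type at compile time and routes `resize`/`push_back` through `if constexpr` wrappers because `std::vector` and the arena-backed `PODArray` have different signatures. A self-contained sketch of the pattern (stand-in types, not the ClickHouse classes):

```cpp
#include <string>
#include <type_traits>
#include <vector>

struct Arena {}; // stand-in for the memory arena

// Stand-in for a PODArray whose mutating calls take the arena.
template <typename T>
struct ArenaArray
{
    std::vector<T> data;
    void push_back(T v, Arena *) { data.push_back(std::move(v)); }
};

template <typename T>
struct SortedData
{
    // Field-like types go into a plain std::vector; POD types keep the arena-backed array.
    static constexpr bool is_generic = std::is_same_v<T, std::string>;
    using Array = std::conditional_t<is_generic, std::vector<T>, ArenaArray<T>>;
    Array values;

    void push_back(T v, Arena * arena)
    {
        if constexpr (is_generic)
            values.push_back(std::move(v)); // std::vector: no arena parameter
        else
            values.push_back(std::move(v), arena); // arena-aware overload
    }
};

int main()
{
    Arena arena;
    SortedData<int> pods;
    pods.push_back(42, &arena); // dispatches to ArenaArray<int>
    SortedData<std::string> generic;
    generic.push_back("x", &arena); // dispatches to std::vector<std::string>
}
```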

View File

@ -147,6 +147,7 @@ namespace DB
DECLARE(UInt64, tables_loader_foreground_pool_size, 0, "The maximum number of threads that will be used for foreground (that is being waited for by a query) loading of tables. Also used for synchronous loading of tables before the server start. Zero means use all CPUs.", 0) \
DECLARE(UInt64, tables_loader_background_pool_size, 0, "The maximum number of threads that will be used for background async loading of tables. Zero means use all CPUs.", 0) \
DECLARE(Bool, async_load_databases, false, "Enable asynchronous loading of databases and tables to speedup server startup. Queries to not yet loaded entity will be blocked until load is finished.", 0) \
DECLARE(Bool, async_load_system_database, false, "Enable asynchronous loading of system tables that are not required on server startup. Queries to not yet loaded tables will be blocked until load is finished.", 0) \
DECLARE(Bool, display_secrets_in_show_and_select, false, "Allow showing secrets in SHOW and SELECT queries via a format setting and a grant", 0) \
DECLARE(Seconds, keep_alive_timeout, DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT, "The number of seconds that ClickHouse waits for incoming requests before closing the connection.", 0) \
DECLARE(UInt64, max_keep_alive_requests, 10000, "The maximum number of requests handled via a single http keepalive connection before the server closes this connection.", 0) \

View File

@ -107,7 +107,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"input_format_parquet_enable_row_group_prefetch", false, true, "Enable row group prefetching during parquet parsing. Currently, only single-threaded parsing can prefetch."},
{"input_format_orc_dictionary_as_low_cardinality", false, true, "Treat ORC dictionary encoded columns as LowCardinality columns while reading ORC files"},
{"allow_experimental_refreshable_materialized_view", false, true, "Not experimental anymore"},
{"max_parts_to_move", 1000, 1000, "New setting"},
{"max_parts_to_move", 0, 1000, "New setting"},
{"hnsw_candidate_list_size_for_search", 0, 0, "New setting"},
{"allow_reorder_prewhere_conditions", false, true, "New setting"},
{"input_format_parquet_bloom_filter_push_down", false, true, "When reading Parquet files, skip whole row groups based on the WHERE/PREWHERE expressions and bloom filter in the Parquet metadata."},

View File

@ -4,47 +4,49 @@
#include <Backups/IRestoreCoordination.h>
#include <Backups/RestorerFromBackup.h>
#include <Core/ServerSettings.h>
#include <Core/Settings.h>
#include <Databases/DDLDependencyVisitor.h>
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseReplicated.h>
#include <Databases/DatabaseReplicatedWorker.h>
#include <Databases/TablesDependencyGraph.h>
#include <Databases/enableAllExperimentalSettings.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/SharedThreadPools.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/ReplicatedDatabaseQueryStatusSource.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/executeQuery.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTDeleteQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Processors/Sinks/EmptySink.h>
#include <Storages/AlterCommands.h>
#include <Storages/StorageKeeperMap.h>
#include <base/chrono_io.h>
#include <base/getFQDNOrHostName.h>
#include <Common/Exception.h>
#include <Common/Macros.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Common/PoolId.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/PoolId.h>
#include <Core/ServerSettings.h>
#include <Core/Settings.h>
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseReplicated.h>
#include <Databases/DatabaseReplicatedWorker.h>
#include <Databases/DDLDependencyVisitor.h>
#include <Databases/TablesDependencyGraph.h>
#include <Databases/enableAllExperimentalSettings.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/SharedThreadPools.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTDeleteQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/queryToString.h>
#include <Storages/StorageKeeperMap.h>
#include <Storages/AlterCommands.h>
namespace DB
{
@ -55,6 +57,8 @@ namespace Setting
extern const SettingsUInt64 max_parser_backtracks;
extern const SettingsUInt64 max_parser_depth;
extern const SettingsUInt64 max_query_size;
extern const SettingsDistributedDDLOutputMode distributed_ddl_output_mode;
extern const SettingsInt64 distributed_ddl_task_timeout;
extern const SettingsBool throw_on_unsupported_query_inside_transaction;
}
@ -443,7 +447,6 @@ void DatabaseReplicated::fillClusterAuthInfo(String collection_name, const Poco:
cluster_auth_info.cluster_secure_connection = config_ref.getBool(config_prefix + ".cluster_secure_connection", false);
}
void DatabaseReplicated::tryConnectToZooKeeperAndInitDatabase(LoadingStrictnessLevel mode)
{
try
@ -1096,7 +1099,8 @@ BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, Contex
hosts_to_wait.push_back(unfiltered_hosts[i]);
}
return getDistributedDDLStatus(node_path, entry, query_context, &hosts_to_wait);
return getQueryStatus(node_path, fs::path(zookeeper_path) / "replicas", query_context, hosts_to_wait);
}
static UUID getTableUUIDIfReplicated(const String & metadata, ContextPtr context)
@ -2040,4 +2044,21 @@ void registerDatabaseReplicated(DatabaseFactory & factory)
};
factory.registerDatabase("Replicated", create_fn, {.supports_arguments = true, .supports_settings = true});
}
BlockIO DatabaseReplicated::getQueryStatus(
const String & node_path, const String & replicas_path, ContextPtr context_, const Strings & hosts_to_wait)
{
BlockIO io;
if (context_->getSettingsRef()[Setting::distributed_ddl_task_timeout] == 0)
return io;
auto source = std::make_shared<ReplicatedDatabaseQueryStatusSource>(node_path, replicas_path, context_, hosts_to_wait);
io.pipeline = QueryPipeline(std::move(source));
if (context_->getSettingsRef()[Setting::distributed_ddl_output_mode] == DistributedDDLOutputMode::NONE
|| context_->getSettingsRef()[Setting::distributed_ddl_output_mode] == DistributedDDLOutputMode::NONE_ONLY_ACTIVE)
io.pipeline.complete(std::make_shared<EmptySink>(io.pipeline.getHeader()));
return io;
}
}

View File

@ -151,6 +151,9 @@ private:
void waitDatabaseStarted() const override;
void stopLoading() override;
static BlockIO
getQueryStatus(const String & node_path, const String & replicas_path, ContextPtr context, const Strings & hosts_to_wait);
String zookeeper_path;
String shard_name;
String replica_name;

View File

@ -39,7 +39,14 @@ namespace ErrorCodes
static constexpr const char * FORCE_AUTO_RECOVERY_DIGEST = "42";
DatabaseReplicatedDDLWorker::DatabaseReplicatedDDLWorker(DatabaseReplicated * db, ContextPtr context_)
: DDLWorker(/* pool_size */ 1, db->zookeeper_path + "/log", context_, nullptr, {}, fmt::format("DDLWorker({})", db->getDatabaseName()))
: DDLWorker(
/* pool_size */ 1,
db->zookeeper_path + "/log",
db->zookeeper_path + "/replicas",
context_,
nullptr,
{},
fmt::format("DDLWorker({})", db->getDatabaseName()))
, database(db)
{
/// Pool size must be 1 to avoid reordering of log entries.

View File

@ -38,9 +38,14 @@ public:
UInt32 getLogPointer() const;
UInt64 getCurrentInitializationDurationMs() const;
private:
bool initializeMainThread() override;
void initializeReplication();
void initializeReplication() override;
void createReplicaDirs(const ZooKeeperPtr &, const NameSet &) override { }
void markReplicasActive(bool) override { }
void initializeLogPointer(const String & processed_entry_name);
DDLTaskPtr initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper, bool dry_run) override;

View File

@ -88,7 +88,7 @@ void fillFixedBatch(size_t keys_size, const ColumnRawPtrs & key_columns, const S
out.resize_fill(num_rows);
/// Note: here we violate strict aliasing.
/// It should be ok as log as we do not reffer to any value from `out` before filling.
/// It should be ok as long as we do not refer to any value from `out` before filling.
const char * source = static_cast<const ColumnFixedSizeHelper *>(column)->getRawDataBegin<sizeof(T)>();
T * dest = reinterpret_cast<T *>(reinterpret_cast<char *>(out.data()) + offset);
fillFixedBatch<T, sizeof(Key) / sizeof(T)>(num_rows, reinterpret_cast<const T *>(source), dest); /// NOLINT(bugprone-sizeof-expression)

View File

@ -0,0 +1,157 @@
#include <unordered_set>
#include <Core/Settings.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Databases/DatabaseReplicated.h>
#include <Interpreters/DDLOnClusterQueryStatusSource.h>
#include <Common/DNSResolver.h>
#include <Common/isLocalAddress.h>
namespace DB
{
namespace Setting
{
extern const SettingsDistributedDDLOutputMode distributed_ddl_output_mode;
}
namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
}
DDLOnClusterQueryStatusSource::DDLOnClusterQueryStatusSource(
const String & zk_node_path, const String & zk_replicas_path, ContextPtr context_, const Strings & hosts_to_wait)
: DistributedQueryStatusSource(
zk_node_path, zk_replicas_path, getSampleBlock(context_), context_, hosts_to_wait, "DDLOnClusterQueryStatusSource")
{
}
ExecutionStatus DDLOnClusterQueryStatusSource::checkStatus(const String & host_id)
{
fs::path status_path = fs::path(node_path) / "finished" / host_id;
return getExecutionStatus(status_path);
}
Chunk DDLOnClusterQueryStatusSource::generateChunkWithUnfinishedHosts() const
{
NameSet unfinished_hosts = waiting_hosts;
for (const auto & host_id : finished_hosts)
unfinished_hosts.erase(host_id);
NameSet active_hosts_set = NameSet{current_active_hosts.begin(), current_active_hosts.end()};
/// The query is not finished on the rest of the hosts, so fill the corresponding rows with NULLs.
MutableColumns columns = output.getHeader().cloneEmptyColumns();
for (const String & host_id : unfinished_hosts)
{
size_t num = 0;
auto [host, port] = parseHostAndPort(host_id);
columns[num++]->insert(host);
columns[num++]->insert(port);
columns[num++]->insert(Field{});
columns[num++]->insert(Field{});
columns[num++]->insert(unfinished_hosts.size());
columns[num++]->insert(current_active_hosts.size());
}
return Chunk(std::move(columns), unfinished_hosts.size());
}
Strings DDLOnClusterQueryStatusSource::getNodesToWait()
{
return {String(fs::path(node_path) / "finished"), String(fs::path(node_path) / "active")};
}
Chunk DDLOnClusterQueryStatusSource::handleTimeoutExceeded()
{
timeout_exceeded = true;
size_t num_unfinished_hosts = waiting_hosts.size() - num_hosts_finished;
size_t num_active_hosts = current_active_hosts.size();
constexpr auto msg_format = "Distributed DDL task {} is not finished on {} of {} hosts "
"({} of them are currently executing the task, {} are inactive). "
"They are going to execute the query in background. Was waiting for {} seconds{}";
if (throw_on_timeout || (throw_on_timeout_only_active && !stop_waiting_offline_hosts))
{
if (!first_exception)
first_exception = std::make_unique<Exception>(Exception(
ErrorCodes::TIMEOUT_EXCEEDED,
msg_format,
node_path,
num_unfinished_hosts,
waiting_hosts.size(),
num_active_hosts,
offline_hosts.size(),
watch.elapsedSeconds(),
stop_waiting_offline_hosts ? "" : ", which is longer than distributed_ddl_task_timeout"));
return {};
}
LOG_INFO(
log,
msg_format,
node_path,
num_unfinished_hosts,
waiting_hosts.size(),
num_active_hosts,
offline_hosts.size(),
watch.elapsedSeconds(),
stop_waiting_offline_hosts ? "" : "which is longer than distributed_ddl_task_timeout");
return generateChunkWithUnfinishedHosts();
}
Chunk DDLOnClusterQueryStatusSource::stopWaitingOfflineHosts()
{
// Same logic as timeout exceeded
return handleTimeoutExceeded();
}
void DDLOnClusterQueryStatusSource::handleNonZeroStatusCode(const ExecutionStatus & status, const String & host_id)
{
assert(status.code != 0);
if (!first_exception && context->getSettingsRef()[Setting::distributed_ddl_output_mode] != DistributedDDLOutputMode::NEVER_THROW)
{
auto [host, port] = parseHostAndPort(host_id);
first_exception
= std::make_unique<Exception>(Exception(status.code, "There was an error on [{}:{}]: {}", host, port, status.message));
}
}
void DDLOnClusterQueryStatusSource::fillHostStatus(const String & host_id, const ExecutionStatus & status, MutableColumns & columns)
{
size_t num = 0;
auto [host, port] = parseHostAndPort(host_id);
columns[num++]->insert(host);
columns[num++]->insert(port);
columns[num++]->insert(status.code);
columns[num++]->insert(status.message);
columns[num++]->insert(waiting_hosts.size() - num_hosts_finished);
columns[num++]->insert(current_active_hosts.size());
}
Block DDLOnClusterQueryStatusSource::getSampleBlock(ContextPtr context_)
{
auto output_mode = context_->getSettingsRef()[Setting::distributed_ddl_output_mode];
auto maybe_make_nullable = [&](const DataTypePtr & type) -> DataTypePtr
{
if (output_mode == DistributedDDLOutputMode::THROW || output_mode == DistributedDDLOutputMode::NONE
|| output_mode == DistributedDDLOutputMode::NONE_ONLY_ACTIVE)
return type;
return std::make_shared<DataTypeNullable>(type);
};
return Block{
{std::make_shared<DataTypeString>(), "host"},
{std::make_shared<DataTypeUInt16>(), "port"},
{maybe_make_nullable(std::make_shared<DataTypeInt64>()), "status"},
{maybe_make_nullable(std::make_shared<DataTypeString>()), "error"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
};
}
}
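
For orientation, `getSampleBlock` above defines the columns a client sees while an ON CLUSTER query is waited on; the result looks roughly like this (cluster name, hosts, and values are hypothetical):

```sql
CREATE TABLE t ON CLUSTER my_cluster (x UInt8) ENGINE = MergeTree ORDER BY x;
-- ┌─host──┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
-- │ node1 │ 9000 │      0 │       │                   1 │                0 │
-- │ node2 │ 9000 │      0 │       │                   0 │                0 │
-- └───────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
```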

View File

@ -0,0 +1,30 @@
#pragma once
#include <Interpreters/Context_fwd.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/DistributedQueryStatusSource.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
namespace DB
{
class DDLOnClusterQueryStatusSource final : public DistributedQueryStatusSource
{
public:
DDLOnClusterQueryStatusSource(
const String & zk_node_path, const String & zk_replicas_path, ContextPtr context_, const Strings & hosts_to_wait);
String getName() const override { return "DDLOnClusterQueryStatus"; }
protected:
ExecutionStatus checkStatus(const String & host_id) override;
Chunk generateChunkWithUnfinishedHosts() const override;
Strings getNodesToWait() override;
Chunk handleTimeoutExceeded() override;
Chunk stopWaitingOfflineHosts() override;
void handleNonZeroStatusCode(const ExecutionStatus & status, const String & host_id) override;
void fillHostStatus(const String & host_id, const ExecutionStatus & status, MutableColumns & columns) override;
private:
static Block getSampleBlock(ContextPtr context_);
};
}

View File

@ -1,48 +1,47 @@
#include <filesystem>
#include <Interpreters/DDLWorker.h>
#include <Core/ServerUUID.h>
#include <Core/Settings.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/DDLWorker.h>
#include <Interpreters/ZooKeeperLog.h>
#include <Interpreters/executeQuery.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTCreateIndexQuery.h>
#include <Parsers/ASTDropIndexQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTOptimizeQuery.h>
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Parsers/ASTCreateIndexQuery.h>
#include <Parsers/ASTDropIndexQuery.h>
#include <Parsers/ParserQuery.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <Storages/IStorage.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Common/setThreadName.h>
#include <Common/randomSeed.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/ZooKeeperLock.h>
#include <Common/isLocalAddress.h>
#include <Core/ServerUUID.h>
#include <Core/Settings.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Poco/Timestamp.h>
#include <base/sleep.h>
#include <base/getFQDNOrHostName.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Common/ThreadPool.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/ZooKeeperLock.h>
#include <Common/isLocalAddress.h>
#include <Common/logger_useful.h>
#include <Common/randomSeed.h>
#include <Common/scope_guard_safe.h>
#include <Common/setThreadName.h>
#include <base/getFQDNOrHostName.h>
#include <base/sleep.h>
#include <base/sort.h>
#include <memory>
#include <random>
#include <pcg_random.hpp>
#include <Common/scope_guard_safe.h>
#include <Common/ThreadPool.h>
#include <Interpreters/ZooKeeperLog.h>
namespace fs = std::filesystem;
namespace CurrentMetrics
{
extern const Metric DDLWorkerThreads;
@ -78,7 +77,8 @@ constexpr const char * TASK_PROCESSED_OUT_REASON = "Task has been already proces
DDLWorker::DDLWorker(
int pool_size_,
const std::string & zk_root_dir,
const std::string & zk_queue_dir,
const std::string & zk_replicas_dir,
ContextPtr context_,
const Poco::Util::AbstractConfiguration * config,
const String & prefix,
@ -104,10 +104,15 @@ DDLWorker::DDLWorker(
worker_pool = std::make_unique<ThreadPool>(CurrentMetrics::DDLWorkerThreads, CurrentMetrics::DDLWorkerThreadsActive, CurrentMetrics::DDLWorkerThreadsScheduled, pool_size);
}
queue_dir = zk_root_dir;
queue_dir = zk_queue_dir;
if (queue_dir.back() == '/')
queue_dir.resize(queue_dir.size() - 1);
replicas_dir = zk_replicas_dir;
if (replicas_dir.back() == '/')
replicas_dir.resize(replicas_dir.size() - 1);
if (config)
{
task_max_lifetime = config->getUInt64(prefix + ".task_max_lifetime", static_cast<UInt64>(task_max_lifetime));
@ -1058,6 +1063,11 @@ String DDLWorker::enqueueQuery(DDLLogEntry & entry)
String query_path_prefix = fs::path(queue_dir) / "query-";
zookeeper->createAncestors(query_path_prefix);
NameSet host_ids;
for (const HostID & host : entry.hosts)
host_ids.emplace(host.toString());
createReplicaDirs(zookeeper, host_ids);
String node_path = zookeeper->create(query_path_prefix, entry.toString(), zkutil::CreateMode::PersistentSequential);
if (max_pushed_entry_metric)
{
@ -1097,6 +1107,7 @@ bool DDLWorker::initializeMainThread()
{
auto zookeeper = getAndSetZooKeeper();
zookeeper->createAncestors(fs::path(queue_dir) / "");
initializeReplication();
initialized = true;
return true;
}
@ -1158,6 +1169,14 @@ void DDLWorker::runMainThread()
}
cleanup_event->set();
try
{
markReplicasActive(reinitialized);
}
catch (...)
{
tryLogCurrentException(log, "An error occurred when markReplicasActive: ");
}
scheduleTasks(reinitialized);
subsequent_errors_count = 0;
@ -1215,6 +1234,97 @@ void DDLWorker::runMainThread()
}
void DDLWorker::initializeReplication()
{
auto zookeeper = getAndSetZooKeeper();
zookeeper->createAncestors(fs::path(replicas_dir) / "");
NameSet host_id_set;
for (const auto & it : context->getClusters())
{
auto cluster = it.second;
for (const auto & host_ids : cluster->getHostIDs())
for (const auto & host_id : host_ids)
host_id_set.emplace(host_id);
}
createReplicaDirs(zookeeper, host_id_set);
}
void DDLWorker::createReplicaDirs(const ZooKeeperPtr & zookeeper, const NameSet & host_ids)
{
for (const auto & host_id : host_ids)
zookeeper->createAncestors(fs::path(replicas_dir) / host_id / "");
}
void DDLWorker::markReplicasActive(bool reinitialized)
{
auto zookeeper = getAndSetZooKeeper();
if (reinitialized)
{
// Reset all active_node_holders
for (auto & it : active_node_holders)
{
auto & active_node_holder = it.second.second;
if (active_node_holder)
active_node_holder->setAlreadyRemoved();
active_node_holder.reset();
}
active_node_holders.clear();
}
const auto maybe_secure_port = context->getTCPPortSecure();
const auto port = context->getTCPPort();
Coordination::Stat replicas_stat;
Strings host_ids = zookeeper->getChildren(replicas_dir, &replicas_stat);
NameSet local_host_ids;
for (const auto & host_id : host_ids)
{
if (active_node_holders.contains(host_id))
continue;
try
{
HostID host = HostID::fromString(host_id);
/// The port is considered local if it matches the TCP or TCP secure port that the server is listening on.
bool is_local_host = (maybe_secure_port && host.isLocalAddress(*maybe_secure_port)) || host.isLocalAddress(port);
if (is_local_host)
local_host_ids.emplace(host_id);
}
catch (const Exception & e)
{
LOG_WARNING(log, "Unable to check if host {} is a local address, exception: {}", host_id, e.displayText());
continue;
}
}
for (const auto & host_id : local_host_ids)
{
auto it = active_node_holders.find(host_id);
if (it != active_node_holders.end())
{
continue;
}
String active_path = fs::path(replicas_dir) / host_id / "active";
if (zookeeper->exists(active_path))
continue;
String active_id = toString(ServerUUID::get());
LOG_TRACE(log, "Trying to mark a replica active: active_path={}, active_id={}", active_path, active_id);
zookeeper->create(active_path, active_id, zkutil::CreateMode::Ephemeral);
auto active_node_holder_zookeeper = zookeeper;
auto active_node_holder = zkutil::EphemeralNodeHolder::existing(active_path, *active_node_holder_zookeeper);
active_node_holders[host_id] = {active_node_holder_zookeeper, active_node_holder};
}
}
void DDLWorker::runCleanupThread()
{
setThreadName("DDLWorkerClnr");

View File

@ -1,24 +1,24 @@
#pragma once
#include <Common/CurrentThread.h>
#include <Interpreters/Context.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <Common/CurrentMetrics.h>
#include <Common/CurrentThread.h>
#include <Common/DNSResolver.h>
#include <Common/ThreadPool_fwd.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Storages/IStorage_fwd.h>
#include <Parsers/IAST_fwd.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Interpreters/Context_fwd.h>
#include <Poco/Event.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <list>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <unordered_set>
namespace zkutil
{
class ZooKeeper;
@ -52,8 +52,16 @@ class AccessRightsElements;
class DDLWorker
{
public:
DDLWorker(int pool_size_, const std::string & zk_root_dir, ContextPtr context_, const Poco::Util::AbstractConfiguration * config, const String & prefix,
const String & logger_name = "DDLWorker", const CurrentMetrics::Metric * max_entry_metric_ = nullptr, const CurrentMetrics::Metric * max_pushed_entry_metric_ = nullptr);
DDLWorker(
int pool_size_,
const std::string & zk_queue_dir,
const std::string & zk_replicas_dir,
ContextPtr context_,
const Poco::Util::AbstractConfiguration * config,
const String & prefix,
const String & logger_name = "DDLWorker",
const CurrentMetrics::Metric * max_entry_metric_ = nullptr,
const CurrentMetrics::Metric * max_pushed_entry_metric_ = nullptr);
virtual ~DDLWorker();
/// Pushes query into DDL queue, returns path to created node
@ -71,6 +79,8 @@ public:
return queue_dir;
}
std::string getReplicasDir() const { return replicas_dir; }
void startup();
virtual void shutdown();
@ -149,6 +159,10 @@ protected:
/// Return false if the worker was stopped (stop_flag = true)
virtual bool initializeMainThread();
virtual void initializeReplication();
virtual void createReplicaDirs(const ZooKeeperPtr & zookeeper, const NameSet & host_ids);
virtual void markReplicasActive(bool reinitialized);
void runMainThread();
void runCleanupThread();
@ -160,7 +174,8 @@ protected:
std::string host_fqdn; /// current host domain name
std::string host_fqdn_id; /// host_name:port
std::string queue_dir; /// dir with queue of queries
std::string queue_dir; /// dir with queue of queries
std::string replicas_dir;
mutable std::mutex zookeeper_mutex;
ZooKeeperPtr current_zookeeper TSA_GUARDED_BY(zookeeper_mutex);
@ -202,6 +217,8 @@ protected:
const CurrentMetrics::Metric * max_entry_metric;
const CurrentMetrics::Metric * max_pushed_entry_metric;
std::unordered_map<String, std::pair<ZooKeeperPtr, zkutil::EphemeralNodeHolderPtr>> active_node_holders;
};

View File

@ -0,0 +1,270 @@
#include <Core/Block.h>
#include <Core/Settings.h>
#include <Core/SettingsEnums.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h>
#include <Interpreters/DistributedQueryStatusSource.h>
#include <Common/Exception.h>
#include <Common/ZooKeeper/ZooKeeper.h>
namespace DB
{
namespace Setting
{
extern const SettingsDistributedDDLOutputMode distributed_ddl_output_mode;
extern const SettingsInt64 distributed_ddl_task_timeout;
}
namespace ErrorCodes
{
extern const int UNFINISHED;
}
DistributedQueryStatusSource::DistributedQueryStatusSource(
const String & zk_node_path,
const String & zk_replicas_path,
Block block,
ContextPtr context_,
const Strings & hosts_to_wait,
const char * logger_name)
: ISource(block)
, node_path(zk_node_path)
, replicas_path(zk_replicas_path)
, context(context_)
, watch(CLOCK_MONOTONIC_COARSE)
, log(getLogger(logger_name))
{
auto output_mode = context->getSettingsRef()[Setting::distributed_ddl_output_mode];
throw_on_timeout = output_mode == DistributedDDLOutputMode::THROW || output_mode == DistributedDDLOutputMode::NONE;
throw_on_timeout_only_active
= output_mode == DistributedDDLOutputMode::THROW_ONLY_ACTIVE || output_mode == DistributedDDLOutputMode::NONE_ONLY_ACTIVE;
waiting_hosts = NameSet(hosts_to_wait.begin(), hosts_to_wait.end());
only_running_hosts = output_mode == DistributedDDLOutputMode::THROW_ONLY_ACTIVE
|| output_mode == DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT_ONLY_ACTIVE
|| output_mode == DistributedDDLOutputMode::NONE_ONLY_ACTIVE;
addTotalRowsApprox(waiting_hosts.size());
timeout_seconds = context->getSettingsRef()[Setting::distributed_ddl_task_timeout];
}
IProcessor::Status DistributedQueryStatusSource::prepare()
{
/// This method is overridden to throw an exception after all data is read.
/// The exception is pushed into the pipe (instead of simply being thrown) to preserve the ordering of data processing and the exception.
if (finished)
{
if (first_exception)
{
if (!output.canPush())
return Status::PortFull;
output.pushException(std::make_exception_ptr(*first_exception));
}
output.finish();
return Status::Finished;
}
else
return ISource::prepare();
}
NameSet DistributedQueryStatusSource::getOfflineHosts(const NameSet & hosts_to_wait, const ZooKeeperPtr & zookeeper)
{
Strings paths;
Strings hosts_array;
for (const auto & host : hosts_to_wait)
{
hosts_array.push_back(host);
paths.push_back(fs::path(replicas_path) / host / "active");
}
NameSet offline;
auto res = zookeeper->tryGet(paths);
for (size_t i = 0; i < res.size(); ++i)
if (res[i].error == Coordination::Error::ZNONODE)
offline.insert(hosts_array[i]);
if (offline.size() == hosts_to_wait.size())
{
/// Avoid reporting that all hosts are offline
LOG_WARNING(log, "Did not find active hosts, will wait for all {} hosts. This should not happen often", offline.size());
return {};
}
return offline;
}
Strings DistributedQueryStatusSource::getNewAndUpdate(const Strings & current_finished_hosts)
{
Strings diff;
for (const String & host : current_finished_hosts)
{
if (!waiting_hosts.contains(host))
{
if (!ignoring_hosts.contains(host))
{
ignoring_hosts.emplace(host);
LOG_INFO(log, "Unexpected host {} appeared in task {}", host, node_path);
}
continue;
}
if (!finished_hosts.contains(host))
{
diff.emplace_back(host);
finished_hosts.emplace(host);
}
}
return diff;
}
ExecutionStatus DistributedQueryStatusSource::getExecutionStatus(const fs::path & status_path)
{
ExecutionStatus status(-1, "Cannot obtain error message");
String status_data;
bool finished_exists = false;
auto retries_ctl = ZooKeeperRetriesControl(
"executeDDLQueryOnCluster", getLogger("DDLQueryStatusSource"), getRetriesInfo(), context->getProcessListElement());
retries_ctl.retryLoop([&]() { finished_exists = context->getZooKeeper()->tryGet(status_path, status_data); });
if (finished_exists)
status.tryDeserializeText(status_data);
return status;
}
ZooKeeperRetriesInfo DistributedQueryStatusSource::getRetriesInfo()
{
const auto & config_ref = Context::getGlobalContextInstance()->getConfigRef();
return ZooKeeperRetriesInfo(
config_ref.getInt("distributed_ddl_keeper_max_retries", 5),
config_ref.getInt("distributed_ddl_keeper_initial_backoff_ms", 100),
config_ref.getInt("distributed_ddl_keeper_max_backoff_ms", 5000));
}
std::pair<String, UInt16> DistributedQueryStatusSource::parseHostAndPort(const String & host_id)
{
String host = host_id;
UInt16 port = 0;
auto host_and_port = Cluster::Address::fromString(host_id);
host = host_and_port.first;
port = host_and_port.second;
return {host, port};
}
Chunk DistributedQueryStatusSource::generate()
{
bool all_hosts_finished = num_hosts_finished >= waiting_hosts.size();
/// Seems like num_hosts_finished cannot be strictly greater than waiting_hosts.size()
assert(num_hosts_finished <= waiting_hosts.size());
if (all_hosts_finished || timeout_exceeded)
return {};
size_t try_number = 0;
while (true)
{
if (isCancelled())
return {};
if (stop_waiting_offline_hosts)
{
return stopWaitingOfflineHosts();
}
if ((timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds))
{
return handleTimeoutExceeded();
}
sleepForMilliseconds(std::min<size_t>(1000, 50 * try_number));
bool node_exists = false;
Strings tmp_hosts;
Strings tmp_active_hosts;
{
auto retries_ctl = ZooKeeperRetriesControl(
"executeDistributedQueryOnCluster", getLogger(getName()), getRetriesInfo(), context->getProcessListElement());
retries_ctl.retryLoop(
[&]()
{
auto zookeeper = context->getZooKeeper();
Strings paths = getNodesToWait();
auto res = zookeeper->tryGetChildren(paths);
for (size_t i = 0; i < res.size(); ++i)
if (res[i].error != Coordination::Error::ZOK && res[i].error != Coordination::Error::ZNONODE)
throw Coordination::Exception::fromPath(res[i].error, paths[i]);
if (res[0].error == Coordination::Error::ZNONODE)
node_exists = zookeeper->exists(node_path);
else
node_exists = true;
tmp_hosts = res[0].names;
tmp_active_hosts = res[1].names;
if (only_running_hosts)
offline_hosts = getOfflineHosts(waiting_hosts, zookeeper);
});
}
if (!node_exists)
{
/// Paradoxically, this exception will be thrown even in case of "never_throw" mode.
if (!first_exception)
first_exception = std::make_unique<Exception>(Exception(
ErrorCodes::UNFINISHED,
"Cannot provide query execution status. The query's node {} has been deleted by the cleaner"
" since it was finished (or its lifetime is expired)",
node_path));
return {};
}
Strings new_hosts = getNewAndUpdate(tmp_hosts);
++try_number;
if (only_running_hosts)
{
size_t num_finished_or_offline = 0;
for (const auto & host : waiting_hosts)
num_finished_or_offline += finished_hosts.contains(host) || offline_hosts.contains(host);
if (num_finished_or_offline == waiting_hosts.size())
stop_waiting_offline_hosts = true;
}
if (new_hosts.empty())
continue;
current_active_hosts = std::move(tmp_active_hosts);
MutableColumns columns = output.getHeader().cloneEmptyColumns();
for (const String & host_id : new_hosts)
{
ExecutionStatus status = checkStatus(host_id);
if (status.code != 0)
{
handleNonZeroStatusCode(status, host_id);
}
++num_hosts_finished;
fillHostStatus(host_id, status, columns);
}
return Chunk(std::move(columns), new_hosts.size());
}
}
}
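
The retry parameters read by `getRetriesInfo` above come from the server configuration; a sketch showing the defaults the code falls back to (top-level placement in `config.xml` is an assumption):

```xml
<distributed_ddl_keeper_max_retries>5</distributed_ddl_keeper_max_retries>
<distributed_ddl_keeper_initial_backoff_ms>100</distributed_ddl_keeper_initial_backoff_ms>
<distributed_ddl_keeper_max_backoff_ms>5000</distributed_ddl_keeper_max_backoff_ms>
```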

View File

@ -0,0 +1,68 @@
#pragma once
#include <filesystem>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/DDLTask.h>
#include <Processors/ISource.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
namespace fs = std::filesystem;
namespace DB
{
class DistributedQueryStatusSource : public ISource
{
public:
DistributedQueryStatusSource(
const String & zk_node_path,
const String & zk_replicas_path,
Block block,
ContextPtr context_,
const Strings & hosts_to_wait,
const char * logger_name);
Chunk generate() override;
Status prepare() override;
protected:
virtual ExecutionStatus checkStatus(const String & host_id) = 0;
virtual Chunk generateChunkWithUnfinishedHosts() const = 0;
virtual Strings getNodesToWait() = 0;
virtual Chunk handleTimeoutExceeded() = 0;
virtual Chunk stopWaitingOfflineHosts() = 0;
virtual void handleNonZeroStatusCode(const ExecutionStatus & status, const String & host_id) = 0;
virtual void fillHostStatus(const String & host_id, const ExecutionStatus & status, MutableColumns & columns) = 0;
virtual NameSet getOfflineHosts(const NameSet & hosts_to_wait, const ZooKeeperPtr & zookeeper);
Strings getNewAndUpdate(const Strings & current_finished_hosts);
ExecutionStatus getExecutionStatus(const fs::path & status_path);
static ZooKeeperRetriesInfo getRetriesInfo();
static std::pair<String, UInt16> parseHostAndPort(const String & host_id);
String node_path;
String replicas_path;
ContextPtr context;
Stopwatch watch;
LoggerPtr log;
NameSet waiting_hosts; /// hosts from task host list
NameSet finished_hosts; /// finished hosts from host list
NameSet ignoring_hosts; /// appeared hosts that are not in hosts list
Strings current_active_hosts; /// Hosts that are currently executing the task
NameSet offline_hosts; /// Hosts that are not currently running
size_t num_hosts_finished = 0;
/// Save the first detected error and throw it at the end of execution
std::unique_ptr<Exception> first_exception;
Int64 timeout_seconds = 120;
bool throw_on_timeout = true;
bool throw_on_timeout_only_active = false;
bool only_running_hosts = false;
bool timeout_exceeded = false;
bool stop_waiting_offline_hosts = false;
};
}

View File

@ -1987,6 +1987,12 @@ BlockIO InterpreterCreateQuery::doCreateOrReplaceTable(ASTCreateQuery & create,
UInt16 hashed_zk_path = sipHash64(txn->getTaskZooKeeperPath());
random_suffix = getHexUIntLowercase(hashed_zk_path);
}
else if (!current_context->getCurrentQueryId().empty())
{
random_suffix = getRandomASCIIString(/*length=*/2);
UInt8 hashed_query_id = sipHash64(current_context->getCurrentQueryId());
random_suffix += getHexUIntLowercase(hashed_query_id);
}
else
{
random_suffix = getRandomASCIIString(/*length=*/4);

View File

@ -0,0 +1,170 @@
#include <Core/Settings.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Databases/DatabaseReplicated.h>
#include <Interpreters/ReplicatedDatabaseQueryStatusSource.h>
namespace DB
{
namespace Setting
{
extern const SettingsBool database_replicated_enforce_synchronous_settings;
extern const SettingsDistributedDDLOutputMode distributed_ddl_output_mode;
}
namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
extern const int LOGICAL_ERROR;
}
ReplicatedDatabaseQueryStatusSource::ReplicatedDatabaseQueryStatusSource(
const String & zk_node_path, const String & zk_replicas_path, ContextPtr context_, const Strings & hosts_to_wait)
: DistributedQueryStatusSource(
zk_node_path, zk_replicas_path, getSampleBlock(), context_, hosts_to_wait, "ReplicatedDatabaseQueryStatusSource")
{
}
ExecutionStatus ReplicatedDatabaseQueryStatusSource::checkStatus([[maybe_unused]] const String & host_id)
{
/// A Replicated database retries in case of error, so it should not write an error status.
#ifdef DEBUG_OR_SANITIZER_BUILD
fs::path status_path = fs::path(node_path) / "finished" / host_id;
return getExecutionStatus(status_path);
#else
return ExecutionStatus{0};
#endif
}
Chunk ReplicatedDatabaseQueryStatusSource::generateChunkWithUnfinishedHosts() const
{
NameSet unfinished_hosts = waiting_hosts;
for (const auto & host_id : finished_hosts)
unfinished_hosts.erase(host_id);
NameSet active_hosts_set = NameSet{current_active_hosts.begin(), current_active_hosts.end()};
/// The query is not finished on the rest of the hosts, so fill the corresponding rows with NULLs.
MutableColumns columns = output.getHeader().cloneEmptyColumns();
for (const String & host_id : unfinished_hosts)
{
size_t num = 0;
auto [shard, replica] = DatabaseReplicated::parseFullReplicaName(host_id);
columns[num++]->insert(shard);
columns[num++]->insert(replica);
if (active_hosts_set.contains(host_id))
columns[num++]->insert(IN_PROGRESS);
else
columns[num++]->insert(QUEUED);
columns[num++]->insert(unfinished_hosts.size());
columns[num++]->insert(current_active_hosts.size());
}
return Chunk(std::move(columns), unfinished_hosts.size());
}
Strings ReplicatedDatabaseQueryStatusSource::getNodesToWait()
{
String node_to_wait = "finished";
if (context->getSettingsRef()[Setting::database_replicated_enforce_synchronous_settings])
{
node_to_wait = "synced";
}
return {String(fs::path(node_path) / node_to_wait), String(fs::path(node_path) / "active")};
}
Chunk ReplicatedDatabaseQueryStatusSource::handleTimeoutExceeded()
{
timeout_exceeded = true;
size_t num_unfinished_hosts = waiting_hosts.size() - num_hosts_finished;
size_t num_active_hosts = current_active_hosts.size();
constexpr auto msg_format = "ReplicatedDatabase DDL task {} is not finished on {} of {} hosts "
"({} of them are currently executing the task, {} are inactive). "
"They are going to execute the query in background. Was waiting for {} seconds{}";
if (throw_on_timeout || (throw_on_timeout_only_active && !stop_waiting_offline_hosts))
{
if (!first_exception)
first_exception = std::make_unique<Exception>(Exception(
ErrorCodes::TIMEOUT_EXCEEDED,
msg_format,
node_path,
num_unfinished_hosts,
waiting_hosts.size(),
num_active_hosts,
offline_hosts.size(),
watch.elapsedSeconds(),
stop_waiting_offline_hosts ? "" : ", which is longer than distributed_ddl_task_timeout"));
/// For Replicated database print a list of unfinished hosts as well. Will return empty block on next iteration.
return generateChunkWithUnfinishedHosts();
}
LOG_INFO(
log,
msg_format,
node_path,
num_unfinished_hosts,
waiting_hosts.size(),
num_active_hosts,
offline_hosts.size(),
watch.elapsedSeconds(),
stop_waiting_offline_hosts ? "" : "which is longer than distributed_ddl_task_timeout");
return generateChunkWithUnfinishedHosts();
}
Chunk ReplicatedDatabaseQueryStatusSource::stopWaitingOfflineHosts()
{
// Same logic as timeout exceeded
return handleTimeoutExceeded();
}
void ReplicatedDatabaseQueryStatusSource::handleNonZeroStatusCode(const ExecutionStatus & status, const String & host_id)
{
assert(status.code != 0);
if (!first_exception && context->getSettingsRef()[Setting::distributed_ddl_output_mode] != DistributedDDLOutputMode::NEVER_THROW)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "There was an error on {}: {} (probably it's a bug)", host_id, status.message);
}
}
void ReplicatedDatabaseQueryStatusSource::fillHostStatus(const String & host_id, const ExecutionStatus & status, MutableColumns & columns)
{
size_t num = 0;
if (status.code != 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There was an error on {}: {} (probably it's a bug)", host_id, status.message);
auto [shard, replica] = DatabaseReplicated::parseFullReplicaName(host_id);
columns[num++]->insert(shard);
columns[num++]->insert(replica);
columns[num++]->insert(OK);
columns[num++]->insert(waiting_hosts.size() - num_hosts_finished);
columns[num++]->insert(current_active_hosts.size());
}
Block ReplicatedDatabaseQueryStatusSource::getSampleBlock()
{
auto get_status_enum = []()
{
return std::make_shared<DataTypeEnum8>(DataTypeEnum8::Values{
{"OK", static_cast<Int8>(OK)},
{"IN_PROGRESS", static_cast<Int8>(IN_PROGRESS)},
{"QUEUED", static_cast<Int8>(QUEUED)},
});
};
return Block{
{std::make_shared<DataTypeString>(), "shard"},
{std::make_shared<DataTypeString>(), "replica"},
{get_status_enum(), "status"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
};
}
}
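
Per `getSampleBlock` above, a Replicated database DDL query reports per-replica status with the enum rendered as text; roughly (shard/replica names hypothetical):

```sql
-- ┌─shard──┬─replica──┬─status──────┬─num_hosts_remaining─┬─num_hosts_active─┐
-- │ shard1 │ replica1 │ OK          │                   1 │                1 │
-- │ shard1 │ replica2 │ IN_PROGRESS │                   1 │                1 │
-- └────────┴──────────┴─────────────┴─────────────────────┴──────────────────┘
```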

View File

@ -0,0 +1,40 @@
#pragma once
#include <Interpreters/Context_fwd.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/DistributedQueryStatusSource.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
namespace DB
{
class ReplicatedDatabaseQueryStatusSource final : public DistributedQueryStatusSource
{
public:
ReplicatedDatabaseQueryStatusSource(
const String & zk_node_path, const String & zk_replicas_path, ContextPtr context_, const Strings & hosts_to_wait);
String getName() const override { return "ReplicatedDatabaseQueryStatus"; }
protected:
ExecutionStatus checkStatus(const String & host_id) override;
Chunk generateChunkWithUnfinishedHosts() const override;
Strings getNodesToWait() override;
Chunk handleTimeoutExceeded() override;
Chunk stopWaitingOfflineHosts() override;
void handleNonZeroStatusCode(const ExecutionStatus & status, const String & host_id) override;
void fillHostStatus(const String & host_id, const ExecutionStatus & status, MutableColumns & columns) override;
private:
static Block getSampleBlock();
enum ReplicatedDatabaseQueryStatus
{
/// Query is (successfully) finished
OK = 0,
/// Query is not finished yet, but replica is currently executing it
IN_PROGRESS = 1,
/// Replica is not available or busy with previous queries. It will process query asynchronously
QUEUED = 2,
};
};
}

View File

@ -1,33 +1,32 @@
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/DDLWorker.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/queryToString.h>
#include <filesystem>
#include <Access/Common/AccessRightsElement.h>
#include <Access/ContextAccess.h>
#include <Core/Settings.h>
#include <Common/Macros.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include "Parsers/ASTSystemQuery.h"
#include <Databases/DatabaseReplicated.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Databases/DatabaseReplicated.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLOnClusterQueryStatusSource.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/DDLWorker.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTSystemQuery.h>
#include <Parsers/queryToString.h>
#include <Processors/Sinks/EmptySink.h>
#include <QueryPipeline/Pipe.h>
#include <filesystem>
#include <base/sort.h>
#include <Common/Macros.h>
#include <Common/ZooKeeper/ZooKeeper.h>
namespace fs = std::filesystem;
namespace DB
{
namespace Setting
@ -41,21 +40,11 @@ namespace Setting
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int TIMEOUT_EXCEEDED;
extern const int UNFINISHED;
extern const int QUERY_IS_PROHIBITED;
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
extern const int QUERY_IS_PROHIBITED;
extern const int LOGICAL_ERROR;
}
static ZooKeeperRetriesInfo getRetriesInfo()
{
const auto & config_ref = Context::getGlobalContextInstance()->getConfigRef();
return ZooKeeperRetriesInfo(
config_ref.getInt("distributed_ddl_keeper_max_retries", 5),
config_ref.getInt("distributed_ddl_keeper_initial_backoff_ms", 100),
config_ref.getInt("distributed_ddl_keeper_max_backoff_ms", 5000));
}
bool isSupportedAlterTypeForOnClusterDDLQuery(int type)
{
@ -202,72 +191,19 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, ContextPtr context,
entry.initial_query_id = context->getClientInfo().initial_query_id;
String node_path = ddl_worker.enqueueQuery(entry);
return getDistributedDDLStatus(node_path, entry, context, /* hosts_to_wait */ nullptr);
return getDDLOnClusterStatus(node_path, ddl_worker.getReplicasDir(), entry, context);
}
class DDLQueryStatusSource final : public ISource
{
public:
DDLQueryStatusSource(
const String & zk_node_path, const DDLLogEntry & entry, ContextPtr context_, const Strings * hosts_to_wait);
String getName() const override { return "DDLQueryStatus"; }
Chunk generate() override;
Status prepare() override;
private:
static Block getSampleBlock(ContextPtr context_, bool hosts_to_wait);
Strings getNewAndUpdate(const Strings & current_list_of_finished_hosts);
std::pair<String, UInt16> parseHostAndPort(const String & host_id) const;
Chunk generateChunkWithUnfinishedHosts() const;
enum ReplicatedDatabaseQueryStatus
{
/// Query is (successfully) finished
OK = 0,
/// Query is not finished yet, but replica is currently executing it
IN_PROGRESS = 1,
        /// Replica is not available or busy with previous queries. It will process the query asynchronously
QUEUED = 2,
};
String node_path;
ContextPtr context;
Stopwatch watch;
LoggerPtr log;
NameSet waiting_hosts; /// hosts from task host list
NameSet finished_hosts; /// finished hosts from host list
NameSet ignoring_hosts; /// appeared hosts that are not in hosts list
Strings current_active_hosts; /// Hosts that are currently executing the task
NameSet offline_hosts; /// Hosts that are not currently running
size_t num_hosts_finished = 0;
/// Save the first detected error and throw it at the end of execution
std::unique_ptr<Exception> first_exception;
Int64 timeout_seconds = 120;
bool is_replicated_database = false;
bool throw_on_timeout = true;
bool throw_on_timeout_only_active = false;
bool only_running_hosts = false;
bool timeout_exceeded = false;
bool stop_waiting_offline_hosts = false;
};
BlockIO getDistributedDDLStatus(const String & node_path, const DDLLogEntry & entry, ContextPtr context, const Strings * hosts_to_wait)
BlockIO getDDLOnClusterStatus(const String & node_path, const String & replicas_path, const DDLLogEntry & entry, ContextPtr context)
{
BlockIO io;
if (context->getSettingsRef()[Setting::distributed_ddl_task_timeout] == 0)
return io;
Strings hosts_to_wait;
for (const HostID & host : entry.hosts)
hosts_to_wait.push_back(host.toString());
auto source = std::make_shared<DDLQueryStatusSource>(node_path, entry, context, hosts_to_wait);
auto source = std::make_shared<DDLOnClusterQueryStatusSource>(node_path, replicas_path, context, hosts_to_wait);
io.pipeline = QueryPipeline(std::move(source));
if (context->getSettingsRef()[Setting::distributed_ddl_output_mode] == DistributedDDLOutputMode::NONE
@ -277,394 +213,6 @@ BlockIO getDistributedDDLStatus(const String & node_path, const DDLLogEntry & en
return io;
}
Block DDLQueryStatusSource::getSampleBlock(ContextPtr context_, bool hosts_to_wait)
{
auto output_mode = context_->getSettingsRef()[Setting::distributed_ddl_output_mode];
auto maybe_make_nullable = [&](const DataTypePtr & type) -> DataTypePtr
{
if (output_mode == DistributedDDLOutputMode::THROW ||
output_mode == DistributedDDLOutputMode::NONE ||
output_mode == DistributedDDLOutputMode::NONE_ONLY_ACTIVE)
return type;
return std::make_shared<DataTypeNullable>(type);
};
auto get_status_enum = []()
{
return std::make_shared<DataTypeEnum8>(
DataTypeEnum8::Values
{
{"OK", static_cast<Int8>(OK)},
{"IN_PROGRESS", static_cast<Int8>(IN_PROGRESS)},
{"QUEUED", static_cast<Int8>(QUEUED)},
});
};
if (hosts_to_wait)
{
return Block{
{std::make_shared<DataTypeString>(), "shard"},
{std::make_shared<DataTypeString>(), "replica"},
{get_status_enum(), "status"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
};
}
return Block{
{std::make_shared<DataTypeString>(), "host"},
{std::make_shared<DataTypeUInt16>(), "port"},
{maybe_make_nullable(std::make_shared<DataTypeInt64>()), "status"},
{maybe_make_nullable(std::make_shared<DataTypeString>()), "error"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
};
}
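For the plain ON CLUSTER path above, a sketch of what a client sees (cluster name `c` is illustrative); under the `*_on_timeout` output modes, `maybe_make_nullable` turns the `status` and `error` columns Nullable so unfinished hosts can be reported as NULL rows:

```sql
-- One row per host participating in the DDL task.
CREATE TABLE default.t ON CLUSTER c (x Int32) ENGINE = Memory;
-- Result columns: host | port | status | error
--                 | num_hosts_remaining | num_hosts_active
```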
DDLQueryStatusSource::DDLQueryStatusSource(
const String & zk_node_path, const DDLLogEntry & entry, ContextPtr context_, const Strings * hosts_to_wait)
: ISource(getSampleBlock(context_, static_cast<bool>(hosts_to_wait)))
, node_path(zk_node_path)
, context(context_)
, watch(CLOCK_MONOTONIC_COARSE)
, log(getLogger("DDLQueryStatusSource"))
{
auto output_mode = context->getSettingsRef()[Setting::distributed_ddl_output_mode];
throw_on_timeout = output_mode == DistributedDDLOutputMode::THROW || output_mode == DistributedDDLOutputMode::NONE;
throw_on_timeout_only_active = output_mode == DistributedDDLOutputMode::THROW_ONLY_ACTIVE || output_mode == DistributedDDLOutputMode::NONE_ONLY_ACTIVE;
if (hosts_to_wait)
{
waiting_hosts = NameSet(hosts_to_wait->begin(), hosts_to_wait->end());
is_replicated_database = true;
only_running_hosts = output_mode == DistributedDDLOutputMode::THROW_ONLY_ACTIVE ||
output_mode == DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT_ONLY_ACTIVE ||
output_mode == DistributedDDLOutputMode::NONE_ONLY_ACTIVE;
}
else
{
for (const HostID & host : entry.hosts)
waiting_hosts.emplace(host.toString());
}
addTotalRowsApprox(waiting_hosts.size());
timeout_seconds = context->getSettingsRef()[Setting::distributed_ddl_task_timeout];
}
std::pair<String, UInt16> DDLQueryStatusSource::parseHostAndPort(const String & host_id) const
{
String host = host_id;
UInt16 port = 0;
if (!is_replicated_database)
{
auto host_and_port = Cluster::Address::fromString(host_id);
host = host_and_port.first;
port = host_and_port.second;
}
return {host, port};
}
Chunk DDLQueryStatusSource::generateChunkWithUnfinishedHosts() const
{
NameSet unfinished_hosts = waiting_hosts;
for (const auto & host_id : finished_hosts)
unfinished_hosts.erase(host_id);
NameSet active_hosts_set = NameSet{current_active_hosts.begin(), current_active_hosts.end()};
/// Query is not finished on the remaining hosts, so fill the corresponding rows with NULLs.
MutableColumns columns = output.getHeader().cloneEmptyColumns();
for (const String & host_id : unfinished_hosts)
{
size_t num = 0;
if (is_replicated_database)
{
auto [shard, replica] = DatabaseReplicated::parseFullReplicaName(host_id);
columns[num++]->insert(shard);
columns[num++]->insert(replica);
if (active_hosts_set.contains(host_id))
columns[num++]->insert(IN_PROGRESS);
else
columns[num++]->insert(QUEUED);
}
else
{
auto [host, port] = parseHostAndPort(host_id);
columns[num++]->insert(host);
columns[num++]->insert(port);
columns[num++]->insert(Field{});
columns[num++]->insert(Field{});
}
columns[num++]->insert(unfinished_hosts.size());
columns[num++]->insert(current_active_hosts.size());
}
return Chunk(std::move(columns), unfinished_hosts.size());
}
static NameSet getOfflineHosts(const String & node_path, const NameSet & hosts_to_wait, const ZooKeeperPtr & zookeeper, LoggerPtr log)
{
fs::path replicas_path;
if (node_path.ends_with('/'))
replicas_path = fs::path(node_path).parent_path().parent_path().parent_path() / "replicas";
else
replicas_path = fs::path(node_path).parent_path().parent_path() / "replicas";
Strings paths;
Strings hosts_array;
for (const auto & host : hosts_to_wait)
{
hosts_array.push_back(host);
paths.push_back(replicas_path / host / "active");
}
NameSet offline;
auto res = zookeeper->tryGet(paths);
for (size_t i = 0; i < res.size(); ++i)
if (res[i].error == Coordination::Error::ZNONODE)
offline.insert(hosts_array[i]);
if (offline.size() == hosts_to_wait.size())
{
/// Avoid reporting that all hosts are offline
LOG_WARNING(log, "Did not find active hosts, will wait for all {} hosts. This should not happen often", offline.size());
return {};
}
return offline;
}
Chunk DDLQueryStatusSource::generate()
{
bool all_hosts_finished = num_hosts_finished >= waiting_hosts.size();
/// Seems like num_hosts_finished cannot be strictly greater than waiting_hosts.size()
assert(num_hosts_finished <= waiting_hosts.size());
if (all_hosts_finished || timeout_exceeded)
return {};
String node_to_wait = "finished";
if (is_replicated_database && context->getSettingsRef()[Setting::database_replicated_enforce_synchronous_settings])
node_to_wait = "synced";
size_t try_number = 0;
while (true)
{
if (isCancelled())
return {};
if (stop_waiting_offline_hosts || (timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds))
{
timeout_exceeded = true;
size_t num_unfinished_hosts = waiting_hosts.size() - num_hosts_finished;
size_t num_active_hosts = current_active_hosts.size();
constexpr auto msg_format = "Distributed DDL task {} is not finished on {} of {} hosts "
"({} of them are currently executing the task, {} are inactive). "
"They are going to execute the query in background. Was waiting for {} seconds{}";
if (throw_on_timeout || (throw_on_timeout_only_active && !stop_waiting_offline_hosts))
{
if (!first_exception)
first_exception = std::make_unique<Exception>(Exception(ErrorCodes::TIMEOUT_EXCEEDED,
msg_format, node_path, num_unfinished_hosts, waiting_hosts.size(), num_active_hosts, offline_hosts.size(),
watch.elapsedSeconds(), stop_waiting_offline_hosts ? "" : ", which is longer than distributed_ddl_task_timeout"));
/// For Replicated database print a list of unfinished hosts as well. Will return empty block on next iteration.
if (is_replicated_database)
return generateChunkWithUnfinishedHosts();
return {};
}
LOG_INFO(log, msg_format, node_path, num_unfinished_hosts, waiting_hosts.size(), num_active_hosts, offline_hosts.size(),
watch.elapsedSeconds(), stop_waiting_offline_hosts ? "" : "which is longer than distributed_ddl_task_timeout");
return generateChunkWithUnfinishedHosts();
}
sleepForMilliseconds(std::min<size_t>(1000, 50 * try_number));
bool node_exists = false;
Strings tmp_hosts;
Strings tmp_active_hosts;
{
auto retries_ctl = ZooKeeperRetriesControl(
"executeDDLQueryOnCluster", getLogger("DDLQueryStatusSource"), getRetriesInfo(), context->getProcessListElement());
retries_ctl.retryLoop([&]()
{
auto zookeeper = context->getZooKeeper();
Strings paths = {String(fs::path(node_path) / node_to_wait), String(fs::path(node_path) / "active")};
auto res = zookeeper->tryGetChildren(paths);
for (size_t i = 0; i < res.size(); ++i)
if (res[i].error != Coordination::Error::ZOK && res[i].error != Coordination::Error::ZNONODE)
throw Coordination::Exception::fromPath(res[i].error, paths[i]);
if (res[0].error == Coordination::Error::ZNONODE)
node_exists = zookeeper->exists(node_path);
else
node_exists = true;
tmp_hosts = res[0].names;
tmp_active_hosts = res[1].names;
if (only_running_hosts)
offline_hosts = getOfflineHosts(node_path, waiting_hosts, zookeeper, log);
});
}
if (!node_exists)
{
/// Paradoxically, this exception will be thrown even in "never_throw" mode.
if (!first_exception)
first_exception = std::make_unique<Exception>(Exception(ErrorCodes::UNFINISHED,
"Cannot provide query execution status. The query's node {} has been deleted by the cleaner"
" since it was finished (or its lifetime is expired)",
node_path));
return {};
}
Strings new_hosts = getNewAndUpdate(tmp_hosts);
++try_number;
if (only_running_hosts)
{
size_t num_finished_or_offline = 0;
for (const auto & host : waiting_hosts)
num_finished_or_offline += finished_hosts.contains(host) || offline_hosts.contains(host);
if (num_finished_or_offline == waiting_hosts.size())
stop_waiting_offline_hosts = true;
}
if (new_hosts.empty())
continue;
current_active_hosts = std::move(tmp_active_hosts);
MutableColumns columns = output.getHeader().cloneEmptyColumns();
for (const String & host_id : new_hosts)
{
ExecutionStatus status(-1, "Cannot obtain error message");
/// Replicated database retries in case of error; it should not write an error status.
#ifdef DEBUG_OR_SANITIZER_BUILD
bool need_check_status = true;
#else
bool need_check_status = !is_replicated_database;
#endif
if (need_check_status)
{
String status_data;
bool finished_exists = false;
auto retries_ctl = ZooKeeperRetriesControl(
"executeDDLQueryOnCluster",
getLogger("DDLQueryStatusSource"),
getRetriesInfo(),
context->getProcessListElement());
retries_ctl.retryLoop([&]()
{
finished_exists = context->getZooKeeper()->tryGet(fs::path(node_path) / "finished" / host_id, status_data);
});
if (finished_exists)
status.tryDeserializeText(status_data);
}
else
{
status = ExecutionStatus{0};
}
if (status.code != 0 && !first_exception
&& context->getSettingsRef()[Setting::distributed_ddl_output_mode] != DistributedDDLOutputMode::NEVER_THROW)
{
if (is_replicated_database)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There was an error on {}: {} (probably it's a bug)", host_id, status.message);
auto [host, port] = parseHostAndPort(host_id);
first_exception = std::make_unique<Exception>(Exception(status.code,
"There was an error on [{}:{}]: {}", host, port, status.message));
}
++num_hosts_finished;
size_t num = 0;
if (is_replicated_database)
{
if (status.code != 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There was an error on {}: {} (probably it's a bug)", host_id, status.message);
auto [shard, replica] = DatabaseReplicated::parseFullReplicaName(host_id);
columns[num++]->insert(shard);
columns[num++]->insert(replica);
columns[num++]->insert(OK);
}
else
{
auto [host, port] = parseHostAndPort(host_id);
columns[num++]->insert(host);
columns[num++]->insert(port);
columns[num++]->insert(status.code);
columns[num++]->insert(status.message);
}
columns[num++]->insert(waiting_hosts.size() - num_hosts_finished);
columns[num++]->insert(current_active_hosts.size());
}
return Chunk(std::move(columns), new_hosts.size());
}
}
IProcessor::Status DDLQueryStatusSource::prepare()
{
/// This method is overridden to throw an exception after all data is read.
/// The exception is pushed into the pipe (instead of simply being thrown) to preserve the order of data processing and exception delivery.
if (finished)
{
if (first_exception)
{
if (!output.canPush())
return Status::PortFull;
output.pushException(std::make_exception_ptr(*first_exception));
}
output.finish();
return Status::Finished;
}
return ISource::prepare();
}
Strings DDLQueryStatusSource::getNewAndUpdate(const Strings & current_list_of_finished_hosts)
{
Strings diff;
for (const String & host : current_list_of_finished_hosts)
{
if (!waiting_hosts.contains(host))
{
if (!ignoring_hosts.contains(host))
{
ignoring_hosts.emplace(host);
LOG_INFO(log, "Unexpected host {} appeared in task {}", host, node_path);
}
continue;
}
if (!finished_hosts.contains(host))
{
diff.emplace_back(host);
finished_hosts.emplace(host);
}
}
return diff;
}
bool maybeRemoveOnCluster(const ASTPtr & query_ptr, ContextPtr context)
{
const auto * query = dynamic_cast<const ASTQueryWithTableAndOutput *>(query_ptr.get());

View File

@ -43,7 +43,7 @@ struct DDLQueryOnClusterParams
/// Returns DDLQueryStatusSource, which reads results of query execution on each host in the cluster.
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, ContextPtr context, const DDLQueryOnClusterParams & params = {});
BlockIO getDistributedDDLStatus(const String & node_path, const DDLLogEntry & entry, ContextPtr context, const Strings * hosts_to_wait);
BlockIO getDDLOnClusterStatus(const String & node_path, const String & replicas_path, const DDLLogEntry & entry, ContextPtr context);
bool maybeRemoveOnCluster(const ASTPtr & query_ptr, ContextPtr context);

View File

@ -380,7 +380,7 @@ static void convertOrdinaryDatabaseToAtomic(LoggerPtr log, ContextMutablePtr con
/// Converts database with Ordinary engine to Atomic. Does nothing if database is not Ordinary.
/// Can be called only during server startup when there are no queries from users.
static void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, const String & database_name, LoadTaskPtrs * startup_tasks = nullptr)
static void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, const String & database_name, const LoadTaskPtrs & load_system_metadata_tasks = {})
{
LoggerPtr log = getLogger("loadMetadata");
@ -407,12 +407,8 @@ static void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, cons
try
{
if (startup_tasks) // NOTE: only for system database
{
/// It's not quite correct to run DDL queries while database is not started up.
waitLoad(TablesLoaderForegroundPoolId, *startup_tasks);
startup_tasks->clear();
}
/// It's not quite correct to run DDL queries while database is not started up.
waitLoad(TablesLoaderForegroundPoolId, load_system_metadata_tasks);
auto local_context = Context::createCopy(context);
@ -462,13 +458,7 @@ static void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, cons
};
TablesLoader loader{context, databases, LoadingStrictnessLevel::FORCE_RESTORE};
waitLoad(TablesLoaderForegroundPoolId, loader.loadTablesAsync());
/// Startup tables if they were started before conversion and detach/attach
if (startup_tasks) // NOTE: only for system database
*startup_tasks = loader.startupTablesAsync(); // We have loaded old database(s), replace tasks to startup new database
else
// An old database was already loaded, so we should load new one as well
waitLoad(TablesLoaderForegroundPoolId, loader.startupTablesAsync());
waitLoad(TablesLoaderForegroundPoolId, loader.startupTablesAsync());
}
catch (Exception & e)
{
@ -480,13 +470,13 @@ static void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, cons
}
}
void maybeConvertSystemDatabase(ContextMutablePtr context, LoadTaskPtrs & system_startup_tasks)
void maybeConvertSystemDatabase(ContextMutablePtr context, LoadTaskPtrs & load_system_metadata_tasks)
{
/// TODO remove this check, convert system database unconditionally
if (context->getSettingsRef()[Setting::allow_deprecated_database_ordinary])
return;
maybeConvertOrdinaryDatabaseToAtomic(context, DatabaseCatalog::SYSTEM_DATABASE, &system_startup_tasks);
maybeConvertOrdinaryDatabaseToAtomic(context, DatabaseCatalog::SYSTEM_DATABASE, load_system_metadata_tasks);
}
void convertDatabasesEnginesIfNeed(const LoadTaskPtrs & load_metadata, ContextMutablePtr context)
@ -509,7 +499,7 @@ void convertDatabasesEnginesIfNeed(const LoadTaskPtrs & load_metadata, ContextMu
fs::remove(convert_flag_path);
}
LoadTaskPtrs loadMetadataSystem(ContextMutablePtr context)
LoadTaskPtrs loadMetadataSystem(ContextMutablePtr context, bool async_load_system_database)
{
loadSystemDatabaseImpl(context, DatabaseCatalog::SYSTEM_DATABASE, "Atomic");
loadSystemDatabaseImpl(context, DatabaseCatalog::INFORMATION_SCHEMA, "Memory");
@ -522,11 +512,28 @@ LoadTaskPtrs loadMetadataSystem(ContextMutablePtr context)
{DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE, DatabaseCatalog::instance().getDatabase(DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE)},
};
TablesLoader loader{context, databases, LoadingStrictnessLevel::FORCE_RESTORE};
auto tasks = loader.loadTablesAsync();
waitLoad(TablesLoaderForegroundPoolId, tasks);
/// Will start up tables in the system database after all databases are loaded.
return loader.startupTablesAsync();
auto load_tasks = loader.loadTablesAsync();
auto startup_tasks = loader.startupTablesAsync();
if (async_load_system_database)
{
scheduleLoad(load_tasks);
scheduleLoad(startup_tasks);
// Do NOT wait; just return the tasks so the caller can continue them or wait later.
return joinTasks(load_tasks, startup_tasks);
}
else
{
waitLoad(TablesLoaderForegroundPoolId, load_tasks);
/// This has to be done before the initialization of system logs `initializeSystemLogs()`,
/// otherwise there is a race condition between the system database initialization
/// and creation of new tables in the database.
waitLoad(TablesLoaderForegroundPoolId, startup_tasks);
return {};
}
}
}

View File

@ -8,10 +8,10 @@ namespace DB
/// Load tables from the system database. Only real tables like query_log, part_log.
/// You should first load the system database, then attach the system tables you need into it, then load other databases.
/// It returns tasks to start up system tables.
/// It returns tasks that are still in progress if `async_load_system_database = true`; otherwise it waits for all jobs to be done.
/// Background operations in system tables may slow down loading of the rest of the tables,
/// so we start up system tables after all databases are loaded.
[[nodiscard]] LoadTaskPtrs loadMetadataSystem(ContextMutablePtr context);
[[nodiscard]] LoadTaskPtrs loadMetadataSystem(ContextMutablePtr context, bool async_load_system_database = false);
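As a sketch of how the asynchronous load can be observed at runtime (the table and column names are those used by the integration test later in this diff):

```sql
-- Jobs still loading or starting system tables in the background pool.
SELECT job, execution_pool
FROM system.asynchronous_loader
WHERE execution_pool = 'BackgroundLoad';
```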
/// Load tables from databases and add them to context. Databases 'system' and 'information_schema' are ignored.
/// Use separate function to load system tables.
@ -20,7 +20,7 @@ namespace DB
[[nodiscard]] LoadTaskPtrs loadMetadata(ContextMutablePtr context, const String & default_database_name = {}, bool async_load_databases = false);
/// Converts `system` database from Ordinary to Atomic (if needed)
void maybeConvertSystemDatabase(ContextMutablePtr context, LoadTaskPtrs & system_startup_tasks);
void maybeConvertSystemDatabase(ContextMutablePtr context, LoadTaskPtrs & load_system_metadata_tasks);
/// Converts all databases (except system) from Ordinary to Atomic if convert_ordinary_to_atomic flag exists
/// Waits for `load_metadata` task before conversions

View File

@ -30,11 +30,13 @@ ProtocolServerAdapter::ProtocolServerAdapter(
const std::string & listen_host_,
const char * port_name_,
const std::string & description_,
std::unique_ptr<TCPServer> tcp_server_)
std::unique_ptr<TCPServer> tcp_server_,
bool supports_runtime_reconfiguration_)
: listen_host(listen_host_)
, port_name(port_name_)
, description(description_)
, impl(std::make_unique<TCPServerAdapterImpl>(std::move(tcp_server_)))
, supports_runtime_reconfiguration(supports_runtime_reconfiguration_)
{
}
@ -66,11 +68,13 @@ ProtocolServerAdapter::ProtocolServerAdapter(
const std::string & listen_host_,
const char * port_name_,
const std::string & description_,
std::unique_ptr<GRPCServer> grpc_server_)
std::unique_ptr<GRPCServer> grpc_server_,
bool supports_runtime_reconfiguration_)
: listen_host(listen_host_)
, port_name(port_name_)
, description(description_)
, impl(std::make_unique<GRPCServerAdapterImpl>(std::move(grpc_server_)))
, supports_runtime_reconfiguration(supports_runtime_reconfiguration_)
{
}
#endif

View File

@ -21,10 +21,20 @@ class ProtocolServerAdapter
public:
ProtocolServerAdapter(ProtocolServerAdapter && src) = default;
ProtocolServerAdapter & operator =(ProtocolServerAdapter && src) = default;
ProtocolServerAdapter(const std::string & listen_host_, const char * port_name_, const std::string & description_, std::unique_ptr<TCPServer> tcp_server_);
ProtocolServerAdapter(
const std::string & listen_host_,
const char * port_name_,
const std::string & description_,
std::unique_ptr<TCPServer> tcp_server_,
bool supports_runtime_reconfiguration_ = true);
#if USE_GRPC
ProtocolServerAdapter(const std::string & listen_host_, const char * port_name_, const std::string & description_, std::unique_ptr<GRPCServer> grpc_server_);
ProtocolServerAdapter(
const std::string & listen_host_,
const char * port_name_,
const std::string & description_,
std::unique_ptr<GRPCServer> grpc_server_,
bool supports_runtime_reconfiguration_ = true);
#endif
/// Starts the server. A new thread will be created that waits for and accepts incoming connections.
@ -46,6 +56,8 @@ public:
/// Returns the port this server is listening to.
UInt16 portNumber() const { return impl->portNumber(); }
bool supportsRuntimeReconfiguration() const { return supports_runtime_reconfiguration; }
const std::string & getListenHost() const { return listen_host; }
const std::string & getPortName() const { return port_name; }
@ -72,6 +84,7 @@ private:
std::string port_name;
std::string description;
std::unique_ptr<Impl> impl;
bool supports_runtime_reconfiguration = true;
};
}

View File

@ -294,8 +294,22 @@ Block ProjectionDescription::calculate(const Block & block, ContextPtr context)
mut_context->setSetting("aggregate_functions_null_for_empty", Field(0));
mut_context->setSetting("transform_null_in", Field(0));
ASTPtr query_ast_copy = nullptr;
/// Respect the _row_exists column.
if (block.findByName("_row_exists"))
{
query_ast_copy = query_ast->clone();
auto * select_row_exists = query_ast_copy->as<ASTSelectQuery>();
if (!select_row_exists)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot get ASTSelectQuery when adding _row_exists = 1. It's a bug");
select_row_exists->setExpression(
ASTSelectQuery::Expression::WHERE,
makeASTFunction("equals", std::make_shared<ASTIdentifier>("_row_exists"), std::make_shared<ASTLiteral>(1)));
}
auto builder = InterpreterSelectQuery(
query_ast,
query_ast_copy ? query_ast_copy : query_ast,
mut_context,
Pipe(std::make_shared<SourceFromSingleChunk>(block)),
SelectQueryOptions{

View File

@ -2352,7 +2352,7 @@ class ClickHouseCluster:
time.sleep(0.5)
raise Exception("Cannot wait PostgreSQL Java Client container")
def wait_rabbitmq_to_start(self, timeout=60):
def wait_rabbitmq_to_start(self, timeout=120):
self.print_all_docker_pieces()
self.rabbitmq_ip = self.get_instance_ip(self.rabbitmq_host)
@ -3944,11 +3944,11 @@ class ClickHouseInstance:
)
logging.info(f"PS RESULT:\n{ps_clickhouse}")
pid = self.get_process_pid("clickhouse")
if pid is not None:
self.exec_in_container(
["bash", "-c", f"gdb -batch -ex 'thread apply all bt full' -p {pid}"],
user="root",
)
# if pid is not None:
# self.exec_in_container(
# ["bash", "-c", f"gdb -batch -ex 'thread apply all bt full' -p {pid}"],
# user="root",
# )
if last_err is not None:
raise last_err

View File

@ -0,0 +1,3 @@
<clickhouse>
<async_load_system_database>true</async_load_system_database>
</clickhouse>

View File

@ -1,4 +1,5 @@
import random
import time
import pytest
@ -13,25 +14,35 @@ DICTIONARY_FILES = [
]
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
"instance",
node1 = cluster.add_instance(
"node1",
main_configs=["configs/config.xml"],
dictionaries=DICTIONARY_FILES,
stay_alive=True,
)
node2 = cluster.add_instance(
"node2",
main_configs=[
"configs/async_load_system_database.xml",
],
dictionaries=DICTIONARY_FILES,
stay_alive=True,
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
instance.query(
"""
CREATE DATABASE IF NOT EXISTS dict ENGINE=Dictionary;
CREATE DATABASE IF NOT EXISTS test;
"""
)
for node in [node1, node2]:
node.query(
"""
CREATE DATABASE IF NOT EXISTS dict ENGINE=Dictionary;
CREATE DATABASE IF NOT EXISTS test;
"""
)
yield cluster
@ -40,13 +51,13 @@ def started_cluster():
def get_status(dictionary_name):
return instance.query(
return node1.query(
"SELECT status FROM system.dictionaries WHERE name='" + dictionary_name + "'"
).rstrip("\n")
def test_dict_get_data(started_cluster):
query = instance.query
query = node1.query
query(
"CREATE TABLE test.elements (id UInt64, a String, b Int32, c Float64) ENGINE=Log;"
@ -80,7 +91,7 @@ def test_dict_get_data(started_cluster):
# Wait for dictionaries to be reloaded.
assert_eq_with_retry(
instance,
node1,
"SELECT dictHas('dep_x', toUInt64(3))",
"1",
sleep_time=2,
@ -94,7 +105,7 @@ def test_dict_get_data(started_cluster):
# so dep_x and dep_z are not going to be updated after the following INSERT.
query("INSERT INTO test.elements VALUES (4, 'ether', 404, 0.001)")
assert_eq_with_retry(
instance,
node1,
"SELECT dictHas('dep_y', toUInt64(4))",
"1",
sleep_time=2,
@ -104,11 +115,11 @@ def test_dict_get_data(started_cluster):
assert query("SELECT dictGetString('dep_y', 'a', toUInt64(4))") == "ether\n"
assert query("SELECT dictGetString('dep_z', 'a', toUInt64(4))") == "ZZ\n"
query("DROP TABLE IF EXISTS test.elements;")
instance.restart_clickhouse()
node1.restart_clickhouse()
def dependent_tables_assert():
res = instance.query("select database || '.' || name from system.tables")
res = node1.query("select database || '.' || name from system.tables")
assert "system.join" in res
assert "default.src" in res
assert "dict.dep_y" in res
@ -119,7 +130,7 @@ def dependent_tables_assert():
def test_dependent_tables(started_cluster):
query = instance.query
query = node1.query
query("create database lazy engine=Lazy(10)")
query("create database a")
query("create table lazy.src (n int, m int) engine=Log")
@ -157,7 +168,7 @@ def test_dependent_tables(started_cluster):
)
dependent_tables_assert()
instance.restart_clickhouse()
node1.restart_clickhouse()
dependent_tables_assert()
query("drop table a.t")
query("drop table lazy.log")
@ -170,14 +181,14 @@ def test_dependent_tables(started_cluster):
def test_multiple_tables(started_cluster):
query = instance.query
query = node1.query
tables_count = 20
for i in range(tables_count):
query(
f"create table test.table_{i} (n UInt64, s String) engine=MergeTree order by n as select number, randomString(100) from numbers(100)"
)
instance.restart_clickhouse()
node1.restart_clickhouse()
order = [i for i in range(tables_count)]
random.shuffle(order)
@ -185,3 +196,49 @@ def test_multiple_tables(started_cluster):
assert query(f"select count() from test.table_{i}") == "100\n"
for i in range(tables_count):
query(f"drop table test.table_{i} sync")
def test_async_load_system_database(started_cluster):
id = 1
for i in range(4):
# Access some system tables that might still be loading
if id > 1:
for j in range(3):
node2.query(
f"select count() from system.text_log_{random.randint(1, id - 1)}_test"
)
node2.query(
f"select count() from system.query_log_{random.randint(1, id - 1)}_test"
)
assert (
int(
node2.query(
f"select count() from system.asynchronous_loader where job ilike '%_log_%_test' and execution_pool = 'BackgroundLoad'"
)
)
> 0
)
# Generate more system tables
for j in range(10):
while True:
node2.query("system flush logs")
count = int(
node2.query(
"select count() from system.tables where database = 'system' and name in ['query_log', 'text_log']"
)
)
if count == 2:
break
time.sleep(0.1)
node2.query(f"rename table system.text_log to system.text_log_{id}_test")
node2.query(f"rename table system.query_log to system.query_log_{id}_test")
id += 1
# Trigger async load of system database
node2.restart_clickhouse()
for i in range(id - 1):
node2.query(f"drop table if exists system.text_log_{i + 1}_test")
node2.query(f"drop table if exists system.query_log_{i + 1}_test")

View File

@ -291,6 +291,8 @@
<distributed_ddl>
<!-- Path in ZooKeeper to queue with DDL queries -->
<path>/clickhouse/task_queue/ddl</path>
<!-- Path in ZooKeeper to store running DDL hosts -->
<replicas_path>/clickhouse/task_queue/replicas</replicas_path>
<!-- Settings from this profile will be used to execute DDL queries -->
<!-- <profile>default</profile> -->

View File

@ -849,6 +849,8 @@
<distributed_ddl>
<!-- Path in ZooKeeper to queue with DDL queries -->
<path>/clickhouse/task_queue/ddl</path>
<!-- Path in ZooKeeper to store running DDL hosts -->
<replicas_path>/clickhouse/task_queue/replicas</replicas_path>
<!-- Settings from this profile will be used to execute DDL queries -->
<!-- <profile>default</profile> -->

View File

@ -0,0 +1,30 @@
<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
<replica>
<host>node4</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
<allow_zookeeper_write>1</allow_zookeeper_write>
</clickhouse>

View File

@ -0,0 +1,88 @@
import time
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
main_configs=["configs/remote_servers.xml"],
with_zookeeper=True,
stay_alive=True,
)
node2 = cluster.add_instance(
"node2", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
node3 = cluster.add_instance(
"node3", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
node4 = cluster.add_instance(
"node4",
main_configs=["configs/remote_servers.xml"],
with_zookeeper=True,
stay_alive=True,
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_stop_waiting_for_offline_hosts(started_cluster):
timeout = 10
settings = {"distributed_ddl_task_timeout": timeout}
node1.query(
"DROP TABLE IF EXISTS test_table ON CLUSTER test_cluster SYNC",
settings=settings,
)
node1.query(
"CREATE TABLE test_table ON CLUSTER test_cluster (x Int) Engine=Memory",
settings=settings,
)
try:
node4.stop_clickhouse()
start = time.time()
assert "Code: 159. DB::Exception" in node1.query_and_get_error(
"DROP TABLE IF EXISTS test_table ON CLUSTER test_cluster SYNC",
settings=settings,
)
assert time.time() - start >= timeout
start = time.time()
assert "Code: 159. DB::Exception" in node1.query_and_get_error(
"CREATE TABLE test_table ON CLUSTER test_cluster (x Int) Engine=Memory",
settings=settings,
)
assert time.time() - start >= timeout
# set `distributed_ddl_output_mode` = `throw_only_active`
settings = {
"distributed_ddl_task_timeout": timeout,
"distributed_ddl_output_mode": "throw_only_active",
}
start = time.time()
node1.query(
"DROP TABLE IF EXISTS test_table ON CLUSTER test_cluster SYNC",
settings=settings,
)
start = time.time()
node1.query(
"CREATE TABLE test_table ON CLUSTER test_cluster (x Int) Engine=Memory",
settings=settings,
)
finally:
node4.start_clickhouse()

View File

@ -0,0 +1,30 @@
<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
<replica>
<host>node4</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
<allow_zookeeper_write>1</allow_zookeeper_write>
</clickhouse>

View File

@ -0,0 +1,77 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
main_configs=["configs/remote_servers.xml"],
with_zookeeper=True,
stay_alive=True,
)
node2 = cluster.add_instance(
"node2", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
node3 = cluster.add_instance(
"node3", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
node4 = cluster.add_instance(
"node4",
main_configs=["configs/remote_servers.xml"],
with_zookeeper=True,
stay_alive=True,
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_ddl_worker_replicas(started_cluster):
for replica in ["node1:9000", "node2:9000", "node3:9000", "node4:9000"]:
# wait until the replicas path is created
node1.query_with_retry(
sql=f"SELECT count() FROM system.zookeeper WHERE path='/clickhouse/task_queue/replicas/{replica}'",
check_callback=lambda result: int(result) == 1,
)
result = node1.query(
f"SELECT name, value, ephemeralOwner FROM system.zookeeper WHERE path='/clickhouse/task_queue/replicas/{replica}'"
).strip()
print(f"result: {replica} {result}")
lines = list(result.split("\n"))
assert len(lines) == 1
parts = list(lines[0].split("\t"))
assert len(parts) == 3
assert parts[0] == "active"
assert len(parts[1]) != 0
assert len(parts[2]) != 0
try:
node4.stop_clickhouse()
# wait until the active path for node4 is removed
node1.query_with_retry(
sql=f"SELECT count() FROM system.zookeeper WHERE path='/clickhouse/task_queue/replicas/node4:9000'",
check_callback=lambda result: int(result) == 0,
)
result = node1.query_with_retry(
f"SELECT name, value, ephemeralOwner FROM system.zookeeper WHERE path='/clickhouse/task_queue/replicas/node4:9000'"
).strip()
print(f"result: {replica} {result}")
lines = list(result.split("\n"))
assert len(lines) == 1
assert len(lines[0]) == 0
finally:
node4.start_clickhouse()

View File

@ -256,6 +256,8 @@
<distributed_ddl>
<!-- Path in ZooKeeper to queue with DDL queries -->
<path>/clickhouse/task_queue/ddl</path>
<!-- Path in ZooKeeper to store running DDL hosts -->
<replicas_path>/clickhouse/task_queue/replicas</replicas_path>
<!-- Settings from this profile will be used to execute DDL queries -->
<!-- <profile>default</profile> -->

View File

@ -16,7 +16,8 @@ select distinct
from (
select string_value
from test_table
);
)
order by all;
select distinct
'constant_1' as constant_value, *

View File

@ -0,0 +1,20 @@
DROP TABLE IF EXISTS users;
CREATE TABLE users (
uid Int16,
name String,
age Int16,
projection p1 (select age, count() group by age),
) ENGINE = MergeTree order by uid
SETTINGS lightweight_mutation_projection_mode = 'rebuild';
INSERT INTO users VALUES (1231, 'John', 33), (1232, 'Mary', 34);
DELETE FROM users WHERE uid = 1231;
SELECT
age,
count()
FROM users
GROUP BY age
SETTINGS optimize_use_projections = 1, force_optimize_projection = 1;