Merge branch 'ClickHouse:master' into master

This commit is contained in:
Roman Antonov 2024-10-29 02:42:46 +03:00 committed by GitHub
commit 1c10c0da49
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 135 additions and 51 deletions

View File

@ -1975,6 +1975,22 @@ The default is `false`.
<async_load_databases>true</async_load_databases>
```
## async_load_system_database {#async_load_system_database}
Asynchronous loading of system tables. Helpful if there is a high amount of log tables and parts in the `system` database. Independent of the `async_load_databases` setting.
If set to `true`, all system databases with `Ordinary`, `Atomic`, and `Replicated` engines will be loaded asynchronously after the ClickHouse server starts. See `system.asynchronous_loader` table, `tables_loader_background_pool_size` and `tables_loader_foreground_pool_size` server settings. Any query that tries to access a system table, that is not yet loaded, will wait for exactly this table to be started up. The table that is waited for by at least one query will be loaded with higher priority. Also consider setting the `max_waiting_queries` setting to limit the total number of waiting queries.
If `false`, system database loads before server start.
The default is `false`.
**Example**
``` xml
<async_load_system_database>true</async_load_system_database>
```
## tables_loader_foreground_pool_size {#tables_loader_foreground_pool_size}
Sets the number of threads performing load jobs in foreground pool. The foreground pool is used for loading table synchronously before server start listening on a port and for loading tables that are waited for. Foreground pool has higher priority than background pool. It means that no job starts in background pool while there are jobs running in foreground pool.

View File

@ -821,11 +821,11 @@ void LocalServer::processConfig()
status.emplace(fs::path(path) / "status", StatusFile::write_full_info);
LOG_DEBUG(log, "Loading metadata from {}", path);
auto startup_system_tasks = loadMetadataSystem(global_context);
auto load_system_metadata_tasks = loadMetadataSystem(global_context);
attachSystemTablesServer(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE), false);
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
waitLoad(TablesLoaderForegroundPoolId, startup_system_tasks);
waitLoad(TablesLoaderForegroundPoolId, load_system_metadata_tasks);
if (!getClientConfiguration().has("only-system-tables"))
{

View File

@ -171,6 +171,7 @@ namespace ServerSetting
extern const ServerSettingsBool async_insert_queue_flush_on_shutdown;
extern const ServerSettingsUInt64 async_insert_threads;
extern const ServerSettingsBool async_load_databases;
extern const ServerSettingsBool async_load_system_database;
extern const ServerSettingsUInt64 background_buffer_flush_schedule_pool_size;
extern const ServerSettingsUInt64 background_common_pool_size;
extern const ServerSettingsUInt64 background_distributed_schedule_pool_size;
@ -2199,6 +2200,7 @@ try
LOG_INFO(log, "Loading metadata from {}", path_str);
LoadTaskPtrs load_system_metadata_tasks;
LoadTaskPtrs load_metadata_tasks;
// Make sure that if exception is thrown during startup async, new async loading jobs are not going to be called.
@ -2222,12 +2224,8 @@ try
auto & database_catalog = DatabaseCatalog::instance();
/// We load temporary database first, because projections need it.
database_catalog.initializeAndLoadTemporaryDatabase();
auto system_startup_tasks = loadMetadataSystem(global_context);
maybeConvertSystemDatabase(global_context, system_startup_tasks);
/// This has to be done before the initialization of system logs,
/// otherwise there is a race condition between the system database initialization
/// and creation of new tables in the database.
waitLoad(TablesLoaderForegroundPoolId, system_startup_tasks);
load_system_metadata_tasks = loadMetadataSystem(global_context, server_settings[ServerSetting::async_load_system_database]);
maybeConvertSystemDatabase(global_context, load_system_metadata_tasks);
/// Startup scripts can depend on the system log tables.
if (config().has("startup_scripts") && !server_settings[ServerSetting::prepare_system_log_tables_on_startup].changed)
@ -2410,10 +2408,12 @@ try
"DDLWorker",
&CurrentMetrics::MaxDDLEntryID,
&CurrentMetrics::MaxPushedDDLEntryID),
load_metadata_tasks);
joinTasks(load_system_metadata_tasks, load_metadata_tasks));
}
/// Do not keep tasks in server, they should be kept inside databases. Used here to make dependent tasks only.
load_system_metadata_tasks.clear();
load_system_metadata_tasks.shrink_to_fit();
load_metadata_tasks.clear();
load_metadata_tasks.shrink_to_fit();

View File

@ -147,6 +147,7 @@ namespace DB
DECLARE(UInt64, tables_loader_foreground_pool_size, 0, "The maximum number of threads that will be used for foreground (that is being waited for by a query) loading of tables. Also used for synchronous loading of tables before the server start. Zero means use all CPUs.", 0) \
DECLARE(UInt64, tables_loader_background_pool_size, 0, "The maximum number of threads that will be used for background async loading of tables. Zero means use all CPUs.", 0) \
DECLARE(Bool, async_load_databases, false, "Enable asynchronous loading of databases and tables to speedup server startup. Queries to not yet loaded entity will be blocked until load is finished.", 0) \
DECLARE(Bool, async_load_system_database, false, "Enable asynchronous loading of system tables that are not required on server startup. Queries to not yet loaded tables will be blocked until load is finished.", 0) \
DECLARE(Bool, display_secrets_in_show_and_select, false, "Allow showing secrets in SHOW and SELECT queries via a format setting and a grant", 0) \
DECLARE(Seconds, keep_alive_timeout, DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT, "The number of seconds that ClickHouse waits for incoming requests before closing the connection.", 0) \
DECLARE(UInt64, max_keep_alive_requests, 10000, "The maximum number of requests handled via a single http keepalive connection before the server closes this connection.", 0) \

View File

@ -380,7 +380,7 @@ static void convertOrdinaryDatabaseToAtomic(LoggerPtr log, ContextMutablePtr con
/// Converts database with Ordinary engine to Atomic. Does nothing if database is not Ordinary.
/// Can be called only during server startup when there are no queries from users.
static void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, const String & database_name, LoadTaskPtrs * startup_tasks = nullptr)
static void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, const String & database_name, const LoadTaskPtrs & load_system_metadata_tasks = {})
{
LoggerPtr log = getLogger("loadMetadata");
@ -407,12 +407,8 @@ static void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, cons
try
{
if (startup_tasks) // NOTE: only for system database
{
/// It's not quite correct to run DDL queries while database is not started up.
waitLoad(TablesLoaderForegroundPoolId, *startup_tasks);
startup_tasks->clear();
}
/// It's not quite correct to run DDL queries while database is not started up.
waitLoad(TablesLoaderForegroundPoolId, load_system_metadata_tasks);
auto local_context = Context::createCopy(context);
@ -462,13 +458,7 @@ static void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, cons
};
TablesLoader loader{context, databases, LoadingStrictnessLevel::FORCE_RESTORE};
waitLoad(TablesLoaderForegroundPoolId, loader.loadTablesAsync());
/// Startup tables if they were started before conversion and detach/attach
if (startup_tasks) // NOTE: only for system database
*startup_tasks = loader.startupTablesAsync(); // We have loaded old database(s), replace tasks to startup new database
else
// An old database was already loaded, so we should load new one as well
waitLoad(TablesLoaderForegroundPoolId, loader.startupTablesAsync());
waitLoad(TablesLoaderForegroundPoolId, loader.startupTablesAsync());
}
catch (Exception & e)
{
@ -480,13 +470,13 @@ static void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, cons
}
}
void maybeConvertSystemDatabase(ContextMutablePtr context, LoadTaskPtrs & system_startup_tasks)
void maybeConvertSystemDatabase(ContextMutablePtr context, LoadTaskPtrs & load_system_metadata_tasks)
{
/// TODO remove this check, convert system database unconditionally
if (context->getSettingsRef()[Setting::allow_deprecated_database_ordinary])
return;
maybeConvertOrdinaryDatabaseToAtomic(context, DatabaseCatalog::SYSTEM_DATABASE, &system_startup_tasks);
maybeConvertOrdinaryDatabaseToAtomic(context, DatabaseCatalog::SYSTEM_DATABASE, load_system_metadata_tasks);
}
void convertDatabasesEnginesIfNeed(const LoadTaskPtrs & load_metadata, ContextMutablePtr context)
@ -509,7 +499,7 @@ void convertDatabasesEnginesIfNeed(const LoadTaskPtrs & load_metadata, ContextMu
fs::remove(convert_flag_path);
}
LoadTaskPtrs loadMetadataSystem(ContextMutablePtr context)
LoadTaskPtrs loadMetadataSystem(ContextMutablePtr context, bool async_load_system_database)
{
loadSystemDatabaseImpl(context, DatabaseCatalog::SYSTEM_DATABASE, "Atomic");
loadSystemDatabaseImpl(context, DatabaseCatalog::INFORMATION_SCHEMA, "Memory");
@ -522,11 +512,28 @@ LoadTaskPtrs loadMetadataSystem(ContextMutablePtr context)
{DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE, DatabaseCatalog::instance().getDatabase(DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE)},
};
TablesLoader loader{context, databases, LoadingStrictnessLevel::FORCE_RESTORE};
auto tasks = loader.loadTablesAsync();
waitLoad(TablesLoaderForegroundPoolId, tasks);
/// Will startup tables in system database after all databases are loaded.
return loader.startupTablesAsync();
auto load_tasks = loader.loadTablesAsync();
auto startup_tasks = loader.startupTablesAsync();
if (async_load_system_database)
{
scheduleLoad(load_tasks);
scheduleLoad(startup_tasks);
// Do NOT wait, just return tasks for continuation or later wait.
return joinTasks(load_tasks, startup_tasks);
}
else
{
waitLoad(TablesLoaderForegroundPoolId, load_tasks);
/// This has to be done before the initialization of system logs `initializeSystemLogs()`,
/// otherwise there is a race condition between the system database initialization
/// and creation of new tables in the database.
waitLoad(TablesLoaderForegroundPoolId, startup_tasks);
return {};
}
}
}

View File

@ -8,10 +8,10 @@ namespace DB
/// Load tables from system database. Only real tables like query_log, part_log.
/// You should first load system database, then attach system tables that you need into it, then load other databases.
/// It returns tasks to startup system tables.
/// It returns tasks that are still in progress if `async_load_system_database = true` otherwise it wait for all jobs to be done.
/// Background operations in system tables may slowdown loading of the rest tables,
/// so we startup system tables after all databases are loaded.
[[nodiscard]] LoadTaskPtrs loadMetadataSystem(ContextMutablePtr context);
[[nodiscard]] LoadTaskPtrs loadMetadataSystem(ContextMutablePtr context, bool async_load_system_database = false);
/// Load tables from databases and add them to context. Databases 'system' and 'information_schema' are ignored.
/// Use separate function to load system tables.
@ -20,7 +20,7 @@ namespace DB
[[nodiscard]] LoadTaskPtrs loadMetadata(ContextMutablePtr context, const String & default_database_name = {}, bool async_load_databases = false);
/// Converts `system` database from Ordinary to Atomic (if needed)
void maybeConvertSystemDatabase(ContextMutablePtr context, LoadTaskPtrs & system_startup_tasks);
void maybeConvertSystemDatabase(ContextMutablePtr context, LoadTaskPtrs & load_system_metadata_tasks);
/// Converts all databases (except system) from Ordinary to Atomic if convert_ordinary_to_atomic flag exists
/// Waits for `load_metadata` task before conversions

View File

@ -0,0 +1,3 @@
<clickhouse>
<async_load_system_database>true</async_load_system_database>
</clickhouse>

View File

@ -1,4 +1,5 @@
import random
import time
import pytest
@ -13,25 +14,35 @@ DICTIONARY_FILES = [
]
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
"instance",
node1 = cluster.add_instance(
"node1",
main_configs=["configs/config.xml"],
dictionaries=DICTIONARY_FILES,
stay_alive=True,
)
node2 = cluster.add_instance(
"node2",
main_configs=[
"configs/async_load_system_database.xml",
],
dictionaries=DICTIONARY_FILES,
stay_alive=True,
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
instance.query(
"""
CREATE DATABASE IF NOT EXISTS dict ENGINE=Dictionary;
CREATE DATABASE IF NOT EXISTS test;
"""
)
for node in [node1, node2]:
node.query(
"""
CREATE DATABASE IF NOT EXISTS dict ENGINE=Dictionary;
CREATE DATABASE IF NOT EXISTS test;
"""
)
yield cluster
@ -40,13 +51,13 @@ def started_cluster():
def get_status(dictionary_name):
return instance.query(
return node1.query(
"SELECT status FROM system.dictionaries WHERE name='" + dictionary_name + "'"
).rstrip("\n")
def test_dict_get_data(started_cluster):
query = instance.query
query = node1.query
query(
"CREATE TABLE test.elements (id UInt64, a String, b Int32, c Float64) ENGINE=Log;"
@ -80,7 +91,7 @@ def test_dict_get_data(started_cluster):
# Wait for dictionaries to be reloaded.
assert_eq_with_retry(
instance,
node1,
"SELECT dictHas('dep_x', toUInt64(3))",
"1",
sleep_time=2,
@ -94,7 +105,7 @@ def test_dict_get_data(started_cluster):
# so dep_x and dep_z are not going to be updated after the following INSERT.
query("INSERT INTO test.elements VALUES (4, 'ether', 404, 0.001)")
assert_eq_with_retry(
instance,
node1,
"SELECT dictHas('dep_y', toUInt64(4))",
"1",
sleep_time=2,
@ -104,11 +115,11 @@ def test_dict_get_data(started_cluster):
assert query("SELECT dictGetString('dep_y', 'a', toUInt64(4))") == "ether\n"
assert query("SELECT dictGetString('dep_z', 'a', toUInt64(4))") == "ZZ\n"
query("DROP TABLE IF EXISTS test.elements;")
instance.restart_clickhouse()
node1.restart_clickhouse()
def dependent_tables_assert():
res = instance.query("select database || '.' || name from system.tables")
res = node1.query("select database || '.' || name from system.tables")
assert "system.join" in res
assert "default.src" in res
assert "dict.dep_y" in res
@ -119,7 +130,7 @@ def dependent_tables_assert():
def test_dependent_tables(started_cluster):
query = instance.query
query = node1.query
query("create database lazy engine=Lazy(10)")
query("create database a")
query("create table lazy.src (n int, m int) engine=Log")
@ -157,7 +168,7 @@ def test_dependent_tables(started_cluster):
)
dependent_tables_assert()
instance.restart_clickhouse()
node1.restart_clickhouse()
dependent_tables_assert()
query("drop table a.t")
query("drop table lazy.log")
@ -170,14 +181,14 @@ def test_dependent_tables(started_cluster):
def test_multiple_tables(started_cluster):
query = instance.query
query = node1.query
tables_count = 20
for i in range(tables_count):
query(
f"create table test.table_{i} (n UInt64, s String) engine=MergeTree order by n as select number, randomString(100) from numbers(100)"
)
instance.restart_clickhouse()
node1.restart_clickhouse()
order = [i for i in range(tables_count)]
random.shuffle(order)
@ -185,3 +196,49 @@ def test_multiple_tables(started_cluster):
assert query(f"select count() from test.table_{i}") == "100\n"
for i in range(tables_count):
query(f"drop table test.table_{i} sync")
def test_async_load_system_database(started_cluster):
id = 1
for i in range(4):
# Access some system tables that might be still loading
if id > 1:
for j in range(3):
node2.query(
f"select count() from system.text_log_{random.randint(1, id - 1)}_test"
)
node2.query(
f"select count() from system.query_log_{random.randint(1, id - 1)}_test"
)
assert (
int(
node2.query(
f"select count() from system.asynchronous_loader where job ilike '%_log_%_test' and execution_pool = 'BackgroundLoad'"
)
)
> 0
)
# Generate more system tables
for j in range(10):
while True:
node2.query("system flush logs")
count = int(
node2.query(
"select count() from system.tables where database = 'system' and name in ['query_log', 'text_log']"
)
)
if count == 2:
break
time.sleep(0.1)
node2.query(f"rename table system.text_log to system.text_log_{id}_test")
node2.query(f"rename table system.query_log to system.query_log_{id}_test")
id += 1
# Trigger async load of system database
node2.restart_clickhouse()
for i in range(id - 1):
node2.query(f"drop table if exists system.text_log_{i + 1}_test")
node2.query(f"drop table if exists system.query_log_{i + 1}_test")