add flag that enables automatic canversion from Ordinary to Atomic

This commit is contained in:
Alexander Tokmakov 2022-08-05 21:41:02 +02:00
parent 25a9bf5deb
commit f2c4cad255
7 changed files with 252 additions and 27 deletions

View File

@ -1519,7 +1519,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
/// We load temporary database first, because projections need it.
database_catalog.initializeAndLoadTemporaryDatabase();
loadMetadataSystem(global_context);
maybeConvertOrdinaryDatabaseToAtomic(global_context, DatabaseCatalog::instance().getSystemDatabase());
maybeConvertSystemDatabase(global_context);
/// After attaching system databases we can initialize system log.
global_context->initializeSystemLogs();
global_context->setSystemZooKeeperLogAfterInitializationIfNeeded();
@ -1533,6 +1533,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
database_catalog.loadMarkedAsDroppedTables();
/// Then, load remaining databases
loadMetadata(global_context, default_database);
convertDatabasesEnginesIfNeed(global_context);
startupSystemTables();
database_catalog.loadDatabases();
/// After loading validate that default database exists

View File

@ -1,7 +1,5 @@
#include <Common/ThreadPool.h>
#include <Poco/DirectoryIterator.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/parseQuery.h>
@ -13,6 +11,7 @@
#include <Databases/DatabaseOrdinary.h>
#include <Databases/TablesLoader.h>
#include <Storages/StorageMaterializedView.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
@ -211,13 +210,11 @@ static void loadSystemDatabaseImpl(ContextMutablePtr context, const String & dat
}
}
static void convertOrdinaryDatabaseToAtomic(ContextMutablePtr context, const DatabasePtr & database)
static void convertOrdinaryDatabaseToAtomic(Poco::Logger * log, ContextMutablePtr context, const DatabasePtr & database)
{
/// It's kind of C++ script that creates temporary database with Atomic engine,
/// moves all tables to it, drops old database and then renames new one to old name.
Poco::Logger * log = &Poco::Logger::get("loadMetadata");
String name = database->getDatabaseName();
String tmp_name = fmt::format(".tmp_convert.{}.{}", name, thread_local_rng());
@ -235,19 +232,34 @@ static void convertOrdinaryDatabaseToAtomic(ContextMutablePtr context, const Dat
assert(tmp_database->getEngineName() == "Atomic");
size_t num_tables = 0;
std::unordered_set<String> inner_mv_tables;
for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next())
{
++num_tables;
auto id = iterator->table()->getStorageID();
id.database_name = tmp_name;
/// We need some uuid for checkTableCanBeRenamed
id.uuid = UUIDHelpers::generateV4();
iterator->table()->checkTableCanBeRenamed(id);
if (const auto * mv = dynamic_cast<const StorageMaterializedView *>(iterator->table().get()))
{
/// We should not rename inner tables of MVs, because MVs are responsible for renaming it...
if (mv->hasInnerTable())
inner_mv_tables.emplace(mv->getTargetTable()->getStorageID().table_name);
}
}
LOG_INFO(log, "Will move {} tables to {}", num_tables, tmp_name_quoted);
LOG_INFO(log, "Will move {} tables to {} (including {} inner tables of MVs)", num_tables, tmp_name_quoted, inner_mv_tables.size());
for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next())
{
auto id = iterator->table()->getStorageID();
if (inner_mv_tables.contains(id.table_name))
{
LOG_DEBUG(log, "Do not rename {}, because it will be renamed together with MV", id.getNameForLogs());
continue;
}
String qualified_quoted_name = id.getFullTableName();
id.database_name = tmp_name;
String tmp_qualified_quoted_name = id.getFullTableName();
@ -275,31 +287,43 @@ static void convertOrdinaryDatabaseToAtomic(ContextMutablePtr context, const Dat
LOG_INFO(log, "Finished database engine conversion of {}", name_quoted);
}
void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, const DatabasePtr & database)
/// Converts database with Ordinary engine to Atomic. Does nothing if database is not Ordinary.
/// Can be called only during server startup when there are no queries from users.
static void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, const String & database_name, bool tables_started)
{
if (database->getEngineName() != "Ordinary")
return;
Poco::Logger * log = &Poco::Logger::get("loadMetadata");
if (context->getSettingsRef().allow_deprecated_database_ordinary)
auto database = DatabaseCatalog::instance().getDatabase(database_name);
if (!database)
{
LOG_WARNING(log, "Database {} not found (while trying to convert it from Ordinary to Atomic)", database_name);
return;
}
if (database->getEngineName() != "Ordinary")
return;
try
{
/// It's not quite correct to run DDL queries while database is not started up.
startupSystemTables();
if (!tables_started)
{
/// It's not quite correct to run DDL queries while database is not started up.
ThreadPool pool;
DatabaseCatalog::instance().getSystemDatabase()->startupTables(pool, /* force_restore */ true, /* force_attach */ true);
}
auto local_context = Context::createCopy(context);
local_context->setSetting("check_table_dependencies", false);
convertOrdinaryDatabaseToAtomic(local_context, database);
convertOrdinaryDatabaseToAtomic(log, local_context, database);
auto new_database = DatabaseCatalog::instance().getDatabase(DatabaseCatalog::SYSTEM_DATABASE);
auto new_database = DatabaseCatalog::instance().getDatabase(database_name);
UUID db_uuid = new_database->getUUID();
std::vector<UUID> tables_uuids;
for (auto iterator = new_database->getTablesIterator(context); iterator->isValid(); iterator->next())
tables_uuids.push_back(iterator->uuid());
/// Reload database just in case (and update logger name)
String detach_query = fmt::format("DETACH DATABASE {}", backQuoteIfNeed(DatabaseCatalog::SYSTEM_DATABASE));
String detach_query = fmt::format("DETACH DATABASE {}", backQuoteIfNeed(database_name));
auto res = executeQuery(detach_query, context, true);
executeTrivialBlockIO(res, context);
res = {};
@ -310,23 +334,51 @@ void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, const Datab
for (const auto & uuid : tables_uuids)
DatabaseCatalog::instance().removeUUIDMappingFinally(uuid);
loadSystemDatabaseImpl(context, DatabaseCatalog::SYSTEM_DATABASE, "Atomic");
String path = context->getPath() + "metadata/" + escapeForFileName(database_name);
/// force_restore_data is needed to re-create metadata symlinks
loadDatabase(context, database_name, path, /* force_restore_data */ true);
TablesLoader::Databases databases =
{
{DatabaseCatalog::SYSTEM_DATABASE, DatabaseCatalog::instance().getSystemDatabase()},
{database_name, DatabaseCatalog::instance().getDatabase(database_name)},
};
TablesLoader loader{context, databases, /* force_restore */ true, /* force_attach */ true};
loader.loadTables();
/// Will startup tables usual way
/// Startup tables if they were started before conversion and detach/attach
if (tables_started)
loader.startupTables();
}
catch (Exception & e)
{
e.addMessage("While trying to convert {} to Atomic", database->getDatabaseName());
e.addMessage("While trying to convert {} to Atomic", database_name);
throw;
}
}
void maybeConvertSystemDatabase(ContextMutablePtr context)
{
/// TODO remove this check, convert system database unconditionally
if (context->getSettingsRef().allow_deprecated_database_ordinary)
return;
maybeConvertOrdinaryDatabaseToAtomic(context, DatabaseCatalog::SYSTEM_DATABASE, /* tables_started */ false);
}
void convertDatabasesEnginesIfNeed(ContextMutablePtr context)
{
auto convert_flag_path = fs::path(context->getFlagsPath()) / "convert_ordinary_to_atomic";
if (!fs::exists(convert_flag_path))
return;
LOG_INFO(&Poco::Logger::get("loadMetadata"), "Found convert_ordinary_to_atomic file in flags directory, "
"will try to convert all Ordinary databases to Atomic");
fs::remove(convert_flag_path);
for (const auto & [name, _] : DatabaseCatalog::instance().getDatabases())
if (name != DatabaseCatalog::SYSTEM_DATABASE)
maybeConvertOrdinaryDatabaseToAtomic(context, name, /* tables_started */ true);
}
void startupSystemTables()
{

View File

@ -19,8 +19,10 @@ void loadMetadata(ContextMutablePtr context, const String & default_database_nam
/// so we startup system tables after all databases are loaded.
void startupSystemTables();
/// Converts database with Ordinary engine to Atomic. Does nothing if database is not Ordinary.
/// Can be called only during server startup when there are no queries from users.
void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, const DatabasePtr & database);
/// Converts `system` database from Ordinary to Atomic (if needed)
void maybeConvertSystemDatabase(ContextMutablePtr context);
/// Converts all databases (except system) from Ordinary to Atomic if convert_ordinary_to_atomic flag exists
void convertDatabasesEnginesIfNeed(ContextMutablePtr context);
}

View File

@ -729,6 +729,7 @@ void StorageLog::rename(const String & new_path_to_table_data, const StorageID &
{
assert(table_path != new_path_to_table_data);
{
disk->createDirectories(new_path_to_table_data);
disk->moveDirectory(table_path, new_path_to_table_data);
table_path = new_path_to_table_data;

View File

@ -5132,9 +5132,28 @@ void StorageReplicatedMergeTree::checkTableCanBeRenamed(const StorageID & new_na
return;
if (renaming_restrictions == RenamingRestrictions::DO_NOT_ALLOW)
throw Exception("Cannot rename Replicated table, because zookeeper_path contains implicit 'database' or 'table' macro. "
"We cannot rename path in ZooKeeper, so path may become inconsistent with table name. If you really want to rename table, "
"you should edit metadata file first and restart server or reattach the table.", ErrorCodes::NOT_IMPLEMENTED);
{
auto old_name = getStorageID();
bool is_server_startup = Context::getGlobalContextInstance()->getApplicationType() == Context::ApplicationType::SERVER
&& !Context::getGlobalContextInstance()->isServerCompletelyStarted();
bool move_to_atomic = old_name.uuid == UUIDHelpers::Nil && new_name.uuid != UUIDHelpers::Nil;
bool likely_converting_ordinary_to_atomic = is_server_startup && move_to_atomic;
if (likely_converting_ordinary_to_atomic)
{
LOG_INFO(log, "Table {} should not be renamed, because zookeeper_path contains implicit 'database' or 'table' macro. "
"We cannot rename path in ZooKeeper, so path may become inconsistent with table name. "
"However, we allow renaming while converting Ordinary database to Atomic, because all tables will be renamed back",
old_name.getNameForLogs());
return;
}
throw Exception(
"Cannot rename Replicated table, because zookeeper_path contains implicit 'database' or 'table' macro. "
"We cannot rename path in ZooKeeper, so path may become inconsistent with table name. If you really want to rename table, "
"you should edit metadata file first and restart server or reattach the table.",
ErrorCodes::NOT_IMPLEMENTED);
}
assert(renaming_restrictions == RenamingRestrictions::ALLOW_PRESERVING_UUID);
if (!new_name.hasUUID() && getStorageID().hasUUID())

View File

@ -320,6 +320,7 @@ void StorageStripeLog::rename(const String & new_path_to_table_data, const Stora
{
assert(table_path != new_path_to_table_data);
{
disk->createDirectories(new_path_to_table_data);
disk->moveDirectory(table_path, new_path_to_table_data);
table_path = new_path_to_table_data;

View File

@ -7,6 +7,7 @@ node = cluster.add_instance(
image="yandex/clickhouse-server",
tag="19.17.8.54",
stay_alive=True,
with_zookeeper=True,
with_installed_binary=True,
)
@ -25,7 +26,7 @@ def q(query):
return node.query(query, settings={"log_queries": 1})
def test_convert_system_db_to_atomic(start_cluster):
def check_convert_system_db_to_atomic():
q(
"CREATE TABLE t(date Date, id UInt32) ENGINE = MergeTree PARTITION BY toYYYYMM(date) ORDER BY id"
)
@ -75,3 +76,151 @@ def test_convert_system_db_to_atomic(start_cluster):
"1\n" == errors_count
and "1\n" == node.count_in_log("Can't receive Netlink response")
)
def create_some_tables(db):
node.query("CREATE TABLE {}.t1 (n int) ENGINE=Memory".format(db))
node.query(
"CREATE TABLE {}.mt1 (n int) ENGINE=MergeTree order by n".format(db),
)
node.query(
"CREATE TABLE {}.mt2 (n int) ENGINE=MergeTree order by n".format(db),
)
node.query(
"CREATE TABLE {}.rmt1 (n int, m int) ENGINE=ReplicatedMergeTree('/test/rmt1/{}', '1') order by n".format(
db, db
),
)
node.query(
"CREATE TABLE {}.rmt2 (n int, m int) ENGINE=ReplicatedMergeTree('/test/{}/rmt2', '1') order by n".format(
db, db
),
)
node.exec_in_container(
[
"bash",
"-c",
f"sed --follow-symlinks -i 's|/test/{db}/rmt2|/test/{{database}}/{{table}}|' /var/lib/clickhouse/metadata/{db}/rmt2.sql",
]
)
node.query(
"CREATE MATERIALIZED VIEW {}.mv1 (n int) ENGINE=ReplicatedMergeTree('/test/{}/mv1/', '1') order by n AS SELECT n FROM {}.rmt1".format(
db, db, db
),
)
node.query(
"CREATE MATERIALIZED VIEW {}.mv2 (n int) ENGINE=MergeTree order by n AS SELECT n FROM {}.rmt2".format(
db, db
),
)
node.query(
"CREATE DICTIONARY {}.d1 (n int DEFAULT 0, m int DEFAULT 1) PRIMARY KEY n "
"SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'rmt1' PASSWORD '' DB '{}')) "
"LIFETIME(MIN 1 MAX 10) LAYOUT(FLAT())".format(db, db)
)
node.query(
"CREATE DICTIONARY {}.d2 (n int DEFAULT 0, m int DEFAULT 1) PRIMARY KEY n "
"SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'rmt2' PASSWORD '' DB '{}')) "
"LIFETIME(MIN 1 MAX 10) LAYOUT(FLAT())".format(db, db)
)
node.query(
"CREATE TABLE {}.merge (n int) ENGINE=Merge('{}', '(mt)|(mv)')".format(db, db)
)
def check_convert_all_dbs_to_atomic():
node.query(
"CREATE DATABASE ordinary ENGINE=Ordinary",
settings={"allow_deprecated_database_ordinary": 1},
)
node.query(
"CREATE DATABASE other ENGINE=Ordinary",
settings={"allow_deprecated_database_ordinary": 1},
)
node.query(
"CREATE DATABASE `.o r d i n a r y.` ENGINE=Ordinary",
settings={"allow_deprecated_database_ordinary": 1},
)
node.query("CREATE DATABASE atomic ENGINE=Atomic")
node.query("CREATE DATABASE mem ENGINE=Memory")
node.query("CREATE DATABASE lazy ENGINE=Lazy(1)")
tables_with_data = ["mt1", "mt2", "rmt1", "rmt2", "mv1", "mv2"]
for db in ["ordinary", "other", "atomic"]:
create_some_tables(db)
for table in tables_with_data:
node.query("INSERT INTO {}.{} (n) VALUES ({})".format(db, table, len(db)))
node.query(
"CREATE TABLE `.o r d i n a r y.`.`t. a. b. l. e.` (n int) ENGINE=MergeTree ORDER BY n"
)
node.query("CREATE TABLE lazy.table (n int) ENGINE=Log")
# Introduce some cross dependencies
node.query(
"CREATE TABLE ordinary.l (n DEFAULT dictGet('other.d1', 'm', toUInt64(3))) ENGINE=Log"
)
node.query(
"CREATE TABLE other.l (n DEFAULT dictGet('ordinary.d1', 'm', toUInt64(3))) ENGINE=StripeLog"
)
node.query(
"CREATE TABLE atomic.l (n DEFAULT dictGet('ordinary.d1', 'm', toUInt64(3))) ENGINE=TinyLog"
)
tables_without_data = ["t1", "d1", "d2", "merge", "l"]
# 6 tables + 2 inner tables of MVs, each contains 2 rows
for db in ["ordinary", "other"]:
assert "12\t{}\n".format(12 * len(db)) == node.query(
"SELECT count(), sum(n) FROM {}.merge".format(db)
)
# 6 tables, MVs contain 2 rows (inner tables does not match regexp)
assert "8\t{}\n".format(8 * len("atomic")) == node.query(
"SELECT count(), sum(n) FROM atomic.merge".format(db)
)
node.exec_in_container(
["bash", "-c", f"touch /var/lib/clickhouse/flags/convert_ordinary_to_atomic"]
)
node.restart_clickhouse()
assert (
".o r d i n a r y.\natomic\ndefault\nordinary\nother\nsystem\n"
== node.query(
"SELECT name FROM system.databases WHERE engine='Atomic' ORDER BY name"
)
)
assert "Lazy\nMemory\n" == node.query(
"SELECT engine FROM system.databases WHERE name IN ('mem', 'lazy') ORDER BY name"
)
assert "t. a. b. l. e.\n" == node.query("SHOW TABLES FROM `.o r d i n a r y.`")
assert "table\n" == node.query("SHOW TABLES FROM lazy")
for db in ["ordinary", "other", "atomic"]:
assert "\n".join(
sorted(tables_with_data + tables_without_data) + [""]
) == node.query("SHOW TABLES FROM {} NOT LIKE '%inner%'".format(db))
for db in ["ordinary", "other"]:
assert "8\t{}\n".format(8 * len(db)) == node.query(
"SELECT count(), sum(n) FROM {}.merge".format(db)
)
for db in ["ordinary", "other", "atomic"]:
for table in tables_with_data:
node.query(
"INSERT INTO {}.{} (n) VALUES ({})".format(db, table, len(db) * 3)
)
for db in ["ordinary", "other", "atomic"]:
assert "16\t{}\n".format(16 * len(db) * 2) == node.query(
"SELECT count(), sum(n) FROM {}.merge".format(db)
)
def test_convert_ordinary_to_atomic(start_cluster):
check_convert_system_db_to_atomic()
check_convert_all_dbs_to_atomic()