Load MergeTree table as ReplicatedMergeTree instead of creating a new one

This commit is contained in:
Кирилл Гарбар 2023-12-14 23:19:39 +03:00
parent 8d1dfebb4f
commit 8623a30448
4 changed files with 87 additions and 122 deletions

View File

@ -1719,8 +1719,6 @@ try
load_metadata_tasks = loadMetadata(global_context, default_database, server_settings.async_load_databases);
/// If we need to convert database engines, disable async tables loading
convertDatabasesEnginesIfNeed(load_metadata_tasks, global_context);
/// Convert MergeTree tables to replicated if flag is set
convertMergeTreeToReplicatedIfNeed(global_context);
database_catalog.startupBackgroundTasks();
/// After loading validate that default database exists
database_catalog.assertDatabaseExists(default_database);

View File

@ -28,6 +28,10 @@
#include <Common/typeid_cast.h>
#include <Common/logger_useful.h>
#include <Common/CurrentMetrics.h>
#include "Core/Defines.h"
#include <Storages/StorageReplicatedMergeTree.h>
#include <boost/algorithm/string/replace.hpp>
namespace fs = std::filesystem;
@ -107,6 +111,59 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
QualifiedTableName qualified_name{TSA_SUPPRESS_WARNING_FOR_READ(database_name), create_query->getTable()};
if (create_query->storage && create_query->storage->engine->name.ends_with("MergeTree") && !create_query->storage->engine->name.starts_with("Replicated"))
{
auto convert_to_replicated_flag_path = fs::path(getContext()->getPath()) / "data" / qualified_name.database / qualified_name.table / "flags" / "convert_to_replicated";
LOG_INFO(log, "Searching for convert_to_replicated flag at {}.", backQuote(convert_to_replicated_flag_path.string()));
if (fs::exists(convert_to_replicated_flag_path))
{
LOG_INFO(log, "Found convert_to_replicated flag for table {}. Will try to load it as replicated table.", backQuote(qualified_name.getFullName()));
/// Get storage definition
/// Set uuid explicitly, because it is forbidden to use the 'uuid' macro without ON CLUSTER
auto * storage = create_query->storage->as<ASTStorage>();
String storage_definition = queryToString(*storage);
String replica_path = getContext()->getConfigRef().getString("default_replica_path", "/clickhouse/tables/{uuid}/{shard}");
replica_path = boost::algorithm::replace_all_copy(replica_path, "{uuid}", fmt::format("{}", create_query->uuid));
String replica_name = getContext()->getConfigRef().getString("default_replica_name", "{replica}");
String replicated_args = fmt::format("('{}', '{}')", replica_path, replica_name);
String replicated_engine = "Replicated" + storage->engine->name + replicated_args;
String create_query_string = queryToString(*ast);
create_query_string = boost::algorithm::replace_first_copy(create_query_string, storage->engine->name, replicated_engine);
ParserCreateQuery parser_create_query;
auto new_ast = parseQuery(parser_create_query, create_query_string, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
ast = new_ast;
/// Write changes to metadata
String table_metadata_path = full_path;
String table_metadata_tmp_path = table_metadata_path + ".tmp";
String statement = getObjectDefinitionFromCreateQuery(ast);
{
WriteBufferFromFile out(table_metadata_tmp_path, statement.size(), O_WRONLY | O_CREAT | O_EXCL);
writeString(statement, out);
out.next();
if (getContext()->getSettingsRef().fsync_metadata)
out.sync();
out.close();
}
fs::rename(table_metadata_tmp_path, table_metadata_path);
LOG_INFO
(
log,
"Table {} is loaded as replicated. Not removing convert_to_replicated flag until metadata in zookeeper is restored.",
backQuote(qualified_name.getFullName())
);
}
}
std::lock_guard lock{metadata.mutex};
metadata.parsed_tables[qualified_name] = ParsedTableMetadata{full_path.string(), ast};
metadata.total_dictionaries += create_query->is_dictionary;
@ -208,6 +265,36 @@ LoadTaskPtr DatabaseOrdinary::startupTableAsync(
/// until startup finished.
auto table_lock_holder = table->lockForShare(RWLockImpl::NO_QUERY, getContext()->getSettingsRef().lock_acquire_timeout);
table->startup();
/// If table is ReplicatedMergeTree after conversion from MergeTree,
/// it is in readonly mode due to metadata in zookeeper missing.
if (auto * rmt = table->as<StorageReplicatedMergeTree>())
{
auto convert_to_replicated_flag_path = fs::path(getContext()->getPath()) / "data" / name.database / name.table / "flags" / "convert_to_replicated";
if (fs::exists(convert_to_replicated_flag_path))
{
if (rmt->isTableReadOnly())
{
rmt->restoreMetadataInZooKeeper();
LOG_INFO
(
log,
"Metadata in zookeeper for {} is restored. Removing convert_to_replicated flag.",
backQuote(name.getFullName())
);
}
else
{
LOG_INFO
(
log,
"Table {} is not in readonly mode but convert_to_replicated flag is set. Removing flag.",
backQuote(name.getFullName())
);
}
fs::remove(convert_to_replicated_flag_path);
}
}
logAboutProgress(log, ++tables_started, total_tables_to_startup, startup_watch);
}
else

View File

@ -4,9 +4,7 @@
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterSystemQuery.h>
#include <Interpreters/Context.h>
@ -16,7 +14,6 @@
#include <Databases/DatabaseOrdinary.h>
#include <Databases/TablesLoader.h>
#include <Storages/StorageMaterializedView.h>
#include <Storages/StorageMergeTree.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
@ -26,9 +23,6 @@
#include <Common/logger_useful.h>
#include <Common/CurrentMetrics.h>
#include <boost/algorithm/string/replace.hpp>
#include <fmt/core.h>
#include <filesystem>
#define ORDINARY_TO_ATOMIC_PREFIX ".tmp_convert."
@ -501,116 +495,6 @@ void convertDatabasesEnginesIfNeed(const LoadTaskPtrs & load_metadata, ContextMu
fs::remove(convert_flag_path);
}
/// Converts one MergeTree-family table into its Replicated* counterpart.
///
/// The conversion is done with internal queries, in this order:
///   1. CREATE TABLE `<table>_temp` ... AS `<table>` with the engine name replaced by
///      `Replicated<Engine>('<replica_path>', '<replica_name>')`.
///   2. EXCHANGE TABLES `<table>` AND `<table>_temp`.
///   3. For every distinct partition_id of the active parts that system.parts reports for
///      `<table>_temp`: ALTER TABLE `<table>` ATTACH PARTITION ID ... FROM `<table>_temp`.
///
/// `log`      - logger that receives the final success message.
/// `context`  - context used to read the config and to run the internal queries.
/// `database` - database that owns the table; used to obtain the table's CREATE query.
/// `id`       - identifier of the table to convert (taken by value: it is modified locally).
///
/// Any DB::Exception thrown by the queries is rethrown with an additional message;
/// the tables may be left in an intermediate state in that case.
static void convertMergeTreeToReplicated(Poco::Logger * log, ContextMutablePtr context, const DatabasePtr & database, StorageID id)
{
    ASTPtr as_create_ptr = database->getCreateTableQuery(id.table_name, context);
    const auto & as_create = as_create_ptr->as<ASTCreateQuery &>();
    const auto & storage = as_create.storage->as<ASTStorage &>();

    /// Get storage definition
    String storage_definition = queryToString(storage);

    /// Set uuid explicitly, because it is forbidden to use the 'uuid' macro without ON CLUSTER
    auto uuid = UUIDHelpers::generateV4();
    String replica_path = context->getConfigRef().getString("default_replica_path", "/clickhouse/tables/{uuid}/{shard}");
    replica_path = boost::algorithm::replace_all_copy(replica_path, "{uuid}", fmt::format("{}", uuid));
    String replica_name = context->getConfigRef().getString("default_replica_name", "{replica}");

    /// Replace the first occurrence of the engine name with the replicated engine and its arguments
    String replicated_args = fmt::format("('{}', '{}')", replica_path, replica_name);
    String replicated_engine = "Replicated" + storage.engine->name + replicated_args;
    storage_definition = boost::algorithm::replace_first_copy(storage_definition, storage.engine->name, replicated_engine);

    /// Get names. From here on `id` refers to the temporary table `<table>_temp`.
    String database_name = id.getDatabaseName();
    String qualified_quoted_name = id.getFullTableName();
    id.table_name = id.table_name + "_temp";
    String tmp_qualified_quoted_name = id.getFullTableName();

    try
    {
        String create_table_query = fmt::format("CREATE TABLE {} UUID '{}' AS {} {}", tmp_qualified_quoted_name, uuid, qualified_quoted_name, storage_definition);
        auto res = executeQuery(create_table_query, context, QueryFlags{ .internal = true }).second;
        executeTrivialBlockIO(res, context);

        String exchange_tables_query = fmt::format("EXCHANGE TABLES {} AND {}", qualified_quoted_name, tmp_qualified_quoted_name);
        res = executeQuery(exchange_tables_query, context, QueryFlags{ .internal = true }).second;
        executeTrivialBlockIO(res, context);

        /// Get partition ids (note: `id.table_name` is the `_temp` name at this point)
        String get_attach_queries_query = fmt::format("SELECT DISTINCT partition_id FROM system.parts WHERE table = '{}' AND database = '{}' AND active;", id.table_name, database_name);
        WriteBufferFromOwnString partition_ids_out;
        ReadBufferFromOwnString select_query_in {std::move(get_attach_queries_query)};
        auto select_query_context = Context::createCopy(context);
        select_query_context->makeQueryContext();
        select_query_context->setCurrentQueryId("");

        executeQuery(select_query_in, partition_ids_out, false, select_query_context, {}, {.internal=true});

        /// The query output is consumed line by line, one partition id per line
        std::stringstream partition_ids_string{partition_ids_out.str()};
        std::string line;

        /// Attach partitions
        while (std::getline(partition_ids_string, line, '\n'))
        {
            String attach_partition_query = fmt::format("ALTER TABLE {} ATTACH PARTITION ID '{}' FROM {};", qualified_quoted_name, line, tmp_qualified_quoted_name);
            executeQuery(attach_partition_query, select_query_context, {.internal=true});
        }

        LOG_INFO(log, "Table {} is converted from MergeTree to replicated", qualified_quoted_name);
    }
    catch (Exception & e)
    {
        e.addMessage(
            "Exception while trying to convert table {} from MergeTree to replicated. Tables may be in some intermediate state."
            , qualified_quoted_name
        );
        throw;
    }
}
/// Iterates over the tables of `database_name` and converts every StorageMergeTree table
/// that has a `convert_to_replicated` flag file under
/// <context path>/data/<database_name>/<table_name>/flags/.
/// Logs a warning and returns if the database object is null.
static void findAndConvertMergeTreeTablesToReplicated(ContextMutablePtr context, const String & database_name)
{
    Poco::Logger * log = &Poco::Logger::get("loadMetadata");

    auto database = DatabaseCatalog::instance().getDatabase(database_name);
    if (!database)
    {
        LOG_WARNING(log, "Database {} not found (while trying to convert it's tables from MergeTree to ReplicatedMergeTree)", database_name);
        return;
    }

    /// The conversion queries run on a copy of the context
    auto local_context = Context::createCopy(context);

    for (auto tables_it = database->getTablesIterator(context); tables_it->isValid(); tables_it->next())
    {
        const auto * merge_tree_storage = dynamic_cast<const StorageMergeTree *>(tables_it->table().get());
        if (!merge_tree_storage)
            continue;

        auto storage_id = merge_tree_storage->getStorageID();

        /// Check if convert flag is set
        auto flag_path = fs::path(context->getPath()) / "data" / database_name / storage_id.getTableName() / "flags" / "convert_to_replicated";
        if (!fs::exists(flag_path))
            continue;

        LOG_INFO(log, "Will convert table {} from MergeTree to replicated", storage_id.getFullTableName());
        convertMergeTreeToReplicated(log, local_context, database, storage_id);

        /// The flag is removed from the data directory of `<table>_temp`, not of the original name
        LOG_INFO(log, "Removing convert_to_replicated flag after convertation");
        flag_path = fs::path(local_context->getPath()) / "data" / database_name / (storage_id.getTableName() + "_temp") / "flags" / "convert_to_replicated";
        fs::remove(flag_path);
    }
}
/// Runs the MergeTree -> replicated conversion over every database returned by
/// DatabaseCatalog::getDatabases(), logging before and after the scan.
void convertMergeTreeToReplicatedIfNeed(ContextMutablePtr context)
{
    Poco::Logger * log = &Poco::Logger::get("loadMetadata");

    LOG_INFO(log, "Start searching for MergeTree tables with convert_to_replicated flag");

    const auto & databases = DatabaseCatalog::instance().getDatabases();
    for (const auto & database_entry : databases)
        findAndConvertMergeTreeTablesToReplicated(context, database_entry.first);

    LOG_INFO(log, "All MergTree tables with convert_to_replicated flag are converted");
}
LoadTaskPtrs loadMetadataSystem(ContextMutablePtr context)
{
loadSystemDatabaseImpl(context, DatabaseCatalog::SYSTEM_DATABASE, "Atomic");

View File

@ -26,8 +26,4 @@ void maybeConvertSystemDatabase(ContextMutablePtr context, LoadTaskPtrs & system
/// Converts database engines if a conversion is needed.
/// Waits for `load_metadata` task before conversions.
/// NOTE(review): the conditions that trigger a conversion are defined in the implementation, not here — see loadMetadata.cpp.
void convertDatabasesEnginesIfNeed(const LoadTaskPtrs & load_metadata, ContextMutablePtr context);
/// Converts MergeTree tables to replicated if the convert_to_replicated flag exists.
/// The flag is a file named `convert_to_replicated` that must be placed at
/// <server path>/data/<database_name>/<table_name>/flags/
/// (for example /clickhouse/data/database_name/table_name/flags/), where <server path> is `context->getPath()`.
/// All databases returned by DatabaseCatalog are scanned.
void convertMergeTreeToReplicatedIfNeed(ContextMutablePtr context);
}