Merge pull request #56306 from ClickHouse/fix-backup-restore-flatten-nested

Fix restore from backup with `flatten_nested` and `data_type_default_nullable`

Commit: ecd98006ce
@@ -131,7 +131,7 @@ void LocalConnection::sendQuery(
     try
     {
-        state->io = executeQuery(state->query, query_context, false, state->stage).second;
+        state->io = executeQuery(state->query, query_context, QueryFlags{}, state->stage).second;

         if (state->io.pipeline.pushing())
         {
@@ -73,7 +73,7 @@ std::pair<String, StoragePtr> createTableFromAST(
         auto table_function = factory.get(table_function_ast, context);
         ColumnsDescription columns;
         if (ast_create_query.columns_list && ast_create_query.columns_list->columns)
-            columns = InterpreterCreateQuery::getColumnsDescription(*ast_create_query.columns_list->columns, context, true);
+            columns = InterpreterCreateQuery::getColumnsDescription(*ast_create_query.columns_list->columns, context, true, false);
         StoragePtr storage = table_function->execute(table_function_ast, context, ast_create_query.getTable(), std::move(columns));
         storage->renameInMemory(ast_create_query);
         return {ast_create_query.getTable(), storage};
@@ -99,7 +99,7 @@ std::pair<String, StoragePtr> createTableFromAST(
         }
         else
         {
-            columns = InterpreterCreateQuery::getColumnsDescription(*ast_create_query.columns_list->columns, context, true);
+            columns = InterpreterCreateQuery::getColumnsDescription(*ast_create_query.columns_list->columns, context, true, false);
             constraints = InterpreterCreateQuery::getConstraintsDescription(ast_create_query.columns_list->constraints);
         }
     }
@@ -722,7 +722,7 @@ void DatabaseReplicated::checkQueryValid(const ASTPtr & query, ContextPtr query_
     }
 }

-BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, ContextPtr query_context, bool internal)
+BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, ContextPtr query_context, QueryFlags flags)
 {
     if (query_context->getCurrentTransaction() && query_context->getSettingsRef().throw_on_unsupported_query_inside_transaction)
@@ -731,7 +731,7 @@ BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, Contex
     if (is_readonly)
         throw Exception(ErrorCodes::NO_ZOOKEEPER, "Database is in readonly mode, because it cannot connect to ZooKeeper");

-    if (!internal && (query_context->getClientInfo().query_kind != ClientInfo::QueryKind::INITIAL_QUERY))
+    if (!flags.internal && (query_context->getClientInfo().query_kind != ClientInfo::QueryKind::INITIAL_QUERY))
         throw Exception(ErrorCodes::INCORRECT_QUERY, "It's not initial query. ON CLUSTER is not allowed for Replicated database.");

     checkQueryValid(query, query_context);
@@ -742,6 +742,7 @@ BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, Contex
     entry.initiator = ddl_worker->getCommonHostID();
     entry.setSettingsIfRequired(query_context);
     entry.tracing_context = OpenTelemetry::CurrentContext();
+    entry.is_backup_restore = flags.distributed_backup_restore;
     String node_path = ddl_worker->tryEnqueueAndExecuteEntry(entry, query_context);

     Strings hosts_to_wait;
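The initiator-side change above is the crux of the propagation: the caller's QueryFlags are copied into the DDL log entry before it is enqueued. A minimal standalone sketch of that hand-off, using simplified stand-in types rather than the real ClickHouse classes:

```cpp
#include <string>

// Simplified stand-ins for the types in this diff, not the real ClickHouse classes.
struct QueryFlags
{
    bool internal = false;
    bool distributed_backup_restore = false;
};

struct DDLLogEntry
{
    std::string query;
    bool is_backup_restore = false; // field added by this commit
};

// Mirrors DatabaseReplicated::tryEnqueueReplicatedDDL: the flag set by the restore
// code on the initiator travels inside the entry that all replicas will read.
DDLLogEntry makeEntry(std::string query, QueryFlags flags)
{
    DDLLogEntry entry;
    entry.query = std::move(query);
    entry.is_backup_restore = flags.distributed_backup_restore;
    return entry;
}
```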
@@ -919,14 +920,14 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
         String query = fmt::format("CREATE DATABASE IF NOT EXISTS {} ENGINE=Ordinary", backQuoteIfNeed(to_db_name));
         auto query_context = Context::createCopy(getContext());
         query_context->setSetting("allow_deprecated_database_ordinary", 1);
-        executeQuery(query, query_context, true);
+        executeQuery(query, query_context, QueryFlags{ .internal = true });

         /// But we want to avoid discarding UUID of ReplicatedMergeTree tables, because it will not work
         /// if zookeeper_path contains {uuid} macro. Replicated database do not recreate replicated tables on recovery,
         /// so it's ok to save UUID of replicated table.
         query = fmt::format("CREATE DATABASE IF NOT EXISTS {} ENGINE=Atomic", backQuoteIfNeed(to_db_name_replicated));
         query_context = Context::createCopy(getContext());
-        executeQuery(query, query_context, true);
+        executeQuery(query, query_context, QueryFlags{ .internal = true });
     }

     size_t moved_tables = 0;
@@ -46,7 +46,7 @@ public:

     /// Try to execute DLL query on current host as initial query. If query is succeed,
     /// then it will be executed on all replicas.
-    BlockIO tryEnqueueReplicatedDDL(const ASTPtr & query, ContextPtr query_context, bool internal) override;
+    BlockIO tryEnqueueReplicatedDDL(const ASTPtr & query, ContextPtr query_context, QueryFlags flags) override;

     bool canExecuteReplicatedMetadataAlter() const override;
@@ -372,6 +372,7 @@ void DatabaseWithOwnTablesBase::createTableRestoredFromBackup(const ASTPtr & cre
     /// Creates a table by executing a "CREATE TABLE" query.
     InterpreterCreateQuery interpreter{create_table_query, local_context};
     interpreter.setInternal(true);
+    interpreter.setIsRestoreFromBackup(true);
     interpreter.execute();
 }
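This is where the restore path enters the interpreter: the backup machinery replays the stored CREATE with both flags raised. A minimal sketch of the same shape, with a hypothetical stand-in class (not the real InterpreterCreateQuery):

```cpp
// Hypothetical minimal interpreter mirroring the setters used above.
class CreateInterpreter
{
public:
    void setInternal(bool value) { internal = value; }
    void setIsRestoreFromBackup(bool value) { is_restore_from_backup = value; }

    // During a restore the stored CREATE statement must be replayed verbatim,
    // so settings-driven schema rewrites are suppressed.
    bool willSkipSchemaRewrites() const { return is_restore_from_backup; }
    bool isInternal() const { return internal; }

private:
    bool internal = false;
    bool is_restore_from_backup = false;
};

// Mirrors createTableRestoredFromBackup above: the restore path raises both flags.
CreateInterpreter makeRestoreInterpreter()
{
    CreateInterpreter interpreter;
    interpreter.setInternal(true);
    interpreter.setIsRestoreFromBackup(true);
    return interpreter;
}
```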
@@ -3,6 +3,7 @@
 #include <Core/UUID.h>
 #include <Databases/LoadingStrictnessLevel.h>
 #include <Interpreters/Context_fwd.h>
+#include <Interpreters/executeQuery.h>
 #include <Parsers/IAST_fwd.h>
 #include <Storages/IStorage_fwd.h>
 #include <base/types.h>
@@ -345,7 +346,7 @@ public:

     virtual bool shouldReplicateQuery(const ContextPtr & /*query_context*/, const ASTPtr & /*query_ptr*/) const { return false; }

-    virtual BlockIO tryEnqueueReplicatedDDL(const ASTPtr & /*query*/, ContextPtr /*query_context*/, [[maybe_unused]] bool internal = false) /// NOLINT
+    virtual BlockIO tryEnqueueReplicatedDDL(const ASTPtr & /*query*/, ContextPtr /*query_context*/, [[maybe_unused]] QueryFlags flags = {}) /// NOLINT
     {
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Database engine {} does not have replicated DDL queue", getEngineName());
     }
@@ -75,7 +75,7 @@ static BlockIO tryToExecuteQuery(const String & query_to_execute, ContextMutable
         if (!database.empty())
             query_context->setCurrentDatabase(database);

-        return executeQuery("/*" + comment + "*/ " + query_to_execute, query_context, true).second;
+        return executeQuery("/*" + comment + "*/ " + query_to_execute, query_context, QueryFlags{ .internal = true }).second;
     }
     catch (...)
     {
@@ -168,7 +168,7 @@ QueryPipeline ClickHouseDictionarySource::createStreamForQuery(const String & qu

     if (configuration.is_local)
     {
-        pipeline = executeQuery(query, context_copy, true).second.pipeline;
+        pipeline = executeQuery(query, context_copy, QueryFlags{ .internal = true }).second.pipeline;
         pipeline.convertStructureTo(empty_sample_block.getColumnsWithTypeAndName());
     }
     else
@@ -190,7 +190,7 @@ std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & re

     if (configuration.is_local)
     {
-        return readInvalidateQuery(executeQuery(request, context_copy, true).second.pipeline);
+        return readInvalidateQuery(executeQuery(request, context_copy, QueryFlags{ .internal = true }).second.pipeline);
     }
     else
     {
@@ -23,7 +23,7 @@ InterpreterShowAccessEntitiesQuery::InterpreterShowAccessEntitiesQuery(const AST

 BlockIO InterpreterShowAccessEntitiesQuery::execute()
 {
-    return executeQuery(getRewrittenQuery(), getContext(), true).second;
+    return executeQuery(getRewrittenQuery(), getContext(), QueryFlags{ .internal = true }).second;
 }

@@ -12,7 +12,7 @@ InterpreterShowPrivilegesQuery::InterpreterShowPrivilegesQuery(const ASTPtr & qu

 BlockIO InterpreterShowPrivilegesQuery::execute()
 {
-    return executeQuery("SELECT * FROM system.privileges", context, true).second;
+    return executeQuery("SELECT * FROM system.privileges", context, QueryFlags{ .internal = true }).second;
 }

 }
@@ -113,6 +113,9 @@ String DDLLogEntry::toString() const
         writeChar('\n', wb);
     }

+    if (version >= BACKUP_RESTORE_FLAG_IN_ZK_VERSION)
+        wb << "is_backup_restore: " << is_backup_restore << "\n";
+
     return wb.str();
 }
@@ -165,6 +168,12 @@ void DDLLogEntry::parse(const String & data)
         checkChar('\n', rb);
     }

+    if (version >= BACKUP_RESTORE_FLAG_IN_ZK_VERSION)
+    {
+        checkString("is_backup_restore: ", rb);
+        readBoolText(is_backup_restore, rb);
+        checkChar('\n', rb);
+    }
+
     assertEOF(rb);
@@ -72,10 +72,11 @@ struct DDLLogEntry
     static constexpr const UInt64 NORMALIZE_CREATE_ON_INITIATOR_VERSION = 3;
     static constexpr const UInt64 OPENTELEMETRY_ENABLED_VERSION = 4;
     static constexpr const UInt64 PRESERVE_INITIAL_QUERY_ID_VERSION = 5;
+    static constexpr const UInt64 BACKUP_RESTORE_FLAG_IN_ZK_VERSION = 6;
     /// Add new version here

     /// Remember to update the value below once new version is added
-    static constexpr const UInt64 DDL_ENTRY_FORMAT_MAX_VERSION = 5;
+    static constexpr const UInt64 DDL_ENTRY_FORMAT_MAX_VERSION = 6;

     UInt64 version = 1;
     String query;
@@ -84,6 +85,7 @@ struct DDLLogEntry
     std::optional<SettingsChanges> settings;
     OpenTelemetry::TracingContext tracing_context;
     String initial_query_id;
+    bool is_backup_restore = false;

     void setSettingsIfRequired(ContextPtr context);
     String toString() const;
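The serialization side is backward compatible: the flag is written and read only when the entry's version is at least BACKUP_RESTORE_FLAG_IN_ZK_VERSION, so new replicas can still parse entries produced by old initiators. A self-contained sketch of the same versioning pattern, using std::stringstream in place of the real ReadBuffer/WriteBuffer API:

```cpp
#include <sstream>
#include <string>

constexpr unsigned BACKUP_RESTORE_FLAG_IN_ZK_VERSION = 6;

struct Entry
{
    unsigned version = BACKUP_RESTORE_FLAG_IN_ZK_VERSION;
    bool is_backup_restore = false;

    std::string toString() const
    {
        std::ostringstream out;
        out << "version: " << version << "\n";
        if (version >= BACKUP_RESTORE_FLAG_IN_ZK_VERSION)
            out << "is_backup_restore: " << is_backup_restore << "\n";
        return out.str();
    }

    void parse(const std::string & data)
    {
        std::istringstream in(data);
        std::string key;
        in >> key >> version; // "version: N"
        if (version >= BACKUP_RESTORE_FLAG_IN_ZK_VERSION)
            in >> key >> is_backup_restore; // "is_backup_restore: 0|1"
        // Entries with version < 6 simply end here; the field keeps its default.
    }
};
```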
@@ -489,7 +489,8 @@ bool DDLWorker::tryExecuteQuery(DDLTaskBase & task, const ZooKeeperPtr & zookeep

         if (!task.is_initial_query)
             query_scope.emplace(query_context);
-        executeQuery(istr, ostr, !task.is_initial_query, query_context, {});
+
+        executeQuery(istr, ostr, !task.is_initial_query, query_context, {}, QueryFlags{ .internal = false, .distributed_backup_restore = task.entry.is_backup_restore });

         if (auto txn = query_context->getZooKeeperMetadataTransaction())
         {
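On the executing replica the flow is reversed: DDLWorker reads the entry from ZooKeeper and reconstructs QueryFlags before running the query, which is how the restore context survives the hop between hosts. A condensed sketch with the same stand-in types as earlier:

```cpp
// Stand-ins as in the earlier sketch, not the real ClickHouse classes.
struct QueryFlags { bool internal = false; bool distributed_backup_restore = false; };
struct DDLLogEntry { bool is_backup_restore = false; };

// Mirrors the new executeQuery call in DDLWorker::tryExecuteQuery: the flag that the
// initiator stored in the entry is handed back to the query pipeline on every replica.
QueryFlags flagsForEntry(const DDLLogEntry & entry)
{
    return QueryFlags{ .internal = false, .distributed_backup_restore = entry.is_backup_restore };
}
```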
@@ -480,7 +480,7 @@ ASTPtr InterpreterCreateQuery::formatProjections(const ProjectionsDescription &
 }

 ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
-    const ASTExpressionList & columns_ast, ContextPtr context_, bool attach)
+    const ASTExpressionList & columns_ast, ContextPtr context_, bool attach, bool is_restore_from_backup)
 {
     /// First, deduce implicit types.
@@ -489,7 +489,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(

     ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
     NamesAndTypesList column_names_and_types;
-    bool make_columns_nullable = !attach && context_->getSettingsRef().data_type_default_nullable;
+    bool make_columns_nullable = !attach && !is_restore_from_backup && context_->getSettingsRef().data_type_default_nullable;

     for (const auto & ast : columns_ast.children)
     {
@@ -645,7 +645,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
         res.add(std::move(column));
     }

-    if (!attach && context_->getSettingsRef().flatten_nested)
+    if (!attach && !is_restore_from_backup && context_->getSettingsRef().flatten_nested)
         res.flattenNested();

     if (res.getAllPhysical().empty())
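These two hunks are the substance of the fix: a CREATE replayed during a restore must not re-apply data_type_default_nullable or flatten_nested, because the backed-up CREATE statement already encodes whatever those settings were when the table was created. A condensed sketch of the guard logic, with a simplified Settings stand-in:

```cpp
// Simplified stand-in for the relevant server settings.
struct Settings
{
    bool data_type_default_nullable = false;
    bool flatten_nested = true;
};

// Condensed from getColumnsDescription above: both rewrites are now skipped for a
// restore, exactly as they have always been skipped for ATTACH.
bool shouldMakeColumnsNullable(const Settings & s, bool attach, bool is_restore_from_backup)
{
    return !attach && !is_restore_from_backup && s.data_type_default_nullable;
}

bool shouldFlattenNested(const Settings & s, bool attach, bool is_restore_from_backup)
{
    return !attach && !is_restore_from_backup && s.flatten_nested;
}
```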
@@ -692,7 +692,7 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti

     if (create.columns_list->columns)
     {
-        properties.columns = getColumnsDescription(*create.columns_list->columns, getContext(), create.attach);
+        properties.columns = getColumnsDescription(*create.columns_list->columns, getContext(), create.attach, is_restore_from_backup);
     }

     if (create.columns_list->indices)
@@ -752,7 +752,6 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti
     }
     else if (create.select)
     {
-
         Block as_select_sample;

         if (getContext()->getSettingsRef().allow_experimental_analyzer)
@@ -1077,7 +1076,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
         auto guard = DatabaseCatalog::instance().getDDLGuard(database_name, create.getTable());
         create.setDatabase(database_name);
         guard->releaseTableLock();
-        return database->tryEnqueueReplicatedDDL(query_ptr, getContext(), internal);
+        return database->tryEnqueueReplicatedDDL(query_ptr, getContext(), QueryFlags{ .internal = internal, .distributed_backup_restore = is_restore_from_backup });
     }

     if (!create.cluster.empty())
@@ -1233,7 +1232,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
         auto guard = DatabaseCatalog::instance().getDDLGuard(create.getDatabase(), create.getTable());
         assertOrSetUUID(create, database);
         guard->releaseTableLock();
-        return database->tryEnqueueReplicatedDDL(query_ptr, getContext(), internal);
+        return database->tryEnqueueReplicatedDDL(query_ptr, getContext(), QueryFlags{ .internal = internal, .distributed_backup_restore = is_restore_from_backup });
     }

     if (!create.cluster.empty())
@@ -66,9 +66,14 @@ public:
         need_ddl_guard = false;
     }

+    void setIsRestoreFromBackup(bool is_restore_from_backup_)
+    {
+        is_restore_from_backup = is_restore_from_backup_;
+    }
+
     /// Obtain information about columns, their types, default values and column comments,
     /// for case when columns in CREATE query is specified explicitly.
-    static ColumnsDescription getColumnsDescription(const ASTExpressionList & columns, ContextPtr context, bool attach);
+    static ColumnsDescription getColumnsDescription(const ASTExpressionList & columns, ContextPtr context, bool attach, bool is_restore_from_backup);
     static ConstraintsDescription getConstraintsDescription(const ASTExpressionList * constraints);

     static void prepareOnClusterQuery(ASTCreateQuery & create, ContextPtr context, const String & cluster_name);
@@ -116,6 +121,7 @@ private:
     bool force_attach = false;
     bool load_database_without_tables = false;
     bool need_ddl_guard = true;
+    bool is_restore_from_backup = false;

     mutable String as_database_saved;
     mutable String as_table_saved;
@@ -420,7 +420,7 @@ Block InterpreterKillQueryQuery::getSelectResult(const String & columns, const S
     if (where_expression)
         select_query += " WHERE " + queryToString(where_expression);

-    auto io = executeQuery(select_query, getContext(), true).second;
+    auto io = executeQuery(select_query, getContext(), QueryFlags{ .internal = true }).second;
     PullingPipelineExecutor executor(io.pipeline);
     Block res;
     while (!res && executor.pull(res));
@@ -161,7 +161,7 @@ WHERE

 BlockIO InterpreterShowColumnsQuery::execute()
 {
-    return executeQuery(getRewrittenQuery(), getContext(), true).second;
+    return executeQuery(getRewrittenQuery(), getContext(), QueryFlags{ .internal = true }).second;
 }

@@ -12,7 +12,7 @@ namespace DB

 BlockIO InterpreterShowEnginesQuery::execute()
 {
-    return executeQuery("SELECT * FROM system.table_engines ORDER BY name", getContext(), true).second;
+    return executeQuery("SELECT * FROM system.table_engines ORDER BY name", getContext(), QueryFlags{ .internal = true }).second;
 }

 }
@@ -15,7 +15,7 @@ InterpreterShowFunctionsQuery::InterpreterShowFunctionsQuery(const ASTPtr & quer

 BlockIO InterpreterShowFunctionsQuery::execute()
 {
-    return executeQuery(getRewrittenQuery(), getContext(), true).second;
+    return executeQuery(getRewrittenQuery(), getContext(), QueryFlags{ .internal = true }).second;
 }

 String InterpreterShowFunctionsQuery::getRewrittenQuery()
@@ -101,7 +101,7 @@ ORDER BY index_type, expression, column_name, seq_in_index;)", database, table,

 BlockIO InterpreterShowIndexesQuery::execute()
 {
-    return executeQuery(getRewrittenQuery(), getContext(), true).second;
+    return executeQuery(getRewrittenQuery(), getContext(), QueryFlags{ .internal = true }).second;
 }

@@ -12,7 +12,7 @@ namespace DB

 BlockIO InterpreterShowProcesslistQuery::execute()
 {
-    return executeQuery("SELECT * FROM system.processes ORDER BY elapsed DESC", getContext(), true).second;
+    return executeQuery("SELECT * FROM system.processes ORDER BY elapsed DESC", getContext(), QueryFlags{ .internal = true }).second;
 }

 }
@@ -26,9 +26,8 @@ String InterpreterShowSettingQuery::getRewrittenQuery()

 BlockIO InterpreterShowSettingQuery::execute()
 {
-    return executeQuery(getRewrittenQuery(), getContext(), true).second;
+    return executeQuery(getRewrittenQuery(), getContext(), QueryFlags{ .internal = true }).second;
 }

 }
@@ -214,7 +214,7 @@ BlockIO InterpreterShowTablesQuery::execute()
         return res;
     }

-    return executeQuery(getRewrittenQuery(), getContext(), true).second;
+    return executeQuery(getRewrittenQuery(), getContext(), QueryFlags{ .internal = true }).second;
 }

 /// (*) Sorting is strictly speaking not necessary but 1. it is convenient for users, 2. SQL currently does not allow to
@@ -750,7 +750,7 @@ StoragePtr InterpreterSystemQuery::tryRestartReplica(const StorageID & replica,
     auto & create = create_ast->as<ASTCreateQuery &>();
     create.attach = true;

-    auto columns = InterpreterCreateQuery::getColumnsDescription(*create.columns_list->columns, system_context, true);
+    auto columns = InterpreterCreateQuery::getColumnsDescription(*create.columns_list->columns, system_context, true, false);
     auto constraints = InterpreterCreateQuery::getConstraintsDescription(create.columns_list->constraints);
     auto data_path = database->getTableDataPath(create);
@@ -75,7 +75,7 @@ public:
         ASTs rewritten_queries = InterpreterImpl::getRewrittenQueries(query, getContext(), mapped_to_database, mysql_database);

         for (const auto & rewritten_query : rewritten_queries)
-            executeQuery("/* Rewritten MySQL DDL Query */ " + queryToString(rewritten_query), getContext(), true);
+            executeQuery("/* Rewritten MySQL DDL Query */ " + queryToString(rewritten_query), getContext(), QueryFlags{ .internal = true });

         return BlockIO{};
     }
@@ -45,6 +45,7 @@
 #include <Interpreters/Context.h>
 #include <Interpreters/InterpreterFactory.h>
 #include <Interpreters/InterpreterInsertQuery.h>
+#include <Interpreters/InterpreterCreateQuery.h>
 #include <Interpreters/InterpreterSelectQueryAnalyzer.h>
 #include <Interpreters/InterpreterSetQuery.h>
 #include <Interpreters/InterpreterTransactionControlQuery.h>
@@ -646,10 +647,12 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
     const char * begin,
     const char * end,
     ContextMutablePtr context,
-    bool internal,
+    QueryFlags flags,
     QueryProcessingStage::Enum stage,
     ReadBuffer * istr)
 {
+    const bool internal = flags.internal;
+
     /// query_span is a special span, when this function exits, it's lifetime is not ended, but ends when the query finishes.
     /// Some internal queries might call this function recursively by setting 'internal' parameter to 'true',
     /// to make sure SpanHolders in current stack ends in correct order, we disable this span for these internal queries
@@ -1085,6 +1088,9 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
         insert_interpreter->addBuffer(std::move(insert_data_buffer_holder));
     }

+    if (auto * create_interpreter = typeid_cast<InterpreterCreateQuery *>(&*interpreter))
+        create_interpreter->setIsRestoreFromBackup(flags.distributed_backup_restore);
+
     {
         std::unique_ptr<OpenTelemetry::SpanHolder> span;
         if (OpenTelemetry::CurrentContext().isTraceEnabled())
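executeQueryImpl is the single choke point where the flag reaches an interpreter, so none of the individual callers need to know about restores. A minimal sketch of that dispatch, using dynamic_cast on a hypothetical interpreter hierarchy in place of ClickHouse's typeid_cast:

```cpp
struct QueryFlags { bool internal = false; bool distributed_backup_restore = false; };

struct IInterpreter { virtual ~IInterpreter() = default; };

struct CreateInterpreter : IInterpreter
{
    bool is_restore_from_backup = false;
    void setIsRestoreFromBackup(bool value) { is_restore_from_backup = value; }
};

// Mirrors the new hunk above: only CREATE interpreters care about the flag;
// every other interpreter type passes through untouched.
void applyFlags(IInterpreter & interpreter, QueryFlags flags)
{
    if (auto * create = dynamic_cast<CreateInterpreter *>(&interpreter))
        create->setIsRestoreFromBackup(flags.distributed_backup_restore);
}
```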
@@ -1257,13 +1263,13 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
 std::pair<ASTPtr, BlockIO> executeQuery(
     const String & query,
     ContextMutablePtr context,
-    bool internal,
+    QueryFlags flags,
     QueryProcessingStage::Enum stage)
 {
     ASTPtr ast;
     BlockIO res;

-    std::tie(ast, res) = executeQueryImpl(query.data(), query.data() + query.size(), context, internal, stage, nullptr);
+    std::tie(ast, res) = executeQueryImpl(query.data(), query.data() + query.size(), context, flags, stage, nullptr);

     if (const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get()))
     {
@@ -1284,6 +1290,7 @@ void executeQuery(
     bool allow_into_outfile,
     ContextMutablePtr context,
     SetResultDetailsFunc set_result_details,
+    QueryFlags flags,
     const std::optional<FormatSettings> & output_format_settings,
     HandleExceptionInOutputFormatFunc handle_exception_in_output_format)
 {
@@ -1372,7 +1379,7 @@ void executeQuery(

     try
     {
-        std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete, &istr);
+        std::tie(ast, streams) = executeQueryImpl(begin, end, context, flags, QueryProcessingStage::Complete, &istr);
     }
     catch (...)
     {
@@ -29,6 +29,13 @@ struct QueryResultDetails
 using SetResultDetailsFunc = std::function<void(const QueryResultDetails &)>;
 using HandleExceptionInOutputFormatFunc = std::function<void(IOutputFormat & output_format)>;

+struct QueryFlags
+{
+    bool internal = false; /// If true, this query is caused by another query and thus needn't be registered in the ProcessList.
+    bool distributed_backup_restore = false; /// If true, this query is a part of backup restore.
+};
+
+
 /// Parse and execute a query.
 void executeQuery(
     ReadBuffer & istr, /// Where to read query from (and data for INSERT, if present).
@@ -36,6 +43,7 @@ void executeQuery(
     bool allow_into_outfile, /// If true and the query contains INTO OUTFILE section, redirect output to that file.
     ContextMutablePtr context, /// DB, tables, data types, storage engines, functions, aggregate functions...
     SetResultDetailsFunc set_result_details, /// If a non-empty callback is passed, it will be called with the query id, the content-type, the format, and the timezone.
+    QueryFlags flags = {},
     const std::optional<FormatSettings> & output_format_settings = std::nullopt, /// Format settings for output format, will be calculated from the context if not set.
     HandleExceptionInOutputFormatFunc handle_exception_in_output_format = {} /// If a non-empty callback is passed, it will be called on exception with created output format.
 );
@@ -58,7 +66,7 @@ void executeQuery(
 std::pair<ASTPtr, BlockIO> executeQuery(
     const String & query, /// Query text without INSERT data. The latter must be written to BlockIO::out.
     ContextMutablePtr context, /// DB, tables, data types, storage engines, functions, aggregate functions...
-    bool internal = false, /// If true, this query is caused by another query and thus needn't be registered in the ProcessList.
+    QueryFlags flags = {},
     QueryProcessingStage::Enum stage = QueryProcessingStage::Complete /// To which stage the query must be executed.
 );
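With QueryFlags in the public header, the former boolean parameter becomes a designated-initializer struct at every call site, which accounts for most of the mechanical churn in this commit. For illustration, a hypothetical shim with the same shape (not the real executeQuery signature set):

```cpp
#include <string>

struct QueryFlags
{
    bool internal = false;
    bool distributed_backup_restore = false;
};

// Hypothetical stand-in with the same calling convention as the header above.
void executeQuery(const std::string & query, QueryFlags flags = {})
{
    (void)query;
    (void)flags;
}

void examples()
{
    executeQuery("SELECT 1");                                  // external query, defaults
    executeQuery("SELECT 1", QueryFlags{ .internal = true });  // was: executeQuery(..., true)
    executeQuery("CREATE TABLE ...",
                 QueryFlags{ .internal = true, .distributed_backup_restore = true });
}
```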
@@ -42,7 +42,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
     static bool initialized = initialize();
     (void) initialized;

-    auto io = DB::executeQuery(input, context, true, QueryProcessingStage::Complete).second;
+    auto io = DB::executeQuery(input, context, QueryFlags{ .internal = true }, QueryProcessingStage::Complete).second;

     PullingPipelineExecutor executor(io.pipeline);
     Block res;
@@ -282,7 +282,7 @@ static void convertOrdinaryDatabaseToAtomic(Poco::Logger * log, ContextMutablePt
     LOG_INFO(log, "Will convert database {} from Ordinary to Atomic", name_quoted);

     String create_database_query = fmt::format("CREATE DATABASE IF NOT EXISTS {}", tmp_name_quoted);
-    auto res = executeQuery(create_database_query, context, true).second;
+    auto res = executeQuery(create_database_query, context, QueryFlags{ .internal = true }).second;
     executeTrivialBlockIO(res, context);
     res = {};
     auto tmp_database = DatabaseCatalog::instance().getDatabase(tmp_name);
@@ -322,7 +322,7 @@ static void convertOrdinaryDatabaseToAtomic(Poco::Logger * log, ContextMutablePt
         String tmp_qualified_quoted_name = id.getFullTableName();

         String move_table_query = fmt::format("RENAME TABLE {} TO {}", qualified_quoted_name, tmp_qualified_quoted_name);
-        res = executeQuery(move_table_query, context, true).second;
+        res = executeQuery(move_table_query, context, QueryFlags{ .internal = true }).second;
         executeTrivialBlockIO(res, context);
         res = {};
     }
@@ -334,12 +334,12 @@ static void convertOrdinaryDatabaseToAtomic(Poco::Logger * log, ContextMutablePt

     String drop_query = fmt::format("DROP DATABASE {}", name_quoted);
     context->setSetting("force_remove_data_recursively_on_drop", false);
-    res = executeQuery(drop_query, context, true).second;
+    res = executeQuery(drop_query, context, QueryFlags{ .internal = true }).second;
     executeTrivialBlockIO(res, context);
     res = {};

     String rename_query = fmt::format("RENAME DATABASE {} TO {}", tmp_name_quoted, name_quoted);
-    res = executeQuery(rename_query, context, true).second;
+    res = executeQuery(rename_query, context, QueryFlags{ .internal = true }).second;
     executeTrivialBlockIO(res, context);

     LOG_INFO(log, "Finished database engine conversion of {}", name_quoted);
@@ -409,7 +409,7 @@ static void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, cons

     /// Reload database just in case (and update logger name)
     String detach_query = fmt::format("DETACH DATABASE {}", backQuoteIfNeed(database_name));
-    auto res = executeQuery(detach_query, context, true).second;
+    auto res = executeQuery(detach_query, context, QueryFlags{ .internal = true }).second;
     executeTrivialBlockIO(res, context);
     res = {};

@@ -73,7 +73,7 @@ ColumnsDescription parseColumnsListFromString(const std::string & structure, con
     if (!columns_list)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not cast AST to ASTExpressionList");

-    auto columns = InterpreterCreateQuery::getColumnsDescription(*columns_list, context, false);
+    auto columns = InterpreterCreateQuery::getColumnsDescription(*columns_list, context, false, false);
     auto validation_settings = DataTypeValidationSettings(context->getSettingsRef());
     for (const auto & [name, type] : columns.getAll())
         validateDataType(type, validation_settings);
@@ -100,7 +100,7 @@ bool tryParseColumnsListFromString(const std::string & structure, ColumnsDescrip

     try
     {
-        columns = InterpreterCreateQuery::getColumnsDescription(*columns_list, context, false);
+        columns = InterpreterCreateQuery::getColumnsDescription(*columns_list, context, false, false);
         auto validation_settings = DataTypeValidationSettings(context->getSettingsRef());
         for (const auto & [name, type] : columns.getAll())
             validateDataType(type, validation_settings);
@@ -886,6 +886,7 @@ void HTTPHandler::processQuery(
         /* allow_into_outfile = */ false,
         context,
         set_query_result,
+        QueryFlags{},
         {},
         handle_exception_in_output_format);

@@ -378,7 +378,7 @@ void MySQLHandler::comQuery(ReadBuffer & payload, bool binary_protocol)
         }
     };

-    executeQuery(should_replace ? replacement : payload, *out, false, query_context, set_result_details, format_settings);
+    executeQuery(should_replace ? replacement : payload, *out, false, query_context, set_result_details, QueryFlags{}, format_settings);

     if (!with_output)
         packet_endpoint->sendPacket(OKPacket(0x00, client_capabilities, affected_rows, 0, 0), true);
@@ -496,7 +496,7 @@ void TCPHandler::runImpl()
     });

     /// Processing Query
-    std::tie(state.parsed_query, state.io) = executeQuery(state.query, query_context, false, state.stage);
+    std::tie(state.parsed_query, state.io) = executeQuery(state.query, query_context, QueryFlags{}, state.stage);

     after_check_cancelled.restart();
     after_send_progress.restart();
@@ -0,0 +1,4 @@
+BACKUP_CREATED
+CREATE TABLE default.test\n(\n    `test` String\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS index_granularity = 8192
+RESTORED
+CREATE TABLE default.test\n(\n    `test` String\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS index_granularity = 8192
tests/queries/0_stateless/02907_backup_restore_default_nullable.sh (new executable file, 22 lines)
@@ -0,0 +1,22 @@
+#!/usr/bin/env bash
+
+CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CUR_DIR"/../shell_config.sh
+
+${CLICKHOUSE_CLIENT} -nm --query "
+drop table if exists test;
+set data_type_default_nullable = 0;
+create table test (test String) ENGINE = MergeTree() ORDER BY tuple();
+backup table ${CLICKHOUSE_DATABASE}.test on cluster test_shard_localhost to Disk('backups', '${CLICKHOUSE_TEST_UNIQUE_NAME}');
+" | grep -o "BACKUP_CREATED"
+
+${CLICKHOUSE_CLIENT} --query "show create table test"
+
+${CLICKHOUSE_CLIENT} -nm --query "
+drop table test sync;
+set data_type_default_nullable = 1;
+restore table ${CLICKHOUSE_DATABASE}.test on cluster test_shard_localhost from Disk('backups', '${CLICKHOUSE_TEST_UNIQUE_NAME}');
+" | grep -o "RESTORED"
+
+${CLICKHOUSE_CLIENT} --query "show create table test"
@@ -0,0 +1,8 @@
+BACKUP_CREATED
+CREATE TABLE default.test\n(\n    `test` Array(Tuple(foo String, bar Float64))\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS index_granularity = 8192
+BACKUP_CREATED
+CREATE TABLE default.test2\n(\n    `test` Nested(foo String, bar Float64)\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS index_granularity = 8192
+RESTORED
+CREATE TABLE default.test\n(\n    `test` Array(Tuple(foo String, bar Float64))\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS index_granularity = 8192
+RESTORED
+CREATE TABLE default.test2\n(\n    `test` Nested(foo String, bar Float64)\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS index_granularity = 8192
tests/queries/0_stateless/02907_backup_restore_flatten_nested.sh (new executable file, 39 lines)
@@ -0,0 +1,39 @@
+#!/usr/bin/env bash
+
+CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CUR_DIR"/../shell_config.sh
+
+${CLICKHOUSE_CLIENT} -nm --query "
+drop table if exists test;
+set flatten_nested = 0;
+create table test (test Array(Tuple(foo String, bar Float64))) ENGINE = MergeTree() ORDER BY tuple();
+backup table ${CLICKHOUSE_DATABASE}.test on cluster test_shard_localhost to Disk('backups', '${CLICKHOUSE_TEST_UNIQUE_NAME}');
+" | grep -o "BACKUP_CREATED"
+
+${CLICKHOUSE_CLIENT} --query "show create table test"
+
+${CLICKHOUSE_CLIENT} -nm --query "
+drop table if exists test2;
+set flatten_nested = 0;
+create table test2 (test Nested(foo String, bar Float64)) ENGINE = MergeTree() ORDER BY tuple();
+backup table ${CLICKHOUSE_DATABASE}.test2 on cluster test_shard_localhost to Disk('backups', '${CLICKHOUSE_TEST_UNIQUE_NAME}2');
+" | grep -o "BACKUP_CREATED"
+
+${CLICKHOUSE_CLIENT} --query "show create table test2"
+
+${CLICKHOUSE_CLIENT} -nm --query "
+drop table test sync;
+set flatten_nested = 1;
+restore table ${CLICKHOUSE_DATABASE}.test on cluster test_shard_localhost from Disk('backups', '${CLICKHOUSE_TEST_UNIQUE_NAME}');
+" | grep -o "RESTORED"
+
+${CLICKHOUSE_CLIENT} --query "show create table test"
+
+${CLICKHOUSE_CLIENT} -nm --query "
+drop table test2 sync;
+set flatten_nested = 1;
+restore table ${CLICKHOUSE_DATABASE}.test2 on cluster test_shard_localhost from Disk('backups', '${CLICKHOUSE_TEST_UNIQUE_NAME}2');
+" | grep -o "RESTORED"
+
+${CLICKHOUSE_CLIENT} --query "show create table test2"