mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
skip sanity checks on secondary create query
This commit is contained in:
parent
f775e1a7c6
commit
ec0986af0b
@ -253,7 +253,7 @@ StoragePtr DatabaseLazy::loadTable(const String & table_name) const
|
||||
{
|
||||
const auto & ast_create = ast->as<const ASTCreateQuery &>();
|
||||
String table_data_path_relative = getTableDataPath(ast_create);
|
||||
table = createTableFromAST(ast_create, getDatabaseName(), table_data_path_relative, context_copy, false).second;
|
||||
table = createTableFromAST(ast_create, getDatabaseName(), table_data_path_relative, context_copy, LoadingStrictnessLevel::ATTACH).second;
|
||||
}
|
||||
|
||||
if (!ast || !endsWith(table->getName(), "Log"))
|
||||
|
@ -59,7 +59,7 @@ std::pair<String, StoragePtr> createTableFromAST(
|
||||
const String & database_name,
|
||||
const String & table_data_path_relative,
|
||||
ContextMutablePtr context,
|
||||
bool force_restore)
|
||||
LoadingStrictnessLevel mode)
|
||||
{
|
||||
ast_create_query.attach = true;
|
||||
ast_create_query.setDatabase(database_name);
|
||||
@ -115,7 +115,7 @@ std::pair<String, StoragePtr> createTableFromAST(
|
||||
context->getGlobalContext(),
|
||||
columns,
|
||||
constraints,
|
||||
force_restore)
|
||||
mode)
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -16,7 +16,7 @@ std::pair<String, StoragePtr> createTableFromAST(
|
||||
const String & database_name,
|
||||
const String & table_data_path_relative,
|
||||
ContextMutablePtr context,
|
||||
bool force_restore);
|
||||
LoadingStrictnessLevel mode);
|
||||
|
||||
/** Get the string with the table definition based on the CREATE query.
|
||||
* It is an ATTACH query that you can execute to create a table from the correspondent database.
|
||||
|
@ -150,7 +150,7 @@ void DatabaseOrdinary::loadTableFromMetadata(
|
||||
name.database,
|
||||
getTableDataPath(query),
|
||||
local_context,
|
||||
LoadingStrictnessLevel::FORCE_RESTORE <= mode);
|
||||
mode);
|
||||
|
||||
attachTable(local_context, table_name, table, getTableDataPath(query));
|
||||
}
|
||||
|
@ -4,7 +4,7 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
LoadingStrictnessLevel getLoadingStrictnessLevel(bool attach, bool force_attach, bool force_restore)
|
||||
LoadingStrictnessLevel getLoadingStrictnessLevel(bool attach, bool force_attach, bool force_restore, bool secondary)
|
||||
{
|
||||
if (force_restore)
|
||||
{
|
||||
@ -22,6 +22,9 @@ LoadingStrictnessLevel getLoadingStrictnessLevel(bool attach, bool force_attach,
|
||||
if (attach)
|
||||
return LoadingStrictnessLevel::ATTACH;
|
||||
|
||||
if (secondary)
|
||||
return LoadingStrictnessLevel::SECONDARY_CREATE;
|
||||
|
||||
return LoadingStrictnessLevel::CREATE;
|
||||
}
|
||||
|
||||
|
@ -8,14 +8,16 @@ enum class LoadingStrictnessLevel
|
||||
{
|
||||
/// Do all possible sanity checks
|
||||
CREATE = 0,
|
||||
/// Skip some sanity checks (for internal queries in DatabaseReplicated; for RESTORE)
|
||||
SECONDARY_CREATE = 1,
|
||||
/// Expect existing paths on FS and in ZK for ATTACH query
|
||||
ATTACH = 1,
|
||||
ATTACH = 2,
|
||||
/// We ignore some error on server startup
|
||||
FORCE_ATTACH = 2,
|
||||
FORCE_ATTACH = 3,
|
||||
/// Skip all sanity checks (if force_restore_data flag exists)
|
||||
FORCE_RESTORE = 3,
|
||||
FORCE_RESTORE = 4,
|
||||
};
|
||||
|
||||
LoadingStrictnessLevel getLoadingStrictnessLevel(bool attach, bool force_attach, bool force_restore);
|
||||
LoadingStrictnessLevel getLoadingStrictnessLevel(bool attach, bool force_attach, bool force_restore, bool secondary);
|
||||
|
||||
}
|
||||
|
@ -1094,7 +1094,7 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr
|
||||
create->setTable(table_id.table_name);
|
||||
try
|
||||
{
|
||||
table = createTableFromAST(*create, table_id.getDatabaseName(), data_path, getContext(), /* force_restore */ true).second;
|
||||
table = createTableFromAST(*create, table_id.getDatabaseName(), data_path, getContext(), LoadingStrictnessLevel::FORCE_RESTORE).second;
|
||||
table->is_dropped = true;
|
||||
}
|
||||
catch (...)
|
||||
|
@ -276,7 +276,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
|
||||
|
||||
bool need_write_metadata = !create.attach || !fs::exists(metadata_file_path);
|
||||
bool need_lock_uuid = internal || need_write_metadata;
|
||||
auto mode = getLoadingStrictnessLevel(create.attach, force_attach, has_force_restore_data_flag);
|
||||
auto mode = getLoadingStrictnessLevel(create.attach, force_attach, has_force_restore_data_flag, /*secondary*/ false);
|
||||
|
||||
/// Lock uuid, so we will known it's already in use.
|
||||
/// We do it when attaching databases on server startup (internal) and on CREATE query (!create.attach);
|
||||
@ -1380,6 +1380,9 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
|
||||
const InterpreterCreateQuery::TableProperties & properties,
|
||||
DDLGuardPtr & ddl_guard)
|
||||
{
|
||||
bool is_secondary_query = getContext()->getZooKeeperMetadataTransaction() && !getContext()->getZooKeeperMetadataTransaction()->isInitialQuery();
|
||||
auto mode = getLoadingStrictnessLevel(create.attach, /*force_attach*/ false, /*has_force_restore_data_flag*/ false, is_secondary_query);
|
||||
|
||||
if (create.temporary)
|
||||
{
|
||||
if (create.if_not_exists && getContext()->tryResolveStorageID({"", create.getTable()}, Context::ResolveExternal))
|
||||
@ -1396,7 +1399,7 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
|
||||
getContext()->getGlobalContext(),
|
||||
properties.columns,
|
||||
properties.constraints,
|
||||
false);
|
||||
mode);
|
||||
};
|
||||
auto temporary_table = TemporaryTableHolder(getContext(), creator, query_ptr);
|
||||
|
||||
@ -1544,7 +1547,7 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
|
||||
getContext()->getGlobalContext(),
|
||||
properties.columns,
|
||||
properties.constraints,
|
||||
false);
|
||||
mode);
|
||||
|
||||
/// If schema wes inferred while storage creation, add columns description to create query.
|
||||
addColumnsDescriptionToCreateQueryIfNecessary(query_ptr->as<ASTCreateQuery &>(), res);
|
||||
|
@ -839,7 +839,7 @@ StoragePtr InterpreterSystemQuery::tryRestartReplica(const StorageID & replica,
|
||||
system_context->getGlobalContext(),
|
||||
columns,
|
||||
constraints,
|
||||
false);
|
||||
LoadingStrictnessLevel::ATTACH);
|
||||
|
||||
database->attachTable(system_context, replica.table_name, table, data_path);
|
||||
|
||||
|
@ -234,7 +234,7 @@ LoadTaskPtrs loadMetadata(ContextMutablePtr context, const String & default_data
|
||||
loaded_databases.insert({name, DatabaseCatalog::instance().getDatabase(name)});
|
||||
}
|
||||
|
||||
auto mode = getLoadingStrictnessLevel(/* attach */ true, /* force_attach */ true, has_force_restore_data_flag);
|
||||
auto mode = getLoadingStrictnessLevel(/* attach */ true, /* force_attach */ true, has_force_restore_data_flag, /*secondary*/ false);
|
||||
TablesLoader loader{context, std::move(loaded_databases), mode};
|
||||
auto load_tasks = loader.loadTablesAsync();
|
||||
auto startup_tasks = loader.startupTablesAsync();
|
||||
|
@ -6,6 +6,7 @@
|
||||
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Databases/LoadingStrictnessLevel.h>
|
||||
#include <Storages/StorageFactory.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <filesystem>
|
||||
@ -22,15 +23,15 @@ public:
|
||||
using Configuration = typename Storage::Configuration;
|
||||
|
||||
template <class ...Args>
|
||||
explicit IStorageDataLake(const Configuration & configuration_, ContextPtr context_, bool attach, Args && ...args)
|
||||
: Storage(getConfigurationForDataRead(configuration_, context_, {}, attach), context_, std::forward<Args>(args)...)
|
||||
explicit IStorageDataLake(const Configuration & configuration_, ContextPtr context_, LoadingStrictnessLevel mode, Args && ...args)
|
||||
: Storage(getConfigurationForDataRead(configuration_, context_, {}, mode), context_, std::forward<Args>(args)...)
|
||||
, base_configuration(configuration_)
|
||||
, log(getLogger(getName())) {} // NOLINT(clang-analyzer-optin.cplusplus.VirtualCall)
|
||||
|
||||
template <class ...Args>
|
||||
static StoragePtr create(const Configuration & configuration_, ContextPtr context_, bool attach, Args && ...args)
|
||||
static StoragePtr create(const Configuration & configuration_, ContextPtr context_, LoadingStrictnessLevel mode, Args && ...args)
|
||||
{
|
||||
return std::make_shared<IStorageDataLake<Storage, Name, MetadataParser>>(configuration_, context_, attach, std::forward<Args>(args)...);
|
||||
return std::make_shared<IStorageDataLake<Storage, Name, MetadataParser>>(configuration_, context_, mode, std::forward<Args>(args)...);
|
||||
}
|
||||
|
||||
String getName() const override { return name; }
|
||||
@ -64,7 +65,8 @@ public:
|
||||
|
||||
private:
|
||||
static Configuration getConfigurationForDataRead(
|
||||
const Configuration & base_configuration, const ContextPtr & local_context, const Strings & keys = {}, bool attach = false)
|
||||
const Configuration & base_configuration, const ContextPtr & local_context, const Strings & keys = {},
|
||||
LoadingStrictnessLevel mode = LoadingStrictnessLevel::CREATE)
|
||||
{
|
||||
auto configuration{base_configuration};
|
||||
configuration.update(local_context);
|
||||
@ -87,7 +89,7 @@ private:
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
if (!attach)
|
||||
if (mode <= LoadingStrictnessLevel::CREATE)
|
||||
throw;
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
return configuration;
|
||||
@ -125,7 +127,7 @@ static StoragePtr createDataLakeStorage(const StorageFactory::Arguments & args)
|
||||
if (configuration.format == "auto")
|
||||
configuration.format = "Parquet";
|
||||
|
||||
return DataLake::create(configuration, args.getContext(), args.attach, args.table_id, args.columns, args.constraints,
|
||||
return DataLake::create(configuration, args.getContext(), args.mode, args.table_id, args.columns, args.constraints,
|
||||
args.comment, getFormatSettings(args.getContext()));
|
||||
}
|
||||
|
||||
|
@ -8,7 +8,7 @@ namespace DB
|
||||
StoragePtr StorageIceberg::create(
|
||||
const DB::StorageIceberg::Configuration & base_configuration,
|
||||
DB::ContextPtr context_,
|
||||
bool attach,
|
||||
LoadingStrictnessLevel mode,
|
||||
const DB::StorageID & table_id_,
|
||||
const DB::ColumnsDescription & columns_,
|
||||
const DB::ConstraintsDescription & constraints_,
|
||||
@ -27,7 +27,7 @@ StoragePtr StorageIceberg::create(
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
if (!attach)
|
||||
if (mode <= LoadingStrictnessLevel::CREATE)
|
||||
throw;
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
|
@ -30,7 +30,7 @@ public:
|
||||
|
||||
static StoragePtr create(const Configuration & base_configuration,
|
||||
ContextPtr context_,
|
||||
bool attach,
|
||||
LoadingStrictnessLevel mode,
|
||||
const StorageID & table_id_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
|
@ -132,7 +132,7 @@ StorageFileLog::StorageFileLog(
|
||||
const String & format_name_,
|
||||
std::unique_ptr<FileLogSettings> settings,
|
||||
const String & comment,
|
||||
bool attach)
|
||||
LoadingStrictnessLevel mode)
|
||||
: IStorage(table_id_)
|
||||
, WithContext(context_->getGlobalContext())
|
||||
, filelog_settings(std::move(settings))
|
||||
@ -150,7 +150,7 @@ StorageFileLog::StorageFileLog(
|
||||
|
||||
if (!fileOrSymlinkPathStartsWith(path, getContext()->getUserFilesPath()))
|
||||
{
|
||||
if (attach)
|
||||
if (LoadingStrictnessLevel::ATTACH <= mode)
|
||||
{
|
||||
LOG_ERROR(log, "The absolute data path should be inside `user_files_path`({})", getContext()->getUserFilesPath());
|
||||
return;
|
||||
@ -165,7 +165,7 @@ StorageFileLog::StorageFileLog(
|
||||
bool created_metadata_directory = false;
|
||||
try
|
||||
{
|
||||
if (!attach)
|
||||
if (mode < LoadingStrictnessLevel::ATTACH)
|
||||
{
|
||||
if (disk->exists(metadata_base_path))
|
||||
{
|
||||
@ -178,7 +178,7 @@ StorageFileLog::StorageFileLog(
|
||||
created_metadata_directory = true;
|
||||
}
|
||||
|
||||
loadMetaFiles(attach);
|
||||
loadMetaFiles(LoadingStrictnessLevel::ATTACH <= mode);
|
||||
loadFiles();
|
||||
|
||||
assert(file_infos.file_names.size() == file_infos.meta_by_inode.size());
|
||||
@ -192,7 +192,7 @@ StorageFileLog::StorageFileLog(
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
if (!attach)
|
||||
if (mode <= LoadingStrictnessLevel::ATTACH)
|
||||
{
|
||||
if (created_metadata_directory)
|
||||
disk->removeRecursive(metadata_base_path);
|
||||
@ -845,7 +845,7 @@ void registerStorageFileLog(StorageFactory & factory)
|
||||
format,
|
||||
std::move(filelog_settings),
|
||||
args.comment,
|
||||
args.attach);
|
||||
args.mode);
|
||||
};
|
||||
|
||||
factory.registerStorage(
|
||||
|
@ -38,7 +38,7 @@ public:
|
||||
const String & format_name_,
|
||||
std::unique_ptr<FileLogSettings> settings,
|
||||
const String & comment,
|
||||
bool attach);
|
||||
LoadingStrictnessLevel mode);
|
||||
|
||||
using Files = std::vector<String>;
|
||||
|
||||
|
@ -783,7 +783,7 @@ void registerStorageLiveView(StorageFactory & factory)
|
||||
{
|
||||
factory.registerStorage("LiveView", [](const StorageFactory::Arguments & args)
|
||||
{
|
||||
if (!args.attach && !args.getLocalContext()->getSettingsRef().allow_experimental_live_view)
|
||||
if (args.mode <= LoadingStrictnessLevel::CREATE && !args.getLocalContext()->getSettingsRef().allow_experimental_live_view)
|
||||
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
|
||||
"Experimental LIVE VIEW feature is not enabled (the setting 'allow_experimental_live_view')");
|
||||
|
||||
|
@ -345,7 +345,7 @@ MergeTreeData::MergeTreeData(
|
||||
const MergingParams & merging_params_,
|
||||
std::unique_ptr<MergeTreeSettings> storage_settings_,
|
||||
bool require_part_metadata_,
|
||||
bool attach,
|
||||
LoadingStrictnessLevel mode,
|
||||
BrokenPartCallback broken_part_callback_)
|
||||
: IStorage(table_id_)
|
||||
, WithMutableContext(context_->getGlobalContext())
|
||||
@ -366,10 +366,12 @@ MergeTreeData::MergeTreeData(
|
||||
|
||||
const auto settings = getSettings();
|
||||
|
||||
allow_nullable_key = attach || settings->allow_nullable_key;
|
||||
bool sanity_checks = mode <= LoadingStrictnessLevel::CREATE;
|
||||
|
||||
allow_nullable_key = !sanity_checks || settings->allow_nullable_key;
|
||||
|
||||
/// Check sanity of MergeTreeSettings. Only when table is created.
|
||||
if (!attach)
|
||||
if (sanity_checks)
|
||||
settings->sanityCheck(getContext()->getMergeMutateExecutor()->getMaxTasksCount());
|
||||
|
||||
if (!date_column_name.empty())
|
||||
@ -377,7 +379,7 @@ MergeTreeData::MergeTreeData(
|
||||
try
|
||||
{
|
||||
checkPartitionKeyAndInitMinMax(metadata_.partition_key);
|
||||
setProperties(metadata_, metadata_, attach);
|
||||
setProperties(metadata_, metadata_, !sanity_checks);
|
||||
if (minmax_idx_date_column_pos == -1)
|
||||
throw Exception(ErrorCodes::BAD_TYPE_OF_FIELD, "Could not find Date column");
|
||||
}
|
||||
@ -393,7 +395,7 @@ MergeTreeData::MergeTreeData(
|
||||
is_custom_partitioned = true;
|
||||
checkPartitionKeyAndInitMinMax(metadata_.partition_key);
|
||||
}
|
||||
setProperties(metadata_, metadata_, attach);
|
||||
setProperties(metadata_, metadata_, !sanity_checks);
|
||||
|
||||
/// NOTE: using the same columns list as is read when performing actual merges.
|
||||
merging_params.check(metadata_);
|
||||
@ -401,11 +403,11 @@ MergeTreeData::MergeTreeData(
|
||||
if (metadata_.sampling_key.definition_ast != nullptr)
|
||||
{
|
||||
/// This is for backward compatibility.
|
||||
checkSampleExpression(metadata_, attach || settings->compatibility_allow_sampling_expression_not_in_primary_key,
|
||||
settings->check_sample_column_is_correct && !attach);
|
||||
checkSampleExpression(metadata_, !sanity_checks || settings->compatibility_allow_sampling_expression_not_in_primary_key,
|
||||
settings->check_sample_column_is_correct && sanity_checks);
|
||||
}
|
||||
|
||||
checkColumnFilenamesForCollision(metadata_.getColumns(), *settings, !attach);
|
||||
checkColumnFilenamesForCollision(metadata_.getColumns(), *settings, sanity_checks);
|
||||
checkTTLExpressions(metadata_, metadata_);
|
||||
|
||||
String reason;
|
||||
|
@ -391,7 +391,7 @@ public:
|
||||
const MergingParams & merging_params_,
|
||||
std::unique_ptr<MergeTreeSettings> settings_,
|
||||
bool require_part_metadata_,
|
||||
bool attach,
|
||||
LoadingStrictnessLevel mode,
|
||||
BrokenPartCallback broken_part_callback_ = [](const String &){});
|
||||
|
||||
/// Build a block of minmax and count values of a MergeTree table. These values are extracted
|
||||
|
@ -298,7 +298,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
arg_idx, e.message(), verbose_help_message);
|
||||
}
|
||||
}
|
||||
else if (!args.attach && !args.getLocalContext()->getSettingsRef().allow_deprecated_syntax_for_merge_tree)
|
||||
else if (args.mode <= LoadingStrictnessLevel::CREATE && !args.getLocalContext()->getSettingsRef().allow_deprecated_syntax_for_merge_tree)
|
||||
{
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "This syntax for *MergeTree engine is deprecated. "
|
||||
"Use extended storage definition syntax with ORDER BY/PRIMARY KEY clause. "
|
||||
@ -321,7 +321,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
auto expand_macro = [&] (ASTLiteral * ast_zk_path, ASTLiteral * ast_replica_name)
|
||||
{
|
||||
/// Unfold {database} and {table} macro on table creation, so table can be renamed.
|
||||
if (!args.attach)
|
||||
if (args.mode < LoadingStrictnessLevel::ATTACH)
|
||||
{
|
||||
Macros::MacroExpansionInfo info;
|
||||
/// NOTE: it's not recursive
|
||||
@ -582,7 +582,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
if (args.storage_def->sample_by)
|
||||
metadata.sampling_key = KeyDescription::getKeyFromAST(args.storage_def->sample_by->ptr(), metadata.columns, context);
|
||||
|
||||
bool allow_suspicious_ttl = args.attach || args.getLocalContext()->getSettingsRef().allow_suspicious_ttl_expressions;
|
||||
bool allow_suspicious_ttl = LoadingStrictnessLevel::SECONDARY_CREATE <= args.mode || args.getLocalContext()->getSettingsRef().allow_suspicious_ttl_expressions;
|
||||
|
||||
if (args.storage_def->ttl_table)
|
||||
{
|
||||
@ -609,12 +609,12 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
metadata.column_ttls_by_name[name] = new_ttl_entry;
|
||||
}
|
||||
|
||||
storage_settings->loadFromQuery(*args.storage_def, context, args.attach);
|
||||
storage_settings->loadFromQuery(*args.storage_def, context, LoadingStrictnessLevel::ATTACH <= args.mode);
|
||||
|
||||
// updates the default storage_settings with settings specified via SETTINGS arg in a query
|
||||
if (args.storage_def->settings)
|
||||
{
|
||||
if (!args.attach)
|
||||
if (args.mode <= LoadingStrictnessLevel::CREATE)
|
||||
args.getLocalContext()->checkMergeTreeSettingsConstraints(initial_storage_settings, storage_settings->changes());
|
||||
metadata.settings_changes = args.storage_def->settings->ptr();
|
||||
}
|
||||
@ -690,7 +690,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
if (ast && ast->value.getType() == Field::Types::UInt64)
|
||||
{
|
||||
storage_settings->index_granularity = ast->value.safeGet<UInt64>();
|
||||
if (!args.attach)
|
||||
if (args.mode <= LoadingStrictnessLevel::CREATE)
|
||||
{
|
||||
SettingsChanges changes;
|
||||
changes.emplace_back("index_granularity", Field(storage_settings->index_granularity));
|
||||
@ -701,12 +701,12 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Index granularity must be a positive integer{}", verbose_help_message);
|
||||
++arg_num;
|
||||
|
||||
if (args.storage_def->ttl_table && !args.attach)
|
||||
if (args.storage_def->ttl_table && args.mode <= LoadingStrictnessLevel::CREATE)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table TTL is not allowed for MergeTree in old syntax");
|
||||
}
|
||||
|
||||
DataTypes data_types = metadata.partition_key.data_types;
|
||||
if (!args.attach && !storage_settings->allow_floating_point_partition_key)
|
||||
if (args.mode <= LoadingStrictnessLevel::CREATE && !storage_settings->allow_floating_point_partition_key)
|
||||
{
|
||||
for (size_t i = 0; i < data_types.size(); ++i)
|
||||
if (isFloat(data_types[i]))
|
||||
@ -726,7 +726,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
return std::make_shared<StorageReplicatedMergeTree>(
|
||||
zookeeper_path,
|
||||
replica_name,
|
||||
args.attach,
|
||||
args.mode,
|
||||
args.table_id,
|
||||
args.relative_data_path,
|
||||
metadata,
|
||||
@ -734,7 +734,6 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
date_column_name,
|
||||
merging_params,
|
||||
std::move(storage_settings),
|
||||
args.has_force_restore_data_flag,
|
||||
renaming_restrictions,
|
||||
need_check_table_structure);
|
||||
}
|
||||
@ -743,12 +742,11 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
args.table_id,
|
||||
args.relative_data_path,
|
||||
metadata,
|
||||
args.attach,
|
||||
args.mode,
|
||||
context,
|
||||
date_column_name,
|
||||
merging_params,
|
||||
std::move(storage_settings),
|
||||
args.has_force_restore_data_flag);
|
||||
std::move(storage_settings));
|
||||
}
|
||||
|
||||
|
||||
|
@ -50,7 +50,7 @@ StorageNATS::StorageNATS(
|
||||
ContextPtr context_,
|
||||
const ColumnsDescription & columns_,
|
||||
std::unique_ptr<NATSSettings> nats_settings_,
|
||||
bool is_attach_)
|
||||
LoadingStrictnessLevel mode)
|
||||
: IStorage(table_id_)
|
||||
, WithContext(context_->getGlobalContext())
|
||||
, nats_settings(std::move(nats_settings_))
|
||||
@ -62,7 +62,7 @@ StorageNATS::StorageNATS(
|
||||
, log(getLogger("StorageNATS (" + table_id_.table_name + ")"))
|
||||
, semaphore(0, static_cast<int>(num_consumers))
|
||||
, queue_size(std::max(QUEUE_SIZE, static_cast<uint32_t>(getMaxBlockSize())))
|
||||
, is_attach(is_attach_)
|
||||
, throw_on_startup_failure(mode <= LoadingStrictnessLevel::CREATE)
|
||||
{
|
||||
auto nats_username = getContext()->getMacros()->expand(nats_settings->nats_username);
|
||||
auto nats_password = getContext()->getMacros()->expand(nats_settings->nats_password);
|
||||
@ -116,7 +116,7 @@ StorageNATS::StorageNATS(
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(log);
|
||||
if (!is_attach)
|
||||
if (throw_on_startup_failure)
|
||||
throw;
|
||||
}
|
||||
|
||||
@ -399,7 +399,6 @@ SinkToStoragePtr StorageNATS::write(const ASTPtr &, const StorageMetadataPtr & m
|
||||
|
||||
void StorageNATS::startup()
|
||||
{
|
||||
(void) is_attach;
|
||||
for (size_t i = 0; i < num_consumers; ++i)
|
||||
{
|
||||
try
|
||||
@ -410,7 +409,7 @@ void StorageNATS::startup()
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
if (!is_attach)
|
||||
if (throw_on_startup_failure)
|
||||
throw;
|
||||
tryLogCurrentException(log);
|
||||
}
|
||||
@ -741,7 +740,7 @@ void registerStorageNATS(StorageFactory & factory)
|
||||
if (!nats_settings->nats_subjects.changed)
|
||||
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "You must specify `nats_subjects` setting");
|
||||
|
||||
return std::make_shared<StorageNATS>(args.table_id, args.getContext(), args.columns, std::move(nats_settings), args.attach);
|
||||
return std::make_shared<StorageNATS>(args.table_id, args.getContext(), args.columns, std::move(nats_settings), args.mode);
|
||||
};
|
||||
|
||||
factory.registerStorage("NATS", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });
|
||||
|
@ -24,7 +24,7 @@ public:
|
||||
ContextPtr context_,
|
||||
const ColumnsDescription & columns_,
|
||||
std::unique_ptr<NATSSettings> nats_settings_,
|
||||
bool is_attach_);
|
||||
LoadingStrictnessLevel mode);
|
||||
|
||||
std::string getName() const override { return "NATS"; }
|
||||
|
||||
@ -117,7 +117,7 @@ private:
|
||||
std::mutex loop_mutex;
|
||||
|
||||
mutable bool drop_table = false;
|
||||
bool is_attach;
|
||||
bool throw_on_startup_failure;
|
||||
|
||||
NATSConsumerPtr createConsumer();
|
||||
|
||||
|
@ -51,7 +51,7 @@ namespace ErrorCodes
|
||||
/// For the case of single storage.
|
||||
StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(
|
||||
const StorageID & table_id_,
|
||||
bool is_attach_,
|
||||
LoadingStrictnessLevel mode,
|
||||
const String & remote_database_name,
|
||||
const String & remote_table_name_,
|
||||
const postgres::ConnectionInfo & connection_info,
|
||||
@ -66,7 +66,7 @@ StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(
|
||||
, nested_context(makeNestedTableContext(context_->getGlobalContext()))
|
||||
, nested_table_id(StorageID(table_id_.database_name, getNestedTableName()))
|
||||
, remote_table_name(remote_table_name_)
|
||||
, is_attach(is_attach_)
|
||||
, is_attach(mode >= LoadingStrictnessLevel::ATTACH)
|
||||
{
|
||||
if (table_id_.uuid == UUIDHelpers::Nil)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Storage MaterializedPostgreSQL is allowed only for Atomic database");
|
||||
@ -573,7 +573,8 @@ void registerStorageMaterializedPostgreSQL(StorageFactory & factory)
|
||||
metadata.setColumns(args.columns);
|
||||
metadata.setConstraints(args.constraints);
|
||||
|
||||
if (!args.attach && !args.getLocalContext()->getSettingsRef().allow_experimental_materialized_postgresql_table)
|
||||
if (args.mode <= LoadingStrictnessLevel::CREATE
|
||||
&& !args.getLocalContext()->getSettingsRef().allow_experimental_materialized_postgresql_table)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "MaterializedPostgreSQL is an experimental table engine."
|
||||
" You can enable it with the `allow_experimental_materialized_postgresql_table` setting");
|
||||
|
||||
@ -600,7 +601,7 @@ void registerStorageMaterializedPostgreSQL(StorageFactory & factory)
|
||||
postgresql_replication_settings->loadFromQuery(*args.storage_def);
|
||||
|
||||
return std::make_shared<StorageMaterializedPostgreSQL>(
|
||||
args.table_id, args.attach, configuration.database, configuration.table, connection_info,
|
||||
args.table_id, args.mode, configuration.database, configuration.table, connection_info,
|
||||
metadata, args.getContext(),
|
||||
std::move(postgresql_replication_settings));
|
||||
};
|
||||
|
@ -74,7 +74,7 @@ public:
|
||||
|
||||
StorageMaterializedPostgreSQL(
|
||||
const StorageID & table_id_,
|
||||
bool is_attach_,
|
||||
LoadingStrictnessLevel mode,
|
||||
const String & remote_database_name,
|
||||
const String & remote_table_name,
|
||||
const postgres::ConnectionInfo & connection_info,
|
||||
|
@ -69,7 +69,7 @@ StorageRabbitMQ::StorageRabbitMQ(
|
||||
ContextPtr context_,
|
||||
const ColumnsDescription & columns_,
|
||||
std::unique_ptr<RabbitMQSettings> rabbitmq_settings_,
|
||||
bool is_attach)
|
||||
LoadingStrictnessLevel mode)
|
||||
: IStorage(table_id_)
|
||||
, WithContext(context_->getGlobalContext())
|
||||
, rabbitmq_settings(std::move(rabbitmq_settings_))
|
||||
@ -170,13 +170,13 @@ StorageRabbitMQ::StorageRabbitMQ(
|
||||
connection = std::make_unique<RabbitMQConnection>(configuration, log);
|
||||
if (connection->connect())
|
||||
initRabbitMQ();
|
||||
else if (!is_attach)
|
||||
else if (mode <= LoadingStrictnessLevel::CREATE)
|
||||
throw Exception(ErrorCodes::CANNOT_CONNECT_RABBITMQ, "Cannot connect to {}", connection->connectionInfoForLog());
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(log);
|
||||
if (!is_attach)
|
||||
if (mode <= LoadingStrictnessLevel::CREATE)
|
||||
throw;
|
||||
}
|
||||
|
||||
@ -1188,7 +1188,7 @@ void registerStorageRabbitMQ(StorageFactory & factory)
|
||||
if (!rabbitmq_settings->rabbitmq_format.changed)
|
||||
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "You must specify `rabbitmq_format` setting");
|
||||
|
||||
return std::make_shared<StorageRabbitMQ>(args.table_id, args.getContext(), args.columns, std::move(rabbitmq_settings), args.attach);
|
||||
return std::make_shared<StorageRabbitMQ>(args.table_id, args.getContext(), args.columns, std::move(rabbitmq_settings), args.mode);
|
||||
};
|
||||
|
||||
factory.registerStorage("RabbitMQ", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });
|
||||
|
@ -27,7 +27,7 @@ public:
|
||||
ContextPtr context_,
|
||||
const ColumnsDescription & columns_,
|
||||
std::unique_ptr<RabbitMQSettings> rabbitmq_settings_,
|
||||
bool is_attach);
|
||||
LoadingStrictnessLevel mode);
|
||||
|
||||
std::string getName() const override { return "RabbitMQ"; }
|
||||
|
||||
|
@ -172,7 +172,7 @@ private:
|
||||
StorageEmbeddedRocksDB::StorageEmbeddedRocksDB(const StorageID & table_id_,
|
||||
const String & relative_data_path_,
|
||||
const StorageInMemoryMetadata & metadata_,
|
||||
bool attach,
|
||||
LoadingStrictnessLevel mode,
|
||||
ContextPtr context_,
|
||||
const String & primary_key_,
|
||||
Int32 ttl_,
|
||||
@ -190,7 +190,7 @@ StorageEmbeddedRocksDB::StorageEmbeddedRocksDB(const StorageID & table_id_,
|
||||
{
|
||||
rocksdb_dir = context_->getPath() + relative_data_path_;
|
||||
}
|
||||
if (!attach)
|
||||
if (mode < LoadingStrictnessLevel::ATTACH)
|
||||
{
|
||||
fs::create_directories(rocksdb_dir);
|
||||
}
|
||||
@ -630,7 +630,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
||||
{
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "StorageEmbeddedRocksDB must require one column in primary key");
|
||||
}
|
||||
return std::make_shared<StorageEmbeddedRocksDB>(args.table_id, args.relative_data_path, metadata, args.attach, args.getContext(), primary_key_names[0], ttl, std::move(rocksdb_dir), read_only);
|
||||
return std::make_shared<StorageEmbeddedRocksDB>(args.table_id, args.relative_data_path, metadata, args.mode, args.getContext(), primary_key_names[0], ttl, std::move(rocksdb_dir), read_only);
|
||||
}
|
||||
|
||||
std::shared_ptr<rocksdb::Statistics> StorageEmbeddedRocksDB::getRocksDBStatistics() const
|
||||
|
@ -32,7 +32,7 @@ public:
|
||||
StorageEmbeddedRocksDB(const StorageID & table_id_,
|
||||
const String & relative_data_path_,
|
||||
const StorageInMemoryMetadata & metadata,
|
||||
bool attach,
|
||||
LoadingStrictnessLevel mode,
|
||||
ContextPtr context_,
|
||||
const String & primary_key_,
|
||||
Int32 ttl_ = 0,
|
||||
|
@ -316,7 +316,7 @@ void registerStorageDictionary(StorageFactory & factory)
|
||||
auto result_storage = std::make_shared<StorageDictionary>(dictionary_id, abstract_dictionary_configuration, local_context);
|
||||
|
||||
bool lazy_load = local_context->getConfigRef().getBool("dictionaries_lazy_load", true);
|
||||
if (!args.attach && !lazy_load)
|
||||
if (args.mode <= LoadingStrictnessLevel::CREATE && !lazy_load)
|
||||
{
|
||||
/// load() is called here to force loading the dictionary, wait until the loading is finished,
|
||||
/// and throw an exception if the loading is failed.
|
||||
@ -335,7 +335,7 @@ void registerStorageDictionary(StorageFactory & factory)
|
||||
args.engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(args.engine_args[0], local_context);
|
||||
String dictionary_name = checkAndGetLiteralArgument<String>(args.engine_args[0], "dictionary_name");
|
||||
|
||||
if (!args.attach)
|
||||
if (args.mode <= LoadingStrictnessLevel::CREATE)
|
||||
{
|
||||
const auto & dictionary = args.getContext()->getExternalDictionariesLoader().getDictionary(dictionary_name, args.getContext());
|
||||
const DictionaryStructure & dictionary_structure = dictionary->getStructure();
|
||||
|
@ -321,7 +321,7 @@ StorageDistributed::StorageDistributed(
|
||||
const String & storage_policy_name_,
|
||||
const String & relative_data_path_,
|
||||
const DistributedSettings & distributed_settings_,
|
||||
bool attach_,
|
||||
LoadingStrictnessLevel mode,
|
||||
ClusterPtr owned_cluster_,
|
||||
ASTPtr remote_table_function_ptr_)
|
||||
: IStorage(id_)
|
||||
@ -372,7 +372,7 @@ StorageDistributed::StorageDistributed(
|
||||
}
|
||||
|
||||
/// Sanity check. Skip check if the table is already created to allow the server to start.
|
||||
if (!attach_)
|
||||
if (mode <= LoadingStrictnessLevel::CREATE)
|
||||
{
|
||||
if (remote_database.empty() && !remote_table_function_ptr && !getCluster()->maybeCrossReplication())
|
||||
LOG_WARNING(log, "Name of remote database is empty. Default database will be used implicitly.");
|
||||
@ -397,7 +397,7 @@ StorageDistributed::StorageDistributed(
|
||||
const String & storage_policy_name_,
|
||||
const String & relative_data_path_,
|
||||
const DistributedSettings & distributed_settings_,
|
||||
bool attach,
|
||||
LoadingStrictnessLevel mode,
|
||||
ClusterPtr owned_cluster_)
|
||||
: StorageDistributed(
|
||||
id_,
|
||||
@ -412,7 +412,7 @@ StorageDistributed::StorageDistributed(
|
||||
storage_policy_name_,
|
||||
relative_data_path_,
|
||||
distributed_settings_,
|
||||
attach,
|
||||
mode,
|
||||
std::move(owned_cluster_),
|
||||
remote_table_function_ptr_)
|
||||
{
|
||||
@ -1955,7 +1955,7 @@ void registerStorageDistributed(StorageFactory & factory)
|
||||
storage_policy,
|
||||
args.relative_data_path,
|
||||
distributed_settings,
|
||||
args.attach);
|
||||
args.mode);
|
||||
},
|
||||
{
|
||||
.supports_settings = true,
|
||||
|
@ -58,7 +58,7 @@ public:
|
||||
const String & storage_policy_name_,
|
||||
const String & relative_data_path_,
|
||||
const DistributedSettings & distributed_settings_,
|
||||
bool attach_,
|
||||
LoadingStrictnessLevel mode,
|
||||
ClusterPtr owned_cluster_ = {},
|
||||
ASTPtr remote_table_function_ptr_ = {});
|
||||
|
||||
@ -73,7 +73,7 @@ public:
|
||||
const String & storage_policy_name_,
|
||||
const String & relative_data_path_,
|
||||
const DistributedSettings & distributed_settings_,
|
||||
bool attach,
|
||||
LoadingStrictnessLevel mode,
|
||||
ClusterPtr owned_cluster_ = {});
|
||||
|
||||
~StorageDistributed() override;
|
||||
|
@ -62,7 +62,7 @@ StoragePtr StorageFactory::get(
|
||||
ContextMutablePtr context,
|
||||
const ColumnsDescription & columns,
|
||||
const ConstraintsDescription & constraints,
|
||||
bool has_force_restore_data_flag) const
|
||||
LoadingStrictnessLevel mode) const
|
||||
{
|
||||
String name, comment;
|
||||
|
||||
@ -216,8 +216,7 @@ StoragePtr StorageFactory::get(
|
||||
.context = context,
|
||||
.columns = columns,
|
||||
.constraints = constraints,
|
||||
.attach = query.attach,
|
||||
.has_force_restore_data_flag = has_force_restore_data_flag,
|
||||
.mode = mode,
|
||||
.comment = comment};
|
||||
|
||||
assert(arguments.getContext() == arguments.getContext()->getGlobalContext());
|
||||
|
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/NamePrompter.h>
|
||||
#include <Databases/LoadingStrictnessLevel.h>
|
||||
#include <Parsers/IAST_fwd.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
#include <Storages/ColumnsDescription.h>
|
||||
@ -43,8 +44,7 @@ public:
|
||||
ContextWeakMutablePtr context;
|
||||
const ColumnsDescription & columns;
|
||||
const ConstraintsDescription & constraints;
|
||||
bool attach;
|
||||
bool has_force_restore_data_flag;
|
||||
LoadingStrictnessLevel mode;
|
||||
const String & comment;
|
||||
|
||||
ContextMutablePtr getContext() const;
|
||||
@ -87,7 +87,7 @@ public:
|
||||
ContextMutablePtr context,
|
||||
const ColumnsDescription & columns,
|
||||
const ConstraintsDescription & constraints,
|
||||
bool has_force_restore_data_flag) const;
|
||||
LoadingStrictnessLevel mode) const;
|
||||
|
||||
/// Register a table engine by its name.
|
||||
/// No locking, you must register all engines before usage of get.
|
||||
|
@ -569,7 +569,7 @@ StorageLog::StorageLog(
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
const String & comment,
|
||||
bool attach,
|
||||
LoadingStrictnessLevel mode,
|
||||
ContextMutablePtr context_)
|
||||
: IStorage(table_id_)
|
||||
, WithMutableContext(context_)
|
||||
@ -603,7 +603,7 @@ StorageLog::StorageLog(
|
||||
file_checker.setEmpty(marks_file_path);
|
||||
}
|
||||
|
||||
if (!attach)
|
||||
if (mode < LoadingStrictnessLevel::ATTACH)
|
||||
{
|
||||
/// create directories if they do not exist
|
||||
disk->createDirectories(table_path);
|
||||
@ -1163,7 +1163,7 @@ void registerStorageLog(StorageFactory & factory)
|
||||
args.columns,
|
||||
args.constraints,
|
||||
args.comment,
|
||||
args.attach,
|
||||
args.mode,
|
||||
args.getContext());
|
||||
};
|
||||
|
||||
|
@ -40,7 +40,7 @@ public:
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
const String & comment,
|
||||
bool attach,
|
||||
LoadingStrictnessLevel mode,
|
||||
ContextMutablePtr context_);
|
||||
|
||||
~StorageLog() override;
|
||||
|
@ -71,7 +71,7 @@ StorageMaterializedView::StorageMaterializedView(
|
||||
ContextPtr local_context,
|
||||
const ASTCreateQuery & query,
|
||||
const ColumnsDescription & columns_,
|
||||
bool attach_,
|
||||
LoadingStrictnessLevel mode,
|
||||
const String & comment)
|
||||
: IStorage(table_id_), WithMutableContext(local_context->getGlobalContext())
|
||||
{
|
||||
@ -118,7 +118,7 @@ StorageMaterializedView::StorageMaterializedView(
|
||||
{
|
||||
target_table_id = query.to_table_id;
|
||||
}
|
||||
else if (attach_)
|
||||
else if (LoadingStrictnessLevel::ATTACH <= mode)
|
||||
{
|
||||
/// If there is an ATTACH request, then the internal table must already be created.
|
||||
target_table_id = StorageID(getStorageID().database_name, generateInnerTableName(getStorageID()), query.to_inner_uuid);
|
||||
@ -151,7 +151,7 @@ StorageMaterializedView::StorageMaterializedView(
|
||||
*this,
|
||||
getContext(),
|
||||
*query.refresh_strategy);
|
||||
refresh_on_start = !attach_ && !query.is_create_empty;
|
||||
refresh_on_start = mode < LoadingStrictnessLevel::ATTACH && !query.is_create_empty;
|
||||
}
|
||||
}
|
||||
|
||||
@ -624,7 +624,7 @@ void registerStorageMaterializedView(StorageFactory & factory)
|
||||
/// Pass local_context here to convey setting for inner table
|
||||
return std::make_shared<StorageMaterializedView>(
|
||||
args.table_id, args.getLocalContext(), args.query,
|
||||
args.columns, args.attach, args.comment);
|
||||
args.columns, args.mode, args.comment);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -18,7 +18,7 @@ public:
|
||||
ContextPtr local_context,
|
||||
const ASTCreateQuery & query,
|
||||
const ColumnsDescription & columns_,
|
||||
bool attach_,
|
||||
LoadingStrictnessLevel mode,
|
||||
const String & comment);
|
||||
|
||||
std::string getName() const override { return "MaterializedView"; }
|
||||
|
@ -96,12 +96,11 @@ StorageMergeTree::StorageMergeTree(
|
||||
const StorageID & table_id_,
|
||||
const String & relative_data_path_,
|
||||
const StorageInMemoryMetadata & metadata_,
|
||||
bool attach,
|
||||
LoadingStrictnessLevel mode,
|
||||
ContextMutablePtr context_,
|
||||
const String & date_column_name,
|
||||
const MergingParams & merging_params_,
|
||||
std::unique_ptr<MergeTreeSettings> storage_settings_,
|
||||
bool has_force_restore_data_flag)
|
||||
std::unique_ptr<MergeTreeSettings> storage_settings_)
|
||||
: MergeTreeData(
|
||||
table_id_,
|
||||
metadata_,
|
||||
@ -110,17 +109,17 @@ StorageMergeTree::StorageMergeTree(
|
||||
merging_params_,
|
||||
std::move(storage_settings_),
|
||||
false, /// require_part_metadata
|
||||
attach)
|
||||
mode)
|
||||
, reader(*this)
|
||||
, writer(*this)
|
||||
, merger_mutator(*this)
|
||||
{
|
||||
initializeDirectoriesAndFormatVersion(relative_data_path_, attach, date_column_name);
|
||||
initializeDirectoriesAndFormatVersion(relative_data_path_, LoadingStrictnessLevel::ATTACH <= mode, date_column_name);
|
||||
|
||||
|
||||
loadDataParts(has_force_restore_data_flag, std::nullopt);
|
||||
loadDataParts(LoadingStrictnessLevel::FORCE_RESTORE <= mode, std::nullopt);
|
||||
|
||||
if (!attach && !getDataPartsForInternalUsage().empty() && !isStaticStorage())
|
||||
if (mode < LoadingStrictnessLevel::ATTACH && !getDataPartsForInternalUsage().empty() && !isStaticStorage())
|
||||
throw Exception(ErrorCodes::INCORRECT_DATA,
|
||||
"Data directory for table already containing data parts - probably "
|
||||
"it was unclean DROP table or manual intervention. "
|
||||
|
@ -37,12 +37,11 @@ public:
|
||||
const StorageID & table_id_,
|
||||
const String & relative_data_path_,
|
||||
const StorageInMemoryMetadata & metadata,
|
||||
bool attach,
|
||||
LoadingStrictnessLevel mode,
|
||||
ContextMutablePtr context_,
|
||||
const String & date_column_name,
|
||||
const MergingParams & merging_params_,
|
||||
std::unique_ptr<MergeTreeSettings> settings_,
|
||||
bool has_force_restore_data_flag);
|
||||
std::unique_ptr<MergeTreeSettings> settings_);
|
||||
|
||||
void startup() override;
|
||||
void shutdown(bool is_drop) override;
|
||||
|
@ -288,7 +288,7 @@ static MergeTreePartInfo makeDummyDropRangeForMovePartitionOrAttachPartitionFrom
|
||||
StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
const String & zookeeper_path_,
|
||||
const String & replica_name_,
|
||||
bool attach,
|
||||
LoadingStrictnessLevel mode,
|
||||
const StorageID & table_id_,
|
||||
const String & relative_data_path_,
|
||||
const StorageInMemoryMetadata & metadata_,
|
||||
@ -296,7 +296,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
const String & date_column_name,
|
||||
const MergingParams & merging_params_,
|
||||
std::unique_ptr<MergeTreeSettings> settings_,
|
||||
bool has_force_restore_data_flag,
|
||||
RenamingRestrictions renaming_restrictions_,
|
||||
bool need_check_structure)
|
||||
: MergeTreeData(table_id_,
|
||||
@ -306,10 +305,10 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
merging_params_,
|
||||
std::move(settings_),
|
||||
true, /// require_part_metadata
|
||||
attach,
|
||||
mode,
|
||||
[this] (const std::string & name) { enqueuePartForCheck(name); })
|
||||
, zookeeper_name(zkutil::extractZooKeeperName(zookeeper_path_))
|
||||
, zookeeper_path(zkutil::extractZooKeeperPath(zookeeper_path_, /* check_starts_with_slash */ !attach, log.load()))
|
||||
, zookeeper_path(zkutil::extractZooKeeperPath(zookeeper_path_, /* check_starts_with_slash */ mode <= LoadingStrictnessLevel::CREATE, log.load()))
|
||||
, replica_name(replica_name_)
|
||||
, replica_path(fs::path(zookeeper_path) / "replicas" / replica_name_)
|
||||
, reader(*this)
|
||||
@ -327,7 +326,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
, replicated_fetches_throttler(std::make_shared<Throttler>(getSettings()->max_replicated_fetches_network_bandwidth, getContext()->getReplicatedFetchesThrottler()))
|
||||
, replicated_sends_throttler(std::make_shared<Throttler>(getSettings()->max_replicated_sends_network_bandwidth, getContext()->getReplicatedSendsThrottler()))
|
||||
{
|
||||
initializeDirectoriesAndFormatVersion(relative_data_path_, attach, date_column_name);
|
||||
initializeDirectoriesAndFormatVersion(relative_data_path_, LoadingStrictnessLevel::ATTACH <= mode, date_column_name);
|
||||
/// We create and deactivate all tasks for consistency.
|
||||
/// They all will be scheduled and activated by the restarting thread.
|
||||
queue_updating_task = getContext()->getSchedulePool().createTask(
|
||||
@ -379,7 +378,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
if (!attach)
|
||||
if (mode < LoadingStrictnessLevel::ATTACH)
|
||||
{
|
||||
dropIfEmpty();
|
||||
throw;
|
||||
@ -395,7 +394,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
std::optional<std::unordered_set<std::string>> expected_parts_on_this_replica;
|
||||
bool skip_sanity_checks = false;
|
||||
/// It does not make sense for CREATE query
|
||||
if (attach)
|
||||
if (LoadingStrictnessLevel::ATTACH <= mode)
|
||||
{
|
||||
try
|
||||
{
|
||||
@ -416,7 +415,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
"Skipping the limits on severity of changes to data parts and columns (flag {}/flags/force_restore_data).",
|
||||
replica_path);
|
||||
}
|
||||
else if (has_force_restore_data_flag)
|
||||
else if (LoadingStrictnessLevel::FORCE_RESTORE <= mode)
|
||||
{
|
||||
skip_sanity_checks = true;
|
||||
|
||||
@ -443,7 +442,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
|
||||
loadDataParts(skip_sanity_checks, expected_parts_on_this_replica);
|
||||
|
||||
if (attach)
|
||||
if (LoadingStrictnessLevel::ATTACH <= mode)
|
||||
{
|
||||
/// Provide better initial value of merge_selecting_sleep_ms on server startup
|
||||
auto settings = getSettings();
|
||||
@ -458,7 +457,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
|
||||
if (!current_zookeeper)
|
||||
{
|
||||
if (!attach)
|
||||
if (mode < LoadingStrictnessLevel::ATTACH)
|
||||
{
|
||||
dropIfEmpty();
|
||||
throw Exception(ErrorCodes::NO_ZOOKEEPER, "Can't create replicated table without ZooKeeper");
|
||||
@ -474,7 +473,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
||||
}
|
||||
}
|
||||
|
||||
if (attach)
|
||||
if (LoadingStrictnessLevel::ATTACH <= mode)
|
||||
{
|
||||
LOG_INFO(log, "Table will be in readonly mode until initialization is finished");
|
||||
attach_thread.emplace(*this);
|
||||
|
@ -99,7 +99,7 @@ public:
|
||||
StorageReplicatedMergeTree(
|
||||
const String & zookeeper_path_,
|
||||
const String & replica_name_,
|
||||
bool attach,
|
||||
LoadingStrictnessLevel mode,
|
||||
const StorageID & table_id_,
|
||||
const String & relative_data_path_,
|
||||
const StorageInMemoryMetadata & metadata_,
|
||||
@ -107,7 +107,6 @@ public:
|
||||
const String & date_column_name,
|
||||
const MergingParams & merging_params_,
|
||||
std::unique_ptr<MergeTreeSettings> settings_,
|
||||
bool has_force_restore_data_flag,
|
||||
RenamingRestrictions renaming_restrictions_,
|
||||
bool need_check_structure);
|
||||
|
||||
|
@ -208,7 +208,7 @@ void registerStorageSQLite(StorageFactory & factory)
|
||||
const auto database_path = checkAndGetLiteralArgument<String>(engine_args[0], "database_path");
|
||||
const auto table_name = checkAndGetLiteralArgument<String>(engine_args[1], "table_name");
|
||||
|
||||
auto sqlite_db = openSQLiteDB(database_path, args.getContext(), /* throw_on_error */!args.attach);
|
||||
auto sqlite_db = openSQLiteDB(database_path, args.getContext(), /* throw_on_error */ args.mode <= LoadingStrictnessLevel::CREATE);
|
||||
|
||||
return std::make_shared<StorageSQLite>(args.table_id, sqlite_db, database_path,
|
||||
table_name, args.columns, args.constraints, args.getContext());
|
||||
|
@ -267,7 +267,7 @@ StorageStripeLog::StorageStripeLog(
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
const String & comment,
|
||||
bool attach,
|
||||
LoadingStrictnessLevel mode,
|
||||
ContextMutablePtr context_)
|
||||
: IStorage(table_id_)
|
||||
, WithMutableContext(context_)
|
||||
@ -295,7 +295,7 @@ StorageStripeLog::StorageStripeLog(
|
||||
file_checker.setEmpty(index_file_path);
|
||||
}
|
||||
|
||||
if (!attach)
|
||||
if (mode < LoadingStrictnessLevel::ATTACH)
|
||||
{
|
||||
/// create directories if they do not exist
|
||||
disk->createDirectories(table_path);
|
||||
@ -698,7 +698,7 @@ void registerStorageStripeLog(StorageFactory & factory)
|
||||
args.columns,
|
||||
args.constraints,
|
||||
args.comment,
|
||||
args.attach,
|
||||
args.mode,
|
||||
args.getContext());
|
||||
}, features);
|
||||
}
|
||||
|
@ -33,7 +33,7 @@ public:
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
const String & comment,
|
||||
bool attach,
|
||||
LoadingStrictnessLevel mode,
|
||||
ContextMutablePtr context_);
|
||||
|
||||
~StorageStripeLog() override;
|
||||
|
@ -479,13 +479,13 @@ static void createInformationSchemaView(ContextMutablePtr context, IDatabase & d
|
||||
ast_create.setDatabase(database.getDatabaseName());
|
||||
|
||||
StoragePtr view = createTableFromAST(ast_create, database.getDatabaseName(),
|
||||
database.getTableDataPath(ast_create), context, true).second;
|
||||
database.getTableDataPath(ast_create), context, LoadingStrictnessLevel::FORCE_RESTORE).second;
|
||||
database.createTable(context, ast_create.getTable(), view, ast);
|
||||
ASTPtr ast_upper = ast_create.clone();
|
||||
auto & ast_create_upper = ast_upper->as<ASTCreateQuery &>();
|
||||
ast_create_upper.setTable(Poco::toUpper(view_name));
|
||||
StoragePtr view_upper = createTableFromAST(ast_create_upper, database.getDatabaseName(),
|
||||
database.getTableDataPath(ast_create_upper), context, true).second;
|
||||
database.getTableDataPath(ast_create_upper), context, LoadingStrictnessLevel::FORCE_RESTORE).second;
|
||||
|
||||
database.createTable(context, ast_create_upper.getTable(), view_upper, ast_upper);
|
||||
|
||||
|
@ -1157,7 +1157,7 @@ StorageWindowView::StorageWindowView(
|
||||
ContextPtr context_,
|
||||
const ASTCreateQuery & query,
|
||||
const ColumnsDescription & columns_,
|
||||
bool attach_)
|
||||
LoadingStrictnessLevel mode)
|
||||
: IStorage(table_id_)
|
||||
, WithContext(context_->getGlobalContext())
|
||||
, log(getLogger(fmt::format("StorageWindowView({}.{})", table_id_.database_name, table_id_.table_name)))
|
||||
@ -1203,7 +1203,7 @@ StorageWindowView::StorageWindowView(
|
||||
next_fire_signal = getWindowUpperBound(now());
|
||||
|
||||
std::exchange(has_inner_table, true);
|
||||
if (!attach_)
|
||||
if (mode < LoadingStrictnessLevel::ATTACH)
|
||||
{
|
||||
auto inner_create_query = getInnerTableCreateQuery(inner_query, inner_table_id);
|
||||
auto create_context = Context::createCopy(context_);
|
||||
@ -1672,12 +1672,12 @@ void registerStorageWindowView(StorageFactory & factory)
|
||||
{
|
||||
factory.registerStorage("WindowView", [](const StorageFactory::Arguments & args)
|
||||
{
|
||||
if (!args.attach && !args.getLocalContext()->getSettingsRef().allow_experimental_window_view)
|
||||
if (args.mode <= LoadingStrictnessLevel::CREATE && !args.getLocalContext()->getSettingsRef().allow_experimental_window_view)
|
||||
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
|
||||
"Experimental WINDOW VIEW feature "
|
||||
"is not enabled (the setting 'allow_experimental_window_view')");
|
||||
|
||||
return std::make_shared<StorageWindowView>(args.table_id, args.getLocalContext(), args.query, args.columns, args.attach);
|
||||
return std::make_shared<StorageWindowView>(args.table_id, args.getLocalContext(), args.query, args.columns, args.mode);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -111,7 +111,7 @@ public:
|
||||
ContextPtr context_,
|
||||
const ASTCreateQuery & query,
|
||||
const ColumnsDescription & columns_,
|
||||
bool attach_);
|
||||
LoadingStrictnessLevel mode);
|
||||
|
||||
String getName() const override { return "WindowView"; }
|
||||
|
||||
|
@ -34,7 +34,7 @@ protected:
|
||||
columns = parseColumnsListFromString(TableFunction::configuration.structure, context);
|
||||
|
||||
StoragePtr storage = Storage::create(
|
||||
TableFunction::configuration, context, false, StorageID(TableFunction::getDatabaseName(), table_name),
|
||||
TableFunction::configuration, context, LoadingStrictnessLevel::CREATE, StorageID(TableFunction::getDatabaseName(), table_name),
|
||||
columns, ConstraintsDescription{}, String{}, std::nullopt);
|
||||
|
||||
storage->startup();
|
||||
|
@ -317,7 +317,7 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & /*ast_function*/, Con
|
||||
String{},
|
||||
String{},
|
||||
DistributedSettings{},
|
||||
false,
|
||||
LoadingStrictnessLevel::CREATE,
|
||||
cluster)
|
||||
: std::make_shared<StorageDistributed>(
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
@ -332,7 +332,7 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & /*ast_function*/, Con
|
||||
String{},
|
||||
String{},
|
||||
DistributedSettings{},
|
||||
false,
|
||||
LoadingStrictnessLevel::CREATE,
|
||||
cluster);
|
||||
|
||||
res->startup();
|
||||
|
Loading…
Reference in New Issue
Block a user