Mirror of https://github.com/ClickHouse/ClickHouse.git, synced 2024-12-03 13:02:00 +00:00

commit 421a62e978: Merge branch 'master' into fix-false-too-slow
@@ -55,6 +55,7 @@ function(protobuf_generate_cpp_impl SRCS HDRS MODES OUTPUT_FILE_EXTS PLUGIN)
 endif()

 set (intermediate_dir ${CMAKE_CURRENT_BINARY_DIR}/intermediate)
+file (MAKE_DIRECTORY ${intermediate_dir})

 set (protoc_args)
 foreach (mode ${MODES})
@@ -112,7 +113,7 @@ if (PROTOBUF_GENERATE_CPP_SCRIPT_MODE)
 set (intermediate_dir ${DIR}/intermediate)
 set (intermediate_output "${intermediate_dir}/${FILENAME}")

-if (COMPILER_ID STREQUAL "Clang")
+if (COMPILER_ID MATCHES "Clang")
 set (pragma_push "#pragma clang diagnostic push\n")
 set (pragma_pop "#pragma clang diagnostic pop\n")
 set (pragma_disable_warnings "#pragma clang diagnostic ignored \"-Weverything\"\n")
@@ -114,7 +114,7 @@ void ClusterCopierApp::mainImpl()
 registerDisks();

 static const std::string default_database = "_local";
-DatabaseCatalog::instance().attachDatabase(default_database, std::make_shared<DatabaseMemory>(default_database));
+DatabaseCatalog::instance().attachDatabase(default_database, std::make_shared<DatabaseMemory>(default_database, *context));
 context->setCurrentDatabase(default_database);

 /// Initialize query scope just in case.
@@ -118,13 +118,13 @@ void LocalServer::tryInitPath()
 }


-static void attachSystemTables()
+static void attachSystemTables(const Context & context)
 {
 DatabasePtr system_database = DatabaseCatalog::instance().tryGetDatabase(DatabaseCatalog::SYSTEM_DATABASE);
 if (!system_database)
 {
 /// TODO: add attachTableDelayed into DatabaseMemory to speedup loading
-system_database = std::make_shared<DatabaseMemory>(DatabaseCatalog::SYSTEM_DATABASE);
+system_database = std::make_shared<DatabaseMemory>(DatabaseCatalog::SYSTEM_DATABASE, context);
 DatabaseCatalog::instance().attachDatabase(DatabaseCatalog::SYSTEM_DATABASE, system_database);
 }

@@ -202,7 +202,7 @@ try
 * if such tables will not be dropped, clickhouse-server will not be able to load them due to security reasons.
 */
 std::string default_database = config().getString("default_database", "_local");
-DatabaseCatalog::instance().attachDatabase(default_database, std::make_shared<DatabaseMemory>(default_database));
+DatabaseCatalog::instance().attachDatabase(default_database, std::make_shared<DatabaseMemory>(default_database, *context));
 context->setCurrentDatabase(default_database);
 applyCmdOptions();

@@ -213,14 +213,14 @@ try

 LOG_DEBUG(log, "Loading metadata from {}", context->getPath());
 loadMetadataSystem(*context);
-attachSystemTables();
+attachSystemTables(*context);
 loadMetadata(*context);
 DatabaseCatalog::instance().loadDatabases();
 LOG_DEBUG(log, "Loaded metadata.");
 }
 else
 {
-attachSystemTables();
+attachSystemTables(*context);
 }

 processQueries();

@@ -236,6 +236,14 @@ int Server::main(const std::vector<std::string> & /*args*/)
 if (ThreadFuzzer::instance().isEffective())
 LOG_WARNING(log, "ThreadFuzzer is enabled. Application will run slowly and unstable.");

+#if !defined(NDEBUG) || !defined(__OPTIMIZE__)
+LOG_WARNING(log, "Server was built in debug mode. It will work slowly.");
+#endif
+
+#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER) || defined(MEMORY_SANITIZER)
+LOG_WARNING(log, "Server was built with sanitizer. It will work slowly.");
+#endif
+
 /** Context contains all that query execution is dependent:
 * settings, available functions, data types, aggregate functions, databases...
 */
@@ -164,7 +164,7 @@ void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header,

 /// Create table
 NamesAndTypesList columns = sample_block.getNamesAndTypesList();
-auto temporary_table = TemporaryTableHolder(context, ColumnsDescription{columns});
+auto temporary_table = TemporaryTableHolder(context, ColumnsDescription{columns}, {});
 auto storage = temporary_table.getTable();
 context.addExternalTable(data->table_name, std::move(temporary_table));
 BlockOutputStreamPtr output = storage->write(ASTPtr(), context);
@@ -126,7 +126,7 @@ struct Settings : public SettingsCollection<Settings>
 M(SettingBool, force_optimize_skip_unused_shards_no_nested, false, "Do not apply force_optimize_skip_unused_shards for nested Distributed tables.", 0) \
 \
 M(SettingBool, input_format_parallel_parsing, true, "Enable parallel parsing for some data formats.", 0) \
-M(SettingUInt64, min_chunk_bytes_for_parallel_parsing, (1024 * 1024), "The minimum chunk size in bytes, which each thread will parse in parallel.", 0) \
+M(SettingUInt64, min_chunk_bytes_for_parallel_parsing, (10 * 1024 * 1024), "The minimum chunk size in bytes, which each thread will parse in parallel.", 0) \
 \
 M(SettingUInt64, merge_tree_min_rows_for_concurrent_read, (20 * 8192), "If at least as many lines are read from one file, the reading can be parallelized.", 0) \
 M(SettingUInt64, merge_tree_min_bytes_for_concurrent_read, (24 * 10 * 1024 * 1024), "If at least as many bytes are read from one file, the reading can be parallelized.", 0) \
@@ -437,6 +437,7 @@ struct Settings : public SettingsCollection<Settings>
 M(SettingUInt64, mark_cache_min_lifetime, 0, "Obsolete setting, does nothing. Will be removed after 2020-05-31", 0) \
 M(SettingBool, partial_merge_join, false, "Obsolete. Use join_algorithm='prefer_partial_merge' instead.", 0) \
 M(SettingUInt64, max_memory_usage_for_all_queries, 0, "Obsolete. Will be removed after 2020-10-20", 0) \
+M(SettingBool, experimental_use_processors, true, "Obsolete setting, does nothing. Will be removed after 2020-11-29.", 0) \

 DECLARE_SETTINGS_COLLECTION(LIST_OF_SETTINGS)
@@ -58,7 +58,7 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(

 if (context.getSettingsRef().input_format_defaults_for_omitted_fields && ast_insert_query->table_id && !input_function)
 {
-StoragePtr storage = DatabaseCatalog::instance().getTable(ast_insert_query->table_id);
+StoragePtr storage = DatabaseCatalog::instance().getTable(ast_insert_query->table_id, context);
 auto column_defaults = storage->getColumns().getDefaults();
 if (!column_defaults.empty())
 res_stream = std::make_shared<AddingDefaultsBlockInputStream>(res_stream, column_defaults, context);
@@ -59,7 +59,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(

 for (const auto & database_table : dependencies)
 {
-auto dependent_table = DatabaseCatalog::instance().getTable(database_table);
+auto dependent_table = DatabaseCatalog::instance().getTable(database_table, context);

 ASTPtr query;
 BlockOutputStreamPtr out;
@@ -5,7 +5,7 @@
 #include <Interpreters/ExpressionAnalyzer.h>
 #include <Columns/ColumnConst.h>
 #include <Interpreters/addTypeConversionToAST.h>
-#include <Storages/MergeTree/TTLMode.h>
+#include <Storages/TTLMode.h>
 #include <Interpreters/Context.h>

 namespace DB
@@ -38,7 +38,7 @@ TTLBlockInputStream::TTLBlockInputStream(
 const auto & column_defaults = storage_columns.getDefaults();

 ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
-for (const auto & [name, _] : storage.column_ttl_entries_by_name)
+for (const auto & [name, _] : storage.getColumnTTLs())
 {
 auto it = column_defaults.find(name);
 if (it != column_defaults.end())
@@ -70,21 +70,21 @@ TTLBlockInputStream::TTLBlockInputStream(
 defaults_expression = ExpressionAnalyzer{default_expr_list, syntax_result, storage.global_context}.getActions(true);
 }

-if (storage.hasRowsTTL() && storage.rows_ttl_entry.mode == TTLMode::GROUP_BY)
+if (storage.hasRowsTTL() && storage.getRowsTTL().mode == TTLMode::GROUP_BY)
 {
-current_key_value.resize(storage.rows_ttl_entry.group_by_keys.size());
+current_key_value.resize(storage.getRowsTTL().group_by_keys.size());

 ColumnNumbers keys;
-for (const auto & key : storage.rows_ttl_entry.group_by_keys)
+for (const auto & key : storage.getRowsTTL().group_by_keys)
 keys.push_back(header.getPositionByName(key));
-agg_key_columns.resize(storage.rows_ttl_entry.group_by_keys.size());
+agg_key_columns.resize(storage.getRowsTTL().group_by_keys.size());

-AggregateDescriptions aggregates = storage.rows_ttl_entry.aggregate_descriptions;
+AggregateDescriptions aggregates = storage.getRowsTTL().aggregate_descriptions;
 for (auto & descr : aggregates)
 if (descr.arguments.empty())
 for (const auto & name : descr.argument_names)
 descr.arguments.push_back(header.getPositionByName(name));
-agg_aggregate_columns.resize(storage.rows_ttl_entry.aggregate_descriptions.size());
+agg_aggregate_columns.resize(storage.getRowsTTL().aggregate_descriptions.size());

 const Settings & settings = storage.global_context.getSettingsRef();

@@ -105,8 +105,8 @@ bool TTLBlockInputStream::isTTLExpired(time_t ttl) const
 Block TTLBlockInputStream::readImpl()
 {
 /// Skip all data if table ttl is expired for part
-if (storage.hasRowsTTL() && !storage.rows_ttl_entry.where_expression &&
-storage.rows_ttl_entry.mode != TTLMode::GROUP_BY && isTTLExpired(old_ttl_infos.table_ttl.max))
+if (storage.hasRowsTTL() && !storage.getRowsTTL().where_expression &&
+storage.getRowsTTL().mode != TTLMode::GROUP_BY && isTTLExpired(old_ttl_infos.table_ttl.max))
 {
 rows_removed = data_part->rows_count;
 return {};
@@ -151,15 +151,17 @@ void TTLBlockInputStream::readSuffixImpl()

 void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block)
 {
-storage.rows_ttl_entry.expression->execute(block);
-if (storage.rows_ttl_entry.where_expression)
-storage.rows_ttl_entry.where_expression->execute(block);
+const auto & rows_ttl = storage.getRowsTTL();
+
+rows_ttl.expression->execute(block);
+if (rows_ttl.where_expression)
+rows_ttl.where_expression->execute(block);

 const IColumn * ttl_column =
-block.getByName(storage.rows_ttl_entry.result_column).column.get();
+block.getByName(rows_ttl.result_column).column.get();

-const IColumn * where_result_column = storage.rows_ttl_entry.where_expression ?
-block.getByName(storage.rows_ttl_entry.where_result_column).column.get() : nullptr;
+const IColumn * where_result_column = storage.getRowsTTL().where_expression ?
+block.getByName(storage.getRowsTTL().where_result_column).column.get() : nullptr;

 const auto & column_names = header.getNames();

@@ -204,9 +206,9 @@ void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block)
 bool ttl_expired = isTTLExpired(cur_ttl) && where_filter_passed;

 bool same_as_current = true;
-for (size_t j = 0; j < storage.rows_ttl_entry.group_by_keys.size(); ++j)
+for (size_t j = 0; j < storage.getRowsTTL().group_by_keys.size(); ++j)
 {
-const String & key_column = storage.rows_ttl_entry.group_by_keys[j];
+const String & key_column = storage.getRowsTTL().group_by_keys[j];
 const IColumn * values_column = block.getByName(key_column).column.get();
 if (!same_as_current || (*values_column)[i] != current_key_value[j])
 {
@@ -275,18 +277,18 @@ void TTLBlockInputStream::finalizeAggregates(MutableColumns & result_columns)
 auto aggregated_res = aggregator->convertToBlocks(agg_result, true, 1);
 for (auto & agg_block : aggregated_res)
 {
-for (const auto & it : storage.rows_ttl_entry.group_by_aggregations)
-std::get<2>(it)->execute(agg_block);
-for (const auto & name : storage.rows_ttl_entry.group_by_keys)
+for (const auto & it : storage.getRowsTTL().set_parts)
+it.expression->execute(agg_block);
+for (const auto & name : storage.getRowsTTL().group_by_keys)
 {
 const IColumn * values_column = agg_block.getByName(name).column.get();
 auto & result_column = result_columns[header.getPositionByName(name)];
 result_column->insertRangeFrom(*values_column, 0, agg_block.rows());
 }
-for (const auto & it : storage.rows_ttl_entry.group_by_aggregations)
+for (const auto & it : storage.getRowsTTL().set_parts)
 {
-const IColumn * values_column = agg_block.getByName(get<1>(it)).column.get();
-auto & result_column = result_columns[header.getPositionByName(std::get<0>(it))];
+const IColumn * values_column = agg_block.getByName(it.expression_result_column_name).column.get();
+auto & result_column = result_columns[header.getPositionByName(it.column_name)];
 result_column->insertRangeFrom(*values_column, 0, agg_block.rows());
 }
 }
@@ -304,7 +306,7 @@ void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block)
 }

 std::vector<String> columns_to_remove;
-for (const auto & [name, ttl_entry] : storage.column_ttl_entries_by_name)
+for (const auto & [name, ttl_entry] : storage.getColumnTTLs())
 {
 /// If we read not all table columns. E.g. while mutation.
 if (!block.has(name))
@@ -365,7 +367,7 @@ void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block)
 void TTLBlockInputStream::updateMovesTTL(Block & block)
 {
 std::vector<String> columns_to_remove;
-for (const auto & ttl_entry : storage.move_ttl_entries)
+for (const auto & ttl_entry : storage.getMoveTTLs())
 {
 auto & new_ttl_info = new_ttl_infos.moves_ttl[ttl_entry.result_column];
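Taken together, the TTLBlockInputStream hunks replace direct reads of the storage's public TTL fields (storage.rows_ttl_entry, storage.column_ttl_entries_by_name, storage.move_ttl_entries) with the getRowsTTL()/getColumnTTLs()/getMoveTTLs() accessors, and hoist a repeated lookup into a local reference. A minimal sketch of that encapsulation pattern follows; the struct layout is illustrative only, not the real MergeTree definition:

    #include <string>
    #include <vector>

    struct TTLEntry
    {
        std::vector<std::string> group_by_keys;   /// illustrative subset of the real entry
    };

    class StorageSketch
    {
    public:
        bool hasRowsTTL() const { return rows_ttl_set; }
        /// Callers go through the accessor, so the field layout can change later
        /// without touching every use site.
        const TTLEntry & getRowsTTL() const { return rows_ttl_entry; }
    private:
        TTLEntry rows_ttl_entry;
        bool rows_ttl_set = false;
    };

Caching the accessor result in a local, as removeRowsWithExpiredTableTTL now does with `const auto & rows_ttl = storage.getRowsTTL();`, avoids repeating the call and keeps each use site short.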
@@ -35,7 +35,7 @@ try
 Names column_names;
 column_names.push_back("WatchID");

-StoragePtr table = DatabaseCatalog::instance().getTable({"default", "hits6"});
+StoragePtr table = DatabaseCatalog::instance().getTable({"default", "hits6"}, context);

 QueryProcessingStage::Enum stage = table->getQueryProcessingStage(context);
 auto pipes = table->read(column_names, {}, context, stage, settings.max_block_size, settings.max_threads);
@@ -288,15 +288,15 @@ void DatabaseAtomic::assertCanBeDetached(bool cleenup)
 "because some tables are still in use. Retry later.", ErrorCodes::DATABASE_NOT_EMPTY);
 }

-DatabaseTablesIteratorPtr DatabaseAtomic::getTablesIterator(const IDatabase::FilterByNameFunction & filter_by_table_name)
+DatabaseTablesIteratorPtr DatabaseAtomic::getTablesIterator(const Context & context, const IDatabase::FilterByNameFunction & filter_by_table_name)
 {
-auto base_iter = DatabaseWithOwnTablesBase::getTablesIterator(filter_by_table_name);
+auto base_iter = DatabaseWithOwnTablesBase::getTablesIterator(context, filter_by_table_name);
 return std::make_unique<AtomicDatabaseTablesSnapshotIterator>(std::move(typeid_cast<DatabaseTablesSnapshotIterator &>(*base_iter)));
 }

 UUID DatabaseAtomic::tryGetTableUUID(const String & table_name) const
 {
-if (auto table = tryGetTable(table_name))
+if (auto table = tryGetTable(table_name, global_context))
 return table->getStorageID().uuid;
 return UUIDHelpers::Nil;
 }
@@ -42,7 +42,7 @@ public:

 void drop(const Context & /*context*/) override;

-DatabaseTablesIteratorPtr getTablesIterator(const FilterByNameFunction & filter_by_table_name) override;
+DatabaseTablesIteratorPtr getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name) override;

 void loadStoredObjects(Context & context, bool has_force_restore_data_flag) override;
@@ -50,18 +50,18 @@ Tables DatabaseDictionary::listTables(const FilterByNameFunction & filter_by_nam
 return tables;
 }

-bool DatabaseDictionary::isTableExist(const String & table_name) const
+bool DatabaseDictionary::isTableExist(const String & table_name, const Context &) const
 {
 return global_context.getExternalDictionariesLoader().getCurrentStatus(table_name) != ExternalLoader::Status::NOT_EXIST;
 }

-StoragePtr DatabaseDictionary::tryGetTable(const String & table_name) const
+StoragePtr DatabaseDictionary::tryGetTable(const String & table_name, const Context &) const
 {
 auto load_result = global_context.getExternalDictionariesLoader().getLoadResult(table_name);
 return createStorageDictionary(getDatabaseName(), load_result);
 }

-DatabaseTablesIteratorPtr DatabaseDictionary::getTablesIterator(const FilterByNameFunction & filter_by_table_name)
+DatabaseTablesIteratorPtr DatabaseDictionary::getTablesIterator(const Context &, const FilterByNameFunction & filter_by_table_name)
 {
 return std::make_unique<DatabaseTablesSnapshotIterator>(listTables(filter_by_table_name));
 }
@@ -71,7 +71,7 @@ bool DatabaseDictionary::empty() const
 return !global_context.getExternalDictionariesLoader().hasObjects();
 }

-ASTPtr DatabaseDictionary::getCreateTableQueryImpl(const String & table_name, bool throw_on_error) const
+ASTPtr DatabaseDictionary::getCreateTableQueryImpl(const String & table_name, const Context &, bool throw_on_error) const
 {
 String query;
 {
@@ -29,11 +29,11 @@ public:
 return "Dictionary";
 }

-bool isTableExist(const String & table_name) const override;
+bool isTableExist(const String & table_name, const Context & context) const override;

-StoragePtr tryGetTable(const String & table_name) const override;
+StoragePtr tryGetTable(const String & table_name, const Context & context) const override;

-DatabaseTablesIteratorPtr getTablesIterator(const FilterByNameFunction & filter_by_table_name) override;
+DatabaseTablesIteratorPtr getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name) override;

 bool empty() const override;

@@ -44,7 +44,7 @@ public:
 void shutdown() override;

 protected:
-ASTPtr getCreateTableQueryImpl(const String & table_name, bool throw_on_error) const override;
+ASTPtr getCreateTableQueryImpl(const String & table_name, const Context & context, bool throw_on_error) const override;

 private:
 mutable std::mutex mutex;
@@ -82,7 +82,7 @@ DatabasePtr DatabaseFactory::getImpl(
 else if (engine_name == "Atomic")
 return std::make_shared<DatabaseAtomic>(database_name, metadata_path, context);
 else if (engine_name == "Memory")
-return std::make_shared<DatabaseMemory>(database_name);
+return std::make_shared<DatabaseMemory>(database_name, context);
 else if (engine_name == "Dictionary")
 return std::make_shared<DatabaseDictionary>(database_name, context);
@@ -132,7 +132,7 @@ StoragePtr DatabaseLazy::tryGetTable(const String & table_name) const
 return loadTable(table_name);
 }

-DatabaseTablesIteratorPtr DatabaseLazy::getTablesIterator(const FilterByNameFunction & filter_by_table_name)
+DatabaseTablesIteratorPtr DatabaseLazy::getTablesIterator(const Context &, const FilterByNameFunction & filter_by_table_name)
 {
 std::lock_guard lock(mutex);
 Strings filtered_tables;
@@ -51,13 +51,15 @@ public:

 time_t getObjectMetadataModificationTime(const String & table_name) const override;

-bool isTableExist(const String & table_name) const override;
+bool isTableExist(const String & table_name, const Context &) const override { return isTableExist(table_name); }
+bool isTableExist(const String & table_name) const;

-StoragePtr tryGetTable(const String & table_name) const override;
+StoragePtr tryGetTable(const String & table_name, const Context &) const override { return tryGetTable(table_name); }
+StoragePtr tryGetTable(const String & table_name) const;

 bool empty() const override;

-DatabaseTablesIteratorPtr getTablesIterator(const FilterByNameFunction & filter_by_table_name) override;
+DatabaseTablesIteratorPtr getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name) override;

 void attachTable(const String & table_name, const StoragePtr & table, const String & relative_table_path) override;
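DatabaseLazy is the one implementation that has no use for the new Context parameter, so its header keeps the old context-free methods and adds thin overrides that forward to them. A sketch of that forwarding idiom, under the same assumption that the Context is accepted but unused (the class and table name here are hypothetical):

    #include <string>

    class Context;   /// opaque here, as in the headers above

    class LazyLikeDatabase
    {
    public:
        /// New interface: the Context goes unused in this implementation.
        bool isTableExist(const std::string & name, const Context &) const { return isTableExist(name); }

        /// Pre-existing context-free method, kept as the real worker.
        bool isTableExist(const std::string & name) const { return name == "hypothetical_table"; }
    };

This keeps existing internal callers compiling unchanged while still satisfying the widened virtual interface.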
@@ -16,8 +16,8 @@ namespace ErrorCodes
 extern const int UNKNOWN_TABLE;
 }

-DatabaseMemory::DatabaseMemory(const String & name_)
-: DatabaseWithOwnTablesBase(name_, "DatabaseMemory(" + name_ + ")")
+DatabaseMemory::DatabaseMemory(const String & name_, const Context & context)
+: DatabaseWithOwnTablesBase(name_, "DatabaseMemory(" + name_ + ")", context)
 , data_path("data/" + escapeForFileName(database_name) + "/")
 {}

@@ -64,7 +64,7 @@ ASTPtr DatabaseMemory::getCreateDatabaseQuery() const
 return create_query;
 }

-ASTPtr DatabaseMemory::getCreateTableQueryImpl(const String & table_name, bool throw_on_error) const
+ASTPtr DatabaseMemory::getCreateTableQueryImpl(const String & table_name, const Context &, bool throw_on_error) const
 {
 std::lock_guard lock{mutex};
 auto it = create_queries.find(table_name);
@@ -80,7 +80,7 @@ ASTPtr DatabaseMemory::getCreateTableQueryImpl(const String & table_name, bool t

 UUID DatabaseMemory::tryGetTableUUID(const String & table_name) const
 {
-if (auto table = tryGetTable(table_name))
+if (auto table = tryGetTable(table_name, global_context))
 return table->getStorageID().uuid;
 return UUIDHelpers::Nil;
 }
@@ -19,7 +19,7 @@ namespace DB
 class DatabaseMemory final : public DatabaseWithOwnTablesBase
 {
 public:
-DatabaseMemory(const String & name_);
+DatabaseMemory(const String & name_, const Context & context);

 String getEngineName() const override { return "Memory"; }

@@ -34,7 +34,7 @@ public:
 const String & table_name,
 bool no_delay) override;

-ASTPtr getCreateTableQueryImpl(const String & name, bool throw_on_error) const override;
+ASTPtr getCreateTableQueryImpl(const String & name, const Context & context, bool throw_on_error) const override;
 ASTPtr getCreateDatabaseQuery() const override;

 /// DatabaseMemory allows to create tables, which store data on disk.
@@ -89,7 +89,7 @@ bool DatabaseMySQL::empty() const
 return true;
 }

-DatabaseTablesIteratorPtr DatabaseMySQL::getTablesIterator(const FilterByNameFunction & filter_by_table_name)
+DatabaseTablesIteratorPtr DatabaseMySQL::getTablesIterator(const Context &, const FilterByNameFunction & filter_by_table_name)
 {
 Tables tables;
 std::lock_guard<std::mutex> lock(mutex);
@@ -103,12 +103,12 @@ DatabaseTablesIteratorPtr DatabaseMySQL::getTablesIterator(const FilterByNameFun
 return std::make_unique<DatabaseTablesSnapshotIterator>(tables);
 }

-bool DatabaseMySQL::isTableExist(const String & name) const
+bool DatabaseMySQL::isTableExist(const String & name, const Context &) const
 {
-return bool(tryGetTable(name));
+return bool(tryGetTable(name, global_context));
 }

-StoragePtr DatabaseMySQL::tryGetTable(const String & mysql_table_name) const
+StoragePtr DatabaseMySQL::tryGetTable(const String & mysql_table_name, const Context &) const
 {
 std::lock_guard<std::mutex> lock(mutex);

@@ -155,7 +155,7 @@ static ASTPtr getCreateQueryFromStorage(const StoragePtr & storage, const ASTPtr
 return create_table_query;
 }

-ASTPtr DatabaseMySQL::getCreateTableQueryImpl(const String & table_name, bool throw_on_error) const
+ASTPtr DatabaseMySQL::getCreateTableQueryImpl(const String & table_name, const Context &, bool throw_on_error) const
 {
 std::lock_guard<std::mutex> lock(mutex);

@@ -501,7 +501,7 @@ void DatabaseMySQL::createTable(const Context &, const String & table_name, cons
 /// XXX: hack
 /// In order to prevent users from broken the table structure by executing attach table database_name.table_name (...)
 /// we should compare the old and new create_query to make them completely consistent
-const auto & origin_create_query = getCreateTableQuery(table_name);
+const auto & origin_create_query = getCreateTableQuery(table_name, global_context);
 origin_create_query->as<ASTCreateQuery>()->attach = true;

 if (queryToString(origin_create_query) != queryToString(create_query))
@@ -32,13 +32,13 @@ public:

 bool empty() const override;

-DatabaseTablesIteratorPtr getTablesIterator(const FilterByNameFunction & filter_by_table_name) override;
+DatabaseTablesIteratorPtr getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name) override;

 ASTPtr getCreateDatabaseQuery() const override;

-bool isTableExist(const String & name) const override;
+bool isTableExist(const String & name, const Context & context) const override;

-StoragePtr tryGetTable(const String & name) const override;
+StoragePtr tryGetTable(const String & name, const Context & context) const override;

 time_t getObjectMetadataModificationTime(const String & name) const override;

@@ -59,7 +59,7 @@ public:
 void attachTable(const String & table_name, const StoragePtr & storage, const String & relative_table_path) override;

 protected:
-ASTPtr getCreateTableQueryImpl(const String & name, bool throw_on_error) const override;
+ASTPtr getCreateTableQueryImpl(const String & name, const Context & context, bool throw_on_error) const override;

 private:
 const Context & global_context;
@@ -123,10 +123,9 @@ String getObjectDefinitionFromCreateQuery(const ASTPtr & query)
 }

 DatabaseOnDisk::DatabaseOnDisk(const String & name, const String & metadata_path_, const String & data_path_, const String & logger, const Context & context)
-: DatabaseWithOwnTablesBase(name, logger)
+: DatabaseWithOwnTablesBase(name, logger, context)
 , metadata_path(metadata_path_)
 , data_path(data_path_)
-, global_context(context.getGlobalContext())
 {
 Poco::File(context.getPath() + data_path).createDirectories();
 Poco::File(metadata_path).createDirectories();
@@ -160,7 +159,7 @@ void DatabaseOnDisk::createTable(
 throw Exception("Dictionary " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " already exists.",
 ErrorCodes::DICTIONARY_ALREADY_EXISTS);

-if (isTableExist(table_name))
+if (isTableExist(table_name, global_context))
 throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);

 if (create.attach_short_syntax)
@@ -267,7 +266,7 @@ void DatabaseOnDisk::renameTable(
 String table_metadata_path;
 ASTPtr attach_query;
 /// DatabaseLazy::detachTable may return nullptr even if table exists, so we need tryGetTable for this case.
-StoragePtr table = tryGetTable(table_name);
+StoragePtr table = tryGetTable(table_name, global_context);
 detachTable(table_name);
 try
 {
@@ -304,10 +303,10 @@ void DatabaseOnDisk::renameTable(
 Poco::File(table_metadata_path).remove();
 }

-ASTPtr DatabaseOnDisk::getCreateTableQueryImpl(const String & table_name, bool throw_on_error) const
+ASTPtr DatabaseOnDisk::getCreateTableQueryImpl(const String & table_name, const Context &, bool throw_on_error) const
 {
 ASTPtr ast;
-bool has_table = tryGetTable(table_name) != nullptr;
+bool has_table = tryGetTable(table_name, global_context) != nullptr;
 auto table_metadata_path = getObjectMetadataPath(table_name);
 try
 {
@@ -76,6 +76,7 @@ protected:

 ASTPtr getCreateTableQueryImpl(
 const String & table_name,
+const Context & context,
 bool throw_on_error) const override;

 ASTPtr getCreateQueryFromMetadata(const String & metadata_path, bool throw_on_error) const;
@@ -85,7 +86,6 @@ protected:

 const String metadata_path;
 const String data_path;
-const Context & global_context;
 };

 }
@@ -127,7 +127,7 @@ void DatabaseWithDictionaries::createDictionary(const Context & context, const S
 "Dictionary " + backQuote(getDatabaseName()) + "." + backQuote(dictionary_name) + " already exists.",
 ErrorCodes::DICTIONARY_ALREADY_EXISTS);

-if (isTableExist(dictionary_name))
+if (isTableExist(dictionary_name, global_context))
 throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(dictionary_name) + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);

@@ -1,5 +1,6 @@
 #include <Databases/DatabasesCommon.h>
 #include <Interpreters/InterpreterCreateQuery.h>
+#include <Interpreters/Context.h>
 #include <Parsers/ParserCreateQuery.h>
 #include <Parsers/formatAST.h>
 #include <Storages/StorageDictionary.h>
@@ -18,18 +19,18 @@ namespace ErrorCodes
 extern const int UNKNOWN_TABLE;
 }

-DatabaseWithOwnTablesBase::DatabaseWithOwnTablesBase(const String & name_, const String & logger)
-: IDatabase(name_), log(&Logger::get(logger))
+DatabaseWithOwnTablesBase::DatabaseWithOwnTablesBase(const String & name_, const String & logger, const Context & context)
+: IDatabase(name_), log(&Logger::get(logger)), global_context(context.getGlobalContext())
 {
 }

-bool DatabaseWithOwnTablesBase::isTableExist(const String & table_name) const
+bool DatabaseWithOwnTablesBase::isTableExist(const String & table_name, const Context &) const
 {
 std::lock_guard lock(mutex);
 return tables.find(table_name) != tables.end();
 }

-StoragePtr DatabaseWithOwnTablesBase::tryGetTable(const String & table_name) const
+StoragePtr DatabaseWithOwnTablesBase::tryGetTable(const String & table_name, const Context &) const
 {
 std::lock_guard lock(mutex);
 auto it = tables.find(table_name);
@@ -38,7 +39,7 @@ StoragePtr DatabaseWithOwnTablesBase::tryGetTable(const String & table_name) con
 return {};
 }

-DatabaseTablesIteratorPtr DatabaseWithOwnTablesBase::getTablesIterator(const FilterByNameFunction & filter_by_table_name)
+DatabaseTablesIteratorPtr DatabaseWithOwnTablesBase::getTablesIterator(const Context &, const FilterByNameFunction & filter_by_table_name)
 {
 std::lock_guard lock(mutex);
 if (!filter_by_table_name)
@@ -19,9 +19,9 @@ class Context;
 class DatabaseWithOwnTablesBase : public IDatabase
 {
 public:
-bool isTableExist(const String & table_name) const override;
+bool isTableExist(const String & table_name, const Context & context) const override;

-StoragePtr tryGetTable(const String & table_name) const override;
+StoragePtr tryGetTable(const String & table_name, const Context & context) const override;

 bool empty() const override;

@@ -29,18 +29,19 @@ public:

 StoragePtr detachTable(const String & table_name) override;

-DatabaseTablesIteratorPtr getTablesIterator(const FilterByNameFunction & filter_by_table_name) override;
+DatabaseTablesIteratorPtr getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name) override;

 void shutdown() override;

-virtual ~DatabaseWithOwnTablesBase() override;
+~DatabaseWithOwnTablesBase() override;

 protected:
 mutable std::mutex mutex;
 Tables tables;
 Poco::Logger * log;
+const Context & global_context;

-DatabaseWithOwnTablesBase(const String & name_, const String & logger);
+DatabaseWithOwnTablesBase(const String & name_, const String & logger, const Context & context);

 void attachTableUnlocked(const String & table_name, const StoragePtr & table, std::unique_lock<std::mutex> & lock);
 StoragePtr detachTableUnlocked(const String & table_name, std::unique_lock<std::mutex> & lock);
@@ -130,7 +130,7 @@ public:
 virtual void loadStoredObjects(Context & /*context*/, bool /*has_force_restore_data_flag*/) {}

 /// Check the existence of the table.
-virtual bool isTableExist(const String & name) const = 0;
+virtual bool isTableExist(const String & name, const Context & context) const = 0;

 /// Check the existence of the dictionary
 virtual bool isDictionaryExist(const String & /*name*/) const
@@ -139,7 +139,7 @@ public:
 }

 /// Get the table for work. Return nullptr if there is no table.
-virtual StoragePtr tryGetTable(const String & name) const = 0;
+virtual StoragePtr tryGetTable(const String & name, const Context & context) const = 0;

 virtual UUID tryGetTableUUID(const String & /*table_name*/) const { return UUIDHelpers::Nil; }

@@ -147,7 +147,7 @@ public:

 /// Get an iterator that allows you to pass through all the tables.
 /// It is possible to have "hidden" tables that are not visible when passing through, but are visible if you get them by name using the functions above.
-virtual DatabaseTablesIteratorPtr getTablesIterator(const FilterByNameFunction & filter_by_table_name = {}) = 0;
+virtual DatabaseTablesIteratorPtr getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name = {}) = 0;

 /// Get an iterator to pass through all the dictionaries.
 virtual DatabaseDictionariesIteratorPtr getDictionariesIterator([[maybe_unused]] const FilterByNameFunction & filter_by_dictionary_name = {})
@@ -249,14 +249,14 @@ public:
 }

 /// Get the CREATE TABLE query for the table. It can also provide information for detached tables for which there is metadata.
-ASTPtr tryGetCreateTableQuery(const String & name) const noexcept
+ASTPtr tryGetCreateTableQuery(const String & name, const Context & context) const noexcept
 {
-return getCreateTableQueryImpl(name, false);
+return getCreateTableQueryImpl(name, context, false);
 }

-ASTPtr getCreateTableQuery(const String & name) const
+ASTPtr getCreateTableQuery(const String & name, const Context & context) const
 {
-return getCreateTableQueryImpl(name, true);
+return getCreateTableQueryImpl(name, context, true);
 }

 /// Get the CREATE DICTIONARY query for the dictionary. Returns nullptr if dictionary doesn't exists.
@@ -304,7 +304,7 @@ public:
 virtual ~IDatabase() {}

 protected:
-virtual ASTPtr getCreateTableQueryImpl(const String & /*name*/, bool throw_on_error) const
+virtual ASTPtr getCreateTableQueryImpl(const String & /*name*/, const Context & /*context*/, bool throw_on_error) const
 {
 if (throw_on_error)
 throw Exception("There is no SHOW CREATE TABLE query for Database" + getEngineName(), ErrorCodes::CANNOT_GET_CREATE_TABLE_QUERY);
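The net effect on IDatabase is that every lookup virtual (isTableExist, tryGetTable, getTablesIterator, getCreateTableQueryImpl) now receives the caller's Context instead of each database implementation reaching for a global one. A hedged sketch of what a caller looks like after the change; resolve() is an invented helper for illustration, not a function from this commit:

    StoragePtr resolve(const IDatabase & db, const String & name, const Context & context)
    {
        /// The per-query context is threaded through explicitly now,
        /// so access checks and settings can be taken from the query, not a global.
        return db.tryGetTable(name, context);
    }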
@@ -24,8 +24,8 @@ SRCS(
 ComplexKeyCacheDictionary_generate3.cpp
 ComplexKeyCacheDictionary_setAttributeValue.cpp
 ComplexKeyCacheDictionary_setDefaultAttributeValue.cpp
-ComplexKeyHashedDictionary.cpp
+ComplexKeyDirectDictionary.cpp
 ComplexKeyHashedDictionary.cpp
 DictionaryBlockInputStreamBase.cpp
 DictionaryFactory.cpp
 DictionarySourceFactory.cpp
@@ -43,7 +43,7 @@ static auto getJoin(const ColumnsWithTypeAndName & arguments, const Context & co
 ++dot;
 }
 String table_name = join_name.substr(dot);
-auto table = DatabaseCatalog::instance().getTable({database_name, table_name});
+auto table = DatabaseCatalog::instance().getTable({database_name, table_name}, context);
 auto storage_join = std::dynamic_pointer_cast<StorageJoin>(table);
 if (!storage_join)
 throw Exception{"Table " + join_name + " should have engine StorageJoin", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
@@ -113,7 +113,7 @@ void FunctionHasColumnInTable::executeImpl(Block & block, const ColumnNumbers &
 bool has_column;
 if (host_name.empty())
 {
-const StoragePtr & table = DatabaseCatalog::instance().getTable({database_name, table_name});
+const StoragePtr & table = DatabaseCatalog::instance().getTable({database_name, table_name}, global_context);
 has_column = table->getColumns().hasPhysical(column_name);
 }
 else
@@ -171,7 +171,6 @@ SRCS(
 FunctionsRound.cpp
 FunctionsStringArray.cpp
 FunctionsStringSimilarity.cpp
-FunctionUnixTimestamp64.h
 GatherUtils/concat.cpp
 GatherUtils/createArraySink.cpp
 GatherUtils/createArraySource.cpp
@@ -285,10 +284,10 @@ SRCS(
 rand64.cpp
 randConstant.cpp
 rand.cpp
+randomFixedString.cpp
 randomPrintableASCII.cpp
 randomString.cpp
 randomStringUTF8.cpp
-randomFixedString.cpp
 regexpQuoteMeta.cpp
 registerFunctionsArithmetic.cpp
 registerFunctionsComparison.cpp
@@ -308,8 +307,8 @@ SRCS(
 registerFunctionsStringRegexp.cpp
 registerFunctionsStringSearch.cpp
 registerFunctionsTuple.cpp
-registerFunctionsVisitParam.cpp
 registerFunctionsUnixTimestamp64.cpp
+registerFunctionsVisitParam.cpp
 reinterpretAsFixedString.cpp
 reinterpretAsString.cpp
 reinterpretStringAs.cpp
@@ -390,10 +389,10 @@ SRCS(
 toTime.cpp
 toTimeZone.cpp
 toTypeName.cpp
-toValidUTF8.cpp
 toUnixTimestamp64Micro.cpp
 toUnixTimestamp64Milli.cpp
 toUnixTimestamp64Nano.cpp
+toValidUTF8.cpp
 toYear.cpp
 toYYYYMM.cpp
 toYYYYMMDD.cpp
@@ -424,8 +423,8 @@ SRCS(
 URL/fragment.cpp
 URL/path.cpp
 URL/pathFull.cpp
-URL/protocol.cpp
 URL/port.cpp
+URL/protocol.cpp
 URL/queryStringAndFragment.cpp
 URL/queryString.cpp
 URL/registerFunctionsURL.cpp
@@ -650,7 +650,6 @@ void readCSVStringInto(Vector & s, ReadBuffer & buf, const FormatSettings::CSV &
 ++next_pos;
 }();
-

 appendToStringOrVector(s, buf, next_pos);
 buf.position() = next_pos;
@@ -19,23 +19,28 @@ namespace ActionLocks
 }


+ActionLocksManager::ActionLocksManager(const Context & context)
+: global_context(context.getGlobalContext())
+{
+}
+
 template <typename F>
-inline void forEachTable(F && f)
+inline void forEachTable(F && f, const Context & context)
 {
 for (auto & elem : DatabaseCatalog::instance().getDatabases())
-for (auto iterator = elem.second->getTablesIterator(); iterator->isValid(); iterator->next())
+for (auto iterator = elem.second->getTablesIterator(context); iterator->isValid(); iterator->next())
 f(iterator->table());

 }

-void ActionLocksManager::add(StorageActionBlockType action_type)
+void ActionLocksManager::add(StorageActionBlockType action_type, const Context & context)
 {
-forEachTable([&](const StoragePtr & table) { add(table, action_type); });
+forEachTable([&](const StoragePtr & table) { add(table, action_type); }, context);
 }

 void ActionLocksManager::add(const StorageID & table_id, StorageActionBlockType action_type)
 {
-if (auto table = DatabaseCatalog::instance().tryGetTable(table_id))
+if (auto table = DatabaseCatalog::instance().tryGetTable(table_id, global_context))
 add(table, action_type);
 }

@@ -60,7 +65,7 @@ void ActionLocksManager::remove(StorageActionBlockType action_type)

 void ActionLocksManager::remove(const StorageID & table_id, StorageActionBlockType action_type)
 {
-if (auto table = DatabaseCatalog::instance().tryGetTable(table_id))
+if (auto table = DatabaseCatalog::instance().tryGetTable(table_id, global_context))
 remove(table, action_type);
 }
@@ -19,8 +19,10 @@ class Context;
 class ActionLocksManager
 {
 public:
+ActionLocksManager(const Context & context);
+
 /// Adds new locks for each table
-void add(StorageActionBlockType action_type);
+void add(StorageActionBlockType action_type, const Context & context);
 /// Add new lock for a table if it has not been already added
 void add(const StorageID & table_id, StorageActionBlockType action_type);
 void add(const StoragePtr & table, StorageActionBlockType action_type);
@@ -41,6 +43,7 @@ private:

 mutable std::mutex mutex;
 StorageLocks storage_locks;
+const Context & global_context;
 };

 }
|
@ -670,7 +670,7 @@ SetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_su
|
||||
if (identifier)
|
||||
{
|
||||
auto table_id = data.context.resolveStorageID(right_in_operand);
|
||||
StoragePtr table = DatabaseCatalog::instance().tryGetTable(table_id);
|
||||
StoragePtr table = DatabaseCatalog::instance().tryGetTable(table_id, data.context);
|
||||
|
||||
if (table)
|
||||
{
|
||||
|
@ -181,7 +181,7 @@ void AsynchronousMetrics::update()
|
||||
/// Lazy database can not contain MergeTree tables
|
||||
if (db.second->getEngineName() == "Lazy")
|
||||
continue;
|
||||
for (auto iterator = db.second->getTablesIterator(); iterator->isValid(); iterator->next())
|
||||
for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())
|
||||
{
|
||||
++total_number_of_tables;
|
||||
const auto & table = iterator->table();
|
||||
|
@ -191,7 +191,7 @@ void SelectStreamFactory::createForShard(
|
||||
else
|
||||
{
|
||||
auto resolved_id = context.resolveStorageID(main_table);
|
||||
main_table_storage = DatabaseCatalog::instance().tryGetTable(resolved_id);
|
||||
main_table_storage = DatabaseCatalog::instance().tryGetTable(resolved_id, context);
|
||||
}
|
||||
|
||||
|
||||
|
@ -2017,7 +2017,7 @@ std::shared_ptr<ActionLocksManager> Context::getActionLocksManager()
|
||||
auto lock = getLock();
|
||||
|
||||
if (!shared->action_locks_manager)
|
||||
shared->action_locks_manager = std::make_shared<ActionLocksManager>();
|
||||
shared->action_locks_manager = std::make_shared<ActionLocksManager>(*this);
|
||||
|
||||
return shared->action_locks_manager;
|
||||
}
|
||||
|
@ -634,7 +634,7 @@ void DDLWorker::processTask(DDLTask & task, const ZooKeeperPtr & zookeeper)
|
||||
{
|
||||
/// It's not CREATE DATABASE
|
||||
auto table_id = context.tryResolveStorageID(*query_with_table, Context::ResolveOrdinary);
|
||||
storage = DatabaseCatalog::instance().tryGetTable(table_id);
|
||||
storage = DatabaseCatalog::instance().tryGetTable(table_id, context);
|
||||
}
|
||||
|
||||
/// For some reason we check consistency of cluster definition only
|
||||
|
@ -58,13 +58,17 @@ TemporaryTableHolder::TemporaryTableHolder(const Context & context_,
|
||||
}
|
||||
|
||||
|
||||
TemporaryTableHolder::TemporaryTableHolder(const Context & context_, const ColumnsDescription & columns, const ASTPtr & query)
|
||||
TemporaryTableHolder::TemporaryTableHolder(
|
||||
const Context & context_,
|
||||
const ColumnsDescription & columns,
|
||||
const ConstraintsDescription & constraints,
|
||||
const ASTPtr & query)
|
||||
: TemporaryTableHolder
|
||||
(
|
||||
context_,
|
||||
[&](const StorageID & table_id)
|
||||
{
|
||||
return StorageMemory::create(table_id, ColumnsDescription{columns}, ConstraintsDescription{});
|
||||
return StorageMemory::create(table_id, ColumnsDescription{columns}, ConstraintsDescription{constraints});
|
||||
},
|
||||
query
|
||||
)
|
||||
@ -97,7 +101,7 @@ StorageID TemporaryTableHolder::getGlobalTableID() const
|
||||
|
||||
StoragePtr TemporaryTableHolder::getTable() const
|
||||
{
|
||||
auto table = temporary_tables->tryGetTable("_tmp_" + toString(id));
|
||||
auto table = temporary_tables->tryGetTable("_tmp_" + toString(id), *global_context);
|
||||
if (!table)
|
||||
throw Exception("Temporary table " + getGlobalTableID().getNameForLogs() + " not found", ErrorCodes::LOGICAL_ERROR);
|
||||
return table;
|
||||
@ -108,7 +112,7 @@ void DatabaseCatalog::loadDatabases()
|
||||
{
|
||||
drop_delay_sec = global_context->getConfigRef().getInt("database_atomic_delay_before_drop_table_sec", default_drop_delay_sec);
|
||||
|
||||
auto db_for_temporary_and_external_tables = std::make_shared<DatabaseMemory>(TEMPORARY_DATABASE);
|
||||
auto db_for_temporary_and_external_tables = std::make_shared<DatabaseMemory>(TEMPORARY_DATABASE, *global_context);
|
||||
attachDatabase(TEMPORARY_DATABASE, db_for_temporary_and_external_tables);
|
||||
|
||||
loadMarkedAsDroppedTables();
|
||||
@ -159,6 +163,7 @@ DatabaseAndTable DatabaseCatalog::tryGetByUUID(const UUID & uuid) const
|
||||
|
||||
DatabaseAndTable DatabaseCatalog::getTableImpl(
|
||||
const StorageID & table_id,
|
||||
const Context & context,
|
||||
std::optional<Exception> * exception) const
|
||||
{
|
||||
if (!table_id)
|
||||
@ -206,7 +211,7 @@ DatabaseAndTable DatabaseCatalog::getTableImpl(
|
||||
database = it->second;
|
||||
}
|
||||
|
||||
auto table = database->tryGetTable(table_id.table_name);
|
||||
auto table = database->tryGetTable(table_id.table_name, context);
|
||||
if (!table && exception)
|
||||
exception->emplace("Table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
|
||||
|
||||
@ -319,7 +324,7 @@ Databases DatabaseCatalog::getDatabases() const
|
||||
return databases;
|
||||
}
|
||||
|
||||
bool DatabaseCatalog::isTableExist(const DB::StorageID & table_id) const
|
||||
bool DatabaseCatalog::isTableExist(const DB::StorageID & table_id, const Context & context) const
|
||||
{
|
||||
if (table_id.hasUUID())
|
||||
return tryGetByUUID(table_id.uuid).second != nullptr;
|
||||
@ -331,12 +336,12 @@ bool DatabaseCatalog::isTableExist(const DB::StorageID & table_id) const
|
||||
if (iter != databases.end())
|
||||
db = iter->second;
|
||||
}
|
||||
return db && db->isTableExist(table_id.table_name);
|
||||
return db && db->isTableExist(table_id.table_name, context);
|
||||
}
|
||||
|
||||
void DatabaseCatalog::assertTableDoesntExist(const StorageID & table_id) const
|
||||
void DatabaseCatalog::assertTableDoesntExist(const StorageID & table_id, const Context & context) const
|
||||
{
|
||||
if (isTableExist(table_id))
|
||||
if (isTableExist(table_id, context))
|
||||
throw Exception("Table " + table_id.getNameForLogs() + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
|
||||
}
|
||||
|
||||
@ -468,32 +473,32 @@ bool DatabaseCatalog::isDictionaryExist(const StorageID & table_id) const
|
||||
return db && db->isDictionaryExist(table_id.getTableName());
|
||||
}
|
||||
|
||||
StoragePtr DatabaseCatalog::getTable(const StorageID & table_id) const
|
||||
StoragePtr DatabaseCatalog::getTable(const StorageID & table_id, const Context & context) const
|
||||
{
|
||||
std::optional<Exception> exc;
|
||||
auto res = getTableImpl(table_id, &exc);
|
||||
auto res = getTableImpl(table_id, context, &exc);
|
||||
if (!res.second)
|
||||
throw Exception(*exc);
|
||||
return res.second;
|
||||
}
|
||||
|
||||
StoragePtr DatabaseCatalog::tryGetTable(const StorageID & table_id) const
|
||||
StoragePtr DatabaseCatalog::tryGetTable(const StorageID & table_id, const Context & context) const
|
||||
{
|
||||
return getTableImpl(table_id, nullptr).second;
|
||||
return getTableImpl(table_id, context, nullptr).second;
|
||||
}
|
||||
|
||||
DatabaseAndTable DatabaseCatalog::getDatabaseAndTable(const StorageID & table_id) const
|
||||
DatabaseAndTable DatabaseCatalog::getDatabaseAndTable(const StorageID & table_id, const Context & context) const
|
||||
{
|
||||
std::optional<Exception> exc;
|
||||
auto res = getTableImpl(table_id, &exc);
|
||||
auto res = getTableImpl(table_id, context, &exc);
|
||||
if (!res.second)
|
||||
throw Exception(*exc);
|
||||
return res;
|
||||
}
|
||||
|
||||
DatabaseAndTable DatabaseCatalog::tryGetDatabaseAndTable(const StorageID & table_id) const
|
||||
DatabaseAndTable DatabaseCatalog::tryGetDatabaseAndTable(const StorageID & table_id, const Context & context) const
|
||||
{
|
||||
return getTableImpl(table_id, nullptr);
|
||||
return getTableImpl(table_id, context, nullptr);
|
||||
}
|
||||
|
||||
void DatabaseCatalog::loadMarkedAsDroppedTables()
|
||||
|
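With the extra ConstraintsDescription parameter on TemporaryTableHolder, call sites that used to pass only columns now spell out a constraints object, even an empty one, as the updated ExternalTablesHandler and GlobalSubqueriesVisitor sites show. A hypothetical call in the new shape:

    /// Old: TemporaryTableHolder(context, ColumnsDescription{columns});
    /// New: the constraints argument is explicit, here deliberately empty.
    auto holder = TemporaryTableHolder(context, ColumnsDescription{columns}, ConstraintsDescription{});
    StoragePtr storage = holder.getTable();   /// still backed by StorageMemory, as before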
@@ -21,6 +21,7 @@ class Context;
 class IDatabase;
 class Exception;
 class ColumnsDescription;
+struct ConstraintsDescription;

 using DatabasePtr = std::shared_ptr<IDatabase>;
 using DatabaseAndTable = std::pair<DatabasePtr, StoragePtr>;
@@ -71,7 +72,11 @@ struct TemporaryTableHolder : boost::noncopyable
 TemporaryTableHolder(const Context & context, const Creator & creator, const ASTPtr & query = {});

 /// Creates temporary table with Engine=Memory
-TemporaryTableHolder(const Context & context, const ColumnsDescription & columns, const ASTPtr & query = {});
+TemporaryTableHolder(
+const Context & context,
+const ColumnsDescription & columns,
+const ConstraintsDescription & constraints,
+const ASTPtr & query = {});

 TemporaryTableHolder(TemporaryTableHolder && rhs);
 TemporaryTableHolder & operator = (TemporaryTableHolder && rhs);
@@ -129,15 +134,17 @@ public:
 DatabasePtr getDatabase(const String & database_name, const Context & local_context) const;

 /// For all of the following methods database_name in table_id must be not empty (even for temporary tables).
-void assertTableDoesntExist(const StorageID & table_id) const;
-bool isTableExist(const StorageID & table_id) const;
+void assertTableDoesntExist(const StorageID & table_id, const Context & context) const;
+bool isTableExist(const StorageID & table_id, const Context & context) const;
 bool isDictionaryExist(const StorageID & table_id) const;

-StoragePtr getTable(const StorageID & table_id) const;
-StoragePtr tryGetTable(const StorageID & table_id) const;
-DatabaseAndTable getDatabaseAndTable(const StorageID & table_id) const;
-DatabaseAndTable tryGetDatabaseAndTable(const StorageID & table_id) const;
-DatabaseAndTable getTableImpl(const StorageID & table_id, std::optional<Exception> * exception = nullptr) const;
+StoragePtr getTable(const StorageID & table_id, const Context & context) const;
+StoragePtr tryGetTable(const StorageID & table_id, const Context & context) const;
+DatabaseAndTable getDatabaseAndTable(const StorageID & table_id, const Context & context) const;
+DatabaseAndTable tryGetDatabaseAndTable(const StorageID & table_id, const Context & context) const;
+DatabaseAndTable getTableImpl(const StorageID & table_id,
+const Context & context,
+std::optional<Exception> * exception = nullptr) const;

 void addDependency(const StorageID & from, const StorageID & where);
 void removeDependency(const StorageID & from, const StorageID & where);
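After this commit every DatabaseCatalog lookup carries the query context, so the typical interpreter sequence, mirroring the call sites updated above, looks like this (query_ast stands in for whichever AST node the interpreter holds):

    auto table_id = context.resolveStorageID(query_ast, Context::ResolveOrdinary);
    StoragePtr table = DatabaseCatalog::instance().getTable(table_id, context);      /// throws if the table is absent
    StoragePtr maybe = DatabaseCatalog::instance().tryGetTable(table_id, context);   /// nullptr if the table is absent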
@@ -322,7 +322,7 @@ SetPtr SelectQueryExpressionAnalyzer::isPlainStorageSetInSubquery(const ASTPtr &
 if (!table)
 return nullptr;
 auto table_id = context.resolveStorageID(subquery_or_table_name);
-const auto storage = DatabaseCatalog::instance().getTable(table_id);
+const auto storage = DatabaseCatalog::instance().getTable(table_id, context);
 if (storage->getName() != "Set")
 return nullptr;
 const auto storage_set = std::dynamic_pointer_cast<StorageSet>(storage);
@@ -103,7 +103,7 @@ public:
 Block sample = interpreter->getSampleBlock();
 NamesAndTypesList columns = sample.getNamesAndTypesList();

-auto external_storage_holder = std::make_shared<TemporaryTableHolder>(context, ColumnsDescription{columns});
+auto external_storage_holder = std::make_shared<TemporaryTableHolder>(context, ColumnsDescription{columns}, ConstraintsDescription{});
 StoragePtr external_storage = external_storage_holder->getTable();

 /** We replace the subquery with the name of the temporary table.
@@ -27,7 +27,7 @@ namespace
 StoragePtr tryGetTable(const ASTPtr & database_and_table, const Context & context)
 {
 auto table_id = context.resolveStorageID(database_and_table);
-return DatabaseCatalog::instance().tryGetTable(table_id);
+return DatabaseCatalog::instance().tryGetTable(table_id, context);
 }

 using CheckShardsAndTables = InJoinSubqueriesPreprocessor::CheckShardsAndTables;
@@ -42,7 +42,7 @@ BlockIO InterpreterAlterQuery::execute()

 context.checkAccess(getRequiredAccess());
 auto table_id = context.resolveStorageID(alter, Context::ResolveOrdinary);
-StoragePtr table = DatabaseCatalog::instance().getTable(table_id);
+StoragePtr table = DatabaseCatalog::instance().getTable(table_id, context);

 /// Add default database to table identifiers that we can encounter in e.g. default expressions,
 /// mutation expression, etc.
@@ -244,12 +244,12 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS
 }
 case ASTAlterCommand::MOVE_PARTITION:
 {
-if ((command.move_destination_type == PartDestinationType::DISK)
-|| (command.move_destination_type == PartDestinationType::VOLUME))
+if ((command.move_destination_type == DataDestinationType::DISK)
+|| (command.move_destination_type == DataDestinationType::VOLUME))
 {
 required_access.emplace_back(AccessType::ALTER_MOVE_PARTITION, database, table);
 }
-else if (command.move_destination_type == PartDestinationType::TABLE)
+else if (command.move_destination_type == DataDestinationType::TABLE)
 {
 required_access.emplace_back(AccessType::SELECT | AccessType::ALTER_DELETE, database, table);
 required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table);
@@ -41,7 +41,7 @@ BlockIO InterpreterCheckQuery::execute()
 auto table_id = context.resolveStorageID(check, Context::ResolveOrdinary);

 context.checkAccess(AccessType::SHOW_TABLES, table_id);
-StoragePtr table = DatabaseCatalog::instance().getTable(table_id);
+StoragePtr table = DatabaseCatalog::instance().getTable(table_id, context);
 auto check_results = table->checkData(query_ptr, context);

 Block block;
|
@ -406,7 +406,7 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(AS
|
||||
else if (!create.as_table.empty())
|
||||
{
|
||||
String as_database_name = context.resolveDatabase(create.as_database);
|
||||
StoragePtr as_storage = DatabaseCatalog::instance().getTable({as_database_name, create.as_table});
|
||||
StoragePtr as_storage = DatabaseCatalog::instance().getTable({as_database_name, create.as_table}, context);
|
||||
|
||||
/// as_storage->getColumns() and setEngine(...) must be called under structure lock of other_table for CREATE ... AS other_table.
|
||||
as_storage_lock = as_storage->lockStructureForShare(
|
||||
@ -504,7 +504,7 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
|
||||
String as_database_name = context.resolveDatabase(create.as_database);
|
||||
String as_table_name = create.as_table;
|
||||
|
||||
ASTPtr as_create_ptr = DatabaseCatalog::instance().getDatabase(as_database_name)->getCreateTableQuery(as_table_name);
|
||||
ASTPtr as_create_ptr = DatabaseCatalog::instance().getDatabase(as_database_name)->getCreateTableQuery(as_table_name, context);
|
||||
const auto & as_create = as_create_ptr->as<ASTCreateQuery &>();
|
||||
|
||||
const String qualified_name = backQuoteIfNeed(as_database_name) + "." + backQuoteIfNeed(as_table_name);
|
||||
@ -546,7 +546,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
|
||||
bool if_not_exists = create.if_not_exists;
|
||||
|
||||
// Table SQL definition is available even if the table is detached
|
||||
auto query = database->getCreateTableQuery(create.table);
|
||||
auto query = database->getCreateTableQuery(create.table, context);
|
||||
create = query->as<ASTCreateQuery &>(); // Copy the saved create query, but use ATTACH instead of CREATE
|
||||
create.attach = true;
|
||||
create.attach_short_syntax = true;
|
||||
@ -608,7 +608,7 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
|
||||
guard = DatabaseCatalog::instance().getDDLGuard(create.database, table_name);
|
||||
|
||||
/// Table can be created before or it can be created concurrently in another thread, while we were waiting in DDLGuard.
|
||||
if (database->isTableExist(table_name))
|
||||
if (database->isTableExist(table_name, context))
|
||||
{
|
||||
/// TODO Check structure of table
|
||||
if (create.if_not_exists)
|
||||
@ -637,7 +637,7 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
|
||||
if (create.if_not_exists && context.tryResolveStorageID({"", table_name}, Context::ResolveExternal))
|
||||
return false;
|
||||
|
||||
auto temporary_table = TemporaryTableHolder(context, properties.columns, query_ptr);
|
||||
auto temporary_table = TemporaryTableHolder(context, properties.columns, properties.constraints, query_ptr);
|
||||
context.getSessionContext().addExternalTable(table_name, std::move(temporary_table));
|
||||
return true;
|
||||
}
|
||||
|
@ -86,7 +86,7 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
|
||||
{
|
||||
auto table_id = context.resolveStorageID(table_expression.database_and_table_name);
|
||||
context.checkAccess(AccessType::SHOW_COLUMNS, table_id);
|
||||
table = DatabaseCatalog::instance().getTable(table_id);
|
||||
table = DatabaseCatalog::instance().getTable(table_id, context);
|
||||
}
|
||||
|
||||
auto table_lock = table->lockStructureForShare(
|
||||
|
@ -81,8 +81,8 @@ BlockIO InterpreterDropQuery::executeToTable(
|
||||
auto ddl_guard = (!query.no_ddl_lock ? DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name) : nullptr);
|
||||
|
||||
/// If table was already dropped by anyone, an exception will be thrown
|
||||
auto [database, table] = query.if_exists ? DatabaseCatalog::instance().tryGetDatabaseAndTable(table_id)
|
||||
: DatabaseCatalog::instance().getDatabaseAndTable(table_id);
|
||||
auto [database, table] = query.if_exists ? DatabaseCatalog::instance().tryGetDatabaseAndTable(table_id, context)
|
||||
: DatabaseCatalog::instance().getDatabaseAndTable(table_id, context);
|
||||
|
||||
if (database && table)
|
||||
{
|
||||
@ -182,7 +182,7 @@ BlockIO InterpreterDropQuery::executeToTemporaryTable(const String & table_name,
|
||||
auto resolved_id = context_handle.tryResolveStorageID(StorageID("", table_name), Context::ResolveExternal);
|
||||
if (resolved_id)
|
||||
{
|
||||
StoragePtr table = DatabaseCatalog::instance().getTable(resolved_id);
|
||||
StoragePtr table = DatabaseCatalog::instance().getTable(resolved_id, context);
|
||||
if (kind == ASTDropQuery::Kind::Truncate)
|
||||
{
|
||||
auto table_lock = table->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
|
||||
@ -234,7 +234,7 @@ BlockIO InterpreterDropQuery::executeToDatabase(const String & database_name, AS
|
||||
ASTDropQuery query;
|
||||
query.kind = kind;
|
||||
query.database = database_name;
|
||||
for (auto iterator = database->getTablesIterator(); iterator->isValid(); iterator->next())
|
||||
for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next())
|
||||
{
|
||||
query.table = iterator->name();
|
||||
executeToTable({query.database, query.table}, query);
|
||||
|
@ -50,7 +50,7 @@ BlockInputStreamPtr InterpreterExistsQuery::executeImpl()
{
String database = context.resolveDatabase(exists_query->database);
context.checkAccess(AccessType::SHOW_TABLES, database, exists_query->table);
result = DatabaseCatalog::instance().isTableExist({database, exists_query->table});
result = DatabaseCatalog::instance().isTableExist({database, exists_query->table}, context);
}
}
else if ((exists_query = query_ptr->as<ASTExistsDictionaryQuery>()))

@ -76,7 +76,7 @@ namespace
if (const auto * identifier = expression.database_and_table_name->as<ASTIdentifier>())
{
auto table_id = data.context.resolveStorageID(*identifier);
const auto & storage = DatabaseCatalog::instance().getTable(table_id);
const auto & storage = DatabaseCatalog::instance().getTable(table_id, data.context);

if (auto * storage_view = dynamic_cast<StorageView *>(storage.get()))
storage_view->getRuntimeViewQuery(&select_query, data.context, true);

@ -70,7 +70,7 @@ StoragePtr InterpreterInsertQuery::getTable(ASTInsertQuery & query)
}

query.table_id = context.resolveStorageID(query.table_id);
return DatabaseCatalog::instance().getTable(query.table_id);
return DatabaseCatalog::instance().getTable(query.table_id, context);
}

Block InterpreterInsertQuery::getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table) const
@ -233,6 +233,21 @@ BlockIO InterpreterInsertQuery::execute()
else
out = std::make_shared<PushingToViewsBlockOutputStream>(table, context, query_ptr, no_destination);

/// Note that we wrap transforms one on top of another, so we write them in reverse of data processing order.

/// Checking constraints. It must be done after calculation of all defaults, so we can check them on calculated columns.
if (const auto & constraints = table->getConstraints(); !constraints.empty())
out = std::make_shared<CheckConstraintsBlockOutputStream>(
query.table_id, out, out->getHeader(), table->getConstraints(), context);

/// Actually we don't know structure of input blocks from query/table,
/// because some clients break insertion protocol (columns != header)
out = std::make_shared<AddingDefaultBlockOutputStream>(
out, query_sample_block, out->getHeader(), table->getColumns().getDefaults(), context);

/// It's important to squash blocks as early as possible (before other transforms),
/// because other transforms may work inefficient if block size is small.

/// Do not squash blocks if it is a sync INSERT into Distributed, since it lead to double bufferization on client and server side.
/// Client-side bufferization might cause excessive timeouts (especially in case of big blocks).
if (!(context.getSettingsRef().insert_distributed_sync && table->isRemote()) && !no_squash && !query.watch)
@ -244,15 +259,6 @@ BlockIO InterpreterInsertQuery::execute()
context.getSettingsRef().min_insert_block_size_bytes);
}

/// Actually we don't know structure of input blocks from query/table,
/// because some clients break insertion protocol (columns != header)
out = std::make_shared<AddingDefaultBlockOutputStream>(
out, query_sample_block, out->getHeader(), table->getColumns().getDefaults(), context);

if (const auto & constraints = table->getConstraints(); !constraints.empty())
out = std::make_shared<CheckConstraintsBlockOutputStream>(
query.table_id, out, query_sample_block, table->getConstraints(), context);

auto out_wrapper = std::make_shared<CountingBlockOutputStream>(out);
out_wrapper->setProcessListElement(context.getProcessListElement());
out = std::move(out_wrapper);
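
The new comment in the hunk above states the key invariant: output streams are wrapped one on top of another, so construction order is the reverse of runtime data flow. Moving the constraints wrapper before the defaults wrapper in the code therefore makes constraints run after defaults at runtime, on fully calculated columns. A standalone sketch of that wrapping discipline:

    #include <iostream>
    #include <memory>

    struct OutputStream
    {
        virtual ~OutputStream() = default;
        virtual void write(int value) = 0;
    };

    struct Sink : OutputStream
    {
        void write(int value) override { std::cout << "sink " << value << '\n'; }
    };

    struct AddDefaults : OutputStream
    {
        explicit AddDefaults(std::shared_ptr<OutputStream> next_) : next(std::move(next_)) {}
        void write(int value) override { next->write(value + 1); } // pretend this fills defaults
        std::shared_ptr<OutputStream> next;
    };

    struct CheckConstraints : OutputStream
    {
        explicit CheckConstraints(std::shared_ptr<OutputStream> next_) : next(std::move(next_)) {}
        void write(int value) override { if (value > 0) next->write(value); } // check computed values
        std::shared_ptr<OutputStream> next;
    };

    int main()
    {
        // Wrapped innermost-first: constraints constructed *before* defaults,
        // so at runtime defaults are applied first, then constraints are checked.
        std::shared_ptr<OutputStream> out = std::make_shared<Sink>();
        out = std::make_shared<CheckConstraints>(out);
        out = std::make_shared<AddDefaults>(out);
        out->write(0); // prints "sink 1": defaults ran before the constraint check
    }
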
@ -261,7 +261,7 @@ BlockIO InterpreterKillQueryQuery::execute()
CancellationCode code = CancellationCode::Unknown;
if (!query.test)
{
auto storage = DatabaseCatalog::instance().tryGetTable(table_id);
auto storage = DatabaseCatalog::instance().tryGetTable(table_id, context);
if (!storage)
code = CancellationCode::NotFound;
else

@ -25,7 +25,7 @@ BlockIO InterpreterOptimizeQuery::execute()
context.checkAccess(getRequiredAccess());

auto table_id = context.resolveStorageID(ast, Context::ResolveOrdinary);
StoragePtr table = DatabaseCatalog::instance().getTable(table_id);
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, context);
table->optimize(query_ptr, ast.partition, ast.final, ast.deduplicate, context);
return {};
}

@ -78,7 +78,7 @@ BlockIO InterpreterRenameQuery::execute()
for (auto & elem : descriptions)
{
if (!rename.exchange)
database_catalog.assertTableDoesntExist(StorageID(elem.to_database_name, elem.to_table_name));
database_catalog.assertTableDoesntExist(StorageID(elem.to_database_name, elem.to_table_name), context);

database_catalog.getDatabase(elem.from_database_name)->renameTable(
context,

@ -50,7 +50,7 @@ BlockInputStreamPtr InterpreterShowCreateQuery::executeImpl()
auto resolve_table_type = show_query->temporary ? Context::ResolveExternal : Context::ResolveOrdinary;
auto table_id = context.resolveStorageID(*show_query, resolve_table_type);
context.checkAccess(AccessType::SHOW_COLUMNS, table_id);
create_query = DatabaseCatalog::instance().getDatabase(table_id.database_name)->getCreateTableQuery(table_id.table_name);
create_query = DatabaseCatalog::instance().getDatabase(table_id.database_name)->getCreateTableQuery(table_id.table_name, context);
}
else if ((show_query = query_ptr->as<ASTShowCreateDatabaseQuery>()))
{

@ -144,7 +144,7 @@ void InterpreterSystemQuery::startStopAction(StorageActionBlockType action_type,
auto access = context.getAccess();
for (auto & elem : DatabaseCatalog::instance().getDatabases())
{
for (auto iterator = elem.second->getTablesIterator(); iterator->isValid(); iterator->next())
for (auto iterator = elem.second->getTablesIterator(context); iterator->isValid(); iterator->next())
{
if (!access->isGranted(log, getRequiredAccessType(action_type), elem.first, iterator->name()))
continue;
@ -321,7 +321,7 @@ StoragePtr InterpreterSystemQuery::tryRestartReplica(const StorageID & replica,
context.checkAccess(AccessType::SYSTEM_RESTART_REPLICA, replica);

auto table_ddl_guard = need_ddl_guard ? DatabaseCatalog::instance().getDDLGuard(replica.getDatabaseName(), replica.getTableName()) : nullptr;
auto [database, table] = DatabaseCatalog::instance().tryGetDatabaseAndTable(replica);
auto [database, table] = DatabaseCatalog::instance().tryGetDatabaseAndTable(replica, context);
ASTPtr create_ast;

/// Detach actions
@ -332,7 +332,7 @@ StoragePtr InterpreterSystemQuery::tryRestartReplica(const StorageID & replica,
{
/// If table was already dropped by anyone, an exception will be thrown
auto table_lock = table->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
create_ast = database->getCreateTableQuery(replica.table_name);
create_ast = database->getCreateTableQuery(replica.table_name, context);

database->detachTable(replica.table_name);
}
@ -369,7 +369,7 @@ void InterpreterSystemQuery::restartReplicas(Context & system_context)
for (auto & elem : catalog.getDatabases())
{
DatabasePtr & database = elem.second;
for (auto iterator = database->getTablesIterator(); iterator->isValid(); iterator->next())
for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next())
{
if (dynamic_cast<const StorageReplicatedMergeTree *>(iterator->table().get()))
replica_names.emplace_back(StorageID{database->getDatabaseName(), iterator->name()});
@ -394,7 +394,7 @@ void InterpreterSystemQuery::restartReplicas(Context & system_context)
void InterpreterSystemQuery::syncReplica(ASTSystemQuery &)
{
context.checkAccess(AccessType::SYSTEM_SYNC_REPLICA, table_id);
StoragePtr table = DatabaseCatalog::instance().getTable(table_id);
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, context);

if (auto * storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get()))
{
@ -416,7 +416,7 @@ void InterpreterSystemQuery::flushDistributed(ASTSystemQuery &)
{
context.checkAccess(AccessType::SYSTEM_FLUSH_DISTRIBUTED, table_id);

if (auto * storage_distributed = dynamic_cast<StorageDistributed *>(DatabaseCatalog::instance().getTable(table_id).get()))
if (auto * storage_distributed = dynamic_cast<StorageDistributed *>(DatabaseCatalog::instance().getTable(table_id, context).get()))
storage_distributed->flushClusterNodesAllData();
else
throw Exception("Table " + table_id.getNameForLogs() + " is not distributed", ErrorCodes::BAD_ARGUMENTS);

@ -40,7 +40,7 @@ BlockIO InterpreterWatchQuery::execute()
auto table_id = context.resolveStorageID(query, Context::ResolveOrdinary);

/// Get storage
storage = DatabaseCatalog::instance().tryGetTable(table_id);
storage = DatabaseCatalog::instance().tryGetTable(table_id, context);

if (!storage)
throw Exception("Table " + table_id.getNameForLogs() + " doesn't exist.",

@ -181,7 +181,7 @@ StoragePtr JoinedTables::getLeftTableStorage()
}

/// Read from table. Even without table expression (implicit SELECT ... FROM system.one).
return DatabaseCatalog::instance().getTable(table_id);
return DatabaseCatalog::instance().getTable(table_id, context);
}

bool JoinedTables::resolveTables()
@ -261,7 +261,7 @@ std::shared_ptr<TableJoin> JoinedTables::makeTableJoin(const ASTSelectQuery & se
if (table_to_join.database_and_table_name)
{
auto joined_table_id = context.resolveStorageID(table_to_join.database_and_table_name);
StoragePtr table = DatabaseCatalog::instance().tryGetTable(joined_table_id);
StoragePtr table = DatabaseCatalog::instance().tryGetTable(joined_table_id, context);
if (table)
{
if (dynamic_cast<StorageJoin *>(table.get()) ||

@ -431,7 +431,7 @@ void SystemLog<LogElement>::prepareTable()
{
String description = table_id.getNameForLogs();

table = DatabaseCatalog::instance().tryGetTable(table_id);
table = DatabaseCatalog::instance().tryGetTable(table_id, context);

if (table)
{
@ -442,7 +442,7 @@ void SystemLog<LogElement>::prepareTable()
{
/// Rename the existing table.
int suffix = 0;
while (DatabaseCatalog::instance().isTableExist({table_id.database_name, table_id.table_name + "_" + toString(suffix)}))
while (DatabaseCatalog::instance().isTableExist({table_id.database_name, table_id.table_name + "_" + toString(suffix)}, context))
++suffix;

auto rename = std::make_shared<ASTRenameQuery>();
@ -483,7 +483,7 @@ void SystemLog<LogElement>::prepareTable()
interpreter.setInternal(true);
interpreter.execute();

table = DatabaseCatalog::instance().getTable(table_id);
table = DatabaseCatalog::instance().getTable(table_id, context);
}

is_prepared = true;

@ -96,7 +96,7 @@ static NamesAndTypesList getColumnsFromTableExpression(const ASTTableExpression
else if (table_expression.database_and_table_name)
{
auto table_id = context.resolveStorageID(table_expression.database_and_table_name);
const auto & table = DatabaseCatalog::instance().getTable(table_id);
const auto & table = DatabaseCatalog::instance().getTable(table_id, context);
const auto & columns = table->getColumns();
names_and_type_list = columns.getOrdinary();
materialized = columns.getMaterialized();

@ -96,7 +96,7 @@ std::shared_ptr<InterpreterSelectWithUnionQuery> interpretSubquery(
else
{
auto table_id = context.resolveStorageID(table_expression);
const auto & storage = DatabaseCatalog::instance().getTable(table_id);
const auto & storage = DatabaseCatalog::instance().getTable(table_id, context);
columns = storage->getColumns().getOrdinary();
select_query->replaceDatabaseAndTable(table_id);
}
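
Alongside the catalog lookups, every database iteration in this commit now passes the context to getTablesIterator. A compilable sketch of the changed iterator interface, with simplified stand-in types:

    #include <iostream>
    #include <map>
    #include <string>

    struct Context {};

    class TablesIterator
    {
    public:
        using Map = std::map<std::string, int>;
        explicit TablesIterator(const Map & tables) : it(tables.begin()), end(tables.end()) {}
        bool isValid() const { return it != end; }
        void next() { ++it; }
        const std::string & name() const { return it->first; }
    private:
        Map::const_iterator it, end;
    };

    class Database
    {
    public:
        // The context argument mirrors the new signature; a real database could use
        // it to decide which tables are visible to this particular query.
        TablesIterator getTablesIterator(const Context &) const { return TablesIterator(tables); }
        std::map<std::string, int> tables{{"a", 1}, {"b", 2}};
    };

    int main()
    {
        Context context;
        Database database;
        for (auto iterator = database.getTablesIterator(context); iterator.isValid(); iterator.next())
            std::cout << iterator.name() << '\n';
    }
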
@ -181,13 +181,13 @@ void ASTAlterCommand::formatImpl(
settings.ostr << " TO ";
switch (move_destination_type)
{
case PartDestinationType::DISK:
case DataDestinationType::DISK:
settings.ostr << "DISK ";
break;
case PartDestinationType::VOLUME:
case DataDestinationType::VOLUME:
settings.ostr << "VOLUME ";
break;
case PartDestinationType::TABLE:
case DataDestinationType::TABLE:
settings.ostr << "TABLE ";
if (!to_database.empty())
{
@ -201,7 +201,7 @@ void ASTAlterCommand::formatImpl(
default:
break;
}
if (move_destination_type != PartDestinationType::TABLE)
if (move_destination_type != DataDestinationType::TABLE)
{
settings.ostr << quoteString(move_destination_name);
}

@ -136,7 +136,7 @@ public:

bool if_exists = false; /// option for DROP_COLUMN, MODIFY_COLUMN, COMMENT_COLUMN

PartDestinationType move_destination_type; /// option for MOVE PART/PARTITION
DataDestinationType move_destination_type; /// option for MOVE PART/PARTITION

String move_destination_name; /// option for MOVE PART/PARTITION

@ -28,11 +28,11 @@ ASTPtr ASTTTLElement::clone() const
void ASTTTLElement::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
ttl()->formatImpl(settings, state, frame);
if (mode == TTLMode::MOVE && destination_type == PartDestinationType::DISK)
if (mode == TTLMode::MOVE && destination_type == DataDestinationType::DISK)
{
settings.ostr << " TO DISK " << quoteString(destination_name);
}
else if (mode == TTLMode::MOVE && destination_type == PartDestinationType::VOLUME)
else if (mode == TTLMode::MOVE && destination_type == DataDestinationType::VOLUME)
{
settings.ostr << " TO VOLUME " << quoteString(destination_name);
}

@ -1,8 +1,8 @@
#pragma once

#include <Parsers/IAST.h>
#include <Storages/MergeTree/PartDestinationType.h>
#include <Storages/MergeTree/TTLMode.h>
#include <Storages/DataDestinationType.h>
#include <Storages/TTLMode.h>


namespace DB
@ -14,13 +14,13 @@ class ASTTTLElement : public IAST
{
public:
TTLMode mode;
PartDestinationType destination_type;
DataDestinationType destination_type;
String destination_name;

ASTs group_by_key;
std::vector<std::pair<String, ASTPtr>> group_by_aggregations;

ASTTTLElement(TTLMode mode_, PartDestinationType destination_type_, const String & destination_name_)
ASTTTLElement(TTLMode mode_, DataDestinationType destination_type_, const String & destination_name_)
: mode(mode_)
, destination_type(destination_type_)
, destination_name(destination_name_)

@ -1471,18 +1471,18 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
return false;

TTLMode mode;
PartDestinationType destination_type = PartDestinationType::DELETE;
DataDestinationType destination_type = DataDestinationType::DELETE;
String destination_name;

if (s_to_disk.ignore(pos))
{
mode = TTLMode::MOVE;
destination_type = PartDestinationType::DISK;
destination_type = DataDestinationType::DISK;
}
else if (s_to_volume.ignore(pos))
{
mode = TTLMode::MOVE;
destination_type = PartDestinationType::VOLUME;
destination_type = DataDestinationType::VOLUME;
}
else if (s_group_by.ignore(pos))
{

@ -260,19 +260,19 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
command->part = true;

if (s_to_disk.ignore(pos))
command->move_destination_type = PartDestinationType::DISK;
command->move_destination_type = DataDestinationType::DISK;
else if (s_to_volume.ignore(pos))
command->move_destination_type = PartDestinationType::VOLUME;
command->move_destination_type = DataDestinationType::VOLUME;
else if (s_to_table.ignore(pos))
{
if (!parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table))
return false;
command->move_destination_type = PartDestinationType::TABLE;
command->move_destination_type = DataDestinationType::TABLE;
}
else
return false;

if (command->move_destination_type != PartDestinationType::TABLE)
if (command->move_destination_type != DataDestinationType::TABLE)
{
ASTPtr ast_space_name;
if (!parser_string_literal.parse(pos, ast_space_name, expected))
@ -289,19 +289,19 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
command->type = ASTAlterCommand::MOVE_PARTITION;

if (s_to_disk.ignore(pos))
command->move_destination_type = PartDestinationType::DISK;
command->move_destination_type = DataDestinationType::DISK;
else if (s_to_volume.ignore(pos))
command->move_destination_type = PartDestinationType::VOLUME;
command->move_destination_type = DataDestinationType::VOLUME;
else if (s_to_table.ignore(pos))
{
if (!parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table))
return false;
command->move_destination_type = PartDestinationType::TABLE;
command->move_destination_type = DataDestinationType::TABLE;
}
else
return false;

if (command->move_destination_type != PartDestinationType::TABLE)
if (command->move_destination_type != DataDestinationType::TABLE)
{
ASTPtr ast_space_name;
if (!parser_string_literal.parse(pos, ast_space_name, expected))
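
Both the MOVE PART and MOVE PARTITION branches parse their destination the same way: try each keyword in turn, record the matching DataDestinationType, and require a quoted space name only for disk and volume targets. A standalone sketch of that keyword-dispatch shape; the ignore helper here is a stand-in for the real ParserKeyword machinery:

    #include <iostream>
    #include <optional>
    #include <string>

    enum class DataDestinationType { DISK, VOLUME, TABLE };

    // Simplified stand-in: consume the keyword if it prefixes the input.
    static bool ignore(std::string & input, const std::string & keyword)
    {
        if (input.rfind(keyword, 0) == 0) { input.erase(0, keyword.size()); return true; }
        return false;
    }

    std::optional<DataDestinationType> parseMoveDestination(std::string input)
    {
        if (ignore(input, "TO DISK "))   return DataDestinationType::DISK;
        if (ignore(input, "TO VOLUME ")) return DataDestinationType::VOLUME;
        if (ignore(input, "TO TABLE "))  return DataDestinationType::TABLE;
        return std::nullopt; // no recognized destination: the parser fails
    }

    int main()
    {
        auto destination = parseMoveDestination("TO VOLUME 'slow'");
        std::cout << (destination == DataDestinationType::VOLUME) << '\n'; // prints 1
    }
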
@ -404,8 +404,6 @@ void HTTPHandler::processQuery(
used_output.out_maybe_delayed_and_compressed = used_output.out_maybe_compressed;
}

std::unique_ptr<ReadBuffer> in_post_raw = std::make_unique<ReadBufferFromIStream>(istr);

/// Request body can be compressed using algorithm specified in the Content-Encoding header.
String http_request_compression_method_str = request.get("Content-Encoding", "");
std::unique_ptr<ReadBuffer> in_post = wrapReadBufferWithCompressionMethod(

@ -253,7 +253,7 @@ void MySQLHandler::comFieldList(ReadBuffer & payload)
ComFieldList packet;
packet.readPayload(payload);
String database = connection_context.getCurrentDatabase();
StoragePtr table_ptr = DatabaseCatalog::instance().getTable({database, packet.table});
StoragePtr table_ptr = DatabaseCatalog::instance().getTable({database, packet.table}, connection_context);
for (const NameAndTypePair & column: table_ptr->getColumns().getAll())
{
ColumnDefinition column_definition(

@ -44,7 +44,7 @@ void ReplicasStatusHandler::handleRequest(Poco::Net::HTTPServerRequest & request
if (db.second->getEngineName() == "Lazy")
continue;

for (auto iterator = db.second->getTablesIterator(); iterator->isValid(); iterator->next())
for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())
{
const auto & table = iterator->table();
StorageReplicatedMergeTree * table_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get());

@ -474,7 +474,7 @@ void TCPHandler::processInsertQuery(const Settings & connection_settings)
if (query_context->getSettingsRef().input_format_defaults_for_omitted_fields)
{
if (!table_id.empty())
sendTableColumns(DatabaseCatalog::instance().getTable(table_id)->getColumns());
sendTableColumns(DatabaseCatalog::instance().getTable(table_id, *query_context)->getColumns());
}
}

@ -627,7 +627,7 @@ void TCPHandler::processTablesStatusRequest()
for (const QualifiedTableName & table_name: request.tables)
{
auto resolved_id = connection_context.tryResolveStorageID({table_name.database, table_name.table});
StoragePtr table = DatabaseCatalog::instance().tryGetTable(resolved_id);
StoragePtr table = DatabaseCatalog::instance().tryGetTable(resolved_id, connection_context);
if (!table)
continue;

@ -944,11 +944,11 @@ bool TCPHandler::receiveData(bool scalar)
StoragePtr storage;
/// If such a table does not exist, create it.
if (resolved)
storage = DatabaseCatalog::instance().getTable(resolved);
storage = DatabaseCatalog::instance().getTable(resolved, *query_context);
else
{
NamesAndTypesList columns = block.getNamesAndTypesList();
auto temporary_table = TemporaryTableHolder(*query_context, ColumnsDescription{columns});
auto temporary_table = TemporaryTableHolder(*query_context, ColumnsDescription{columns}, {});
storage = temporary_table.getTable();
query_context->addExternalTable(temporary_id.table_name, std::move(temporary_table));
}

@ -4,7 +4,7 @@
namespace DB
{

enum class PartDestinationType
enum class DataDestinationType
{
DISK,
VOLUME,
@ -557,4 +557,54 @@ Names IStorage::getColumnsRequiredForSampling() const
return {};
}

const TTLTableDescription & IStorage::getTableTTLs() const
{
return table_ttl;
}

void IStorage::setTableTTLs(const TTLTableDescription & table_ttl_)
{
table_ttl = table_ttl_;
}

bool IStorage::hasAnyTableTTL() const
{
return hasAnyMoveTTL() || hasRowsTTL();
}

const TTLColumnsDescription & IStorage::getColumnTTLs() const
{
return column_ttls_by_name;
}

void IStorage::setColumnTTLs(const TTLColumnsDescription & column_ttls_by_name_)
{
column_ttls_by_name = column_ttls_by_name_;
}

bool IStorage::hasAnyColumnTTL() const
{
return !column_ttls_by_name.empty();
}

const TTLDescription & IStorage::getRowsTTL() const
{
return table_ttl.rows_ttl;
}

bool IStorage::hasRowsTTL() const
{
return table_ttl.rows_ttl.expression != nullptr;
}

const TTLDescriptions & IStorage::getMoveTTLs() const
{
return table_ttl.move_ttl;
}

bool IStorage::hasAnyMoveTTL() const
{
return !table_ttl.move_ttl.empty();
}

}
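
The new IStorage methods treat TTL state as plain data: one table-level description holding the rows TTL and the list of move TTLs, plus a per-column map. A compilable sketch of that data shape and the derived predicates, using stand-in types:

    #include <map>
    #include <memory>
    #include <string>
    #include <vector>

    struct ExpressionActions {};
    using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;

    struct TTLDescription
    {
        ExpressionActionsPtr expression; // null means "no TTL set"
        std::string result_column;
    };

    using TTLDescriptions = std::vector<TTLDescription>;
    using TTLColumnsDescription = std::map<std::string, TTLDescription>;

    struct TTLTableDescription
    {
        TTLDescription rows_ttl;
        TTLDescriptions move_ttl;
    };

    struct Storage
    {
        bool hasRowsTTL() const { return table_ttl.rows_ttl.expression != nullptr; }
        bool hasAnyMoveTTL() const { return !table_ttl.move_ttl.empty(); }
        bool hasAnyTableTTL() const { return hasAnyMoveTTL() || hasRowsTTL(); }
        bool hasAnyColumnTTL() const { return !column_ttls_by_name.empty(); }

        TTLTableDescription table_ttl;
        TTLColumnsDescription column_ttls_by_name;
    };

    int main()
    {
        Storage storage;
        storage.table_ttl.rows_ttl.expression = std::make_shared<ExpressionActions>();
        return storage.hasAnyTableTTL() ? 0 : 1;
    }
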
@ -14,6 +14,7 @@
#include <Storages/IndicesDescription.h>
#include <Storages/ConstraintsDescription.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Storages/TTLDescription.h>
#include <Storages/ColumnDependency.h>
#include <Common/ActionLock.h>
#include <Common/Exception.h>
@ -82,7 +83,9 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
{
public:
IStorage() = delete;
explicit IStorage(StorageID storage_id_) : storage_id(std::move(storage_id_)) {}
/// Storage fields should be initialized in separate methods like setColumns
/// or setTableTTLs.
explicit IStorage(StorageID storage_id_) : storage_id(std::move(storage_id_)) {} //-V730

virtual ~IStorage() = default;
IStorage(const IStorage &) = delete;
@ -130,10 +133,7 @@ public:
virtual bool hasEvenlyDistributedRead() const { return false; }

/// Returns true if there is set table TTL, any column TTL or any move TTL.
virtual bool hasAnyTTL() const { return false; }

/// Returns true if there is set TTL for rows.
virtual bool hasRowsTTL() const { return false; }
virtual bool hasAnyTTL() const { return hasAnyColumnTTL() || hasAnyTableTTL(); }

/// Optional size information of each physical column.
/// Currently it's only used by the MergeTree family for query optimizations.
@ -205,6 +205,9 @@ private:
StorageMetadataKeyField sorting_key;
StorageMetadataKeyField sampling_key;

TTLColumnsDescription column_ttls_by_name;
TTLTableDescription table_ttl;

private:
RWLockImpl::LockHolder tryLockTimed(
const RWLock & rwlock, RWLockImpl::Type type, const String & query_id, const SettingSeconds & acquire_timeout) const;
@ -514,13 +517,32 @@ public:
Names getColumnsRequiredForFinal() const { return getColumnsRequiredForSortingKey(); }

/// Returns columns, which will be needed to calculate dependencies
/// (skip indices, TTL expressions) if we update @updated_columns set of columns.
/// Returns columns, which will be needed to calculate dependencies (skip
/// indices, TTL expressions) if we update @updated_columns set of columns.
virtual ColumnDependencies getColumnDependencies(const NameSet & /* updated_columns */) const { return {}; }

/// Returns storage policy if storage supports it
/// Returns storage policy if storage supports it.
virtual StoragePolicyPtr getStoragePolicy() const { return {}; }

/// Common tables TTLs (for rows and moves).
const TTLTableDescription & getTableTTLs() const;
void setTableTTLs(const TTLTableDescription & table_ttl_);
bool hasAnyTableTTL() const;

/// Separate TTLs for columns.
const TTLColumnsDescription & getColumnTTLs() const;
void setColumnTTLs(const TTLColumnsDescription & column_ttls_by_name_);
bool hasAnyColumnTTL() const;

/// Just wrapper for table TTLs, return rows part of table TTLs.
const TTLDescription & getRowsTTL() const;
bool hasRowsTTL() const;

/// Just wrapper for table TTLs, return moves (to disks or volumes) parts of
/// table TTL.
const TTLDescriptions & getMoveTTLs() const;
bool hasAnyMoveTTL() const;

/// If it is possible to quickly determine exact number of rows in the table at this moment of time, then return it.
/// Used for:
/// - Simple count() opimization
@ -324,10 +324,10 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & conf)
}

// No need to add any prefix, messages can be distinguished
conf.set_log_callback([this](cppkafka::KafkaHandleBase &, int level, const std::string & /* facility */, const std::string & message)
conf.set_log_callback([this](cppkafka::KafkaHandleBase &, int level, const std::string & facility, const std::string & message)
{
auto [poco_level, client_logs_level] = parseSyslogLevel(level);
LOG_IMPL(log, client_logs_level, poco_level, message);
LOG_IMPL(log, client_logs_level, poco_level, "[rdk:{}] {}", facility, message);
});

// Configure interceptor to change thread name
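
The callback above stops discarding librdkafka's facility string and prefixes each message with it, which makes client-library lines easy to attribute in the server log. A standalone sketch of the same formatting decision, with std::printf standing in for LOG_IMPL:

    #include <cstdio>
    #include <functional>
    #include <string>

    // Stand-in for cppkafka's log callback shape: (syslog level, facility, message).
    using LogCallback = std::function<void(int level, const std::string & facility, const std::string & message)>;

    int main()
    {
        LogCallback log_callback = [](int level, const std::string & facility, const std::string & message)
        {
            // "[rdk:FACILITY] message" makes client-library lines easy to grep.
            std::printf("level=%d [rdk:%s] %s\n", level, facility.c_str(), message.c_str());
        };

        log_callback(6, "FAIL", "broker connection down");
    }
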
@ -361,7 +361,7 @@ bool StorageKafka::checkDependencies(const StorageID & table_id)
// Check the dependencies are ready?
for (const auto & db_tab : dependencies)
{
auto table = DatabaseCatalog::instance().tryGetTable(db_tab);
auto table = DatabaseCatalog::instance().tryGetTable(db_tab, global_context);
if (!table)
return false;

@ -429,7 +429,7 @@ void StorageKafka::threadFunc()
bool StorageKafka::streamToViews()
{
auto table_id = getStorageID();
auto table = DatabaseCatalog::instance().getTable(table_id);
auto table = DatabaseCatalog::instance().getTable(table_id, global_context);
if (!table)
throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);

@ -401,7 +401,7 @@ void StorageLiveView::noUsersThread(std::shared_ptr<StorageLiveView> storage, co

if (drop_table)
{
if (DatabaseCatalog::instance().tryGetTable(table_id))
if (DatabaseCatalog::instance().tryGetTable(table_id, storage->global_context))
{
try
{

@ -53,7 +53,7 @@ public:
{
return getStorageID().table_name + "_blocks";
}
StoragePtr getParentStorage() const { return DatabaseCatalog::instance().getTable(select_table_id); }
StoragePtr getParentStorage() const { return DatabaseCatalog::instance().getTable(select_table_id, global_context); }

ASTPtr getInnerQuery() const { return inner_query->clone(); }
ASTPtr getInnerSubQuery() const

@ -775,6 +775,7 @@ void IMergeTreeDataPart::remove() const
}
}


String IMergeTreeDataPart::getRelativePathForDetachedPart(const String & prefix) const
{
/// Do not allow underscores in the prefix because they are used as separators.
@ -839,9 +840,10 @@ void IMergeTreeDataPart::checkConsistencyBase() const
{
String path = getFullRelativePath();

const auto & pk = storage.getPrimaryKey();
if (!checksums.empty())
{
if (storage.hasPrimaryKey() && !checksums.files.count("primary.idx"))
if (!pk.column_names.empty() && !checksums.files.count("primary.idx"))
throw Exception("No checksum for primary.idx", ErrorCodes::NO_FILE_IN_DATA_PART);

if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
@ -875,7 +877,7 @@ void IMergeTreeDataPart::checkConsistencyBase() const
};

/// Check that the primary key index is not empty.
if (storage.hasPrimaryKey())
if (!pk.column_names.empty())
check_file_not_empty(volume->getDisk(), path + "primary.idx");

if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
@ -181,8 +181,11 @@ const KeyCondition::AtomMap KeyCondition::atom_map
},
{
"empty",
[] (RPNElement & out, const Field &)
[] (RPNElement & out, const Field & value)
{
if (value.getType() != Field::Types::String)
return false;

out.function = RPNElement::FUNCTION_IN_RANGE;
out.range = Range("");
return true;
@ -190,8 +193,11 @@ const KeyCondition::AtomMap KeyCondition::atom_map
},
{
"notEmpty",
[] (RPNElement & out, const Field &)
[] (RPNElement & out, const Field & value)
{
if (value.getType() != Field::Types::String)
return false;

out.function = RPNElement::FUNCTION_NOT_IN_RANGE;
out.range = Range("");
return true;
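
Previously the "empty" and "notEmpty" atoms unconditionally rewrote themselves into a range check over the empty string, which is only sound when the constant really is a String; the added guard makes the atom report failure for other key types. A standalone sketch of the guarded-rewrite pattern:

    #include <functional>
    #include <map>
    #include <string>
    #include <variant>

    using Field = std::variant<long, std::string>;

    struct RPNElement { enum Function { UNKNOWN, IN_RANGE, NOT_IN_RANGE } function = UNKNOWN; };

    // Each atom returns false when it cannot soundly rewrite itself for the constant.
    using AtomMap = std::map<std::string, std::function<bool(RPNElement &, const Field &)>>;

    int main()
    {
        AtomMap atom_map;
        atom_map["empty"] = [](RPNElement & out, const Field & value)
        {
            if (!std::holds_alternative<std::string>(value))
                return false; // e.g. empty() of an array must not become a string-range check
            out.function = RPNElement::IN_RANGE;
            return true;
        };

        RPNElement out;
        bool ok_string = atom_map["empty"](out, Field{std::string{}}); // true
        bool ok_number = atom_map["empty"](out, Field{42L});           // false
        return (ok_string && !ok_number) ? 0 : 1;
    }
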
@ -259,8 +259,8 @@ StorageInMemoryMetadata MergeTreeData::getInMemoryMetadata() const
if (isPrimaryKeyDefined())
metadata.primary_key_ast = getPrimaryKeyAST()->clone();

if (ttl_table_ast)
metadata.ttl_for_table_ast = ttl_table_ast->clone();
if (hasAnyTableTTL())
metadata.ttl_for_table_ast = getTableTTLs().definition_ast->clone();

if (isSamplingKeyDefined())
metadata.sample_by_ast = getSamplingKeyAST()->clone();
@ -580,151 +580,16 @@ void MergeTreeData::initPartitionKey(ASTPtr partition_by_ast)
setPartitionKey(new_partition_key);
}

namespace
{

void checkTTLExpression(const ExpressionActionsPtr & ttl_expression, const String & result_column_name)
{
for (const auto & action : ttl_expression->getActions())
{
if (action.type == ExpressionAction::APPLY_FUNCTION)
{
IFunctionBase & func = *action.function_base;
if (!func.isDeterministic())
throw Exception("TTL expression cannot contain non-deterministic functions, "
"but contains function " + func.getName(), ErrorCodes::BAD_ARGUMENTS);
}
}

const auto & result_column = ttl_expression->getSampleBlock().getByName(result_column_name);

if (!typeid_cast<const DataTypeDateTime *>(result_column.type.get())
&& !typeid_cast<const DataTypeDate *>(result_column.type.get()))
{
throw Exception("TTL expression result column should have DateTime or Date type, but has "
+ result_column.type->getName(), ErrorCodes::BAD_TTL_EXPRESSION);
}
}

}


void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns,
const ASTPtr & new_ttl_table_ast, bool only_check)
{

auto new_column_ttls = new_columns.getColumnTTLs();
auto new_column_ttls_asts = new_columns.getColumnTTLs();

auto create_ttl_entry = [this, &new_columns](ASTPtr ttl_expr_ast)
{
TTLEntry result;
TTLColumnsDescription new_column_ttl_by_name = getColumnTTLs();

auto ttl_syntax_result = SyntaxAnalyzer(global_context).analyze(ttl_expr_ast, new_columns.getAllPhysical());
result.expression = ExpressionAnalyzer(ttl_expr_ast, ttl_syntax_result, global_context).getActions(false);
result.result_column = ttl_expr_ast->getColumnName();

result.destination_type = PartDestinationType::DELETE;
result.mode = TTLMode::DELETE;

checkTTLExpression(result.expression, result.result_column);
return result;
};

auto create_rows_ttl_entry = [this, &new_columns, &create_ttl_entry](const ASTTTLElement * ttl_element)
{
auto result = create_ttl_entry(ttl_element->ttl());
result.mode = ttl_element->mode;

if (ttl_element->mode == TTLMode::DELETE)
{
if (ASTPtr where_expr_ast = ttl_element->where())
{
auto where_syntax_result = SyntaxAnalyzer(global_context).analyze(where_expr_ast, new_columns.getAllPhysical());
result.where_expression = ExpressionAnalyzer(where_expr_ast, where_syntax_result, global_context).getActions(false);
result.where_result_column = where_expr_ast->getColumnName();
}
}
else if (ttl_element->mode == TTLMode::GROUP_BY)
{
if (ttl_element->group_by_key.size() > this->getPrimaryKey().column_names.size())
throw Exception("TTL Expression GROUP BY key should be a prefix of primary key", ErrorCodes::BAD_TTL_EXPRESSION);

NameSet primary_key_columns_set(this->getPrimaryKey().column_names.begin(), this->getPrimaryKey().column_names.end());
NameSet aggregation_columns_set;

for (const auto & column : this->getPrimaryKey().expression->getRequiredColumns())
primary_key_columns_set.insert(column);

for (size_t i = 0; i < ttl_element->group_by_key.size(); ++i)
{
if (ttl_element->group_by_key[i]->getColumnName() != this->getPrimaryKey().column_names[i])
throw Exception("TTL Expression GROUP BY key should be a prefix of primary key", ErrorCodes::BAD_TTL_EXPRESSION);
}
for (const auto & [name, value] : ttl_element->group_by_aggregations)
{
if (primary_key_columns_set.count(name))
throw Exception("Can not set custom aggregation for column in primary key in TTL Expression", ErrorCodes::BAD_TTL_EXPRESSION);
aggregation_columns_set.insert(name);
}
if (aggregation_columns_set.size() != ttl_element->group_by_aggregations.size())
throw Exception("Multiple aggregations set for one column in TTL Expression", ErrorCodes::BAD_TTL_EXPRESSION);

result.group_by_keys = Names(this->getPrimaryKey().column_names.begin(), this->getPrimaryKey().column_names.begin() + ttl_element->group_by_key.size());

auto aggregations = ttl_element->group_by_aggregations;
for (size_t i = 0; i < this->getPrimaryKey().column_names.size(); ++i)
{
ASTPtr value = this->getPrimaryKey().expression_list_ast->children[i]->clone();

if (i >= ttl_element->group_by_key.size())
{
ASTPtr value_max = makeASTFunction("max", value->clone());
aggregations.emplace_back(value->getColumnName(), std::move(value_max));
}

if (value->as<ASTFunction>())
{
auto syntax_result = SyntaxAnalyzer(global_context).analyze(value, new_columns.getAllPhysical(), {}, true);
auto expr_actions = ExpressionAnalyzer(value, syntax_result, global_context).getActions(false);
for (const auto & column : expr_actions->getRequiredColumns())
{
if (i < ttl_element->group_by_key.size())
{
ASTPtr expr = makeASTFunction("any", std::make_shared<ASTIdentifier>(column));
aggregations.emplace_back(column, std::move(expr));
}
else
{
ASTPtr expr = makeASTFunction("argMax", std::make_shared<ASTIdentifier>(column), value->clone());
aggregations.emplace_back(column, std::move(expr));
}
}
}
}
for (const auto & column : new_columns.getAllPhysical())
{
if (!primary_key_columns_set.count(column.name) && !aggregation_columns_set.count(column.name))
{
ASTPtr expr = makeASTFunction("any", std::make_shared<ASTIdentifier>(column.name));
aggregations.emplace_back(column.name, std::move(expr));
}
}

for (auto [name, value] : aggregations)
{
auto syntax_result = SyntaxAnalyzer(global_context).analyze(value, new_columns.getAllPhysical(), {}, true);
auto expr_analyzer = ExpressionAnalyzer(value, syntax_result, global_context);

result.group_by_aggregations.emplace_back(name, value->getColumnName(), expr_analyzer.getActions(false));

for (const auto & descr : expr_analyzer.getAnalyzedData().aggregate_descriptions)
result.aggregate_descriptions.push_back(descr);
}
}
return result;
};

if (!new_column_ttls.empty())
if (!new_column_ttls_asts.empty())
{
NameSet columns_ttl_forbidden;

@ -736,23 +601,24 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns,
for (const auto & col : getColumnsRequiredForSortingKey())
columns_ttl_forbidden.insert(col);

for (const auto & [name, ast] : new_column_ttls)
for (const auto & [name, ast] : new_column_ttls_asts)
{
if (columns_ttl_forbidden.count(name))
throw Exception("Trying to set TTL for key column " + name, ErrorCodes::ILLEGAL_COLUMN);
else
{
auto new_ttl_entry = create_ttl_entry(ast);
if (!only_check)
column_ttl_entries_by_name[name] = new_ttl_entry;
auto new_ttl_entry = TTLDescription::getTTLFromAST(ast, new_columns, global_context, getPrimaryKey());
new_column_ttl_by_name[name] = new_ttl_entry;
}
}
if (!only_check)
setColumnTTLs(new_column_ttl_by_name);
}

if (new_ttl_table_ast)
{
std::vector<TTLEntry> update_move_ttl_entries;
TTLEntry update_rows_ttl_entry;
TTLDescriptions update_move_ttl_entries;
TTLDescription update_rows_ttl_entry;

bool seen_delete_ttl = false;
for (const auto & ttl_element_ptr : new_ttl_table_ast->children)
@ -761,48 +627,46 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns,
if (!ttl_element)
throw Exception("Unexpected AST element in TTL expression", ErrorCodes::UNEXPECTED_AST_STRUCTURE);

if (ttl_element->destination_type == PartDestinationType::DELETE)
if (ttl_element->destination_type == DataDestinationType::DELETE)
{
if (seen_delete_ttl)
{
throw Exception("More than one DELETE TTL expression is not allowed", ErrorCodes::BAD_TTL_EXPRESSION);
}

auto new_rows_ttl_entry = create_rows_ttl_entry(ttl_element);
if (!only_check)
update_rows_ttl_entry = new_rows_ttl_entry;
update_rows_ttl_entry = TTLDescription::getTTLFromAST(ttl_element_ptr, new_columns, global_context, getPrimaryKey());

seen_delete_ttl = true;
}
else
{
auto new_ttl_entry = create_rows_ttl_entry(ttl_element);
auto new_ttl_entry = TTLDescription::getTTLFromAST(ttl_element_ptr, new_columns, global_context, getPrimaryKey());

new_ttl_entry.entry_ast = ttl_element_ptr;
new_ttl_entry.destination_type = ttl_element->destination_type;
new_ttl_entry.destination_name = ttl_element->destination_name;
if (!new_ttl_entry.getDestination(getStoragePolicy()))
if (!getDestinationForTTL(new_ttl_entry))
{
String message;
if (new_ttl_entry.destination_type == PartDestinationType::DISK)
if (new_ttl_entry.destination_type == DataDestinationType::DISK)
message = "No such disk " + backQuote(new_ttl_entry.destination_name) + " for given storage policy.";
else
message = "No such volume " + backQuote(new_ttl_entry.destination_name) + " for given storage policy.";
throw Exception(message, ErrorCodes::BAD_TTL_EXPRESSION);
}

if (!only_check)
update_move_ttl_entries.emplace_back(std::move(new_ttl_entry));
update_move_ttl_entries.emplace_back(std::move(new_ttl_entry));
}
}

if (!only_check)
{
rows_ttl_entry = update_rows_ttl_entry;
ttl_table_ast = new_ttl_table_ast;
TTLTableDescription new_table_ttl
{
.definition_ast = new_ttl_table_ast,
.rows_ttl = update_rows_ttl_entry,
.move_ttl = update_move_ttl_entries,
};

auto move_ttl_entries_lock = std::lock_guard<std::mutex>(move_ttl_entries_mutex);
move_ttl_entries = update_move_ttl_entries;
setTableTTLs(new_table_ttl);
}
}
}
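
After the refactor, the table-level TTL state is committed in one step via setTableTTLs instead of assigning three members separately. The designated-initializer construction used above is C++20; a minimal standalone equivalent:

    #include <memory>
    #include <vector>

    struct AST {};
    using ASTPtr = std::shared_ptr<AST>;

    struct TTLDescription { ASTPtr expression_ast; };
    using TTLDescriptions = std::vector<TTLDescription>;

    struct TTLTableDescription
    {
        ASTPtr definition_ast;
        TTLDescription rows_ttl;
        TTLDescriptions move_ttl;
    };

    int main()
    {
        ASTPtr new_ttl_table_ast = std::make_shared<AST>();

        // C++20 designated initializers; members must appear in declaration order.
        TTLTableDescription new_table_ttl
        {
            .definition_ast = new_ttl_table_ast,
            .rows_ttl = {},
            .move_ttl = {},
        };
        return new_table_ttl.definition_ast ? 0 : 1;
    }
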
@ -2616,9 +2480,6 @@ void MergeTreeData::movePartitionToDisk(const ASTPtr & partition, const String &
return part_ptr->volume->getDisk()->getName() == disk->getName();
}), parts.end());

if (parts.empty())
throw Exception("Nothing to move", ErrorCodes::NO_SUCH_DATA_PART);

if (parts.empty())
{
String no_parts_to_move_message;
@ -3009,12 +2870,12 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(UInt64 expected_
auto ttl_entry = selectTTLEntryForTTLInfos(ttl_infos, time_of_move);
if (ttl_entry)
{
SpacePtr destination_ptr = ttl_entry->getDestination(getStoragePolicy());
SpacePtr destination_ptr = getDestinationForTTL(*ttl_entry);
if (!destination_ptr)
{
if (ttl_entry->destination_type == PartDestinationType::VOLUME)
if (ttl_entry->destination_type == DataDestinationType::VOLUME)
LOG_WARNING(log, "Would like to reserve space on volume '{}' by TTL rule of table '{}' but volume was not found", ttl_entry->destination_name, log_name);
else if (ttl_entry->destination_type == PartDestinationType::DISK)
else if (ttl_entry->destination_type == DataDestinationType::DISK)
LOG_WARNING(log, "Would like to reserve space on disk '{}' by TTL rule of table '{}' but disk was not found", ttl_entry->destination_name, log_name);
}
else
@ -3023,9 +2884,9 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(UInt64 expected_
if (reservation)
return reservation;
else
if (ttl_entry->destination_type == PartDestinationType::VOLUME)
if (ttl_entry->destination_type == DataDestinationType::VOLUME)
LOG_WARNING(log, "Would like to reserve space on volume '{}' by TTL rule of table '{}' but there is not enough space", ttl_entry->destination_name, log_name);
else if (ttl_entry->destination_type == PartDestinationType::DISK)
else if (ttl_entry->destination_type == DataDestinationType::DISK)
LOG_WARNING(log, "Would like to reserve space on disk '{}' by TTL rule of table '{}' but there is not enough space", ttl_entry->destination_name, log_name);
}
}
@ -3035,37 +2896,39 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(UInt64 expected_
return reservation;
}

SpacePtr MergeTreeData::TTLEntry::getDestination(StoragePolicyPtr policy) const
SpacePtr MergeTreeData::getDestinationForTTL(const TTLDescription & ttl) const
{
if (destination_type == PartDestinationType::VOLUME)
return policy->getVolumeByName(destination_name);
else if (destination_type == PartDestinationType::DISK)
return policy->getDiskByName(destination_name);
auto policy = getStoragePolicy();
if (ttl.destination_type == DataDestinationType::VOLUME)
return policy->getVolumeByName(ttl.destination_name);
else if (ttl.destination_type == DataDestinationType::DISK)
return policy->getDiskByName(ttl.destination_name);
else
return {};
}

bool MergeTreeData::TTLEntry::isPartInDestination(StoragePolicyPtr policy, const IMergeTreeDataPart & part) const
bool MergeTreeData::isPartInTTLDestination(const TTLDescription & ttl, const IMergeTreeDataPart & part) const
{
if (destination_type == PartDestinationType::VOLUME)
auto policy = getStoragePolicy();
if (ttl.destination_type == DataDestinationType::VOLUME)
{
for (const auto & disk : policy->getVolumeByName(destination_name)->getDisks())
for (const auto & disk : policy->getVolumeByName(ttl.destination_name)->getDisks())
if (disk->getName() == part.volume->getDisk()->getName())
return true;
}
else if (destination_type == PartDestinationType::DISK)
return policy->getDiskByName(destination_name)->getName() == part.volume->getDisk()->getName();
else if (ttl.destination_type == DataDestinationType::DISK)
return policy->getDiskByName(ttl.destination_name)->getName() == part.volume->getDisk()->getName();
return false;
}
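
With TTLDescription reduced to plain data, destination resolution moves onto MergeTreeData, which owns the storage policy. A standalone sketch of the resulting shape; the always-resolving policy here is a stand-in:

    #include <memory>
    #include <string>

    enum class DataDestinationType { DISK, VOLUME, TABLE, DELETE };

    struct Space { std::string name; };
    using SpacePtr = std::shared_ptr<Space>;

    struct StoragePolicy
    {
        SpacePtr getDiskByName(const std::string & name) const { return std::make_shared<Space>(Space{name}); }
        SpacePtr getVolumeByName(const std::string & name) const { return std::make_shared<Space>(Space{name}); }
    };

    struct TTLDescription
    {
        DataDestinationType destination_type = DataDestinationType::DELETE;
        std::string destination_name;
    };

    // The storage (not the TTL entry) supplies the policy, so resolution becomes
    // a function over plain TTL data.
    SpacePtr getDestinationForTTL(const StoragePolicy & policy, const TTLDescription & ttl)
    {
        if (ttl.destination_type == DataDestinationType::VOLUME)
            return policy.getVolumeByName(ttl.destination_name);
        if (ttl.destination_type == DataDestinationType::DISK)
            return policy.getDiskByName(ttl.destination_name);
        return {};
    }

    int main()
    {
        StoragePolicy policy;
        TTLDescription ttl{DataDestinationType::DISK, "fast_ssd"};
        return getDestinationForTTL(policy, ttl) ? 0 : 1;
    }
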
std::optional<MergeTreeData::TTLEntry> MergeTreeData::selectTTLEntryForTTLInfos(
const IMergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move) const
std::optional<TTLDescription>
MergeTreeData::selectTTLEntryForTTLInfos(const IMergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move) const
{
time_t max_max_ttl = 0;
std::vector<DB::MergeTreeData::TTLEntry>::const_iterator best_entry_it;
TTLDescriptions::const_iterator best_entry_it;

auto lock = std::lock_guard(move_ttl_entries_mutex);
const auto & move_ttl_entries = getMoveTTLs();
for (auto ttl_entry_it = move_ttl_entries.begin(); ttl_entry_it != move_ttl_entries.end(); ++ttl_entry_it)
{
auto ttl_info_it = ttl_infos.moves_ttl.find(ttl_entry_it->result_column);
@ -3079,7 +2942,7 @@ std::optional<MergeTreeData::TTLEntry> MergeTreeData::selectTTLEntryForTTLInfos(
}
}

return max_max_ttl ? *best_entry_it : std::optional<MergeTreeData::TTLEntry>();
return max_max_ttl ? *best_entry_it : std::optional<TTLDescription>();
}
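
selectTTLEntryForTTLInfos scans the move TTLs, remembers the best candidate, and returns an empty optional when nothing qualifies; the refactor only changes the element type to TTLDescription. A standalone sketch of that select-by-max-key loop (the exact expiry comparison sits between the hunks above, so the "expired and largest TTL wins" rule here is an assumption):

    #include <ctime>
    #include <map>
    #include <optional>
    #include <string>
    #include <vector>

    struct TTLDescription { std::string result_column; };

    int main()
    {
        std::vector<TTLDescription> move_ttl_entries{{"ttl_a"}, {"ttl_b"}};
        std::map<std::string, std::time_t> min_ttl_by_column{{"ttl_a", 100}, {"ttl_b", 200}};
        std::time_t time_of_move = 150;

        std::time_t max_max_ttl = 0;
        std::vector<TTLDescription>::const_iterator best_entry_it;

        for (auto it = move_ttl_entries.begin(); it != move_ttl_entries.end(); ++it)
        {
            auto info = min_ttl_by_column.find(it->result_column);
            // Keep the already-expired entry with the largest TTL value.
            if (info != min_ttl_by_column.end() && info->second <= time_of_move && max_max_ttl < info->second)
            {
                best_entry_it = it;
                max_max_ttl = info->second;
            }
        }

        std::optional<TTLDescription> best =
            max_max_ttl ? *best_entry_it : std::optional<TTLDescription>();
        return (best && best->result_column == "ttl_a") ? 0 : 1;
    }
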
MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affordable_states) const
|
||||
@ -3498,7 +3361,7 @@ bool MergeTreeData::areBackgroundMovesNeeded() const
|
||||
if (policy->getVolumes().size() > 1)
|
||||
return true;
|
||||
|
||||
return policy->getVolumes().size() == 1 && policy->getVolumes()[0]->getDisks().size() > 1 && !move_ttl_entries.empty();
|
||||
return policy->getVolumes().size() == 1 && policy->getVolumes()[0]->getDisks().size() > 1 && hasAnyMoveTTL();
|
||||
}
|
||||
|
||||
bool MergeTreeData::movePartsToSpace(const DataPartsVector & parts, SpacePtr space)
|
||||
@ -3637,7 +3500,7 @@ ColumnDependencies MergeTreeData::getColumnDependencies(const NameSet & updated_
|
||||
|
||||
if (hasRowsTTL())
|
||||
{
|
||||
if (add_dependent_columns(rows_ttl_entry.expression, required_ttl_columns))
|
||||
if (add_dependent_columns(getRowsTTL().expression, required_ttl_columns))
|
||||
{
|
||||
/// Filter all columns, if rows TTL expression have to be recalculated.
|
||||
for (const auto & column : getColumns().getAllPhysical())
|
||||
@ -3645,13 +3508,13 @@ ColumnDependencies MergeTreeData::getColumnDependencies(const NameSet & updated_
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto & [name, entry] : column_ttl_entries_by_name)
|
||||
for (const auto & [name, entry] : getColumnTTLs())
|
||||
{
|
||||
if (add_dependent_columns(entry.expression, required_ttl_columns))
|
||||
updated_ttl_columns.insert(name);
|
||||
}
|
||||
|
||||
for (const auto & entry : move_ttl_entries)
|
||||
for (const auto & entry : getMoveTTLs())
|
||||
add_dependent_columns(entry.expression, required_ttl_columns);
|
||||
|
||||
for (const auto & column : indices_columns)
|
||||
|
@ -8,7 +8,7 @@
 #include <Storages/MergeTree/MergeTreeSettings.h>
 #include <Storages/MergeTree/MergeTreeMutationStatus.h>
 #include <Storages/MergeTree/MergeList.h>
-#include <Storages/MergeTree/PartDestinationType.h>
+#include <Storages/DataDestinationType.h>
 #include <IO/ReadBufferFromString.h>
 #include <IO/WriteBufferFromFile.h>
 #include <IO/ReadBufferFromFile.h>
@ -21,7 +21,6 @@
 #include <Interpreters/PartLog.h>
 #include <Disks/StoragePolicy.h>
 #include <Interpreters/Aggregator.h>
-#include <Storages/MergeTree/TTLMode.h>

 #include <boost/multi_index_container.hpp>
 #include <boost/multi_index/ordered_index.hpp>
@ -521,11 +520,6 @@ public:

     bool hasSkipIndices() const { return !skip_indices.empty(); }

-    bool hasAnyColumnTTL() const { return !column_ttl_entries_by_name.empty(); }
-    bool hasAnyMoveTTL() const { return !move_ttl_entries.empty(); }
-    bool hasRowsTTL() const override { return !rows_ttl_entry.isEmpty(); }
-    bool hasAnyTTL() const override { return hasRowsTTL() || hasAnyMoveTTL() || hasAnyColumnTTL(); }
-
     /// Check that the part is not broken and calculate the checksums for it if they are not present.
     MutableDataPartPtr loadPartAndFixMetadata(const VolumePtr & volume, const String & relative_path) const;

@ -627,6 +621,13 @@ public:

     /// Return alter conversions for part which must be applied on fly.
     AlterConversions getAlterConversionsForPart(const MergeTreeDataPartPtr part) const;
+    /// Returns destination disk or volume for the TTL rule according to current
+    /// storage policy
+    SpacePtr getDestinationForTTL(const TTLDescription & ttl) const;
+
+    /// Checks if given part already belongs destination disk or volume for the
+    /// TTL rule.
+    bool isPartInTTLDestination(const TTLDescription & ttl, const IMergeTreeDataPart & part) const;

     MergeTreeDataFormatVersion format_version;

@ -649,48 +650,13 @@ public:
     ExpressionActionsPtr primary_key_and_skip_indices_expr;
     ExpressionActionsPtr sorting_key_and_skip_indices_expr;

-    struct TTLEntry
-    {
-        TTLMode mode;
-
-        ExpressionActionsPtr expression;
-        String result_column;
-
-        ExpressionActionsPtr where_expression;
-        String where_result_column;
-
-        Names group_by_keys;
-        std::vector<std::tuple<String, String, ExpressionActionsPtr>> group_by_aggregations;
-        AggregateDescriptions aggregate_descriptions;
-
-        /// Name and type of a destination are only valid in table-level context.
-        PartDestinationType destination_type;
-        String destination_name;
-
-        ASTPtr entry_ast;
-
-        /// Returns destination disk or volume for this rule.
-        SpacePtr getDestination(StoragePolicyPtr policy) const;
-
-        /// Checks if given part already belongs destination disk or volume for this rule.
-        bool isPartInDestination(StoragePolicyPtr policy, const IMergeTreeDataPart & part) const;
-
-        bool isEmpty() const { return expression == nullptr; }
-    };
-
-    std::optional<TTLEntry> selectTTLEntryForTTLInfos(const IMergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move) const;
-
-    using TTLEntriesByName = std::unordered_map<String, TTLEntry>;
-    TTLEntriesByName column_ttl_entries_by_name;
-
-    TTLEntry rows_ttl_entry;
+    std::optional<TTLDescription> selectTTLEntryForTTLInfos(const IMergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move) const;

-    /// This mutex is required for background move operations which do not obtain global locks.
+    /// This mutex is required for background move operations which do not
+    /// obtain global locks.
     /// TODO (alesap) It will be removed after metadata became atomic
     mutable std::mutex move_ttl_entries_mutex;

-    /// Vector rw operations have to be done under "move_ttl_entries_mutex".
-    std::vector<TTLEntry> move_ttl_entries;
-
     /// Limiting parallel sends per one table, used in DataPartsExchange
     std::atomic_uint current_table_sends {0};

@ -717,7 +683,6 @@ protected:
     friend struct ReplicatedMergeTreeTableMetadata;
     friend class StorageReplicatedMergeTree;

-    ASTPtr ttl_table_ast;
     ASTPtr settings_ast;

     bool require_part_metadata;
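With the nested TTLEntry struct gone, selectTTLEntryForTTLInfos now hands back a std::optional<TTLDescription>. A minimal sketch of that calling convention, assuming simplified stand-in types (the selection policy here is a placeholder, not the real min/max-TTL logic):

    #include <ctime>
    #include <iostream>
    #include <optional>
    #include <string>
    #include <vector>

    struct TTLDescription { std::string result_column; }; // stand-in

    std::optional<TTLDescription> selectTTLEntryForTTLInfos(
        const std::vector<TTLDescription> & move_ttls, time_t /*time_of_move*/)
    {
        if (move_ttls.empty())
            return std::nullopt;  // no move rule applies yet
        return move_ttls.front(); // placeholder: real code picks by TTL times
    }

    int main()
    {
        std::vector<TTLDescription> rules{{"d_plus_1w"}};
        // Callers (e.g. the parts mover below) test before dereferencing:
        if (auto ttl_entry = selectTTLEntryForTTLInfos(rules, std::time(nullptr)))
            std::cout << "move parts by rule on " << ttl_entry->result_column << '\n';
    }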
@ -78,10 +78,12 @@ void buildScatterSelector(
 }

 /// Computes ttls and updates ttl infos
-void updateTTL(const MergeTreeData::TTLEntry & ttl_entry,
+void updateTTL(
+    const TTLDescription & ttl_entry,
     IMergeTreeDataPart::TTLInfos & ttl_infos,
     DB::MergeTreeDataPartTTLInfo & ttl_info,
-    Block & block, bool update_part_min_max_ttls)
+    Block & block,
+    bool update_part_min_max_ttls)
 {
     bool remove_column = false;
     if (!block.has(ttl_entry.result_column))
@ -229,7 +231,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
     size_t expected_size = block.bytes();

     DB::IMergeTreeDataPart::TTLInfos move_ttl_infos;
-    for (const auto & ttl_entry : data.move_ttl_entries)
+    const auto & move_ttl_entries = data.getMoveTTLs();
+    for (const auto & ttl_entry : move_ttl_entries)
         updateTTL(ttl_entry, move_ttl_infos, move_ttl_infos.moves_ttl[ttl_entry.result_column], block, false);

     NamesAndTypesList columns = data.getColumns().getAllPhysical().filter(block.getNames());
@ -288,9 +291,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
     }

     if (data.hasRowsTTL())
-        updateTTL(data.rows_ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.table_ttl, block, true);
+        updateTTL(data.getRowsTTL(), new_data_part->ttl_infos, new_data_part->ttl_infos.table_ttl, block, true);

-    for (const auto & [name, ttl_entry] : data.column_ttl_entries_by_name)
+    for (const auto & [name, ttl_entry] : data.getColumnTTLs())
         updateTTL(ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.columns_ttl[name], block, true);

     new_data_part->ttl_infos.update(move_ttl_infos);
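One detail worth noting in the writeTempPart hunk: the result of data.getMoveTTLs() is bound to a const reference once, before the loop. If the getter returns by value, C++ lifetime extension keeps that temporary alive for the whole loop, and the getter is not re-evaluated per iteration either way. A standalone demonstration of the binding rule:

    #include <iostream>
    #include <string>
    #include <vector>

    struct Data
    {
        std::vector<std::string> entries{"ttl_a", "ttl_b"};
        std::vector<std::string> getMoveTTLs() const { return entries; } // returns a copy
    };

    int main()
    {
        Data data;
        // The temporary returned by getMoveTTLs() lives as long as this reference.
        const auto & move_ttl_entries = data.getMoveTTLs();
        for (const auto & ttl_entry : move_ttl_entries)
            std::cout << ttl_entry << '\n';
    }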
@ -128,14 +128,14 @@ bool MergeTreePartsMover::selectPartsForMove(
         if (!can_move(part, &reason))
             continue;

-        auto ttl_entry = part->storage.selectTTLEntryForTTLInfos(part->ttl_infos, time_of_move);
+        auto ttl_entry = data->selectTTLEntryForTTLInfos(part->ttl_infos, time_of_move);
         auto to_insert = need_to_move.find(part->volume->getDisk());
         ReservationPtr reservation;
         if (ttl_entry)
         {
-            auto destination = ttl_entry->getDestination(policy);
-            if (destination && !ttl_entry->isPartInDestination(policy, *part))
-                reservation = part->storage.tryReserveSpace(part->getBytesOnDisk(), ttl_entry->getDestination(policy));
+            auto destination = data->getDestinationForTTL(*ttl_entry);
+            if (destination && !data->isPartInTTLDestination(*ttl_entry, *part))
+                reservation = data->tryReserveSpace(part->getBytesOnDisk(), data->getDestinationForTTL(*ttl_entry));
         }

         if (reservation) /// Found reservation by TTL rule.
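The mover now asks MergeTreeData, rather than the TTL rule object, where a part should go: resolve the rule's destination, skip parts already there, then try to reserve the part's size on that destination. A self-contained sketch of that flow with hypothetical stand-ins:

    #include <cstddef>
    #include <iostream>
    #include <memory>
    #include <string>

    struct Space { std::string name; };
    using SpacePtr = std::shared_ptr<Space>;
    using ReservationPtr = std::unique_ptr<int>; // stand-in for a disk reservation

    SpacePtr getDestinationForTTL() { return std::make_shared<Space>(Space{"cold_volume"}); }
    bool isPartInTTLDestination() { return false; } // part not yet on the destination

    ReservationPtr tryReserveSpace(std::size_t bytes, const SpacePtr & destination)
    {
        std::cout << "reserving " << bytes << " bytes on " << destination->name << '\n';
        return std::make_unique<int>(0);
    }

    int main()
    {
        ReservationPtr reservation;
        if (auto destination = getDestinationForTTL(); destination && !isPartInTTLDestination())
            reservation = tryReserveSpace(1 << 20, destination);
        std::cout << std::boolalpha << static_cast<bool>(reservation) << '\n'; // true
    }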
@ -53,7 +53,7 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
     if (data.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
         partition_key = formattedAST(data.getPartitionKey().expression_list_ast);

-    ttl_table = formattedAST(data.ttl_table_ast);
+    ttl_table = formattedAST(data.getTableTTLs().definition_ast);

     skip_indices = data.getIndices().toString();
     if (data.canUseAdaptiveGranularity())
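The replicated-table metadata string is now produced from the TTL description's own definition_ast instead of the separate ttl_table_ast field deleted from MergeTreeData above. A standalone sketch of why keeping the defining AST next to the parsed representation is convenient (hypothetical, simplified types):

    #include <iostream>
    #include <memory>
    #include <string>

    struct AST { std::string text; };
    using ASTPtr = std::shared_ptr<AST>;

    struct TTLDescription
    {
        ASTPtr definition_ast; // the `TTL ...` clause this description was parsed from
        // ... parsed expressions, destination, etc.
    };

    std::string formattedAST(const ASTPtr & ast) { return ast ? ast->text : ""; }

    int main()
    {
        TTLDescription table_ttl{std::make_shared<AST>(AST{"d + INTERVAL 1 MONTH TO VOLUME 'cold'"})};
        // Re-serialize for replica metadata straight from the description:
        std::cout << formattedAST(table_ttl.definition_ast) << '\n';
    }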
@ -39,9 +39,6 @@ public:
         return part->storage.mayBenefitFromIndexForIn(left_in_operand, query_context);
     }

-    bool hasAnyTTL() const override { return part->storage.hasAnyTTL(); }
-    bool hasRowsTTL() const override { return part->storage.hasRowsTTL(); }
-
     ColumnDependencies getColumnDependencies(const NameSet & updated_columns) const override
     {
         return part->storage.getColumnDependencies(updated_columns);
@ -65,6 +62,8 @@ protected:
         setColumns(part_->storage.getColumns());
         setIndices(part_->storage.getIndices());
         setSortingKey(part_->storage.getSortingKey());
+        setColumnTTLs(part->storage.getColumnTTLs());
+        setTableTTLs(part->storage.getTableTTLs());
     }

 private:
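The hasAnyTTL()/hasRowsTTL() forwarding overrides can be dropped because the constructor now copies the storage's TTLs into the common in-memory metadata via setColumnTTLs()/setTableTTLs(), after which the inherited defaults answer correctly. A minimal, self-contained sketch of that shape (hypothetical names, heavily simplified):

    #include <iostream>

    struct TTLs { bool has_rows_ttl = false; }; // stand-in

    struct StorageBase
    {
        TTLs ttls;
        void setTableTTLs(const TTLs & ttls_) { ttls = ttls_; }
        bool hasRowsTTL() const { return ttls.has_rows_ttl; } // one shared default
    };

    struct StorageFromPart : StorageBase
    {
        explicit StorageFromPart(const TTLs & source_ttls)
        {
            setTableTTLs(source_ttls); // copy TTLs from the part's storage
            // no hasRowsTTL() override needed any more
        }
    };

    int main()
    {
        StorageFromPart s(TTLs{true});
        std::cout << std::boolalpha << s.hasRowsTTL() << '\n'; // true
    }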
@ -1,6 +1,6 @@
 #include <Storages/PartitionCommands.h>
 #include <Storages/IStorage.h>
-#include <Storages/MergeTree/PartDestinationType.h>
+#include <Storages/DataDestinationType.h>
 #include <Parsers/ASTAlterQuery.h>
 #include <Parsers/ASTIdentifier.h>

@ -42,13 +42,13 @@ std::optional<PartitionCommand> PartitionCommand::parse(const ASTAlterCommand *
         res.part = command_ast->part;
         switch (command_ast->move_destination_type)
         {
-            case PartDestinationType::DISK:
+            case DataDestinationType::DISK:
                 res.move_destination_type = PartitionCommand::MoveDestinationType::DISK;
                 break;
-            case PartDestinationType::VOLUME:
+            case DataDestinationType::VOLUME:
                 res.move_destination_type = PartitionCommand::MoveDestinationType::VOLUME;
                 break;
-            case PartDestinationType::TABLE:
+            case DataDestinationType::TABLE:
                 res.move_destination_type = PartitionCommand::MoveDestinationType::TABLE;
                 res.to_database = command_ast->to_database;
                 res.to_table = command_ast->to_table;
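PartDestinationType is renamed to DataDestinationType here. Judging only from the cases in this switch, the enum carries at least the following members; this is an inferred sketch, not the actual header:

    enum class DataDestinationType
    {
        DISK,   // ALTER TABLE ... MOVE ... TO DISK 'name'
        VOLUME, // ALTER TABLE ... MOVE ... TO VOLUME 'name'
        TABLE,  // ALTER TABLE ... MOVE ... TO TABLE db.table
    };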
@ -129,7 +129,7 @@ QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context
 {
     if (destination_id)
     {
-        auto destination = DatabaseCatalog::instance().getTable(destination_id);
+        auto destination = DatabaseCatalog::instance().getTable(destination_id, context);

         if (destination.get() == this)
             throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
@ -153,7 +153,7 @@ Pipes StorageBuffer::read(

     if (destination_id)
     {
-        auto destination = DatabaseCatalog::instance().getTable(destination_id);
+        auto destination = DatabaseCatalog::instance().getTable(destination_id, context);

         if (destination.get() == this)
             throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
@ -334,7 +334,7 @@ public:
         StoragePtr destination;
         if (storage.destination_id)
         {
-            destination = DatabaseCatalog::instance().tryGetTable(storage.destination_id);
+            destination = DatabaseCatalog::instance().tryGetTable(storage.destination_id, storage.global_context);
             if (destination.get() == &storage)
                 throw Exception("Destination table is myself. Write will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
         }
@ -434,7 +434,7 @@ bool StorageBuffer::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, con
     if (!destination_id)
         return false;

-    auto destination = DatabaseCatalog::instance().getTable(destination_id);
+    auto destination = DatabaseCatalog::instance().getTable(destination_id, query_context);

     if (destination.get() == this)
         throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
@ -602,7 +602,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
     */
     try
     {
-        writeBlockToDestination(block_to_write, DatabaseCatalog::instance().tryGetTable(destination_id));
+        writeBlockToDestination(block_to_write, DatabaseCatalog::instance().tryGetTable(destination_id, global_context));
     }
     catch (...)
     {
@ -739,7 +739,7 @@ void StorageBuffer::checkAlterIsPossible(const AlterCommands & commands, const S
 std::optional<UInt64> StorageBuffer::totalRows() const
 {
     std::optional<UInt64> underlying_rows;
-    auto underlying = DatabaseCatalog::instance().tryGetTable(destination_id);
+    auto underlying = DatabaseCatalog::instance().tryGetTable(destination_id, global_context);

     if (underlying)
         underlying_rows = underlying->totalRows();
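Throughout the StorageBuffer hunks, DatabaseCatalog lookups gain an explicit Context argument: query-driven paths pass the query's context (context/query_context), while background work such as flushBuffer and totalRows passes global_context. A standalone model of the pattern, with hypothetical simplified types:

    #include <iostream>
    #include <string>

    struct Context { std::string name; }; // stand-in

    struct Catalog
    {
        // was: getTable(id) resolving against an implicit/ambient context
        std::string getTable(const std::string & id, const Context & ctx) const
        {
            return id + " (resolved via " + ctx.name + ")";
        }
    };

    int main()
    {
        Catalog catalog;
        Context query_context{"query_context"};   // per-query path
        Context global_context{"global_context"}; // background flush/metrics path
        std::cout << catalog.getTable("buffer_destination", query_context) << '\n';
        std::cout << catalog.getTable("buffer_destination", global_context) << '\n';
    }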
@ -75,7 +75,7 @@ public:
     {
         if (!destination_id)
             return false;
-        auto dest = DatabaseCatalog::instance().tryGetTable(destination_id);
+        auto dest = DatabaseCatalog::instance().tryGetTable(destination_id, global_context);
         if (dest && dest.get() != this)
             return dest->supportsPrewhere();
         return false;
@ -5,11 +5,11 @@
 #include <Interpreters/SyntaxAnalyzer.h>
 #include <Parsers/ASTExpressionList.h>
 #include <Parsers/ASTFunction.h>
 #include <Parsers/queryToString.h>

 namespace DB
 {


 StorageInMemoryMetadata::StorageInMemoryMetadata(
     const ColumnsDescription & columns_,
     const IndicesDescription & indices_,
@ -138,4 +138,5 @@ StorageMetadataKeyField StorageMetadataKeyField::getKeyFromAST(const ASTPtr & de
     return result;
 }


 }
@ -72,5 +72,4 @@ struct StorageMetadataKeyField
     static StorageMetadataKeyField getKeyFromAST(const ASTPtr & definition_ast, const ColumnsDescription & columns, const Context & context);
 };


 }
@ -149,7 +149,7 @@ StorageMaterializedView::StorageMaterializedView(
         create_interpreter.setInternal(true);
         create_interpreter.execute();

-        target_table_id = DatabaseCatalog::instance().getTable({manual_create_query->database, manual_create_query->table})->getStorageID();
+        target_table_id = DatabaseCatalog::instance().getTable({manual_create_query->database, manual_create_query->table}, global_context)->getStorageID();
     }

     if (!select_table_id.empty())
@ -204,7 +204,7 @@ BlockOutputStreamPtr StorageMaterializedView::write(const ASTPtr & query, const

 static void executeDropQuery(ASTDropQuery::Kind kind, Context & global_context, const StorageID & target_table_id)
 {
-    if (DatabaseCatalog::instance().tryGetTable(target_table_id))
+    if (DatabaseCatalog::instance().tryGetTable(target_table_id, global_context))
     {
         /// We create and execute `drop` query for internal table.
         auto drop_query = std::make_shared<ASTDropQuery>();
@ -362,12 +362,12 @@ void StorageMaterializedView::shutdown()

 StoragePtr StorageMaterializedView::getTargetTable() const
 {
-    return DatabaseCatalog::instance().getTable(target_table_id);
+    return DatabaseCatalog::instance().getTable(target_table_id, global_context);
 }

 StoragePtr StorageMaterializedView::tryGetTargetTable() const
 {
-    return DatabaseCatalog::instance().tryGetTable(target_table_id);
+    return DatabaseCatalog::instance().tryGetTable(target_table_id, global_context);
 }

 Strings StorageMaterializedView::getDataPaths() const
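These call sites illustrate the two lookup flavors side by side: getTargetTable() uses getTable(), which is expected to throw when the table is missing, while tryGetTargetTable() and executeDropQuery() use tryGetTable(), which returns an empty pointer that callers guard with if. A standalone model of that contract (hypothetical, simplified):

    #include <map>
    #include <memory>
    #include <stdexcept>
    #include <string>

    struct Catalog
    {
        std::map<std::string, std::shared_ptr<int>> tables;

        std::shared_ptr<int> tryGetTable(const std::string & id) const
        {
            auto it = tables.find(id);
            return it == tables.end() ? nullptr : it->second; // empty on miss
        }

        std::shared_ptr<int> getTable(const std::string & id) const
        {
            if (auto table = tryGetTable(id))
                return table;
            throw std::runtime_error("Table " + id + " does not exist"); // hard failure
        }
    };

    int main()
    {
        Catalog catalog;
        catalog.tables["inner_table"] = std::make_shared<int>(42);
        if (auto missing = catalog.tryGetTable("gone"); !missing)
        {
            // caller handles absence, as executeDropQuery does
        }
        return *catalog.getTable("inner_table") == 42 ? 0 : 1;
    }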
@ -62,7 +62,7 @@ StorageMerge::StorageMerge(
 template <typename F>
 StoragePtr StorageMerge::getFirstTable(F && predicate) const
 {
-    auto iterator = getDatabaseIterator();
+    auto iterator = getDatabaseIterator(global_context);

     while (iterator->isValid())
     {
@ -110,7 +110,7 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context &
 {
     auto stage_in_source_tables = QueryProcessingStage::FetchColumns;

-    DatabaseTablesIteratorPtr iterator = getDatabaseIterator();
+    DatabaseTablesIteratorPtr iterator = getDatabaseIterator(context);

     size_t selected_table_size = 0;

@ -329,7 +329,7 @@ Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const Quer
 StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const String & query_id, const Settings & settings) const
 {
     StorageListWithLocks selected_tables;
-    auto iterator = getDatabaseIterator();
+    auto iterator = getDatabaseIterator(global_context);

     while (iterator->isValid())
     {
@ -349,7 +349,7 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(
     const ASTPtr & query, bool has_virtual_column, const String & query_id, const Settings & settings) const
 {
     StorageListWithLocks selected_tables;
-    DatabaseTablesIteratorPtr iterator = getDatabaseIterator();
+    DatabaseTablesIteratorPtr iterator = getDatabaseIterator(global_context);

     auto virtual_column = ColumnString::create();

@ -384,12 +384,12 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(
 }


-DatabaseTablesIteratorPtr StorageMerge::getDatabaseIterator() const
+DatabaseTablesIteratorPtr StorageMerge::getDatabaseIterator(const Context & context) const
 {
     checkStackSize();
     auto database = DatabaseCatalog::instance().getDatabase(source_database);
     auto table_name_match = [this](const String & table_name_) { return table_name_regexp.match(table_name_); };
-    return database->getTablesIterator(table_name_match);
+    return database->getTablesIterator(context, table_name_match);
 }
@ -61,7 +61,7 @@ private:
     template <typename F>
     StoragePtr getFirstTable(F && predicate) const;

-    DatabaseTablesIteratorPtr getDatabaseIterator() const;
+    DatabaseTablesIteratorPtr getDatabaseIterator(const Context & context) const;

     NamesAndTypesList getVirtuals() const override;
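getDatabaseIterator() shows the other half of the context-threading change: the database's getTablesIterator() now takes the context in addition to the existing name filter. The filter itself is unchanged, a predicate built from table_name_regexp. A standalone model of iterating with such a predicate:

    #include <iostream>
    #include <map>
    #include <regex>
    #include <string>

    int main()
    {
        // Stand-in for the tables of the source database.
        std::map<std::string, int> tables{{"log_2020", 1}, {"log_2021", 2}, {"other", 3}};

        std::regex table_name_regexp("^log_.*");
        auto table_name_match = [&](const std::string & name)
        {
            return std::regex_match(name, table_name_regexp);
        };

        // The iterator StorageMerge receives yields only matching tables.
        for (const auto & [name, _] : tables)
            if (table_name_match(name))
                std::cout << name << '\n'; // log_2020, log_2021
    }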
@ -990,7 +990,7 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma
         case PartitionCommand::MoveDestinationType::TABLE:
             checkPartitionCanBeDropped(command.partition);
             String dest_database = context.resolveDatabase(command.to_database);
-            auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table});
+            auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table}, context);
             movePartitionToTable(dest_storage, command.partition, context);
             break;
         }
@ -1002,7 +1002,7 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma
         {
             checkPartitionCanBeDropped(command.partition);
             String from_database = context.resolveDatabase(command.from_database);
-            auto from_storage = DatabaseCatalog::instance().getTable({from_database, command.from_table});
+            auto from_storage = DatabaseCatalog::instance().getTable({from_database, command.from_table}, context);
             replacePartitionFrom(from_storage, command.partition, command.replace, context);
         }
         break;
@ -1570,7 +1570,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)

     auto clone_data_parts_from_source_table = [&] () -> size_t
     {
-        source_table = DatabaseCatalog::instance().tryGetTable(source_table_id);
+        source_table = DatabaseCatalog::instance().tryGetTable(source_table_id, global_context);
         if (!source_table)
         {
             LOG_DEBUG(log, "Can't use {} as source table for REPLACE PARTITION command. It does not exist.", source_table_id.getNameForLogs());
@ -3485,7 +3485,7 @@ void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const Part
         case PartitionCommand::MoveDestinationType::TABLE:
             checkPartitionCanBeDropped(command.partition);
             String dest_database = query_context.resolveDatabase(command.to_database);
-            auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table});
+            auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table}, query_context);
             movePartitionToTable(dest_storage, command.partition, query_context);
             break;
         }
@ -3496,7 +3496,7 @@ void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const Part
         {
             checkPartitionCanBeDropped(command.partition);
             String from_database = query_context.resolveDatabase(command.from_database);
-            auto from_storage = DatabaseCatalog::instance().getTable({from_database, command.from_table});
+            auto from_storage = DatabaseCatalog::instance().getTable({from_database, command.from_table}, query_context);
             replacePartitionFrom(from_storage, command.partition, command.replace, query_context);
         }
         break;
@ -301,7 +301,7 @@ Pipes StorageSystemColumns::read(
         const DatabasePtr database = databases.at(database_name);
         offsets[i] = i ? offsets[i - 1] : 0;

-        for (auto iterator = database->getTablesIterator(); iterator->isValid(); iterator->next())
+        for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next())
         {
             const String & table_name = iterator->name();
             storages.emplace(std::piecewise_construct,
@ -25,7 +25,7 @@ NamesAndTypesList StorageSystemGraphite::getNamesAndTypes()
 /*
  * Looking for (Replicated)*GraphiteMergeTree and get all configuration parameters for them
  */
-static StorageSystemGraphite::Configs getConfigs()
+static StorageSystemGraphite::Configs getConfigs(const Context & context)
 {
     const Databases databases = DatabaseCatalog::instance().getDatabases();
     StorageSystemGraphite::Configs graphite_configs;
@ -36,7 +36,7 @@ static StorageSystemGraphite::Configs getConfigs()
         if (db.second->getEngineName() == "Lazy")
             continue;

-        for (auto iterator = db.second->getTablesIterator(); iterator->isValid(); iterator->next())
+        for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())
         {
             const auto & table = iterator->table();

@ -71,9 +71,9 @@ static StorageSystemGraphite::Configs getConfigs()
     return graphite_configs;
 }

-void StorageSystemGraphite::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
+void StorageSystemGraphite::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo &) const
 {
-    Configs graphite_configs = getConfigs();
+    Configs graphite_configs = getConfigs(context);

     for (const auto & config : graphite_configs)
     {
@ -51,7 +51,7 @@ void StorageSystemMutations::fillData(MutableColumns & res_columns, const Contex

         const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, db.first);

-        for (auto iterator = db.second->getTablesIterator(); iterator->isValid(); iterator->next())
+        for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())
         {
             if (!dynamic_cast<const MergeTreeData *>(iterator->table().get()))
                 continue;
@ -111,7 +111,7 @@ StoragesInfoStream::StoragesInfoStream(const SelectQueryInfo & query_info, const
         const DatabasePtr database = databases.at(database_name);

         offsets[i] = i ? offsets[i - 1] : 0;
-        for (auto iterator = database->getTablesIterator(); iterator->isValid(); iterator->next())
+        for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next())
         {
             String table_name = iterator->name();
             StoragePtr storage = iterator->table();