mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-24 02:30:51 +00:00
less Context::getTable() usages
This commit is contained in:
parent
d710bd1812
commit
e6718e199e
@ -33,7 +33,7 @@ try
|
||||
Names column_names;
|
||||
column_names.push_back("WatchID");
|
||||
|
||||
StoragePtr table = context.getTable("default", "hits6");
|
||||
StoragePtr table = DatabaseCatalog::instance().getTable({"default", "hits6"});
|
||||
|
||||
QueryProcessingStage::Enum stage = table->getQueryProcessingStage(context);
|
||||
BlockInputStreams streams = table->read(column_names, {}, context, stage, settings.max_block_size, settings.max_threads);
|
||||
|
@ -627,8 +627,8 @@ void DDLWorker::processTask(DDLTask & task, const ZooKeeperPtr & zookeeper)
|
||||
if (!query_with_table->table.empty())
|
||||
{
|
||||
/// It's not CREATE DATABASE
|
||||
String database = query_with_table->database.empty() ? context.getCurrentDatabase() : query_with_table->database;
|
||||
storage = context.tryGetTable(database, query_with_table->table);
|
||||
StorageID table_id{*query_with_table, context};
|
||||
storage = DatabaseCatalog::instance().tryGetTable(table_id);
|
||||
}
|
||||
|
||||
/// For some reason we check consistency of cluster definition only
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include <Databases/IDatabase.h>
|
||||
#include <Databases/DatabaseMemory.h>
|
||||
#include <Poco/File.h>
|
||||
#include <Common/quoteString.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -149,14 +150,13 @@ void DatabaseCatalog::attachDatabase(const String & database_name, const Databas
|
||||
}
|
||||
|
||||
|
||||
DatabasePtr DatabaseCatalog::detachDatabase(const String & database_name, bool drop)
|
||||
DatabasePtr DatabaseCatalog::detachDatabase(const String & database_name, bool drop, bool check_empty)
|
||||
{
|
||||
std::lock_guard lock{databases_mutex};
|
||||
assertDatabaseExistsUnlocked(database_name);
|
||||
auto db = databases.find(database_name)->second;
|
||||
|
||||
if (!db->empty(*global_context))
|
||||
if (!db->empty(*global_context))
|
||||
if (check_empty && !db->empty(*global_context))
|
||||
throw Exception("New table appeared in database being dropped or detached. Try again.", ErrorCodes::DATABASE_NOT_EMPTY);
|
||||
|
||||
databases.erase(database_name);
|
||||
|
@ -82,7 +82,7 @@ public:
|
||||
DatabasePtr getSystemDatabase() const;
|
||||
|
||||
void attachDatabase(const String & database_name, const DatabasePtr & database);
|
||||
DatabasePtr detachDatabase(const String & database_name, bool drop = false);
|
||||
DatabasePtr detachDatabase(const String & database_name, bool drop = false, bool check_empty = true);
|
||||
|
||||
DatabasePtr getDatabase(const String & database_name, const Context & local_context) const;
|
||||
DatabasePtr getDatabase(const String & database_name) const;
|
||||
|
@ -41,14 +41,12 @@ BlockIO InterpreterAlterQuery::execute()
|
||||
return executeDDLQueryOnCluster(query_ptr, context, getRequiredAccess());
|
||||
|
||||
context.checkAccess(getRequiredAccess());
|
||||
|
||||
const String & table_name = alter.table;
|
||||
String database_name = alter.database.empty() ? context.getCurrentDatabase() : alter.database;
|
||||
StoragePtr table = context.getTable(database_name, table_name);
|
||||
StorageID table_id{alter, context};
|
||||
StoragePtr table = DatabaseCatalog::instance().getTable(table_id);
|
||||
|
||||
/// Add default database to table identifiers that we can encounter in e.g. default expressions,
|
||||
/// mutation expression, etc.
|
||||
AddDefaultDatabaseVisitor visitor(database_name);
|
||||
AddDefaultDatabaseVisitor visitor(table_id.getDatabaseName());
|
||||
ASTPtr command_list_ptr = alter.command_list->ptr();
|
||||
visitor.visit(command_list_ptr);
|
||||
|
||||
|
@ -38,11 +38,10 @@ InterpreterCheckQuery::InterpreterCheckQuery(const ASTPtr & query_ptr_, const Co
|
||||
BlockIO InterpreterCheckQuery::execute()
|
||||
{
|
||||
const auto & check = query_ptr->as<ASTCheckQuery &>();
|
||||
const String & table_name = check.table;
|
||||
String database_name = check.database.empty() ? context.getCurrentDatabase() : check.database;
|
||||
StorageID table_id{check, context};
|
||||
|
||||
context.checkAccess(AccessType::SHOW, database_name, table_name);
|
||||
StoragePtr table = context.getTable(database_name, table_name);
|
||||
context.checkAccess(AccessType::SHOW, table_id.database_name, table_id.table_name);
|
||||
StoragePtr table = DatabaseCatalog::instance().getTable(table_id);
|
||||
auto check_results = table->checkData(query_ptr, context);
|
||||
|
||||
Block block;
|
||||
|
@ -147,6 +147,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
|
||||
bool renamed = false;
|
||||
try
|
||||
{
|
||||
//FIXME is it possible to attach db only after it was loaded? (no, loadStoredObjects adds view dependencies)
|
||||
DatabaseCatalog::instance().attachDatabase(database_name, database);
|
||||
added = true;
|
||||
|
||||
@ -163,7 +164,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
|
||||
if (renamed)
|
||||
Poco::File(metadata_file_tmp_path).remove();
|
||||
if (added)
|
||||
DatabaseCatalog::instance().detachDatabase(database_name);
|
||||
DatabaseCatalog::instance().detachDatabase(database_name, false, false);
|
||||
|
||||
throw;
|
||||
}
|
||||
@ -411,8 +412,8 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(AS
|
||||
}
|
||||
else if (!create.as_table.empty())
|
||||
{
|
||||
String as_database_name = create.as_database.empty() ? context.getCurrentDatabase() : create.as_database;
|
||||
StoragePtr as_storage = context.getTable(as_database_name, create.as_table);
|
||||
String as_database_name = context.resolveDatabase(create.as_database);
|
||||
StoragePtr as_storage = DatabaseCatalog::instance().getTable({as_database_name, create.as_table});
|
||||
|
||||
/// as_storage->getColumns() and setEngine(...) must be called under structure lock of other_table for CREATE ... AS other_table.
|
||||
as_storage_lock = as_storage->lockStructureForShare(false, context.getCurrentQueryId());
|
||||
|
@ -236,23 +236,22 @@ BlockIO InterpreterKillQueryQuery::execute()
|
||||
header.insert(0, {ColumnString::create(), std::make_shared<DataTypeString>(), "kill_status"});
|
||||
|
||||
MutableColumns res_columns = header.cloneEmptyColumns();
|
||||
String database_name, table_name;
|
||||
auto table_id = StorageID::createEmpty();
|
||||
|
||||
for (size_t i = 0; i < mutations_block.rows(); ++i)
|
||||
{
|
||||
database_name = database_col.getDataAt(i).toString();
|
||||
table_name = table_col.getDataAt(i).toString();
|
||||
table_id = StorageID{database_col.getDataAt(i).toString(), table_col.getDataAt(i).toString()};
|
||||
auto mutation_id = mutation_id_col.getDataAt(i).toString();
|
||||
|
||||
CancellationCode code = CancellationCode::Unknown;
|
||||
if (!query.test)
|
||||
{
|
||||
auto storage = context.tryGetTable(database_name, table_name);
|
||||
auto storage = DatabaseCatalog::instance().tryGetTable(table_id);
|
||||
if (!storage)
|
||||
code = CancellationCode::NotFound;
|
||||
else
|
||||
{
|
||||
if (!context.getAccessRights()->isGranted(&Poco::Logger::get("InterpreterKillQueryQuery"), AccessType::KILL_MUTATION, database_name, table_name))
|
||||
if (!context.getAccessRights()->isGranted(&Poco::Logger::get("InterpreterKillQueryQuery"), AccessType::KILL_MUTATION, table_id.database_name, table_id.table_name))
|
||||
continue;
|
||||
code = storage->killMutation(mutation_id);
|
||||
}
|
||||
@ -261,9 +260,9 @@ BlockIO InterpreterKillQueryQuery::execute()
|
||||
insertResultRow(i, code, mutations_block, header, res_columns);
|
||||
}
|
||||
|
||||
if (res_columns[0]->empty() && !table_name.empty())
|
||||
if (res_columns[0]->empty() && table_id)
|
||||
throw Exception(
|
||||
"Not allowed to kill mutation on " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(table_name),
|
||||
"Not allowed to kill mutation on " + table_id.getNameForLogs(),
|
||||
ErrorCodes::ACCESS_DENIED);
|
||||
|
||||
res_io.in = std::make_shared<OneBlockInputStream>(header.cloneWithColumns(std::move(res_columns)));
|
||||
|
@ -25,7 +25,7 @@ BlockIO InterpreterOptimizeQuery::execute()
|
||||
|
||||
context.checkAccess(getRequiredAccess());
|
||||
|
||||
StoragePtr table = context.getTable(ast.database, ast.table);
|
||||
StoragePtr table = DatabaseCatalog::instance().getTable(StorageID{ast, context});
|
||||
table->optimize(query_ptr, ast.partition, ast.final, ast.deduplicate, context);
|
||||
return {};
|
||||
}
|
||||
|
@ -93,7 +93,7 @@ BlockIO InterpreterRenameQuery::execute()
|
||||
for (auto & elem : descriptions)
|
||||
{
|
||||
database_catalog.assertTableDoesntExist(StorageID(elem.to_database_name, elem.to_table_name), context);
|
||||
auto from_table = context.getTable(elem.from_database_name, elem.from_table_name);
|
||||
auto from_table = database_catalog.getTable({elem.from_database_name, elem.from_table_name});
|
||||
auto from_table_lock = from_table->lockExclusively(context.getCurrentQueryId());
|
||||
|
||||
database_catalog.getDatabase(elem.from_database_name)->renameTable(
|
||||
|
@ -313,14 +313,14 @@ BlockIO InterpreterSystemQuery::execute()
|
||||
StoragePtr InterpreterSystemQuery::tryRestartReplica(const String & database_name, const String & table_name, Context & system_context)
|
||||
{
|
||||
context.checkAccess(AccessType::RESTART_REPLICA, database_name, table_name);
|
||||
auto database = DatabaseCatalog::instance().getDatabase(database_name, system_context);
|
||||
auto database = DatabaseCatalog::instance().getDatabase(database_name);
|
||||
|
||||
auto table_ddl_guard = DatabaseCatalog::instance().getDDLGuard(database_name, table_name);
|
||||
ASTPtr create_ast;
|
||||
|
||||
/// Detach actions
|
||||
{
|
||||
auto table = system_context.tryGetTable(database_name, table_name);
|
||||
auto table = DatabaseCatalog::instance().tryGetTable({database_name, table_name});
|
||||
|
||||
if (!table || !dynamic_cast<const StorageReplicatedMergeTree *>(table.get()))
|
||||
return nullptr;
|
||||
@ -385,11 +385,11 @@ void InterpreterSystemQuery::restartReplicas(Context & system_context)
|
||||
|
||||
void InterpreterSystemQuery::syncReplica(ASTSystemQuery & query)
|
||||
{
|
||||
String database_name = !query.database.empty() ? query.database : context.getCurrentDatabase();
|
||||
String database_name = context.resolveDatabase(query.database);
|
||||
const String & table_name = query.table;
|
||||
|
||||
context.checkAccess(AccessType::SYNC_REPLICA, database_name, table_name);
|
||||
StoragePtr table = context.getTable(database_name, table_name);
|
||||
StoragePtr table = DatabaseCatalog::instance().getTable({database_name, table_name});
|
||||
|
||||
if (auto storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get()))
|
||||
{
|
||||
@ -409,11 +409,11 @@ void InterpreterSystemQuery::syncReplica(ASTSystemQuery & query)
|
||||
|
||||
void InterpreterSystemQuery::flushDistributed(ASTSystemQuery & query)
|
||||
{
|
||||
String database_name = !query.database.empty() ? query.database : context.getCurrentDatabase();
|
||||
String database_name = context.resolveDatabase(query.database);
|
||||
String & table_name = query.table;
|
||||
context.checkAccess(AccessType::FLUSH_DISTRIBUTED, database_name, table_name);
|
||||
|
||||
if (auto storage_distributed = dynamic_cast<StorageDistributed *>(context.getTable(database_name, table_name).get()))
|
||||
if (auto storage_distributed = dynamic_cast<StorageDistributed *>(DatabaseCatalog::instance().getTable({database_name, table_name}).get()))
|
||||
storage_distributed->flushClusterNodesAllData();
|
||||
else
|
||||
throw Exception("Table " + database_name + "." + table_name + " is not distributed", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
@ -41,28 +41,18 @@ BlockIO InterpreterWatchQuery::execute()
|
||||
|
||||
BlockIO res;
|
||||
const ASTWatchQuery & query = typeid_cast<const ASTWatchQuery &>(*query_ptr);
|
||||
String database;
|
||||
String table;
|
||||
/// Get database
|
||||
if (!query.database.empty())
|
||||
database = query.database;
|
||||
else
|
||||
database = context.getCurrentDatabase();
|
||||
|
||||
/// Get table
|
||||
table = query.table;
|
||||
StorageID table_id{query, context};
|
||||
|
||||
/// Get storage
|
||||
storage = context.tryGetTable(database, table);
|
||||
storage = DatabaseCatalog::instance().tryGetTable(table_id);
|
||||
|
||||
if (!storage)
|
||||
throw Exception("Table " + backQuoteIfNeed(database) + "." +
|
||||
backQuoteIfNeed(table) + " doesn't exist.",
|
||||
throw Exception("Table " + table_id.getNameForLogs() + " doesn't exist.",
|
||||
ErrorCodes::UNKNOWN_TABLE);
|
||||
|
||||
/// List of columns to read to execute the query.
|
||||
Names required_columns = storage->getColumns().getNamesOfPhysical();
|
||||
context.checkAccess(AccessType::SELECT, database, table, required_columns);
|
||||
context.checkAccess(AccessType::SELECT, table_id.database_name, table_id.table_name, required_columns);
|
||||
|
||||
/// Get context settings for this query
|
||||
const Settings & settings = context.getSettingsRef();
|
||||
|
@ -127,8 +127,9 @@ protected:
|
||||
private:
|
||||
/* Saving thread data */
|
||||
Context & context;
|
||||
const String database_name;
|
||||
const String table_name;
|
||||
const StorageID table_id;
|
||||
//const String database_name;
|
||||
//const String table_name;
|
||||
const String storage_def;
|
||||
StoragePtr table;
|
||||
bool is_prepared = false;
|
||||
@ -170,11 +171,12 @@ SystemLog<LogElement>::SystemLog(Context & context_,
|
||||
const String & table_name_,
|
||||
const String & storage_def_,
|
||||
size_t flush_interval_milliseconds_)
|
||||
: context(context_),
|
||||
database_name(database_name_), table_name(table_name_), storage_def(storage_def_),
|
||||
: context(context_)
|
||||
, table_id(database_name_, table_name_)
|
||||
, storage_def(storage_def_),
|
||||
flush_interval_milliseconds(flush_interval_milliseconds_)
|
||||
{
|
||||
log = &Logger::get("SystemLog (" + database_name + "." + table_name + ")");
|
||||
log = &Logger::get("SystemLog (" + database_name_ + "." + table_name_ + ")");
|
||||
|
||||
saving_thread = ThreadFromGlobalPool([this] { savingThreadFunction(); });
|
||||
}
|
||||
@ -338,8 +340,8 @@ void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush,
|
||||
/// This is needed to support DEFAULT-columns in table.
|
||||
|
||||
std::unique_ptr<ASTInsertQuery> insert = std::make_unique<ASTInsertQuery>();
|
||||
insert->database = database_name;
|
||||
insert->table = table_name;
|
||||
insert->database = table_id.database_name;
|
||||
insert->table = table_id.table_name;
|
||||
ASTPtr query_ptr(insert.release());
|
||||
|
||||
InterpreterInsertQuery interpreter(query_ptr, context);
|
||||
@ -363,9 +365,9 @@ void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush,
|
||||
template <typename LogElement>
|
||||
void SystemLog<LogElement>::prepareTable()
|
||||
{
|
||||
String description = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(table_name);
|
||||
String description = table_id.getNameForLogs();
|
||||
|
||||
table = context.tryGetTable(database_name, table_name);
|
||||
table = DatabaseCatalog::instance().tryGetTable(table_id);
|
||||
|
||||
if (table)
|
||||
{
|
||||
@ -376,18 +378,18 @@ void SystemLog<LogElement>::prepareTable()
|
||||
{
|
||||
/// Rename the existing table.
|
||||
int suffix = 0;
|
||||
while (DatabaseCatalog::instance().isTableExist({database_name, table_name + "_" + toString(suffix)}, context))
|
||||
while (DatabaseCatalog::instance().isTableExist({table_id.database_name, table_id.table_name + "_" + toString(suffix)}, context))
|
||||
++suffix;
|
||||
|
||||
auto rename = std::make_shared<ASTRenameQuery>();
|
||||
|
||||
ASTRenameQuery::Table from;
|
||||
from.database = database_name;
|
||||
from.table = table_name;
|
||||
from.database = table_id.database_name;
|
||||
from.table = table_id.table_name;
|
||||
|
||||
ASTRenameQuery::Table to;
|
||||
to.database = database_name;
|
||||
to.table = table_name + "_" + toString(suffix);
|
||||
to.database = table_id.database_name;
|
||||
to.table = table_id.table_name + "_" + toString(suffix);
|
||||
|
||||
ASTRenameQuery::Element elem;
|
||||
elem.from = from;
|
||||
@ -414,8 +416,8 @@ void SystemLog<LogElement>::prepareTable()
|
||||
|
||||
auto create = std::make_shared<ASTCreateQuery>();
|
||||
|
||||
create->database = database_name;
|
||||
create->table = table_name;
|
||||
create->database = table_id.database_name;
|
||||
create->table = table_id.table_name;
|
||||
|
||||
Block sample = LogElement::createBlock();
|
||||
|
||||
@ -433,7 +435,7 @@ void SystemLog<LogElement>::prepareTable()
|
||||
interpreter.setInternal(true);
|
||||
interpreter.execute();
|
||||
|
||||
table = context.getTable(database_name, table_name);
|
||||
table = DatabaseCatalog::instance().getTable(table_id);
|
||||
}
|
||||
|
||||
is_prepared = true;
|
||||
|
@ -5,9 +5,7 @@
|
||||
#include <Interpreters/castColumn.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
#include <Processors/Transforms/AddingMissedTransform.h>
|
||||
#include <DataStreams/ConvertingBlockInputStream.h>
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
#include <Databases/IDatabase.h>
|
||||
#include <Storages/StorageBuffer.h>
|
||||
#include <Storages/StorageFactory.h>
|
||||
#include <Storages/AlterCommands.h>
|
||||
@ -25,8 +23,6 @@
|
||||
#include <common/logger_useful.h>
|
||||
#include <common/getThreadId.h>
|
||||
#include <ext/range.h>
|
||||
#include <DataStreams/FilterBlockInputStream.h>
|
||||
#include <DataStreams/ExpressionBlockInputStream.h>
|
||||
#include <Processors/Transforms/ConvertingTransform.h>
|
||||
#include <Processors/Transforms/FilterTransform.h>
|
||||
#include <Processors/Transforms/ExpressionTransform.h>
|
||||
@ -68,17 +64,14 @@ StorageBuffer::StorageBuffer(
|
||||
size_t num_shards_,
|
||||
const Thresholds & min_thresholds_,
|
||||
const Thresholds & max_thresholds_,
|
||||
const String & destination_database_,
|
||||
const String & destination_table_,
|
||||
const StorageID & destination_id_,
|
||||
bool allow_materialized_)
|
||||
: IStorage(table_id_)
|
||||
, global_context(context_)
|
||||
, num_shards(num_shards_), buffers(num_shards_)
|
||||
, min_thresholds(min_thresholds_)
|
||||
, max_thresholds(max_thresholds_)
|
||||
, destination_database(destination_database_)
|
||||
, destination_table(destination_table_)
|
||||
, no_destination(destination_database.empty() && destination_table.empty())
|
||||
, destination_id(destination_id_)
|
||||
, allow_materialized(allow_materialized_)
|
||||
, log(&Logger::get("StorageBuffer (" + table_id_.getFullTableName() + ")"))
|
||||
{
|
||||
@ -142,9 +135,9 @@ private:
|
||||
|
||||
QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context & context) const
|
||||
{
|
||||
if (!no_destination)
|
||||
if (destination_id)
|
||||
{
|
||||
auto destination = context.getTable(destination_database, destination_table);
|
||||
auto destination = DatabaseCatalog::instance().getTable(destination_id);
|
||||
|
||||
if (destination.get() == this)
|
||||
throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
|
||||
@ -186,9 +179,9 @@ Pipes StorageBuffer::readWithProcessors(
|
||||
{
|
||||
Pipes pipes_from_dst;
|
||||
|
||||
if (!no_destination)
|
||||
if (destination_id)
|
||||
{
|
||||
auto destination = context.getTable(destination_database, destination_table);
|
||||
auto destination = DatabaseCatalog::instance().getTable(destination_id);
|
||||
|
||||
if (destination.get() == this)
|
||||
throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
|
||||
@ -219,7 +212,7 @@ Pipes StorageBuffer::readWithProcessors(
|
||||
{
|
||||
if (!destination->hasColumn(column_name))
|
||||
{
|
||||
LOG_WARNING(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table)
|
||||
LOG_WARNING(log, "Destination table " << destination_id.getNameForLogs()
|
||||
<< " doesn't have column " << backQuoteIfNeed(column_name) << ". The default values are used.");
|
||||
boost::range::remove_erase(columns_intersection, column_name);
|
||||
continue;
|
||||
@ -228,7 +221,7 @@ Pipes StorageBuffer::readWithProcessors(
|
||||
const auto & col = getColumn(column_name);
|
||||
if (!dst_col.type->equals(*col.type))
|
||||
{
|
||||
LOG_WARNING(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table)
|
||||
LOG_WARNING(log, "Destination table " << destination_id.getNameForLogs()
|
||||
<< " has different type of column " << backQuoteIfNeed(column_name) << " ("
|
||||
<< dst_col.type->getName() << " != " << col.type->getName() << "). Data from destination table are converted.");
|
||||
header_after_adding_defaults.getByName(column_name) = ColumnWithTypeAndName(dst_col.type, column_name);
|
||||
@ -237,7 +230,7 @@ Pipes StorageBuffer::readWithProcessors(
|
||||
|
||||
if (columns_intersection.empty())
|
||||
{
|
||||
LOG_WARNING(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table)
|
||||
LOG_WARNING(log, "Destination table " << destination_id.getNameForLogs()
|
||||
<< " has no common columns with block in buffer. Block of data is skipped.");
|
||||
}
|
||||
else
|
||||
@ -366,9 +359,9 @@ public:
|
||||
return;
|
||||
|
||||
StoragePtr destination;
|
||||
if (!storage.no_destination)
|
||||
if (storage.destination_id)
|
||||
{
|
||||
destination = storage.global_context.tryGetTable(storage.destination_database, storage.destination_table);
|
||||
destination = DatabaseCatalog::instance().tryGetTable(storage.destination_id);
|
||||
if (destination.get() == &storage)
|
||||
throw Exception("Destination table is myself. Write will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
|
||||
}
|
||||
@ -378,7 +371,7 @@ public:
|
||||
/// If the block already exceeds the maximum limit, then we skip the buffer.
|
||||
if (rows > storage.max_thresholds.rows || bytes > storage.max_thresholds.bytes)
|
||||
{
|
||||
if (!storage.no_destination)
|
||||
if (storage.destination_id)
|
||||
{
|
||||
LOG_TRACE(storage.log, "Writing block with " << rows << " rows, " << bytes << " bytes directly.");
|
||||
storage.writeBlockToDestination(block, destination);
|
||||
@ -462,10 +455,10 @@ BlockOutputStreamPtr StorageBuffer::write(const ASTPtr & /*query*/, const Contex
|
||||
|
||||
bool StorageBuffer::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const
|
||||
{
|
||||
if (no_destination)
|
||||
if (!destination_id)
|
||||
return false;
|
||||
|
||||
auto destination = global_context.getTable(destination_database, destination_table);
|
||||
auto destination = DatabaseCatalog::instance().getTable(destination_id);
|
||||
|
||||
if (destination.get() == this)
|
||||
throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
|
||||
@ -621,7 +614,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
|
||||
|
||||
LOG_TRACE(log, "Flushing buffer with " << rows << " rows, " << bytes << " bytes, age " << time_passed << " seconds.");
|
||||
|
||||
if (no_destination)
|
||||
if (!destination_id)
|
||||
return;
|
||||
|
||||
/** For simplicity, buffer is locked during write.
|
||||
@ -632,7 +625,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
|
||||
*/
|
||||
try
|
||||
{
|
||||
writeBlockToDestination(block_to_write, global_context.tryGetTable(destination_database, destination_table));
|
||||
writeBlockToDestination(block_to_write, DatabaseCatalog::instance().tryGetTable(destination_id));
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@ -656,12 +649,12 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
|
||||
|
||||
void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr table)
|
||||
{
|
||||
if (no_destination || !block)
|
||||
if (!destination_id || !block)
|
||||
return;
|
||||
|
||||
if (!table)
|
||||
{
|
||||
LOG_ERROR(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table) << " doesn't exist. Block of data is discarded.");
|
||||
LOG_ERROR(log, "Destination table " << destination_id.getNameForLogs() << " doesn't exist. Block of data is discarded.");
|
||||
return;
|
||||
}
|
||||
|
||||
@ -669,8 +662,8 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
|
||||
|
||||
auto insert = std::make_shared<ASTInsertQuery>();
|
||||
|
||||
insert->database = destination_database;
|
||||
insert->table = destination_table;
|
||||
insert->database = destination_id.database_name;
|
||||
insert->table = destination_id.table_name;
|
||||
|
||||
/** We will insert columns that are the intersection set of columns of the buffer table and the subordinate table.
|
||||
* This will support some of the cases (but not all) when the table structure does not match.
|
||||
@ -685,7 +678,7 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
|
||||
auto column = block.getByName(dst_col.name);
|
||||
if (!column.type->equals(*dst_col.type))
|
||||
{
|
||||
LOG_WARNING(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table)
|
||||
LOG_WARNING(log, "Destination table " << destination_id.getNameForLogs()
|
||||
<< " have different type of column " << backQuoteIfNeed(column.name) << " ("
|
||||
<< dst_col.type->getName() << " != " << column.type->getName()
|
||||
<< "). Block of data is converted.");
|
||||
@ -699,14 +692,14 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
|
||||
|
||||
if (block_to_write.columns() == 0)
|
||||
{
|
||||
LOG_ERROR(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table)
|
||||
LOG_ERROR(log, "Destination table " << destination_id.getNameForLogs()
|
||||
<< " have no common columns with block in buffer. Block of data is discarded.");
|
||||
return;
|
||||
}
|
||||
|
||||
if (block_to_write.columns() != block.columns())
|
||||
LOG_WARNING(log, "Not all columns from block in buffer exist in destination table "
|
||||
<< backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table) << ". Some columns are discarded.");
|
||||
<< destination_id.getNameForLogs() << ". Some columns are discarded.");
|
||||
|
||||
auto list_of_columns = std::make_shared<ASTExpressionList>();
|
||||
insert->columns = list_of_columns;
|
||||
@ -788,6 +781,7 @@ void registerStorageBuffer(StorageFactory & factory)
|
||||
" destination_database, destination_table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes.",
|
||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
|
||||
//FIXME currentDatabase() at the moment of table creation can be different from currentDatabase() at the moment when table is loaded|used
|
||||
engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context);
|
||||
engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.local_context);
|
||||
|
||||
@ -803,6 +797,14 @@ void registerStorageBuffer(StorageFactory & factory)
|
||||
UInt64 min_bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[7]->as<ASTLiteral &>().value);
|
||||
UInt64 max_bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[8]->as<ASTLiteral &>().value);
|
||||
|
||||
/// If destination_id is not set, do not write data from the buffer, but simply empty the buffer.
|
||||
StorageID destination_id = StorageID::createEmpty();
|
||||
if (!destination_table.empty())
|
||||
{
|
||||
destination_id.database_name = args.context.resolveDatabase(destination_database);
|
||||
destination_id.table_name = destination_table;
|
||||
}
|
||||
|
||||
return StorageBuffer::create(
|
||||
args.table_id,
|
||||
args.columns,
|
||||
@ -811,7 +813,7 @@ void registerStorageBuffer(StorageFactory & factory)
|
||||
num_buckets,
|
||||
StorageBuffer::Thresholds{min_time, min_rows, min_bytes},
|
||||
StorageBuffer::Thresholds{max_time, max_rows, max_bytes},
|
||||
destination_database, destination_table,
|
||||
destination_id,
|
||||
static_cast<bool>(args.local_context.getSettingsRef().insert_allow_materialized_columns));
|
||||
});
|
||||
}
|
||||
|
@ -76,9 +76,9 @@ public:
|
||||
bool supportsSampling() const override { return true; }
|
||||
bool supportsPrewhere() const override
|
||||
{
|
||||
if (no_destination)
|
||||
if (!destination_id)
|
||||
return false;
|
||||
auto dest = global_context.tryGetTable(destination_database, destination_table);
|
||||
auto dest = DatabaseCatalog::instance().tryGetTable(destination_id);
|
||||
if (dest && dest.get() != this)
|
||||
return dest->supportsPrewhere();
|
||||
return false;
|
||||
@ -112,9 +112,7 @@ private:
|
||||
const Thresholds min_thresholds;
|
||||
const Thresholds max_thresholds;
|
||||
|
||||
const String destination_database;
|
||||
const String destination_table;
|
||||
bool no_destination; /// If set, do not write data from the buffer, but simply empty the buffer.
|
||||
StorageID destination_id;
|
||||
bool allow_materialized;
|
||||
|
||||
Poco::Logger * log;
|
||||
@ -146,8 +144,7 @@ protected:
|
||||
size_t num_shards_,
|
||||
const Thresholds & min_thresholds_,
|
||||
const Thresholds & max_thresholds_,
|
||||
const String & destination_database_,
|
||||
const String & destination_table_,
|
||||
const StorageID & destination_id,
|
||||
bool allow_materialized_);
|
||||
};
|
||||
|
||||
|
50
dbms/src/Storages/StorageID.cpp
Normal file
50
dbms/src/Storages/StorageID.cpp
Normal file
@ -0,0 +1,50 @@
|
||||
#include <Storages/StorageID.h>
|
||||
#include <Parsers/ASTQueryWithTableAndOutput.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
StorageID::StorageID(const ASTQueryWithTableAndOutput & query, const Context & local_context)
|
||||
{
|
||||
database_name = local_context.resolveDatabase(query.database);
|
||||
table_name = query.table;
|
||||
uuid = query.uuid;
|
||||
assertNotEmpty();
|
||||
}
|
||||
|
||||
String StorageID::getNameForLogs() const
|
||||
{
|
||||
assertNotEmpty();
|
||||
return (database_name.empty() ? "" : backQuoteIfNeed(database_name) + ".") + backQuoteIfNeed(table_name)
|
||||
+ (hasUUID() ? " (UUID " + toString(uuid) + ")" : "");
|
||||
}
|
||||
|
||||
bool StorageID::operator<(const StorageID & rhs) const
|
||||
{
|
||||
assertNotEmpty();
|
||||
/// It's needed for ViewDependencies
|
||||
if (!hasUUID() && !rhs.hasUUID())
|
||||
/// If both IDs don't have UUID, compare them like pair of strings
|
||||
return std::tie(database_name, table_name) < std::tie(rhs.database_name, rhs.table_name);
|
||||
else if (hasUUID() && rhs.hasUUID())
|
||||
/// If both IDs have UUID, compare UUIDs and ignore database and table name
|
||||
return uuid < rhs.uuid;
|
||||
else
|
||||
/// All IDs without UUID are less, then all IDs with UUID
|
||||
return !hasUUID();
|
||||
}
|
||||
|
||||
void StorageID::assertNotEmpty() const
|
||||
{
|
||||
if (empty())
|
||||
throw Exception("Both table name and UUID are empty", ErrorCodes::LOGICAL_ERROR);
|
||||
if (table_name == TABLE_WITH_UUID_NAME_PLACEHOLDER && !hasUUID())
|
||||
throw Exception("Table name was replaced with placeholder, but UUID is Nil", ErrorCodes::LOGICAL_ERROR);
|
||||
if (table_name.empty() && !database_name.empty())
|
||||
throw Exception("Table name is empty, but database name is not", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
}
|
@ -1,8 +1,6 @@
|
||||
#pragma once
|
||||
#include <Core/Types.h>
|
||||
#include <Core/UUID.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <tuple>
|
||||
|
||||
namespace DB
|
||||
@ -15,19 +13,24 @@ namespace ErrorCodes
|
||||
|
||||
static constexpr char const * TABLE_WITH_UUID_NAME_PLACEHOLDER = "_";
|
||||
|
||||
class ASTQueryWithTableAndOutput;
|
||||
class Context;
|
||||
|
||||
struct StorageID
|
||||
{
|
||||
String database_name;
|
||||
String table_name;
|
||||
UUID uuid = UUID{UInt128(0, 0)};
|
||||
UUID uuid = UUIDHelpers::Nil;
|
||||
|
||||
|
||||
StorageID(const String & database, const String & table, UUID uuid_ = UUID{UInt128(0, 0)})
|
||||
StorageID(const String & database, const String & table, UUID uuid_ = UUIDHelpers::Nil)
|
||||
: database_name(database), table_name(table), uuid(uuid_)
|
||||
{
|
||||
assertNotEmpty();
|
||||
}
|
||||
|
||||
StorageID(const ASTQueryWithTableAndOutput & query, const Context & local_context);
|
||||
|
||||
String getDatabaseName() const
|
||||
{
|
||||
assertNotEmpty();
|
||||
@ -46,12 +49,7 @@ struct StorageID
|
||||
return (database_name.empty() ? "" : database_name + ".") + table_name;
|
||||
}
|
||||
|
||||
String getNameForLogs() const
|
||||
{
|
||||
assertNotEmpty();
|
||||
return (database_name.empty() ? "" : backQuoteIfNeed(database_name) + ".") + backQuoteIfNeed(table_name)
|
||||
+ (hasUUID() ? " (UUID " + toString(uuid) + ")" : "");
|
||||
}
|
||||
String getNameForLogs() const;
|
||||
|
||||
explicit operator bool () const
|
||||
{
|
||||
@ -68,30 +66,9 @@ struct StorageID
|
||||
return uuid != UUID{UInt128(0, 0)};
|
||||
}
|
||||
|
||||
bool operator<(const StorageID & rhs) const
|
||||
{
|
||||
assertNotEmpty();
|
||||
/// It's needed for ViewDependencies
|
||||
if (!hasUUID() && !rhs.hasUUID())
|
||||
/// If both IDs don't have UUID, compare them like pair of strings
|
||||
return std::tie(database_name, table_name) < std::tie(rhs.database_name, rhs.table_name);
|
||||
else if (hasUUID() && rhs.hasUUID())
|
||||
/// If both IDs have UUID, compare UUIDs and ignore database and table name
|
||||
return uuid < rhs.uuid;
|
||||
else
|
||||
/// All IDs without UUID are less, then all IDs with UUID
|
||||
return !hasUUID();
|
||||
}
|
||||
bool operator<(const StorageID & rhs) const;
|
||||
|
||||
void assertNotEmpty() const
|
||||
{
|
||||
if (empty())
|
||||
throw Exception("Both table name and UUID are empty", ErrorCodes::LOGICAL_ERROR);
|
||||
if (table_name == TABLE_WITH_UUID_NAME_PLACEHOLDER && !hasUUID())
|
||||
throw Exception("Table name was replaced with placeholder, but UUID is Nil", ErrorCodes::LOGICAL_ERROR);
|
||||
if (table_name.empty() && !database_name.empty())
|
||||
throw Exception("Table name is empty, but database name is not", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
void assertNotEmpty() const;
|
||||
|
||||
/// Avoid implicit construction of empty StorageID. However, it's needed for deferred initialization.
|
||||
static StorageID createEmpty() { return {}; }
|
||||
|
@ -146,7 +146,7 @@ StorageMaterializedView::StorageMaterializedView(
|
||||
create_interpreter.setInternal(true);
|
||||
create_interpreter.execute();
|
||||
|
||||
target_table_id = global_context.getTable(manual_create_query->database, manual_create_query->table)->getStorageID();
|
||||
target_table_id = DatabaseCatalog::instance().getTable({manual_create_query->database, manual_create_query->table})->getStorageID();
|
||||
}
|
||||
|
||||
if (!select_table_id.empty())
|
||||
|
@ -1044,8 +1044,8 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma
|
||||
break;
|
||||
case PartitionCommand::MoveDestinationType::TABLE:
|
||||
checkPartitionCanBeDropped(command.partition);
|
||||
String dest_database = command.to_database.empty() ? context.getCurrentDatabase() : command.to_database;
|
||||
auto dest_storage = context.getTable(dest_database, command.to_table);
|
||||
String dest_database = context.resolveDatabase(command.to_database);
|
||||
auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table});
|
||||
movePartitionToTable(dest_storage, command.partition, context);
|
||||
break;
|
||||
}
|
||||
@ -1056,8 +1056,8 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma
|
||||
case PartitionCommand::REPLACE_PARTITION:
|
||||
{
|
||||
checkPartitionCanBeDropped(command.partition);
|
||||
String from_database = command.from_database.empty() ? context.getCurrentDatabase() : command.from_database;
|
||||
auto from_storage = context.getTable(from_database, command.from_table);
|
||||
String from_database = context.resolveDatabase(command.from_database);
|
||||
auto from_storage = DatabaseCatalog::instance().getTable({from_database, command.from_table});
|
||||
replacePartitionFrom(from_storage, command.partition, command.replace, context);
|
||||
}
|
||||
break;
|
||||
|
@ -1670,14 +1670,14 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
||||
|
||||
StoragePtr source_table;
|
||||
TableStructureReadLockHolder table_lock_holder_src_table;
|
||||
String source_table_name = entry_replace.from_database + "." + entry_replace.from_table;
|
||||
StorageID source_table_id{entry_replace.from_database, entry_replace.from_table};
|
||||
|
||||
auto clone_data_parts_from_source_table = [&] () -> size_t
|
||||
{
|
||||
source_table = global_context.tryGetTable(entry_replace.from_database, entry_replace.from_table);
|
||||
source_table = DatabaseCatalog::instance().tryGetTable(source_table_id);
|
||||
if (!source_table)
|
||||
{
|
||||
LOG_DEBUG(log, "Can't use " << source_table_name << " as source table for REPLACE PARTITION command. It does not exist.");
|
||||
LOG_DEBUG(log, "Can't use " << source_table_id.getNameForLogs() << " as source table for REPLACE PARTITION command. It does not exist.");
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -1688,7 +1688,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
||||
}
|
||||
catch (Exception &)
|
||||
{
|
||||
LOG_INFO(log, "Can't use " << source_table_name << " as source table for REPLACE PARTITION command. Will fetch all parts."
|
||||
LOG_INFO(log, "Can't use " << source_table_id.getNameForLogs() << " as source table for REPLACE PARTITION command. Will fetch all parts."
|
||||
<< " Reason: " << getCurrentExceptionMessage(false));
|
||||
return 0;
|
||||
}
|
||||
@ -1704,7 +1704,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
||||
auto src_part = src_data->getPartIfExists(part_desc->src_part_info, valid_states);
|
||||
if (!src_part)
|
||||
{
|
||||
LOG_DEBUG(log, "There is no part " << part_desc->src_part_name << " in " << source_table_name);
|
||||
LOG_DEBUG(log, "There is no part " << part_desc->src_part_name << " in " << source_table_id.getNameForLogs());
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -1716,7 +1716,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
||||
|
||||
if (checksum_hex != part_desc->checksum_hex)
|
||||
{
|
||||
LOG_DEBUG(log, "Part " << part_desc->src_part_name << " of " << source_table_name << " has inappropriate checksum");
|
||||
LOG_DEBUG(log, "Part " << part_desc->src_part_name << " of " << source_table_id.getNameForLogs() << " has inappropriate checksum");
|
||||
/// TODO: check version
|
||||
continue;
|
||||
}
|
||||
@ -3556,8 +3556,8 @@ void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const Part
|
||||
break;
|
||||
case PartitionCommand::MoveDestinationType::TABLE:
|
||||
checkPartitionCanBeDropped(command.partition);
|
||||
String dest_database = command.to_database.empty() ? query_context.getCurrentDatabase() : command.to_database;
|
||||
auto dest_storage = query_context.getTable(dest_database, command.to_table);
|
||||
String dest_database = query_context.resolveDatabase(command.to_database);
|
||||
auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table});
|
||||
movePartitionToTable(dest_storage, command.partition, query_context);
|
||||
break;
|
||||
}
|
||||
@ -3567,8 +3567,8 @@ void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const Part
|
||||
case PartitionCommand::REPLACE_PARTITION:
|
||||
{
|
||||
checkPartitionCanBeDropped(command.partition);
|
||||
String from_database = command.from_database.empty() ? query_context.getCurrentDatabase() : command.from_database;
|
||||
auto from_storage = query_context.getTable(from_database, command.from_table);
|
||||
String from_database = query_context.resolveDatabase(command.from_database);
|
||||
auto from_storage = DatabaseCatalog::instance().getTable({from_database, command.from_table});
|
||||
replacePartitionFrom(from_storage, command.partition, command.replace, query_context);
|
||||
}
|
||||
break;
|
||||
|
@ -28,6 +28,7 @@
|
||||
#include <aws/s3/model/ListObjectsRequest.h>
|
||||
|
||||
#include <Common/parseGlobs.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <re2/re2.h>
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user