use StorageID in views

This commit is contained in:
Alexander Tokmakov 2019-12-10 22:48:16 +03:00
parent 9319863a59
commit ef129b4b7c
14 changed files with 189 additions and 201 deletions

View File

@ -46,7 +46,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
for (const auto & database_table : dependencies)
{
auto dependent_table = context.getTable(database_table.database_name, database_table.table_name); //FIXME
auto dependent_table = context.getTable(database_table);
ASTPtr query;
BlockOutputStreamPtr out;
@ -219,7 +219,7 @@ void PushingToViewsBlockOutputStream::process(const Block & block, size_t view_n
/// InterpreterSelectQuery will do processing of alias columns.
Context local_context = *views_context;
local_context.addViewSource(
StorageValues::create(StorageID(storage->getDatabaseName(), storage->getTableName()), storage->getColumns(), //FIXME
StorageValues::create(storage->getStorageID(), storage->getColumns(),
block));
select.emplace(view.query, local_context, SelectQueryOptions());
in = std::make_shared<MaterializingBlockInputStream>(select->execute().in);

View File

@ -758,7 +758,6 @@ void Context::checkDatabaseAccessRightsImpl(const std::string & database_name) c
throw Exception("Access denied to database " + database_name + " for user " + client_info.current_user , ErrorCodes::DATABASE_ACCESS_DENIED);
}
//FIXME use uuids if not empty
void Context::addDependencyUnsafe(const StorageID & from, const StorageID & where)
{
@ -767,7 +766,7 @@ void Context::addDependencyUnsafe(const StorageID & from, const StorageID & wher
shared->view_dependencies[from].insert(where);
// Notify table of dependencies change
auto table = tryGetTable(from.database_name, from.table_name);
auto table = tryGetTable(from);
if (table != nullptr)
table->updateDependencies();
}
@ -785,7 +784,7 @@ void Context::removeDependencyUnsafe(const StorageID & from, const StorageID & w
shared->view_dependencies[from].erase(where);
// Notify table of dependencies change
auto table = tryGetTable(from.database_name, from.table_name);
auto table = tryGetTable(from);
if (table != nullptr)
table->updateDependencies();
}
@ -940,24 +939,32 @@ StoragePtr Context::tryGetExternalTable(const String & table_name) const
return jt->second.first;
}
StoragePtr Context::getTable(const String & database_name, const String & table_name) const
{
return getTable(StorageID(database_name, table_name));
}
StoragePtr Context::getTable(const StorageID & table_id) const
{
Exception exc;
auto res = getTableImpl(database_name, table_name, &exc);
auto res = getTableImpl(table_id, &exc);
if (!res)
throw exc;
return res;
}
StoragePtr Context::tryGetTable(const String & database_name, const String & table_name) const
{
return getTableImpl(database_name, table_name, nullptr);
return tryGetTable(StorageID(database_name, table_name));
}
StoragePtr Context::tryGetTable(const StorageID & table_id) const
{
return getTableImpl(table_id, nullptr);
}
StoragePtr Context::getTableImpl(const String & database_name, const String & table_name, Exception * exception) const
StoragePtr Context::getTableImpl(const StorageID & table_id, Exception * exception) const
{
String db;
DatabasePtr database;
@ -965,14 +972,15 @@ StoragePtr Context::getTableImpl(const String & database_name, const String & ta
{
auto lock = getLock();
if (database_name.empty())
if (table_id.database_name.empty())
{
StoragePtr res = tryGetExternalTable(table_name);
StoragePtr res = tryGetExternalTable(table_id.table_name);
if (res)
return res;
}
db = resolveDatabase(database_name, current_database);
//FIXME what if table was moved to another database?
db = resolveDatabase(table_id.database_name, current_database);
checkDatabaseAccessRightsImpl(db);
Databases::const_iterator it = shared->databases.find(db);
@ -986,11 +994,11 @@ StoragePtr Context::getTableImpl(const String & database_name, const String & ta
database = it->second;
}
auto table = database->tryGetTable(*this, table_name);
auto table = database->tryGetTable(*this, table_id.table_name);
if (!table)
{
if (exception)
*exception = Exception("Table " + backQuoteIfNeed(db) + "." + backQuoteIfNeed(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
*exception = Exception("Table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
return {};
}

View File

@ -91,10 +91,6 @@ class CompiledExpressionCache;
#endif
/// (database name, table name)
//FIXME replace with StorageID
//using DatabaseAndTableName = std::pair<String, String>;
/// Table -> set of table-views that make SELECT from it.
using ViewDependencies = std::map<StorageID, std::set<StorageID>>;
using Dependencies = std::vector<StorageID>;
@ -128,6 +124,7 @@ using IHostContextPtr = std::shared_ptr<IHostContext>;
*
* Everything is encapsulated for all sorts of checks and locks.
*/
///TODO remove syntax sugar and legacy methods from Context (e.g. getInputFormat(...) which just returns object from factory)
class Context
{
private:
@ -286,7 +283,9 @@ public:
Tables getExternalTables() const;
StoragePtr tryGetExternalTable(const String & table_name) const;
StoragePtr getTable(const String & database_name, const String & table_name) const;
StoragePtr getTable(const StorageID & table_id) const;
StoragePtr tryGetTable(const String & database_name, const String & table_name) const;
StoragePtr tryGetTable(const StorageID & table_id) const;
void addExternalTable(const String & table_name, const StoragePtr & storage, const ASTPtr & ast = {});
void addScalar(const String & name, const Block & block);
bool hasScalar(const String & name) const;
@ -592,7 +591,7 @@ private:
EmbeddedDictionaries & getEmbeddedDictionariesImpl(bool throw_on_error) const;
StoragePtr getTableImpl(const String & database_name, const String & table_name, Exception * exception) const;
StoragePtr getTableImpl(const StorageID & table_id, Exception * exception) const;
SessionKey getSessionKey(const String & session_id) const;

View File

@ -142,6 +142,7 @@ public:
String with_name;
/// REPLACE(ATTACH) PARTITION partition FROM db.table
//FIXME use StorageID
String from_database;
String from_table;
/// To distinguish REPLACE and ATTACH PARTITION partition FROM db.table

View File

@ -4,6 +4,7 @@
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/ASTDictionary.h>
#include <Parsers/ASTDictionaryAttributeDeclaration.h>
#include <Storages/StorageID.h>
namespace DB

View File

@ -30,7 +30,7 @@
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/queryToString.h>
#include <boost/algorithm/string.hpp>
#include <Storages/StorageID.h>
#include "ASTColumnsMatcher.h"
@ -198,6 +198,40 @@ bool ParserCompoundIdentifier::parseImpl(Pos & pos, ASTPtr & node, Expected & ex
}
bool parseStorageID(IParser::Pos & pos, StorageID & res, Expected & expected)
{
ParserKeyword s_uuid("UUID");
ParserIdentifier name_p;
ParserStringLiteral uuid_p;
ParserToken s_dot(TokenType::Dot);
ASTPtr database;
ASTPtr table;
ASTPtr uuid;
if (!name_p.parse(pos, table, expected))
return false;
if (s_dot.ignore(pos, expected))
{
database = table;
if (!name_p.parse(pos, table, expected))
return false;
}
if (s_uuid.ignore(pos, expected))
{
if (!uuid_p.parse(pos, uuid, expected))
return false;
}
tryGetIdentifierNameInto(database, res.database_name);
tryGetIdentifierNameInto(table, res.table_name);
res.uuid = uuid ? uuid->as<ASTLiteral>()->value.get<String>() : "";
return true;
}
bool ParserFunction::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserIdentifier id_parser;

View File

@ -56,6 +56,12 @@ protected:
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
};
struct StorageID;
/// Table name, possibly with database name and UUID as string literal
/// [db_name.]table_name [UUID 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx']
//TODO replace with class
bool parseStorageID(IParser::Pos & pos, StorageID & res, Expected & expected);
/// Just *
class ParserAsterisk : public IParserBase
{

View File

@ -13,6 +13,7 @@
#include <Parsers/ASTConstraintDeclaration.h>
#include <Parsers/ParserDictionary.h>
#include <Parsers/ParserDictionaryAttributeDeclaration.h>
#include <Storages/StorageID.h>
namespace DB

View File

@ -302,7 +302,7 @@ bool StorageKafka::checkDependencies(const StorageID & table_id)
// Check the dependencies are ready?
for (const auto & db_tab : dependencies)
{
auto table = global_context.tryGetTable(db_tab.database_name, db_tab.table_name); //FIXME
auto table = global_context.tryGetTable(db_tab);
if (!table)
return false;
@ -354,7 +354,7 @@ void StorageKafka::threadFunc()
bool StorageKafka::streamToViews()
{
auto table_id = getStorageID();
auto table = global_context.getTable(table_id.database_name, table_id.table_name);
auto table = global_context.getTable(table_id);
if (!table)
throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);

View File

@ -14,7 +14,6 @@ limitations under the License. */
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTWatchQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterDropQuery.h>
@ -33,6 +32,7 @@ limitations under the License. */
#include <Storages/LiveView/LiveViewBlockOutputStream.h>
#include <Storages/LiveView/LiveViewEventsBlockInputStream.h>
#include <Storages/LiveView/ProxyStorage.h>
#include <Storages/StorageMaterializedView.h>
#include <Storages/StorageFactory.h>
#include <Parsers/ASTTablesInSelectQuery.h>
@ -52,42 +52,6 @@ namespace ErrorCodes
extern const int SUPPORT_IS_DISABLED;
}
static void extractDependentTable(ASTSelectQuery & query, String & select_database_name, String & select_table_name)
{
auto db_and_table = getDatabaseAndTable(query, 0);
ASTPtr subquery = extractTableExpression(query, 0);
if (!db_and_table && !subquery)
return;
if (db_and_table)
{
select_table_name = db_and_table->table;
if (db_and_table->database.empty())
{
db_and_table->database = select_database_name;
AddDefaultDatabaseVisitor visitor(select_database_name);
visitor.visit(query);
}
else
select_database_name = db_and_table->database;
}
else if (auto * ast_select = subquery->as<ASTSelectWithUnionQuery>())
{
if (ast_select->list_of_selects->children.size() != 1)
throw Exception("UNION is not supported for LIVE VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW);
auto & inner_query = ast_select->list_of_selects->children.at(0);
extractDependentTable(inner_query->as<ASTSelectQuery &>(), select_database_name, select_table_name);
}
else
throw Exception("Logical error while creating StorageLiveView."
" Could not retrieve table name from select query.",
DB::ErrorCodes::LOGICAL_ERROR);
}
void StorageLiveView::writeIntoLiveView(
StorageLiveView & live_view,
@ -145,7 +109,7 @@ void StorageLiveView::writeIntoLiveView(
if (!is_block_processed)
{
auto parent_storage = context.getTable(live_view.getSelectDatabaseName(), live_view.getSelectTableName());
auto parent_storage = context.getTable(live_view.getSelectTableID());
BlockInputStreams streams = {std::make_shared<OneBlockInputStream>(block)};
auto proxy_storage = std::make_shared<ProxyStorage>(parent_storage, std::move(streams), QueryProcessingStage::FetchColumns);
InterpreterSelectQuery select_block(live_view.getInnerQuery(),
@ -177,7 +141,7 @@ void StorageLiveView::writeIntoLiveView(
}
}
auto parent_storage = context.getTable(live_view.getSelectDatabaseName(), live_view.getSelectTableName());
auto parent_storage = context.getTable(live_view.getSelectTableID());
auto proxy_storage = std::make_shared<ProxyStorage>(parent_storage, std::move(from), QueryProcessingStage::WithMergeableState);
InterpreterSelectQuery select(live_view.getInnerQuery(), context, proxy_storage, QueryProcessingStage::Complete);
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
@ -205,25 +169,19 @@ StorageLiveView::StorageLiveView(
throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);
/// Default value, if only table name exist in the query
select_database_name = local_context.getCurrentDatabase();
if (query.select->list_of_selects->children.size() != 1)
throw Exception("UNION is not supported for LIVE VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW);
inner_query = query.select->list_of_selects->children.at(0);
ASTSelectQuery & select_query = typeid_cast<ASTSelectQuery &>(*inner_query);
extractDependentTable(select_query, select_database_name, select_table_name);
select_table_id = extractDependentTableFromSelectQuery(select_query, global_context, true);
/// If the table is not specified - use the table `system.one`
if (select_table_name.empty())
{
select_database_name = "system";
select_table_name = "one";
}
//FIXME why?
if (select_table_id.empty())
select_table_id = StorageID("system", "one");
global_context.addDependency(
StorageID(select_database_name, select_table_name),
table_id_); //FIXME
global_context.addDependency(select_table_id, table_id_);
is_temporary = query.temporary;
temporary_live_view_timeout = local_context.getSettingsRef().temporary_live_view_timeout.totalSeconds();
@ -255,7 +213,7 @@ Block StorageLiveView::getHeader() const
if (!sample_block)
{
auto storage = global_context.getTable(select_database_name, select_table_name);
auto storage = global_context.getTable(select_table_id);
sample_block = InterpreterSelectQuery(inner_query, global_context, storage,
SelectQueryOptions(QueryProcessingStage::Complete)).getSampleBlock();
sample_block.insert({DataTypeUInt64().createColumnConst(
@ -290,7 +248,7 @@ bool StorageLiveView::getNewBlocks()
mergeable_blocks = std::make_shared<std::vector<BlocksPtr>>();
mergeable_blocks->push_back(new_mergeable_blocks);
BlockInputStreamPtr from = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(new_mergeable_blocks), mergeable_stream->getHeader());
auto proxy_storage = ProxyStorage::createProxyStorage(global_context.getTable(select_database_name, select_table_name), {from}, QueryProcessingStage::WithMergeableState);
auto proxy_storage = ProxyStorage::createProxyStorage(global_context.getTable(select_table_id), {from}, QueryProcessingStage::WithMergeableState);
InterpreterSelectQuery select(inner_query->clone(), global_context, proxy_storage, SelectQueryOptions(QueryProcessingStage::Complete));
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
@ -375,7 +333,7 @@ void StorageLiveView::noUsersThread(std::shared_ptr<StorageLiveView> storage, co
if (drop_table)
{
if (storage->global_context.tryGetTable(table_id.database_name, table_id.table_name)) //FIXME
if (storage->global_context.tryGetTable(table_id))
{
try
{
@ -468,9 +426,7 @@ StorageLiveView::~StorageLiveView()
void StorageLiveView::drop(TableStructureWriteLockHolder &)
{
auto table_id = getStorageID();
global_context.removeDependency(
StorageID(select_database_name, select_table_name),
table_id); //FIXME
global_context.removeDependency(select_table_id, table_id);
std::lock_guard lock(mutex);
is_dropped = true;

View File

@ -41,8 +41,7 @@ friend class LiveViewBlockOutputStream;
public:
~StorageLiveView() override;
String getName() const override { return "LiveView"; }
String getSelectDatabaseName() const { return select_database_name; }
String getSelectTableName() const { return select_table_name; }
StorageID getSelectTableID() const { return select_table_id; }
NameAndTypePair getColumn(const String & column_name) const override;
bool hasColumn(const String & column_name) const override;
@ -140,8 +139,7 @@ public:
const Context & context);
private:
String select_database_name;
String select_table_name;
StorageID select_table_id;
ASTPtr inner_query;
Context & global_context;
bool is_temporary = false;

View File

@ -11,28 +11,36 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
static constexpr char const * TABLE_WITH_UUID_NAME_PLACEHOLDER = "_";
struct StorageID
{
String database_name;
String table_name;
String uuid;
StorageID() = delete;
//StorageID() = delete;
StorageID() = default;
//TODO StorageID(const ASTPtr & query_with_one_table, const Context & context) to get db and table names (and maybe uuid) from query
//But there are a lot of different ASTs with db and table name
//And it looks like it depends on https://github.com/ClickHouse/ClickHouse/pull/7774
StorageID(const String & database, const String & table, const String & uuid_ = {})
: database_name(database), table_name(table), uuid(uuid_)
{
assert_not_empty();
}
String getFullTableName() const
{
assert_valid();
return (database_name.empty() ? "" : database_name + ".") + table_name;
}
String getNameForLogs() const
{
return (database_name.empty() ? "" : backQuoteIfNeed(database_name) + ".") + backQuoteIfNeed(table_name) + " (UUID " + uuid + ")";
assert_valid();
return (database_name.empty() ? "" : backQuoteIfNeed(database_name) + ".") + backQuoteIfNeed(table_name) + (uuid.empty() ? "" : " (UUID " + uuid + ")");
}
String getId() const
@ -45,13 +53,31 @@ struct StorageID
bool operator<(const StorageID & rhs) const
{
return std::tie(uuid, database_name, table_name) < std::tie(rhs.uuid, rhs.database_name, rhs.table_name);
assert_valid();
/// It's needed for ViewDependencies
if (uuid.empty() && rhs.uuid.empty())
/// If both IDs don't have UUID, compare them like pair of strings
return std::tie(database_name, table_name) < std::tie(rhs.database_name, rhs.table_name);
else if (!uuid.empty() && !rhs.uuid.empty())
/// If both IDs have UUID, compare UUIDs and ignore database and table name
return uuid < rhs.uuid;
else
/// All IDs without UUID are less, then all IDs with UUID
return uuid.empty();
}
void assert_not_empty() const
bool empty() const
{
if (database_name.empty() && table_name.empty())
return table_name.empty() || (table_name == TABLE_WITH_UUID_NAME_PLACEHOLDER && uuid.empty());
}
void assert_valid() const
{
if (empty())
throw Exception("empty table name", ErrorCodes::LOGICAL_ERROR);
if (table_name == TABLE_WITH_UUID_NAME_PLACEHOLDER && uuid.empty() && !database_name.empty())
throw Exception("unexpected database name", ErrorCodes::LOGICAL_ERROR);
}
};

View File

@ -28,6 +28,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int INCORRECT_QUERY;
extern const int QUERY_IS_NOT_SUPPORTED_IN_MATERIALIZED_VIEW;
extern const int QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW;
}
static inline String generateInnerTableName(const String & table_name)
@ -35,39 +36,37 @@ static inline String generateInnerTableName(const String & table_name)
return ".inner." + table_name;
}
static void extractDependentTable(ASTSelectQuery & query, String & select_database_name, String & select_table_name)
StorageID extractDependentTableFromSelectQuery(ASTSelectQuery & query, Context & context, bool is_live_view /*= false*/, bool need_visitor /*= true*/)
{
if (need_visitor)
{
AddDefaultDatabaseVisitor visitor(context.getCurrentDatabase(), nullptr);
visitor.visit(query);
}
auto db_and_table = getDatabaseAndTable(query, 0);
ASTPtr subquery = extractTableExpression(query, 0);
if (!db_and_table && !subquery)
return;
return {}; //FIXME in which cases we cannot get table name?
if (db_and_table)
{
select_table_name = db_and_table->table;
if (db_and_table->database.empty())
{
db_and_table->database = select_database_name;
AddDefaultDatabaseVisitor visitor(select_database_name);
visitor.visit(query);
}
else
select_database_name = db_and_table->database;
//TODO uuid
return StorageID(db_and_table->database, db_and_table->table/*, db_and_table->uuid*/);
}
else if (auto * ast_select = subquery->as<ASTSelectWithUnionQuery>())
{
if (ast_select->list_of_selects->children.size() != 1)
throw Exception("UNION is not supported for MATERIALIZED VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_MATERIALIZED_VIEW);
throw Exception(String("UNION is not supported for ") + (is_live_view ? "LIVE VIEW" : "MATERIALIZED VIEW"),
is_live_view ? ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW : ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_MATERIALIZED_VIEW);
auto & inner_query = ast_select->list_of_selects->children.at(0);
extractDependentTable(inner_query->as<ASTSelectQuery &>(), select_database_name, select_table_name);
return extractDependentTableFromSelectQuery(inner_query->as<ASTSelectQuery &>(), context, is_live_view, false);
}
else
throw Exception("Logical error while creating StorageMaterializedView."
" Could not retrieve table name from select query.",
throw Exception(String("Logical error while creating Storage") + (is_live_view ? "Live" : "Materialized") +
"View. Could not retrieve table name from select query.",
DB::ErrorCodes::LOGICAL_ERROR);
}
@ -106,47 +105,36 @@ StorageMaterializedView::StorageMaterializedView(
if (!query.select)
throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);
if (!query.storage && query.to_table.empty())
/// If the destination table is not set, use inner table
has_inner_table = query.to_table.empty();
if (has_inner_table && !query.storage)
throw Exception(
"You must specify where to save results of a MaterializedView query: either ENGINE or an existing table in a TO clause",
ErrorCodes::INCORRECT_QUERY);
/// Default value, if only table name exist in the query
select_database_name = local_context.getCurrentDatabase();
if (query.select->list_of_selects->children.size() != 1)
throw Exception("UNION is not supported for MATERIALIZED VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_MATERIALIZED_VIEW);
inner_query = query.select->list_of_selects->children.at(0);
auto & select_query = inner_query->as<ASTSelectQuery &>();
extractDependentTable(select_query, select_database_name, select_table_name);
select_table_id = extractDependentTableFromSelectQuery(select_query, local_context);
checkAllowedQueries(select_query);
if (!select_table_name.empty())
global_context.addDependency(
StorageID(select_database_name, select_table_name),
table_id_); //FIXME
// If the destination table is not set, use inner table
if (!query.to_table.empty())
if (!has_inner_table)
target_table_id = StorageID(query.to_database, query.to_table);
else if (attach_)
{
target_database_name = query.to_database;
target_table_name = query.to_table;
/// If there is an ATTACH request, then the internal table must already be created.
//TODO use uuid
target_table_id = StorageID(table_id_.database_name, generateInnerTableName(table_id_.table_name));
}
else
{
target_database_name = table_id_.database_name;
target_table_name = generateInnerTableName(table_id_.table_name);
has_inner_table = true;
}
/// If there is an ATTACH request, then the internal table must already be connected.
if (!attach_ && has_inner_table)
{
/// We will create a query to create an internal table.
auto manual_create_query = std::make_shared<ASTCreateQuery>();
manual_create_query->database = target_database_name;
manual_create_query->table = target_table_name;
manual_create_query->database = table_id_.database_name;
manual_create_query->table = generateInnerTableName(table_id_.table_name);
auto new_columns_list = std::make_shared<ASTColumns>();
new_columns_list->set(new_columns_list->columns, query.columns_list->columns->ptr());
@ -154,24 +142,15 @@ StorageMaterializedView::StorageMaterializedView(
manual_create_query->set(manual_create_query->columns_list, new_columns_list);
manual_create_query->set(manual_create_query->storage, query.storage->ptr());
/// Execute the query.
try
{
InterpreterCreateQuery create_interpreter(manual_create_query, local_context);
create_interpreter.setInternal(true);
create_interpreter.execute();
}
catch (...)
{
/// In case of any error we should remove dependency to the view.
if (!select_table_name.empty())
global_context.removeDependency(
StorageID(select_database_name, select_table_name),
table_id_); //FIXME
InterpreterCreateQuery create_interpreter(manual_create_query, local_context);
create_interpreter.setInternal(true);
create_interpreter.execute();
throw;
}
target_table_id = global_context.getTable(manual_create_query->database, manual_create_query->table)->getStorageID();
}
if (!select_table_id.empty())
global_context.addDependency(select_table_id, table_id_);
}
NameAndTypePair StorageMaterializedView::getColumn(const String & column_name) const
@ -218,14 +197,14 @@ BlockOutputStreamPtr StorageMaterializedView::write(const ASTPtr & query, const
}
static void executeDropQuery(ASTDropQuery::Kind kind, Context & global_context, const String & target_database_name, const String & target_table_name)
static void executeDropQuery(ASTDropQuery::Kind kind, Context & global_context, const StorageID & target_table_id)
{
if (global_context.tryGetTable(target_database_name, target_table_name))
if (global_context.tryGetTable(target_table_id))
{
/// We create and execute `drop` query for internal table.
auto drop_query = std::make_shared<ASTDropQuery>();
drop_query->database = target_database_name;
drop_query->table = target_table_name;
drop_query->database = target_table_id.database_name;
drop_query->table = target_table_id.table_name;
drop_query->kind = kind;
ASTPtr ast_drop_query = drop_query;
InterpreterDropQuery drop_interpreter(ast_drop_query, global_context);
@ -237,25 +216,23 @@ static void executeDropQuery(ASTDropQuery::Kind kind, Context & global_context,
void StorageMaterializedView::drop(TableStructureWriteLockHolder &)
{
auto table_id = getStorageID();
global_context.removeDependency(
StorageID(select_database_name, select_table_name),
table_id); //FIXME
global_context.removeDependency(select_table_id, table_id);
if (has_inner_table && tryGetTargetTable())
executeDropQuery(ASTDropQuery::Kind::Drop, global_context, target_database_name, target_table_name);
executeDropQuery(ASTDropQuery::Kind::Drop, global_context, target_table_id);
}
void StorageMaterializedView::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
{
if (has_inner_table)
executeDropQuery(ASTDropQuery::Kind::Truncate, global_context, target_database_name, target_table_name);
executeDropQuery(ASTDropQuery::Kind::Truncate, global_context, target_table_id);
}
void StorageMaterializedView::checkStatementCanBeForwarded() const
{
if (!has_inner_table)
throw Exception(
"MATERIALIZED VIEW targets existing table " + target_database_name + "." + target_table_name + ". "
"MATERIALIZED VIEW targets existing table " + target_table_id.getNameForLogs() + ". "
+ "Execute the statement directly on it.", ErrorCodes::INCORRECT_QUERY);
}
@ -277,72 +254,52 @@ void StorageMaterializedView::mutate(const MutationCommands & commands, const Co
getTargetTable()->mutate(commands, context);
}
static void executeRenameQuery(Context & global_context, const String & database_name, const String & table_original_name, const String & new_table_name)
{
if (global_context.tryGetTable(database_name, table_original_name))
{
auto rename = std::make_shared<ASTRenameQuery>();
ASTRenameQuery::Table from;
from.database = database_name;
from.table = table_original_name;
ASTRenameQuery::Table to;
to.database = database_name;
to.table = new_table_name;
ASTRenameQuery::Element elem;
elem.from = from;
elem.to = to;
rename->elements.emplace_back(elem);
InterpreterRenameQuery(rename, global_context).execute();
}
}
void StorageMaterializedView::rename(
const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
{
if (has_inner_table && tryGetTargetTable())
{
String new_target_table_name = generateInnerTableName(new_table_name);
executeRenameQuery(global_context, target_database_name, target_table_name, new_target_table_name);
target_table_name = new_target_table_name;
auto new_target_table_name = generateInnerTableName(new_table_name);
auto rename = std::make_shared<ASTRenameQuery>();
ASTRenameQuery::Table from;
from.database = target_table_id.database_name;
from.table = target_table_id.table_name;
ASTRenameQuery::Table to;
to.database = target_table_id.database_name;
to.table = new_target_table_name;
ASTRenameQuery::Element elem;
elem.from = from;
elem.to = to;
rename->elements.emplace_back(elem);
InterpreterRenameQuery(rename, global_context).execute();
target_table_id.table_name = new_target_table_name;
}
auto lock = global_context.getLock();
auto table_id = getStorageID();
global_context.removeDependencyUnsafe(
StorageID(select_database_name, select_table_name),
table_id);
global_context.removeDependencyUnsafe(select_table_id, getStorageID());
IStorage::renameInMemory(new_database_name, new_table_name);
global_context.addDependencyUnsafe(
StorageID(select_database_name, select_table_name),
table_id);
global_context.addDependencyUnsafe(select_table_id, getStorageID());
}
void StorageMaterializedView::shutdown()
{
auto table_id = getStorageID();
/// Make sure the dependency is removed after DETACH TABLE
global_context.removeDependency(
StorageID(select_database_name, select_table_name),
table_id); //FIXME
global_context.removeDependency(select_table_id, table_id);
}
StoragePtr StorageMaterializedView::getTargetTable() const
{
return global_context.getTable(target_database_name, target_table_name);
return global_context.getTable(target_table_id);
}
StoragePtr StorageMaterializedView::tryGetTargetTable() const
{
return global_context.tryGetTable(target_database_name, target_table_name);
return global_context.tryGetTable(target_table_id);
}
Strings StorageMaterializedView::getDataPaths() const

View File

@ -9,6 +9,9 @@
namespace DB
{
StorageID extractDependentTableFromSelectQuery(ASTSelectQuery & query, Context & context, bool is_live_view = false, bool need_visitor = true);
class StorageMaterializedView : public ext::shared_ptr_helper<StorageMaterializedView>, public IStorage
{
friend struct ext::shared_ptr_helper<StorageMaterializedView>;
@ -66,10 +69,8 @@ public:
Strings getDataPaths() const override;
private:
String select_database_name;
String select_table_name;
String target_database_name;
String target_table_name;
StorageID select_table_id;
StorageID target_table_id;
ASTPtr inner_query;
Context & global_context;
bool has_inner_table = false;