Better way to drop nested table for single storage

This commit is contained in:
kssenii 2021-04-10 17:58:09 +00:00
parent 1c501e7d97
commit bc228f4010
14 changed files with 135 additions and 173 deletions

View File

@ -1,5 +1,6 @@
#include "insertPostgreSQLValue.h"
#if USE_LIBPQXX
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
@ -233,3 +234,5 @@ void preparePostgreSQLArrayInfo(
array_info[column_idx] = {count_dimensions, default_value, parser};
}
}
#endif

View File

@ -1,5 +1,11 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include <Core/Block.h>
#include <DataStreams/IBlockInputStream.h>
#include <Core/ExternalResultDescription.h>
@ -28,3 +34,5 @@ void preparePostgreSQLArrayInfo(
void insertDefaultPostgreSQLValue(IColumn & column, const IColumn & sample_column);
}
#endif

View File

@ -108,12 +108,10 @@ StoragePtr DatabaseAtomic::detachTable(const String & name)
void DatabaseAtomic::dropTable(const Context & context, const String & table_name, bool no_delay)
{
if (auto * mv = dynamic_cast<StorageMaterializedView *>(tryGetTable(table_name, context).get()))
{
/// Remove the inner table (if any) to avoid deadlock
/// (due to attempt to execute DROP from the worker thread)
mv->dropInnerTable(no_delay, context);
}
auto * storage = tryGetTable(table_name, context).get();
/// Remove the inner table (if any) to avoid deadlock
/// (due to attempt to execute DROP from the worker thread)
storage->dropInnerTableIfAny(no_delay, context);
String table_metadata_path = getObjectMetadataPath(table_name);
String table_metadata_path_drop;
@ -594,4 +592,3 @@ void DatabaseAtomic::waitDetachedTableNotInUse(const UUID & uuid)
}
}

View File

@ -70,9 +70,7 @@ void DatabaseMaterializePostgreSQL::startSynchronization()
auto storage = tryGetTable(table_name, global_context);
if (!storage)
{
storage = StorageMaterializePostgreSQL::create(StorageID(database_name, table_name), StoragePtr{}, global_context);
}
storage = StorageMaterializePostgreSQL::create(StorageID(database_name, table_name), global_context);
replication_handler->addStorage(table_name, storage->template as<StorageMaterializePostgreSQL>());
materialized_tables[table_name] = storage;
@ -151,13 +149,17 @@ void DatabaseMaterializePostgreSQL::createTable(const Context & context, const S
}
void DatabaseMaterializePostgreSQL::stopReplication()
{
if (replication_handler)
replication_handler->shutdown();
}
void DatabaseMaterializePostgreSQL::drop(const Context & context)
{
if (replication_handler)
{
replication_handler->shutdown();
replication_handler->shutdownFinal();
}
/// Remove metadata
Poco::File metadata(getMetadataPath() + METADATA_SUFFIX);

View File

@ -56,6 +56,8 @@ public:
void shutdown() override;
void stopReplication();
private:
void startSynchronization();

View File

@ -184,7 +184,7 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
table.primary_key_columns = readNamesAndTypesList(tx, postgres_table_name, query, use_nulls, true);
}
if (with_replica_identity_index)
if (with_replica_identity_index && !table.primary_key_columns)
{
query = fmt::format(
"SELECT "
@ -201,7 +201,7 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
"and a.attrelid = t.oid "
"and a.attnum = ANY(ix.indkey) "
"and t.relkind = 'r' " /// simple tables
"and t.relname = '{}' "
"and t.relname = '{}' " /// Connection is alread done to a needed database, only table name is needed.
"and ix.indisreplident = 't' " /// index is is replica identity index
"ORDER BY a.attname", /// column names
postgres_table_name);

View File

@ -23,7 +23,7 @@
#endif
#if USE_LIBPQXX
# include <Storages/PostgreSQL/StorageMaterializePostgreSQL.h>
# include <Databases/PostgreSQL/DatabaseMaterializePostgreSQL.h>
#endif
namespace DB
@ -186,11 +186,6 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ASTDropQuery & query, DatabaseP
table->shutdown();
#if USE_LIBPQXX
if (table->getName() == "MaterializePostgreSQL")
table->as<StorageMaterializePostgreSQL>()->shutdownFinal();
#endif
TableExclusiveLockHolder table_lock;
if (database->getUUID() == UUIDHelpers::Nil)
table_lock = table->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
@ -353,6 +348,10 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
#endif
if (auto * replicated = typeid_cast<DatabaseReplicated *>(database.get()))
replicated->stopReplication();
#if USE_LIBPQXX
if (auto * materialize_postgresql = typeid_cast<DatabaseMaterializePostgreSQL *>(database.get()))
materialize_postgresql->stopReplication();
#endif
if (database->shouldBeEmptyOnDetach())
{
@ -434,4 +433,33 @@ void InterpreterDropQuery::extendQueryLogElemImpl(QueryLogElement & elem, const
elem.query_kind = "Drop";
}
void InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind kind, const Context & global_context, const Context & current_context, const StorageID & target_table_id, bool no_delay)
{
if (DatabaseCatalog::instance().tryGetTable(target_table_id, current_context))
{
/// We create and execute `drop` query for internal table.
auto drop_query = std::make_shared<ASTDropQuery>();
drop_query->database = target_table_id.database_name;
drop_query->table = target_table_id.table_name;
drop_query->kind = kind;
drop_query->no_delay = no_delay;
drop_query->if_exists = true;
ASTPtr ast_drop_query = drop_query;
/// FIXME We have to use global context to execute DROP query for inner table
/// to avoid "Not enough privileges" error if current user has only DROP VIEW ON mat_view_name privilege
/// and not allowed to drop inner table explicitly. Allowing to drop inner table without explicit grant
/// looks like expected behaviour and we have tests for it.
auto drop_context = Context(global_context);
drop_context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
if (auto txn = current_context.getZooKeeperMetadataTransaction())
{
/// For Replicated database
drop_context.setQueryContext(const_cast<Context &>(current_context));
drop_context.initZooKeeperMetadataTransaction(txn, true);
}
InterpreterDropQuery drop_interpreter(ast_drop_query, drop_context);
drop_interpreter.execute();
}
}
}

View File

@ -26,6 +26,8 @@ public:
void extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr &, const Context &) const override;
static void executeDropQuery(ASTDropQuery::Kind kind, const Context & global_context, const Context & current_context, const StorageID & target_table_id, bool no_delay);
private:
AccessRightsElements getRequiredAccessForDDLOnCluster() const;
ASTPtr query_ptr;

View File

@ -345,6 +345,8 @@ public:
*/
virtual void drop() {}
virtual void dropInnerTableIfAny(bool /* no_delay */, const Context & /* context */) {}
/** Clear the table data and leave it empty.
* Must be called under exclusive lock (lockExclusively).
*/

View File

@ -4,10 +4,10 @@
#include <DataStreams/PostgreSQLBlockInputStream.h>
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
#include <Storages/PostgreSQL/StorageMaterializePostgreSQL.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Common/setThreadName.h>
#include <DataStreams/copyData.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Poco/File.h>
@ -124,7 +124,7 @@ void PostgreSQLReplicationHandler::startSynchronization()
{
nested_storages[table_name] = storage->getNested();
storage->setStorageMetadata();
storage->setNestedLoaded();
storage->setNestedStatus(true);
}
catch (...)
{
@ -183,7 +183,7 @@ NameSet PostgreSQLReplicationHandler::loadFromSnapshot(std::string & snapshot_na
assertBlocksHaveEqualStructure(input.getHeader(), block_io.out->getHeader(), "postgresql replica load from snapshot");
copyData(input, *block_io.out);
storage_data.second->setNestedLoaded();
storage_data.second->setNestedStatus(true);
nested_storages[table_name] = nested_storage;
/// This is needed if this method is called from reloadFromSnapshot() method below.
@ -401,7 +401,9 @@ std::unordered_map<Int32, String> PostgreSQLReplicationHandler::reloadFromSnapsh
const auto & table_name = relation.second;
auto * storage = storages[table_name];
sync_storages[table_name] = storage;
storage->dropNested();
auto nested_storage = storage->getNested();
storage->setNestedStatus(false);
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, context, context, nested_storage->getStorageID(), true);
}
auto replication_connection = postgres::createReplicationConnection(connection_info);

View File

@ -23,6 +23,7 @@
#include <common/logger_useful.h>
#include <Storages/ReadFinalForExternalReplicaStorage.h>
#include <Core/PostgreSQL/PostgreSQLConnectionPool.h>
#include <Interpreters/InterpreterDropQuery.h>
namespace DB
@ -52,6 +53,8 @@ StorageMaterializePostgreSQL::StorageMaterializePostgreSQL(
, replication_settings(std::move(replication_settings_))
, is_postgresql_replica_database(
DatabaseCatalog::instance().getDatabase(getStorageID().database_name)->getEngineName() == "MaterializePostgreSQL")
, nested_table_id(StorageID(table_id_.database_name, getNestedTableName()))
, nested_context(makeNestedTableContext())
{
setInMemoryMetadata(storage_metadata);
@ -70,16 +73,28 @@ StorageMaterializePostgreSQL::StorageMaterializePostgreSQL(
StorageMaterializePostgreSQL::StorageMaterializePostgreSQL(
const StorageID & table_id_,
StoragePtr nested_storage_,
const Context & context_)
: IStorage(table_id_)
, global_context(context_)
, nested_storage(nested_storage_)
, is_postgresql_replica_database(true)
, nested_table_id(table_id_)
, nested_context(makeNestedTableContext())
{
}
StoragePtr StorageMaterializePostgreSQL::getNested() const
{
return DatabaseCatalog::instance().getTable(nested_table_id, nested_context);
}
StoragePtr StorageMaterializePostgreSQL::tryGetNested() const
{
return DatabaseCatalog::instance().tryGetTable(nested_table_id, nested_context);
}
std::string StorageMaterializePostgreSQL::getNestedTableName() const
{
auto table_name = getStorageID().table_name;
@ -91,6 +106,17 @@ std::string StorageMaterializePostgreSQL::getNestedTableName() const
}
void StorageMaterializePostgreSQL::setStorageMetadata()
{
/// If it is a MaterializePostgreSQL database engine, then storage with engine MaterializePostgreSQL
/// gets its metadata when it is fetch from postges, but if inner tables exist (i.e. it is a server restart)
/// then metadata for storage needs to be set from inner table metadata.
auto nested_table = getNested();
auto storage_metadata = nested_table->getInMemoryMetadataPtr();
setInMemoryMetadata(*storage_metadata);
}
std::shared_ptr<ASTColumnDeclaration> StorageMaterializePostgreSQL::getMaterializedColumnsDeclaration(
const String name, const String type, UInt64 default_value)
{
@ -150,13 +176,6 @@ ASTPtr StorageMaterializePostgreSQL::getColumnDeclaration(const DataTypePtr & da
}
void StorageMaterializePostgreSQL::setStorageMetadata()
{
auto storage_metadata = getNested()->getInMemoryMetadataPtr();
setInMemoryMetadata(*storage_metadata);
}
/// For single storage MaterializePostgreSQL get columns and primary key columns from storage definition.
/// For database engine MaterializePostgreSQL get columns and primary key columns by fetching from PostgreSQL, also using the same
/// transaction with snapshot, which is used for initial tables dump.
@ -231,8 +250,8 @@ ASTPtr StorageMaterializePostgreSQL::getCreateNestedTableQuery(PostgreSQLTableSt
columns_declare_list->set(columns_declare_list->columns, columns_expression_list);
columns_declare_list->columns->children.emplace_back(getMaterializedColumnsDeclaration("_sign", "Int8", UInt64(1)));
columns_declare_list->columns->children.emplace_back(getMaterializedColumnsDeclaration("_version", "UInt64", UInt64(1)));
columns_declare_list->columns->children.emplace_back(getMaterializedColumnsDeclaration("_sign", "Int8", 1));
columns_declare_list->columns->children.emplace_back(getMaterializedColumnsDeclaration("_version", "UInt64", 1));
create_table_query->set(create_table_query->columns_list, columns_declare_list);
@ -255,14 +274,6 @@ ASTPtr StorageMaterializePostgreSQL::getCreateNestedTableQuery(PostgreSQLTableSt
void StorageMaterializePostgreSQL::createNestedIfNeeded(PostgreSQLTableStructurePtr table_structure)
{
if (nested_loaded)
{
nested_storage = tryGetNested();
if (nested_storage)
return;
}
auto context = makeNestedTableContext();
const auto ast_create = getCreateNestedTableQuery(std::move(table_structure));
@ -275,8 +286,6 @@ void StorageMaterializePostgreSQL::createNestedIfNeeded(PostgreSQLTableStructure
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
nested_storage = getNested();
}
@ -290,32 +299,6 @@ Context StorageMaterializePostgreSQL::makeNestedTableContext() const
}
StoragePtr StorageMaterializePostgreSQL::getNested()
{
if (nested_storage)
return nested_storage;
auto context = makeNestedTableContext();
nested_storage = DatabaseCatalog::instance().getTable(
StorageID(getStorageID().database_name, getNestedTableName()), context);
return nested_storage;
}
StoragePtr StorageMaterializePostgreSQL::tryGetNested()
{
if (nested_storage)
return nested_storage;
auto context = makeNestedTableContext();
nested_storage = DatabaseCatalog::instance().tryGetTable(
StorageID(getStorageID().database_name, getNestedTableName()), context);
return nested_storage;
}
void StorageMaterializePostgreSQL::startup()
{
if (!is_postgresql_replica_database)
@ -333,47 +316,23 @@ void StorageMaterializePostgreSQL::shutdown()
}
void StorageMaterializePostgreSQL::shutdownFinal()
void StorageMaterializePostgreSQL::dropInnerTableIfAny(bool no_delay, const Context & context)
{
if (is_postgresql_replica_database)
return;
if (replication_handler)
replication_handler->shutdownFinal();
if (nested_storage)
dropNested();
}
void StorageMaterializePostgreSQL::dropNested()
{
std::lock_guard lock(nested_mutex);
nested_loaded = false;
auto table_id = nested_storage->getStorageID();
auto ast_drop = std::make_shared<ASTDropQuery>();
ast_drop->kind = ASTDropQuery::Drop;
ast_drop->table = table_id.table_name;
ast_drop->database = table_id.database_name;
ast_drop->if_exists = true;
auto context = makeNestedTableContext();
auto interpreter = InterpreterDropQuery(ast_drop, context);
interpreter.execute();
nested_storage = nullptr;
LOG_TRACE(&Poco::Logger::get("StorageMaterializePostgreSQL"), "Dropped (possibly temporarily) nested table {}", getNestedTableName());
auto nested_table = getNested();
if (nested_table && !is_postgresql_replica_database)
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, global_context, context, nested_table_id, no_delay);
}
NamesAndTypesList StorageMaterializePostgreSQL::getVirtuals() const
{
if (nested_storage)
return nested_storage->getVirtuals();
return {};
return NamesAndTypesList{
{"_sign", std::make_shared<DataTypeInt8>()},
{"_version", std::make_shared<DataTypeUInt64>()}
};
}
@ -386,26 +345,20 @@ Pipe StorageMaterializePostgreSQL::read(
size_t max_block_size,
unsigned num_streams)
{
std::unique_lock lock(nested_mutex, std::defer_lock);
if (!nested_loaded)
return Pipe();
if (nested_loaded && lock.try_lock())
{
if (!nested_storage)
getNested();
auto nested_table = getNested();
return readFinalFromNestedStorage(
nested_storage,
column_names,
metadata_snapshot,
query_info,
context,
processed_stage,
max_block_size,
num_streams);
}
LOG_WARNING(&Poco::Logger::get("StorageMaterializePostgreSQL"), "Nested table {} is unavailable or is not loaded yet", getNestedTableName());
return Pipe();
return readFinalFromNestedStorage(
nested_table,
column_names,
metadata_snapshot,
query_info,
context,
processed_stage,
max_block_size,
num_streams);
}

View File

@ -31,14 +31,16 @@ class StorageMaterializePostgreSQL final : public ext::shared_ptr_helper<Storage
public:
StorageMaterializePostgreSQL(
const StorageID & table_id_,
StoragePtr nested_storage_,
const Context & context_);
String getName() const override { return "MaterializePostgreSQL"; }
void startup() override;
void shutdown() override;
void dropInnerTableIfAny(bool no_delay, const Context & context) override;
NamesAndTypesList getVirtuals() const override;
Pipe read(
@ -50,25 +52,18 @@ public:
size_t max_block_size,
unsigned num_streams) override;
/// Called right after shutdown() in case of drop query
void shutdownFinal();
void createNestedIfNeeded(PostgreSQLTableStructurePtr table_structure);
/// Can be nullptr
StoragePtr tryGetNested();
StoragePtr getNested() const;
/// Throw if impossible to get
StoragePtr getNested();
StoragePtr tryGetNested() const;
Context makeNestedTableContext() const;
void setNestedLoaded() { nested_loaded.store(true); }
void setNestedStatus(bool loaded) { nested_loaded.store(loaded); }
bool isNestedLoaded() { return nested_loaded.load(); }
void dropNested();
void setStorageMetadata();
protected:
@ -98,10 +93,9 @@ private:
std::unique_ptr<PostgreSQLReplicationHandler> replication_handler;
std::atomic<bool> nested_loaded = false;
StoragePtr nested_storage;
std::mutex nested_mutex;
bool is_postgresql_replica_database = false;
StorageID nested_table_id;
const Context nested_context;
};
}

View File

@ -3,7 +3,6 @@
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>
@ -198,36 +197,6 @@ BlockOutputStreamPtr StorageMaterializedView::write(const ASTPtr & query, const
}
static void executeDropQuery(ASTDropQuery::Kind kind, const Context & global_context, const Context & current_context, const StorageID & target_table_id, bool no_delay)
{
if (DatabaseCatalog::instance().tryGetTable(target_table_id, current_context))
{
/// We create and execute `drop` query for internal table.
auto drop_query = std::make_shared<ASTDropQuery>();
drop_query->database = target_table_id.database_name;
drop_query->table = target_table_id.table_name;
drop_query->kind = kind;
drop_query->no_delay = no_delay;
drop_query->if_exists = true;
ASTPtr ast_drop_query = drop_query;
/// FIXME We have to use global context to execute DROP query for inner table
/// to avoid "Not enough privileges" error if current user has only DROP VIEW ON mat_view_name privilege
/// and not allowed to drop inner table explicitly. Allowing to drop inner table without explicit grant
/// looks like expected behaviour and we have tests for it.
auto drop_context = Context(global_context);
drop_context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
if (auto txn = current_context.getZooKeeperMetadataTransaction())
{
/// For Replicated database
drop_context.setQueryContext(const_cast<Context &>(current_context));
drop_context.initZooKeeperMetadataTransaction(txn, true);
}
InterpreterDropQuery drop_interpreter(ast_drop_query, drop_context);
drop_interpreter.execute();
}
}
void StorageMaterializedView::drop()
{
auto table_id = getStorageID();
@ -235,19 +204,19 @@ void StorageMaterializedView::drop()
if (!select_query.select_table_id.empty())
DatabaseCatalog::instance().removeDependency(select_query.select_table_id, table_id);
dropInnerTable(true, global_context);
dropInnerTableIfAny(true, global_context);
}
void StorageMaterializedView::dropInnerTable(bool no_delay, const Context & context)
void StorageMaterializedView::dropInnerTableIfAny(bool no_delay, const Context & context)
{
if (has_inner_table && tryGetTargetTable())
executeDropQuery(ASTDropQuery::Kind::Drop, global_context, context, target_table_id, no_delay);
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, global_context, context, target_table_id, no_delay);
}
void StorageMaterializedView::truncate(const ASTPtr &, const StorageMetadataPtr &, const Context & context, TableExclusiveLockHolder &)
{
if (has_inner_table)
executeDropQuery(ASTDropQuery::Kind::Truncate, global_context, context, target_table_id, true);
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Truncate, global_context, context, target_table_id, true);
}
void StorageMaterializedView::checkStatementCanBeForwarded() const

View File

@ -37,7 +37,7 @@ public:
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context) override;
void drop() override;
void dropInnerTable(bool no_delay, const Context & context);
void dropInnerTableIfAny(bool no_delay, const Context & context) override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &) override;