Almost done

This commit is contained in:
kssenii 2021-08-27 09:30:21 +03:00
parent 2dfd5b14db
commit 4cd62227cf
33 changed files with 673 additions and 139 deletions

View File

@ -71,6 +71,8 @@ enum class AccessType
M(ALTER_FETCH_PARTITION, "ALTER FETCH PART, FETCH PARTITION", TABLE, ALTER_TABLE) \ M(ALTER_FETCH_PARTITION, "ALTER FETCH PART, FETCH PARTITION", TABLE, ALTER_TABLE) \
M(ALTER_FREEZE_PARTITION, "FREEZE PARTITION, UNFREEZE", TABLE, ALTER_TABLE) \ M(ALTER_FREEZE_PARTITION, "FREEZE PARTITION, UNFREEZE", TABLE, ALTER_TABLE) \
\ \
M(ALTER_DATABASE_SETTINGS, "ALTER DATABASE SETTING, ALTER MODIFY DATABASE SETTING, MODIFY DATABASE SETTING", TABLE, ALTER_TABLE) /* allows to execute ALTER MODIFY SETTING */\
\
M(ALTER_TABLE, "", GROUP, ALTER) \ M(ALTER_TABLE, "", GROUP, ALTER) \
\ \
M(ALTER_VIEW_REFRESH, "ALTER LIVE VIEW REFRESH, REFRESH VIEW", VIEW, ALTER_VIEW) \ M(ALTER_VIEW_REFRESH, "ALTER LIVE VIEW REFRESH, REFRESH VIEW", VIEW, ALTER_VIEW) \

View File

@ -577,12 +577,9 @@
M(606, BACKUP_IS_EMPTY) \ M(606, BACKUP_IS_EMPTY) \
M(607, BACKUP_ELEMENT_DUPLICATE) \ M(607, BACKUP_ELEMENT_DUPLICATE) \
M(608, CANNOT_RESTORE_TABLE) \ M(608, CANNOT_RESTORE_TABLE) \
M(609, FUNCTION_ALREADY_EXISTS) \ M(609, POSTGRESQL_CONNECTION_FAILURE) \
M(610, CANNOT_DROP_SYSTEM_FUNCTION) \ M(610, POSTGRESQL_REPLICATION_INTERNAL_ERROR) \
M(611, CANNOT_CREATE_RECURSIVE_FUNCTION) \ M(611, QUERY_NOT_ALLOWED) \
M(612, OBJECT_ALREADY_STORED_ON_DISK) \
M(613, OBJECT_WAS_NOT_STORED_ON_DISK) \
M(614, POSTGRESQL_CONNECTION_FAILURE) \
\ \
M(999, KEEPER_EXCEPTION) \ M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \ M(1000, POCO_EXCEPTION) \

View File

@ -3,6 +3,7 @@
#if USE_LIBPQXX #if USE_LIBPQXX
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <iostream>
namespace postgres namespace postgres
{ {
@ -42,7 +43,6 @@ void Connection::execWithRetry(const std::function<void(pqxx::nontransaction &)>
pqxx::connection & Connection::getRef() pqxx::connection & Connection::getRef()
{ {
connect(); connect();
assert(connection != nullptr);
return *connection; return *connection;
} }

View File

@ -19,6 +19,15 @@ ConnectionInfo formatConnectionString(String dbname, String host, UInt16 port, S
return std::make_pair(out.str(), host + ':' + DB::toString(port)); return std::make_pair(out.str(), host + ':' + DB::toString(port));
} }
String formatNameForLogs(const String & postgres_database_name, const String & postgres_table_name)
{
if (postgres_database_name.empty())
return postgres_table_name;
if (postgres_table_name.empty())
return postgres_database_name;
return fmt::format("{}.{}", postgres_database_name, postgres_table_name);
}
} }
#endif #endif

View File

@ -19,7 +19,11 @@ namespace pqxx
namespace postgres namespace postgres
{ {
ConnectionInfo formatConnectionString(String dbname, String host, UInt16 port, String user, String password); ConnectionInfo formatConnectionString(String dbname, String host, UInt16 port, String user, String password);
String formatNameForLogs(const String & postgres_database_name, const String & postgres_table_name);
} }
#endif #endif

View File

@ -2,6 +2,7 @@
#include <Databases/DatabaseOnDisk.h> #include <Databases/DatabaseOnDisk.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
#include <Parsers/formatAST.h> #include <Parsers/formatAST.h>
#include <Common/renameat2.h> #include <Common/renameat2.h>
@ -36,19 +37,20 @@ public:
UUID uuid() const override { return table()->getStorageID().uuid; } UUID uuid() const override { return table()->getStorageID().uuid; }
}; };
DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, UUID uuid, const String & logger_name, ContextPtr context_) DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, UUID uuid, const String & logger_name, ContextPtr context_, ASTPtr storage_def_)
: DatabaseOrdinary(name_, std::move(metadata_path_), "store/", logger_name, context_) : DatabaseOrdinary(name_, std::move(metadata_path_), "store/", logger_name, context_)
, path_to_table_symlinks(fs::path(getContext()->getPath()) / "data" / escapeForFileName(name_) / "") , path_to_table_symlinks(fs::path(getContext()->getPath()) / "data" / escapeForFileName(name_) / "")
, path_to_metadata_symlink(fs::path(getContext()->getPath()) / "metadata" / escapeForFileName(name_)) , path_to_metadata_symlink(fs::path(getContext()->getPath()) / "metadata" / escapeForFileName(name_))
, db_uuid(uuid) , db_uuid(uuid)
, storage_def(storage_def_)
{ {
assert(db_uuid != UUIDHelpers::Nil); assert(db_uuid != UUIDHelpers::Nil);
fs::create_directories(path_to_table_symlinks); fs::create_directories(path_to_table_symlinks);
tryCreateMetadataSymlink(); tryCreateMetadataSymlink();
} }
DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, UUID uuid, ContextPtr context_) DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, UUID uuid, ContextPtr context_, ASTPtr storage_def_)
: DatabaseAtomic(name_, std::move(metadata_path_), uuid, "DatabaseAtomic (" + name_ + ")", context_) : DatabaseAtomic(name_, std::move(metadata_path_), uuid, "DatabaseAtomic (" + name_ + ")", context_, storage_def_)
{ {
} }
@ -566,4 +568,62 @@ void DatabaseAtomic::checkDetachedTableNotInUse(const UUID & uuid)
assertDetachedTableNotInUse(uuid); assertDetachedTableNotInUse(uuid);
} }
void DatabaseAtomic::modifySettings(const SettingsChanges & settings_changes, ContextPtr local_context)
{
applySettings(settings_changes, local_context);
ASTCreateQuery create;
create.attach = true;
create.database = "_";
create.uuid = getUUID();
create.if_not_exists = false;
create.storage = assert_cast<ASTStorage *>(storage_def.get());
auto * ast_set_query = create.storage->settings;
if (ast_set_query)
{
auto & previous_settings = ast_set_query->changes;
for (const auto & change : settings_changes)
{
auto it = std::find_if(previous_settings.begin(), previous_settings.end(),
[&](const auto & prev){ return prev.name == change.name; });
if (it != previous_settings.end())
it->value = change.value;
else
previous_settings.push_back(change);
}
}
else
{
auto settings = std::make_shared<ASTSetQuery>();
settings->is_standalone = false;
settings->changes = settings_changes;
create.storage->set(create.storage->settings, settings->clone());
}
create.attach = true;
create.if_not_exists = false;
WriteBufferFromOwnString statement_buf;
formatAST(create, statement_buf, false);
writeChar('\n', statement_buf);
String statement = statement_buf.str();
String database_name_escaped = escapeForFileName(database_name);
fs::path metadata_root_path = fs::canonical(local_context->getGlobalContext()->getPath());
fs::path metadata_file_tmp_path = fs::path(metadata_root_path) / "metadata" / (database_name_escaped + ".sql.tmp");
fs::path metadata_file_path = fs::path(metadata_root_path) / "metadata" / (database_name_escaped + ".sql");
/// Exclusive flag guarantees, that database is not created right now in another thread.
WriteBufferFromFile out(metadata_file_tmp_path, statement.size(), O_WRONLY | O_CREAT | O_EXCL);
writeString(statement, out);
out.next();
if (getContext()->getSettingsRef().fsync_metadata)
out.sync();
out.close();
fs::rename(metadata_file_tmp_path, metadata_file_path);
}
} }

View File

@ -19,8 +19,8 @@ namespace DB
class DatabaseAtomic : public DatabaseOrdinary class DatabaseAtomic : public DatabaseOrdinary
{ {
public: public:
DatabaseAtomic(String name_, String metadata_path_, UUID uuid, const String & logger_name, ContextPtr context_); DatabaseAtomic(String name_, String metadata_path_, UUID uuid, const String & logger_name, ContextPtr context_, ASTPtr storage_def_);
DatabaseAtomic(String name_, String metadata_path_, UUID uuid, ContextPtr context_); DatabaseAtomic(String name_, String metadata_path_, UUID uuid, ContextPtr context_, ASTPtr storage_def_);
String getEngineName() const override { return "Atomic"; } String getEngineName() const override { return "Atomic"; }
UUID getUUID() const override { return db_uuid; } UUID getUUID() const override { return db_uuid; }
@ -61,6 +61,8 @@ public:
void checkDetachedTableNotInUse(const UUID & uuid) override; void checkDetachedTableNotInUse(const UUID & uuid) override;
void setDetachedTableNotInUseForce(const UUID & uuid); void setDetachedTableNotInUseForce(const UUID & uuid);
void modifySettings(const SettingsChanges & settings_changes, ContextPtr local_context) override;
protected: protected:
void commitAlterTable(const StorageID & table_id, const String & table_metadata_tmp_path, const String & table_metadata_path, const String & statement, ContextPtr query_context) override; void commitAlterTable(const StorageID & table_id, const String & table_metadata_tmp_path, const String & table_metadata_path, const String & statement, ContextPtr query_context) override;
void commitCreateTable(const ASTCreateQuery & query, const StoragePtr & table, void commitCreateTable(const ASTCreateQuery & query, const StoragePtr & table,
@ -80,6 +82,7 @@ protected:
String path_to_table_symlinks; String path_to_table_symlinks;
String path_to_metadata_symlink; String path_to_metadata_symlink;
const UUID db_uuid; const UUID db_uuid;
ASTPtr storage_def;
}; };
} }

View File

@ -103,13 +103,20 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
const String & engine_name = engine_define->engine->name; const String & engine_name = engine_define->engine->name;
const UUID & uuid = create.uuid; const UUID & uuid = create.uuid;
static const std::unordered_set<std::string_view> database_engines{"Ordinary", "Atomic", "Memory",
"Dictionary", "Lazy", "Replicated", "MySQL", "MaterializeMySQL", "MaterializedMySQL",
"Lazy", "Replicated", "PostgreSQL", "MaterializedPostgreSQL", "SQLite"};
if (!database_engines.contains(engine_name))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine name `{}` does not exist", engine_name);
static const std::unordered_set<std::string_view> engines_with_arguments{"MySQL", "MaterializeMySQL", "MaterializedMySQL", static const std::unordered_set<std::string_view> engines_with_arguments{"MySQL", "MaterializeMySQL", "MaterializedMySQL",
"Lazy", "Replicated", "PostgreSQL", "MaterializedPostgreSQL", "SQLite"}; "Lazy", "Replicated", "PostgreSQL", "MaterializedPostgreSQL", "SQLite"};
bool engine_may_have_arguments = engines_with_arguments.contains(engine_name); bool engine_may_have_arguments = engines_with_arguments.contains(engine_name);
if (engine_define->engine->arguments && !engine_may_have_arguments) if (engine_define->engine->arguments && !engine_may_have_arguments)
throw Exception("Database engine " + engine_name + " cannot have arguments", ErrorCodes::BAD_ARGUMENTS); throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine `{}` cannot have arguments", engine_name);
bool has_unexpected_element = engine_define->engine->parameters || engine_define->partition_by || bool has_unexpected_element = engine_define->engine->parameters || engine_define->partition_by ||
engine_define->primary_key || engine_define->order_by || engine_define->primary_key || engine_define->order_by ||
@ -117,13 +124,13 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated" || engine_name == "MaterializedPostgreSQL"; bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated" || engine_name == "MaterializedPostgreSQL";
if (has_unexpected_element || (!may_have_settings && engine_define->settings)) if (has_unexpected_element || (!may_have_settings && engine_define->settings))
throw Exception("Database engine " + engine_name + " cannot have parameters, primary_key, order_by, sample_by, settings", throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_AST,
ErrorCodes::UNKNOWN_ELEMENT_IN_AST); "Database engine `{}` cannot have parameters, primary_key, order_by, sample_by, settings", engine_name);
if (engine_name == "Ordinary") if (engine_name == "Ordinary")
return std::make_shared<DatabaseOrdinary>(database_name, metadata_path, context); return std::make_shared<DatabaseOrdinary>(database_name, metadata_path, context);
else if (engine_name == "Atomic") else if (engine_name == "Atomic")
return std::make_shared<DatabaseAtomic>(database_name, metadata_path, uuid, context); return std::make_shared<DatabaseAtomic>(database_name, metadata_path, uuid, context, engine_define->clone());
else if (engine_name == "Memory") else if (engine_name == "Memory")
return std::make_shared<DatabaseMemory>(database_name, context); return std::make_shared<DatabaseMemory>(database_name, context);
else if (engine_name == "Dictionary") else if (engine_name == "Dictionary")
@ -177,11 +184,11 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
if (create.uuid == UUIDHelpers::Nil) if (create.uuid == UUIDHelpers::Nil)
return std::make_shared<DatabaseMaterializedMySQL<DatabaseOrdinary>>( return std::make_shared<DatabaseMaterializedMySQL<DatabaseOrdinary>>(
context, database_name, metadata_path, uuid, mysql_database_name, std::move(mysql_pool), std::move(client) context, database_name, metadata_path, uuid, mysql_database_name, std::move(mysql_pool), std::move(client)
, std::move(materialize_mode_settings)); , std::move(materialize_mode_settings), engine_define->clone());
else else
return std::make_shared<DatabaseMaterializedMySQL<DatabaseAtomic>>( return std::make_shared<DatabaseMaterializedMySQL<DatabaseAtomic>>(
context, database_name, metadata_path, uuid, mysql_database_name, std::move(mysql_pool), std::move(client) context, database_name, metadata_path, uuid, mysql_database_name, std::move(mysql_pool), std::move(client)
, std::move(materialize_mode_settings)); , std::move(materialize_mode_settings), engine_define->clone());
} }
catch (...) catch (...)
{ {
@ -227,7 +234,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
return std::make_shared<DatabaseReplicated>(database_name, metadata_path, uuid, return std::make_shared<DatabaseReplicated>(database_name, metadata_path, uuid,
zookeeper_path, shard_name, replica_name, zookeeper_path, shard_name, replica_name,
std::move(database_replicated_settings), context); std::move(database_replicated_settings), context, engine_define->clone());
} }
#if USE_LIBPQXX #if USE_LIBPQXX
@ -304,7 +311,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
postgresql_replica_settings->loadFromQuery(*engine_define); postgresql_replica_settings->loadFromQuery(*engine_define);
return std::make_shared<DatabaseMaterializedPostgreSQL>( return std::make_shared<DatabaseMaterializedPostgreSQL>(
context, metadata_path, uuid, engine_define, create.attach, context, metadata_path, uuid, engine_define->clone(), create.attach,
database_name, postgres_database_name, connection_info, database_name, postgres_database_name, connection_info,
std::move(postgresql_replica_settings)); std::move(postgresql_replica_settings));
} }

View File

@ -67,8 +67,9 @@ DatabaseReplicated::DatabaseReplicated(
const String & shard_name_, const String & shard_name_,
const String & replica_name_, const String & replica_name_,
DatabaseReplicatedSettings db_settings_, DatabaseReplicatedSettings db_settings_,
ContextPtr context_) ContextPtr context_,
: DatabaseAtomic(name_, metadata_path_, uuid, "DatabaseReplicated (" + name_ + ")", context_) ASTPtr storage_def_)
: DatabaseAtomic(name_, metadata_path_, uuid, "DatabaseReplicated (" + name_ + ")", context_, storage_def_)
, zookeeper_path(zookeeper_path_) , zookeeper_path(zookeeper_path_)
, shard_name(shard_name_) , shard_name(shard_name_)
, replica_name(replica_name_) , replica_name(replica_name_)

View File

@ -24,7 +24,7 @@ public:
DatabaseReplicated(const String & name_, const String & metadata_path_, UUID uuid, DatabaseReplicated(const String & name_, const String & metadata_path_, UUID uuid,
const String & zookeeper_path_, const String & shard_name_, const String & replica_name_, const String & zookeeper_path_, const String & shard_name_, const String & replica_name_,
DatabaseReplicatedSettings db_settings_, DatabaseReplicatedSettings db_settings_,
ContextPtr context); ContextPtr context, ASTPtr storage_def_);
~DatabaseReplicated() override; ~DatabaseReplicated() override;

View File

@ -24,6 +24,8 @@ struct IndicesDescription;
struct StorageInMemoryMetadata; struct StorageInMemoryMetadata;
struct StorageID; struct StorageID;
class ASTCreateQuery; class ASTCreateQuery;
class AlterCommands;
class SettingsChanges;
using DictionariesWithID = std::vector<std::pair<String, UUID>>; using DictionariesWithID = std::vector<std::pair<String, UUID>>;
namespace ErrorCodes namespace ErrorCodes
@ -272,6 +274,21 @@ public:
/// Delete data and metadata stored inside the database, if exists. /// Delete data and metadata stored inside the database, if exists.
virtual void drop(ContextPtr /*context*/) {} virtual void drop(ContextPtr /*context*/) {}
virtual void checkAlterIsPossible(const AlterCommands & /* commands */, ContextPtr /* context */) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Alter is not supported by database engine {}", getEngineName());
}
virtual void modifySettings(const SettingsChanges &, ContextPtr)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Database engine {} does not support settings", getEngineName());
}
virtual void applySettings(const SettingsChanges &, ContextPtr)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Database engine {} does not support settings", getEngineName());
}
virtual ~IDatabase() = default; virtual ~IDatabase() = default;
protected: protected:

View File

@ -37,7 +37,8 @@ DatabaseMaterializedMySQL<DatabaseOrdinary>::DatabaseMaterializedMySQL(
const String & mysql_database_name_, const String & mysql_database_name_,
mysqlxx::Pool && pool_, mysqlxx::Pool && pool_,
MySQLClient && client_, MySQLClient && client_,
std::unique_ptr<MaterializedMySQLSettings> settings_) std::unique_ptr<MaterializedMySQLSettings> settings_,
ASTPtr)
: DatabaseOrdinary( : DatabaseOrdinary(
database_name_, database_name_,
metadata_path_, metadata_path_,
@ -58,8 +59,9 @@ DatabaseMaterializedMySQL<DatabaseAtomic>::DatabaseMaterializedMySQL(
const String & mysql_database_name_, const String & mysql_database_name_,
mysqlxx::Pool && pool_, mysqlxx::Pool && pool_,
MySQLClient && client_, MySQLClient && client_,
std::unique_ptr<MaterializedMySQLSettings> settings_) std::unique_ptr<MaterializedMySQLSettings> settings_,
: DatabaseAtomic(database_name_, metadata_path_, uuid, "DatabaseMaterializedMySQL<Atomic> (" + database_name_ + ")", context_) ASTPtr storage_def_)
: DatabaseAtomic(database_name_, metadata_path_, uuid, "DatabaseMaterializedMySQL<Atomic> (" + database_name_ + ")", context_, storage_def_)
, settings(std::move(settings_)) , settings(std::move(settings_))
, materialize_thread(context_, database_name_, mysql_database_name_, std::move(pool_), std::move(client_), settings.get()) , materialize_thread(context_, database_name_, mysql_database_name_, std::move(pool_), std::move(client_), settings.get())
{ {

View File

@ -25,7 +25,7 @@ public:
DatabaseMaterializedMySQL( DatabaseMaterializedMySQL(
ContextPtr context, const String & database_name_, const String & metadata_path_, UUID uuid, ContextPtr context, const String & database_name_, const String & metadata_path_, UUID uuid,
const String & mysql_database_name_, mysqlxx::Pool && pool_, const String & mysql_database_name_, mysqlxx::Pool && pool_,
MySQLClient && client_, std::unique_ptr<MaterializedMySQLSettings> settings_); MySQLClient && client_, std::unique_ptr<MaterializedMySQLSettings> settings_, ASTPtr storage_def_);
void rethrowExceptionIfNeed() const; void rethrowExceptionIfNeed() const;

View File

@ -10,12 +10,14 @@
#include <Databases/DatabaseOrdinary.h> #include <Databases/DatabaseOrdinary.h>
#include <Databases/DatabaseAtomic.h> #include <Databases/DatabaseAtomic.h>
#include <Storages/StoragePostgreSQL.h> #include <Storages/StoragePostgreSQL.h>
#include <Storages/AlterCommands.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h> #include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Parsers/ParserCreateQuery.h> #include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h> #include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h> #include <Parsers/queryToString.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Common/escapeForFileName.h> #include <Common/escapeForFileName.h>
#include <Poco/DirectoryIterator.h> #include <Poco/DirectoryIterator.h>
#include <Poco/File.h> #include <Poco/File.h>
@ -30,20 +32,20 @@ namespace ErrorCodes
{ {
extern const int NOT_IMPLEMENTED; extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int QUERY_NOT_ALLOWED;
} }
DatabaseMaterializedPostgreSQL::DatabaseMaterializedPostgreSQL( DatabaseMaterializedPostgreSQL::DatabaseMaterializedPostgreSQL(
ContextPtr context_, ContextPtr context_,
const String & metadata_path_, const String & metadata_path_,
UUID uuid_, UUID uuid_,
const ASTStorage * database_engine_define_, ASTPtr storage_def_,
bool is_attach_, bool is_attach_,
const String & database_name_, const String & database_name_,
const String & postgres_database_name, const String & postgres_database_name,
const postgres::ConnectionInfo & connection_info_, const postgres::ConnectionInfo & connection_info_,
std::unique_ptr<MaterializedPostgreSQLSettings> settings_) std::unique_ptr<MaterializedPostgreSQLSettings> settings_)
: DatabaseAtomic(database_name_, metadata_path_, uuid_, "DatabaseMaterializedPostgreSQL (" + database_name_ + ")", context_) : DatabaseAtomic(database_name_, metadata_path_, uuid_, "DatabaseMaterializedPostgreSQL (" + database_name_ + ")", context_, storage_def_)
, database_engine_define(database_engine_define_->clone())
, is_attach(is_attach_) , is_attach(is_attach_)
, remote_database_name(postgres_database_name) , remote_database_name(postgres_database_name)
, connection_info(connection_info_) , connection_info(connection_info_)
@ -66,11 +68,10 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
/* is_materialized_postgresql_database = */ true, /* is_materialized_postgresql_database = */ true,
settings->materialized_postgresql_tables_list.value); settings->materialized_postgresql_tables_list.value);
postgres::Connection connection(connection_info);
NameSet tables_to_replicate; NameSet tables_to_replicate;
try try
{ {
tables_to_replicate = replication_handler->fetchRequiredTables(connection); tables_to_replicate = replication_handler->fetchRequiredTables();
} }
catch (...) catch (...)
{ {
@ -89,12 +90,12 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
if (storage) if (storage)
{ {
/// Nested table was already created and synchronized. /// Nested table was already created and synchronized.
storage = StorageMaterializedPostgreSQL::create(storage, getContext()); storage = StorageMaterializedPostgreSQL::create(storage, getContext(), remote_database_name, table_name);
} }
else else
{ {
/// Nested table does not exist and will be created by replication thread. /// Nested table does not exist and will be created by replication thread.
storage = StorageMaterializedPostgreSQL::create(StorageID(database_name, table_name), getContext()); storage = StorageMaterializedPostgreSQL::create(StorageID(database_name, table_name), getContext(), remote_database_name, table_name);
} }
/// Cache MaterializedPostgreSQL wrapper over nested table. /// Cache MaterializedPostgreSQL wrapper over nested table.
@ -124,7 +125,34 @@ void DatabaseMaterializedPostgreSQL::loadStoredObjects(ContextMutablePtr local_c
if (!force_attach) if (!force_attach)
throw; throw;
} }
}
void DatabaseMaterializedPostgreSQL::checkAlterIsPossible(const AlterCommands & commands, ContextPtr) const
{
for (const auto & command : commands)
{
if (command.type != AlterCommand::MODIFY_DATABASE_SETTING)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Alter of type '{}' is not supported by database engine {}", alterTypeToString(command.type), getEngineName());
}
}
void DatabaseMaterializedPostgreSQL::applySettings(const SettingsChanges & settings_changes, ContextPtr local_context)
{
for (const auto & change : settings_changes)
{
if (!settings->has(change.name))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} does not support setting `{}`", getEngineName(), change.name);
if (change.name == "materialized_postgresql_tables_list")
{
if (local_context->isInternalQuery() || materialized_tables.empty())
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Changin settings `{}` is allowed only internally. Use CREATE TABLE query", change.name);
}
settings->applyChange(change);
}
} }
@ -164,8 +192,38 @@ void DatabaseMaterializedPostgreSQL::createTable(ContextPtr local_context, const
return; return;
} }
throw Exception(ErrorCodes::NOT_IMPLEMENTED, throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "CREATE TABLE is not allowed for database engine {}", getEngineName());
"Create table query allowed only for ReplacingMergeTree engine and from synchronization thread"); }
void DatabaseMaterializedPostgreSQL::attachTable(const String & table_name, const StoragePtr & table, const String & relative_table_path)
{
if (CurrentThread::isInitialized() && CurrentThread::get().getQueryContext())
{
auto set = std::make_shared<ASTSetQuery>();
set->is_standalone = false;
auto tables_to_replicate = settings->materialized_postgresql_tables_list.value;
set->changes = {SettingChange("materialized_postgresql_tables_list", tables_to_replicate.empty() ? table_name : (tables_to_replicate + "," + table_name))};
auto command = std::make_shared<ASTAlterCommand>();
command->type = ASTAlterCommand::Type::MODIFY_DATABASE_SETTING;
command->children.emplace_back(std::move(set));
auto expr = std::make_shared<ASTExpressionList>();
expr->children.push_back(command);
ASTAlterQuery alter;
alter.alter_object = ASTAlterQuery::AlterObjectType::DATABASE;
alter.children.emplace_back(std::move(expr));
auto storage = StorageMaterializedPostgreSQL::create(StorageID(database_name, table_name), getContext(), remote_database_name, table_name);
materialized_tables[table_name] = storage;
replication_handler->addTableToReplication(dynamic_cast<StorageMaterializedPostgreSQL *>(storage.get()), table_name);
}
else
{
DatabaseAtomic::attachTable(table_name, table, relative_table_path);
}
} }
@ -209,6 +267,63 @@ DatabaseTablesIteratorPtr DatabaseMaterializedPostgreSQL::getTablesIterator(
return DatabaseAtomic::getTablesIterator(StorageMaterializedPostgreSQL::makeNestedTableContext(local_context), filter_by_table_name); return DatabaseAtomic::getTablesIterator(StorageMaterializedPostgreSQL::makeNestedTableContext(local_context), filter_by_table_name);
} }
static ASTPtr getColumnDeclaration(const DataTypePtr & data_type)
{
WhichDataType which(data_type);
if (which.isNullable())
return makeASTFunction("Nullable", getColumnDeclaration(typeid_cast<const DataTypeNullable *>(data_type.get())->getNestedType()));
if (which.isArray())
return makeASTFunction("Array", getColumnDeclaration(typeid_cast<const DataTypeArray *>(data_type.get())->getNestedType()));
return std::make_shared<ASTIdentifier>(data_type->getName());
}
ASTPtr DatabaseMaterializedPostgreSQL::getCreateTableQueryImpl(const String & table_name, ContextPtr local_context, bool throw_on_error) const
{
if (!local_context->hasQueryContext())
return DatabaseAtomic::getCreateTableQueryImpl(table_name, local_context, throw_on_error);
/// Note: here we make an assumption that table structure will not change between call to this method and to attachTable().
auto storage = StorageMaterializedPostgreSQL::create(StorageID(database_name, table_name), getContext(), remote_database_name, table_name);
replication_handler->addStructureToMaterializedStorage(storage.get(), table_name);
auto create_table_query = std::make_shared<ASTCreateQuery>();
auto table_storage_define = storage_def->clone();
create_table_query->set(create_table_query->storage, table_storage_define);
auto columns_declare_list = std::make_shared<ASTColumns>();
auto columns_expression_list = std::make_shared<ASTExpressionList>();
columns_declare_list->set(columns_declare_list->columns, columns_expression_list);
create_table_query->set(create_table_query->columns_list, columns_declare_list);
/// init create query.
auto table_id = storage->getStorageID();
create_table_query->table = table_id.table_name;
create_table_query->database = table_id.database_name;
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
for (const auto & column_type_and_name : metadata_snapshot->getColumns().getAllPhysical())
{
const auto & column_declaration = std::make_shared<ASTColumnDeclaration>();
column_declaration->name = column_type_and_name.name;
column_declaration->type = getColumnDeclaration(column_type_and_name.type);
columns_expression_list->children.emplace_back(column_declaration);
}
ASTStorage * ast_storage = table_storage_define->as<ASTStorage>();
ASTs storage_children = ast_storage->children;
auto storage_engine_arguments = ast_storage->engine->arguments;
/// Add table_name to engine arguments
storage_engine_arguments->children.insert(storage_engine_arguments->children.begin() + 2, std::make_shared<ASTLiteral>(table_id.table_name));
return create_table_query;
}
} }

View File

@ -32,7 +32,7 @@ public:
ContextPtr context_, ContextPtr context_,
const String & metadata_path_, const String & metadata_path_,
UUID uuid_, UUID uuid_,
const ASTStorage * database_engine_define_, ASTPtr storage_def_,
bool is_attach_, bool is_attach_,
const String & database_name_, const String & database_name_,
const String & postgres_database_name, const String & postgres_database_name,
@ -52,18 +52,26 @@ public:
void createTable(ContextPtr context, const String & name, const StoragePtr & table, const ASTPtr & query) override; void createTable(ContextPtr context, const String & name, const StoragePtr & table, const ASTPtr & query) override;
void attachTable(const String & name, const StoragePtr & table, const String & relative_table_path) override;
void dropTable(ContextPtr local_context, const String & name, bool no_delay) override; void dropTable(ContextPtr local_context, const String & name, bool no_delay) override;
void drop(ContextPtr local_context) override; void drop(ContextPtr local_context) override;
void stopReplication(); void stopReplication();
void checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const override;
void applySettings(const SettingsChanges & settings_changes, ContextPtr context) override;
void shutdown() override; void shutdown() override;
protected:
ASTPtr getCreateTableQueryImpl(const String & table_name, ContextPtr local_context, bool throw_on_error) const override;
private: private:
void startSynchronization(); void startSynchronization();
ASTPtr database_engine_define;
bool is_attach; bool is_attach;
String remote_database_name; String remote_database_name;
postgres::ConnectionInfo connection_info; postgres::ConnectionInfo connection_info;

View File

@ -290,12 +290,20 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
pqxx::ReplicationTransaction & tx, const String & postgres_table_name, bool use_nulls, pqxx::ReplicationTransaction & tx, const String & postgres_table_name, bool use_nulls,
bool with_primary_key, bool with_replica_identity_index); bool with_primary_key, bool with_replica_identity_index);
template
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
pqxx::nontransaction & tx, const String & postgres_table_name, bool use_nulls,
bool with_primary_key, bool with_replica_identity_index);
template template
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::work & tx, const String & postgres_schema); std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::work & tx, const String & postgres_schema);
template template
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::ReadTransaction & tx, const String & postgres_schema); std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::ReadTransaction & tx, const String & postgres_schema);
template
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::nontransaction & tx, const String & postgres_schema);
} }
#endif #endif

View File

@ -241,7 +241,7 @@ DatabaseAndTable DatabaseCatalog::getTableImpl(
#if USE_LIBPQXX #if USE_LIBPQXX
if (!context_->isInternalQuery() && (db_and_table.first->getEngineName() == "MaterializedPostgreSQL")) if (!context_->isInternalQuery() && (db_and_table.first->getEngineName() == "MaterializedPostgreSQL"))
{ {
db_and_table.second = std::make_shared<StorageMaterializedPostgreSQL>(std::move(db_and_table.second), getContext()); db_and_table.second = std::make_shared<StorageMaterializedPostgreSQL>(std::move(db_and_table.second), getContext(), "", db_and_table.second->getStorageID().table_name);
} }
#endif #endif

View File

@ -39,12 +39,25 @@ InterpreterAlterQuery::InterpreterAlterQuery(const ASTPtr & query_ptr_, ContextP
{ {
} }
BlockIO InterpreterAlterQuery::execute() BlockIO InterpreterAlterQuery::execute()
{ {
BlockIO res;
const auto & alter = query_ptr->as<ASTAlterQuery &>(); const auto & alter = query_ptr->as<ASTAlterQuery &>();
std::cerr << "\n\n\n" << query_ptr->dumpTree() << std::endl;
if (alter.alter_object == ASTAlterQuery::AlterObjectType::DATABASE)
return executeToDatabase(alter);
else if (alter.alter_object == ASTAlterQuery::AlterObjectType::DATABASE)
return executeToTable(alter);
else if (alter.alter_object == ASTAlterQuery::AlterObjectType::LIVE_VIEW)
return executeToTable(alter);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown alter");
}
BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter)
{
BlockIO res;
if (!alter.cluster.empty()) if (!alter.cluster.empty())
return executeDDLQueryOnCluster(query_ptr, getContext(), getRequiredAccess()); return executeDDLQueryOnCluster(query_ptr, getContext(), getRequiredAccess());
@ -78,7 +91,9 @@ BlockIO InterpreterAlterQuery::execute()
{ {
auto * command_ast = child->as<ASTAlterCommand>(); auto * command_ast = child->as<ASTAlterCommand>();
if (auto alter_command = AlterCommand::parse(command_ast)) if (auto alter_command = AlterCommand::parse(command_ast))
{
alter_commands.emplace_back(std::move(*alter_command)); alter_commands.emplace_back(std::move(*alter_command));
}
else if (auto partition_command = PartitionCommand::parse(command_ast)) else if (auto partition_command = PartitionCommand::parse(command_ast))
{ {
partition_commands.emplace_back(std::move(*partition_command)); partition_commands.emplace_back(std::move(*partition_command));
@ -92,7 +107,9 @@ BlockIO InterpreterAlterQuery::execute()
mutation_commands.emplace_back(std::move(*mut_command)); mutation_commands.emplace_back(std::move(*mut_command));
} }
else if (auto live_view_command = LiveViewCommand::parse(command_ast)) else if (auto live_view_command = LiveViewCommand::parse(command_ast))
{
live_view_commands.emplace_back(std::move(*live_view_command)); live_view_commands.emplace_back(std::move(*live_view_command));
}
else else
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR); throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
} }
@ -149,6 +166,30 @@ BlockIO InterpreterAlterQuery::execute()
} }
BlockIO InterpreterAlterQuery::executeToDatabase(const ASTAlterQuery & alter)
{
BlockIO res;
getContext()->checkAccess(getRequiredAccess());
DatabasePtr database = DatabaseCatalog::instance().getDatabase(alter.database);
AlterCommands alter_commands;
for (const auto & child : alter.command_list->children)
{
auto * command_ast = child->as<ASTAlterCommand>();
if (auto alter_command = AlterCommand::parse(command_ast))
alter_commands.emplace_back(std::move(*alter_command));
else
throw Exception("Wrong parameter type in ALTER DATABASE query", ErrorCodes::LOGICAL_ERROR);
}
if (!alter_commands.empty())
{
database->checkAlterIsPossible(alter_commands, getContext());
alter_commands.apply(database, getContext());
}
return res;
}
AccessRightsElements InterpreterAlterQuery::getRequiredAccess() const AccessRightsElements InterpreterAlterQuery::getRequiredAccess() const
{ {
AccessRightsElements required_access; AccessRightsElements required_access;
@ -343,6 +384,11 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS
required_access.emplace_back(AccessType::ALTER_RENAME_COLUMN, database, table, column_name()); required_access.emplace_back(AccessType::ALTER_RENAME_COLUMN, database, table, column_name());
break; break;
} }
case ASTAlterCommand::MODIFY_DATABASE_SETTING:
{
required_access.emplace_back(AccessType::ALTER_DATABASE_SETTINGS, database, table);
break;
}
case ASTAlterCommand::NO_TYPE: break; case ASTAlterCommand::NO_TYPE: break;
} }
@ -354,7 +400,7 @@ void InterpreterAlterQuery::extendQueryLogElemImpl(QueryLogElement & elem, const
const auto & alter = ast->as<const ASTAlterQuery &>(); const auto & alter = ast->as<const ASTAlterQuery &>();
elem.query_kind = "Alter"; elem.query_kind = "Alter";
if (alter.command_list != nullptr) if (alter.command_list != nullptr && alter.alter_object != ASTAlterQuery::AlterObjectType::DATABASE)
{ {
// Alter queries already have their target table inserted into `elem`. // Alter queries already have their target table inserted into `elem`.
if (elem.query_tables.size() != 1) if (elem.query_tables.size() != 1)

View File

@ -9,6 +9,7 @@ namespace DB
class AccessRightsElements; class AccessRightsElements;
class ASTAlterCommand; class ASTAlterCommand;
class ASTAlterQuery;
/** Allows you add or remove a column in the table. /** Allows you add or remove a column in the table.
@ -28,6 +29,10 @@ public:
private: private:
AccessRightsElements getRequiredAccess() const; AccessRightsElements getRequiredAccess() const;
BlockIO executeToTable(const ASTAlterQuery & alter);
BlockIO executeToDatabase(const ASTAlterQuery & alter);
ASTPtr query_ptr; ASTPtr query_ptr;
}; };

View File

@ -389,6 +389,11 @@ void ASTAlterCommand::formatImpl(
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "RESET SETTING " << (settings.hilite ? hilite_none : ""); settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "RESET SETTING " << (settings.hilite ? hilite_none : "");
settings_resets->formatImpl(settings, state, frame); settings_resets->formatImpl(settings, state, frame);
} }
else if (type == ASTAlterCommand::MODIFY_DATABASE_SETTING)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MODIFY SETTING " << (settings.hilite ? hilite_none : "");
settings_changes->formatImpl(settings, state, frame);
}
else if (type == ASTAlterCommand::MODIFY_QUERY) else if (type == ASTAlterCommand::MODIFY_QUERY)
{ {
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MODIFY QUERY " << settings.nl_or_ws << (settings.hilite ? hilite_none : ""); settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MODIFY QUERY " << settings.nl_or_ws << (settings.hilite ? hilite_none : "");
@ -474,6 +479,8 @@ void ASTAlterQuery::formatQueryImpl(const FormatSettings & settings, FormatState
case AlterObjectType::LIVE_VIEW: case AlterObjectType::LIVE_VIEW:
settings.ostr << "ALTER LIVE VIEW "; settings.ostr << "ALTER LIVE VIEW ";
break; break;
default:
break;
} }
settings.ostr << (settings.hilite ? hilite_none : ""); settings.ostr << (settings.hilite ? hilite_none : "");

View File

@ -68,6 +68,8 @@ public:
NO_TYPE, NO_TYPE,
LIVE_VIEW_REFRESH, LIVE_VIEW_REFRESH,
MODIFY_DATABASE_SETTING,
}; };
Type type = NO_TYPE; Type type = NO_TYPE;
@ -212,11 +214,12 @@ public:
{ {
TABLE, TABLE,
DATABASE, DATABASE,
LIVE_VIEW LIVE_VIEW,
UNKNOWN,
}; };
// bool is_live_view{false}; /// true for ALTER LIVE VIEW // bool is_live_view{false}; /// true for ALTER LIVE VIEW
AlterObjectType alter_object = AlterObjectType::TABLE; AlterObjectType alter_object = AlterObjectType::UNKNOWN;
ASTExpressionList * command_list = nullptr; ASTExpressionList * command_list = nullptr;

View File

@ -141,12 +141,14 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
{ {
if (!parser_settings.parse(pos, command->settings_changes, expected)) if (!parser_settings.parse(pos, command->settings_changes, expected))
return false; return false;
command->type = ASTAlterCommand::MODIFY_SETTING; command->type = ASTAlterCommand::MODIFY_DATABASE_SETTING;
} }
else else
return false; return false;
break; break;
} }
default:
break;
case ASTAlterQuery::AlterObjectType::TABLE: case ASTAlterQuery::AlterObjectType::TABLE:
{ {
if (s_add_column.ignore(pos, expected)) if (s_add_column.ignore(pos, expected))
@ -828,16 +830,27 @@ bool ParserAlterQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
else else
return false; return false;
if (!parseDatabaseAndTableName(pos, expected, query->database, query->table)) if (alter_object_type == ASTAlterQuery::AlterObjectType::DATABASE)
return false;
String cluster_str;
if (ParserKeyword{"ON"}.ignore(pos, expected))
{ {
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected)) std::cerr << "\n\n\nOK!\n\n";
if (!parseDatabase(pos, expected, query->database))
return false; return false;
std::cerr << "database name: " << query->database << std::endl;
}
else
{
std::cerr << "\n\n\nNOT OK!\n\n";
if (!parseDatabaseAndTableName(pos, expected, query->database, query->table))
return false;
String cluster_str;
if (ParserKeyword{"ON"}.ignore(pos, expected))
{
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false;
}
query->cluster = cluster_str;
} }
query->cluster = cluster_str;
ParserAlterCommandList p_command_list(alter_object_type); ParserAlterCommandList p_command_list(alter_object_type);
ASTPtr command_list; ASTPtr command_list;
@ -845,6 +858,8 @@ bool ParserAlterQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
return false; return false;
query->set(query->command_list, command_list); query->set(query->command_list, command_list);
query->alter_object = alter_object_type;
std::cerr << "\n\n\nalter query: " << query->dumpTree() << std::endl;
return true; return true;
} }

View File

@ -42,6 +42,22 @@ bool parseDatabaseAndTableName(IParser::Pos & pos, Expected & expected, String &
} }
bool parseDatabase(IParser::Pos & pos, Expected & expected, String & database_str)
{
ParserToken s_dot(TokenType::Dot);
ParserIdentifier identifier_parser;
ASTPtr database;
database_str = "";
if (!identifier_parser.parse(pos, database, expected))
return false;
tryGetIdentifierNameInto(database, database_str);
return true;
}
bool parseDatabaseAndTableNameOrAsterisks(IParser::Pos & pos, Expected & expected, String & database, bool & any_database, String & table, bool & any_table) bool parseDatabaseAndTableNameOrAsterisks(IParser::Pos & pos, Expected & expected, String & database, bool & any_database, String & table, bool & any_table)
{ {
return IParserBase::wrapParseImpl(pos, [&] return IParserBase::wrapParseImpl(pos, [&]

View File

@ -10,4 +10,6 @@ bool parseDatabaseAndTableName(IParser::Pos & pos, Expected & expected, String &
/// Parses [db.]name or [db.]* or [*.]* /// Parses [db.]name or [db.]* or [*.]*
bool parseDatabaseAndTableNameOrAsterisks(IParser::Pos & pos, Expected & expected, String & database, bool & any_database, String & table, bool & any_table); bool parseDatabaseAndTableNameOrAsterisks(IParser::Pos & pos, Expected & expected, String & database, bool & any_database, String & table, bool & any_table);
bool parseDatabase(IParser::Pos & pos, Expected & expected, String & database_str);
} }

View File

@ -312,6 +312,14 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
command.settings_changes = command_ast->settings_changes->as<ASTSetQuery &>().changes; command.settings_changes = command_ast->settings_changes->as<ASTSetQuery &>().changes;
return command; return command;
} }
else if (command_ast->type == ASTAlterCommand::MODIFY_DATABASE_SETTING)
{
AlterCommand command;
command.ast = command_ast->clone();
command.type = AlterCommand::MODIFY_DATABASE_SETTING;
command.settings_changes = command_ast->settings_changes->as<ASTSetQuery &>().changes;
return command;
}
else if (command_ast->type == ASTAlterCommand::RESET_SETTING) else if (command_ast->type == ASTAlterCommand::RESET_SETTING)
{ {
AlterCommand command; AlterCommand command;
@ -350,6 +358,21 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
} }
void AlterCommands::apply(DatabasePtr database, ContextPtr context) const
{
for (const AlterCommand & command : *this)
{
if (!command.ignore)
{
if (command.type == AlterCommand::MODIFY_DATABASE_SETTING)
database->modifySettings(command.settings_changes, context);
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported alter command");
}
}
}
void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context) const void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context) const
{ {
if (type == ADD_COLUMN) if (type == ADD_COLUMN)
@ -877,6 +900,8 @@ String alterTypeToString(const AlterCommand::Type type)
return "MODIFY SETTING"; return "MODIFY SETTING";
case AlterCommand::Type::RESET_SETTING: case AlterCommand::Type::RESET_SETTING:
return "RESET SETTING"; return "RESET SETTING";
case AlterCommand::Type::MODIFY_DATABASE_SETTING:
return "MODIFY DATABASE SETTING";
case AlterCommand::Type::MODIFY_QUERY: case AlterCommand::Type::MODIFY_QUERY:
return "MODIFY QUERY"; return "MODIFY QUERY";
case AlterCommand::Type::RENAME_COLUMN: case AlterCommand::Type::RENAME_COLUMN:
@ -1007,6 +1032,7 @@ void AlterCommands::prepare(const StorageInMemoryMetadata & metadata)
prepared = true; prepared = true;
} }
void AlterCommands::validate(const StorageInMemoryMetadata & metadata, ContextPtr context) const void AlterCommands::validate(const StorageInMemoryMetadata & metadata, ContextPtr context) const
{ {
auto all_columns = metadata.columns; auto all_columns = metadata.columns;

View File

@ -13,6 +13,8 @@ namespace DB
{ {
class ASTAlterCommand; class ASTAlterCommand;
class IDatabase;
using DatabasePtr = std::shared_ptr<IDatabase>;
/// Operation from the ALTER query (except for manipulation with PART/PARTITION). /// Operation from the ALTER query (except for manipulation with PART/PARTITION).
/// Adding Nested columns is not expanded to add individual columns. /// Adding Nested columns is not expanded to add individual columns.
@ -42,6 +44,7 @@ struct AlterCommand
MODIFY_QUERY, MODIFY_QUERY,
RENAME_COLUMN, RENAME_COLUMN,
REMOVE_TTL, REMOVE_TTL,
MODIFY_DATABASE_SETTING,
}; };
/// Which property user wants to remove from column /// Which property user wants to remove from column
@ -194,6 +197,8 @@ public:
/// Commands have to be prepared before apply. /// Commands have to be prepared before apply.
void apply(StorageInMemoryMetadata & metadata, ContextPtr context) const; void apply(StorageInMemoryMetadata & metadata, ContextPtr context) const;
void apply(DatabasePtr database, ContextPtr context) const;
/// At least one command modify settings. /// At least one command modify settings.
bool hasSettingsAlterCommand() const; bool hasSettingsAlterCommand() const;

View File

@ -26,8 +26,9 @@ MaterializedPostgreSQLConsumer::MaterializedPostgreSQLConsumer(
const std::string & start_lsn, const std::string & start_lsn,
const size_t max_block_size_, const size_t max_block_size_,
bool allow_automatic_update_, bool allow_automatic_update_,
Storages storages_) Storages storages_,
: log(&Poco::Logger::get("PostgreSQLReaplicaConsumer")) const String & name_for_logger)
: log(&Poco::Logger::get("PostgreSQLReplicaConsumer("+ name_for_logger +")"))
, context(context_) , context(context_)
, replication_slot_name(replication_slot_name_) , replication_slot_name(replication_slot_name_)
, publication_name(publication_name_) , publication_name(publication_name_)
@ -270,12 +271,13 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
case 'I': // Insert case 'I': // Insert
{ {
Int32 relation_id = readInt32(replication_message, pos, size); Int32 relation_id = readInt32(replication_message, pos, size);
const auto & table_name = relation_id_to_name[relation_id];
assert(!table_name.empty());
if (!isSyncAllowed(relation_id)) if (!isSyncAllowed(relation_id, table_name))
return; return;
Int8 new_tuple = readInt8(replication_message, pos, size); Int8 new_tuple = readInt8(replication_message, pos, size);
const auto & table_name = relation_id_to_name[relation_id];
auto buffer = buffers.find(table_name); auto buffer = buffers.find(table_name);
assert(buffer != buffers.end()); assert(buffer != buffers.end());
@ -287,11 +289,12 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
case 'U': // Update case 'U': // Update
{ {
Int32 relation_id = readInt32(replication_message, pos, size); Int32 relation_id = readInt32(replication_message, pos, size);
const auto & table_name = relation_id_to_name[relation_id];
assert(!table_name.empty());
if (!isSyncAllowed(relation_id)) if (!isSyncAllowed(relation_id, table_name))
return; return;
const auto & table_name = relation_id_to_name[relation_id];
auto buffer = buffers.find(table_name); auto buffer = buffers.find(table_name);
assert(buffer != buffers.end()); assert(buffer != buffers.end());
@ -335,14 +338,15 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
case 'D': // Delete case 'D': // Delete
{ {
Int32 relation_id = readInt32(replication_message, pos, size); Int32 relation_id = readInt32(replication_message, pos, size);
const auto & table_name = relation_id_to_name[relation_id];
assert(!table_name.empty());
if (!isSyncAllowed(relation_id)) if (!isSyncAllowed(relation_id, table_name))
return; return;
/// 0 or 1 if replica identity is set to full. For now only default replica identity is supported (with primary keys). /// 0 or 1 if replica identity is set to full. For now only default replica identity is supported (with primary keys).
readInt8(replication_message, pos, size); readInt8(replication_message, pos, size);
const auto & table_name = relation_id_to_name[relation_id];
auto buffer = buffers.find(table_name); auto buffer = buffers.find(table_name);
assert(buffer != buffers.end()); assert(buffer != buffers.end());
readTupleData(buffer->second, replication_message, pos, size, PostgreSQLQuery::DELETE); readTupleData(buffer->second, replication_message, pos, size, PostgreSQLQuery::DELETE);
@ -357,7 +361,7 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
constexpr size_t transaction_commit_timestamp_len = 8; constexpr size_t transaction_commit_timestamp_len = 8;
pos += unused_flags_len + commit_lsn_len + transaction_end_lsn_len + transaction_commit_timestamp_len; pos += unused_flags_len + commit_lsn_len + transaction_end_lsn_len + transaction_commit_timestamp_len;
LOG_DEBUG(log, "Current lsn: {} = {}", current_lsn, getLSNValue(current_lsn)); /// Will be removed // LOG_DEBUG(log, "Current lsn: {} = {}", current_lsn, getLSNValue(current_lsn)); /// Will be removed
final_lsn = current_lsn; final_lsn = current_lsn;
break; break;
@ -371,7 +375,7 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
readString(replication_message, pos, size, relation_namespace); readString(replication_message, pos, size, relation_namespace);
readString(replication_message, pos, size, relation_name); readString(replication_message, pos, size, relation_name);
if (!isSyncAllowed(relation_id)) if (!isSyncAllowed(relation_id, relation_name))
return; return;
if (storages.find(relation_name) == storages.end()) if (storages.find(relation_name) == storages.end())
@ -522,15 +526,32 @@ String MaterializedPostgreSQLConsumer::advanceLSN(std::shared_ptr<pqxx::nontrans
/// Sync for some table might not be allowed if: /// Sync for some table might not be allowed if:
/// 1. Table schema changed and might break synchronization. /// 1. Table schema changed and might break synchronization.
/// 2. There is no storage for this table. (As a result of some exception or incorrect pg_publication) /// 2. There is no storage for this table. (As a result of some exception or incorrect pg_publication)
bool MaterializedPostgreSQLConsumer::isSyncAllowed(Int32 relation_id) /// 3. A new table was added to replication, it was loaded via snapshot, but consumer has not yet
/// read wal up to the lsn position of snapshot, from which table was loaded.
bool MaterializedPostgreSQLConsumer::isSyncAllowed(Int32 relation_id, const String & relation_name)
{ {
auto table_with_lsn = skip_list.find(relation_id); auto new_table_with_lsn = waiting_list.find(relation_name);
if (new_table_with_lsn != waiting_list.end())
{
auto table_start_lsn = new_table_with_lsn->second;
assert(!table_start_lsn.empty());
if (getLSNValue(current_lsn) >= getLSNValue(table_start_lsn))
{
LOG_TRACE(log, "Synchronization is started for table: {} (start_lsn: {})", relation_name, table_start_lsn);
waiting_list.erase(new_table_with_lsn);
return true;
}
}
auto skipped_table_with_lsn = skip_list.find(relation_id);
/// Table is not present in a skip list - allow synchronization. /// Table is not present in a skip list - allow synchronization.
if (table_with_lsn == skip_list.end()) if (skipped_table_with_lsn == skip_list.end())
return true; return true;
const auto & table_start_lsn = table_with_lsn->second; const auto & table_start_lsn = skipped_table_with_lsn->second;
/// Table is in a skip list and has not yet received a valid lsn == it has not been reloaded. /// Table is in a skip list and has not yet received a valid lsn == it has not been reloaded.
if (table_start_lsn.empty()) if (table_start_lsn.empty())
@ -544,7 +565,7 @@ bool MaterializedPostgreSQLConsumer::isSyncAllowed(Int32 relation_id)
LOG_TRACE(log, "Synchronization is resumed for table: {} (start_lsn: {})", LOG_TRACE(log, "Synchronization is resumed for table: {} (start_lsn: {})",
relation_id_to_name[relation_id], table_start_lsn); relation_id_to_name[relation_id], table_start_lsn);
skip_list.erase(table_with_lsn); skip_list.erase(skipped_table_with_lsn);
return true; return true;
} }
@ -576,6 +597,34 @@ void MaterializedPostgreSQLConsumer::markTableAsSkipped(Int32 relation_id, const
} }
void MaterializedPostgreSQLConsumer::addNested(const String & postgres_table_name, StoragePtr nested_storage, const String & table_start_lsn)
{
/// Cache new pointer to replacingMergeTree table.
storages.emplace(postgres_table_name, nested_storage);
/// Add new in-memory buffer.
buffers.emplace(postgres_table_name, Buffer(nested_storage));
/// Replication consumer will read wall and check for currently processed table whether it is allowed to start applying
/// changed to this table.
waiting_list[postgres_table_name] = table_start_lsn;
}
void MaterializedPostgreSQLConsumer::updateNested(const String & table_name, StoragePtr nested_storage, Int32 table_id, const String & table_start_lsn)
{
/// Cache new pointer to replacingMergeTree table.
storages[table_name] = nested_storage;
/// Create a new empty buffer (with updated metadata), where data is first loaded before syncing into actual table.
auto & buffer = buffers.find(table_name)->second;
buffer.createEmptyBuffer(nested_storage);
/// Set start position to valid lsn. Before it was an empty string. Further read for table allowed, if it has a valid lsn.
skip_list[table_id] = table_start_lsn;
}
/// Read binary changes from replication slot via COPY command (starting from current lsn in a slot). /// Read binary changes from replication slot via COPY command (starting from current lsn in a slot).
bool MaterializedPostgreSQLConsumer::readFromReplicationSlot() bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
{ {
@ -625,9 +674,8 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(__PRETTY_FUNCTION__);
return false; return false;
} }
catch (const pqxx::broken_connection & e) catch (const pqxx::broken_connection &)
{ {
LOG_ERROR(log, "Connection error: {}", e.what());
connection->tryUpdateConnection(); connection->tryUpdateConnection();
return false; return false;
} }
@ -641,6 +689,7 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
if (error_message.find("out of relcache_callback_list slots") == std::string::npos) if (error_message.find("out of relcache_callback_list slots") == std::string::npos)
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(__PRETTY_FUNCTION__);
connection->tryUpdateConnection();
return false; return false;
} }
catch (const pqxx::conversion_error & e) catch (const pqxx::conversion_error & e)
@ -704,17 +753,4 @@ bool MaterializedPostgreSQLConsumer::consume(std::vector<std::pair<Int32, String
} }
void MaterializedPostgreSQLConsumer::updateNested(const String & table_name, StoragePtr nested_storage, Int32 table_id, const String & table_start_lsn)
{
/// Cache new pointer to replacingMergeTree table.
storages[table_name] = nested_storage;
/// Create a new empty buffer (with updated metadata), where data is first loaded before syncing into actual table.
auto & buffer = buffers.find(table_name)->second;
buffer.createEmptyBuffer(nested_storage);
/// Set start position to valid lsn. Before it was an empty string. Further read for table allowed, if it has a valid lsn.
skip_list[table_id] = table_start_lsn;
}
} }

View File

@ -27,7 +27,8 @@ public:
const String & start_lsn, const String & start_lsn,
const size_t max_block_size_, const size_t max_block_size_,
bool allow_automatic_update_, bool allow_automatic_update_,
Storages storages_); Storages storages_,
const String & name_for_logger);
bool consume(std::vector<std::pair<Int32, String>> & skipped_tables); bool consume(std::vector<std::pair<Int32, String>> & skipped_tables);
@ -35,6 +36,8 @@ public:
/// process if it was skipped due to schema changes. /// process if it was skipped due to schema changes.
void updateNested(const String & table_name, StoragePtr nested_storage, Int32 table_id, const String & table_start_lsn); void updateNested(const String & table_name, StoragePtr nested_storage, Int32 table_id, const String & table_start_lsn);
void addNested(const String & postgres_table_name, StoragePtr nested_storage, const String & table_start_lsn);
private: private:
/// Read approximarely up to max_block_size changes from WAL. /// Read approximarely up to max_block_size changes from WAL.
bool readFromReplicationSlot(); bool readFromReplicationSlot();
@ -45,7 +48,7 @@ private:
void processReplicationMessage(const char * replication_message, size_t size); void processReplicationMessage(const char * replication_message, size_t size);
bool isSyncAllowed(Int32 relation_id); bool isSyncAllowed(Int32 relation_id, const String & relation_name);
struct Buffer struct Buffer
{ {
@ -111,9 +114,12 @@ private:
String table_to_insert; String table_to_insert;
/// List of tables which need to be synced after last replication stream. /// List of tables which need to be synced after last replication stream.
/// Holds `postgres_table_name` set.
std::unordered_set<std::string> tables_to_sync; std::unordered_set<std::string> tables_to_sync;
/// `postgres_table_name` -> ReplacingMergeTree table.
Storages storages; Storages storages;
/// `postgres_table_name` -> In-memory buffer.
Buffers buffers; Buffers buffers;
std::unordered_map<Int32, String> relation_id_to_name; std::unordered_map<Int32, String> relation_id_to_name;
@ -133,6 +139,7 @@ private:
/// if relation definition has changed since the last relation definition message. /// if relation definition has changed since the last relation definition message.
std::unordered_map<Int32, SchemaData> schema_data; std::unordered_map<Int32, SchemaData> schema_data;
/// `postgres_relation_id` -> `start_lsn`
/// skip_list contains relation ids for tables on which ddl was performed, which can break synchronization. /// skip_list contains relation ids for tables on which ddl was performed, which can break synchronization.
/// This breaking changes are detected in replication stream in according replication message and table is added to skip list. /// This breaking changes are detected in replication stream in according replication message and table is added to skip list.
/// After it is finished, a temporary replication slot is created with 'export snapshot' option, and start_lsn is returned. /// After it is finished, a temporary replication slot is created with 'export snapshot' option, and start_lsn is returned.
@ -142,5 +149,13 @@ private:
/// No needed message, related to reloaded table will be missed, because messages are not consumed in the meantime, /// No needed message, related to reloaded table will be missed, because messages are not consumed in the meantime,
/// i.e. we will not miss the first start_lsn position for reloaded table. /// i.e. we will not miss the first start_lsn position for reloaded table.
std::unordered_map<Int32, String> skip_list; std::unordered_map<Int32, String> skip_list;
/// `postgres_table_name` -> `start_lsn`
/// For dynamically added tables. A new table is loaded via snapshot and we get a start lsn position.
/// Once consumer reaches this position, it starts applying replication messages to this table.
/// Inside replication handler we have to ensure that replication consumer does not read data from wal
/// while the process of adding a table to replication is not finished,
/// because we might go beyond this start lsn position before consumer knows that a new table was added.
std::unordered_map<String, String> waiting_list;
}; };
} }

View File

@ -11,18 +11,20 @@
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <DataStreams/copyData.h> #include <DataStreams/copyData.h>
#include <Databases/DatabaseOnDisk.h>
namespace DB namespace DB
{ {
static const auto RESCHEDULE_MS = 500; static const auto RESCHEDULE_MS = 1000;
static const auto BACKOFF_TRESHOLD_MS = 10000; static const auto BACKOFF_TRESHOLD_MS = 10000;
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS; extern const int BAD_ARGUMENTS;
extern const int POSTGRESQL_REPLICATION_INTERNAL_ERROR;
} }
PostgreSQLReplicationHandler::PostgreSQLReplicationHandler( PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
@ -36,7 +38,9 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
bool allow_automatic_update_, bool allow_automatic_update_,
bool is_materialized_postgresql_database_, bool is_materialized_postgresql_database_,
const String tables_list_) const String tables_list_)
: log(&Poco::Logger::get("PostgreSQLReplicationHandler")) : log(&Poco::Logger::get("PostgreSQLReplicationHandler(" +
(is_materialized_postgresql_database_ ? remote_database_name_ : remote_database_name_ + '.' + tables_list_)
+ ")"))
, context(context_) , context(context_)
, is_attach(is_attach_) , is_attach(is_attach_)
, remote_database_name(remote_database_name_) , remote_database_name(remote_database_name_)
@ -46,7 +50,6 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
, allow_automatic_update(allow_automatic_update_) , allow_automatic_update(allow_automatic_update_)
, is_materialized_postgresql_database(is_materialized_postgresql_database_) , is_materialized_postgresql_database(is_materialized_postgresql_database_)
, tables_list(tables_list_) , tables_list(tables_list_)
, connection(std::make_shared<postgres::Connection>(connection_info_))
, milliseconds_to_wait(RESCHEDULE_MS) , milliseconds_to_wait(RESCHEDULE_MS)
{ {
replication_slot = fmt::format("{}_ch_replication_slot", replication_identifier); replication_slot = fmt::format("{}_ch_replication_slot", replication_identifier);
@ -73,7 +76,8 @@ void PostgreSQLReplicationHandler::waitConnectionAndStart()
{ {
try try
{ {
connection->connect(); /// Will throw pqxx::broken_connection if no connection at the moment postgres::Connection connection(connection_info);
connection.connect(); /// Will throw pqxx::broken_connection if no connection at the moment
startSynchronization(false); startSynchronization(false);
} }
catch (const pqxx::broken_connection & pqxx_error) catch (const pqxx::broken_connection & pqxx_error)
@ -98,14 +102,9 @@ void PostgreSQLReplicationHandler::shutdown()
void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error) void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
{ {
{
pqxx::work tx(connection->getRef());
createPublicationIfNeeded(tx);
tx.commit();
}
postgres::Connection replication_connection(connection_info, /* replication */true); postgres::Connection replication_connection(connection_info, /* replication */true);
pqxx::nontransaction tx(replication_connection.getRef()); pqxx::nontransaction tx(replication_connection.getRef());
createPublicationIfNeeded(tx);
/// List of nested tables (table_name -> nested_storage), which is passed to replication consumer. /// List of nested tables (table_name -> nested_storage), which is passed to replication consumer.
std::unordered_map<String, StoragePtr> nested_storages; std::unordered_map<String, StoragePtr> nested_storages;
@ -116,18 +115,21 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
/// 2. if replication slot already exist, start_lsn is read from pg_replication_slots as /// 2. if replication slot already exist, start_lsn is read from pg_replication_slots as
/// `confirmed_flush_lsn` - the address (LSN) up to which the logical slot's consumer has confirmed receiving data. /// `confirmed_flush_lsn` - the address (LSN) up to which the logical slot's consumer has confirmed receiving data.
/// Data older than this is not available anymore. /// Data older than this is not available anymore.
/// TODO: more tests
String snapshot_name, start_lsn; String snapshot_name, start_lsn;
auto initial_sync = [&]() auto initial_sync = [&]()
{ {
createReplicationSlot(tx, start_lsn, snapshot_name); createReplicationSlot(tx, start_lsn, snapshot_name);
/// Loading tables from snapshot requires a certain transaction type, so we need to open a new transactin.
/// But we cannot have more than one open transaciton on the same connection. Therefore we have
/// a separate connection to load tables.
postgres::Connection tmp_connection(connection_info);
for (const auto & [table_name, storage] : materialized_storages) for (const auto & [table_name, storage] : materialized_storages)
{ {
try try
{ {
nested_storages[table_name] = loadFromSnapshot(snapshot_name, table_name, storage->as <StorageMaterializedPostgreSQL>()); nested_storages[table_name] = loadFromSnapshot(tmp_connection, snapshot_name, table_name, storage->as <StorageMaterializedPostgreSQL>());
} }
catch (Exception & e) catch (Exception & e)
{ {
@ -164,6 +166,7 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
auto * materialized_storage = storage->as <StorageMaterializedPostgreSQL>(); auto * materialized_storage = storage->as <StorageMaterializedPostgreSQL>();
try try
{ {
/// TODO: THIS IS INCORRENT, we might get here if there is no nested, need to check and reload.
/// Try load nested table, set materialized table metadata. /// Try load nested table, set materialized table metadata.
nested_storages[table_name] = materialized_storage->prepare(); nested_storages[table_name] = materialized_storage->prepare();
} }
@ -187,13 +190,14 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
/// Handler uses it only for loadFromSnapshot and shutdown methods. /// Handler uses it only for loadFromSnapshot and shutdown methods.
consumer = std::make_shared<MaterializedPostgreSQLConsumer>( consumer = std::make_shared<MaterializedPostgreSQLConsumer>(
context, context,
connection, std::make_shared<postgres::Connection>(connection_info),
replication_slot, replication_slot,
publication_name, publication_name,
start_lsn, start_lsn,
max_block_size, max_block_size,
allow_automatic_update, allow_automatic_update,
nested_storages); nested_storages,
(is_materialized_postgresql_database ? remote_database_name : remote_database_name + '.' + tables_list));
consumer_task->activateAndSchedule(); consumer_task->activateAndSchedule();
@ -202,10 +206,31 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
} }
StoragePtr PostgreSQLReplicationHandler::loadFromSnapshot(String & snapshot_name, const String & table_name, void PostgreSQLReplicationHandler::addStructureToMaterializedStorage(StorageMaterializedPostgreSQL * storage, const String & table_name, ASTPtr database_def)
{
postgres::Connection connection(connection_info);
pqxx::nontransaction tx(connection.getRef());
auto table_structure = std::make_unique<PostgreSQLTableStructure>(fetchPostgreSQLTableStructure(tx, table_name, true, true, true));
auto engine = std::make_shared<ASTFunction>();
engine->name = "MaterializedPostgreSQL";
engine->arguments = args;
auto ast_storage = std::make_shared<ASTStorage>();
storage->set(storage->engine, engine);
auto storage_def = storage->getCreateNestedTableQuery(std::move(table_structure));
ContextMutablePtr local_context = Context::createCopy(context);
auto table = createTableFromAST(*assert_cast<ASTCreateQuery *>(storage_def.get()), remote_database_name, "", local_context, false).second;
storage->setInMemoryMetadata(table->getInMemoryMetadata());
}
StoragePtr PostgreSQLReplicationHandler::loadFromSnapshot(postgres::Connection & connection, String & snapshot_name, const String & table_name,
StorageMaterializedPostgreSQL * materialized_storage) StorageMaterializedPostgreSQL * materialized_storage)
{ {
auto tx = std::make_shared<pqxx::ReplicationTransaction>(connection->getRef()); auto tx = std::make_shared<pqxx::ReplicationTransaction>(connection.getRef());
std::string query_str = fmt::format("SET TRANSACTION SNAPSHOT '{}'", snapshot_name); std::string query_str = fmt::format("SET TRANSACTION SNAPSHOT '{}'", snapshot_name);
tx->exec(query_str); tx->exec(query_str);
@ -290,7 +315,7 @@ void PostgreSQLReplicationHandler::consumerFunc()
} }
bool PostgreSQLReplicationHandler::isPublicationExist(pqxx::work & tx) bool PostgreSQLReplicationHandler::isPublicationExist(pqxx::nontransaction & tx)
{ {
std::string query_str = fmt::format("SELECT exists (SELECT 1 FROM pg_publication WHERE pubname = '{}')", publication_name); std::string query_str = fmt::format("SELECT exists (SELECT 1 FROM pg_publication WHERE pubname = '{}')", publication_name);
pqxx::result result{tx.exec(query_str)}; pqxx::result result{tx.exec(query_str)};
@ -299,7 +324,7 @@ bool PostgreSQLReplicationHandler::isPublicationExist(pqxx::work & tx)
} }
void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::work & tx) void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::nontransaction & tx)
{ {
auto publication_exists = isPublicationExist(tx); auto publication_exists = isPublicationExist(tx);
@ -310,7 +335,7 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::work & tx)
"Publication {} already exists, but it is a CREATE query, not ATTACH. Publication will be dropped", "Publication {} already exists, but it is a CREATE query, not ATTACH. Publication will be dropped",
publication_name); publication_name);
connection->execWithRetry([&](pqxx::nontransaction & tx_){ dropPublication(tx_); }); dropPublication(tx);
} }
if (!is_attach || !publication_exists) if (!is_attach || !publication_exists)
@ -389,7 +414,7 @@ void PostgreSQLReplicationHandler::createReplicationSlot(
pqxx::result result{tx.exec(query_str)}; pqxx::result result{tx.exec(query_str)};
start_lsn = result[0][1].as<std::string>(); start_lsn = result[0][1].as<std::string>();
snapshot_name = result[0][2].as<std::string>(); snapshot_name = result[0][2].as<std::string>();
LOG_TRACE(log, "Created replication slot: {}, start lsn: {}", replication_slot, start_lsn); LOG_TRACE(log, "Created replication slot: {}, start lsn: {}, snapshot: {}", replication_slot, start_lsn, snapshot_name);
} }
catch (Exception & e) catch (Exception & e)
{ {
@ -422,22 +447,39 @@ void PostgreSQLReplicationHandler::dropPublication(pqxx::nontransaction & tx)
} }
void PostgreSQLReplicationHandler::addTableToPublication(pqxx::nontransaction & ntx, const String & table_name)
{
std::string query_str = fmt::format("ALTER PUBLICATION {} ADD TABLE ONLY {}", publication_name, doubleQuoteString(table_name));
ntx.exec(query_str);
LOG_TRACE(log, "Added table `{}` to publication `{}`", table_name, publication_name);
}
void PostgreSQLReplicationHandler::removeTableFromPublication(pqxx::nontransaction & ntx, const String & table_name)
{
std::string query_str = fmt::format("ALTER PUBLICATION {} DROP TABLE ONLY {}", publication_name, doubleQuoteString(table_name));
ntx.exec(query_str);
LOG_TRACE(log, "Removed table `{}` from publication `{}`", table_name, publication_name);
}
void PostgreSQLReplicationHandler::shutdownFinal() void PostgreSQLReplicationHandler::shutdownFinal()
{ {
try try
{ {
shutdown(); shutdown();
connection->execWithRetry([&](pqxx::nontransaction & tx){ dropPublication(tx); }); postgres::Connection connection(connection_info);
connection.execWithRetry([&](pqxx::nontransaction & tx){ dropPublication(tx); });
String last_committed_lsn; String last_committed_lsn;
connection->execWithRetry([&](pqxx::nontransaction & tx) connection.execWithRetry([&](pqxx::nontransaction & tx)
{ {
if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */false)) if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */false))
dropReplicationSlot(tx, /* temporary */false); dropReplicationSlot(tx, /* temporary */false);
}); });
connection->execWithRetry([&](pqxx::nontransaction & tx) connection.execWithRetry([&](pqxx::nontransaction & tx)
{ {
if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */true)) if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */true))
dropReplicationSlot(tx, /* temporary */true); dropReplicationSlot(tx, /* temporary */true);
@ -453,12 +495,17 @@ void PostgreSQLReplicationHandler::shutdownFinal()
/// Used by MaterializedPostgreSQL database engine. /// Used by MaterializedPostgreSQL database engine.
NameSet PostgreSQLReplicationHandler::fetchRequiredTables(postgres::Connection & connection_) NameSet PostgreSQLReplicationHandler::fetchRequiredTables()
{ {
pqxx::work tx(connection_.getRef()); postgres::Connection connection(connection_info);
NameSet result_tables; NameSet result_tables;
bool publication_exists_before_startup;
{
pqxx::nontransaction tx(connection.getRef());
publication_exists_before_startup = isPublicationExist(tx);
}
bool publication_exists_before_startup = isPublicationExist(tx);
LOG_DEBUG(log, "Publication exists: {}, is attach: {}", publication_exists_before_startup, is_attach); LOG_DEBUG(log, "Publication exists: {}, is attach: {}", publication_exists_before_startup, is_attach);
Strings expected_tables; Strings expected_tables;
@ -479,7 +526,7 @@ NameSet PostgreSQLReplicationHandler::fetchRequiredTables(postgres::Connection &
"Publication {} already exists, but it is a CREATE query, not ATTACH. Publication will be dropped", "Publication {} already exists, but it is a CREATE query, not ATTACH. Publication will be dropped",
publication_name); publication_name);
connection->execWithRetry([&](pqxx::nontransaction & tx_){ dropPublication(tx_); }); connection.execWithRetry([&](pqxx::nontransaction & tx_){ dropPublication(tx_); });
} }
else else
{ {
@ -489,13 +536,20 @@ NameSet PostgreSQLReplicationHandler::fetchRequiredTables(postgres::Connection &
"Publication {} already exists and tables list is empty. Assuming publication is correct.", "Publication {} already exists and tables list is empty. Assuming publication is correct.",
publication_name); publication_name);
result_tables = fetchPostgreSQLTablesList(tx, postgres_schema); {
pqxx::nontransaction tx(connection.getRef());
result_tables = fetchPostgreSQLTablesList(tx, postgres_schema);
}
} }
/// Check tables list from publication is the same as expected tables list. /// Check tables list from publication is the same as expected tables list.
/// If not - drop publication and return expected tables list. /// If not - drop publication and return expected tables list.
else else
{ {
result_tables = fetchTablesFromPublication(tx); {
pqxx::work tx(connection.getRef());
result_tables = fetchTablesFromPublication(tx);
}
NameSet diff; NameSet diff;
std::set_symmetric_difference(expected_tables.begin(), expected_tables.end(), std::set_symmetric_difference(expected_tables.begin(), expected_tables.end(),
result_tables.begin(), result_tables.end(), result_tables.begin(), result_tables.end(),
@ -514,7 +568,7 @@ NameSet PostgreSQLReplicationHandler::fetchRequiredTables(postgres::Connection &
"Publication {} already exists, but specified tables list differs from publication tables list in tables: {}.", "Publication {} already exists, but specified tables list differs from publication tables list in tables: {}.",
publication_name, diff_tables); publication_name, diff_tables);
connection->execWithRetry([&](pqxx::nontransaction & tx_){ dropPublication(tx_); }); connection.execWithRetry([&](pqxx::nontransaction & tx_){ dropPublication(tx_); });
} }
} }
} }
@ -531,11 +585,13 @@ NameSet PostgreSQLReplicationHandler::fetchRequiredTables(postgres::Connection &
/// Fetch all tables list from database. Publication does not exist yet, which means /// Fetch all tables list from database. Publication does not exist yet, which means
/// that no replication took place. Publication will be created in /// that no replication took place. Publication will be created in
/// startSynchronization method. /// startSynchronization method.
result_tables = fetchPostgreSQLTablesList(tx, postgres_schema); {
pqxx::nontransaction tx(connection.getRef());
result_tables = fetchPostgreSQLTablesList(tx, postgres_schema);
}
} }
} }
tx.commit();
return result_tables; return result_tables;
} }
@ -562,6 +618,57 @@ PostgreSQLTableStructurePtr PostgreSQLReplicationHandler::fetchTableStructure(
} }
void PostgreSQLReplicationHandler::addTableToReplication(StorageMaterializedPostgreSQL * materialized_storage, const String & postgres_table_name)
{
/// Note: we have to ensure that replication consumer task is stopped when we reload table, because otherwise
/// it can read wal beyond start lsn position (from which this table is being loaded), which will result in loosing data.
/// Therefore we wait here for it to finish current reading stream. We have to wait, because we cannot return OK to client right now.
consumer_task->deactivate();
try
{
postgres::Connection replication_connection(connection_info, /* replication */true);
String snapshot_name, start_lsn;
StoragePtr nested_storage;
{
pqxx::nontransaction tx(replication_connection.getRef());
auto table_structure = std::make_unique<PostgreSQLTableStructure>(fetchPostgreSQLTableStructure(tx, postgres_table_name, true, true, true));
if (isReplicationSlotExist(tx, start_lsn, /* temporary */true))
dropReplicationSlot(tx, /* temporary */true);
createReplicationSlot(tx, start_lsn, snapshot_name, /* temporary */true);
{
postgres::Connection tmp_connection(connection_info);
nested_storage = loadFromSnapshot(tmp_connection, snapshot_name, postgres_table_name, materialized_storage->as <StorageMaterializedPostgreSQL>());
}
auto nested_table_id = nested_storage->getStorageID();
materialized_storage->setNestedStorageID(nested_table_id);
nested_storage = materialized_storage->prepare();
}
{
pqxx::nontransaction tx(replication_connection.getRef());
addTableToPublication(tx, postgres_table_name);
}
/// Pass storage to consumer and lsn position, from which to start receiving replication messages for this table.
consumer->addNested(postgres_table_name, nested_storage, start_lsn);
}
catch (...)
{
consumer_task->scheduleAfter(RESCHEDULE_MS);
auto error_message = getCurrentExceptionMessage(false);
throw Exception(ErrorCodes::POSTGRESQL_REPLICATION_INTERNAL_ERROR,
"Failed to add table `{}` to replication. Info: {}", postgres_table_name, error_message);
}
consumer_task->schedule();
}
void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector<std::pair<Int32, String>> & relation_data) void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector<std::pair<Int32, String>> & relation_data)
{ {
/// If table schema has changed, the table stops consuming changes from replication stream. /// If table schema has changed, the table stops consuming changes from replication stream.
@ -579,6 +686,7 @@ void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector<std::pai
dropReplicationSlot(tx, /* temporary */true); dropReplicationSlot(tx, /* temporary */true);
createReplicationSlot(tx, start_lsn, snapshot_name, /* temporary */true); createReplicationSlot(tx, start_lsn, snapshot_name, /* temporary */true);
postgres::Connection tmp_connection(connection_info);
for (const auto & [relation_id, table_name] : relation_data) for (const auto & [relation_id, table_name] : relation_data)
{ {
@ -589,7 +697,7 @@ void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector<std::pai
auto temp_materialized_storage = materialized_storage->createTemporary(); auto temp_materialized_storage = materialized_storage->createTemporary();
/// This snapshot is valid up to the end of the transaction, which exported it. /// This snapshot is valid up to the end of the transaction, which exported it.
StoragePtr temp_nested_storage = loadFromSnapshot(snapshot_name, table_name, StoragePtr temp_nested_storage = loadFromSnapshot(tmp_connection, snapshot_name, table_name,
temp_materialized_storage->as <StorageMaterializedPostgreSQL>()); temp_materialized_storage->as <StorageMaterializedPostgreSQL>());
auto table_id = materialized_storage->getNestedStorageID(); auto table_id = materialized_storage->getNestedStorageID();

View File

@ -8,11 +8,6 @@
namespace DB namespace DB
{ {
/// IDEA: There is ALTER PUBLICATION command to dynamically add and remove tables for replicating (the command is transactional).
/// (Probably, if in a replication stream comes a relation name, which does not currently
/// exist in CH, it can be loaded via snapshot while stream is stopped and then comparing wal positions with
/// current lsn and table start lsn.
class StorageMaterializedPostgreSQL; class StorageMaterializedPostgreSQL;
class PostgreSQLReplicationHandler class PostgreSQLReplicationHandler
@ -43,24 +38,32 @@ public:
void addStorage(const std::string & table_name, StorageMaterializedPostgreSQL * storage); void addStorage(const std::string & table_name, StorageMaterializedPostgreSQL * storage);
/// Fetch list of tables which are going to be replicated. Used for database engine. /// Fetch list of tables which are going to be replicated. Used for database engine.
NameSet fetchRequiredTables(postgres::Connection & connection_); NameSet fetchRequiredTables();
/// Start replication setup immediately. /// Start replication setup immediately.
void startSynchronization(bool throw_on_error); void startSynchronization(bool throw_on_error);
void addTableToReplication(StorageMaterializedPostgreSQL * storage, const String & postgres_table_name);
void addStructureToMaterializedStorage(StorageMaterializedPostgreSQL * storage, const String & table_name);
private: private:
using MaterializedStorages = std::unordered_map<String, StorageMaterializedPostgreSQL *>; using MaterializedStorages = std::unordered_map<String, StorageMaterializedPostgreSQL *>;
/// Methods to manage Publication. /// Methods to manage Publication.
bool isPublicationExist(pqxx::work & tx); bool isPublicationExist(pqxx::nontransaction & tx);
void createPublicationIfNeeded(pqxx::work & tx); void createPublicationIfNeeded(pqxx::nontransaction & tx);
NameSet fetchTablesFromPublication(pqxx::work & tx); NameSet fetchTablesFromPublication(pqxx::work & tx);
void dropPublication(pqxx::nontransaction & ntx); void dropPublication(pqxx::nontransaction & ntx);
void addTableToPublication(pqxx::nontransaction & ntx, const String & table_name);
void removeTableFromPublication(pqxx::nontransaction & ntx, const String & table_name);
/// Methods to manage Replication Slots. /// Methods to manage Replication Slots.
bool isReplicationSlotExist(pqxx::nontransaction & tx, String & start_lsn, bool temporary = false); bool isReplicationSlotExist(pqxx::nontransaction & tx, String & start_lsn, bool temporary = false);
@ -75,7 +78,7 @@ private:
void consumerFunc(); void consumerFunc();
StoragePtr loadFromSnapshot(std::string & snapshot_name, const String & table_name, StorageMaterializedPostgreSQL * materialized_storage); StoragePtr loadFromSnapshot(postgres::Connection & connection, std::string & snapshot_name, const String & table_name, StorageMaterializedPostgreSQL * materialized_storage);
void reloadFromSnapshot(const std::vector<std::pair<Int32, String>> & relation_data); void reloadFromSnapshot(const std::vector<std::pair<Int32, String>> & relation_data);
@ -110,9 +113,6 @@ private:
String replication_slot, publication_name; String replication_slot, publication_name;
/// Shared between replication_consumer and replication_handler, but never accessed concurrently.
std::shared_ptr<postgres::Connection> connection;
/// Replication consumer. Manages decoding of replication stream and syncing into tables. /// Replication consumer. Manages decoding of replication stream and syncing into tables.
std::shared_ptr<MaterializedPostgreSQLConsumer> consumer; std::shared_ptr<MaterializedPostgreSQLConsumer> consumer;

View File

@ -51,6 +51,7 @@ StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(
std::unique_ptr<MaterializedPostgreSQLSettings> replication_settings) std::unique_ptr<MaterializedPostgreSQLSettings> replication_settings)
: IStorage(table_id_) : IStorage(table_id_)
, WithContext(context_->getGlobalContext()) , WithContext(context_->getGlobalContext())
, log(&Poco::Logger::get("StorageMaterializedPostgreSQL(" + postgres::formatNameForLogs(remote_database_name, remote_table_name) + ")"))
, is_materialized_postgresql_database(false) , is_materialized_postgresql_database(false)
, has_nested(false) , has_nested(false)
, nested_context(makeNestedTableContext(context_->getGlobalContext())) , nested_context(makeNestedTableContext(context_->getGlobalContext()))
@ -72,19 +73,26 @@ StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(
getContext(), getContext(),
is_attach, is_attach,
replication_settings->materialized_postgresql_max_block_size.value, replication_settings->materialized_postgresql_max_block_size.value,
/* allow_automatic_update */ false, /* is_materialized_postgresql_database */false); /* allow_automatic_update */ false, /* is_materialized_postgresql_database */false,
remote_table_name_);
} }
/// For the case of MaterializePosgreSQL database engine. /// For the case of MaterializePosgreSQL database engine.
/// It is used when nested ReplacingMergeeTree table has not yet be created by replication thread. /// It is used when nested ReplacingMergeeTree table has not yet be created by replication thread.
/// In this case this storage can't be used for read queries. /// In this case this storage can't be used for read queries.
StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(const StorageID & table_id_, ContextPtr context_) StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(
const StorageID & table_id_,
ContextPtr context_,
const String & postgres_database_name,
const String & postgres_table_name)
: IStorage(table_id_) : IStorage(table_id_)
, WithContext(context_->getGlobalContext()) , WithContext(context_->getGlobalContext())
, log(&Poco::Logger::get("StorageMaterializedPostgreSQL(" + postgres::formatNameForLogs(postgres_database_name, postgres_table_name) + ")"))
, is_materialized_postgresql_database(true) , is_materialized_postgresql_database(true)
, has_nested(false) , has_nested(false)
, nested_context(makeNestedTableContext(context_->getGlobalContext())) , nested_context(makeNestedTableContext(context_->getGlobalContext()))
, nested_table_id(table_id_)
{ {
} }
@ -92,9 +100,14 @@ StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(const StorageID & t
/// Constructor for MaterializedPostgreSQL table engine - for the case of MaterializePosgreSQL database engine. /// Constructor for MaterializedPostgreSQL table engine - for the case of MaterializePosgreSQL database engine.
/// It is used when nested ReplacingMergeeTree table has already been created by replication thread. /// It is used when nested ReplacingMergeeTree table has already been created by replication thread.
/// This storage is ready to handle read queries. /// This storage is ready to handle read queries.
StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(StoragePtr nested_storage_, ContextPtr context_) StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(
StoragePtr nested_storage_,
ContextPtr context_,
const String & postgres_database_name,
const String & postgres_table_name)
: IStorage(nested_storage_->getStorageID()) : IStorage(nested_storage_->getStorageID())
, WithContext(context_->getGlobalContext()) , WithContext(context_->getGlobalContext())
, log(&Poco::Logger::get("StorageMaterializedPostgreSQL(" + postgres::formatNameForLogs(postgres_database_name, postgres_table_name) + ")"))
, is_materialized_postgresql_database(true) , is_materialized_postgresql_database(true)
, has_nested(true) , has_nested(true)
, nested_context(makeNestedTableContext(context_->getGlobalContext())) , nested_context(makeNestedTableContext(context_->getGlobalContext()))
@ -120,7 +133,7 @@ StoragePtr StorageMaterializedPostgreSQL::createTemporary() const
} }
auto new_context = Context::createCopy(context); auto new_context = Context::createCopy(context);
return StorageMaterializedPostgreSQL::create(tmp_table_id, new_context); return StorageMaterializedPostgreSQL::create(tmp_table_id, new_context, "", table_id.table_name);
} }
@ -163,6 +176,7 @@ void StorageMaterializedPostgreSQL::createNestedIfNeeded(PostgreSQLTableStructur
const auto ast_create = getCreateNestedTableQuery(std::move(table_structure)); const auto ast_create = getCreateNestedTableQuery(std::move(table_structure));
auto table_id = getStorageID(); auto table_id = getStorageID();
auto tmp_nested_table_id = StorageID(table_id.database_name, getNestedTableName()); auto tmp_nested_table_id = StorageID(table_id.database_name, getNestedTableName());
LOG_DEBUG(log, "Creating clickhouse table for postgresql table {}", table_id.getNameForLogs());
try try
{ {

View File

@ -49,7 +49,6 @@ namespace DB
* *
* All database methods, apart from tryGetTable(), are devoted only to nested table. * All database methods, apart from tryGetTable(), are devoted only to nested table.
* NOTE: It makes sense to allow rename method for MaterializedPostgreSQL table via database method. * NOTE: It makes sense to allow rename method for MaterializedPostgreSQL table via database method.
* TODO: Make sure replication-to-table data channel is done only by relation_id.
* *
* Also main table has the same InMemoryMetadata as its nested table, so if metadata of nested table changes - main table also has * Also main table has the same InMemoryMetadata as its nested table, so if metadata of nested table changes - main table also has
* to update its metadata, because all read requests are passed to MaterializedPostgreSQL table and then it redirects read * to update its metadata, because all read requests are passed to MaterializedPostgreSQL table and then it redirects read
@ -57,7 +56,7 @@ namespace DB
* *
* When there is a need to update table structure, there will be created a new MaterializedPostgreSQL table with its own nested table, * When there is a need to update table structure, there will be created a new MaterializedPostgreSQL table with its own nested table,
* it will have updated table schema and all data will be loaded from scratch in the background, while previous table with outadted table * it will have updated table schema and all data will be loaded from scratch in the background, while previous table with outadted table
* structure will still serve read requests. When data is loaded, nested tables will be swapped, metadata of metarialzied table will be * structure will still serve read requests. When data is loaded, nested tables will be swapped, metadata of materialized table will be
* updated according to nested table. * updated according to nested table.
* *
**/ **/
@ -67,9 +66,11 @@ class StorageMaterializedPostgreSQL final : public shared_ptr_helper<StorageMate
friend struct shared_ptr_helper<StorageMaterializedPostgreSQL>; friend struct shared_ptr_helper<StorageMaterializedPostgreSQL>;
public: public:
StorageMaterializedPostgreSQL(const StorageID & table_id_, ContextPtr context_); StorageMaterializedPostgreSQL(const StorageID & table_id_, ContextPtr context_,
const String & postgres_database_name, const String & postgres_table_name);
StorageMaterializedPostgreSQL(StoragePtr nested_storage_, ContextPtr context_); StorageMaterializedPostgreSQL(StoragePtr nested_storage_, ContextPtr context_,
const String & postgres_database_name, const String & postgres_table_name);
String getName() const override { return "MaterializedPostgreSQL"; } String getName() const override { return "MaterializedPostgreSQL"; }
@ -123,6 +124,8 @@ public:
bool supportsFinal() const override { return true; } bool supportsFinal() const override { return true; }
ASTPtr getCreateNestedTableQuery(PostgreSQLTableStructurePtr table_structure);
protected: protected:
StorageMaterializedPostgreSQL( StorageMaterializedPostgreSQL(
const StorageID & table_id_, const StorageID & table_id_,
@ -140,10 +143,10 @@ private:
ASTPtr getColumnDeclaration(const DataTypePtr & data_type) const; ASTPtr getColumnDeclaration(const DataTypePtr & data_type) const;
ASTPtr getCreateNestedTableQuery(PostgreSQLTableStructurePtr table_structure);
String getNestedTableName() const; String getNestedTableName() const;
Poco::Logger * log;
/// Not nullptr only for single MaterializedPostgreSQL storage, because for MaterializedPostgreSQL /// Not nullptr only for single MaterializedPostgreSQL storage, because for MaterializedPostgreSQL
/// database engine there is one replication handler for all tables. /// database engine there is one replication handler for all tables.
std::unique_ptr<PostgreSQLReplicationHandler> replication_handler; std::unique_ptr<PostgreSQLReplicationHandler> replication_handler;

View File

@ -41,7 +41,7 @@ struct StorageInMemoryMetadata
TTLColumnsDescription column_ttls_by_name; TTLColumnsDescription column_ttls_by_name;
/// TTL expressions for table (Move and Rows) /// TTL expressions for table (Move and Rows)
TTLTableDescription table_ttl; TTLTableDescription table_ttl;
/// SETTINGS expression. Supported for MergeTree, Buffer and Kafka. /// SETTINGS expression. Supported for MergeTree, Buffer, Kafka, RabbitMQ.
ASTPtr settings_changes; ASTPtr settings_changes;
/// SELECT QUERY. Supported for MaterializedView and View (have to support LiveView). /// SELECT QUERY. Supported for MaterializedView and View (have to support LiveView).
SelectQueryDescription select; SelectQueryDescription select;