Merge pull request #28301 from kssenii/materialized-postgresql

MaterializedPostgreSQL: allow dynamically adding/deleting tables, altering settings
This commit is contained in:
alesapin 2021-09-20 14:16:29 +03:00 committed by GitHub
commit a249dcc5f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 1607 additions and 715 deletions

View File

@ -23,6 +23,20 @@ ENGINE = MaterializedPostgreSQL('host:port', ['database' | database], 'user', 'p
- `user` — PostgreSQL user.
- `password` — User password.
## Dynamically adding new tables to replication
``` sql
ATTACH TABLE postgres_database.new_table;
```
This also works when the `materialized_postgresql_tables_list` setting is specified.
## Dynamically removing tables from replication
``` sql
DETACH TABLE postgres_database.table_to_remove;
```
## Settings {#settings}
- [materialized_postgresql_max_block_size](../../operations/settings/settings.md#materialized-postgresql-max-block-size)
@ -44,6 +58,12 @@ SETTINGS materialized_postgresql_max_block_size = 65536,
SELECT * FROM database1.table1;
```
It is also possible to change settings at run time.
``` sql
ALTER DATABASE postgres_database MODIFY SETTING materialized_postgresql_max_block_size = <new_size>;
```
## Requirements {#requirements}
1. The [wal_level](https://www.postgresql.org/docs/current/runtime-config-wal.html) setting must be set to `logical`, and the `max_replication_slots` parameter must be at least `2` in the PostgreSQL config file.

View File

@ -72,7 +72,10 @@ enum class AccessType
M(ALTER_FETCH_PARTITION, "ALTER FETCH PART, FETCH PARTITION", TABLE, ALTER_TABLE) \
M(ALTER_FREEZE_PARTITION, "FREEZE PARTITION, UNFREEZE", TABLE, ALTER_TABLE) \
\
M(ALTER_DATABASE_SETTINGS, "ALTER DATABASE SETTING, ALTER MODIFY DATABASE SETTING, MODIFY DATABASE SETTING", DATABASE, ALTER_DATABASE) /* allows to execute ALTER MODIFY SETTING */\
\
M(ALTER_TABLE, "", GROUP, ALTER) \
M(ALTER_DATABASE, "", GROUP, ALTER) \
\
M(ALTER_VIEW_REFRESH, "ALTER LIVE VIEW REFRESH, REFRESH VIEW", VIEW, ALTER_VIEW) \
M(ALTER_VIEW_MODIFY_QUERY, "ALTER TABLE MODIFY QUERY", VIEW, ALTER_VIEW) \

View File

@ -586,6 +586,8 @@
M(616, UNKNOWN_READ_METHOD) \
M(617, LZ4_ENCODER_FAILED) \
M(618, LZ4_DECODER_FAILED) \
M(619, POSTGRESQL_REPLICATION_INTERNAL_ERROR) \
M(620, QUERY_NOT_ALLOWED) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -1,9 +1,9 @@
#include "Connection.h"
#if USE_LIBPQXX
#include <common/logger_useful.h>
namespace postgres
{
@ -42,7 +42,6 @@ void Connection::execWithRetry(const std::function<void(pqxx::nontransaction &)>
/// Returns a reference to the underlying pqxx connection.
/// connect() is called first; the assert states the expectation that `connection` is set once it returns.
pqxx::connection & Connection::getRef()
{
connect();
assert(connection != nullptr);
return *connection;
}

View File

@ -19,6 +19,16 @@ ConnectionInfo formatConnectionString(String dbname, String host, UInt16 port, S
return std::make_pair(out.str(), host + ':' + DB::toString(port));
}
/// Builds the name that is embedded into logger identifiers.
/// StorageMaterializedPostgreSQL passes both the database and the table name;
/// PostgreSQLReplicationHandler and Consumer pass either both names or only the database name
/// (i.e. an empty table name). The database name must never be empty.
String formatNameForLogs(const String & postgres_database_name, const String & postgres_table_name)
{
    assert(!postgres_database_name.empty());

    const bool only_database = postgres_table_name.empty();
    return only_database ? postgres_database_name
                         : postgres_database_name + '.' + postgres_table_name;
}
}
#endif

View File

@ -19,7 +19,11 @@ namespace pqxx
namespace postgres
{
ConnectionInfo formatConnectionString(String dbname, String host, UInt16 port, String user, String password);
String formatNameForLogs(const String & postgres_database_name, const String & postgres_table_name);
}
#endif

View File

@ -103,13 +103,20 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
const String & engine_name = engine_define->engine->name;
const UUID & uuid = create.uuid;
static const std::unordered_set<std::string_view> database_engines{"Ordinary", "Atomic", "Memory",
"Dictionary", "Lazy", "Replicated", "MySQL", "MaterializeMySQL", "MaterializedMySQL",
"PostgreSQL", "MaterializedPostgreSQL", "SQLite"};
if (!database_engines.contains(engine_name))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine name `{}` does not exist", engine_name);
static const std::unordered_set<std::string_view> engines_with_arguments{"MySQL", "MaterializeMySQL", "MaterializedMySQL",
"Lazy", "Replicated", "PostgreSQL", "MaterializedPostgreSQL", "SQLite"};
bool engine_may_have_arguments = engines_with_arguments.contains(engine_name);
if (engine_define->engine->arguments && !engine_may_have_arguments)
throw Exception("Database engine " + engine_name + " cannot have arguments", ErrorCodes::BAD_ARGUMENTS);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine `{}` cannot have arguments", engine_name);
bool has_unexpected_element = engine_define->engine->parameters || engine_define->partition_by ||
engine_define->primary_key || engine_define->order_by ||
@ -117,8 +124,8 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated" || engine_name == "MaterializedPostgreSQL";
if (has_unexpected_element || (!may_have_settings && engine_define->settings))
throw Exception("Database engine " + engine_name + " cannot have parameters, primary_key, order_by, sample_by, settings",
ErrorCodes::UNKNOWN_ELEMENT_IN_AST);
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_AST,
"Database engine `{}` cannot have parameters, primary_key, order_by, sample_by, settings", engine_name);
if (engine_name == "Ordinary")
return std::make_shared<DatabaseOrdinary>(database_name, metadata_path, context);
@ -176,12 +183,12 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
if (create.uuid == UUIDHelpers::Nil)
return std::make_shared<DatabaseMaterializedMySQL<DatabaseOrdinary>>(
context, database_name, metadata_path, uuid, mysql_database_name, std::move(mysql_pool), std::move(client)
, std::move(materialize_mode_settings));
context, database_name, metadata_path, uuid, mysql_database_name,
std::move(mysql_pool), std::move(client), std::move(materialize_mode_settings));
else
return std::make_shared<DatabaseMaterializedMySQL<DatabaseAtomic>>(
context, database_name, metadata_path, uuid, mysql_database_name, std::move(mysql_pool), std::move(client)
, std::move(materialize_mode_settings));
context, database_name, metadata_path, uuid, mysql_database_name,
std::move(mysql_pool), std::move(client), std::move(materialize_mode_settings));
}
catch (...)
{
@ -304,7 +311,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
postgresql_replica_settings->loadFromQuery(*engine_define);
return std::make_shared<DatabaseMaterializedPostgreSQL>(
context, metadata_path, uuid, engine_define, create.attach,
context, metadata_path, uuid, create.attach,
database_name, postgres_database_name, connection_info,
std::move(postgresql_replica_settings));
}

View File

@ -699,4 +699,55 @@ ASTPtr DatabaseOnDisk::getCreateQueryFromMetadata(const String & database_metada
return ast;
}
/// Rewrites the stored CREATE DATABASE statement so that it contains `settings_changes`.
///
/// The current create query is cloned, every change is merged into its SETTINGS clause
/// (existing settings are overwritten, new ones appended; a SETTINGS clause is created if absent),
/// and the result is written as an ATTACH statement to `<path>/metadata/<db>.sql.tmp`,
/// which is then renamed over `<path>/metadata/<db>.sql`.
///
/// Calls are serialized by `modify_settings_mutex`.
/// Throws if the temporary file already exists (it is opened with O_EXCL) or if writing/renaming fails.
void DatabaseOnDisk::modifySettingsMetadata(const SettingsChanges & settings_changes, ContextPtr query_context)
{
    std::lock_guard lock(modify_settings_mutex);

    auto create_query = getCreateDatabaseQuery()->clone();
    auto * create = create_query->as<ASTCreateQuery>();
    auto * settings = create->storage->settings;
    if (settings)
    {
        /// Merge into the existing SETTINGS clause: overwrite known names, append new ones.
        auto & storage_settings = settings->changes;
        for (const auto & change : settings_changes)
        {
            auto it = std::find_if(storage_settings.begin(), storage_settings.end(),
                                   [&](const auto & prev){ return prev.name == change.name; });
            if (it != storage_settings.end())
                it->value = change.value;
            else
                storage_settings.push_back(change);
        }
    }
    else
    {
        /// The stored query had no SETTINGS clause at all: create one.
        auto storage_settings = std::make_shared<ASTSetQuery>();
        storage_settings->is_standalone = false;
        storage_settings->changes = settings_changes;
        create->storage->set(create->storage->settings, storage_settings->clone());
    }

    /// Metadata files are stored as ATTACH statements.
    create->attach = true;
    create->if_not_exists = false;

    WriteBufferFromOwnString statement_buf;
    formatAST(*create, statement_buf, false);
    writeChar('\n', statement_buf);
    String statement = statement_buf.str();

    String database_name_escaped = escapeForFileName(database_name);
    fs::path metadata_root_path = fs::canonical(query_context->getGlobalContext()->getPath());
    fs::path metadata_file_tmp_path = fs::path(metadata_root_path) / "metadata" / (database_name_escaped + ".sql.tmp");
    fs::path metadata_file_path = fs::path(metadata_root_path) / "metadata" / (database_name_escaped + ".sql");

    /// O_EXCL makes this throw if the temporary file already exists. In that case the file was not
    /// created by this call, so it is deliberately left untouched (the constructor is outside the try).
    WriteBufferFromFile out(metadata_file_tmp_path, statement.size(), O_WRONLY | O_CREAT | O_EXCL);

    try
    {
        writeString(statement, out);

        out.next();
        if (getContext()->getSettingsRef().fsync_metadata)
            out.sync();
        out.close();

        fs::rename(metadata_file_tmp_path, metadata_file_path);
    }
    catch (...)
    {
        /// Do not leave our half-written temporary file behind: because of O_EXCL it would make
        /// every subsequent attempt to alter settings fail. Errors of the cleanup itself are ignored
        /// so that the original exception is the one that propagates.
        std::error_code ec;
        fs::remove(metadata_file_tmp_path, ec);
        throw;
    }
}
}

View File

@ -74,6 +74,8 @@ public:
void checkMetadataFilenameAvailability(const String & to_table_name) const;
void checkMetadataFilenameAvailabilityUnlocked(const String & to_table_name, std::unique_lock<std::mutex> &) const;
void modifySettingsMetadata(const SettingsChanges & settings_changes, ContextPtr query_context);
protected:
static constexpr const char * create_suffix = ".tmp";
static constexpr const char * drop_suffix = ".tmp_drop";
@ -97,6 +99,9 @@ protected:
const String metadata_path;
const String data_path;
/// For alter settings.
std::mutex modify_settings_mutex;
};
}

View File

@ -16,7 +16,8 @@ class DatabaseOrdinary : public DatabaseOnDisk
public:
DatabaseOrdinary(const String & name_, const String & metadata_path_, ContextPtr context);
DatabaseOrdinary(
const String & name_, const String & metadata_path_, const String & data_path_, const String & logger, ContextPtr context_);
const String & name_, const String & metadata_path_, const String & data_path_,
const String & logger, ContextPtr context_);
String getEngineName() const override { return "Ordinary"; }

View File

@ -24,6 +24,8 @@ struct IndicesDescription;
struct StorageInMemoryMetadata;
struct StorageID;
class ASTCreateQuery;
class AlterCommands;
class SettingsChanges;
using DictionariesWithID = std::vector<std::pair<String, UUID>>;
namespace ErrorCodes
@ -280,6 +282,13 @@ public:
/// Delete data and metadata stored inside the database, if exists.
virtual void drop(ContextPtr /*context*/) {}
/// Applies changes to database-level settings (used for ALTER DATABASE ... MODIFY SETTING).
/// The default implementation always throws NOT_IMPLEMENTED, naming the engine;
/// database engines that allow altering their settings override this method.
virtual void applySettingsChanges(const SettingsChanges &, ContextPtr)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Database engine {} either does not support settings, or does not support altering settings",
getEngineName());
}
virtual ~IDatabase() = default;
protected:

View File

@ -5,45 +5,47 @@
#include <Storages/PostgreSQL/StorageMaterializedPostgreSQL.h>
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
#include <common/logger_useful.h>
#include <Common/Macros.h>
#include <Core/UUID.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
#include <Databases/DatabaseOrdinary.h>
#include <Databases/DatabaseAtomic.h>
#include <Storages/StoragePostgreSQL.h>
#include <Storages/AlterCommands.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Common/escapeForFileName.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/File.h>
#include <Common/Macros.h>
#include <common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
extern const int QUERY_NOT_ALLOWED;
extern const int UNKNOWN_TABLE;
extern const int BAD_ARGUMENTS;
}
DatabaseMaterializedPostgreSQL::DatabaseMaterializedPostgreSQL(
ContextPtr context_,
const String & metadata_path_,
UUID uuid_,
const ASTStorage * database_engine_define_,
bool is_attach_,
const String & database_name_,
const String & postgres_database_name,
const postgres::ConnectionInfo & connection_info_,
std::unique_ptr<MaterializedPostgreSQLSettings> settings_)
: DatabaseAtomic(database_name_, metadata_path_, uuid_, "DatabaseMaterializedPostgreSQL (" + database_name_ + ")", context_)
, database_engine_define(database_engine_define_->clone())
, is_attach(is_attach_)
, remote_database_name(postgres_database_name)
, connection_info(connection_info_)
@ -64,11 +66,10 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
*settings,
/* is_materialized_postgresql_database = */ true);
postgres::Connection connection(connection_info);
NameSet tables_to_replicate;
try
{
tables_to_replicate = replication_handler->fetchRequiredTables(connection);
tables_to_replicate = replication_handler->fetchRequiredTables();
}
catch (...)
{
@ -87,12 +88,12 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
if (storage)
{
/// Nested table was already created and synchronized.
storage = StorageMaterializedPostgreSQL::create(storage, getContext());
storage = StorageMaterializedPostgreSQL::create(storage, getContext(), remote_database_name, table_name);
}
else
{
/// Nested table does not exist and will be created by replication thread.
storage = StorageMaterializedPostgreSQL::create(StorageID(database_name, table_name), getContext());
storage = StorageMaterializedPostgreSQL::create(StorageID(database_name, table_name), getContext(), remote_database_name, table_name);
}
/// Cache MaterializedPostgreSQL wrapper over nested table.
@ -126,6 +127,41 @@ void DatabaseMaterializedPostgreSQL::loadStoredObjects(
}
/// Applies ALTER DATABASE ... MODIFY SETTING changes to this database.
///
/// - `materialized_postgresql_tables_list` may only be changed by internal queries
///   (it is updated internally when tables are attached to / detached from replication).
/// - `materialized_postgresql_allow_automatic_update` and `materialized_postgresql_max_block_size`
///   are additionally forwarded to the replication handler.
/// - Any other setting is rejected.
///
/// All changes are validated before any of them is applied, so a rejected change in the list
/// cannot leave earlier changes applied in memory but missing from the on-disk metadata.
///
/// Throws BAD_ARGUMENTS for settings the engine does not have or cannot change at run time,
/// QUERY_NOT_ALLOWED when a non-internal query tries to change the tables list.
void DatabaseMaterializedPostgreSQL::applySettingsChanges(const SettingsChanges & settings_changes, ContextPtr query_context)
{
    std::lock_guard lock(handler_mutex);

    static constexpr auto tables_list_setting = "materialized_postgresql_tables_list";

    /// Pass 1: validation only, nothing is modified here.
    for (const auto & change : settings_changes)
    {
        if (!settings->has(change.name))
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine {} does not support setting `{}`", getEngineName(), change.name);

        if (change.name == tables_list_setting)
        {
            if (!query_context->isInternalQuery())
                throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Changing setting `{}` is not allowed", change.name);
        }
        else if (change.name != "materialized_postgresql_allow_automatic_update"
                 && change.name != "materialized_postgresql_max_block_size")
        {
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Changing setting `{}` is not supported for database engine {}", change.name, getEngineName());
        }
    }

    /// Pass 2: every change is known to be acceptable, apply them in order.
    bool need_update_on_disk = false;
    for (const auto & change : settings_changes)
    {
        /// The tables list is only stored in `settings`; the other alterable settings are also passed to the handler.
        if (change.name != tables_list_setting)
            replication_handler->setSetting(change);

        settings->applyChange(change);
        need_update_on_disk = true;
    }

    if (need_update_on_disk)
        DatabaseOnDisk::modifySettingsMetadata(settings_changes, query_context);
}
StoragePtr DatabaseMaterializedPostgreSQL::tryGetTable(const String & name, ContextPtr local_context) const
{
/// In order to define which table access is needed - to MaterializedPostgreSQL table (only in case of SELECT queries) or
@ -153,6 +189,64 @@ StoragePtr DatabaseMaterializedPostgreSQL::tryGetTable(const String & name, Cont
}
/// Returns the names of all materialized tables joined with ',' (no trailing separator).
/// `except` is non-empty only for detach: it then holds the single name of the table being detached,
/// which is left out of the result.
/// When the user defined `materialized_postgresql_tables_list`, the list of tables is always taken from
/// that setting; this traversal of the materialized storages is used otherwise.
String DatabaseMaterializedPostgreSQL::getFormattedTablesList(const String & except) const
{
    String joined_names;
    for (const auto & entry : materialized_tables)
    {
        const auto & name = entry.first;
        if (name != except)
        {
            if (!joined_names.empty())
                joined_names += ',';
            joined_names += name;
        }
    }
    return joined_names;
}
/// Returns the CREATE query for `table_name`.
/// Without a query context the request is delegated to DatabaseAtomic.
/// With a query context the query is obtained from the replication handler
/// (getCreateNestedTableQuery) for a freshly created StorageMaterializedPostgreSQL wrapper,
/// and a newly generated UUID is assigned to it. The handler is accessed under `handler_mutex`.
ASTPtr DatabaseMaterializedPostgreSQL::getCreateTableQueryImpl(const String & table_name, ContextPtr local_context, bool throw_on_error) const
{
    if (local_context->hasQueryContext())
    {
        std::lock_guard lock(handler_mutex);

        auto materialized_storage = StorageMaterializedPostgreSQL::create(
            StorageID(database_name, table_name), getContext(), remote_database_name, table_name);

        auto nested_create_query = replication_handler->getCreateNestedTableQuery(materialized_storage.get(), table_name);

        auto * create = assert_cast<ASTCreateQuery *>(nested_create_query.get());
        create->uuid = UUIDHelpers::generateV4();

        return nested_create_query;
    }

    return DatabaseAtomic::getCreateTableQueryImpl(table_name, local_context, throw_on_error);
}
/// Builds the AST of `ALTER DATABASE <this database> MODIFY SETTING <new_setting>`:
/// an ASTAlterQuery targeting a DATABASE with a single MODIFY_DATABASE_SETTING command.
ASTPtr DatabaseMaterializedPostgreSQL::createAlterSettingsQuery(const SettingChange & new_setting)
{
    /// SETTINGS part of the command, holding exactly one change.
    auto settings_ast = std::make_shared<ASTSetQuery>();
    settings_ast->is_standalone = false;
    settings_ast->changes = {new_setting};

    auto modify_command = std::make_shared<ASTAlterCommand>();
    modify_command->type = ASTAlterCommand::Type::MODIFY_DATABASE_SETTING;
    modify_command->settings_changes = std::move(settings_ast);

    auto commands = std::make_shared<ASTExpressionList>();
    commands->children.push_back(modify_command);

    auto alter_query = std::make_shared<ASTAlterQuery>();
    alter_query->alter_object = ASTAlterQuery::AlterObjectType::DATABASE;
    alter_query->database = database_name;
    alter_query->set(alter_query->command_list, commands);

    return alter_query;
}
void DatabaseMaterializedPostgreSQL::createTable(ContextPtr local_context, const String & table_name, const StoragePtr & table, const ASTPtr & query)
{
/// Create table query can only be called from replication thread.
@ -162,8 +256,123 @@ void DatabaseMaterializedPostgreSQL::createTable(ContextPtr local_context, const
return;
}
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Create table query allowed only for ReplacingMergeTree engine and from synchronization thread");
const auto & create = query->as<ASTCreateQuery>();
if (!create->attach)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "CREATE TABLE is not allowed for database engine {}. Use ATTACH TABLE instead", getEngineName());
/// Create ReplacingMergeTree table.
auto query_copy = query->clone();
auto * create_query = assert_cast<ASTCreateQuery *>(query_copy.get());
create_query->attach = false;
create_query->attach_short_syntax = false;
DatabaseAtomic::createTable(StorageMaterializedPostgreSQL::makeNestedTableContext(local_context), table_name, table, query_copy);
/// Attach MaterializedPostgreSQL table.
attachTable(table_name, table, {});
}
/// Attaches a table to this database.
///
/// With a query context on the current thread, the table is added to replication:
/// the `materialized_postgresql_tables_list` setting is extended with `table_name` through an internal
/// ALTER DATABASE query, a StorageMaterializedPostgreSQL wrapper over `table` is cached in
/// `materialized_tables`, and the replication handler is told to replicate the table.
/// If any of these steps throws, the nested table is dropped and the exception is rethrown.
///
/// Without a query context the call is forwarded to DatabaseAtomic::attachTable.
void DatabaseMaterializedPostgreSQL::attachTable(const String & table_name, const StoragePtr & table, const String & relative_table_path)
{
/// If there is query context then we need to attach materialized storage.
/// If there is no query context then we need to attach internal storage from atomic database.
if (CurrentThread::isInitialized() && CurrentThread::get().getQueryContext())
{
/// Copy of the global context marked as internal: applySettingsChanges only allows
/// changing `materialized_postgresql_tables_list` from internal queries.
auto current_context = Context::createCopy(getContext()->getGlobalContext());
current_context->setInternalQuery(true);
/// We just came from createTable() and created nested table there. Add assert.
auto nested_table = DatabaseAtomic::tryGetTable(table_name, current_context);
assert(nested_table != nullptr);
try
{
/// Prefer the user-defined list; fall back to the names of currently materialized tables.
auto tables_to_replicate = settings->materialized_postgresql_tables_list.value;
if (tables_to_replicate.empty())
tables_to_replicate = getFormattedTablesList();
/// tables_to_replicate can be empty if postgres database had no tables when this database was created.
SettingChange new_setting("materialized_postgresql_tables_list", tables_to_replicate.empty() ? table_name : (tables_to_replicate + "," + table_name));
auto alter_query = createAlterSettingsQuery(new_setting);
InterpreterAlterQuery(alter_query, current_context).execute();
auto storage = StorageMaterializedPostgreSQL::create(table, getContext(), remote_database_name, table_name);
/// NOTE(review): if addTableToReplication() below throws, this entry (and the already altered
/// tables list setting) is not rolled back by the catch block - confirm whether that is intended.
materialized_tables[table_name] = storage;
/// The lock is taken only after the ALTER above has finished: applySettingsChanges locks handler_mutex itself.
std::lock_guard lock(handler_mutex);
replication_handler->addTableToReplication(dynamic_cast<StorageMaterializedPostgreSQL *>(storage.get()), table_name);
}
catch (...)
{
/// This is a failed attach table. Remove already created nested table.
DatabaseAtomic::dropTable(current_context, table_name, true);
throw;
}
}
else
{
DatabaseAtomic::attachTable(table_name, table, relative_table_path);
}
}
/// Detaches a table from this database.
///
/// With a query context on the current thread, the table is removed from replication:
/// the `materialized_postgresql_tables_list` setting is rewritten without `table_name` through an
/// internal ALTER DATABASE query, the replication handler stops replicating the table, the nested
/// table is dropped and the cached materialized storage is forgotten. Returns nullptr in this case.
///
/// Without a query context the call is forwarded to DatabaseAtomic::detachTable.
///
/// Throws UNKNOWN_TABLE if there is no materialized table with this name,
/// LOGICAL_ERROR if the materialized table has no nested table.
StoragePtr DatabaseMaterializedPostgreSQL::detachTable(const String & table_name)
{
    /// If there is query context then we need to detach materialized storage.
    /// If there is no query context then we need to detach internal storage from atomic database.
    if (CurrentThread::isInitialized() && CurrentThread::get().getQueryContext())
    {
        /// Look the table up with find(): operator[] would insert an empty entry for an unknown name,
        /// and that phantom name would later be returned by getFormattedTablesList().
        auto table_it = materialized_tables.find(table_name);
        if (table_it == materialized_tables.end() || !table_it->second)
            throw Exception(ErrorCodes::UNKNOWN_TABLE, "Materialized table `{}` does not exist", table_name);

        /// Hold our own reference to the storage instead of a reference into the map.
        auto table_to_delete = table_it->second;

        auto tables_to_replicate = getFormattedTablesList(table_name);

        /// tables_to_replicate can be empty if postgres database had no tables when this database was created.
        SettingChange new_setting("materialized_postgresql_tables_list", tables_to_replicate);
        auto alter_query = createAlterSettingsQuery(new_setting);

        {
            /// Changing the tables list is only allowed for internal queries.
            auto current_context = Context::createCopy(getContext()->getGlobalContext());
            current_context->setInternalQuery(true);
            InterpreterAlterQuery(alter_query, current_context).execute();
        }

        auto nested = table_to_delete->as<StorageMaterializedPostgreSQL>()->getNested();
        if (!nested)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Inner table `{}` does not exist", table_name);

        /// Taken only after the ALTER above has finished: applySettingsChanges locks handler_mutex itself.
        std::lock_guard lock(handler_mutex);
        replication_handler->removeTableFromReplication(table_name);

        try
        {
            auto current_context = Context::createCopy(getContext()->getGlobalContext());
            current_context->makeQueryContext();
            DatabaseAtomic::dropTable(current_context, table_name, true);
        }
        catch (Exception & e)
        {
            /// We already removed this table from replication and adding it back will be an overkill..
            /// TODO: this is bad, we leave a table lying somewhere not dropped, and if user will want
            /// to move it back into replication, he will fail to do so because there is undropped nested with the same name.
            /// This can also happen if we crash after removing table from replication and before dropping nested.
            /// As a solution, we could drop a table if it already exists and add a fresh one instead for these two cases.
            /// TODO: sounds good.
            materialized_tables.erase(table_name);

            e.addMessage("while removing table `" + table_name + "` from replication");
            throw;
        }

        materialized_tables.erase(table_name);
        return nullptr;
    }
    else
    {
        return DatabaseAtomic::detachTable(table_name);
    }
}
@ -176,6 +385,7 @@ void DatabaseMaterializedPostgreSQL::shutdown()
void DatabaseMaterializedPostgreSQL::stopReplication()
{
std::lock_guard lock(handler_mutex);
if (replication_handler)
replication_handler->shutdown();
@ -193,6 +403,7 @@ void DatabaseMaterializedPostgreSQL::dropTable(ContextPtr local_context, const S
void DatabaseMaterializedPostgreSQL::drop(ContextPtr local_context)
{
std::lock_guard lock(handler_mutex);
if (replication_handler)
replication_handler->shutdownFinal();
@ -207,7 +418,6 @@ DatabaseTablesIteratorPtr DatabaseMaterializedPostgreSQL::getTablesIterator(
return DatabaseAtomic::getTablesIterator(StorageMaterializedPostgreSQL::makeNestedTableContext(local_context), filter_by_table_name);
}
}
#endif

View File

@ -32,7 +32,6 @@ public:
ContextPtr context_,
const String & metadata_path_,
UUID uuid_,
const ASTStorage * database_engine_define_,
bool is_attach_,
const String & database_name_,
const String & postgres_database_name,
@ -50,7 +49,11 @@ public:
StoragePtr tryGetTable(const String & name, ContextPtr context) const override;
void createTable(ContextPtr context, const String & name, const StoragePtr & table, const ASTPtr & query) override;
void createTable(ContextPtr context, const String & table_name, const StoragePtr & table, const ASTPtr & query) override;
void attachTable(const String & table_name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(const String & table_name) override;
void dropTable(ContextPtr local_context, const String & name, bool no_delay) override;
@ -58,12 +61,22 @@ public:
void stopReplication();
void applySettingsChanges(const SettingsChanges & settings_changes, ContextPtr query_context) override;
void shutdown() override;
String getPostgreSQLDatabaseName() const { return remote_database_name; }
protected:
ASTPtr getCreateTableQueryImpl(const String & table_name, ContextPtr local_context, bool throw_on_error) const override;
private:
void startSynchronization();
ASTPtr database_engine_define;
ASTPtr createAlterSettingsQuery(const SettingChange & new_setting);
String getFormattedTablesList(const String & except = {}) const;
bool is_attach;
String remote_database_name;
postgres::ConnectionInfo connection_info;
@ -72,6 +85,7 @@ private:
std::shared_ptr<PostgreSQLReplicationHandler> replication_handler;
std::map<std::string, StoragePtr> materialized_tables;
mutable std::mutex tables_mutex;
mutable std::mutex handler_mutex;
};
}

View File

@ -290,12 +290,20 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
pqxx::ReplicationTransaction & tx, const String & postgres_table_name, bool use_nulls,
bool with_primary_key, bool with_replica_identity_index);
template
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
pqxx::nontransaction & tx, const String & postgres_table_name, bool use_nulls,
bool with_primary_key, bool with_replica_identity_index);
template
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::work & tx, const String & postgres_schema);
template
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::ReadTransaction & tx, const String & postgres_schema);
template
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::nontransaction & tx, const String & postgres_schema);
}
#endif

View File

@ -30,6 +30,7 @@
#if USE_LIBPQXX
# include <Storages/PostgreSQL/StorageMaterializedPostgreSQL.h>
# include <Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.h>
#endif
namespace fs = std::filesystem;
@ -249,7 +250,9 @@ DatabaseAndTable DatabaseCatalog::getTableImpl(
#if USE_LIBPQXX
if (!context_->isInternalQuery() && (db_and_table.first->getEngineName() == "MaterializedPostgreSQL"))
{
db_and_table.second = std::make_shared<StorageMaterializedPostgreSQL>(std::move(db_and_table.second), getContext());
db_and_table.second = std::make_shared<StorageMaterializedPostgreSQL>(std::move(db_and_table.second), getContext(),
assert_cast<const DatabaseMaterializedPostgreSQL *>(db_and_table.first.get())->getPostgreSQLDatabaseName(),
db_and_table.second->getStorageID().table_name);
}
#endif

View File

@ -40,12 +40,23 @@ InterpreterAlterQuery::InterpreterAlterQuery(const ASTPtr & query_ptr_, ContextP
{
}
/// Dispatches the ALTER query by the kind of object it targets:
/// DATABASE goes to executeToDatabase(), TABLE and LIVE_VIEW go to executeToTable().
/// Throws LOGICAL_ERROR for any other object type.
BlockIO InterpreterAlterQuery::execute()
{
    const auto & alter = query_ptr->as<ASTAlterQuery &>();

    if (alter.alter_object == ASTAlterQuery::AlterObjectType::DATABASE)
        return executeToDatabase(alter);

    if (alter.alter_object == ASTAlterQuery::AlterObjectType::TABLE
        || alter.alter_object == ASTAlterQuery::AlterObjectType::LIVE_VIEW)
        return executeToTable(alter);

    throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown alter object type");
}
BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter)
{
BlockIO res;
if (!alter.cluster.empty())
return executeDDLQueryOnCluster(query_ptr, getContext(), getRequiredAccess());
@ -81,7 +92,9 @@ BlockIO InterpreterAlterQuery::execute()
{
auto * command_ast = child->as<ASTAlterCommand>();
if (auto alter_command = AlterCommand::parse(command_ast))
{
alter_commands.emplace_back(std::move(*alter_command));
}
else if (auto partition_command = PartitionCommand::parse(command_ast))
{
partition_commands.emplace_back(std::move(*partition_command));
@ -95,7 +108,9 @@ BlockIO InterpreterAlterQuery::execute()
mutation_commands.emplace_back(std::move(*mut_command));
}
else if (auto live_view_command = LiveViewCommand::parse(command_ast))
{
live_view_commands.emplace_back(std::move(*live_view_command));
}
else
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
}
@ -152,6 +167,45 @@ BlockIO InterpreterAlterQuery::execute()
}
/// Executes ALTER DATABASE.
///
/// Checks access rights, parses every child of the command list into an AlterCommand and
/// passes the settings changes of each non-ignored command to the database.
/// Only MODIFY_DATABASE_SETTING commands are supported; all commands are validated before
/// any of them is applied.
///
/// Throws LOGICAL_ERROR if a child cannot be parsed as an alter command,
/// NOT_IMPLEMENTED if a command of another type is present.
BlockIO InterpreterAlterQuery::executeToDatabase(const ASTAlterQuery & alter)
{
    BlockIO res;
    getContext()->checkAccess(getRequiredAccess());
    DatabasePtr database = DatabaseCatalog::instance().getDatabase(alter.database);

    AlterCommands alter_commands;
    for (const auto & child : alter.command_list->children)
    {
        auto * command_ast = child->as<ASTAlterCommand>();
        if (auto alter_command = AlterCommand::parse(command_ast))
            alter_commands.emplace_back(std::move(*alter_command));
        else
            throw Exception("Wrong parameter type in ALTER DATABASE query", ErrorCodes::LOGICAL_ERROR);
    }

    /// Only ALTER SETTING is supported. Reject the whole query before applying anything.
    for (const auto & command : alter_commands)
    {
        if (command.type != AlterCommand::MODIFY_DATABASE_SETTING)
            throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported alter type for database engines");
    }

    /// Every command is MODIFY_DATABASE_SETTING at this point.
    for (const auto & command : alter_commands)
    {
        if (!command.ignore)
            database->applySettingsChanges(command.settings_changes, getContext());
    }

    return res;
}
AccessRightsElements InterpreterAlterQuery::getRequiredAccess() const
{
AccessRightsElements required_access;
@ -351,6 +405,11 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS
required_access.emplace_back(AccessType::ALTER_RENAME_COLUMN, database, table, column_name());
break;
}
case ASTAlterCommand::MODIFY_DATABASE_SETTING:
{
required_access.emplace_back(AccessType::ALTER_DATABASE_SETTINGS, database, table);
break;
}
case ASTAlterCommand::NO_TYPE: break;
}
@ -362,7 +421,7 @@ void InterpreterAlterQuery::extendQueryLogElemImpl(QueryLogElement & elem, const
const auto & alter = ast->as<const ASTAlterQuery &>();
elem.query_kind = "Alter";
if (alter.command_list != nullptr)
if (alter.command_list != nullptr && alter.alter_object != ASTAlterQuery::AlterObjectType::DATABASE)
{
// Alter queries already have their target table inserted into `elem`.
if (elem.query_tables.size() != 1)

View File

@ -9,6 +9,7 @@ namespace DB
class AccessRightsElements;
class ASTAlterCommand;
class ASTAlterQuery;
/** Allows you add or remove a column in the table.
@ -28,6 +29,10 @@ public:
private:
AccessRightsElements getRequiredAccess() const;
BlockIO executeToTable(const ASTAlterQuery & alter);
BlockIO executeToDatabase(const ASTAlterQuery & alter);
ASTPtr query_ptr;
};

View File

@ -571,6 +571,7 @@ ASTs InterpreterAlterImpl::getRewrittenQueries(
auto rewritten_rename_query = std::make_shared<ASTRenameQuery>();
rewritten_alter_query->database = mapped_to_database;
rewritten_alter_query->table = alter_query.table;
rewritten_alter_query->alter_object = ASTAlterQuery::AlterObjectType::TABLE;
rewritten_alter_query->set(rewritten_alter_query->command_list, std::make_shared<ASTExpressionList>());
String default_after_column;

View File

@ -400,6 +400,11 @@ void ASTAlterCommand::formatImpl(
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "RESET SETTING " << (settings.hilite ? hilite_none : "");
settings_resets->formatImpl(settings, state, frame);
}
else if (type == ASTAlterCommand::MODIFY_DATABASE_SETTING)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MODIFY SETTING " << (settings.hilite ? hilite_none : "");
settings_changes->formatImpl(settings, state, frame);
}
else if (type == ASTAlterCommand::MODIFY_QUERY)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MODIFY QUERY " << settings.nl_or_ws << (settings.hilite ? hilite_none : "");
@ -472,11 +477,24 @@ void ASTAlterQuery::formatQueryImpl(const FormatSettings & settings, FormatState
frame.need_parens = false;
std::string indent_str = settings.one_line ? "" : std::string(4u * frame.indent, ' ');
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str;
if (is_live_view)
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ALTER LIVE VIEW " << (settings.hilite ? hilite_none : "");
else
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ALTER TABLE " << (settings.hilite ? hilite_none : "");
switch (alter_object)
{
case AlterObjectType::TABLE:
settings.ostr << "ALTER TABLE ";
break;
case AlterObjectType::DATABASE:
settings.ostr << "ALTER DATABASE ";
break;
case AlterObjectType::LIVE_VIEW:
settings.ostr << "ALTER LIVE VIEW ";
break;
default:
break;
}
settings.ostr << (settings.hilite ? hilite_none : "");
if (!table.empty())
{
@ -487,6 +505,11 @@ void ASTAlterQuery::formatQueryImpl(const FormatSettings & settings, FormatState
}
settings.ostr << indent_str << backQuoteIfNeed(table);
}
else if (alter_object == AlterObjectType::DATABASE && !database.empty())
{
settings.ostr << indent_str << backQuoteIfNeed(database);
}
formatOnCluster(settings);
settings.ostr << settings.nl_or_ws;

View File

@ -70,6 +70,8 @@ public:
NO_TYPE,
LIVE_VIEW_REFRESH,
MODIFY_DATABASE_SETTING,
};
Type type = NO_TYPE;
@ -210,7 +212,15 @@ protected:
class ASTAlterQuery : public ASTQueryWithTableAndOutput, public ASTQueryWithOnCluster
{
public:
bool is_live_view{false}; /// true for ALTER LIVE VIEW
/// Kind of object an ALTER query is addressed to; it selects the keyword printed when the query is formatted.
enum class AlterObjectType
{
/// Formatted as `ALTER TABLE`.
TABLE,
/// Formatted as `ALTER DATABASE`.
DATABASE,
/// Formatted as `ALTER LIVE VIEW`.
LIVE_VIEW,
/// Initial value of `alter_object`; no keyword is printed for it.
UNKNOWN,
};
AlterObjectType alter_object = AlterObjectType::UNKNOWN;
ASTExpressionList * command_list = nullptr;

File diff suppressed because it is too large Load Diff

View File

@ -2,6 +2,7 @@
#include <Parsers/IParserBase.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ASTAlterQuery.h>
namespace DB
{
@ -45,9 +46,10 @@ protected:
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
public:
bool is_live_view;
ASTAlterQuery::AlterObjectType alter_object;
ParserAlterCommandList(bool is_live_view_ = false) : is_live_view(is_live_view_) {}
ParserAlterCommandList(ASTAlterQuery::AlterObjectType alter_object_ = ASTAlterQuery::AlterObjectType::TABLE)
: alter_object(alter_object_) {}
};
@ -58,9 +60,10 @@ protected:
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
public:
bool is_live_view;
ASTAlterQuery::AlterObjectType alter_object;
ParserAlterCommand(bool is_live_view_ = false) : is_live_view(is_live_view_) {}
ParserAlterCommand(ASTAlterQuery::AlterObjectType alter_object_ = ASTAlterQuery::AlterObjectType::TABLE)
: alter_object(alter_object_) {}
};

View File

@ -42,6 +42,22 @@ bool parseDatabaseAndTableName(IParser::Pos & pos, Expected & expected, String &
}
/// Parses a single identifier at `pos` and treats it as a database name.
///
/// `database_str` is always reset first, so it is empty whenever the function returns false.
/// Returns false if no identifier could be parsed; otherwise the identifier name is copied
/// into `database_str` (via tryGetIdentifierNameInto) and true is returned.
bool parseDatabase(IParser::Pos & pos, Expected & expected, String & database_str)
{
    /// Never leave a stale name in the output parameter on failure.
    database_str.clear();

    ParserIdentifier identifier_parser;
    ASTPtr database;
    if (!identifier_parser.parse(pos, database, expected))
        return false;

    tryGetIdentifierNameInto(database, database_str);
    return true;
}
bool parseDatabaseAndTableNameOrAsterisks(IParser::Pos & pos, Expected & expected, String & database, bool & any_database, String & table, bool & any_table)
{
return IParserBase::wrapParseImpl(pos, [&]

View File

@ -10,4 +10,6 @@ bool parseDatabaseAndTableName(IParser::Pos & pos, Expected & expected, String &
/// Parses [db.]name or [db.]* or [*.]*
bool parseDatabaseAndTableNameOrAsterisks(IParser::Pos & pos, Expected & expected, String & database, bool & any_database, String & table, bool & any_table);
bool parseDatabase(IParser::Pos & pos, Expected & expected, String & database_str);
}

View File

@ -312,6 +312,14 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
command.settings_changes = command_ast->settings_changes->as<ASTSetQuery &>().changes;
return command;
}
else if (command_ast->type == ASTAlterCommand::MODIFY_DATABASE_SETTING)
{
AlterCommand command;
command.ast = command_ast->clone();
command.type = AlterCommand::MODIFY_DATABASE_SETTING;
command.settings_changes = command_ast->settings_changes->as<ASTSetQuery &>().changes;
return command;
}
else if (command_ast->type == ASTAlterCommand::RESET_SETTING)
{
AlterCommand command;
@ -842,6 +850,7 @@ std::optional<MutationCommand> AlterCommand::tryConvertToMutationCommand(Storage
return result;
}
void AlterCommands::apply(StorageInMemoryMetadata & metadata, ContextPtr context) const
{
if (!prepared)
@ -960,6 +969,7 @@ void AlterCommands::prepare(const StorageInMemoryMetadata & metadata)
prepared = true;
}
void AlterCommands::validate(const StorageInMemoryMetadata & metadata, ContextPtr context) const
{
auto all_columns = metadata.columns;

View File

@ -13,6 +13,8 @@ namespace DB
{
class ASTAlterCommand;
class IDatabase;
using DatabasePtr = std::shared_ptr<IDatabase>;
/// Operation from the ALTER query (except for manipulation with PART/PARTITION).
/// Adding Nested columns is not expanded to add individual columns.
@ -42,6 +44,7 @@ struct AlterCommand
MODIFY_QUERY,
RENAME_COLUMN,
REMOVE_TTL,
MODIFY_DATABASE_SETTING,
};
/// Which property user wants to remove from column

View File

@ -8,6 +8,7 @@
#include <DataTypes/DataTypeNullable.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Common/SettingsChanges.h>
namespace DB
@ -26,8 +27,9 @@ MaterializedPostgreSQLConsumer::MaterializedPostgreSQLConsumer(
const std::string & start_lsn,
const size_t max_block_size_,
bool allow_automatic_update_,
Storages storages_)
: log(&Poco::Logger::get("PostgreSQLReaplicaConsumer"))
Storages storages_,
const String & name_for_logger)
: log(&Poco::Logger::get("PostgreSQLReplicaConsumer(" + name_for_logger + ")"))
, context(context_)
, replication_slot_name(replication_slot_name_)
, publication_name(publication_name_)
@ -270,12 +272,13 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
case 'I': // Insert
{
Int32 relation_id = readInt32(replication_message, pos, size);
const auto & table_name = relation_id_to_name[relation_id];
assert(!table_name.empty());
if (!isSyncAllowed(relation_id))
if (!isSyncAllowed(relation_id, table_name))
return;
Int8 new_tuple = readInt8(replication_message, pos, size);
const auto & table_name = relation_id_to_name[relation_id];
auto buffer = buffers.find(table_name);
assert(buffer != buffers.end());
@ -287,11 +290,12 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
case 'U': // Update
{
Int32 relation_id = readInt32(replication_message, pos, size);
const auto & table_name = relation_id_to_name[relation_id];
assert(!table_name.empty());
if (!isSyncAllowed(relation_id))
if (!isSyncAllowed(relation_id, table_name))
return;
const auto & table_name = relation_id_to_name[relation_id];
auto buffer = buffers.find(table_name);
assert(buffer != buffers.end());
@ -335,14 +339,15 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
case 'D': // Delete
{
Int32 relation_id = readInt32(replication_message, pos, size);
const auto & table_name = relation_id_to_name[relation_id];
assert(!table_name.empty());
if (!isSyncAllowed(relation_id))
if (!isSyncAllowed(relation_id, table_name))
return;
/// 0 or 1 if replica identity is set to full. For now only default replica identity is supported (with primary keys).
readInt8(replication_message, pos, size);
const auto & table_name = relation_id_to_name[relation_id];
auto buffer = buffers.find(table_name);
assert(buffer != buffers.end());
readTupleData(buffer->second, replication_message, pos, size, PostgreSQLQuery::DELETE);
@ -357,8 +362,6 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
constexpr size_t transaction_commit_timestamp_len = 8;
pos += unused_flags_len + commit_lsn_len + transaction_end_lsn_len + transaction_commit_timestamp_len;
LOG_DEBUG(log, "Current lsn: {} = {}", current_lsn, getLSNValue(current_lsn)); /// Will be removed
final_lsn = current_lsn;
break;
}
@ -371,7 +374,7 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
readString(replication_message, pos, size, relation_namespace);
readString(replication_message, pos, size, relation_name);
if (!isSyncAllowed(relation_id))
if (!isSyncAllowed(relation_id, relation_name))
return;
if (storages.find(relation_name) == storages.end())
@ -522,15 +525,35 @@ String MaterializedPostgreSQLConsumer::advanceLSN(std::shared_ptr<pqxx::nontrans
/// Sync for some table might not be allowed if:
/// 1. Table schema changed and might break synchronization.
/// 2. There is no storage for this table. (As a result of some exception or incorrect pg_publication)
bool MaterializedPostgreSQLConsumer::isSyncAllowed(Int32 relation_id)
/// 3. A new table was added to replication, it was loaded via snapshot, but consumer has not yet
/// read wal up to the lsn position of snapshot, from which table was loaded.
bool MaterializedPostgreSQLConsumer::isSyncAllowed(Int32 relation_id, const String & relation_name)
{
auto table_with_lsn = skip_list.find(relation_id);
if (deleted_tables.contains(relation_name))
return false;
auto new_table_with_lsn = waiting_list.find(relation_name);
if (new_table_with_lsn != waiting_list.end())
{
auto table_start_lsn = new_table_with_lsn->second;
assert(!table_start_lsn.empty());
if (getLSNValue(current_lsn) >= getLSNValue(table_start_lsn))
{
LOG_TRACE(log, "Synchronization is started for table: {} (start_lsn: {})", relation_name, table_start_lsn);
waiting_list.erase(new_table_with_lsn);
return true;
}
}
auto skipped_table_with_lsn = skip_list.find(relation_id);
/// Table is not present in a skip list - allow synchronization.
if (table_with_lsn == skip_list.end())
if (skipped_table_with_lsn == skip_list.end())
return true;
const auto & table_start_lsn = table_with_lsn->second;
const auto & table_start_lsn = skipped_table_with_lsn->second;
/// Table is in a skip list and has not yet received a valid lsn == it has not been reloaded.
if (table_start_lsn.empty())
@ -544,7 +567,7 @@ bool MaterializedPostgreSQLConsumer::isSyncAllowed(Int32 relation_id)
LOG_TRACE(log, "Synchronization is resumed for table: {} (start_lsn: {})",
relation_id_to_name[relation_id], table_start_lsn);
skip_list.erase(table_with_lsn);
skip_list.erase(skipped_table_with_lsn);
return true;
}
@ -576,6 +599,51 @@ void MaterializedPostgreSQLConsumer::markTableAsSkipped(Int32 relation_id, const
}
void MaterializedPostgreSQLConsumer::addNested(const String & postgres_table_name, StoragePtr nested_storage, const String & table_start_lsn)
{
/// Cache new pointer to replacingMergeTree table.
storages.emplace(postgres_table_name, nested_storage);
/// Add new in-memory buffer.
buffers.emplace(postgres_table_name, Buffer(nested_storage));
/// Replication consumer will read wall and check for currently processed table whether it is allowed to start applying
/// changes to this table.
waiting_list[postgres_table_name] = table_start_lsn;
}
void MaterializedPostgreSQLConsumer::updateNested(const String & table_name, StoragePtr nested_storage, Int32 table_id, const String & table_start_lsn)
{
/// Cache new pointer to replacingMergeTree table.
storages[table_name] = nested_storage;
/// Create a new empty buffer (with updated metadata), where data is first loaded before syncing into actual table.
auto & buffer = buffers.find(table_name)->second;
buffer.createEmptyBuffer(nested_storage);
/// Set start position to valid lsn. Before it was an empty string. Further read for table allowed, if it has a valid lsn.
skip_list[table_id] = table_start_lsn;
}
/// Forgets a table which was removed from replication.
void MaterializedPostgreSQLConsumer::removeNested(const String & postgres_table_name)
{
    /// Replication may lag behind, so messages for this table can still arrive:
    /// remember the name in order to ignore them.
    deleted_tables.insert(postgres_table_name);

    /// Drop the cached nested storage and the in-memory buffer of the table.
    storages.erase(postgres_table_name);
    buffers.erase(postgres_table_name);
}
/// Applies a changed setting to the consumer.
/// Only `materialized_postgresql_max_block_size` and `materialized_postgresql_allow_automatic_update`
/// are handled here; any other setting name is ignored.
void MaterializedPostgreSQLConsumer::setSetting(const SettingChange & setting)
{
    const auto & setting_name = setting.name;

    if (setting_name == "materialized_postgresql_max_block_size")
    {
        max_block_size = setting.value.safeGet<UInt64>();
        return;
    }

    if (setting_name == "materialized_postgresql_allow_automatic_update")
        allow_automatic_update = setting.value.safeGet<bool>();
}
/// Read binary changes from replication slot via COPY command (starting from current lsn in a slot).
bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
{
@ -700,21 +768,10 @@ bool MaterializedPostgreSQLConsumer::consume(std::vector<std::pair<Int32, String
/// Read up to max_block_size changes (approximately - in some cases might be more).
/// false: no data was read, reschedule.
/// true: some data was read, schedule as soon as possible.
return readFromReplicationSlot();
auto read_next = readFromReplicationSlot();
LOG_TRACE(log, "LSN: {}", final_lsn);
return read_next;
}
void MaterializedPostgreSQLConsumer::updateNested(const String & table_name, StoragePtr nested_storage, Int32 table_id, const String & table_start_lsn)
{
/// Cache new pointer to replacingMergeTree table.
storages[table_name] = nested_storage;
/// Create a new empty buffer (with updated metadata), where data is first loaded before syncing into actual table.
auto & buffer = buffers.find(table_name)->second;
buffer.createEmptyBuffer(nested_storage);
/// Set start position to valid lsn. Before it was an empty string. Further read for table allowed, if it has a valid lsn.
skip_list[table_id] = table_start_lsn;
}
}

View File

@ -13,6 +13,7 @@
namespace DB
{
struct SettingChange;
class MaterializedPostgreSQLConsumer
{
@ -27,7 +28,8 @@ public:
const String & start_lsn,
const size_t max_block_size_,
bool allow_automatic_update_,
Storages storages_);
Storages storages_,
const String & name_for_logger);
bool consume(std::vector<std::pair<Int32, String>> & skipped_tables);
@ -35,6 +37,12 @@ public:
/// process if it was skipped due to schema changes.
void updateNested(const String & table_name, StoragePtr nested_storage, Int32 table_id, const String & table_start_lsn);
void addNested(const String & postgres_table_name, StoragePtr nested_storage, const String & table_start_lsn);
void removeNested(const String & postgres_table_name);
void setSetting(const SettingChange & setting);
private:
/// Read approximately up to max_block_size changes from WAL.
bool readFromReplicationSlot();
@ -45,7 +53,7 @@ private:
void processReplicationMessage(const char * replication_message, size_t size);
bool isSyncAllowed(Int32 relation_id);
bool isSyncAllowed(Int32 relation_id, const String & relation_name);
struct Buffer
{
@ -105,15 +113,18 @@ private:
/// current_lsn converted from String to Int64 via getLSNValue().
UInt64 lsn_value;
const size_t max_block_size;
size_t max_block_size;
bool allow_automatic_update;
String table_to_insert;
/// List of tables which need to be synced after last replication stream.
/// Holds `postgres_table_name` set.
std::unordered_set<std::string> tables_to_sync;
/// `postgres_table_name` -> ReplacingMergeTree table.
Storages storages;
/// `postgres_table_name` -> In-memory buffer.
Buffers buffers;
std::unordered_map<Int32, String> relation_id_to_name;
@ -133,6 +144,7 @@ private:
/// if relation definition has changed since the last relation definition message.
std::unordered_map<Int32, SchemaData> schema_data;
/// `postgres_relation_id` -> `start_lsn`
/// skip_list contains relation ids for tables on which ddl was performed, which can break synchronization.
/// These breaking changes are detected in the replication stream by the corresponding replication message, and the table is added to the skip list.
/// After it is finished, a temporary replication slot is created with 'export snapshot' option, and start_lsn is returned.
@ -142,5 +154,16 @@ private:
/// No needed message, related to reloaded table will be missed, because messages are not consumed in the meantime,
/// i.e. we will not miss the first start_lsn position for reloaded table.
std::unordered_map<Int32, String> skip_list;
/// `postgres_table_name` -> `start_lsn`
/// For dynamically added tables. A new table is loaded via snapshot and we get a start lsn position.
/// Once consumer reaches this position, it starts applying replication messages to this table.
/// Inside replication handler we have to ensure that replication consumer does not read data from wal
/// while the process of adding a table to replication is not finished,
/// because we might go beyond this start lsn position before consumer knows that a new table was added.
std::unordered_map<String, String> waiting_list;
/// Since replication may be some time behind, we need to ensure that replication messages for deleted tables are ignored.
std::unordered_set<String> deleted_tables;
};
}

View File

@ -11,18 +11,20 @@
#include <Common/setThreadName.h>
#include <Interpreters/Context.h>
#include <DataStreams/copyData.h>
#include <Databases/DatabaseOnDisk.h>
namespace DB
{
static const auto RESCHEDULE_MS = 500;
static const auto RESCHEDULE_MS = 1000;
static const auto BACKOFF_TRESHOLD_MS = 10000;
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int POSTGRESQL_REPLICATION_INTERNAL_ERROR;
}
PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
@ -45,7 +47,6 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
, is_materialized_postgresql_database(is_materialized_postgresql_database_)
, tables_list(replication_settings.materialized_postgresql_tables_list)
, user_provided_snapshot(replication_settings.materialized_postgresql_snapshot)
, connection(std::make_shared<postgres::Connection>(connection_info_))
, milliseconds_to_wait(RESCHEDULE_MS)
{
replication_slot = replication_settings.materialized_postgresql_replication_slot;
@ -77,7 +78,8 @@ void PostgreSQLReplicationHandler::waitConnectionAndStart()
{
try
{
connection->connect(); /// Will throw pqxx::broken_connection if no connection at the moment
postgres::Connection connection(connection_info);
connection.connect(); /// Will throw pqxx::broken_connection if no connection at the moment
startSynchronization(false);
}
catch (const pqxx::broken_connection & pqxx_error)
@ -102,14 +104,9 @@ void PostgreSQLReplicationHandler::shutdown()
void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
{
{
pqxx::work tx(connection->getRef());
createPublicationIfNeeded(tx);
tx.commit();
}
postgres::Connection replication_connection(connection_info, /* replication */true);
pqxx::nontransaction tx(replication_connection.getRef());
createPublicationIfNeeded(tx);
/// List of nested tables (table_name -> nested_storage), which is passed to replication consumer.
std::unordered_map<String, StoragePtr> nested_storages;
@ -120,9 +117,12 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
/// 2. if replication slot already exist, start_lsn is read from pg_replication_slots as
/// `confirmed_flush_lsn` - the address (LSN) up to which the logical slot's consumer has confirmed receiving data.
/// Data older than this is not available anymore.
/// TODO: more tests
String snapshot_name, start_lsn;
/// Also lets have a separate non-replication connection, because we need two parallel transactions and
/// one connection can have one transaction at a time.
auto tmp_connection = std::make_shared<postgres::Connection>(connection_info);
auto initial_sync = [&]()
{
LOG_TRACE(log, "Starting tables sync load");
@ -144,7 +144,7 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
{
try
{
nested_storages[table_name] = loadFromSnapshot(snapshot_name, table_name, storage->as <StorageMaterializedPostgreSQL>());
nested_storages[table_name] = loadFromSnapshot(*tmp_connection, snapshot_name, table_name, storage->as<StorageMaterializedPostgreSQL>());
}
catch (Exception & e)
{
@ -186,6 +186,8 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
auto * materialized_storage = storage->as <StorageMaterializedPostgreSQL>();
try
{
/// FIXME: Looks like it is possible we might get here if there is no nested storage or at least nested storage id field might be empty.
/// Caught it somehow when doing something else incorrectly, but do not see any reason how it could happen.
/// Try load nested table, set materialized table metadata.
nested_storages[table_name] = materialized_storage->prepare();
}
@ -209,13 +211,14 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
/// Handler uses it only for loadFromSnapshot and shutdown methods.
consumer = std::make_shared<MaterializedPostgreSQLConsumer>(
context,
connection,
std::move(tmp_connection),
replication_slot,
publication_name,
start_lsn,
max_block_size,
allow_automatic_update,
nested_storages);
nested_storages,
(is_materialized_postgresql_database ? remote_database_name : remote_database_name + '.' + tables_list));
consumer_task->activateAndSchedule();
@ -224,10 +227,19 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
}
StoragePtr PostgreSQLReplicationHandler::loadFromSnapshot(String & snapshot_name, const String & table_name,
/// Builds the CREATE query for the nested table of `storage`.
/// The structure of `table_name` is fetched from PostgreSQL over a separate connection
/// opened just for this call and is handed over to the storage.
ASTPtr PostgreSQLReplicationHandler::getCreateNestedTableQuery(StorageMaterializedPostgreSQL * storage, const String & table_name)
{
    postgres::Connection connection(connection_info);
    pqxx::nontransaction tx(connection.getRef());

    auto fetched_structure = fetchPostgreSQLTableStructure(tx, table_name, true, true, true);
    return storage->getCreateNestedTableQuery(
        std::make_unique<PostgreSQLTableStructure>(std::move(fetched_structure)));
}
StoragePtr PostgreSQLReplicationHandler::loadFromSnapshot(postgres::Connection & connection, String & snapshot_name, const String & table_name,
StorageMaterializedPostgreSQL * materialized_storage)
{
auto tx = std::make_shared<pqxx::ReplicationTransaction>(connection->getRef());
auto tx = std::make_shared<pqxx::ReplicationTransaction>(connection.getRef());
std::string query_str = fmt::format("SET TRANSACTION SNAPSHOT '{}'", snapshot_name);
tx->exec(query_str);
@ -312,7 +324,7 @@ void PostgreSQLReplicationHandler::consumerFunc()
}
bool PostgreSQLReplicationHandler::isPublicationExist(pqxx::work & tx)
bool PostgreSQLReplicationHandler::isPublicationExist(pqxx::nontransaction & tx)
{
std::string query_str = fmt::format("SELECT exists (SELECT 1 FROM pg_publication WHERE pubname = '{}')", publication_name);
pqxx::result result{tx.exec(query_str)};
@ -321,7 +333,7 @@ bool PostgreSQLReplicationHandler::isPublicationExist(pqxx::work & tx)
}
void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::work & tx)
void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::nontransaction & tx)
{
auto publication_exists = isPublicationExist(tx);
@ -332,7 +344,7 @@ void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::work & tx)
"Publication {} already exists, but it is a CREATE query, not ATTACH. Publication will be dropped",
publication_name);
connection->execWithRetry([&](pqxx::nontransaction & tx_){ dropPublication(tx_); });
dropPublication(tx);
}
if (!is_attach || !publication_exists)
@ -413,7 +425,7 @@ void PostgreSQLReplicationHandler::createReplicationSlot(
pqxx::result result{tx.exec(query_str)};
start_lsn = result[0][1].as<std::string>();
snapshot_name = result[0][2].as<std::string>();
LOG_TRACE(log, "Created replication slot: {}, start lsn: {}", replication_slot, start_lsn);
LOG_TRACE(log, "Created replication slot: {}, start lsn: {}, snapshot: {}", replication_slot, start_lsn, snapshot_name);
}
catch (Exception & e)
{
@ -448,16 +460,41 @@ void PostgreSQLReplicationHandler::dropPublication(pqxx::nontransaction & tx)
}
/// Executes `ALTER PUBLICATION ... ADD TABLE ONLY ...` for `table_name` in the given transaction.
/// The table name is double-quoted before it is put into the statement.
void PostgreSQLReplicationHandler::addTableToPublication(pqxx::nontransaction & ntx, const String & table_name)
{
    const std::string statement = fmt::format(
        "ALTER PUBLICATION {} ADD TABLE ONLY {}", publication_name, doubleQuoteString(table_name));

    ntx.exec(statement);

    LOG_TRACE(log, "Added table `{}` to publication `{}`", table_name, publication_name);
}
/// Executes `ALTER PUBLICATION ... DROP TABLE ONLY ...` for `table_name` in the given transaction.
/// The table name is double-quoted before it is put into the statement.
void PostgreSQLReplicationHandler::removeTableFromPublication(pqxx::nontransaction & ntx, const String & table_name)
{
    const std::string statement = fmt::format(
        "ALTER PUBLICATION {} DROP TABLE ONLY {}", publication_name, doubleQuoteString(table_name));

    ntx.exec(statement);

    LOG_TRACE(log, "Removed table `{}` from publication `{}`", table_name, publication_name);
}
/// Forwards a changed setting to the replication consumer.
/// The consumer task is deactivated before the consumer is updated and is activated and scheduled again afterwards.
void PostgreSQLReplicationHandler::setSetting(const SettingChange & setting)
{
consumer_task->deactivate();
consumer->setSetting(setting);
consumer_task->activateAndSchedule();
}
void PostgreSQLReplicationHandler::shutdownFinal()
{
try
{
shutdown();
connection->execWithRetry([&](pqxx::nontransaction & tx){ dropPublication(tx); });
postgres::Connection connection(connection_info);
connection.execWithRetry([&](pqxx::nontransaction & tx){ dropPublication(tx); });
String last_committed_lsn;
connection->execWithRetry([&](pqxx::nontransaction & tx)
connection.execWithRetry([&](pqxx::nontransaction & tx)
{
if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */true))
dropReplicationSlot(tx, /* temporary */true);
@ -466,7 +503,7 @@ void PostgreSQLReplicationHandler::shutdownFinal()
if (user_managed_slot)
return;
connection->execWithRetry([&](pqxx::nontransaction & tx)
connection.execWithRetry([&](pqxx::nontransaction & tx)
{
if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */false))
dropReplicationSlot(tx, /* temporary */false);
@ -482,12 +519,17 @@ void PostgreSQLReplicationHandler::shutdownFinal()
/// Used by MaterializedPostgreSQL database engine.
NameSet PostgreSQLReplicationHandler::fetchRequiredTables(postgres::Connection & connection_)
NameSet PostgreSQLReplicationHandler::fetchRequiredTables()
{
pqxx::work tx(connection_.getRef());
postgres::Connection connection(connection_info);
NameSet result_tables;
bool publication_exists_before_startup;
{
pqxx::nontransaction tx(connection.getRef());
publication_exists_before_startup = isPublicationExist(tx);
}
bool publication_exists_before_startup = isPublicationExist(tx);
LOG_DEBUG(log, "Publication exists: {}, is attach: {}", publication_exists_before_startup, is_attach);
Strings expected_tables;
@ -508,7 +550,7 @@ NameSet PostgreSQLReplicationHandler::fetchRequiredTables(postgres::Connection &
"Publication {} already exists, but it is a CREATE query, not ATTACH. Publication will be dropped",
publication_name);
connection->execWithRetry([&](pqxx::nontransaction & tx_){ dropPublication(tx_); });
connection.execWithRetry([&](pqxx::nontransaction & tx_){ dropPublication(tx_); });
}
else
{
@ -518,13 +560,20 @@ NameSet PostgreSQLReplicationHandler::fetchRequiredTables(postgres::Connection &
"Publication {} already exists and tables list is empty. Assuming publication is correct.",
publication_name);
result_tables = fetchPostgreSQLTablesList(tx, postgres_schema);
{
pqxx::nontransaction tx(connection.getRef());
result_tables = fetchPostgreSQLTablesList(tx, postgres_schema);
}
}
/// Check tables list from publication is the same as expected tables list.
/// If not - drop publication and return expected tables list.
else
{
result_tables = fetchTablesFromPublication(tx);
{
pqxx::work tx(connection.getRef());
result_tables = fetchTablesFromPublication(tx);
}
NameSet diff;
std::set_symmetric_difference(expected_tables.begin(), expected_tables.end(),
result_tables.begin(), result_tables.end(),
@ -543,7 +592,7 @@ NameSet PostgreSQLReplicationHandler::fetchRequiredTables(postgres::Connection &
"Publication {} already exists, but specified tables list differs from publication tables list in tables: {}.",
publication_name, diff_tables);
connection->execWithRetry([&](pqxx::nontransaction & tx_){ dropPublication(tx_); });
connection.execWithRetry([&](pqxx::nontransaction & tx_){ dropPublication(tx_); });
}
}
}
@ -560,11 +609,13 @@ NameSet PostgreSQLReplicationHandler::fetchRequiredTables(postgres::Connection &
/// Fetch all tables list from database. Publication does not exist yet, which means
/// that no replication took place. Publication will be created in
/// startSynchronization method.
result_tables = fetchPostgreSQLTablesList(tx, postgres_schema);
{
pqxx::nontransaction tx(connection.getRef());
result_tables = fetchPostgreSQLTablesList(tx, postgres_schema);
}
}
}
tx.commit();
return result_tables;
}
@ -591,6 +642,88 @@ PostgreSQLTableStructurePtr PostgreSQLReplicationHandler::fetchTableStructure(
}
/// Dynamically adds `postgres_table_name` to replication:
/// creates a temporary replication slot to get a snapshot and a start lsn, loads the table from this snapshot,
/// adds the table to the publication and registers the nested storage with the consumer.
/// On any failure the consumer task is activated again and an exception with code
/// POSTGRESQL_REPLICATION_INTERNAL_ERROR is thrown, carrying the message of the original error.
void PostgreSQLReplicationHandler::addTableToReplication(StorageMaterializedPostgreSQL * materialized_storage, const String & postgres_table_name)
{
/// Note: we have to ensure that replication consumer task is stopped when we reload table, because otherwise
/// it can read wal beyond start lsn position (from which this table is being loaded), which will result in losing data.
consumer_task->deactivate();
try
{
LOG_TRACE(log, "Adding table `{}` to replication", postgres_table_name);
postgres::Connection replication_connection(connection_info, /* replication */true);
String snapshot_name, start_lsn;
StoragePtr nested_storage;
{
/// This transaction stays open until the end of the scope, i.e. while the table is loaded from the snapshot below.
pqxx::nontransaction tx(replication_connection.getRef());
/// Drop a temporary slot if one already exists, then create a new one.
if (isReplicationSlotExist(tx, start_lsn, /* temporary */true))
dropReplicationSlot(tx, /* temporary */true);
/// Fills in `start_lsn` and `snapshot_name`.
createReplicationSlot(tx, start_lsn, snapshot_name, /* temporary */true);
/// Protect against deadlock.
auto nested = DatabaseCatalog::instance().tryGetTable(materialized_storage->getNestedStorageID(), materialized_storage->getNestedTableContext());
if (!nested)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Internal table was not created");
{
/// The snapshot is read over a separate, non-replication connection.
postgres::Connection tmp_connection(connection_info);
nested_storage = loadFromSnapshot(tmp_connection, snapshot_name, postgres_table_name, materialized_storage);
}
/// Let the materialized storage know the id of the loaded nested table before preparing it.
auto nested_table_id = nested_storage->getStorageID();
materialized_storage->setNestedStorageID(nested_table_id);
nested_storage = materialized_storage->prepare();
}
{
pqxx::nontransaction tx(replication_connection.getRef());
addTableToPublication(tx, postgres_table_name);
}
/// Pass storage to consumer and lsn position, from which to start receiving replication messages for this table.
consumer->addNested(postgres_table_name, nested_storage, start_lsn);
LOG_TRACE(log, "Table `{}` successfully added to replication", postgres_table_name);
}
catch (...)
{
/// The consumer task was deactivated above: activate and reschedule it before reporting the error.
consumer_task->activate();
consumer_task->scheduleAfter(RESCHEDULE_MS);
auto error_message = getCurrentExceptionMessage(false);
throw Exception(ErrorCodes::POSTGRESQL_REPLICATION_INTERNAL_ERROR,
"Failed to add table `{}` to replication. Info: {}", postgres_table_name, error_message);
}
consumer_task->activateAndSchedule();
}
/// Dynamically removes `postgres_table_name` from replication:
/// drops the table from the publication and makes the consumer forget it.
/// On any failure the consumer task is activated again and an exception with code
/// POSTGRESQL_REPLICATION_INTERNAL_ERROR is thrown, carrying the message of the original error.
void PostgreSQLReplicationHandler::removeTableFromReplication(const String & postgres_table_name)
{
/// The consumer task is deactivated while the publication and the consumer state are modified.
consumer_task->deactivate();
try
{
postgres::Connection replication_connection(connection_info, /* replication */true);
{
pqxx::nontransaction tx(replication_connection.getRef());
removeTableFromPublication(tx, postgres_table_name);
}
/// Make the consumer drop its storage and buffer for this table and ignore further replication messages for it.
consumer->removeNested(postgres_table_name);
}
catch (...)
{
/// The consumer task was deactivated above: activate and reschedule it before reporting the error.
consumer_task->activate();
consumer_task->scheduleAfter(RESCHEDULE_MS);
auto error_message = getCurrentExceptionMessage(false);
throw Exception(ErrorCodes::POSTGRESQL_REPLICATION_INTERNAL_ERROR,
"Failed to remove table `{}` from replication. Info: {}", postgres_table_name, error_message);
}
consumer_task->activateAndSchedule();
}
void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector<std::pair<Int32, String>> & relation_data)
{
/// If table schema has changed, the table stops consuming changes from replication stream.
@ -608,6 +741,7 @@ void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector<std::pai
dropReplicationSlot(tx, /* temporary */true);
createReplicationSlot(tx, start_lsn, snapshot_name, /* temporary */true);
postgres::Connection tmp_connection(connection_info);
for (const auto & [relation_id, table_name] : relation_data)
{
@ -618,7 +752,7 @@ void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector<std::pai
auto temp_materialized_storage = materialized_storage->createTemporary();
/// This snapshot is valid up to the end of the transaction, which exported it.
StoragePtr temp_nested_storage = loadFromSnapshot(snapshot_name, table_name,
StoragePtr temp_nested_storage = loadFromSnapshot(tmp_connection, snapshot_name, table_name,
temp_materialized_storage->as <StorageMaterializedPostgreSQL>());
auto table_id = materialized_storage->getNestedStorageID();

View File

@ -4,17 +4,14 @@
#include "MaterializedPostgreSQLSettings.h"
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
#include <Core/PostgreSQL/Utils.h>
#include <Parsers/ASTCreateQuery.h>
namespace DB
{
/// IDEA: There is ALTER PUBLICATION command to dynamically add and remove tables for replicating (the command is transactional).
/// (Probably, if in a replication stream comes a relation name, which does not currently
/// exist in CH, it can be loaded via snapshot while stream is stopped and then comparing wal positions with
/// current lsn and table start lsn.
class StorageMaterializedPostgreSQL;
struct SettingChange;
class PostgreSQLReplicationHandler
{
@ -42,24 +39,36 @@ public:
void addStorage(const std::string & table_name, StorageMaterializedPostgreSQL * storage);
/// Fetch list of tables which are going to be replicated. Used for database engine.
NameSet fetchRequiredTables(postgres::Connection & connection_);
NameSet fetchRequiredTables();
/// Start replication setup immediately.
void startSynchronization(bool throw_on_error);
ASTPtr getCreateNestedTableQuery(StorageMaterializedPostgreSQL * storage, const String & table_name);
void addTableToReplication(StorageMaterializedPostgreSQL * storage, const String & postgres_table_name);
void removeTableFromReplication(const String & postgres_table_name);
void setSetting(const SettingChange & setting);
private:
using MaterializedStorages = std::unordered_map<String, StorageMaterializedPostgreSQL *>;
/// Methods to manage Publication.
bool isPublicationExist(pqxx::work & tx);
bool isPublicationExist(pqxx::nontransaction & tx);
void createPublicationIfNeeded(pqxx::work & tx);
void createPublicationIfNeeded(pqxx::nontransaction & tx);
NameSet fetchTablesFromPublication(pqxx::work & tx);
void dropPublication(pqxx::nontransaction & ntx);
void addTableToPublication(pqxx::nontransaction & ntx, const String & table_name);
void removeTableFromPublication(pqxx::nontransaction & ntx, const String & table_name);
/// Methods to manage Replication Slots.
bool isReplicationSlotExist(pqxx::nontransaction & tx, String & start_lsn, bool temporary = false);
@ -74,7 +83,7 @@ private:
void consumerFunc();
StoragePtr loadFromSnapshot(std::string & snapshot_name, const String & table_name, StorageMaterializedPostgreSQL * materialized_storage);
StoragePtr loadFromSnapshot(postgres::Connection & connection, std::string & snapshot_name, const String & table_name, StorageMaterializedPostgreSQL * materialized_storage);
void reloadFromSnapshot(const std::vector<std::pair<Int32, String>> & relation_data);
@ -112,9 +121,6 @@ private:
String replication_slot, publication_name;
/// Shared between replication_consumer and replication_handler, but never accessed concurrently.
std::shared_ptr<postgres::Connection> connection;
/// Replication consumer. Manages decoding of replication stream and syncing into tables.
std::shared_ptr<MaterializedPostgreSQLConsumer> consumer;

View File

@ -51,6 +51,7 @@ StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(
std::unique_ptr<MaterializedPostgreSQLSettings> replication_settings)
: IStorage(table_id_)
, WithContext(context_->getGlobalContext())
, log(&Poco::Logger::get("StorageMaterializedPostgreSQL(" + postgres::formatNameForLogs(remote_database_name, remote_table_name_) + ")"))
, is_materialized_postgresql_database(false)
, has_nested(false)
, nested_context(makeNestedTableContext(context_->getGlobalContext()))
@ -90,12 +91,18 @@ StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(
/// For the case of MaterializedPostgreSQL database engine.
/// It is used when nested ReplacingMergeTree table has not yet been created by replication thread.
/// In this case this storage can't be used for read queries.
StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(const StorageID & table_id_, ContextPtr context_)
/// Creates the storage without a nested table (`has_nested` is false) and marks it as
/// belonging to a MaterializedPostgreSQL database (`is_materialized_postgresql_database` is true).
/// Within this constructor `postgres_database_name` and `postgres_table_name` are used only
/// to build the logger name.
StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(
const StorageID & table_id_,
ContextPtr context_,
const String & postgres_database_name,
const String & postgres_table_name)
: IStorage(table_id_)
/// The global context is stored, not the context of the calling query.
, WithContext(context_->getGlobalContext())
, log(&Poco::Logger::get("StorageMaterializedPostgreSQL(" + postgres::formatNameForLogs(postgres_database_name, postgres_table_name) + ")"))
, is_materialized_postgresql_database(true)
, has_nested(false)
, nested_context(makeNestedTableContext(context_->getGlobalContext()))
/// The nested table id is initialized with the id of this storage.
, nested_table_id(table_id_)
{
}
@ -103,9 +110,14 @@ StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(const StorageID & t
/// Constructor for MaterializedPostgreSQL table engine - for the case of MaterializedPostgreSQL database engine.
/// It is used when nested ReplacingMergeTree table has already been created by replication thread.
/// This storage is ready to handle read queries.
StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(StoragePtr nested_storage_, ContextPtr context_)
StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(
StoragePtr nested_storage_,
ContextPtr context_,
const String & postgres_database_name,
const String & postgres_table_name)
: IStorage(nested_storage_->getStorageID())
, WithContext(context_->getGlobalContext())
, log(&Poco::Logger::get("StorageMaterializedPostgreSQL(" + postgres::formatNameForLogs(postgres_database_name, postgres_table_name) + ")"))
, is_materialized_postgresql_database(true)
, has_nested(true)
, nested_context(makeNestedTableContext(context_->getGlobalContext()))
@ -131,7 +143,7 @@ StoragePtr StorageMaterializedPostgreSQL::createTemporary() const
}
auto new_context = Context::createCopy(context);
return StorageMaterializedPostgreSQL::create(tmp_table_id, new_context);
return StorageMaterializedPostgreSQL::create(tmp_table_id, new_context, "", table_id.table_name);
}
@ -171,9 +183,13 @@ StorageID StorageMaterializedPostgreSQL::getNestedStorageID() const
void StorageMaterializedPostgreSQL::createNestedIfNeeded(PostgreSQLTableStructurePtr table_structure)
{
if (tryGetNested())
return;
const auto ast_create = getCreateNestedTableQuery(std::move(table_structure));
auto table_id = getStorageID();
auto tmp_nested_table_id = StorageID(table_id.database_name, getNestedTableName());
LOG_DEBUG(log, "Creating clickhouse table for postgresql table {}", table_id.getNameForLogs());
try
{
@ -463,9 +479,10 @@ void registerStorageMaterializedPostgreSQL(StorageFactory & factory)
postgresql_replication_settings->loadFromQuery(*args.storage_def);
if (engine_args.size() != 5)
throw Exception("Storage MaterializedPostgreSQL requires 5 parameters: "
"PostgreSQL('host:port', 'database', 'table', 'username', 'password'",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage MaterializedPostgreSQL requires 5 parameters: "
"PostgreSQL('host:port', 'database', 'table', 'username', 'password'. Got {}",
engine_args.size());
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.getContext());

View File

@ -49,7 +49,6 @@ namespace DB
*
* All database methods, apart from tryGetTable(), are devoted only to nested table.
* NOTE: It makes sense to allow rename method for MaterializedPostgreSQL table via database method.
* TODO: Make sure replication-to-table data channel is done only by relation_id.
*
* Also main table has the same InMemoryMetadata as its nested table, so if metadata of nested table changes - main table also has
* to update its metadata, because all read requests are passed to MaterializedPostgreSQL table and then it redirects read
@ -57,7 +56,7 @@ namespace DB
*
* When there is a need to update table structure, there will be created a new MaterializedPostgreSQL table with its own nested table,
* it will have updated table schema and all data will be loaded from scratch in the background, while previous table with outdated table
* structure will still serve read requests. When data is loaded, nested tables will be swapped, metadata of metarialzied table will be
* structure will still serve read requests. When data is loaded, nested tables will be swapped, metadata of materialized table will be
* updated according to nested table.
*
**/
@ -67,9 +66,11 @@ class StorageMaterializedPostgreSQL final : public shared_ptr_helper<StorageMate
friend struct shared_ptr_helper<StorageMaterializedPostgreSQL>;
public:
StorageMaterializedPostgreSQL(const StorageID & table_id_, ContextPtr context_);
StorageMaterializedPostgreSQL(const StorageID & table_id_, ContextPtr context_,
const String & postgres_database_name, const String & postgres_table_name);
StorageMaterializedPostgreSQL(StoragePtr nested_storage_, ContextPtr context_);
StorageMaterializedPostgreSQL(StoragePtr nested_storage_, ContextPtr context_,
const String & postgres_database_name, const String & postgres_table_name);
String getName() const override { return "MaterializedPostgreSQL"; }
@ -123,6 +124,8 @@ public:
bool supportsFinal() const override { return true; }
ASTPtr getCreateNestedTableQuery(PostgreSQLTableStructurePtr table_structure);
protected:
StorageMaterializedPostgreSQL(
const StorageID & table_id_,
@ -140,10 +143,10 @@ private:
ASTPtr getColumnDeclaration(const DataTypePtr & data_type) const;
ASTPtr getCreateNestedTableQuery(PostgreSQLTableStructurePtr table_structure);
String getNestedTableName() const;
Poco::Logger * log;
/// Not nullptr only for single MaterializedPostgreSQL storage, because for MaterializedPostgreSQL
/// database engine there is one replication handler for all tables.
std::unique_ptr<PostgreSQLReplicationHandler> replication_handler;

View File

@ -43,7 +43,7 @@ struct StorageInMemoryMetadata
TTLColumnsDescription column_ttls_by_name;
/// TTL expressions for table (Move and Rows)
TTLTableDescription table_ttl;
/// SETTINGS expression. Supported for MergeTree, Buffer and Kafka.
/// SETTINGS expression. Supported for MergeTree, Buffer, Kafka, RabbitMQ.
ASTPtr settings_changes;
/// SELECT QUERY. Supported for MaterializedView and View (have to support LiveView).
SelectQueryDescription select;

View File

@ -151,7 +151,7 @@ def test_grant_all_on_table():
instance.query("GRANT ALL ON test.table TO A WITH GRANT OPTION")
instance.query("GRANT ALL ON test.table TO B", user='A')
assert instance.query(
"SHOW GRANTS FOR B") == "GRANT SHOW TABLES, SHOW COLUMNS, SHOW DICTIONARIES, SELECT, INSERT, ALTER, CREATE TABLE, CREATE VIEW, CREATE DICTIONARY, DROP TABLE, DROP VIEW, DROP DICTIONARY, TRUNCATE, OPTIMIZE, SYSTEM MERGES, SYSTEM TTL MERGES, SYSTEM FETCHES, SYSTEM MOVES, SYSTEM SENDS, SYSTEM REPLICATION QUEUES, SYSTEM DROP REPLICA, SYSTEM SYNC REPLICA, SYSTEM RESTART REPLICA, SYSTEM RESTORE REPLICA, SYSTEM FLUSH DISTRIBUTED, dictGet ON test.table TO B\n"
"SHOW GRANTS FOR B") == "GRANT SHOW TABLES, SHOW COLUMNS, SHOW DICTIONARIES, SELECT, INSERT, ALTER TABLE, ALTER VIEW, CREATE TABLE, CREATE VIEW, CREATE DICTIONARY, DROP TABLE, DROP VIEW, DROP DICTIONARY, TRUNCATE, OPTIMIZE, SYSTEM MERGES, SYSTEM TTL MERGES, SYSTEM FETCHES, SYSTEM MOVES, SYSTEM SENDS, SYSTEM REPLICATION QUEUES, SYSTEM DROP REPLICA, SYSTEM SYNC REPLICA, SYSTEM RESTART REPLICA, SYSTEM RESTORE REPLICA, SYSTEM FLUSH DISTRIBUTED, dictGet ON test.table TO B\n"
instance.query("REVOKE ALL ON test.table FROM B", user='A')
assert instance.query("SHOW GRANTS FOR B") == ""

View File

@ -942,12 +942,11 @@ def test_abrupt_server_restart_while_heavy_replication(started_cluster):
def test_quoting(started_cluster):
drop_materialized_db()
table_name = 'user'
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
table_name = 'user'
create_postgres_table(cursor, table_name);
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(50)".format(table_name))
create_materialized_db(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port)
@ -984,6 +983,147 @@ def test_user_managed_slots(started_cluster):
drop_replication_slot(replication_connection, slot_name)
def test_add_new_table_to_replication(started_cluster):
    """Attach new PostgreSQL tables to an existing materialized database at run time.

    Scenario:
      1. Replicate five tables and check that they are synchronized.
      2. Create a sixth table in PostgreSQL, verify that forbidden/unknown settings
         are rejected by ALTER DATABASE, then ATTACH the table and check replication.
      3. Check the error messages for attaching an existing table and an unknown table.
      4. Attach one more table, restart the server, attach another one, and check that
         SHOW CREATE DATABASE and SHOW TABLES list all eight tables.
    """
    drop_materialized_db()
    pg_conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                                port=started_cluster.postgres_port,
                                database=True)
    cursor = pg_conn.cursor()
    NUM_TABLES = 5

    def create_and_fill(name):
        # Create the table on the PostgreSQL side and put 10000 rows into it.
        create_postgres_table(cursor, name)
        instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(name))

    for idx in range(NUM_TABLES):
        create_postgres_table(cursor, 'postgresql_replica_{}'.format(idx))
        instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {} from numbers(10000)".format(idx, idx))

    create_materialized_db(ip=started_cluster.postgres_ip,
                           port=started_cluster.postgres_port)

    for idx in range(NUM_TABLES):
        check_tables_are_synchronized('postgresql_replica_{}'.format(idx))

    shown_tables = instance.query("SHOW TABLES FROM test_database")
    assert shown_tables == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n"

    table_name = 'postgresql_replica_5'
    create_and_fill(table_name)

    # Before any ATTACH the database definition has no SETTINGS clause.
    create_query = instance.query('SHOW CREATE DATABASE test_database')
    assert create_query[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("  # Check without ip
    assert create_query[-59:] == "\\'postgres_database\\', \\'postgres\\', \\'mysecretpassword\\')\n"

    error = instance.query_and_get_error("ALTER DATABASE test_database MODIFY SETTING materialized_postgresql_tables_list='tabl1'")
    assert 'Changing setting `materialized_postgresql_tables_list` is not allowed' in error

    error = instance.query_and_get_error("ALTER DATABASE test_database MODIFY SETTING materialized_postgresql_tables='tabl1'")
    assert 'Database engine MaterializedPostgreSQL does not support setting' in error

    instance.query("ATTACH TABLE test_database.{}".format(table_name))

    shown_tables = instance.query("SHOW TABLES FROM test_database")
    assert shown_tables == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\npostgresql_replica_5\n"

    check_tables_are_synchronized(table_name)
    instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000, 10000)".format(table_name))
    check_tables_are_synchronized(table_name)

    error = instance.query_and_get_error("ATTACH TABLE test_database.{}".format(table_name))
    assert 'Table test_database.postgresql_replica_5 already exists' in error

    error = instance.query_and_get_error("ATTACH TABLE test_database.unknown_table")
    assert 'PostgreSQL table unknown_table does not exist' in error

    # After ATTACH the tables list is stored in the database settings.
    create_query = instance.query('SHOW CREATE DATABASE test_database')
    assert create_query[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("
    assert create_query[-180:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4,postgresql_replica_5\\'\n"

    table_name = 'postgresql_replica_6'
    create_and_fill(table_name)
    instance.query("ATTACH TABLE test_database.{}".format(table_name))

    instance.restart_clickhouse()

    table_name = 'postgresql_replica_7'
    create_and_fill(table_name)
    instance.query("ATTACH TABLE test_database.{}".format(table_name))

    create_query = instance.query('SHOW CREATE DATABASE test_database')
    assert create_query[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("
    assert create_query[-222:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4,postgresql_replica_5,postgresql_replica_6,postgresql_replica_7\\'\n"

    shown_tables = instance.query("SHOW TABLES FROM test_database")
    assert shown_tables == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\npostgresql_replica_5\npostgresql_replica_6\npostgresql_replica_7\n"

    for idx in range(NUM_TABLES + 3):
        check_tables_are_synchronized('postgresql_replica_{}'.format(idx))

    # Clean up the PostgreSQL side.
    for idx in range(NUM_TABLES + 3):
        cursor.execute('drop table if exists postgresql_replica_{};'.format(idx))
def test_remove_table_from_replication(started_cluster):
    """Detach tables from a materialized database and attach one of them back.

    Scenario:
      1. Replicate five tables and check that they are synchronized.
      2. DETACH `postgresql_replica_4`: SELECT from it must fail, and it must disappear
         from SHOW TABLES and from the tables list in SHOW CREATE DATABASE.
      3. ATTACH it back and check that all five tables are synchronized again.
      4. DETACH `postgresql_replica_1` and check the tables list in SHOW CREATE DATABASE.
    """
    drop_materialized_db()
    pg_conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                                port=started_cluster.postgres_port,
                                database=True)
    cursor = pg_conn.cursor()
    NUM_TABLES = 5

    for idx in range(NUM_TABLES):
        create_postgres_table(cursor, 'postgresql_replica_{}'.format(idx))
        instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {} from numbers(10000)".format(idx, idx))

    create_materialized_db(ip=started_cluster.postgres_ip,
                           port=started_cluster.postgres_port)

    for idx in range(NUM_TABLES):
        check_tables_are_synchronized('postgresql_replica_{}'.format(idx))

    shown_tables = instance.query("SHOW TABLES FROM test_database")
    assert shown_tables == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\npostgresql_replica_4\n"

    # Before any DETACH/ATTACH the database definition has no SETTINGS clause.
    create_query = instance.query('SHOW CREATE DATABASE test_database')
    assert create_query[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("
    assert create_query[-59:] == "\\'postgres_database\\', \\'postgres\\', \\'mysecretpassword\\')\n"

    table_name = 'postgresql_replica_4'
    instance.query('DETACH TABLE test_database.{}'.format(table_name))
    error = instance.query_and_get_error('SELECT * FROM test_database.{}'.format(table_name))
    assert "doesn't exist" in error

    shown_tables = instance.query("SHOW TABLES FROM test_database")
    assert shown_tables == "postgresql_replica_0\npostgresql_replica_1\npostgresql_replica_2\npostgresql_replica_3\n"

    create_query = instance.query('SHOW CREATE DATABASE test_database')
    assert create_query[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("
    assert create_query[-138:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3\\'\n"

    # Bring the detached table back into replication.
    instance.query('ATTACH TABLE test_database.{}'.format(table_name))
    check_tables_are_synchronized(table_name)

    for idx in range(NUM_TABLES):
        check_tables_are_synchronized('postgresql_replica_{}'.format(idx))

    create_query = instance.query('SHOW CREATE DATABASE test_database')
    assert create_query[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("
    assert create_query[-159:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_1,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4\\'\n"

    table_name = 'postgresql_replica_1'
    instance.query('DETACH TABLE test_database.{}'.format(table_name))
    create_query = instance.query('SHOW CREATE DATABASE test_database')
    assert create_query[:63] == "CREATE DATABASE test_database\\nENGINE = MaterializedPostgreSQL("
    assert create_query[-138:] == ")\\nSETTINGS materialized_postgresql_tables_list = \\'postgresql_replica_0,postgresql_replica_2,postgresql_replica_3,postgresql_replica_4\\'\n"

    # Clean up the PostgreSQL side.
    for idx in range(NUM_TABLES):
        cursor.execute('drop table if exists postgresql_replica_{};'.format(idx))
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")

View File

@ -36,7 +36,9 @@ ALTER SETTINGS ['ALTER SETTING','ALTER MODIFY SETTING','MODIFY SETTING','RESET S
ALTER MOVE PARTITION ['ALTER MOVE PART','MOVE PARTITION','MOVE PART'] TABLE ALTER TABLE
ALTER FETCH PARTITION ['ALTER FETCH PART','FETCH PARTITION'] TABLE ALTER TABLE
ALTER FREEZE PARTITION ['FREEZE PARTITION','UNFREEZE'] TABLE ALTER TABLE
ALTER DATABASE SETTINGS ['ALTER DATABASE SETTING','ALTER MODIFY DATABASE SETTING','MODIFY DATABASE SETTING'] DATABASE ALTER DATABASE
ALTER TABLE [] \N ALTER
ALTER DATABASE [] \N ALTER
ALTER VIEW REFRESH ['ALTER LIVE VIEW REFRESH','REFRESH VIEW'] VIEW ALTER VIEW
ALTER VIEW MODIFY QUERY ['ALTER TABLE MODIFY QUERY'] VIEW ALTER VIEW
ALTER VIEW [] \N ALTER