Review fixes

This commit is contained in:
kssenii 2021-09-15 18:22:33 +03:00
parent 710c8d12dd
commit 489a92c067
6 changed files with 21 additions and 15 deletions

View File

@ -2,7 +2,6 @@
#include <Databases/DatabaseOnDisk.h> #include <Databases/DatabaseOnDisk.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
#include <Parsers/formatAST.h> #include <Parsers/formatAST.h>
#include <Common/renameat2.h> #include <Common/renameat2.h>

View File

@ -282,7 +282,7 @@ public:
/// Delete data and metadata stored inside the database, if exists. /// Delete data and metadata stored inside the database, if exists.
virtual void drop(ContextPtr /*context*/) {} virtual void drop(ContextPtr /*context*/) {}
virtual void applyNewSettings(const SettingsChanges &, ContextPtr) virtual void applySettingsChanges(const SettingsChanges &, ContextPtr)
{ {
throw Exception(ErrorCodes::NOT_IMPLEMENTED, throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Database engine {} either does not support settings, or does not support altering settings", "Database engine {} either does not support settings, or does not support altering settings",

View File

@ -127,9 +127,11 @@ void DatabaseMaterializedPostgreSQL::loadStoredObjects(
} }
void DatabaseMaterializedPostgreSQL::applyNewSettings(const SettingsChanges & settings_changes, ContextPtr query_context) void DatabaseMaterializedPostgreSQL::applySettingsChanges(const SettingsChanges & settings_changes, ContextPtr query_context)
{ {
std::lock_guard lock(handler_mutex); std::lock_guard lock(handler_mutex);
bool need_update_on_disk = false;
for (const auto & change : settings_changes) for (const auto & change : settings_changes)
{ {
if (!settings->has(change.name)) if (!settings->has(change.name))
@ -140,12 +142,12 @@ void DatabaseMaterializedPostgreSQL::applyNewSettings(const SettingsChanges & se
if (!query_context->isInternalQuery()) if (!query_context->isInternalQuery())
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Changing setting `{}` is not allowed", change.name); throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Changing setting `{}` is not allowed", change.name);
DatabaseOnDisk::modifySettingsMetadata(settings_changes, query_context); need_update_on_disk = true;
} }
else if ((change.name == "materialized_postgresql_allow_automatic_update") || (change.name == "materialized_postgresql_max_block_size")) else if ((change.name == "materialized_postgresql_allow_automatic_update") || (change.name == "materialized_postgresql_max_block_size"))
{ {
DatabaseOnDisk::modifySettingsMetadata(settings_changes, query_context);
replication_handler->setSetting(change); replication_handler->setSetting(change);
need_update_on_disk = true;
} }
else else
{ {
@ -154,6 +156,9 @@ void DatabaseMaterializedPostgreSQL::applyNewSettings(const SettingsChanges & se
settings->applyChange(change); settings->applyChange(change);
} }
if (need_update_on_disk)
DatabaseOnDisk::modifySettingsMetadata(settings_changes, query_context);
} }
@ -185,6 +190,8 @@ StoragePtr DatabaseMaterializedPostgreSQL::tryGetTable(const String & name, Cont
/// `except` is not empty in case it is detach and it will contain only one table name - name of detached table. /// `except` is not empty in case it is detach and it will contain only one table name - name of detached table.
/// In case we have a user defined setting `materialized_postgresql_tables_list`, then list of tables is always taken there.
/// Otherwise we traverse materialized storages to find out the list.
String DatabaseMaterializedPostgreSQL::getFormattedTablesList(const String & except) const String DatabaseMaterializedPostgreSQL::getFormattedTablesList(const String & except) const
{ {
String tables_list; String tables_list;
@ -334,7 +341,7 @@ StoragePtr DatabaseMaterializedPostgreSQL::detachTable(const String & table_name
auto nested = table_to_delete->as<StorageMaterializedPostgreSQL>()->getNested(); auto nested = table_to_delete->as<StorageMaterializedPostgreSQL>()->getNested();
if (!nested) if (!nested)
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Inner table `{}` does not exist", table_name); throw Exception(ErrorCodes::LOGICAL_ERROR, "Inner table `{}` does not exist", table_name);
std::lock_guard lock(handler_mutex); std::lock_guard lock(handler_mutex);
replication_handler->removeTableFromReplication(table_name); replication_handler->removeTableFromReplication(table_name);

View File

@ -61,7 +61,7 @@ public:
void stopReplication(); void stopReplication();
void applyNewSettings(const SettingsChanges & settings_changes, ContextPtr query_context) override; void applySettingsChanges(const SettingsChanges & settings_changes, ContextPtr query_context) override;
void shutdown() override; void shutdown() override;

View File

@ -189,7 +189,7 @@ BlockIO InterpreterAlterQuery::executeToDatabase(const ASTAlterQuery & alter)
for (const auto & command : alter_commands) for (const auto & command : alter_commands)
{ {
if (command.type != AlterCommand::MODIFY_DATABASE_SETTING) if (command.type != AlterCommand::MODIFY_DATABASE_SETTING)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Alter of type '{}' is not supported by databases", alterTypeToString(command.type)); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported alter type for database engines");
} }
for (const auto & command : alter_commands) for (const auto & command : alter_commands)
@ -197,7 +197,7 @@ BlockIO InterpreterAlterQuery::executeToDatabase(const ASTAlterQuery & alter)
if (!command.ignore) if (!command.ignore)
{ {
if (command.type == AlterCommand::MODIFY_DATABASE_SETTING) if (command.type == AlterCommand::MODIFY_DATABASE_SETTING)
database->applyNewSettings(command.settings_changes, getContext()); database->applySettingsChanges(command.settings_changes, getContext());
else else
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported alter command"); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported alter command");
} }

View File

@ -119,6 +119,10 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
/// Data older than this is not available anymore. /// Data older than this is not available anymore.
String snapshot_name, start_lsn; String snapshot_name, start_lsn;
/// Also lets have a separate non-replication connection, because we need two parallel transactions and
/// one connection can have one transaction at a time.
auto tmp_connection = std::make_shared<postgres::Connection>(connection_info);
auto initial_sync = [&]() auto initial_sync = [&]()
{ {
LOG_TRACE(log, "Starting tables sync load"); LOG_TRACE(log, "Starting tables sync load");
@ -136,15 +140,11 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
createReplicationSlot(tx, start_lsn, snapshot_name); createReplicationSlot(tx, start_lsn, snapshot_name);
} }
/// Loading tables from snapshot requires a certain transaction type, so we need to open a new transactin.
/// But we cannot have more than one open transaciton on the same connection. Therefore we have
/// a separate connection to load tables.
postgres::Connection tmp_connection(connection_info);
for (const auto & [table_name, storage] : materialized_storages) for (const auto & [table_name, storage] : materialized_storages)
{ {
try try
{ {
nested_storages[table_name] = loadFromSnapshot(tmp_connection, snapshot_name, table_name, storage->as <StorageMaterializedPostgreSQL>()); nested_storages[table_name] = loadFromSnapshot(*tmp_connection, snapshot_name, table_name, storage->as<StorageMaterializedPostgreSQL>());
} }
catch (Exception & e) catch (Exception & e)
{ {
@ -211,7 +211,7 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
/// Handler uses it only for loadFromSnapshot and shutdown methods. /// Handler uses it only for loadFromSnapshot and shutdown methods.
consumer = std::make_shared<MaterializedPostgreSQLConsumer>( consumer = std::make_shared<MaterializedPostgreSQLConsumer>(
context, context,
std::make_shared<postgres::Connection>(connection_info), std::move(tmp_connection),
replication_slot, replication_slot,
publication_name, publication_name,
start_lsn, start_lsn,