From 87c740730bcbcd1a093f4b3899ebc7bca713c81b Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 17 Mar 2021 09:58:10 +0000 Subject: [PATCH] Rename to MaterializePostgreSQL --- src/Databases/DatabaseFactory.cpp | 16 ++--- ....cpp => DatabaseMaterializePostgreSQL.cpp} | 42 ++++++------- ...lica.h => DatabaseMaterializePostgreSQL.h} | 12 ++-- src/Interpreters/InterpreterDropQuery.cpp | 4 +- ....cpp => MaterializePostgreSQLConsumer.cpp} | 44 +++++++------- ...umer.h => MaterializePostgreSQLConsumer.h} | 8 +-- ....cpp => MaterializePostgreSQLMetadata.cpp} | 14 ++--- ...data.h => MaterializePostgreSQLMetadata.h} | 4 +- ....cpp => MaterializePostgreSQLSettings.cpp} | 6 +- ...ings.h => MaterializePostgreSQLSettings.h} | 6 +- .../PostgreSQLReplicationHandler.cpp | 8 +-- .../PostgreSQL/PostgreSQLReplicationHandler.h | 13 ++-- ...a.cpp => StorageMaterializePostgreSQL.cpp} | 60 +++++++++---------- ...plica.h => StorageMaterializePostgreSQL.h} | 16 ++--- src/Storages/registerStorages.cpp | 4 +- 15 files changed, 128 insertions(+), 129 deletions(-) rename src/Databases/PostgreSQL/{DatabasePostgreSQLReplica.cpp => DatabaseMaterializePostgreSQL.cpp} (77%) rename src/Databases/PostgreSQL/{DatabasePostgreSQLReplica.h => DatabaseMaterializePostgreSQL.h} (84%) rename src/Storages/PostgreSQL/{PostgreSQLReplicaConsumer.cpp => MaterializePostgreSQLConsumer.cpp} (91%) rename src/Storages/PostgreSQL/{PostgreSQLReplicaConsumer.h => MaterializePostgreSQLConsumer.h} (97%) rename src/Storages/PostgreSQL/{PostgreSQLReplicaMetadata.cpp => MaterializePostgreSQLMetadata.cpp} (79%) rename src/Storages/PostgreSQL/{PostgreSQLReplicaMetadata.h => MaterializePostgreSQLMetadata.h} (81%) rename src/Storages/PostgreSQL/{PostgreSQLReplicaSettings.cpp => MaterializePostgreSQLSettings.cpp} (77%) rename src/Storages/PostgreSQL/{PostgreSQLReplicaSettings.h => MaterializePostgreSQLSettings.h} (71%) rename src/Storages/PostgreSQL/{StoragePostgreSQLReplica.cpp => StorageMaterializePostgreSQL.cpp} (87%) rename src/Storages/PostgreSQL/{StoragePostgreSQLReplica.h => StorageMaterializePostgreSQL.h} (83%) diff --git a/src/Databases/DatabaseFactory.cpp b/src/Databases/DatabaseFactory.cpp index f153b792caf..d5147b5539b 100644 --- a/src/Databases/DatabaseFactory.cpp +++ b/src/Databases/DatabaseFactory.cpp @@ -36,9 +36,9 @@ #if USE_LIBPQXX #include // Y_IGNORE -#include +#include #include -#include +#include #endif namespace DB @@ -101,14 +101,14 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String const UUID & uuid = create.uuid; bool engine_may_have_arguments = engine_name == "MySQL" || engine_name == "MaterializeMySQL" || engine_name == "Lazy" || - engine_name == "Replicated" || engine_name == "PostgreSQL" || engine_name == "PostgreSQLReplica"; + engine_name == "Replicated" || engine_name == "PostgreSQL" || engine_name == "MaterializePostgreSQL"; if (engine_define->engine->arguments && !engine_may_have_arguments) throw Exception("Database engine " + engine_name + " cannot have arguments", ErrorCodes::BAD_ARGUMENTS); bool has_unexpected_element = engine_define->engine->parameters || engine_define->partition_by || engine_define->primary_key || engine_define->order_by || engine_define->sample_by; - bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated" || engine_name == "PostgreSQLReplica"; + bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated" || engine_name == "MaterializePostgreSQL"; if (has_unexpected_element || (!may_have_settings && engine_define->settings)) throw Exception("Database engine " + engine_name + " cannot have parameters, primary_key, order_by, sample_by, settings", ErrorCodes::UNKNOWN_ELEMENT_IN_AST); @@ -254,7 +254,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String return std::make_shared( context, metadata_path, engine_define, database_name, postgres_database_name, connection, use_table_cache); } - else if (engine_name == "PostgreSQLReplica") + else if (engine_name == "MaterializePostgreSQL") { const ASTFunction * engine = engine_define->engine; @@ -279,21 +279,21 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String auto connection = std::make_shared( postgres_database_name, parsed_host_port.first, parsed_host_port.second, username, password); - auto postgresql_replica_settings = std::make_unique(); + auto postgresql_replica_settings = std::make_unique(); if (engine_define->settings) postgresql_replica_settings->loadFromQuery(*engine_define); if (create.uuid == UUIDHelpers::Nil) { - return std::make_shared>( + return std::make_shared>( context, metadata_path, uuid, engine_define, database_name, postgres_database_name, connection, std::move(postgresql_replica_settings)); } else { - return std::make_shared>( + return std::make_shared>( context, metadata_path, uuid, engine_define, database_name, postgres_database_name, connection, std::move(postgresql_replica_settings)); diff --git a/src/Databases/PostgreSQL/DatabasePostgreSQLReplica.cpp b/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.cpp similarity index 77% rename from src/Databases/PostgreSQL/DatabasePostgreSQLReplica.cpp rename to src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.cpp index 2b491e62fab..62c00dfd4a2 100644 --- a/src/Databases/PostgreSQL/DatabasePostgreSQLReplica.cpp +++ b/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.cpp @@ -1,9 +1,9 @@ -#include +#include #if USE_LIBPQXX #include -#include +#include #include #include @@ -30,7 +30,7 @@ namespace DB static const auto METADATA_SUFFIX = ".postgresql_replica_metadata"; template<> -DatabasePostgreSQLReplica::DatabasePostgreSQLReplica( +DatabaseMaterializePostgreSQL::DatabaseMaterializePostgreSQL( const Context & context, const String & metadata_path_, UUID /* uuid */, @@ -38,11 +38,11 @@ DatabasePostgreSQLReplica::DatabasePostgreSQLReplica( const String & database_name_, const String & postgres_database_name, PostgreSQLConnectionPtr connection_, - std::unique_ptr settings_) + std::unique_ptr settings_) : DatabaseOrdinary( database_name_, metadata_path_, "data/" + escapeForFileName(database_name_) + "/", - "DatabasePostgreSQLReplica (" + database_name_ + ")", context) - , log(&Poco::Logger::get("PostgreSQLReplicaDatabaseEngine")) + "DatabaseMaterializePostgreSQL (" + database_name_ + ")", context) + , log(&Poco::Logger::get("MaterializePostgreSQLDatabaseEngine")) , global_context(context.getGlobalContext()) , metadata_path(metadata_path_) , database_engine_define(database_engine_define_->clone()) @@ -55,7 +55,7 @@ DatabasePostgreSQLReplica::DatabasePostgreSQLReplica( template<> -DatabasePostgreSQLReplica::DatabasePostgreSQLReplica( +DatabaseMaterializePostgreSQL::DatabaseMaterializePostgreSQL( const Context & context, const String & metadata_path_, UUID uuid, @@ -63,8 +63,8 @@ DatabasePostgreSQLReplica::DatabasePostgreSQLReplica( const String & database_name_, const String & postgres_database_name, PostgreSQLConnectionPtr connection_, - std::unique_ptr settings_) - : DatabaseAtomic(database_name_, metadata_path_, uuid, "DatabasePostgreSQLReplica (" + database_name_ + ")", context) + std::unique_ptr settings_) + : DatabaseAtomic(database_name_, metadata_path_, uuid, "DatabaseMaterializePostgreSQL (" + database_name_ + ")", context) , global_context(context.getGlobalContext()) , metadata_path(metadata_path_) , database_engine_define(database_engine_define_->clone()) @@ -76,7 +76,7 @@ DatabasePostgreSQLReplica::DatabasePostgreSQLReplica( template -void DatabasePostgreSQLReplica::startSynchronization() +void DatabaseMaterializePostgreSQL::startSynchronization() { replication_handler = std::make_unique( remote_database_name, @@ -97,7 +97,7 @@ void DatabasePostgreSQLReplica::startSynchronization() if (storage) { - replication_handler->addStorage(table_name, storage->template as()); + replication_handler->addStorage(table_name, storage->template as()); tables[table_name] = storage; } } @@ -108,19 +108,19 @@ void DatabasePostgreSQLReplica::startSynchronization() template -StoragePtr DatabasePostgreSQLReplica::getStorage(const String & name) +StoragePtr DatabaseMaterializePostgreSQL::getStorage(const String & name) { auto storage = tryGetTable(name, global_context); if (storage) return storage; - return StoragePostgreSQLReplica::create(StorageID(database_name, name), StoragePtr{}, global_context); + return StorageMaterializePostgreSQL::create(StorageID(database_name, name), StoragePtr{}, global_context); } template -void DatabasePostgreSQLReplica::shutdown() +void DatabaseMaterializePostgreSQL::shutdown() { if (replication_handler) replication_handler->shutdown(); @@ -128,7 +128,7 @@ void DatabasePostgreSQLReplica::shutdown() template -void DatabasePostgreSQLReplica::loadStoredObjects( +void DatabaseMaterializePostgreSQL::loadStoredObjects( Context & context, bool has_force_restore_data_flag, bool force_attach) { Base::loadStoredObjects(context, has_force_restore_data_flag, force_attach); @@ -149,7 +149,7 @@ void DatabasePostgreSQLReplica::loadStoredObjects( template -StoragePtr DatabasePostgreSQLReplica::tryGetTable(const String & name, const Context & context) const +StoragePtr DatabaseMaterializePostgreSQL::tryGetTable(const String & name, const Context & context) const { if (context.hasQueryContext()) { @@ -171,7 +171,7 @@ StoragePtr DatabasePostgreSQLReplica::tryGetTable(const String & name, con template -void DatabasePostgreSQLReplica::createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query) +void DatabaseMaterializePostgreSQL::createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query) { if (context.hasQueryContext()) { @@ -188,14 +188,14 @@ void DatabasePostgreSQLReplica::createTable(const Context & context, const template -void DatabasePostgreSQLReplica::dropTable(const Context & context, const String & name, bool no_delay) +void DatabaseMaterializePostgreSQL::dropTable(const Context & context, const String & name, bool no_delay) { Base::dropTable(context, name, no_delay); } template -void DatabasePostgreSQLReplica::drop(const Context & context) +void DatabaseMaterializePostgreSQL::drop(const Context & context) { if (replication_handler) { @@ -214,13 +214,13 @@ void DatabasePostgreSQLReplica::drop(const Context & context) template -DatabaseTablesIteratorPtr DatabasePostgreSQLReplica::getTablesIterator( +DatabaseTablesIteratorPtr DatabaseMaterializePostgreSQL::getTablesIterator( const Context & /* context */, const DatabaseOnDisk::FilterByNameFunction & /* filter_by_table_name */) { Tables nested_tables; for (const auto & [table_name, storage] : tables) { - auto nested_storage = storage->template as()->tryGetNested(); + auto nested_storage = storage->template as()->tryGetNested(); if (nested_storage) nested_tables[table_name] = nested_storage; diff --git a/src/Databases/PostgreSQL/DatabasePostgreSQLReplica.h b/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.h similarity index 84% rename from src/Databases/PostgreSQL/DatabasePostgreSQLReplica.h rename to src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.h index d8cb2ff5a6d..fd6d5982fdf 100644 --- a/src/Databases/PostgreSQL/DatabasePostgreSQLReplica.h +++ b/src/Databases/PostgreSQL/DatabaseMaterializePostgreSQL.h @@ -7,7 +7,7 @@ #if USE_LIBPQXX #include -#include +#include #include #include @@ -25,11 +25,11 @@ using PostgreSQLConnectionPtr = std::shared_ptr; template -class DatabasePostgreSQLReplica : public Base +class DatabaseMaterializePostgreSQL : public Base { public: - DatabasePostgreSQLReplica( + DatabaseMaterializePostgreSQL( const Context & context, const String & metadata_path_, UUID uuid, @@ -37,9 +37,9 @@ public: const String & dbname_, const String & postgres_dbname, PostgreSQLConnectionPtr connection_, - std::unique_ptr settings_); + std::unique_ptr settings_); - String getEngineName() const override { return "PostgreSQLReplica"; } + String getEngineName() const override { return "MaterializePostgreSQL"; } String getMetadataPath() const override { return metadata_path; } @@ -69,7 +69,7 @@ private: ASTPtr database_engine_define; String database_name, remote_database_name; PostgreSQLConnectionPtr connection; - std::unique_ptr settings; + std::unique_ptr settings; std::shared_ptr replication_handler; std::map tables; diff --git a/src/Interpreters/InterpreterDropQuery.cpp b/src/Interpreters/InterpreterDropQuery.cpp index 575f5b43a32..1a282b6ef5d 100644 --- a/src/Interpreters/InterpreterDropQuery.cpp +++ b/src/Interpreters/InterpreterDropQuery.cpp @@ -23,7 +23,7 @@ #endif #if USE_LIBPQXX -# include +# include #endif namespace DB @@ -192,7 +192,7 @@ BlockIO InterpreterDropQuery::executeToTableImpl(const ASTDropQuery & query, Dat #if USE_LIBPQXX if (table->getName() == "PostgreSQLReplica") - table->as()->shutdownFinal(); + table->as()->shutdownFinal(); #endif TableExclusiveLockHolder table_lock; diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.cpp b/src/Storages/PostgreSQL/MaterializePostgreSQLConsumer.cpp similarity index 91% rename from src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.cpp rename to src/Storages/PostgreSQL/MaterializePostgreSQLConsumer.cpp index 91e48e9c358..8e7dc81647e 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.cpp +++ b/src/Storages/PostgreSQL/MaterializePostgreSQLConsumer.cpp @@ -1,7 +1,7 @@ -#include "PostgreSQLReplicaConsumer.h" +#include "MaterializePostgreSQLConsumer.h" #if USE_LIBPQXX -#include "StoragePostgreSQLReplica.h" +#include "StorageMaterializePostgreSQL.h" #include #include @@ -22,7 +22,7 @@ namespace ErrorCodes extern const int UNKNOWN_TABLE; } -PostgreSQLReplicaConsumer::PostgreSQLReplicaConsumer( +MaterializePostgreSQLConsumer::MaterializePostgreSQLConsumer( std::shared_ptr context_, PostgreSQLConnectionPtr connection_, const std::string & replication_slot_name_, @@ -52,7 +52,7 @@ PostgreSQLReplicaConsumer::PostgreSQLReplicaConsumer( } -void PostgreSQLReplicaConsumer::Buffer::fillBuffer(StoragePtr storage) +void MaterializePostgreSQLConsumer::Buffer::fillBuffer(StoragePtr storage) { const auto storage_metadata = storage->getInMemoryMetadataPtr(); description.init(storage_metadata->getSampleBlock()); @@ -77,7 +77,7 @@ void PostgreSQLReplicaConsumer::Buffer::fillBuffer(StoragePtr storage) } -void PostgreSQLReplicaConsumer::readMetadata() +void MaterializePostgreSQLConsumer::readMetadata() { try { @@ -98,7 +98,7 @@ void PostgreSQLReplicaConsumer::readMetadata() } -void PostgreSQLReplicaConsumer::insertValue(Buffer & buffer, const std::string & value, size_t column_idx) +void MaterializePostgreSQLConsumer::insertValue(Buffer & buffer, const std::string & value, size_t column_idx) { const auto & sample = buffer.description.sample_block.getByPosition(column_idx); bool is_nullable = buffer.description.types[column_idx].second; @@ -124,14 +124,14 @@ void PostgreSQLReplicaConsumer::insertValue(Buffer & buffer, const std::string & } -void PostgreSQLReplicaConsumer::insertDefaultValue(Buffer & buffer, size_t column_idx) +void MaterializePostgreSQLConsumer::insertDefaultValue(Buffer & buffer, size_t column_idx) { const auto & sample = buffer.description.sample_block.getByPosition(column_idx); insertDefaultPostgreSQLValue(*buffer.columns[column_idx], *sample.column); } -void PostgreSQLReplicaConsumer::readString(const char * message, size_t & pos, size_t size, String & result) +void MaterializePostgreSQLConsumer::readString(const char * message, size_t & pos, size_t size, String & result) { assert(size > pos + 2); char current = unhex2(message + pos); @@ -145,7 +145,7 @@ void PostgreSQLReplicaConsumer::readString(const char * message, size_t & pos, s } -Int32 PostgreSQLReplicaConsumer::readInt32(const char * message, size_t & pos, [[maybe_unused]] size_t size) +Int32 MaterializePostgreSQLConsumer::readInt32(const char * message, size_t & pos, [[maybe_unused]] size_t size) { assert(size > pos + 8); Int32 result = (UInt32(unhex2(message + pos)) << 24) @@ -157,7 +157,7 @@ Int32 PostgreSQLReplicaConsumer::readInt32(const char * message, size_t & pos, [ } -Int16 PostgreSQLReplicaConsumer::readInt16(const char * message, size_t & pos, [[maybe_unused]] size_t size) +Int16 MaterializePostgreSQLConsumer::readInt16(const char * message, size_t & pos, [[maybe_unused]] size_t size) { assert(size > pos + 4); Int16 result = (UInt32(unhex2(message + pos)) << 8) @@ -167,7 +167,7 @@ Int16 PostgreSQLReplicaConsumer::readInt16(const char * message, size_t & pos, [ } -Int8 PostgreSQLReplicaConsumer::readInt8(const char * message, size_t & pos, [[maybe_unused]] size_t size) +Int8 MaterializePostgreSQLConsumer::readInt8(const char * message, size_t & pos, [[maybe_unused]] size_t size) { assert(size > pos + 2); Int8 result = unhex2(message + pos); @@ -176,7 +176,7 @@ Int8 PostgreSQLReplicaConsumer::readInt8(const char * message, size_t & pos, [[m } -Int64 PostgreSQLReplicaConsumer::readInt64(const char * message, size_t & pos, [[maybe_unused]] size_t size) +Int64 MaterializePostgreSQLConsumer::readInt64(const char * message, size_t & pos, [[maybe_unused]] size_t size) { assert(size > pos + 16); Int64 result = (UInt64(unhex4(message + pos)) << 48) @@ -188,7 +188,7 @@ Int64 PostgreSQLReplicaConsumer::readInt64(const char * message, size_t & pos, [ } -void PostgreSQLReplicaConsumer::readTupleData( +void MaterializePostgreSQLConsumer::readTupleData( Buffer & buffer, const char * message, size_t & pos, [[maybe_unused]] size_t size, PostgreSQLQuery type, bool old_value) { Int16 num_columns = readInt16(message, pos, size); @@ -257,7 +257,7 @@ void PostgreSQLReplicaConsumer::readTupleData( /// https://www.postgresql.org/docs/13/protocol-logicalrep-message-formats.html -void PostgreSQLReplicaConsumer::processReplicationMessage(const char * replication_message, size_t size) +void MaterializePostgreSQLConsumer::processReplicationMessage(const char * replication_message, size_t size) { /// Skip '\x' size_t pos = 2; @@ -468,7 +468,7 @@ void PostgreSQLReplicaConsumer::processReplicationMessage(const char * replicati } -void PostgreSQLReplicaConsumer::syncTables(std::shared_ptr tx) +void MaterializePostgreSQLConsumer::syncTables(std::shared_ptr tx) { for (const auto & table_name : tables_to_sync) { @@ -517,7 +517,7 @@ void PostgreSQLReplicaConsumer::syncTables(std::shared_ptr } -String PostgreSQLReplicaConsumer::advanceLSN(std::shared_ptr tx) +String MaterializePostgreSQLConsumer::advanceLSN(std::shared_ptr tx) { std::string query_str = fmt::format("SELECT end_lsn FROM pg_replication_slot_advance('{}', '{}')", replication_slot_name, final_lsn); pqxx::result result{tx->exec(query_str)}; @@ -529,7 +529,7 @@ String PostgreSQLReplicaConsumer::advanceLSN(std::shared_ptr tx; bool slot_empty = true; @@ -640,7 +640,7 @@ bool PostgreSQLReplicaConsumer::readFromReplicationSlot() } -bool PostgreSQLReplicaConsumer::consume(std::vector> & skipped_tables) +bool MaterializePostgreSQLConsumer::consume(std::vector> & skipped_tables) { if (!readFromReplicationSlot()) { @@ -660,7 +660,7 @@ bool PostgreSQLReplicaConsumer::consume(std::vector> & } -void PostgreSQLReplicaConsumer::updateNested(const String & table_name, StoragePtr nested_storage) +void MaterializePostgreSQLConsumer::updateNested(const String & table_name, StoragePtr nested_storage) { storages[table_name] = nested_storage; auto & buffer = buffers.find(table_name)->second; @@ -668,7 +668,7 @@ void PostgreSQLReplicaConsumer::updateNested(const String & table_name, StorageP } -void PostgreSQLReplicaConsumer::updateSkipList(const std::unordered_map & tables_with_lsn) +void MaterializePostgreSQLConsumer::updateSkipList(const std::unordered_map & tables_with_lsn) { for (const auto & [relation_id, lsn] : tables_with_lsn) { diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.h b/src/Storages/PostgreSQL/MaterializePostgreSQLConsumer.h similarity index 97% rename from src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.h rename to src/Storages/PostgreSQL/MaterializePostgreSQLConsumer.h index 37b36d4bfc5..74dd5d67ccf 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.h +++ b/src/Storages/PostgreSQL/MaterializePostgreSQLConsumer.h @@ -6,7 +6,7 @@ #if USE_LIBPQXX #include "PostgreSQLConnection.h" -#include "PostgreSQLReplicaMetadata.h" +#include "MaterializePostgreSQLMetadata.h" #include "insertPostgreSQLValue.h" #include @@ -21,12 +21,12 @@ namespace DB { -class PostgreSQLReplicaConsumer +class MaterializePostgreSQLConsumer { public: using Storages = std::unordered_map; - PostgreSQLReplicaConsumer( + MaterializePostgreSQLConsumer( std::shared_ptr context_, PostgreSQLConnectionPtr connection_, const std::string & replication_slot_name_, @@ -103,7 +103,7 @@ private: std::shared_ptr context; const std::string replication_slot_name, publication_name; - PostgreSQLReplicaMetadata metadata; + MaterializePostgreSQLMetadata metadata; PostgreSQLConnectionPtr connection; std::string current_lsn, final_lsn; diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaMetadata.cpp b/src/Storages/PostgreSQL/MaterializePostgreSQLMetadata.cpp similarity index 79% rename from src/Storages/PostgreSQL/PostgreSQLReplicaMetadata.cpp rename to src/Storages/PostgreSQL/MaterializePostgreSQLMetadata.cpp index ad9ef4b22d3..5cc68b429c0 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaMetadata.cpp +++ b/src/Storages/PostgreSQL/MaterializePostgreSQLMetadata.cpp @@ -1,4 +1,4 @@ -#include "PostgreSQLReplicaMetadata.h" +#include "MaterializePostgreSQLMetadata.h" #if USE_LIBPQXX #include @@ -12,7 +12,7 @@ namespace DB { -PostgreSQLReplicaMetadata::PostgreSQLReplicaMetadata(const std::string & metadata_file_path) +MaterializePostgreSQLMetadata::MaterializePostgreSQLMetadata(const std::string & metadata_file_path) : metadata_file(metadata_file_path) , tmp_metadata_file(metadata_file_path + ".tmp") , last_version(1) @@ -20,7 +20,7 @@ PostgreSQLReplicaMetadata::PostgreSQLReplicaMetadata(const std::string & metadat } -void PostgreSQLReplicaMetadata::readMetadata() +void MaterializePostgreSQLMetadata::readMetadata() { if (Poco::File(metadata_file).exists()) { @@ -41,13 +41,13 @@ void PostgreSQLReplicaMetadata::readMetadata() last_lsn = actual_lsn; } - LOG_DEBUG(&Poco::Logger::get("PostgreSQLReplicaMetadata"), + LOG_DEBUG(&Poco::Logger::get("MaterializePostgreSQLMetadata"), "Last written version is {}. (From metadata file {})", last_version, metadata_file); } } -void PostgreSQLReplicaMetadata::writeMetadata(bool append_metadata) +void MaterializePostgreSQLMetadata::writeMetadata(bool append_metadata) { WriteBufferFromFile out(tmp_metadata_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_TRUNC | O_CREAT); @@ -69,7 +69,7 @@ void PostgreSQLReplicaMetadata::writeMetadata(bool append_metadata) /// While data is received, version is updated. Before table sync, write last version to tmp file. /// Then sync data to table and rename tmp to non-tmp. -void PostgreSQLReplicaMetadata::commitMetadata(std::string & lsn, const std::function & finalizeStreamFunc) +void MaterializePostgreSQLMetadata::commitMetadata(std::string & lsn, const std::function & finalizeStreamFunc) { std::string actual_lsn; last_lsn = lsn; @@ -90,7 +90,7 @@ void PostgreSQLReplicaMetadata::commitMetadata(std::string & lsn, const std::fun if (actual_lsn != last_lsn) { writeMetadata(true); - LOG_WARNING(&Poco::Logger::get("PostgreSQLReplicaMetadata"), + LOG_WARNING(&Poco::Logger::get("MaterializePostgreSQLMetadata"), "Last written LSN {} is not equal to actual LSN {}", last_lsn, actual_lsn); } } diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaMetadata.h b/src/Storages/PostgreSQL/MaterializePostgreSQLMetadata.h similarity index 81% rename from src/Storages/PostgreSQL/PostgreSQLReplicaMetadata.h rename to src/Storages/PostgreSQL/MaterializePostgreSQLMetadata.h index f7e566cce90..d09adb61363 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaMetadata.h +++ b/src/Storages/PostgreSQL/MaterializePostgreSQLMetadata.h @@ -5,10 +5,10 @@ namespace DB { -class PostgreSQLReplicaMetadata +class MaterializePostgreSQLMetadata { public: - PostgreSQLReplicaMetadata(const std::string & metadata_file_path); + MaterializePostgreSQLMetadata(const std::string & metadata_file_path); void commitMetadata(std::string & lsn, const std::function & finalizeStreamFunc); diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaSettings.cpp b/src/Storages/PostgreSQL/MaterializePostgreSQLSettings.cpp similarity index 77% rename from src/Storages/PostgreSQL/PostgreSQLReplicaSettings.cpp rename to src/Storages/PostgreSQL/MaterializePostgreSQLSettings.cpp index dc714cb5488..48fe61b4182 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaSettings.cpp +++ b/src/Storages/PostgreSQL/MaterializePostgreSQLSettings.cpp @@ -1,4 +1,4 @@ -#include "PostgreSQLReplicaSettings.h" +#include "MaterializePostgreSQLSettings.h" #if USE_LIBPQXX #include @@ -15,9 +15,9 @@ namespace ErrorCodes extern const int UNKNOWN_SETTING; } -IMPLEMENT_SETTINGS_TRAITS(PostgreSQLReplicaSettingsTraits, LIST_OF_POSTGRESQL_REPLICA_SETTINGS) +IMPLEMENT_SETTINGS_TRAITS(MaterializePostgreSQLSettingsTraits, LIST_OF_POSTGRESQL_REPLICA_SETTINGS) -void PostgreSQLReplicaSettings::loadFromQuery(ASTStorage & storage_def) +void MaterializePostgreSQLSettings::loadFromQuery(ASTStorage & storage_def) { if (storage_def.settings) { diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaSettings.h b/src/Storages/PostgreSQL/MaterializePostgreSQLSettings.h similarity index 71% rename from src/Storages/PostgreSQL/PostgreSQLReplicaSettings.h rename to src/Storages/PostgreSQL/MaterializePostgreSQLSettings.h index 0f084ac6108..0df618f513e 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaSettings.h +++ b/src/Storages/PostgreSQL/MaterializePostgreSQLSettings.h @@ -15,12 +15,12 @@ namespace DB #define LIST_OF_POSTGRESQL_REPLICA_SETTINGS(M) \ M(UInt64, postgresql_replica_max_block_size, 0, "Number of row collected before flushing data into table.", 0) \ - M(String, postgresql_replica_tables_list, "", "List of tables for PostgreSQLReplica database engine", 0) \ + M(String, postgresql_replica_tables_list, "", "List of tables for MaterializePostgreSQL database engine", 0) \ M(Bool, postgresql_replica_allow_minimal_ddl, 0, "Allow to track minimal possible ddl. By default, table after ddl will get into a skip list", 0) \ -DECLARE_SETTINGS_TRAITS(PostgreSQLReplicaSettingsTraits, LIST_OF_POSTGRESQL_REPLICA_SETTINGS) +DECLARE_SETTINGS_TRAITS(MaterializePostgreSQLSettingsTraits, LIST_OF_POSTGRESQL_REPLICA_SETTINGS) -struct PostgreSQLReplicaSettings : public BaseSettings +struct MaterializePostgreSQLSettings : public BaseSettings { void loadFromQuery(ASTStorage & storage_def); }; diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp index ee15c1ec13d..9e7364c9bb6 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp @@ -3,7 +3,7 @@ #if USE_LIBPQXX #include #include -#include +#include #include #include @@ -27,7 +27,7 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler( bool allow_minimal_ddl_, bool is_postgresql_replica_database_engine_, const String tables_list_) - : log(&Poco::Logger::get("PostgreSQLReplicaHandler")) + : log(&Poco::Logger::get("PostgreSQLReplicationHandler")) , context(context_) , database_name(database_name_) , connection_str(conn_str) @@ -49,7 +49,7 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler( } -void PostgreSQLReplicationHandler::addStorage(const std::string & table_name, StoragePostgreSQLReplica * storage) +void PostgreSQLReplicationHandler::addStorage(const std::string & table_name, StorageMaterializePostgreSQL * storage) { storages[table_name] = storage; } @@ -138,7 +138,7 @@ void PostgreSQLReplicationHandler::startSynchronization() ntx->commit(); - consumer = std::make_shared( + consumer = std::make_shared( context, connection, replication_slot, diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h index 7b9605be5dc..12fbd782887 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h @@ -6,8 +6,8 @@ #if USE_LIBPQXX #include "PostgreSQLConnection.h" -#include "PostgreSQLReplicaConsumer.h" -#include "PostgreSQLReplicaMetadata.h" +#include "MaterializePostgreSQLConsumer.h" +#include "MaterializePostgreSQLMetadata.h" #include @@ -19,7 +19,7 @@ namespace DB /// exist in CH, it can be loaded via snapshot while stream is stopped and then comparing wal positions with /// current lsn and table start lsn. -class StoragePostgreSQLReplica; +class StorageMaterializePostgreSQL; class PostgreSQLReplicationHandler { @@ -41,7 +41,7 @@ public: void shutdownFinal(); - void addStorage(const std::string & table_name, StoragePostgreSQLReplica * storage); + void addStorage(const std::string & table_name, StorageMaterializePostgreSQL * storage); NameSet fetchRequiredTables(PostgreSQLConnection::ConnectionPtr connection_); @@ -49,7 +49,7 @@ public: private: using NontransactionPtr = std::shared_ptr; - using Storages = std::unordered_map; + using Storages = std::unordered_map; bool isPublicationExist(std::shared_ptr tx); @@ -83,7 +83,7 @@ private: std::string tables_list, replication_slot, publication_name; PostgreSQLConnectionPtr connection; - std::shared_ptr consumer; + std::shared_ptr consumer; BackgroundSchedulePool::TaskHolder startup_task, consumer_task; std::atomic tables_loaded = false, stop_synchronization = false; @@ -96,4 +96,3 @@ private: } #endif - diff --git a/src/Storages/PostgreSQL/StoragePostgreSQLReplica.cpp b/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.cpp similarity index 87% rename from src/Storages/PostgreSQL/StoragePostgreSQLReplica.cpp rename to src/Storages/PostgreSQL/StorageMaterializePostgreSQL.cpp index dbacc995c67..f239b9e78d4 100644 --- a/src/Storages/PostgreSQL/StoragePostgreSQLReplica.cpp +++ b/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.cpp @@ -1,4 +1,4 @@ -#include "StoragePostgreSQLReplica.h" +#include "StorageMaterializePostgreSQL.h" #if USE_LIBPQXX #include @@ -36,20 +36,20 @@ namespace ErrorCodes static const auto NESTED_STORAGE_SUFFIX = "_ReplacingMergeTree"; -StoragePostgreSQLReplica::StoragePostgreSQLReplica( +StorageMaterializePostgreSQL::StorageMaterializePostgreSQL( const StorageID & table_id_, const String & remote_database_name, const String & remote_table_name_, const String & connection_str, const StorageInMemoryMetadata & storage_metadata, const Context & context_, - std::unique_ptr replication_settings_) + std::unique_ptr replication_settings_) : IStorage(table_id_) , remote_table_name(remote_table_name_) , global_context(std::make_shared(context_.getGlobalContext())) , replication_settings(std::move(replication_settings_)) , is_postgresql_replica_database( - DatabaseCatalog::instance().getDatabase(getStorageID().database_name)->getEngineName() == "PostgreSQLReplica") + DatabaseCatalog::instance().getDatabase(getStorageID().database_name)->getEngineName() == "MaterializePostgreSQL") { setInMemoryMetadata(storage_metadata); @@ -68,7 +68,7 @@ StoragePostgreSQLReplica::StoragePostgreSQLReplica( } -StoragePostgreSQLReplica::StoragePostgreSQLReplica( +StorageMaterializePostgreSQL::StorageMaterializePostgreSQL( const StorageID & table_id_, StoragePtr nested_storage_, const Context & context_) @@ -76,13 +76,13 @@ StoragePostgreSQLReplica::StoragePostgreSQLReplica( , global_context(std::make_shared(context_)) , nested_storage(nested_storage_) , is_postgresql_replica_database( - DatabaseCatalog::instance().getDatabase(getStorageID().database_name)->getEngineName() == "PostgreSQLReplica") + DatabaseCatalog::instance().getDatabase(getStorageID().database_name)->getEngineName() == "MaterializePostgreSQL") { } -std::string StoragePostgreSQLReplica::getNestedTableName() const +std::string StorageMaterializePostgreSQL::getNestedTableName() const { auto table_name = getStorageID().table_name; @@ -93,7 +93,7 @@ std::string StoragePostgreSQLReplica::getNestedTableName() const } -std::shared_ptr StoragePostgreSQLReplica::getMaterializedColumnsDeclaration( +std::shared_ptr StorageMaterializePostgreSQL::getMaterializedColumnsDeclaration( const String name, const String type, UInt64 default_value) { auto column_declaration = std::make_shared(); @@ -111,7 +111,7 @@ std::shared_ptr StoragePostgreSQLReplica::getMaterializedC } -ASTPtr StoragePostgreSQLReplica::getColumnDeclaration(const DataTypePtr & data_type) const +ASTPtr StorageMaterializePostgreSQL::getColumnDeclaration(const DataTypePtr & data_type) const { WhichDataType which(data_type); @@ -152,10 +152,10 @@ ASTPtr StoragePostgreSQLReplica::getColumnDeclaration(const DataTypePtr & data_t } -/// For single storage PostgreSQLReplica get columns and primary key columns from storage definition. -/// For database engine PostgreSQLReplica get columns and primary key columns by fetching from PostgreSQL, also using the same +/// For single storage MaterializePostgreSQL get columns and primary key columns from storage definition. +/// For database engine MaterializePostgreSQL get columns and primary key columns by fetching from PostgreSQL, also using the same /// transaction with snapshot, which is used for initial tables dump. -ASTPtr StoragePostgreSQLReplica::getCreateNestedTableQuery(const std::function & fetch_table_structure) +ASTPtr StorageMaterializePostgreSQL::getCreateNestedTableQuery(const std::function & fetch_table_structure) { auto create_table_query = std::make_shared(); @@ -240,7 +240,7 @@ ASTPtr StoragePostgreSQLReplica::getCreateNestedTableQuery(const std::function

& fetch_table_structure) +void StorageMaterializePostgreSQL::createNestedIfNeeded(const std::function & fetch_table_structure) { if (nested_loaded) { @@ -267,7 +267,7 @@ void StoragePostgreSQLReplica::createNestedIfNeeded(const std::functionshutdown(); } -void StoragePostgreSQLReplica::shutdownFinal() +void StorageMaterializePostgreSQL::shutdownFinal() { if (is_postgresql_replica_database) return; @@ -333,7 +333,7 @@ void StoragePostgreSQLReplica::shutdownFinal() } -void StoragePostgreSQLReplica::dropNested() +void StorageMaterializePostgreSQL::dropNested() { std::lock_guard lock(nested_mutex); nested_loaded = false; @@ -351,11 +351,11 @@ void StoragePostgreSQLReplica::dropNested() interpreter.execute(); nested_storage = nullptr; - LOG_WARNING(&Poco::Logger::get("StoragePostgreSQLReplica"), "Dropped (or temporarily) nested table {}", getNestedTableName()); + LOG_WARNING(&Poco::Logger::get("StorageMaterializePostgreSQL"), "Dropped (or temporarily) nested table {}", getNestedTableName()); } -NamesAndTypesList StoragePostgreSQLReplica::getVirtuals() const +NamesAndTypesList StorageMaterializePostgreSQL::getVirtuals() const { if (nested_storage) return nested_storage->getVirtuals(); @@ -364,7 +364,7 @@ NamesAndTypesList StoragePostgreSQLReplica::getVirtuals() const } -Pipe StoragePostgreSQLReplica::read( +Pipe StorageMaterializePostgreSQL::read( const Names & column_names, const StorageMetadataPtr & /* metadata_snapshot */, SelectQueryInfo & query_info, @@ -442,24 +442,24 @@ Pipe StoragePostgreSQLReplica::read( return pipe; } - LOG_WARNING(&Poco::Logger::get("StoragePostgreSQLReplica"), "Nested table {} is unavailable or is not loaded yet", getNestedTableName()); + LOG_WARNING(&Poco::Logger::get("StorageMaterializePostgreSQL"), "Nested table {} is unavailable or is not loaded yet", getNestedTableName()); return Pipe(); } -void registerStoragePostgreSQLReplica(StorageFactory & factory) +void registerStorageMaterializePostgreSQL(StorageFactory & factory) { auto creator_fn = [](const StorageFactory::Arguments & args) { ASTs & engine_args = args.engine_args; bool has_settings = args.storage_def->settings; - auto postgresql_replication_settings = std::make_unique(); + auto postgresql_replication_settings = std::make_unique(); if (has_settings) postgresql_replication_settings->loadFromQuery(*args.storage_def); if (engine_args.size() != 5) - throw Exception("Storage PostgreSQLReplica requires 5 parameters: " + throw Exception("Storage MaterializePostgreSQL requires 5 parameters: " "PostgreSQL('host:port', 'database', 'table', 'username', 'password'", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); @@ -474,7 +474,7 @@ void registerStoragePostgreSQLReplica(StorageFactory & factory) args.storage_def->set(args.storage_def->order_by, args.storage_def->primary_key->clone()); if (!args.storage_def->order_by) - throw Exception("Storage PostgreSQLReplica needs order by key or primary key", ErrorCodes::BAD_ARGUMENTS); + throw Exception("Storage MaterializePostgreSQL needs order by key or primary key", ErrorCodes::BAD_ARGUMENTS); if (args.storage_def->primary_key) metadata.primary_key = KeyDescription::getKeyFromAST(args.storage_def->primary_key->ptr(), metadata.columns, args.context); @@ -493,14 +493,14 @@ void registerStoragePostgreSQLReplica(StorageFactory & factory) engine_args[3]->as().value.safeGet(), engine_args[4]->as().value.safeGet()); - return StoragePostgreSQLReplica::create( + return StorageMaterializePostgreSQL::create( args.table_id, remote_database, remote_table, connection.conn_str(), metadata, args.context, std::move(postgresql_replication_settings)); }; factory.registerStorage( - "PostgreSQLReplica", + "MaterializePostgreSQL", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, diff --git a/src/Storages/PostgreSQL/StoragePostgreSQLReplica.h b/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.h similarity index 83% rename from src/Storages/PostgreSQL/StoragePostgreSQLReplica.h rename to src/Storages/PostgreSQL/StorageMaterializePostgreSQL.h index 4d407f337ad..544bae1bf85 100644 --- a/src/Storages/PostgreSQL/StoragePostgreSQLReplica.h +++ b/src/Storages/PostgreSQL/StorageMaterializePostgreSQL.h @@ -6,7 +6,7 @@ #if USE_LIBPQXX #include "PostgreSQLReplicationHandler.h" -#include "PostgreSQLReplicaSettings.h" +#include "MaterializePostgreSQLSettings.h" #include #include @@ -24,17 +24,17 @@ namespace DB { -class StoragePostgreSQLReplica final : public ext::shared_ptr_helper, public IStorage +class StorageMaterializePostgreSQL final : public ext::shared_ptr_helper, public IStorage { - friend struct ext::shared_ptr_helper; + friend struct ext::shared_ptr_helper; public: - StoragePostgreSQLReplica( + StorageMaterializePostgreSQL( const StorageID & table_id_, StoragePtr nested_storage_, const Context & context_); - String getName() const override { return "PostgreSQLReplica"; } + String getName() const override { return "MaterializePostgreSQL"; } void startup() override; void shutdown() override; @@ -70,14 +70,14 @@ public: void dropNested(); protected: - StoragePostgreSQLReplica( + StorageMaterializePostgreSQL( const StorageID & table_id_, const String & remote_database_name, const String & remote_table_name, const String & connection_str, const StorageInMemoryMetadata & storage_metadata, const Context & context_, - std::unique_ptr replication_settings_); + std::unique_ptr replication_settings_); private: static std::shared_ptr getMaterializedColumnsDeclaration( @@ -92,7 +92,7 @@ private: std::string remote_table_name; std::shared_ptr global_context; - std::unique_ptr replication_settings; + std::unique_ptr replication_settings; std::unique_ptr replication_handler; std::atomic nested_loaded = false; diff --git a/src/Storages/registerStorages.cpp b/src/Storages/registerStorages.cpp index 0f5a3acaa86..bd32de1c315 100644 --- a/src/Storages/registerStorages.cpp +++ b/src/Storages/registerStorages.cpp @@ -60,7 +60,7 @@ void registerStorageEmbeddedRocksDB(StorageFactory & factory); #if USE_LIBPQXX void registerStoragePostgreSQL(StorageFactory & factory); -void registerStoragePostgreSQLReplica(StorageFactory & factory); +void registerStorageMaterializePostgreSQL(StorageFactory & factory); #endif void registerStorages() @@ -118,7 +118,7 @@ void registerStorages() #if USE_LIBPQXX registerStoragePostgreSQL(factory); - registerStoragePostgreSQLReplica(factory); + registerStorageMaterializePostgreSQL(factory); #endif }