Rename to MaterializePostgreSQL

This commit is contained in:
kssenii 2021-03-17 09:58:10 +00:00
parent a03e849a8f
commit 87c740730b
15 changed files with 128 additions and 129 deletions

View File

@ -36,9 +36,9 @@
#if USE_LIBPQXX
#include <Databases/PostgreSQL/DatabasePostgreSQL.h> // Y_IGNORE
#include <Databases/PostgreSQL/DatabasePostgreSQLReplica.h>
#include <Databases/PostgreSQL/DatabaseMaterializePostgreSQL.h>
#include <Storages/PostgreSQL/PostgreSQLConnection.h>
#include <Storages/PostgreSQL/PostgreSQLReplicaSettings.h>
#include <Storages/PostgreSQL/MaterializePostgreSQLSettings.h>
#endif
namespace DB
@ -101,14 +101,14 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
const UUID & uuid = create.uuid;
bool engine_may_have_arguments = engine_name == "MySQL" || engine_name == "MaterializeMySQL" || engine_name == "Lazy" ||
engine_name == "Replicated" || engine_name == "PostgreSQL" || engine_name == "PostgreSQLReplica";
engine_name == "Replicated" || engine_name == "PostgreSQL" || engine_name == "MaterializePostgreSQL";
if (engine_define->engine->arguments && !engine_may_have_arguments)
throw Exception("Database engine " + engine_name + " cannot have arguments", ErrorCodes::BAD_ARGUMENTS);
bool has_unexpected_element = engine_define->engine->parameters || engine_define->partition_by ||
engine_define->primary_key || engine_define->order_by ||
engine_define->sample_by;
bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated" || engine_name == "PostgreSQLReplica";
bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated" || engine_name == "MaterializePostgreSQL";
if (has_unexpected_element || (!may_have_settings && engine_define->settings))
throw Exception("Database engine " + engine_name + " cannot have parameters, primary_key, order_by, sample_by, settings",
ErrorCodes::UNKNOWN_ELEMENT_IN_AST);
@ -254,7 +254,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
return std::make_shared<DatabasePostgreSQL>(
context, metadata_path, engine_define, database_name, postgres_database_name, connection, use_table_cache);
}
else if (engine_name == "PostgreSQLReplica")
else if (engine_name == "MaterializePostgreSQL")
{
const ASTFunction * engine = engine_define->engine;
@ -279,21 +279,21 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
auto connection = std::make_shared<PostgreSQLConnection>(
postgres_database_name, parsed_host_port.first, parsed_host_port.second, username, password);
auto postgresql_replica_settings = std::make_unique<PostgreSQLReplicaSettings>();
auto postgresql_replica_settings = std::make_unique<MaterializePostgreSQLSettings>();
if (engine_define->settings)
postgresql_replica_settings->loadFromQuery(*engine_define);
if (create.uuid == UUIDHelpers::Nil)
{
return std::make_shared<DatabasePostgreSQLReplica<DatabaseOrdinary>>(
return std::make_shared<DatabaseMaterializePostgreSQL<DatabaseOrdinary>>(
context, metadata_path, uuid, engine_define,
database_name, postgres_database_name, connection,
std::move(postgresql_replica_settings));
}
else
{
return std::make_shared<DatabasePostgreSQLReplica<DatabaseAtomic>>(
return std::make_shared<DatabaseMaterializePostgreSQL<DatabaseAtomic>>(
context, metadata_path, uuid, engine_define,
database_name, postgres_database_name, connection,
std::move(postgresql_replica_settings));

View File

@ -1,9 +1,9 @@
#include <Databases/PostgreSQL/DatabasePostgreSQLReplica.h>
#include <Databases/PostgreSQL/DatabaseMaterializePostgreSQL.h>
#if USE_LIBPQXX
#include <Storages/PostgreSQL/PostgreSQLConnection.h>
#include <Storages/PostgreSQL/StoragePostgreSQLReplica.h>
#include <Storages/PostgreSQL/StorageMaterializePostgreSQL.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
@ -30,7 +30,7 @@ namespace DB
static const auto METADATA_SUFFIX = ".postgresql_replica_metadata";
template<>
DatabasePostgreSQLReplica<DatabaseOrdinary>::DatabasePostgreSQLReplica(
DatabaseMaterializePostgreSQL<DatabaseOrdinary>::DatabaseMaterializePostgreSQL(
const Context & context,
const String & metadata_path_,
UUID /* uuid */,
@ -38,11 +38,11 @@ DatabasePostgreSQLReplica<DatabaseOrdinary>::DatabasePostgreSQLReplica(
const String & database_name_,
const String & postgres_database_name,
PostgreSQLConnectionPtr connection_,
std::unique_ptr<PostgreSQLReplicaSettings> settings_)
std::unique_ptr<MaterializePostgreSQLSettings> settings_)
: DatabaseOrdinary(
database_name_, metadata_path_, "data/" + escapeForFileName(database_name_) + "/",
"DatabasePostgreSQLReplica<Ordinary> (" + database_name_ + ")", context)
, log(&Poco::Logger::get("PostgreSQLReplicaDatabaseEngine"))
"DatabaseMaterializePostgreSQL<Ordinary> (" + database_name_ + ")", context)
, log(&Poco::Logger::get("MaterializePostgreSQLDatabaseEngine"))
, global_context(context.getGlobalContext())
, metadata_path(metadata_path_)
, database_engine_define(database_engine_define_->clone())
@ -55,7 +55,7 @@ DatabasePostgreSQLReplica<DatabaseOrdinary>::DatabasePostgreSQLReplica(
template<>
DatabasePostgreSQLReplica<DatabaseAtomic>::DatabasePostgreSQLReplica(
DatabaseMaterializePostgreSQL<DatabaseAtomic>::DatabaseMaterializePostgreSQL(
const Context & context,
const String & metadata_path_,
UUID uuid,
@ -63,8 +63,8 @@ DatabasePostgreSQLReplica<DatabaseAtomic>::DatabasePostgreSQLReplica(
const String & database_name_,
const String & postgres_database_name,
PostgreSQLConnectionPtr connection_,
std::unique_ptr<PostgreSQLReplicaSettings> settings_)
: DatabaseAtomic(database_name_, metadata_path_, uuid, "DatabasePostgreSQLReplica<Atomic> (" + database_name_ + ")", context)
std::unique_ptr<MaterializePostgreSQLSettings> settings_)
: DatabaseAtomic(database_name_, metadata_path_, uuid, "DatabaseMaterializePostgreSQL<Atomic> (" + database_name_ + ")", context)
, global_context(context.getGlobalContext())
, metadata_path(metadata_path_)
, database_engine_define(database_engine_define_->clone())
@ -76,7 +76,7 @@ DatabasePostgreSQLReplica<DatabaseAtomic>::DatabasePostgreSQLReplica(
template<typename Base>
void DatabasePostgreSQLReplica<Base>::startSynchronization()
void DatabaseMaterializePostgreSQL<Base>::startSynchronization()
{
replication_handler = std::make_unique<PostgreSQLReplicationHandler>(
remote_database_name,
@ -97,7 +97,7 @@ void DatabasePostgreSQLReplica<Base>::startSynchronization()
if (storage)
{
replication_handler->addStorage(table_name, storage->template as<StoragePostgreSQLReplica>());
replication_handler->addStorage(table_name, storage->template as<StorageMaterializePostgreSQL>());
tables[table_name] = storage;
}
}
@ -108,19 +108,19 @@ void DatabasePostgreSQLReplica<Base>::startSynchronization()
template<typename Base>
StoragePtr DatabasePostgreSQLReplica<Base>::getStorage(const String & name)
StoragePtr DatabaseMaterializePostgreSQL<Base>::getStorage(const String & name)
{
auto storage = tryGetTable(name, global_context);
if (storage)
return storage;
return StoragePostgreSQLReplica::create(StorageID(database_name, name), StoragePtr{}, global_context);
return StorageMaterializePostgreSQL::create(StorageID(database_name, name), StoragePtr{}, global_context);
}
template<typename Base>
void DatabasePostgreSQLReplica<Base>::shutdown()
void DatabaseMaterializePostgreSQL<Base>::shutdown()
{
if (replication_handler)
replication_handler->shutdown();
@ -128,7 +128,7 @@ void DatabasePostgreSQLReplica<Base>::shutdown()
template<typename Base>
void DatabasePostgreSQLReplica<Base>::loadStoredObjects(
void DatabaseMaterializePostgreSQL<Base>::loadStoredObjects(
Context & context, bool has_force_restore_data_flag, bool force_attach)
{
Base::loadStoredObjects(context, has_force_restore_data_flag, force_attach);
@ -149,7 +149,7 @@ void DatabasePostgreSQLReplica<Base>::loadStoredObjects(
template<typename Base>
StoragePtr DatabasePostgreSQLReplica<Base>::tryGetTable(const String & name, const Context & context) const
StoragePtr DatabaseMaterializePostgreSQL<Base>::tryGetTable(const String & name, const Context & context) const
{
if (context.hasQueryContext())
{
@ -171,7 +171,7 @@ StoragePtr DatabasePostgreSQLReplica<Base>::tryGetTable(const String & name, con
template<typename Base>
void DatabasePostgreSQLReplica<Base>::createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query)
void DatabaseMaterializePostgreSQL<Base>::createTable(const Context & context, const String & name, const StoragePtr & table, const ASTPtr & query)
{
if (context.hasQueryContext())
{
@ -188,14 +188,14 @@ void DatabasePostgreSQLReplica<Base>::createTable(const Context & context, const
template<typename Base>
void DatabasePostgreSQLReplica<Base>::dropTable(const Context & context, const String & name, bool no_delay)
void DatabaseMaterializePostgreSQL<Base>::dropTable(const Context & context, const String & name, bool no_delay)
{
Base::dropTable(context, name, no_delay);
}
template<typename Base>
void DatabasePostgreSQLReplica<Base>::drop(const Context & context)
void DatabaseMaterializePostgreSQL<Base>::drop(const Context & context)
{
if (replication_handler)
{
@ -214,13 +214,13 @@ void DatabasePostgreSQLReplica<Base>::drop(const Context & context)
template<typename Base>
DatabaseTablesIteratorPtr DatabasePostgreSQLReplica<Base>::getTablesIterator(
DatabaseTablesIteratorPtr DatabaseMaterializePostgreSQL<Base>::getTablesIterator(
const Context & /* context */, const DatabaseOnDisk::FilterByNameFunction & /* filter_by_table_name */)
{
Tables nested_tables;
for (const auto & [table_name, storage] : tables)
{
auto nested_storage = storage->template as<StoragePostgreSQLReplica>()->tryGetNested();
auto nested_storage = storage->template as<StorageMaterializePostgreSQL>()->tryGetNested();
if (nested_storage)
nested_tables[table_name] = nested_storage;

View File

@ -7,7 +7,7 @@
#if USE_LIBPQXX
#include <Storages/PostgreSQL/PostgreSQLReplicationHandler.h>
#include <Storages/PostgreSQL/PostgreSQLReplicaSettings.h>
#include <Storages/PostgreSQL/MaterializePostgreSQLSettings.h>
#include <Databases/DatabasesCommon.h>
#include <Core/BackgroundSchedulePool.h>
@ -25,11 +25,11 @@ using PostgreSQLConnectionPtr = std::shared_ptr<PostgreSQLConnection>;
template<typename Base>
class DatabasePostgreSQLReplica : public Base
class DatabaseMaterializePostgreSQL : public Base
{
public:
DatabasePostgreSQLReplica(
DatabaseMaterializePostgreSQL(
const Context & context,
const String & metadata_path_,
UUID uuid,
@ -37,9 +37,9 @@ public:
const String & dbname_,
const String & postgres_dbname,
PostgreSQLConnectionPtr connection_,
std::unique_ptr<PostgreSQLReplicaSettings> settings_);
std::unique_ptr<MaterializePostgreSQLSettings> settings_);
String getEngineName() const override { return "PostgreSQLReplica"; }
String getEngineName() const override { return "MaterializePostgreSQL"; }
String getMetadataPath() const override { return metadata_path; }
@ -69,7 +69,7 @@ private:
ASTPtr database_engine_define;
String database_name, remote_database_name;
PostgreSQLConnectionPtr connection;
std::unique_ptr<PostgreSQLReplicaSettings> settings;
std::unique_ptr<MaterializePostgreSQLSettings> settings;
std::shared_ptr<PostgreSQLReplicationHandler> replication_handler;
std::map<std::string, StoragePtr> tables;

View File

@ -23,7 +23,7 @@
#endif
#if USE_LIBPQXX
# include <Storages/PostgreSQL/StoragePostgreSQLReplica.h>
# include <Storages/PostgreSQL/StorageMaterializePostgreSQL.h>
#endif
namespace DB
@ -192,7 +192,7 @@ BlockIO InterpreterDropQuery::executeToTableImpl(const ASTDropQuery & query, Dat
#if USE_LIBPQXX
if (table->getName() == "PostgreSQLReplica")
table->as<StoragePostgreSQLReplica>()->shutdownFinal();
table->as<StorageMaterializePostgreSQL>()->shutdownFinal();
#endif
TableExclusiveLockHolder table_lock;

View File

@ -1,7 +1,7 @@
#include "PostgreSQLReplicaConsumer.h"
#include "MaterializePostgreSQLConsumer.h"
#if USE_LIBPQXX
#include "StoragePostgreSQLReplica.h"
#include "StorageMaterializePostgreSQL.h"
#include <Columns/ColumnNullable.h>
#include <Common/hex.h>
@ -22,7 +22,7 @@ namespace ErrorCodes
extern const int UNKNOWN_TABLE;
}
PostgreSQLReplicaConsumer::PostgreSQLReplicaConsumer(
MaterializePostgreSQLConsumer::MaterializePostgreSQLConsumer(
std::shared_ptr<Context> context_,
PostgreSQLConnectionPtr connection_,
const std::string & replication_slot_name_,
@ -52,7 +52,7 @@ PostgreSQLReplicaConsumer::PostgreSQLReplicaConsumer(
}
void PostgreSQLReplicaConsumer::Buffer::fillBuffer(StoragePtr storage)
void MaterializePostgreSQLConsumer::Buffer::fillBuffer(StoragePtr storage)
{
const auto storage_metadata = storage->getInMemoryMetadataPtr();
description.init(storage_metadata->getSampleBlock());
@ -77,7 +77,7 @@ void PostgreSQLReplicaConsumer::Buffer::fillBuffer(StoragePtr storage)
}
void PostgreSQLReplicaConsumer::readMetadata()
void MaterializePostgreSQLConsumer::readMetadata()
{
try
{
@ -98,7 +98,7 @@ void PostgreSQLReplicaConsumer::readMetadata()
}
void PostgreSQLReplicaConsumer::insertValue(Buffer & buffer, const std::string & value, size_t column_idx)
void MaterializePostgreSQLConsumer::insertValue(Buffer & buffer, const std::string & value, size_t column_idx)
{
const auto & sample = buffer.description.sample_block.getByPosition(column_idx);
bool is_nullable = buffer.description.types[column_idx].second;
@ -124,14 +124,14 @@ void PostgreSQLReplicaConsumer::insertValue(Buffer & buffer, const std::string &
}
void PostgreSQLReplicaConsumer::insertDefaultValue(Buffer & buffer, size_t column_idx)
void MaterializePostgreSQLConsumer::insertDefaultValue(Buffer & buffer, size_t column_idx)
{
const auto & sample = buffer.description.sample_block.getByPosition(column_idx);
insertDefaultPostgreSQLValue(*buffer.columns[column_idx], *sample.column);
}
void PostgreSQLReplicaConsumer::readString(const char * message, size_t & pos, size_t size, String & result)
void MaterializePostgreSQLConsumer::readString(const char * message, size_t & pos, size_t size, String & result)
{
assert(size > pos + 2);
char current = unhex2(message + pos);
@ -145,7 +145,7 @@ void PostgreSQLReplicaConsumer::readString(const char * message, size_t & pos, s
}
Int32 PostgreSQLReplicaConsumer::readInt32(const char * message, size_t & pos, [[maybe_unused]] size_t size)
Int32 MaterializePostgreSQLConsumer::readInt32(const char * message, size_t & pos, [[maybe_unused]] size_t size)
{
assert(size > pos + 8);
Int32 result = (UInt32(unhex2(message + pos)) << 24)
@ -157,7 +157,7 @@ Int32 PostgreSQLReplicaConsumer::readInt32(const char * message, size_t & pos, [
}
Int16 PostgreSQLReplicaConsumer::readInt16(const char * message, size_t & pos, [[maybe_unused]] size_t size)
Int16 MaterializePostgreSQLConsumer::readInt16(const char * message, size_t & pos, [[maybe_unused]] size_t size)
{
assert(size > pos + 4);
Int16 result = (UInt32(unhex2(message + pos)) << 8)
@ -167,7 +167,7 @@ Int16 PostgreSQLReplicaConsumer::readInt16(const char * message, size_t & pos, [
}
Int8 PostgreSQLReplicaConsumer::readInt8(const char * message, size_t & pos, [[maybe_unused]] size_t size)
Int8 MaterializePostgreSQLConsumer::readInt8(const char * message, size_t & pos, [[maybe_unused]] size_t size)
{
assert(size > pos + 2);
Int8 result = unhex2(message + pos);
@ -176,7 +176,7 @@ Int8 PostgreSQLReplicaConsumer::readInt8(const char * message, size_t & pos, [[m
}
Int64 PostgreSQLReplicaConsumer::readInt64(const char * message, size_t & pos, [[maybe_unused]] size_t size)
Int64 MaterializePostgreSQLConsumer::readInt64(const char * message, size_t & pos, [[maybe_unused]] size_t size)
{
assert(size > pos + 16);
Int64 result = (UInt64(unhex4(message + pos)) << 48)
@ -188,7 +188,7 @@ Int64 PostgreSQLReplicaConsumer::readInt64(const char * message, size_t & pos, [
}
void PostgreSQLReplicaConsumer::readTupleData(
void MaterializePostgreSQLConsumer::readTupleData(
Buffer & buffer, const char * message, size_t & pos, [[maybe_unused]] size_t size, PostgreSQLQuery type, bool old_value)
{
Int16 num_columns = readInt16(message, pos, size);
@ -257,7 +257,7 @@ void PostgreSQLReplicaConsumer::readTupleData(
/// https://www.postgresql.org/docs/13/protocol-logicalrep-message-formats.html
void PostgreSQLReplicaConsumer::processReplicationMessage(const char * replication_message, size_t size)
void MaterializePostgreSQLConsumer::processReplicationMessage(const char * replication_message, size_t size)
{
/// Skip '\x'
size_t pos = 2;
@ -468,7 +468,7 @@ void PostgreSQLReplicaConsumer::processReplicationMessage(const char * replicati
}
void PostgreSQLReplicaConsumer::syncTables(std::shared_ptr<pqxx::nontransaction> tx)
void MaterializePostgreSQLConsumer::syncTables(std::shared_ptr<pqxx::nontransaction> tx)
{
for (const auto & table_name : tables_to_sync)
{
@ -517,7 +517,7 @@ void PostgreSQLReplicaConsumer::syncTables(std::shared_ptr<pqxx::nontransaction>
}
String PostgreSQLReplicaConsumer::advanceLSN(std::shared_ptr<pqxx::nontransaction> tx)
String MaterializePostgreSQLConsumer::advanceLSN(std::shared_ptr<pqxx::nontransaction> tx)
{
std::string query_str = fmt::format("SELECT end_lsn FROM pg_replication_slot_advance('{}', '{}')", replication_slot_name, final_lsn);
pqxx::result result{tx->exec(query_str)};
@ -529,7 +529,7 @@ String PostgreSQLReplicaConsumer::advanceLSN(std::shared_ptr<pqxx::nontransactio
}
bool PostgreSQLReplicaConsumer::isSyncAllowed(Int32 relation_id)
bool MaterializePostgreSQLConsumer::isSyncAllowed(Int32 relation_id)
{
auto table_with_lsn = skip_list.find(relation_id);
if (table_with_lsn == skip_list.end())
@ -553,7 +553,7 @@ bool PostgreSQLReplicaConsumer::isSyncAllowed(Int32 relation_id)
}
void PostgreSQLReplicaConsumer::markTableAsSkipped(Int32 relation_id, const String & relation_name)
void MaterializePostgreSQLConsumer::markTableAsSkipped(Int32 relation_id, const String & relation_name)
{
skip_list.insert({relation_id, ""});
schema_data.erase(relation_id);
@ -567,7 +567,7 @@ void PostgreSQLReplicaConsumer::markTableAsSkipped(Int32 relation_id, const Stri
/// Read binary changes from replication slot via COPY command (starting from current lsn in a slot).
bool PostgreSQLReplicaConsumer::readFromReplicationSlot()
bool MaterializePostgreSQLConsumer::readFromReplicationSlot()
{
std::shared_ptr<pqxx::nontransaction> tx;
bool slot_empty = true;
@ -640,7 +640,7 @@ bool PostgreSQLReplicaConsumer::readFromReplicationSlot()
}
bool PostgreSQLReplicaConsumer::consume(std::vector<std::pair<Int32, String>> & skipped_tables)
bool MaterializePostgreSQLConsumer::consume(std::vector<std::pair<Int32, String>> & skipped_tables)
{
if (!readFromReplicationSlot())
{
@ -660,7 +660,7 @@ bool PostgreSQLReplicaConsumer::consume(std::vector<std::pair<Int32, String>> &
}
void PostgreSQLReplicaConsumer::updateNested(const String & table_name, StoragePtr nested_storage)
void MaterializePostgreSQLConsumer::updateNested(const String & table_name, StoragePtr nested_storage)
{
storages[table_name] = nested_storage;
auto & buffer = buffers.find(table_name)->second;
@ -668,7 +668,7 @@ void PostgreSQLReplicaConsumer::updateNested(const String & table_name, StorageP
}
void PostgreSQLReplicaConsumer::updateSkipList(const std::unordered_map<Int32, String> & tables_with_lsn)
void MaterializePostgreSQLConsumer::updateSkipList(const std::unordered_map<Int32, String> & tables_with_lsn)
{
for (const auto & [relation_id, lsn] : tables_with_lsn)
{

View File

@ -6,7 +6,7 @@
#if USE_LIBPQXX
#include "PostgreSQLConnection.h"
#include "PostgreSQLReplicaMetadata.h"
#include "MaterializePostgreSQLMetadata.h"
#include "insertPostgreSQLValue.h"
#include <Core/BackgroundSchedulePool.h>
@ -21,12 +21,12 @@
namespace DB
{
class PostgreSQLReplicaConsumer
class MaterializePostgreSQLConsumer
{
public:
using Storages = std::unordered_map<String, StoragePtr>;
PostgreSQLReplicaConsumer(
MaterializePostgreSQLConsumer(
std::shared_ptr<Context> context_,
PostgreSQLConnectionPtr connection_,
const std::string & replication_slot_name_,
@ -103,7 +103,7 @@ private:
std::shared_ptr<Context> context;
const std::string replication_slot_name, publication_name;
PostgreSQLReplicaMetadata metadata;
MaterializePostgreSQLMetadata metadata;
PostgreSQLConnectionPtr connection;
std::string current_lsn, final_lsn;

View File

@ -1,4 +1,4 @@
#include "PostgreSQLReplicaMetadata.h"
#include "MaterializePostgreSQLMetadata.h"
#if USE_LIBPQXX
#include <Poco/File.h>
@ -12,7 +12,7 @@
namespace DB
{
PostgreSQLReplicaMetadata::PostgreSQLReplicaMetadata(const std::string & metadata_file_path)
MaterializePostgreSQLMetadata::MaterializePostgreSQLMetadata(const std::string & metadata_file_path)
: metadata_file(metadata_file_path)
, tmp_metadata_file(metadata_file_path + ".tmp")
, last_version(1)
@ -20,7 +20,7 @@ PostgreSQLReplicaMetadata::PostgreSQLReplicaMetadata(const std::string & metadat
}
void PostgreSQLReplicaMetadata::readMetadata()
void MaterializePostgreSQLMetadata::readMetadata()
{
if (Poco::File(metadata_file).exists())
{
@ -41,13 +41,13 @@ void PostgreSQLReplicaMetadata::readMetadata()
last_lsn = actual_lsn;
}
LOG_DEBUG(&Poco::Logger::get("PostgreSQLReplicaMetadata"),
LOG_DEBUG(&Poco::Logger::get("MaterializePostgreSQLMetadata"),
"Last written version is {}. (From metadata file {})", last_version, metadata_file);
}
}
void PostgreSQLReplicaMetadata::writeMetadata(bool append_metadata)
void MaterializePostgreSQLMetadata::writeMetadata(bool append_metadata)
{
WriteBufferFromFile out(tmp_metadata_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_TRUNC | O_CREAT);
@ -69,7 +69,7 @@ void PostgreSQLReplicaMetadata::writeMetadata(bool append_metadata)
/// While data is received, version is updated. Before table sync, write last version to tmp file.
/// Then sync data to table and rename tmp to non-tmp.
void PostgreSQLReplicaMetadata::commitMetadata(std::string & lsn, const std::function<String()> & finalizeStreamFunc)
void MaterializePostgreSQLMetadata::commitMetadata(std::string & lsn, const std::function<String()> & finalizeStreamFunc)
{
std::string actual_lsn;
last_lsn = lsn;
@ -90,7 +90,7 @@ void PostgreSQLReplicaMetadata::commitMetadata(std::string & lsn, const std::fun
if (actual_lsn != last_lsn)
{
writeMetadata(true);
LOG_WARNING(&Poco::Logger::get("PostgreSQLReplicaMetadata"),
LOG_WARNING(&Poco::Logger::get("MaterializePostgreSQLMetadata"),
"Last written LSN {} is not equal to actual LSN {}", last_lsn, actual_lsn);
}
}

View File

@ -5,10 +5,10 @@
namespace DB
{
class PostgreSQLReplicaMetadata
class MaterializePostgreSQLMetadata
{
public:
PostgreSQLReplicaMetadata(const std::string & metadata_file_path);
MaterializePostgreSQLMetadata(const std::string & metadata_file_path);
void commitMetadata(std::string & lsn, const std::function<String()> & finalizeStreamFunc);

View File

@ -1,4 +1,4 @@
#include "PostgreSQLReplicaSettings.h"
#include "MaterializePostgreSQLSettings.h"
#if USE_LIBPQXX
#include <Parsers/ASTCreateQuery.h>
@ -15,9 +15,9 @@ namespace ErrorCodes
extern const int UNKNOWN_SETTING;
}
IMPLEMENT_SETTINGS_TRAITS(PostgreSQLReplicaSettingsTraits, LIST_OF_POSTGRESQL_REPLICA_SETTINGS)
IMPLEMENT_SETTINGS_TRAITS(MaterializePostgreSQLSettingsTraits, LIST_OF_POSTGRESQL_REPLICA_SETTINGS)
void PostgreSQLReplicaSettings::loadFromQuery(ASTStorage & storage_def)
void MaterializePostgreSQLSettings::loadFromQuery(ASTStorage & storage_def)
{
if (storage_def.settings)
{

View File

@ -15,12 +15,12 @@ namespace DB
#define LIST_OF_POSTGRESQL_REPLICA_SETTINGS(M) \
M(UInt64, postgresql_replica_max_block_size, 0, "Number of row collected before flushing data into table.", 0) \
M(String, postgresql_replica_tables_list, "", "List of tables for PostgreSQLReplica database engine", 0) \
M(String, postgresql_replica_tables_list, "", "List of tables for MaterializePostgreSQL database engine", 0) \
M(Bool, postgresql_replica_allow_minimal_ddl, 0, "Allow to track minimal possible ddl. By default, table after ddl will get into a skip list", 0) \
DECLARE_SETTINGS_TRAITS(PostgreSQLReplicaSettingsTraits, LIST_OF_POSTGRESQL_REPLICA_SETTINGS)
DECLARE_SETTINGS_TRAITS(MaterializePostgreSQLSettingsTraits, LIST_OF_POSTGRESQL_REPLICA_SETTINGS)
struct PostgreSQLReplicaSettings : public BaseSettings<PostgreSQLReplicaSettingsTraits>
struct MaterializePostgreSQLSettings : public BaseSettings<MaterializePostgreSQLSettingsTraits>
{
void loadFromQuery(ASTStorage & storage_def);
};

View File

@ -3,7 +3,7 @@
#if USE_LIBPQXX
#include <DataStreams/PostgreSQLBlockInputStream.h>
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
#include <Storages/PostgreSQL/StoragePostgreSQLReplica.h>
#include <Storages/PostgreSQL/StorageMaterializePostgreSQL.h>
#include <Common/setThreadName.h>
#include <DataStreams/copyData.h>
@ -27,7 +27,7 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
bool allow_minimal_ddl_,
bool is_postgresql_replica_database_engine_,
const String tables_list_)
: log(&Poco::Logger::get("PostgreSQLReplicaHandler"))
: log(&Poco::Logger::get("PostgreSQLReplicationHandler"))
, context(context_)
, database_name(database_name_)
, connection_str(conn_str)
@ -49,7 +49,7 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
}
void PostgreSQLReplicationHandler::addStorage(const std::string & table_name, StoragePostgreSQLReplica * storage)
void PostgreSQLReplicationHandler::addStorage(const std::string & table_name, StorageMaterializePostgreSQL * storage)
{
storages[table_name] = storage;
}
@ -138,7 +138,7 @@ void PostgreSQLReplicationHandler::startSynchronization()
ntx->commit();
consumer = std::make_shared<PostgreSQLReplicaConsumer>(
consumer = std::make_shared<MaterializePostgreSQLConsumer>(
context,
connection,
replication_slot,

View File

@ -6,8 +6,8 @@
#if USE_LIBPQXX
#include "PostgreSQLConnection.h"
#include "PostgreSQLReplicaConsumer.h"
#include "PostgreSQLReplicaMetadata.h"
#include "MaterializePostgreSQLConsumer.h"
#include "MaterializePostgreSQLMetadata.h"
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
@ -19,7 +19,7 @@ namespace DB
/// exist in CH, it can be loaded via snapshot while stream is stopped and then comparing wal positions with
/// current lsn and table start lsn.
class StoragePostgreSQLReplica;
class StorageMaterializePostgreSQL;
class PostgreSQLReplicationHandler
{
@ -41,7 +41,7 @@ public:
void shutdownFinal();
void addStorage(const std::string & table_name, StoragePostgreSQLReplica * storage);
void addStorage(const std::string & table_name, StorageMaterializePostgreSQL * storage);
NameSet fetchRequiredTables(PostgreSQLConnection::ConnectionPtr connection_);
@ -49,7 +49,7 @@ public:
private:
using NontransactionPtr = std::shared_ptr<pqxx::nontransaction>;
using Storages = std::unordered_map<String, StoragePostgreSQLReplica *>;
using Storages = std::unordered_map<String, StorageMaterializePostgreSQL *>;
bool isPublicationExist(std::shared_ptr<pqxx::work> tx);
@ -83,7 +83,7 @@ private:
std::string tables_list, replication_slot, publication_name;
PostgreSQLConnectionPtr connection;
std::shared_ptr<PostgreSQLReplicaConsumer> consumer;
std::shared_ptr<MaterializePostgreSQLConsumer> consumer;
BackgroundSchedulePool::TaskHolder startup_task, consumer_task;
std::atomic<bool> tables_loaded = false, stop_synchronization = false;
@ -96,4 +96,3 @@ private:
}
#endif

View File

@ -1,4 +1,4 @@
#include "StoragePostgreSQLReplica.h"
#include "StorageMaterializePostgreSQL.h"
#if USE_LIBPQXX
#include <Common/Macros.h>
@ -36,20 +36,20 @@ namespace ErrorCodes
static const auto NESTED_STORAGE_SUFFIX = "_ReplacingMergeTree";
StoragePostgreSQLReplica::StoragePostgreSQLReplica(
StorageMaterializePostgreSQL::StorageMaterializePostgreSQL(
const StorageID & table_id_,
const String & remote_database_name,
const String & remote_table_name_,
const String & connection_str,
const StorageInMemoryMetadata & storage_metadata,
const Context & context_,
std::unique_ptr<PostgreSQLReplicaSettings> replication_settings_)
std::unique_ptr<MaterializePostgreSQLSettings> replication_settings_)
: IStorage(table_id_)
, remote_table_name(remote_table_name_)
, global_context(std::make_shared<Context>(context_.getGlobalContext()))
, replication_settings(std::move(replication_settings_))
, is_postgresql_replica_database(
DatabaseCatalog::instance().getDatabase(getStorageID().database_name)->getEngineName() == "PostgreSQLReplica")
DatabaseCatalog::instance().getDatabase(getStorageID().database_name)->getEngineName() == "MaterializePostgreSQL")
{
setInMemoryMetadata(storage_metadata);
@ -68,7 +68,7 @@ StoragePostgreSQLReplica::StoragePostgreSQLReplica(
}
StoragePostgreSQLReplica::StoragePostgreSQLReplica(
StorageMaterializePostgreSQL::StorageMaterializePostgreSQL(
const StorageID & table_id_,
StoragePtr nested_storage_,
const Context & context_)
@ -76,13 +76,13 @@ StoragePostgreSQLReplica::StoragePostgreSQLReplica(
, global_context(std::make_shared<Context>(context_))
, nested_storage(nested_storage_)
, is_postgresql_replica_database(
DatabaseCatalog::instance().getDatabase(getStorageID().database_name)->getEngineName() == "PostgreSQLReplica")
DatabaseCatalog::instance().getDatabase(getStorageID().database_name)->getEngineName() == "MaterializePostgreSQL")
{
}
std::string StoragePostgreSQLReplica::getNestedTableName() const
std::string StorageMaterializePostgreSQL::getNestedTableName() const
{
auto table_name = getStorageID().table_name;
@ -93,7 +93,7 @@ std::string StoragePostgreSQLReplica::getNestedTableName() const
}
std::shared_ptr<ASTColumnDeclaration> StoragePostgreSQLReplica::getMaterializedColumnsDeclaration(
std::shared_ptr<ASTColumnDeclaration> StorageMaterializePostgreSQL::getMaterializedColumnsDeclaration(
const String name, const String type, UInt64 default_value)
{
auto column_declaration = std::make_shared<ASTColumnDeclaration>();
@ -111,7 +111,7 @@ std::shared_ptr<ASTColumnDeclaration> StoragePostgreSQLReplica::getMaterializedC
}
ASTPtr StoragePostgreSQLReplica::getColumnDeclaration(const DataTypePtr & data_type) const
ASTPtr StorageMaterializePostgreSQL::getColumnDeclaration(const DataTypePtr & data_type) const
{
WhichDataType which(data_type);
@ -152,10 +152,10 @@ ASTPtr StoragePostgreSQLReplica::getColumnDeclaration(const DataTypePtr & data_t
}
/// For single storage PostgreSQLReplica get columns and primary key columns from storage definition.
/// For database engine PostgreSQLReplica get columns and primary key columns by fetching from PostgreSQL, also using the same
/// For single storage MaterializePostgreSQL get columns and primary key columns from storage definition.
/// For database engine MaterializePostgreSQL get columns and primary key columns by fetching from PostgreSQL, also using the same
/// transaction with snapshot, which is used for initial tables dump.
ASTPtr StoragePostgreSQLReplica::getCreateNestedTableQuery(const std::function<PostgreSQLTableStructure()> & fetch_table_structure)
ASTPtr StorageMaterializePostgreSQL::getCreateNestedTableQuery(const std::function<PostgreSQLTableStructure()> & fetch_table_structure)
{
auto create_table_query = std::make_shared<ASTCreateQuery>();
@ -240,7 +240,7 @@ ASTPtr StoragePostgreSQLReplica::getCreateNestedTableQuery(const std::function<P
}
void StoragePostgreSQLReplica::createNestedIfNeeded(const std::function<PostgreSQLTableStructure()> & fetch_table_structure)
void StorageMaterializePostgreSQL::createNestedIfNeeded(const std::function<PostgreSQLTableStructure()> & fetch_table_structure)
{
if (nested_loaded)
{
@ -267,7 +267,7 @@ void StoragePostgreSQLReplica::createNestedIfNeeded(const std::function<PostgreS
}
Context StoragePostgreSQLReplica::makeNestedTableContext() const
Context StorageMaterializePostgreSQL::makeNestedTableContext() const
{
auto context(*global_context);
context.makeQueryContext();
@ -277,7 +277,7 @@ Context StoragePostgreSQLReplica::makeNestedTableContext() const
}
StoragePtr StoragePostgreSQLReplica::getNested()
StoragePtr StorageMaterializePostgreSQL::getNested()
{
if (nested_storage)
return nested_storage;
@ -290,7 +290,7 @@ StoragePtr StoragePostgreSQLReplica::getNested()
}
StoragePtr StoragePostgreSQLReplica::tryGetNested()
StoragePtr StorageMaterializePostgreSQL::tryGetNested()
{
if (nested_storage)
return nested_storage;
@ -303,7 +303,7 @@ StoragePtr StoragePostgreSQLReplica::tryGetNested()
}
void StoragePostgreSQLReplica::startup()
void StorageMaterializePostgreSQL::startup()
{
if (!is_postgresql_replica_database)
{
@ -313,14 +313,14 @@ void StoragePostgreSQLReplica::startup()
}
void StoragePostgreSQLReplica::shutdown()
void StorageMaterializePostgreSQL::shutdown()
{
if (replication_handler)
replication_handler->shutdown();
}
void StoragePostgreSQLReplica::shutdownFinal()
void StorageMaterializePostgreSQL::shutdownFinal()
{
if (is_postgresql_replica_database)
return;
@ -333,7 +333,7 @@ void StoragePostgreSQLReplica::shutdownFinal()
}
void StoragePostgreSQLReplica::dropNested()
void StorageMaterializePostgreSQL::dropNested()
{
std::lock_guard lock(nested_mutex);
nested_loaded = false;
@ -351,11 +351,11 @@ void StoragePostgreSQLReplica::dropNested()
interpreter.execute();
nested_storage = nullptr;
LOG_WARNING(&Poco::Logger::get("StoragePostgreSQLReplica"), "Dropped (or temporarily) nested table {}", getNestedTableName());
LOG_WARNING(&Poco::Logger::get("StorageMaterializePostgreSQL"), "Dropped (or temporarily) nested table {}", getNestedTableName());
}
NamesAndTypesList StoragePostgreSQLReplica::getVirtuals() const
NamesAndTypesList StorageMaterializePostgreSQL::getVirtuals() const
{
if (nested_storage)
return nested_storage->getVirtuals();
@ -364,7 +364,7 @@ NamesAndTypesList StoragePostgreSQLReplica::getVirtuals() const
}
Pipe StoragePostgreSQLReplica::read(
Pipe StorageMaterializePostgreSQL::read(
const Names & column_names,
const StorageMetadataPtr & /* metadata_snapshot */,
SelectQueryInfo & query_info,
@ -442,24 +442,24 @@ Pipe StoragePostgreSQLReplica::read(
return pipe;
}
LOG_WARNING(&Poco::Logger::get("StoragePostgreSQLReplica"), "Nested table {} is unavailable or is not loaded yet", getNestedTableName());
LOG_WARNING(&Poco::Logger::get("StorageMaterializePostgreSQL"), "Nested table {} is unavailable or is not loaded yet", getNestedTableName());
return Pipe();
}
void registerStoragePostgreSQLReplica(StorageFactory & factory)
void registerStorageMaterializePostgreSQL(StorageFactory & factory)
{
auto creator_fn = [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
bool has_settings = args.storage_def->settings;
auto postgresql_replication_settings = std::make_unique<PostgreSQLReplicaSettings>();
auto postgresql_replication_settings = std::make_unique<MaterializePostgreSQLSettings>();
if (has_settings)
postgresql_replication_settings->loadFromQuery(*args.storage_def);
if (engine_args.size() != 5)
throw Exception("Storage PostgreSQLReplica requires 5 parameters: "
throw Exception("Storage MaterializePostgreSQL requires 5 parameters: "
"PostgreSQL('host:port', 'database', 'table', 'username', 'password'",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@ -474,7 +474,7 @@ void registerStoragePostgreSQLReplica(StorageFactory & factory)
args.storage_def->set(args.storage_def->order_by, args.storage_def->primary_key->clone());
if (!args.storage_def->order_by)
throw Exception("Storage PostgreSQLReplica needs order by key or primary key", ErrorCodes::BAD_ARGUMENTS);
throw Exception("Storage MaterializePostgreSQL needs order by key or primary key", ErrorCodes::BAD_ARGUMENTS);
if (args.storage_def->primary_key)
metadata.primary_key = KeyDescription::getKeyFromAST(args.storage_def->primary_key->ptr(), metadata.columns, args.context);
@ -493,14 +493,14 @@ void registerStoragePostgreSQLReplica(StorageFactory & factory)
engine_args[3]->as<ASTLiteral &>().value.safeGet<String>(),
engine_args[4]->as<ASTLiteral &>().value.safeGet<String>());
return StoragePostgreSQLReplica::create(
return StorageMaterializePostgreSQL::create(
args.table_id, remote_database, remote_table, connection.conn_str(),
metadata, args.context,
std::move(postgresql_replication_settings));
};
factory.registerStorage(
"PostgreSQLReplica",
"MaterializePostgreSQL",
creator_fn,
StorageFactory::StorageFeatures{
.supports_settings = true,

View File

@ -6,7 +6,7 @@
#if USE_LIBPQXX
#include "PostgreSQLReplicationHandler.h"
#include "PostgreSQLReplicaSettings.h"
#include "MaterializePostgreSQLSettings.h"
#include <Parsers/IAST.h>
#include <Parsers/ASTLiteral.h>
@ -24,17 +24,17 @@
namespace DB
{
class StoragePostgreSQLReplica final : public ext::shared_ptr_helper<StoragePostgreSQLReplica>, public IStorage
class StorageMaterializePostgreSQL final : public ext::shared_ptr_helper<StorageMaterializePostgreSQL>, public IStorage
{
friend struct ext::shared_ptr_helper<StoragePostgreSQLReplica>;
friend struct ext::shared_ptr_helper<StorageMaterializePostgreSQL>;
public:
StoragePostgreSQLReplica(
StorageMaterializePostgreSQL(
const StorageID & table_id_,
StoragePtr nested_storage_,
const Context & context_);
String getName() const override { return "PostgreSQLReplica"; }
String getName() const override { return "MaterializePostgreSQL"; }
void startup() override;
void shutdown() override;
@ -70,14 +70,14 @@ public:
void dropNested();
protected:
StoragePostgreSQLReplica(
StorageMaterializePostgreSQL(
const StorageID & table_id_,
const String & remote_database_name,
const String & remote_table_name,
const String & connection_str,
const StorageInMemoryMetadata & storage_metadata,
const Context & context_,
std::unique_ptr<PostgreSQLReplicaSettings> replication_settings_);
std::unique_ptr<MaterializePostgreSQLSettings> replication_settings_);
private:
static std::shared_ptr<ASTColumnDeclaration> getMaterializedColumnsDeclaration(
@ -92,7 +92,7 @@ private:
std::string remote_table_name;
std::shared_ptr<Context> global_context;
std::unique_ptr<PostgreSQLReplicaSettings> replication_settings;
std::unique_ptr<MaterializePostgreSQLSettings> replication_settings;
std::unique_ptr<PostgreSQLReplicationHandler> replication_handler;
std::atomic<bool> nested_loaded = false;

View File

@ -60,7 +60,7 @@ void registerStorageEmbeddedRocksDB(StorageFactory & factory);
#if USE_LIBPQXX
void registerStoragePostgreSQL(StorageFactory & factory);
void registerStoragePostgreSQLReplica(StorageFactory & factory);
void registerStorageMaterializePostgreSQL(StorageFactory & factory);
#endif
void registerStorages()
@ -118,7 +118,7 @@ void registerStorages()
#if USE_LIBPQXX
registerStoragePostgreSQL(factory);
registerStoragePostgreSQLReplica(factory);
registerStorageMaterializePostgreSQL(factory);
#endif
}