This commit is contained in:
kssenii 2021-04-11 19:58:33 +00:00
parent c968ccb391
commit 01075677cf
11 changed files with 92 additions and 116 deletions

View File

@ -7,9 +7,9 @@ toc_title: MaterializePostgreSQL
## Creating a Database {#creating-a-database}
## Requirements
## Requirements {#requirements}
Each replicated table must have one of the following **replica identity**:
- Each replicated table must have one of the following **replica identity**:
1. **default** (primary key)
@ -38,3 +38,4 @@ WHERE oid = 'postgres_table'::regclass;
```
- Setting `wal_level`to `logical` and `max_replication_slots` to at least `2` in the postgresql config file.

View File

@ -25,7 +25,7 @@ class Connection;
using ConnectionPtr = std::shared_ptr<Connection>;
/// Connection string and address without login/password (for error logs)
/// Connection string and address without credentials (for logs)
using ConnectionInfo = std::pair<std::string, std::string>;
ConnectionInfo formatConnectionString(

View File

@ -9,7 +9,6 @@
#include <DataStreams/IBlockInputStream.h>
#include <Core/ExternalResultDescription.h>
#include <Core/Field.h>
#include <pqxx/pqxx>
#include <Core/PostgreSQL/insertPostgreSQLValue.h>
#include <Core/PostgreSQL/PostgreSQLConnection.h>
@ -28,6 +27,12 @@ public:
const Block & sample_block,
const UInt64 max_block_size_);
String getName() const override { return "PostgreSQL"; }
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
void readPrefix() override;
protected:
PostgreSQLBlockInputStream(
std::shared_ptr<T> tx_,
const std::string & query_str_,
@ -35,12 +40,6 @@ public:
const UInt64 max_block_size_,
bool auto_commit_);
String getName() const override { return "PostgreSQL"; }
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
void readPrefix() override;
protected:
String query_str;
std::shared_ptr<T> tx;
std::unique_ptr<pqxx::stream_from> stream;

View File

@ -60,7 +60,8 @@ void DatabaseMaterializePostgreSQL::startSynchronization()
metadata_path + METADATA_SUFFIX,
getContext(),
settings->postgresql_replica_max_block_size.value,
settings->postgresql_replica_allow_minimal_ddl, true,
settings->postgresql_replica_allow_minimal_ddl,
/* is_materialize_postgresql_database = */ true,
settings->postgresql_replica_tables_list.value);
std::unordered_set<std::string> tables_to_replicate = replication_handler->fetchRequiredTables(connection->getRef());
@ -123,9 +124,9 @@ StoragePtr DatabaseMaterializePostgreSQL::tryGetTable(const String & name, Conte
/// Note: In select query we call MaterializePostgreSQL table and it calls tryGetTable from its nested.
std::lock_guard lock(tables_mutex);
auto table = materialized_tables.find(name);
/// Here it is possible that nested table is temporarily out of reach, but return storage anyway,
/// it will not allow to read if nested is unavailable at the moment
if (table != materialized_tables.end())
/// Nested table is not created immediately. Consider that table exists only if nested table exists.
if (table != materialized_tables.end() && table->second->as<StorageMaterializePostgreSQL>()->isNestedLoaded())
return table->second;
return StoragePtr{};
@ -177,7 +178,7 @@ DatabaseTablesIteratorPtr DatabaseMaterializePostgreSQL::getTablesIterator(
Tables nested_tables;
for (const auto & [table_name, storage] : materialized_tables)
{
auto nested_storage = storage->template as<StorageMaterializePostgreSQL>()->tryGetNested();
auto nested_storage = storage->as<StorageMaterializePostgreSQL>()->tryGetNested();
if (nested_storage)
nested_tables[table_name] = nested_storage;
@ -186,6 +187,12 @@ DatabaseTablesIteratorPtr DatabaseMaterializePostgreSQL::getTablesIterator(
return std::make_unique<DatabaseTablesSnapshotIterator>(nested_tables, database_name);
}
void DatabaseMaterializePostgreSQL::renameTable(ContextPtr /* context_ */, const String & /* name */, IDatabase & /* to_database */, const String & /* to_name */, bool /* exchange */, bool /* dictionary */)
{
throw Exception("MaterializePostgreSQL database does not support rename table.", ErrorCodes::NOT_IMPLEMENTED);
}
}
#endif

View File

@ -51,6 +51,8 @@ public:
void createTable(ContextPtr context, const String & name, const StoragePtr & table, const ASTPtr & query) override;
void renameTable(ContextPtr context_, const String & name, IDatabase & to_database, const String & to_name, bool exchange, bool dictionary) override;
void drop(ContextPtr local_context) override;
void shutdown() override;

View File

@ -26,14 +26,14 @@ namespace ErrorCodes
}
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::connection & connection)
template<typename T>
std::unordered_set<std::string> fetchPostgreSQLTablesList(T & tx)
{
std::unordered_set<std::string> tables;
std::string query = "SELECT tablename FROM pg_catalog.pg_tables "
"WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema'";
pqxx::read_transaction tx(connection);
for (auto table_name : tx.stream<std::string>(query))
for (auto table_name : tx.template stream<std::string>(query))
tables.insert(std::get<0>(table_name));
return tables;
@ -112,13 +112,13 @@ static DataTypePtr convertPostgreSQLDataType(std::string & type, bool is_nullabl
template<typename T>
std::shared_ptr<NamesAndTypesList> readNamesAndTypesList(
std::shared_ptr<T> tx, const String & postgres_table_name, const String & query, bool use_nulls, bool only_names_and_types)
T & tx, const String & postgres_table_name, const String & query, bool use_nulls, bool only_names_and_types)
{
auto columns = NamesAndTypesList();
try
{
pqxx::stream_from stream(*tx, pqxx::from_query, std::string_view(query));
pqxx::stream_from stream(tx, pqxx::from_query, std::string_view(query));
if (only_names_and_types)
{
@ -158,7 +158,7 @@ std::shared_ptr<NamesAndTypesList> readNamesAndTypesList(
template<typename T>
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
std::shared_ptr<T> tx, const String & postgres_table_name, bool use_nulls, bool with_primary_key, bool with_replica_identity_index)
T & tx, const String & postgres_table_name, bool use_nulls, bool with_primary_key, bool with_replica_identity_index)
{
PostgreSQLTableStructure table;
@ -213,29 +213,37 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
}
template
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
std::shared_ptr<pqxx::ReadTransaction> tx, const String & postgres_table_name, bool use_nulls,
bool with_primary_key, bool with_replica_identity_index);
template
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
std::shared_ptr<pqxx::ReplicationTransaction> tx, const String & postgres_table_name, bool use_nulls,
bool with_primary_key, bool with_replica_identity_index);
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
pqxx::connection & connection, const String & postgres_table_name, bool use_nulls)
{
auto tx = std::make_shared<pqxx::ReadTransaction>(connection);
auto table = fetchPostgreSQLTableStructure(tx, postgres_table_name, use_nulls, false, false);
tx->commit();
return table;
postgres::Transaction<pqxx::ReadTransaction> tx(connection);
return fetchPostgreSQLTableStructure(tx.getRef(), postgres_table_name, use_nulls, false, false);
}
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::connection & connection)
{
postgres::Transaction<pqxx::ReadTransaction> tx(connection);
return fetchPostgreSQLTablesList(tx.getRef());
}
template
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
pqxx::ReadTransaction & tx, const String & postgres_table_name, bool use_nulls,
bool with_primary_key, bool with_replica_identity_index);
template
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
pqxx::ReplicationTransaction & tx, const String & postgres_table_name, bool use_nulls,
bool with_primary_key, bool with_replica_identity_index);
template
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::work & tx);
template
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::ReadTransaction & tx);
}
#endif

View File

@ -12,8 +12,6 @@
namespace DB
{
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::connection & connection);
struct PostgreSQLTableStructure
{
std::shared_ptr<NamesAndTypesList> columns;
@ -23,14 +21,19 @@ struct PostgreSQLTableStructure
using PostgreSQLTableStructurePtr = std::unique_ptr<PostgreSQLTableStructure>;
std::unordered_set<std::string> fetchPostgreSQLTablesList(pqxx::connection & connection);
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
pqxx::connection & connection, const String & postgres_table_name, bool use_nulls);
template<typename T>
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
std::shared_ptr<T> tx, const String & postgres_table_name, bool use_nulls,
T & tx, const String & postgres_table_name, bool use_nulls,
bool with_primary_key = false, bool with_replica_identity_index = false);
template<typename T>
std::unordered_set<std::string> fetchPostgreSQLTablesList(T & tx);
}
#endif

View File

@ -87,7 +87,10 @@ void PostgreSQLReplicationHandler::shutdown()
void PostgreSQLReplicationHandler::startSynchronization()
{
createPublicationIfNeeded(connection->getRef());
{
postgres::Transaction<pqxx::work> tx(connection->getRef());
createPublicationIfNeeded(tx.getRef());
}
auto replication_connection = postgres::createReplicationConnection(connection_info);
postgres::Transaction<pqxx::nontransaction> tx(replication_connection->getRef());
@ -159,7 +162,7 @@ NameSet PostgreSQLReplicationHandler::loadFromSnapshot(std::string & snapshot_na
std::string query_str = fmt::format("SET TRANSACTION SNAPSHOT '{}'", snapshot_name);
tx->exec(query_str);
storage_data.second->createNestedIfNeeded(fetchTableStructure(tx, table_name));
storage_data.second->createNestedIfNeeded(fetchTableStructure(*tx, table_name));
auto nested_storage = storage_data.second->getNested();
/// Load from snapshot, which will show table state before creation of replication slot.
@ -233,14 +236,12 @@ bool PostgreSQLReplicationHandler::isPublicationExist(pqxx::work & tx)
}
void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::connection & connection_)
void PostgreSQLReplicationHandler::createPublicationIfNeeded(pqxx::work & tx, bool create_without_check)
{
if (new_publication_created)
return;
postgres::Transaction<pqxx::work> tx(connection_);
if (!isPublicationExist(tx.getRef()))
if (create_without_check || !isPublicationExist(tx))
{
if (tables_list.empty())
{
@ -349,27 +350,33 @@ void PostgreSQLReplicationHandler::shutdownFinal()
}
/// Used by MaterializePostgreSQL database engine.
NameSet PostgreSQLReplicationHandler::fetchRequiredTables(pqxx::connection & connection_)
{
if (tables_list.empty())
postgres::Transaction<pqxx::work> tx(connection_);
bool publication_exists = isPublicationExist(tx.getRef());
if (tables_list.empty() && !publication_exists)
{
return fetchPostgreSQLTablesList(connection_);
}
else
{
createPublicationIfNeeded(connection_);
return fetchTablesFromPublication(connection_);
/// Fetch all tables list from database. Publication does not exist yet, which means
/// that no replication took place. Publication will be created in
/// startSynchronization method.
return fetchPostgreSQLTablesList(tx.getRef());
}
if (!publication_exists)
createPublicationIfNeeded(tx.getRef(), /* create_without_check = */ true);
return fetchTablesFromPublication(tx.getRef());
}
NameSet PostgreSQLReplicationHandler::fetchTablesFromPublication(pqxx::connection & connection_)
NameSet PostgreSQLReplicationHandler::fetchTablesFromPublication(pqxx::work & tx)
{
std::string query = fmt::format("SELECT tablename FROM pg_publication_tables WHERE pubname = '{}'", publication_name);
std::unordered_set<std::string> tables;
postgres::Transaction<pqxx::read_transaction> tx(connection_);
for (auto table_name : tx.getRef().stream<std::string>(query))
for (auto table_name : tx.stream<std::string>(query))
tables.insert(std::get<0>(table_name));
return tables;
@ -377,7 +384,7 @@ NameSet PostgreSQLReplicationHandler::fetchTablesFromPublication(pqxx::connectio
PostgreSQLTableStructurePtr PostgreSQLReplicationHandler::fetchTableStructure(
std::shared_ptr<pqxx::ReplicationTransaction> tx, const std::string & table_name)
pqxx::ReplicationTransaction & tx, const std::string & table_name)
{
if (!is_postgresql_replica_database_engine)
return nullptr;

View File

@ -52,18 +52,20 @@ public:
private:
using Storages = std::unordered_map<String, StorageMaterializePostgreSQL *>;
void createPublicationIfNeeded(pqxx::connection & connection_);
bool isPublicationExist(pqxx::work & tx);
void createPublicationIfNeeded(pqxx::work & tx, bool create_without_check = false);
NameSet fetchTablesFromPublication(pqxx::work & tx);
void dropPublication(pqxx::nontransaction & ntx);
bool isReplicationSlotExist(pqxx::nontransaction & tx, std::string & slot_name);
void createReplicationSlot(pqxx::nontransaction & tx, std::string & start_lsn, std::string & snapshot_name, bool temporary = false);
void dropReplicationSlot(pqxx::nontransaction & tx, bool temporary = false);
void dropPublication(pqxx::nontransaction & ntx);
void waitConnectionAndStart();
void startSynchronization();
@ -72,11 +74,9 @@ private:
NameSet loadFromSnapshot(std::string & snapshot_name, Storages & sync_storages);
NameSet fetchTablesFromPublication(pqxx::connection & connection_);
std::unordered_map<Int32, String> reloadFromSnapshot(const std::vector<std::pair<Int32, String>> & relation_data);
PostgreSQLTableStructurePtr fetchTableStructure(std::shared_ptr<pqxx::ReplicationTransaction> tx, const std::string & table_name);
PostgreSQLTableStructurePtr fetchTableStructure(pqxx::ReplicationTransaction & tx, const std::string & table_name);
Poco::Logger * log;
ContextPtr context;

View File

@ -10,7 +10,6 @@
#include <Storages/IStorage.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Core/PostgreSQL/PostgreSQLPoolWithFailover.h>
#include <pqxx/pqxx>
namespace DB

View File

@ -303,56 +303,6 @@ def test_load_and_sync_subset_of_database_tables(started_cluster):
assert 'test_database' not in instance.query('SHOW DATABASES')
@pytest.mark.timeout(320)
def test_table_schema_changes(started_cluster):
instance.query("DROP DATABASE IF EXISTS test_database")
conn = get_postgres_conn(True)
cursor = conn.cursor()
NUM_TABLES = 5
for i in range(NUM_TABLES):
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i), template=postgres_table_template_2);
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {}, {}, {} from numbers(25)".format(i, i, i, i))
instance.query(
"""CREATE DATABASE test_database
ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')
SETTINGS postgresql_replica_allow_minimal_ddl = 1;
""")
for i in range(NUM_TABLES):
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 25 + number, {}, {}, {} from numbers(25)".format(i, i, i, i))
for i in range(NUM_TABLES):
check_tables_are_synchronized('postgresql_replica_{}'.format(i));
expected = instance.query("SELECT key, value1, value3 FROM test_database.postgresql_replica_3 ORDER BY key");
altered_table = random.randint(0, 4)
cursor.execute("ALTER TABLE postgresql_replica_{} DROP COLUMN value2".format(altered_table))
for i in range(NUM_TABLES):
cursor.execute("INSERT INTO postgresql_replica_{} VALUES (50, {}, {})".format(i, i, i))
cursor.execute("UPDATE postgresql_replica_{} SET value3 = 12 WHERE key%2=0".format(i))
for i in range(NUM_TABLES):
check_tables_are_synchronized('postgresql_replica_{}'.format(i));
for i in range(NUM_TABLES):
if i != altered_table:
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 51 + number, {}, {}, {} from numbers(49)".format(i, i, i, i))
else:
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 51 + number, {}, {} from numbers(49)".format(i, i, i))
for i in range(NUM_TABLES):
check_tables_are_synchronized('postgresql_replica_{}'.format(i));
for i in range(NUM_TABLES):
cursor.execute('drop table postgresql_replica_{};'.format(i))
instance.query("DROP DATABASE test_database")
@pytest.mark.timeout(120)
def test_changing_replica_identity_value(started_cluster):
instance.query("DROP DATABASE IF EXISTS test_database")