Allow to replicate a subset of database tables

This commit is contained in:
kssenii 2021-02-12 18:21:55 +00:00
parent 219dece1d0
commit 44f4f1a412
9 changed files with 140 additions and 106 deletions

View File

@ -103,8 +103,9 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
&& engine_name != "Lazy" && engine_define->engine->arguments)
throw Exception("Database engine " + engine_name + " cannot have arguments", ErrorCodes::BAD_ARGUMENTS);
if (engine_define->engine->parameters || engine_define->partition_by || engine_define->primary_key || engine_define->order_by ||
engine_define->sample_by || (!endsWith(engine_name, "MySQL") && engine_define->settings))
if (engine_define->engine->parameters || engine_define->partition_by || engine_define->primary_key ||
engine_define->order_by || engine_define->sample_by ||
(!endsWith(engine_name, "MySQL") && (engine_name != "PostgreSQLReplica") && engine_define->settings))
throw Exception("Database engine " + engine_name + " cannot have parameters, primary_key, order_by, sample_by, settings",
ErrorCodes::UNKNOWN_ELEMENT_IN_AST);

View File

@ -39,8 +39,6 @@ namespace ErrorCodes
static const auto METADATA_SUFFIX = ".postgresql_replica_metadata";
/// TODO: add detach, after which the table structure is updated; need to update StoragePtr and recreate nested_storage.
/// Also pass the new StoragePtr to the replication handler. Stop the replication stream in the meantime?
template<>
DatabasePostgreSQLReplica<DatabaseOrdinary>::DatabasePostgreSQLReplica(
@ -91,19 +89,15 @@ DatabasePostgreSQLReplica<DatabaseAtomic>::DatabasePostgreSQLReplica(
template<typename Base>
void DatabasePostgreSQLReplica<Base>::startSynchronization()
{
auto publication_name = global_context.getMacros()->expand(settings->postgresql_publication_name.value);
auto replication_slot = global_context.getMacros()->expand(settings->postgresql_replication_slot_name.value);
replication_handler = std::make_unique<PostgreSQLReplicationHandler>(
remote_database_name,
connection->conn_str(),
metadata_path + METADATA_SUFFIX,
std::make_shared<Context>(global_context),
replication_slot,
publication_name,
settings->postgresql_max_block_size.changed
? settings->postgresql_max_block_size.value
: (global_context.getSettingsRef().max_insert_block_size.value));
: (global_context.getSettingsRef().max_insert_block_size.value),
global_context.getMacros()->expand(settings->postgresql_tables_list.value));
std::unordered_set<std::string> tables_to_replicate = replication_handler->fetchRequiredTables(connection->conn());

View File

@ -40,6 +40,7 @@ public:
std::unique_ptr<PostgreSQLReplicaSettings> settings_);
String getEngineName() const override { return "PostgreSQLReplica"; }
String getMetadataPath() const override { return metadata_path; }
void loadStoredObjects(Context &, bool, bool force_attach) override;
@ -60,6 +61,7 @@ public:
private:
void startSynchronization();
StoragePtr getStorage(const String & name);
Poco::Logger * log;
@ -72,12 +74,6 @@ private:
std::shared_ptr<PostgreSQLReplicationHandler> replication_handler;
std::map<std::string, StoragePtr> tables;
bool checkPostgresTable(const String & table_name) const;
std::unordered_set<std::string> fetchTablesList() const;
StoragePtr fetchTable(const String & table_name, const Context & context, const bool table_checked) const;
void removeOutdatedTables();
ASTPtr getColumnDeclaration(const DataTypePtr & data_type) const;
};
}

View File

@ -8,9 +8,8 @@ namespace DB
#define LIST_OF_POSTGRESQL_REPLICA_SETTINGS(M) \
M(String, postgresql_replication_slot_name, "", "PostgreSQL replication slot name.", 0) \
M(String, postgresql_publication_name, "", "PostgreSQL publication name.", 0) \
M(UInt64, postgresql_max_block_size, 0, "Number of row collected before flushing data into table.", 0) \
M(String, postgresql_tables_list, "", "List of tables for PostgreSQLReplica database engine", 0) \
DECLARE_SETTINGS_TRAITS(PostgreSQLReplicaSettingsTraits, LIST_OF_POSTGRESQL_REPLICA_SETTINGS)

View File

@ -29,22 +29,19 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
const std::string & conn_str,
const std::string & metadata_path_,
std::shared_ptr<Context> context_,
const std::string & publication_name_,
const std::string & replication_slot_name_,
const size_t max_block_size_)
const size_t max_block_size_,
const String tables_list_)
: log(&Poco::Logger::get("PostgreSQLReplicaHandler"))
, context(context_)
, database_name(database_name_)
, connection_str(conn_str)
, metadata_path(metadata_path_)
, publication_name(publication_name_)
, replication_slot(replication_slot_name_)
, max_block_size(max_block_size_)
, tables_list(tables_list_)
, connection(std::make_shared<PostgreSQLConnection>(conn_str))
, replication_connection(std::make_shared<PostgreSQLConnection>(fmt::format("{} replication=database", connection->conn_str())))
{
if (replication_slot.empty())
replication_slot = fmt::format("{}_ch_replication_slot", database_name);
replication_slot = fmt::format("{}_ch_replication_slot", database_name);
publication_name = fmt::format("{}_ch_publication", database_name);
startup_task = context->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ waitConnectionAndStart(); });
startup_task->deactivate();
@ -93,71 +90,12 @@ void PostgreSQLReplicationHandler::shutdown()
}
/// Returns true when pg_publication already contains a row whose pubname equals `publication_name`.
/// The lookup is executed inside the transaction `tx`; a trace message is logged when the publication is found.
bool PostgreSQLReplicationHandler::isPublicationExist(std::shared_ptr<pqxx::work> tx)
{
    pqxx::result lookup{tx->exec(
        fmt::format("SELECT exists (SELECT 1 FROM pg_publication WHERE pubname = '{}')", publication_name))};
    assert(!lookup.empty());

    /// The single returned cell is read as a string and compared with "t".
    const bool found = (lookup[0][0].as<std::string>() == "t");
    if (found)
        LOG_TRACE(log, "Publication {} already exists. Using existing version", publication_name);

    return found;
}
/// Creates a publication named `publication_name` that covers every table registered in `storages`.
/// The statement is executed inside the transaction `tx`; committing is left to the caller.
/// A DB Exception raised while executing gets the note "while creating pg_publication" and is rethrown.
void PostgreSQLReplicationHandler::createPublication(std::shared_ptr<pqxx::work> tx)
{
    /// Comma-separated list of the keys of `storages`.
    String joined_tables;
    for (auto it = storages.begin(); it != storages.end(); ++it)
    {
        if (!joined_tables.empty())
            joined_tables += ", ";

        joined_tables += it->first;
    }

    /// 'ONLY' means just a table, without descendants.
    const std::string create_query = fmt::format("CREATE PUBLICATION {} FOR TABLE ONLY {}", publication_name, joined_tables);

    try
    {
        tx->exec(create_query);
        LOG_TRACE(log, "Created publication {} with tables list: {}", publication_name, joined_tables);
    }
    catch (Exception & e)
    {
        e.addMessage("while creating pg_publication");
        throw;
    }
}
void PostgreSQLReplicationHandler::startSynchronization()
{
/// Used commands require a specific transaction isolation mode.
createPublicationIfNeeded(connection->conn());
auto replication_connection = std::make_shared<PostgreSQLConnection>(fmt::format("{} replication=database", connection->conn_str()));
replication_connection->conn()->set_variable("default_transaction_isolation", "'repeatable read'");
auto tx = std::make_shared<pqxx::work>(*replication_connection->conn());
bool new_publication = false;
if (publication_name.empty())
{
publication_name = fmt::format("{}_ch_publication", database_name);
if (!isPublicationExist(tx))
{
createPublication(tx);
new_publication = true;
}
}
else if (!isPublicationExist(tx))
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Publication name '{}' is spesified in table arguments, but it does not exist", publication_name);
}
tx->commit();
auto ntx = std::make_shared<pqxx::nontransaction>(*replication_connection->conn());
std::string snapshot_name, start_lsn;
@ -173,7 +111,7 @@ void PostgreSQLReplicationHandler::startSynchronization()
{
initial_sync();
}
else if (!Poco::File(metadata_path).exists() || new_publication)
else if (!Poco::File(metadata_path).exists() || new_publication_created)
{
/// In case of some failure, the following cases are possible (since publication and replication slot are reused):
/// 1. If replication slot exists and metadata file (where last synced version is written) does not exist, it is not ok.
@ -258,6 +196,59 @@ void PostgreSQLReplicationHandler::loadFromSnapshot(std::string & snapshot_name)
}
/// Checks, within the transaction `tx`, whether pg_publication has an entry for `publication_name`.
/// Logs an informational message and returns true when it does; returns false otherwise.
bool PostgreSQLReplicationHandler::isPublicationExist(std::shared_ptr<pqxx::work> tx)
{
    const std::string lookup_query
        = fmt::format("SELECT exists (SELECT 1 FROM pg_publication WHERE pubname = '{}')", publication_name);

    pqxx::result rows{tx->exec(lookup_query)};
    assert(!rows.empty());

    /// The first cell of the first row is read as a string; "t" is treated as "exists".
    if (rows[0][0].as<std::string>() != "t")
        return false;

    LOG_INFO(log, "Publication {} already exists. Using existing version", publication_name);
    return true;
}
/// Makes sure the publication `publication_name` exists, creating it when it is missing.
///
/// - Does nothing if this handler has already created the publication (`new_publication_created`).
/// - Opens a transaction on `connection_`, checks pg_publication, and if no publication is found
///   issues CREATE PUBLICATION for the tables in `tables_list`.
/// - When `tables_list` is empty it is filled in place with the comma-separated keys of `storages`,
///   so the member keeps that value after the call.
/// - A DB Exception raised while executing the statement gets the note "while creating pg_publication"
///   and is rethrown.
///
/// `new_publication_created` is raised only after the transaction has been committed, so that a failed
/// commit does not leave the handler believing that a publication was created.
void PostgreSQLReplicationHandler::createPublicationIfNeeded(
    PostgreSQLConnection::ConnectionPtr connection_)
{
    if (new_publication_created)
        return;

    auto tx = std::make_shared<pqxx::work>(*connection_);

    /// Tracks whether the CREATE PUBLICATION statement ran in this transaction.
    bool created_in_this_transaction = false;

    if (!isPublicationExist(tx))
    {
        if (tables_list.empty())
        {
            for (const auto & storage_data : storages)
            {
                if (!tables_list.empty())
                    tables_list += ", ";
                tables_list += storage_data.first;
            }
        }

        /// 'ONLY' means just a table, without descendants.
        /// NOTE(review): in the CREATE PUBLICATION grammar ONLY is written per table name; confirm whether it
        /// is meant to apply to every table of a multi-table list.
        std::string query_str = fmt::format("CREATE PUBLICATION {} FOR TABLE ONLY {}", publication_name, tables_list);
        try
        {
            tx->exec(query_str);
            created_in_this_transaction = true;
            LOG_TRACE(log, "Created publication {} with tables list: {}", publication_name, tables_list);
        }
        catch (Exception & e)
        {
            e.addMessage("while creating pg_publication");
            throw;
        }
    }

    tx->commit();

    /// Publish the flag only once the commit went through.
    if (created_in_this_transaction)
        new_publication_created = true;
}
bool PostgreSQLReplicationHandler::isReplicationSlotExist(NontransactionPtr ntx, std::string & slot_name)
{
std::string query_str = fmt::format("SELECT active, restart_lsn FROM pg_replication_slots WHERE slot_name = '{}'", slot_name);
@ -304,9 +295,6 @@ void PostgreSQLReplicationHandler::dropReplicationSlot(NontransactionPtr ntx, st
/// Drops the publication `publication_name` through `ntx`, if a name is set.
/// The statement uses IF EXISTS, so a publication that is already gone is not an error on the PostgreSQL side.
void PostgreSQLReplicationHandler::dropPublication(NontransactionPtr ntx)
{
    /// Nothing to drop when no publication name is known.
    if (!publication_name.empty())
    {
        const std::string drop_query = fmt::format("DROP PUBLICATION IF EXISTS {}", publication_name);
        ntx->exec(drop_query);
    }
}
@ -328,7 +316,6 @@ void PostgreSQLReplicationHandler::shutdownFinal()
}
/// TODO: publication can be created with option `whole_database`. Check this case.
std::unordered_set<std::string> PostgreSQLReplicationHandler::fetchRequiredTables(PostgreSQLConnection::ConnectionPtr connection_)
{
auto publication_exist = [&]()
@ -339,14 +326,17 @@ std::unordered_set<std::string> PostgreSQLReplicationHandler::fetchRequiredTable
return exist;
};
if (publication_name.empty() || !publication_exist())
if (publication_exist())
{
return fetchTablesFromPublication(connection_);
}
else if (tables_list.empty())
{
/// Replicate the whole database and create our own pg_publication
return fetchPostgreSQLTablesList(connection_);
}
else
{
/// Replicate only tables, which are included in a pg_publication
createPublicationIfNeeded(connection_);
return fetchTablesFromPublication(connection_);
}
}

View File

@ -20,9 +20,8 @@ public:
const std::string & conn_str_,
const std::string & metadata_path_,
std::shared_ptr<Context> context_,
const std::string & publication_slot_name_,
const std::string & replication_slot_name_,
const size_t max_block_size_);
const size_t max_block_size_,
const String tables_list = "");
void startup();
@ -43,7 +42,7 @@ private:
bool isReplicationSlotExist(NontransactionPtr ntx, std::string & slot_name);
void createPublication(std::shared_ptr<pqxx::work> tx);
void createPublicationIfNeeded(PostgreSQLConnection::ConnectionPtr connection_);
void createReplicationSlot(NontransactionPtr ntx, std::string & start_lsn, std::string & snapshot_name);
@ -62,14 +61,15 @@ private:
Poco::Logger * log;
std::shared_ptr<Context> context;
const std::string database_name, connection_str, metadata_path;
std::string publication_name, replication_slot;
const size_t max_block_size;
std::string tables_list, replication_slot, publication_name;
PostgreSQLConnectionPtr connection, replication_connection;
PostgreSQLConnectionPtr connection;
std::shared_ptr<PostgreSQLReplicaConsumer> consumer;
BackgroundSchedulePool::TaskHolder startup_task;
std::atomic<bool> tables_loaded = false;
bool new_publication_created = false;
std::unordered_map<String, StoragePostgreSQLReplica *> storages;
std::unordered_map<String, StoragePtr> nested_storages;

View File

@ -62,8 +62,6 @@ StoragePostgreSQLReplica::StoragePostgreSQLReplica(
connection_str,
metadata_path,
global_context,
global_context->getMacros()->expand(replication_settings->postgresql_replication_slot_name.value),
global_context->getMacros()->expand(replication_settings->postgresql_publication_name.value),
replication_settings->postgresql_max_block_size.changed
? replication_settings->postgresql_max_block_size.value
: (global_context->getSettingsRef().max_insert_block_size.value)
@ -346,7 +344,10 @@ void StoragePostgreSQLReplica::dropNested()
/// Returns the virtual columns of this table.
/// When a nested storage is set, its virtual columns are returned; otherwise the list is empty.
NamesAndTypesList StoragePostgreSQLReplica::getVirtuals() const
{
    /// No unconditional early return here: it would make the delegation below unreachable.
    if (nested_storage)
        return nested_storage->getVirtuals();

    return {};
}

View File

@ -32,6 +32,7 @@ def create_postgres_db(cursor, name):
def create_postgres_table(cursor, table_name):
    """(Re)create `table_name` through `cursor` and set its replica identity to FULL.

    Three statements are executed in order: drop the table if it exists, create it
    from `postgres_table_template`, then ALTER it to REPLICA IDENTITY FULL.
    """
    statement_templates = (
        "DROP TABLE IF EXISTS {}",
        postgres_table_template,
        'ALTER TABLE {} REPLICA IDENTITY FULL;',
    )
    # Format and execute one statement at a time so the execution order is unchanged.
    for statement_template in statement_templates:
        cursor.execute(statement_template.format(table_name))
@ -71,7 +72,8 @@ def postgresql_setup_teardown():
instance.query('DROP TABLE IF EXISTS test.postgresql_replica')
def test_load_and_sync_all_database(started_cluster):
def test_load_and_sync_all_database_tables(started_cluster):
instance.query("DROP DATABASE IF EXISTS test_database")
conn = get_postgres_conn(True)
cursor = conn.cursor()
NUM_TABLES = 5
@ -95,6 +97,7 @@ def test_load_and_sync_all_database(started_cluster):
def test_replicating_dml(started_cluster):
instance.query("DROP DATABASE IF EXISTS test_database")
conn = get_postgres_conn(True)
cursor = conn.cursor()
NUM_TABLES = 5
@ -135,6 +138,7 @@ def test_replicating_dml(started_cluster):
def test_different_data_types(started_cluster):
instance.query("DROP DATABASE IF EXISTS test_database")
conn = get_postgres_conn(True)
cursor = conn.cursor()
cursor.execute('drop table if exists test_data_types;')
@ -209,6 +213,54 @@ def test_different_data_types(started_cluster):
assert(result == expected)
# Integration test: creates test_database with the PostgreSQLReplica engine and a
# `postgresql_tables_list` setting that names only the first half of the PostgreSQL tables,
# then checks which tables show up in test_database and that the listed ones are synchronized.
# NOTE(review): indentation is not visible in this view, so the nesting of the statements
# below is inferred from their order -- confirm against the original file.
def test_load_and_sync_subset_of_database_tables(started_cluster):
# Start from a clean state on the ClickHouse side.
instance.query("DROP DATABASE IF EXISTS test_database")
conn = get_postgres_conn(True)
cursor = conn.cursor()
NUM_TABLES = 10
# Comma-separated names of the tables that will be passed in postgresql_tables_list.
publication_tables = ''
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
# Create table i and fill it with 50 rows via postgres_database.
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, number from numbers(50)".format(i))
# Only tables with index below NUM_TABLES/2 are added to the list.
if i < NUM_TABLES/2:
if publication_tables != '':
publication_tables += ', '
publication_tables += table_name
# Create the replicated database, restricted to the tables collected above.
instance.query('''
CREATE DATABASE test_database
ENGINE = PostgreSQLReplica('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')
SETTINGS postgresql_tables_list = '{}';
'''.format(publication_tables))
assert 'test_database' in instance.query('SHOW DATABASES')
# Fixed one-second pause before inspecting system.tables.
# NOTE(review): presumably gives the engine time to create the tables -- confirm.
time.sleep(1)
# The number of tables in test_database must equal the number of listed tables.
result = instance.query('''SELECT count() FROM system.tables WHERE database = 'test_database';''')
assert(int(result) == NUM_TABLES/2)
database_tables = instance.query('SHOW TABLES FROM test_database')
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
# Listed tables must be present in test_database, the others must be absent.
if i < NUM_TABLES/2:
assert table_name in database_tables
else:
assert table_name not in database_tables
# Insert 100 more rows (first column 50..149, second column i).
# NOTE(review): appears to run for every table rather than only in the else branch -- confirm.
instance.query("INSERT INTO postgres_database.{} SELECT 50 + number, {} from numbers(100)".format(table_name, i))
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
# Only the listed tables are compared; check_tables_are_synchronized is defined outside this view.
if i < NUM_TABLES/2:
check_tables_are_synchronized(table_name);
# NOTE(review): the drop appears to apply to every table, listed or not -- confirm.
cursor.execute('drop table {};'.format(table_name))
# Remove the replicated database and verify it is gone.
instance.query("DROP DATABASE test_database")
assert 'test_database' not in instance.query('SHOW DATABASES')
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")

View File

@ -30,6 +30,7 @@ def create_postgres_db(cursor, name):
cursor.execute("CREATE DATABASE {}".format(name))
def create_postgres_table(cursor, table_name):
    """Drop `table_name` if present, recreate it from `postgres_table_template`,
    and switch it to REPLICA IDENTITY FULL, all through `cursor`."""
    drop_statement = "DROP TABLE IF EXISTS {}".format(table_name)
    cursor.execute(drop_statement)

    create_statement = postgres_table_template.format(table_name)
    cursor.execute(create_statement)

    replica_identity_statement = 'ALTER TABLE {} REPLICA IDENTITY FULL;'.format(table_name)
    cursor.execute(replica_identity_statement)