Support replica identity index

This commit is contained in:
kssenii 2021-04-10 14:42:45 +00:00
parent dba1fe1989
commit 1c501e7d97
11 changed files with 99 additions and 43 deletions

View File

@ -15,19 +15,18 @@ Each replicated table must have one of the following **replica identity**:
2. **index**
```
``` bash
postgres# CREATE TABLE postgres_table (a Integer NOT NULL, b Integer, c Integer NOT NULL, d Integer, e Integer NOT NULL);
postgres# CREATE unique INDEX postgres_table_index on postgres_table(a, c, e);
postgres# ALTER TABLE postgres_table REPLICA IDENTITY USING INDEX postgres_table_index;
```
3. **full** (all columns, very inefficient)
Primary key is always checked first. If it is absent, then index, defined as replica identity index, is checked.
If index is used as replica identity, there has to be only one such index in a table.
You can check what type is used for a specific table with the following command:
``` sql
``` bash
postgres# SELECT CASE relreplident
WHEN 'd' THEN 'default'
WHEN 'n' THEN 'nothing'

View File

@ -418,7 +418,7 @@ class IColumn;
M(Bool, cast_keep_nullable, false, "CAST operator keep Nullable for result data type", 0) \
M(Bool, alter_partition_verbose_result, false, "Output information about affected parts. Currently works only for FREEZE and ATTACH commands.", 0) \
M(Bool, allow_experimental_database_materialize_mysql, false, "Allow to create database with Engine=MaterializeMySQL(...).", 0) \
M(Bool, allow_experimental_database_postgresql_replica, false, "Allow to create database with Engine=PostgreSQLReplica(...).", 0) \
M(Bool, allow_experimental_database_materialize_postgresql, false, "Allow to create database with Engine=MaterializePostgreSQL(...).", 0) \
M(Bool, external_databases_use_nulls, true, "If set to false, external databases will use default values instead of NULLs. (Sopported for PostgreSQL/MaterializePostgreSQL database engine)", 0) \
M(Bool, system_events_show_zero_values, false, "Include all metrics, even with zero values", 0) \
M(MySQLDataTypesSupport, mysql_datatypes_support_level, 0, "Which MySQL types should be converted to corresponding ClickHouse types (rather than being represented as String). Can be empty or any combination of 'decimal' or 'datetime64'. When empty MySQL's DECIMAL and DATETIME/TIMESTAMP with non-zero precision are seen as String on ClickHouse's side.", 0) \

View File

@ -158,7 +158,7 @@ std::shared_ptr<NamesAndTypesList> readNamesAndTypesList(
template<typename T>
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
std::shared_ptr<T> tx, const String & postgres_table_name, bool use_nulls, bool with_primary_key)
std::shared_ptr<T> tx, const String & postgres_table_name, bool use_nulls, bool with_primary_key, bool with_replica_identity_index)
{
PostgreSQLTableStructure table;
@ -171,18 +171,43 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
table.columns = readNamesAndTypesList(tx, postgres_table_name, query, use_nulls, false);
if (!with_primary_key)
return table;
if (with_primary_key)
{
/// wiki.postgresql.org/wiki/Retrieve_primary_key_columns
query = fmt::format(
"SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type "
"FROM pg_index i "
"JOIN pg_attribute a ON a.attrelid = i.indrelid "
"AND a.attnum = ANY(i.indkey) "
"WHERE i.indrelid = '{}'::regclass AND i.indisprimary", postgres_table_name);
/// wiki.postgresql.org/wiki/Retrieve_primary_key_columns
query = fmt::format(
"SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type "
"FROM pg_index i "
"JOIN pg_attribute a ON a.attrelid = i.indrelid "
"AND a.attnum = ANY(i.indkey) "
"WHERE i.indrelid = '{}'::regclass AND i.indisprimary", postgres_table_name);
table.primary_key_columns = readNamesAndTypesList(tx, postgres_table_name, query, use_nulls, true);
}
table.primary_key_columns = readNamesAndTypesList(tx, postgres_table_name, query, use_nulls, true);
if (with_replica_identity_index)
{
query = fmt::format(
"SELECT "
"a.attname AS column_name, " /// column name
"format_type(a.atttypid, a.atttypmod) as type " /// column type
"FROM "
"pg_class t, "
"pg_class i, "
"pg_index ix, "
"pg_attribute a "
"WHERE "
"t.oid = ix.indrelid "
"and i.oid = ix.indexrelid "
"and a.attrelid = t.oid "
"and a.attnum = ANY(ix.indkey) "
"and t.relkind = 'r' " /// simple tables
"and t.relname = '{}' "
"and ix.indisreplident = 't' " /// index is is replica identity index
"ORDER BY a.attname", /// column names
postgres_table_name);
table.replica_identity_columns = readNamesAndTypesList(tx, postgres_table_name, query, use_nulls, true);
}
return table;
}
@ -190,19 +215,21 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
template
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
std::shared_ptr<pqxx::ReadTransaction> tx, const String & postgres_table_name, bool use_nulls, bool with_primary_key);
std::shared_ptr<pqxx::ReadTransaction> tx, const String & postgres_table_name, bool use_nulls,
bool with_primary_key, bool with_replica_identity_index);
template
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
std::shared_ptr<pqxx::ReplicationTransaction> tx, const String & postgres_table_name, bool use_nulls, bool with_primary_key);
std::shared_ptr<pqxx::ReplicationTransaction> tx, const String & postgres_table_name, bool use_nulls,
bool with_primary_key, bool with_replica_identity_index);
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
pqxx::connection & connection, const String & postgres_table_name, bool use_nulls, bool with_primary_key)
pqxx::connection & connection, const String & postgres_table_name, bool use_nulls)
{
auto tx = std::make_shared<pqxx::ReadTransaction>(connection);
auto table = fetchPostgreSQLTableStructure(tx, postgres_table_name, use_nulls, with_primary_key);
auto table = fetchPostgreSQLTableStructure(tx, postgres_table_name, use_nulls, false, false);
tx->commit();
return table;

View File

@ -18,16 +18,18 @@ struct PostgreSQLTableStructure
{
std::shared_ptr<NamesAndTypesList> columns;
std::shared_ptr<NamesAndTypesList> primary_key_columns;
std::shared_ptr<NamesAndTypesList> replica_identity_columns;
};
using PostgreSQLTableStructurePtr = std::unique_ptr<PostgreSQLTableStructure>;
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
pqxx::connection & connection, const String & postgres_table_name, bool use_nulls, bool with_primary_key = false);
pqxx::connection & connection, const String & postgres_table_name, bool use_nulls);
template<typename T>
PostgreSQLTableStructure fetchPostgreSQLTableStructure(
std::shared_ptr<T> tx, const String & postgres_table_name, bool use_nulls, bool with_primary_key = false);
std::shared_ptr<T> tx, const String & postgres_table_name, bool use_nulls,
bool with_primary_key = false, bool with_replica_identity_index = false);
}

View File

@ -215,7 +215,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
"Enable allow_experimental_database_replicated to use it.", ErrorCodes::UNKNOWN_DATABASE_ENGINE);
}
if (create.storage->engine->name == "MaterializePostgreSQL" && !context.getSettingsRef().allow_experimental_database_postgresql_replica && !internal)
if (create.storage->engine->name == "MaterializePostgreSQL" && !context.getSettingsRef().allow_experimental_database_materialize_postgresql && !internal)
{
throw Exception("MaterializePostgreSQL is an experimental database engine. "
"Enable allow_experimental_database_postgresql_replica to use it.", ErrorCodes::UNKNOWN_DATABASE_ENGINE);

View File

@ -31,7 +31,6 @@ MaterializePostgreSQLConsumer::MaterializePostgreSQLConsumer(
const std::string & start_lsn,
const size_t max_block_size_,
bool allow_minimal_ddl_,
bool is_postgresql_replica_database_engine_,
Storages storages_)
: log(&Poco::Logger::get("PostgreSQLReaplicaConsumer"))
, context(context_)
@ -42,7 +41,6 @@ MaterializePostgreSQLConsumer::MaterializePostgreSQLConsumer(
, current_lsn(start_lsn)
, max_block_size(max_block_size_)
, allow_minimal_ddl(allow_minimal_ddl_)
, is_postgresql_replica_database_engine(is_postgresql_replica_database_engine_)
, storages(storages_)
{
for (const auto & [table_name, storage] : storages)
@ -401,15 +399,13 @@ void MaterializePostgreSQLConsumer::processReplicationMessage(const char * repli
/// 'n' - nothing
/// 'f' - all columns (set replica identity full)
/// 'i' - user defined index with indisreplident set
/// For database engine now supported only 'd', for table engine 'f' is also allowed.
/// Only 'd' and 'i' - are supported.
char replica_identity = readInt8(replication_message, pos, size);
if (replica_identity != 'd' && (replica_identity != 'f' || is_postgresql_replica_database_engine))
if (replica_identity != 'd' && replica_identity != 'i')
{
LOG_WARNING(log,
"Table has replica identity {} - not supported. "
"For database engine only default (with primary keys) replica identity is supported."
"For table engine full replica identity is also supported. Table will be skipped.");
"Table has replica identity {} - not supported. A table must have a primary key or a replica identity index");
markTableAsSkipped(relation_id, relation_name);
return;
}

View File

@ -35,7 +35,6 @@ public:
const std::string & start_lsn,
const size_t max_block_size_,
bool allow_minimal_ddl_,
bool is_postgresql_replica_database_engine_,
Storages storages_);
void readMetadata();
@ -110,7 +109,7 @@ private:
std::string current_lsn, final_lsn;
const size_t max_block_size;
bool allow_minimal_ddl, is_postgresql_replica_database_engine;
bool allow_minimal_ddl;
std::string table_to_insert;

View File

@ -142,7 +142,6 @@ void PostgreSQLReplicationHandler::startSynchronization()
start_lsn,
max_block_size,
allow_minimal_ddl,
is_postgresql_replica_database_engine,
nested_storages);
consumer_task->activateAndSchedule();
@ -386,7 +385,7 @@ PostgreSQLTableStructurePtr PostgreSQLReplicationHandler::fetchTableStructure(
return nullptr;
auto use_nulls = context.getSettingsRef().external_databases_use_nulls;
return std::make_unique<PostgreSQLTableStructure>(fetchPostgreSQLTableStructure(tx, table_name, use_nulls, true));
return std::make_unique<PostgreSQLTableStructure>(fetchPostgreSQLTableStructure(tx, table_name, use_nulls, true, true));
}

View File

@ -200,18 +200,22 @@ ASTPtr StorageMaterializePostgreSQL::getCreateNestedTableQuery(PostgreSQLTableSt
storage_metadata.setColumns(ColumnsDescription(ordinary_columns_and_types));
setInMemoryMetadata(storage_metadata);
if (!table_structure->primary_key_columns)
if (!table_structure->primary_key_columns && !table_structure->replica_identity_columns)
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"No primary key columns returned for table {}.{}", table_id.database_name, table_id.table_name);
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Table {}.{} has no primary key and no replica identity index", table_id.database_name, table_id.table_name);
}
auto primary_key_columns = *table_structure->primary_key_columns;
NamesAndTypesList merging_columns;
if (table_structure->primary_key_columns)
merging_columns = *table_structure->primary_key_columns;
else
merging_columns = *table_structure->replica_identity_columns;
order_by_expression->name = "tuple";
order_by_expression->arguments = std::make_shared<ASTExpressionList>();
for (const auto & column : primary_key_columns)
for (const auto & column : merging_columns)
order_by_expression->arguments->children.emplace_back(std::make_shared<ASTIdentifier>(column.name));
}

View File

@ -2,7 +2,7 @@
<yandex>
<profiles>
<default>
<allow_experimental_database_postgresql_replica>1</allow_experimental_database_postgresql_replica>
<allow_experimental_database_materialize_postgresql>1</allow_experimental_database_materialize_postgresql>
</default>
</profiles>
</yandex>

View File

@ -11,7 +11,7 @@ from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
main_configs=['configs/log_conf.xml'],
main_configs = ['configs/log_conf.xml'],
user_configs = ['configs/users.xml'],
with_postgres=True, stay_alive=True)
@ -23,6 +23,10 @@ postgres_table_template_2 = """
CREATE TABLE IF NOT EXISTS {} (
key Integer NOT NULL, value1 Integer, value2 Integer, value3 Integer, PRIMARY KEY(key))
"""
postgres_table_template_3 = """
CREATE TABLE IF NOT EXISTS {} (
key1 Integer NOT NULL, value1 Integer, key2 Integer NOT NULL, value2 Integer NOT NULL)
"""
def get_postgres_conn(database=False):
if database == True:
@ -299,7 +303,7 @@ def test_load_and_sync_subset_of_database_tables(started_cluster):
assert 'test_database' not in instance.query('SHOW DATABASES')
@pytest.mark.timeout(240)
@pytest.mark.timeout(320)
def test_table_schema_changes(started_cluster):
instance.query("DROP DATABASE IF EXISTS test_database")
conn = get_postgres_conn(True)
@ -392,6 +396,32 @@ def test_clickhouse_restart(started_cluster):
check_tables_are_synchronized('postgresql_replica_{}'.format(i));
@pytest.mark.timeout(120)
def test_replica_identity_index(started_cluster):
instance.query("DROP DATABASE IF EXISTS test_database")
conn = get_postgres_conn(True)
cursor = conn.cursor()
create_postgres_table(cursor, 'postgresql_replica', template=postgres_table_template_3);
cursor.execute("CREATE unique INDEX idx on postgresql_replica(key1, key2);")
cursor.execute("ALTER TABLE postgresql_replica REPLICA IDENTITY USING INDEX idx")
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number, number, number from numbers(50, 10)")
instance.query(
"CREATE DATABASE test_database ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')")
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number, number, number from numbers(100, 10)")
check_tables_are_synchronized('postgresql_replica', order_by='key1');
cursor.execute("UPDATE postgresql_replica SET key1=key1-25 WHERE key1<100 ")
cursor.execute("UPDATE postgresql_replica SET key2=key2-25 WHERE key2>100 ")
cursor.execute("UPDATE postgresql_replica SET value1=value1+100 WHERE key1<100 ")
cursor.execute("UPDATE postgresql_replica SET value2=value2+200 WHERE key2>100 ")
check_tables_are_synchronized('postgresql_replica', order_by='key1');
cursor.execute('DELETE FROM postgresql_replica WHERE key2<75;')
check_tables_are_synchronized('postgresql_replica', order_by='key1');
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")