Merge pull request #33200 from kssenii/fix-materialized-postgresql-ddl-validation

MaterializedPostgreSQL: fix DDL validation
Kseniia Sumarokova 2022-01-09 21:12:13 +03:00 committed by GitHub
commit d0a847befc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 332 additions and 233 deletions

View File

@@ -7,7 +7,7 @@ toc_title: MaterializedPostgreSQL
Creates a ClickHouse database with tables from a PostgreSQL database. First, a database with the engine `MaterializedPostgreSQL` creates a snapshot of the PostgreSQL database and loads the required tables. The required tables can include any subset of tables from any subset of schemas in the specified database. Along with the snapshot, the database engine acquires an LSN, and once the initial dump of the tables is performed, it starts pulling updates from the WAL. After the database is created, tables newly added to the PostgreSQL database are not automatically added to replication. They have to be added manually with an `ATTACH TABLE db.table` query.
Replication is implemented with the PostgreSQL Logical Replication Protocol, which does not allow replicating DDL, but makes it possible to know whether replication-breaking changes happened (column type changes, adding/removing columns). Such changes are detected, and the corresponding tables stop receiving updates. Such tables can be automatically reloaded in the background if the required setting is turned on. The safest way for now is to use `ATTACH`/`DETACH` queries to reload the table completely. If DDL does not break replication (for example, renaming a column), the table will still receive updates (insertion is done by position).
Replication is implemented with the PostgreSQL Logical Replication Protocol, which does not allow replicating DDL, but makes it possible to know whether replication-breaking changes happened (column type changes, adding/removing columns). Such changes are detected, and the corresponding tables stop receiving updates. Such tables can be automatically reloaded in the background if the required setting is turned on (can be used starting from 22.1). The safest way for now is to use `ATTACH`/`DETACH` queries to reload the table completely. If DDL does not break replication (for example, renaming a column), the table will still receive updates (insertion is done by position).
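For example, a complete manual reload of a single table could look like this (a sketch; `postgres_database.table1` is a placeholder name):

```sql
-- Remove the outdated table from replication, then re-attach it
-- so that it is reloaded from a fresh PostgreSQL snapshot.
DETACH TABLE postgres_database.table1;
ATTACH TABLE postgres_database.table1;
```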
## Creating a Database {#creating-a-database}
@@ -46,7 +46,7 @@ After `MaterializedPostgreSQL` database is created, it does not automatically de
ATTACH TABLE postgres_database.new_table;
```
Warning: before version 21.13, adding a table to replication left behind an unremoved temporary replication slot (named `{db_name}_ch_replication_slot_tmp`). If attaching tables in a ClickHouse version before 21.13, make sure to delete it manually (`SELECT pg_drop_replication_slot('{db_name}_ch_replication_slot_tmp')`). Otherwise disk usage will grow. The issue is fixed in 21.13.
Warning: before version 22.1, adding a table to replication left behind an unremoved temporary replication slot (named `{db_name}_ch_replication_slot_tmp`). If attaching tables in a ClickHouse version before 22.1, make sure to delete it manually (`SELECT pg_drop_replication_slot('{db_name}_ch_replication_slot_tmp')`). Otherwise disk usage will grow. The issue is fixed in 22.1.
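To check on the PostgreSQL side whether such a leftover slot exists, the standard `pg_replication_slots` view can be queried (a hedged sketch; the `LIKE` pattern follows the slot naming above):

```sql
-- List temporary ClickHouse replication slots left behind by older versions.
SELECT slot_name, active
FROM pg_replication_slots
WHERE slot_name LIKE '%_ch_replication_slot_tmp';
```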
## Dynamically removing tables from replication {#dynamically-removing-table-from-replication}
@@ -156,6 +156,8 @@ Default value: empty list. (Default schema is used)
4. materialized_postgresql_allow_automatic_update {#materialized-postgresql-allow-automatic-update}
Do not use this setting in versions before 22.1.
Allows reloading a table in the background when schema changes are detected. DDL queries on the PostgreSQL side are not replicated via the ClickHouse [MaterializedPostgreSQL](../../engines/database-engines/materialized-postgresql.md) engine, because the PostgreSQL logical replication protocol does not allow it, but the fact of DDL changes is detected transactionally. In this case, the default behaviour is to stop replicating those tables once DDL is detected. However, if this setting is enabled, then instead of stopping the replication of those tables, they will be reloaded in the background via a database snapshot without data loss, and replication will continue for them.
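A minimal sketch of enabling this setting at database creation time (host, database name, and credentials are placeholders):

```sql
CREATE DATABASE postgres_db
ENGINE = MaterializedPostgreSQL('postgres-host:5432', 'postgres_database', 'user', 'password')
SETTINGS materialized_postgresql_allow_automatic_update = 1;
```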
Possible values:

View File

@@ -182,19 +182,19 @@ StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, ContextPtr,
return StoragePtr{};
auto connection_holder = pool->get();
auto columns = fetchPostgreSQLTableStructure(connection_holder->get(), table_name, configuration.schema).columns;
auto columns_info = fetchPostgreSQLTableStructure(connection_holder->get(), table_name, configuration.schema).physical_columns;
if (!columns)
if (!columns_info)
return StoragePtr{};
auto storage = StoragePostgreSQL::create(
StorageID(database_name, table_name), pool, table_name,
ColumnsDescription{*columns}, ConstraintsDescription{}, String{}, configuration.schema, configuration.on_conflict);
ColumnsDescription{columns_info->columns}, ConstraintsDescription{}, String{}, configuration.schema, configuration.on_conflict);
if (cache_tables)
cached_tables[table_name] = storage;
return storage;
return std::move(storage);
}
if (table_checked || checkPostgresTable(table_name))
@@ -414,7 +414,7 @@ ASTPtr DatabasePostgreSQL::getCreateTableQueryImpl(const String & table_name, Co
assert(storage_engine_arguments->children.size() >= 2);
storage_engine_arguments->children.insert(storage_engine_arguments->children.begin() + 2, std::make_shared<ASTLiteral>(table_id.table_name));
return create_table_query;
return std::move(create_table_query);
}

View File

@@ -15,7 +15,7 @@
#include <Common/quoteString.h>
#include <Core/PostgreSQL/Utils.h>
#include <base/FnTraits.h>
#include <IO/ReadHelpers.h>
namespace DB
{
@@ -155,10 +155,11 @@ static DataTypePtr convertPostgreSQLDataType(String & type, Fn<void()> auto && r
template<typename T>
std::shared_ptr<NamesAndTypesList> readNamesAndTypesList(
T & tx, const String & postgres_table, const String & query, bool use_nulls, bool only_names_and_types)
PostgreSQLTableStructure::ColumnsInfoPtr readNamesAndTypesList(
T & tx, const String & postgres_table, const String & query, bool use_nulls, bool only_names_and_types)
{
auto columns = NamesAndTypes();
PostgreSQLTableStructure::Attributes attributes;
try
{
@@ -180,14 +181,22 @@ std::shared_ptr<NamesAndTypesList> readNamesAndTypesList(
}
else
{
std::tuple<std::string, std::string, std::string, uint16_t> row;
std::tuple<std::string, std::string, std::string, uint16_t, std::string, std::string> row;
while (stream >> row)
{
auto data_type = convertPostgreSQLDataType(std::get<1>(row),
recheck_array,
use_nulls && (std::get<2>(row) == "f"), /// 'f' means that postgres `not_null` is false, i.e. value is nullable
std::get<3>(row));
auto data_type = convertPostgreSQLDataType(
std::get<1>(row), recheck_array,
use_nulls && (std::get<2>(row) == /* not nullable */"f"),
std::get<3>(row));
columns.push_back(NameAndTypePair(std::get<0>(row), data_type));
attributes.emplace_back(
PostgreSQLTableStructure::PGAttribute{
.atttypid = parse<int>(std::get<4>(row)),
.atttypmod = parse<int>(std::get<5>(row)),
});
++i;
}
}
@@ -226,7 +235,9 @@ std::shared_ptr<NamesAndTypesList> readNamesAndTypesList(
throw;
}
return !columns.empty() ? std::make_shared<NamesAndTypesList>(columns.begin(), columns.end()) : nullptr;
return !columns.empty()
? std::make_shared<PostgreSQLTableStructure::ColumnsInfo>(NamesAndTypesList(columns.begin(), columns.end()), std::move(attributes))
: nullptr;
}
@@ -244,14 +255,14 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
std::string query = fmt::format(
"SELECT attname AS name, format_type(atttypid, atttypmod) AS type, "
"attnotnull AS not_null, attndims AS dims "
"attnotnull AS not_null, attndims AS dims, atttypid as type_id, atttypmod as type_modifier "
"FROM pg_attribute "
"WHERE attrelid = (SELECT oid FROM pg_class WHERE {}) "
"AND NOT attisdropped AND attnum > 0", where);
table.columns = readNamesAndTypesList(tx, postgres_table, query, use_nulls, false);
table.physical_columns = readNamesAndTypesList(tx, postgres_table, query, use_nulls, false);
if (!table.columns)
if (!table.physical_columns)
throw Exception(ErrorCodes::UNKNOWN_TABLE, "PostgreSQL table {} does not exist", postgres_table);
if (with_primary_key)
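The extended metadata query can be tried directly against PostgreSQL. A hedged illustration (`test_table` is a placeholder, and the `attrelid` lookup is written with a `regclass` cast instead of the `pg_class` subquery used above):

```sql
SELECT attname AS name, format_type(atttypid, atttypmod) AS type,
       attnotnull AS not_null, attndims AS dims,
       atttypid AS type_id, atttypmod AS type_modifier
FROM pg_attribute
WHERE attrelid = 'test_table'::regclass
  AND NOT attisdropped AND attnum > 0;
```

For a column declared as `varchar(n)`, `atttypmod` is typically `n + 4` (the declared length plus the varlena header), so the `(type_id, type_modifier)` pair is exactly what the consumer can later compare against Relation messages to detect DDL changes.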

View File

@@ -12,9 +12,24 @@ namespace DB
struct PostgreSQLTableStructure
{
std::shared_ptr<NamesAndTypesList> columns = nullptr;
std::shared_ptr<NamesAndTypesList> primary_key_columns = nullptr;
std::shared_ptr<NamesAndTypesList> replica_identity_columns = nullptr;
struct PGAttribute
{
Int32 atttypid;
Int32 atttypmod;
};
using Attributes = std::vector<PGAttribute>;
struct ColumnsInfo
{
NamesAndTypesList columns;
Attributes attributes;
ColumnsInfo(NamesAndTypesList && columns_, Attributes && attributes_) : columns(columns_), attributes(attributes_) {}
};
using ColumnsInfoPtr = std::shared_ptr<ColumnsInfo>;
ColumnsInfoPtr physical_columns;
ColumnsInfoPtr primary_key_columns;
ColumnsInfoPtr replica_identity_columns;
};
using PostgreSQLTableStructurePtr = std::unique_ptr<PostgreSQLTableStructure>;

View File

@@ -18,6 +18,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int POSTGRESQL_REPLICATION_INTERNAL_ERROR;
}
MaterializedPostgreSQLConsumer::MaterializedPostgreSQLConsumer(
@@ -29,7 +30,7 @@ MaterializedPostgreSQLConsumer::MaterializedPostgreSQLConsumer(
const size_t max_block_size_,
bool schema_as_a_part_of_table_name_,
bool allow_automatic_update_,
Storages storages_,
StorageInfos storages_info_,
const String & name_for_logger)
: log(&Poco::Logger::get("PostgreSQLReplicaConsumer(" + name_for_logger + ")"))
, context(context_)
@@ -41,7 +42,6 @@ MaterializedPostgreSQLConsumer::MaterializedPostgreSQLConsumer(
, max_block_size(max_block_size_)
, schema_as_a_part_of_table_name(schema_as_a_part_of_table_name_)
, allow_automatic_update(allow_automatic_update_)
, storages(storages_)
{
final_lsn = start_lsn;
auto tx = std::make_shared<pqxx::nontransaction>(connection->getRef());
@@ -49,19 +49,28 @@ MaterializedPostgreSQLConsumer::MaterializedPostgreSQLConsumer(
LOG_TRACE(log, "Starting replication. LSN: {} (last: {})", getLSNValue(current_lsn), getLSNValue(final_lsn));
tx->commit();
for (const auto & [table_name, storage] : storages)
{
buffers.emplace(table_name, Buffer(storage));
}
for (const auto & [table_name, storage_info] : storages_info_)
storages.emplace(table_name, storage_info);
}
void MaterializedPostgreSQLConsumer::Buffer::createEmptyBuffer(StoragePtr storage)
MaterializedPostgreSQLConsumer::StorageData::StorageData(const StorageInfo & storage_info)
: storage(storage_info.storage), buffer(storage_info.storage->getInMemoryMetadataPtr(), storage_info.attributes)
{
auto table_id = storage_info.storage->getStorageID();
LOG_TRACE(&Poco::Logger::get("StorageMaterializedPostgreSQL"),
"New buffer for table {}, number of attributes: {}, number if columns: {}, structure: {}",
table_id.getNameForLogs(), buffer.attributes.size(), buffer.getColumnsNum(), buffer.description.sample_block.dumpStructure());
}
MaterializedPostgreSQLConsumer::StorageData::Buffer::Buffer(
StorageMetadataPtr storage_metadata, const PostgreSQLTableStructure::Attributes & attributes_)
: attributes(attributes_)
{
const auto storage_metadata = storage->getInMemoryMetadataPtr();
const Block sample_block = storage_metadata->getSampleBlock();
/// Need to clear type, because in description.init() the types are appended (emplace_back)
/// Need to clear type, because in description.init() the types are appended
description.types.clear();
description.init(sample_block);
@@ -69,13 +78,13 @@ void MaterializedPostgreSQLConsumer::Buffer::createEmptyBuffer(StoragePtr storag
const auto & storage_columns = storage_metadata->getColumns().getAllPhysical();
auto insert_columns = std::make_shared<ASTExpressionList>();
auto table_id = storage->getStorageID();
LOG_TRACE(&Poco::Logger::get("MaterializedPostgreSQLBuffer"), "New buffer for table {}.{} ({}), structure: {}",
table_id.database_name, table_id.table_name, toString(table_id.uuid), sample_block.dumpStructure());
auto columns_num = description.sample_block.columns();
assert(columns_num == storage_columns.size());
if (attributes.size() + 2 != columns_num) /// +2 because sign and version columns
throw Exception(ErrorCodes::LOGICAL_ERROR, "Columns number mismatch. Attributes: {}, buffer: {}",
attributes.size(), columns_num);
assert(description.sample_block.columns() == storage_columns.size());
size_t idx = 0;
for (const auto & column : storage_columns)
{
if (description.types[idx].first == ExternalResultDescription::ValueType::vtArray)
@@ -85,37 +94,45 @@ void MaterializedPostgreSQLConsumer::Buffer::createEmptyBuffer(StoragePtr storag
insert_columns->children.emplace_back(std::make_shared<ASTIdentifier>(column.name));
}
columnsAST = std::move(insert_columns);
columns_ast = std::move(insert_columns);
}
void MaterializedPostgreSQLConsumer::insertValue(Buffer & buffer, const std::string & value, size_t column_idx)
void MaterializedPostgreSQLConsumer::insertValue(StorageData::Buffer & buffer, const std::string & value, size_t column_idx)
{
const auto & sample = buffer.description.sample_block.getByPosition(column_idx);
bool is_nullable = buffer.description.types[column_idx].second;
if (is_nullable)
try
{
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*buffer.columns[column_idx]);
const auto & data_type = assert_cast<const DataTypeNullable &>(*sample.type);
if (is_nullable)
{
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*buffer.columns[column_idx]);
const auto & data_type = assert_cast<const DataTypeNullable &>(*sample.type);
insertPostgreSQLValue(
column_nullable.getNestedColumn(), value,
buffer.description.types[column_idx].first, data_type.getNestedType(), buffer.array_info, column_idx);
insertPostgreSQLValue(
column_nullable.getNestedColumn(), value,
buffer.description.types[column_idx].first, data_type.getNestedType(), buffer.array_info, column_idx);
column_nullable.getNullMapData().emplace_back(0);
column_nullable.getNullMapData().emplace_back(0);
}
else
{
insertPostgreSQLValue(
*buffer.columns[column_idx], value,
buffer.description.types[column_idx].first, sample.type,
buffer.array_info, column_idx);
}
}
else
catch (const pqxx::conversion_error & e)
{
insertPostgreSQLValue(
*buffer.columns[column_idx], value,
buffer.description.types[column_idx].first, sample.type,
buffer.array_info, column_idx);
LOG_ERROR(log, "Conversion failed while inserting PostgreSQL value {}, will insert default value. Error: {}", value, e.what());
insertDefaultValue(buffer, column_idx);
}
}
void MaterializedPostgreSQLConsumer::insertDefaultValue(Buffer & buffer, size_t column_idx)
void MaterializedPostgreSQLConsumer::insertDefaultValue(StorageData::Buffer & buffer, size_t column_idx)
{
const auto & sample = buffer.description.sample_block.getByPosition(column_idx);
insertDefaultPostgreSQLValue(*buffer.columns[column_idx], *sample.column);
@@ -186,10 +203,16 @@ Int8 MaterializedPostgreSQLConsumer::readInt8(const char * message, size_t & pos
void MaterializedPostgreSQLConsumer::readTupleData(
Buffer & buffer, const char * message, size_t & pos, [[maybe_unused]] size_t size, PostgreSQLQuery type, bool old_value)
StorageData::Buffer & buffer, const char * message, size_t & pos, [[maybe_unused]] size_t size, PostgreSQLQuery type, bool old_value)
{
Int16 num_columns = readInt16(message, pos, size);
/// Sanity check. In fact, it was already checked.
if (static_cast<size_t>(num_columns) + 2 != buffer.getColumnsNum()) /// +2 -- sign and version columns
throw Exception(ErrorCodes::POSTGRESQL_REPLICATION_INTERNAL_ERROR,
"Number of columns does not match. Got: {}, expected {}, current buffer structure: {}",
num_columns, buffer.getColumnsNum(), buffer.description.sample_block.dumpStructure());
auto proccess_column_value = [&](Int8 identifier, Int16 column_idx)
{
switch (identifier)
@@ -202,8 +225,15 @@ void MaterializedPostgreSQLConsumer::readTupleData(
case 't': /// Text formatted value
{
Int32 col_len = readInt32(message, pos, size);
String value;
/// Sanity check for protocol misuse.
/// PostgreSQL uses a fixed page size (commonly 8 kB), and does not allow tuples to span multiple pages.
static constexpr Int32 sanity_check_max_col_len = 1024 * 8 * 2; /// *2 -- just in case.
if (unlikely(col_len > sanity_check_max_col_len))
throw Exception(ErrorCodes::POSTGRESQL_REPLICATION_INTERNAL_ERROR,
"Column length is suspiciously long: {}", col_len);
String value;
for (Int32 i = 0; i < col_len; ++i)
value += readInt8(message, pos, size);
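The 8 kB figure used in the sanity check matches PostgreSQL's default page size; if in doubt, it can be confirmed on the source server with a standard command:

```sql
SHOW block_size; -- 8192 on a default PostgreSQL build
```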
@@ -276,19 +306,20 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
{
Int32 relation_id = readInt32(replication_message, pos, size);
const auto & table_name = relation_id_to_name[relation_id];
/// FIXME: If table name is empty here, it means we failed to load it, but it was included in publication. Need to remove?
if (table_name.empty())
LOG_WARNING(log, "No table mapping for relation id: {}. Probably table failed to be loaded", relation_id);
{
LOG_ERROR(log, "No table mapping for relation id: {}. It's a bug", relation_id);
return;
}
if (!isSyncAllowed(relation_id, table_name))
return;
Int8 new_tuple = readInt8(replication_message, pos, size);
auto buffer = buffers.find(table_name);
assert(buffer != buffers.end());
auto & buffer = storages.find(table_name)->second.buffer;
if (new_tuple)
readTupleData(buffer->second, replication_message, pos, size, PostgreSQLQuery::INSERT);
readTupleData(buffer, replication_message, pos, size, PostgreSQLQuery::INSERT);
break;
}
@@ -296,15 +327,16 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
{
Int32 relation_id = readInt32(replication_message, pos, size);
const auto & table_name = relation_id_to_name[relation_id];
/// FIXME: If table name is empty here, it means we failed to load it, but it was included in publication. Need to remove?
if (table_name.empty())
LOG_WARNING(log, "No table mapping for relation id: {}. Probably table failed to be loaded", relation_id);
{
LOG_ERROR(log, "No table mapping for relation id: {}. It's a bug", relation_id);
return;
}
if (!isSyncAllowed(relation_id, table_name))
return;
auto buffer = buffers.find(table_name);
assert(buffer != buffers.end());
auto & buffer = storages.find(table_name)->second.buffer;
auto proccess_identifier = [&](Int8 identifier) -> bool
{
@@ -319,13 +351,13 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
/// it is much more efficient to use replica identity index, but support all possible cases.
case 'O':
{
readTupleData(buffer->second, replication_message, pos, size, PostgreSQLQuery::UPDATE, true);
readTupleData(buffer, replication_message, pos, size, PostgreSQLQuery::UPDATE, true);
break;
}
case 'N':
{
/// New row.
readTupleData(buffer->second, replication_message, pos, size, PostgreSQLQuery::UPDATE);
readTupleData(buffer, replication_message, pos, size, PostgreSQLQuery::UPDATE);
read_next = false;
break;
}
@@ -347,9 +379,11 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
{
Int32 relation_id = readInt32(replication_message, pos, size);
const auto & table_name = relation_id_to_name[relation_id];
/// FIXME: If table name is empty here, it means we failed to load it, but it was included in publication. Need to remove?
if (table_name.empty())
LOG_WARNING(log, "No table mapping for relation id: {}. Probably table failed to be loaded", relation_id);
{
LOG_ERROR(log, "No table mapping for relation id: {}. It's a bug", relation_id);
return;
}
if (!isSyncAllowed(relation_id, table_name))
return;
@@ -357,10 +391,8 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
/// 0 or 1 if replica identity is set to full. For now only default replica identity is supported (with primary keys).
readInt8(replication_message, pos, size);
auto buffer = buffers.find(table_name);
assert(buffer != buffers.end());
readTupleData(buffer->second, replication_message, pos, size, PostgreSQLQuery::DELETE);
auto & buffer = storages.find(table_name)->second.buffer;
readTupleData(buffer, replication_message, pos, size, PostgreSQLQuery::DELETE);
break;
}
case 'C': // Commit
@@ -379,7 +411,6 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
Int32 relation_id = readInt32(replication_message, pos, size);
String relation_namespace, relation_name;
readString(replication_message, pos, size, relation_namespace);
readString(replication_message, pos, size, relation_name);
@@ -389,22 +420,26 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
else
table_name = relation_name;
if (!relation_id_to_name.contains(relation_id))
relation_id_to_name[relation_id] = table_name;
if (!isSyncAllowed(relation_id, relation_name))
return;
if (storages.find(table_name) == storages.end())
auto storage_iter = storages.find(table_name);
if (storage_iter == storages.end())
{
markTableAsSkipped(relation_id, table_name);
/// TODO: This can happen if we created a publication with this table but then got an exception that this
/// FIXME: This can happen if we created a publication with this table but then got an exception that this
/// table has primary key or something else.
LOG_ERROR(log,
"Storage for table {} does not exist, but is included in replication stream. (Storages number: {})",
"Storage for table {} does not exist, but is included in replication stream. (Storages number: {})"
"Please manually remove this table from replication (DETACH TABLE query) to avoid redundant replication",
table_name, storages.size());
markTableAsSkipped(relation_id, table_name);
return;
}
assert(buffers.contains(table_name));
auto & buffer = storage_iter->second.buffer;
/// 'd' - default (primary key if any)
/// 'n' - nothing
@@ -412,7 +447,6 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
/// 'i' - user defined index with indisreplident set
/// Only 'd' and 'i' - are supported.
char replica_identity = readInt8(replication_message, pos, size);
if (replica_identity != 'd' && replica_identity != 'i')
{
LOG_WARNING(log,
@@ -423,25 +457,29 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
Int16 num_columns = readInt16(replication_message, pos, size);
Int32 data_type_id;
Int32 type_modifier; /// For example, n in varchar(n)
bool new_relation_definition = false;
if (schema_data.find(relation_id) == schema_data.end())
{
relation_id_to_name[relation_id] = table_name;
schema_data.emplace(relation_id, SchemaData(num_columns));
new_relation_definition = true;
}
auto & current_schema_data = schema_data.find(relation_id)->second;
if (current_schema_data.number_of_columns != num_columns)
if (static_cast<size_t>(num_columns) + 2 != buffer.getColumnsNum()) /// +2 -- sign and version columns
{
markTableAsSkipped(relation_id, table_name);
return;
}
if (static_cast<size_t>(num_columns) != buffer.attributes.size())
{
#ifndef NDEBUG
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Mismatch in attributes size. Got {}, expected {}. It's a bug. Current buffer structure: {}",
num_columns, buffer.attributes.size(), buffer.description.sample_block.dumpStructure());
#else
LOG_ERROR(log, "Mismatch in attributes size. Got {}, expected {}. It's a bug. Current buffer structure: {}",
num_columns, buffer.attributes.size(), buffer.description.sample_block.dumpStructure());
markTableAsSkipped(relation_id, table_name);
return;
#endif
}
Int32 data_type_id;
Int32 type_modifier; /// For example, n in varchar(n)
for (uint16_t i = 0; i < num_columns; ++i)
{
String column_name;
@@ -451,23 +489,14 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
data_type_id = readInt32(replication_message, pos, size);
type_modifier = readInt32(replication_message, pos, size);
if (new_relation_definition)
if (buffer.attributes[i].atttypid != data_type_id || buffer.attributes[i].atttypmod != type_modifier)
{
current_schema_data.column_identifiers.emplace_back(std::make_pair(data_type_id, type_modifier));
}
else
{
if (current_schema_data.column_identifiers[i].first != data_type_id
|| current_schema_data.column_identifiers[i].second != type_modifier)
{
markTableAsSkipped(relation_id, table_name);
return;
}
markTableAsSkipped(relation_id, table_name);
return;
}
}
tables_to_sync.insert(table_name);
break;
}
case 'O': // Origin
@@ -489,19 +518,19 @@ void MaterializedPostgreSQLConsumer::syncTables()
{
for (const auto & table_name : tables_to_sync)
{
auto & buffer = buffers.find(table_name)->second;
Block result_rows = buffer.description.sample_block.cloneWithColumns(std::move(buffer.columns));
auto & storage_data = storages.find(table_name)->second;
Block result_rows = storage_data.buffer.description.sample_block.cloneWithColumns(std::move(storage_data.buffer.columns));
if (result_rows.rows())
{
auto storage = storages[table_name];
auto storage = storage_data.storage;
auto insert_context = Context::createCopy(context);
insert_context->setInternalQuery(true);
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = storage->getStorageID();
insert->columns = buffer.columnsAST;
insert->columns = storage_data.buffer.columns_ast;
InterpreterInsertQuery interpreter(insert, insert_context, true);
auto io = interpreter.execute();
@@ -514,7 +543,7 @@ void MaterializedPostgreSQLConsumer::syncTables()
CompletedPipelineExecutor executor(io.pipeline);
executor.execute();
buffer.columns = buffer.description.sample_block.cloneEmptyColumns();
storage_data.buffer.columns = storage_data.buffer.description.sample_block.cloneEmptyColumns();
}
}
@@ -599,34 +628,21 @@ bool MaterializedPostgreSQLConsumer::isSyncAllowed(Int32 relation_id, const Stri
void MaterializedPostgreSQLConsumer::markTableAsSkipped(Int32 relation_id, const String & relation_name)
{
/// Empty lsn string means - continue waiting for valid lsn.
skip_list.insert({relation_id, ""});
skip_list.insert({relation_id, ""}); /// Empty lsn string means - continue waiting for valid lsn.
storages.erase(relation_name);
if (storages.count(relation_name))
{
/// Erase cached schema identifiers. It will be updated again once table is allowed back into replication stream
/// and it receives first data after update.
schema_data.erase(relation_id);
/// Clear table buffer.
auto & buffer = buffers.find(relation_name)->second;
buffer.columns = buffer.description.sample_block.cloneEmptyColumns();
if (allow_automatic_update)
LOG_TRACE(log, "Table {} (relation_id: {}) is skipped temporarily. It will be reloaded in the background", relation_name, relation_id);
else
LOG_WARNING(log, "Table {} (relation_id: {}) is skipped, because table schema has changed", relation_name, relation_id);
}
if (allow_automatic_update)
LOG_TRACE(log, "Table {} (relation_id: {}) is skipped temporarily. It will be reloaded in the background", relation_name, relation_id);
else
LOG_WARNING(log, "Table {} (relation_id: {}) is skipped, because table schema has changed", relation_name, relation_id);
}
void MaterializedPostgreSQLConsumer::addNested(const String & postgres_table_name, StoragePtr nested_storage, const String & table_start_lsn)
void MaterializedPostgreSQLConsumer::addNested(
const String & postgres_table_name, StorageInfo nested_storage_info, const String & table_start_lsn)
{
/// Cache new pointer to replacingMergeTree table.
storages.emplace(postgres_table_name, nested_storage);
/// Add new in-memory buffer.
buffers.emplace(postgres_table_name, Buffer(nested_storage));
assert(!storages.contains(postgres_table_name));
storages.emplace(postgres_table_name, nested_storage_info);
/// Replication consumer will read the WAL and check, for the currently processed table, whether it is allowed to start applying
/// changes to this table.
@@ -634,14 +650,10 @@ void MaterializedPostgreSQLConsumer::addNested(const String & postgres_table_nam
}
void MaterializedPostgreSQLConsumer::updateNested(const String & table_name, StoragePtr nested_storage, Int32 table_id, const String & table_start_lsn)
void MaterializedPostgreSQLConsumer::updateNested(const String & table_name, StorageInfo nested_storage_info, Int32 table_id, const String & table_start_lsn)
{
/// Cache new pointer to replacingMergeTree table.
storages[table_name] = nested_storage;
/// Create a new empty buffer (with updated metadata), where data is first loaded before syncing into actual table.
auto & buffer = buffers.find(table_name)->second;
buffer.createEmptyBuffer(nested_storage);
assert(!storages.contains(table_name));
storages.emplace(table_name, nested_storage_info);
/// Set the start position to a valid lsn. Before, it was an empty string. Further reads for the table are allowed if it has a valid lsn.
skip_list[table_id] = table_start_lsn;
@@ -651,7 +663,6 @@ void MaterializedPostgreSQLConsumer::updateNested(const String & table_name, Sto
void MaterializedPostgreSQLConsumer::removeNested(const String & postgres_table_name)
{
storages.erase(postgres_table_name);
buffers.erase(postgres_table_name);
deleted_tables.insert(postgres_table_name);
}
@@ -706,7 +717,17 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
current_lsn = (*row)[0];
lsn_value = getLSNValue(current_lsn);
processReplicationMessage((*row)[1].c_str(), (*row)[1].size());
try
{
// LOG_DEBUG(log, "Current message: {}", (*row)[1]);
processReplicationMessage((*row)[1].c_str(), (*row)[1].size());
}
catch (const Exception & e)
{
if (e.code() == ErrorCodes::POSTGRESQL_REPLICATION_INTERNAL_ERROR)
continue;
throw;
}
}
}
catch (const Exception &)
@@ -737,11 +758,6 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
LOG_ERROR(log, "Conversion error: {}", e.what());
return false;
}
catch (const pqxx::in_doubt_error & e)
{
LOG_ERROR(log, "PostgreSQL library has some doubts: {}", e.what());
return false;
}
catch (const pqxx::internal_error & e)
{
LOG_ERROR(log, "PostgreSQL library internal error: {}", e.what());
@@ -749,16 +765,8 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
}
catch (...)
{
/// Since reading is done from a background task, it is important to catch any possible error
/// in order to understand why something does not work.
try
{
std::rethrow_exception(std::current_exception());
}
catch (const std::exception& e)
{
LOG_ERROR(log, "Unexpected error: {}", e.what());
}
tryLogCurrentException(__PRETTY_FUNCTION__);
return false;
}
if (!tables_to_sync.empty())
@@ -770,6 +778,11 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
bool MaterializedPostgreSQLConsumer::consume(std::vector<std::pair<Int32, String>> & skipped_tables)
{
/// Read up to max_block_size changed rows (approximately; in some cases it might be more).
/// false: no data was read, reschedule.
/// true: some data was read, schedule as soon as possible.
auto read_next = readFromReplicationSlot();
/// Check if there are tables that are skipped from being updated by changes from the replication stream
/// because schema changes were detected. Update them if it is allowed.
if (allow_automatic_update && !skip_list.empty())
@@ -786,10 +799,6 @@ bool MaterializedPostgreSQLConsumer::consume(std::vector<std::pair<Int32, String
}
}
/// Read up to max_block_size changed rows (approximately; in some cases it might be more).
/// false: no data was read, reschedule.
/// true: some data was read, schedule as soon as possible.
auto read_next = readFromReplicationSlot();
return read_next;
}

View File

@@ -8,36 +8,78 @@
#include <base/logger_useful.h>
#include <Storages/IStorage.h>
#include <Parsers/ASTExpressionList.h>
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
namespace DB
{
struct SettingChange;
struct StorageInfo
{
StoragePtr storage;
PostgreSQLTableStructure::Attributes attributes;
StorageInfo(StoragePtr storage_, const PostgreSQLTableStructure::Attributes & attributes_)
: storage(storage_), attributes(attributes_) {}
};
using StorageInfos = std::unordered_map<String, StorageInfo>;
class MaterializedPostgreSQLConsumer
{
public:
using Storages = std::unordered_map<String, StoragePtr>;
private:
struct StorageData
{
struct Buffer
{
ExternalResultDescription description;
MutableColumns columns;
/// Needed to pass the columns list to the INSERT query in syncTables().
std::shared_ptr<ASTExpressionList> columns_ast;
/// Needed by the insertPostgreSQLValue() method to parse arrays.
std::unordered_map<size_t, PostgreSQLArrayInfo> array_info;
/// Used to validate DDL.
PostgreSQLTableStructure::Attributes attributes;
Buffer(StorageMetadataPtr storage_metadata, const PostgreSQLTableStructure::Attributes & attributes_);
size_t getColumnsNum() const
{
const auto & sample_block = description.sample_block;
return sample_block.columns();
}
};
StoragePtr storage;
Buffer buffer;
explicit StorageData(const StorageInfo & storage_info);
StorageData(const StorageData & other) = delete;
};
using Storages = std::unordered_map<String, StorageData>;
public:
MaterializedPostgreSQLConsumer(
ContextPtr context_,
std::shared_ptr<postgres::Connection> connection_,
const String & replication_slot_name_,
const String & publication_name_,
const String & start_lsn,
const size_t max_block_size_,
size_t max_block_size_,
bool schema_as_a_part_of_table_name_,
bool allow_automatic_update_,
Storages storages_,
StorageInfos storages_,
const String & name_for_logger);
bool consume(std::vector<std::pair<Int32, String>> & skipped_tables);
/// Called from reloadFromSnapshot by the replication handler. This method is needed to move a table back into the synchronization
/// process if it was skipped due to schema changes.
void updateNested(const String & table_name, StoragePtr nested_storage, Int32 table_id, const String & table_start_lsn);
void updateNested(const String & table_name, StorageInfo nested_storage_info, Int32 table_id, const String & table_start_lsn);
void addNested(const String & postgres_table_name, StoragePtr nested_storage, const String & table_start_lsn);
void addNested(const String & postgres_table_name, StorageInfo nested_storage_info, const String & table_start_lsn);
void removeNested(const String & postgres_table_name);
@@ -55,25 +97,8 @@ private:
bool isSyncAllowed(Int32 relation_id, const String & relation_name);
struct Buffer
{
ExternalResultDescription description;
MutableColumns columns;
/// Needed to pass to insert query columns list in syncTables().
std::shared_ptr<ASTExpressionList> columnsAST;
/// Needed for insertPostgreSQLValue() method to parse array
std::unordered_map<size_t, PostgreSQLArrayInfo> array_info;
Buffer(StoragePtr storage) { createEmptyBuffer(storage); }
void createEmptyBuffer(StoragePtr storage);
};
using Buffers = std::unordered_map<String, Buffer>;
static void insertDefaultValue(Buffer & buffer, size_t column_idx);
static void insertValue(Buffer & buffer, const std::string & value, size_t column_idx);
static void insertDefaultValue(StorageData::Buffer & buffer, size_t column_idx);
void insertValue(StorageData::Buffer & buffer, const std::string & value, size_t column_idx);
enum class PostgreSQLQuery
{
@@ -82,7 +107,7 @@ private:
DELETE
};
void readTupleData(Buffer & buffer, const char * message, size_t & pos, size_t size, PostgreSQLQuery type, bool old_value = false);
void readTupleData(StorageData::Buffer & buffer, const char * message, size_t & pos, size_t size, PostgreSQLQuery type, bool old_value = false);
template<typename T>
static T unhexN(const char * message, size_t pos, size_t n);
@@ -95,7 +120,7 @@ private:
void markTableAsSkipped(Int32 relation_id, const String & relation_name);
/// lsn - log sequence number, like a WAL offset (64 bit).
Int64 getLSNValue(const std::string & lsn)
static Int64 getLSNValue(const std::string & lsn)
{
UInt32 upper_half, lower_half;
std::sscanf(lsn.data(), "%X/%X", &upper_half, &lower_half);
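The `%X/%X` text parsed here is PostgreSQL's standard LSN representation: the upper and lower 32-bit halves of a 64-bit WAL position, so the full value is `(upper_half << 32) + lower_half`. A live example of the format can be obtained from the server (assuming PostgreSQL 10+):

```sql
SELECT pg_current_wal_lsn(); -- e.g. 16/B374D848, i.e. (0x16 << 32) + 0xB374D848
```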
@@ -125,28 +150,11 @@ private:
/// Holds `postgres_table_name` set.
std::unordered_set<std::string> tables_to_sync;
/// `postgres_table_name` -> ReplacingMergeTree table.
/// `postgres_table_name` -> StorageData.
Storages storages;
/// `postgres_table_name` -> In-memory buffer.
Buffers buffers;
std::unordered_map<Int32, String> relation_id_to_name;
struct SchemaData
{
Int16 number_of_columns;
/// data_type_id and type_modifier
std::vector<std::pair<Int32, Int32>> column_identifiers;
SchemaData(Int16 number_of_columns_) : number_of_columns(number_of_columns_) {}
};
/// Cache for table schema data to be able to detect schema changes, because ddl is not
/// replicated with postgresql logical replication protocol, but some table schema info
/// is received if it is the first time we received dml message for given relation in current session or
/// if relation definition has changed since the last relation definition message.
std::unordered_map<Int32, SchemaData> schema_data;
/// `postgres_relation_id` -> `start_lsn`
/// skip_list contains relation ids of tables on which DDL was performed, which can break synchronization.
/// These breaking changes are detected in the replication stream via the corresponding replication message, and the table is added to the skip list.

View File

@@ -186,7 +186,7 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
createPublicationIfNeeded(tx);
/// List of nested tables (table_name -> nested_storage), which is passed to replication consumer.
std::unordered_map<String, StoragePtr> nested_storages;
std::unordered_map<String, StorageInfo> nested_storages;
/// snapshot_name is initialized only if a new replication slot is created.
/// start_lsn is initialized in two places:
@@ -221,7 +221,7 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
{
try
{
nested_storages[table_name] = loadFromSnapshot(*tmp_connection, snapshot_name, table_name, storage->as<StorageMaterializedPostgreSQL>());
nested_storages.emplace(table_name, loadFromSnapshot(*tmp_connection, snapshot_name, table_name, storage->as<StorageMaterializedPostgreSQL>()));
}
catch (Exception & e)
{
@@ -263,7 +263,12 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
auto * materialized_storage = storage->as<StorageMaterializedPostgreSQL>();
try
{
nested_storages[table_name] = materialized_storage->getNested();
auto [postgres_table_schema, postgres_table_name] = getSchemaAndTableName(table_name);
auto table_structure = fetchPostgreSQLTableStructure(tx, postgres_table_name, postgres_table_schema, true, true, true);
if (!table_structure.physical_columns)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No columns");
auto storage_info = StorageInfo(materialized_storage->getNested(), table_structure.physical_columns->attributes);
nested_storages.emplace(table_name, std::move(storage_info));
}
catch (Exception & e)
{
@@ -316,7 +321,7 @@ ASTPtr PostgreSQLReplicationHandler::getCreateNestedTableQuery(StorageMaterializ
}
StoragePtr PostgreSQLReplicationHandler::loadFromSnapshot(postgres::Connection & connection, String & snapshot_name, const String & table_name,
StorageInfo PostgreSQLReplicationHandler::loadFromSnapshot(postgres::Connection & connection, String & snapshot_name, const String & table_name,
StorageMaterializedPostgreSQL * materialized_storage)
{
auto tx = std::make_shared<pqxx::ReplicationTransaction>(connection.getRef());
@@ -330,8 +335,13 @@ StoragePtr PostgreSQLReplicationHandler::loadFromSnapshot(postgres::Connection &
query_str = fmt::format("SELECT * FROM {}", quoted_name);
LOG_DEBUG(log, "Loading PostgreSQL table {}.{}", postgres_database, quoted_name);
auto table_structure = fetchTableStructure(*tx, table_name);
if (!table_structure->physical_columns)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No table attributes");
auto table_attributes = table_structure->physical_columns->attributes;
auto table_override = tryGetTableOverride(current_database_name, table_name);
materialized_storage->createNestedIfNeeded(fetchTableStructure(*tx, table_name), table_override ? table_override->as<ASTTableOverride>() : nullptr);
materialized_storage->createNestedIfNeeded(std::move(table_structure), table_override ? table_override->as<ASTTableOverride>() : nullptr);
auto nested_storage = materialized_storage->getNested();
auto insert = std::make_shared<ASTInsertQuery>();
@@ -356,7 +366,7 @@ StoragePtr PostgreSQLReplicationHandler::loadFromSnapshot(postgres::Connection &
auto nested_table_id = nested_storage->getStorageID();
LOG_DEBUG(log, "Loaded table {}.{} (uuid: {})", nested_table_id.database_name, nested_table_id.table_name, toString(nested_table_id.uuid));
return nested_storage;
return StorageInfo(nested_storage, std::move(table_attributes));
}
@@ -788,9 +798,6 @@ std::set<String> PostgreSQLReplicationHandler::fetchTablesFromPublication(pqxx::
PostgreSQLTableStructurePtr PostgreSQLReplicationHandler::fetchTableStructure(
pqxx::ReplicationTransaction & tx, const std::string & table_name) const
{
if (!is_materialized_postgresql_database)
return nullptr;
PostgreSQLTableStructure structure;
try
{
@@ -816,7 +823,7 @@ void PostgreSQLReplicationHandler::addTableToReplication(StorageMaterializedPost
LOG_TRACE(log, "Adding table `{}` to replication", postgres_table_name);
postgres::Connection replication_connection(connection_info, /* replication */true);
String snapshot_name, start_lsn;
StoragePtr nested_storage;
StorageInfo nested_storage_info{ nullptr, {} };
{
auto tx = std::make_shared<pqxx::nontransaction>(replication_connection.getRef());
@@ -832,8 +839,8 @@ void PostgreSQLReplicationHandler::addTableToReplication(StorageMaterializedPost
throw Exception(ErrorCodes::LOGICAL_ERROR, "Internal table was not created");
postgres::Connection tmp_connection(connection_info);
nested_storage = loadFromSnapshot(tmp_connection, snapshot_name, postgres_table_name, materialized_storage);
materialized_storage->set(nested_storage);
nested_storage_info = loadFromSnapshot(tmp_connection, snapshot_name, postgres_table_name, materialized_storage);
materialized_storage->set(nested_storage_info.storage);
}
{
@@ -842,7 +849,7 @@ void PostgreSQLReplicationHandler::addTableToReplication(StorageMaterializedPost
}
/// Pass the storage to the consumer along with the lsn position from which to start receiving replication messages for this table.
consumer->addNested(postgres_table_name, nested_storage, start_lsn);
consumer->addNested(postgres_table_name, nested_storage_info, start_lsn);
LOG_TRACE(log, "Table `{}` successfully added to replication", postgres_table_name);
}
catch (...)
@@ -915,8 +922,8 @@ void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector<std::pai
auto temp_materialized_storage = materialized_storage->createTemporary();
/// This snapshot is valid up to the end of the transaction that exported it.
StoragePtr temp_nested_storage = loadFromSnapshot(tmp_connection, snapshot_name, table_name,
temp_materialized_storage->as<StorageMaterializedPostgreSQL>());
auto [temp_nested_storage, table_attributes] = loadFromSnapshot(
tmp_connection, snapshot_name, table_name, temp_materialized_storage->as<StorageMaterializedPostgreSQL>());
auto table_id = materialized_storage->getNestedStorageID();
auto temp_table_id = temp_nested_storage->getStorageID();
@@ -950,7 +957,7 @@ void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector<std::pai
nested_storage->getStorageID().getNameForLogs(), nested_sample_block.dumpStructure());
/// Pass pointer to new nested table into replication consumer, remove current table from skip list and set start lsn position.
consumer->updateNested(table_name, nested_storage, relation_id, start_lsn);
consumer->updateNested(table_name, StorageInfo(nested_storage, std::move(table_attributes)), relation_id, start_lsn);
auto table_to_drop = DatabaseCatalog::instance().getTable(StorageID(temp_table_id.database_name, temp_table_id.table_name, table_id.uuid), nested_context);
auto drop_table_id = table_to_drop->getStorageID();

View File

@@ -87,7 +87,7 @@ private:
void consumerFunc();
StoragePtr loadFromSnapshot(postgres::Connection & connection, std::string & snapshot_name, const String & table_name, StorageMaterializedPostgreSQL * materialized_storage);
StorageInfo loadFromSnapshot(postgres::Connection & connection, std::string & snapshot_name, const String & table_name, StorageMaterializedPostgreSQL * materialized_storage);
void reloadFromSnapshot(const std::vector<std::pair<Int32, String>> & relation_data);

View File

@@ -365,7 +365,7 @@ ASTPtr StorageMaterializedPostgreSQL::getColumnDeclaration(const DataTypePtr & d
ast_expression->name = "DateTime64";
ast_expression->arguments = std::make_shared<ASTExpressionList>();
ast_expression->arguments->children.emplace_back(std::make_shared<ASTLiteral>(UInt32(6)));
return ast_expression;
return std::move(ast_expression);
}
return std::make_shared<ASTIdentifier>(data_type->getName());
@@ -423,7 +423,7 @@ ASTPtr StorageMaterializedPostgreSQL::getCreateNestedTableQuery(
table_id.database_name, table_id.table_name);
}
if (!table_structure->columns && (!table_override || !table_override->columns))
if (!table_structure->physical_columns && (!table_override || !table_override->columns))
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "No columns returned for table {}.{}",
table_id.database_name, table_id.table_name);
@@ -465,7 +465,7 @@ ASTPtr StorageMaterializedPostgreSQL::getCreateNestedTableQuery(
}
else
{
ordinary_columns_and_types = *table_structure->columns;
ordinary_columns_and_types = table_structure->physical_columns->columns;
columns_declare_list->set(columns_declare_list->columns, getColumnsExpressionList(ordinary_columns_and_types));
}
@@ -475,7 +475,7 @@ ASTPtr StorageMaterializedPostgreSQL::getCreateNestedTableQuery(
}
else
{
ordinary_columns_and_types = *table_structure->columns;
ordinary_columns_and_types = table_structure->physical_columns->columns;
columns_declare_list->set(columns_declare_list->columns, getColumnsExpressionList(ordinary_columns_and_types));
}
@@ -485,9 +485,9 @@ ASTPtr StorageMaterializedPostgreSQL::getCreateNestedTableQuery(
NamesAndTypesList merging_columns;
if (table_structure->primary_key_columns)
merging_columns = *table_structure->primary_key_columns;
merging_columns = table_structure->primary_key_columns->columns;
else
merging_columns = *table_structure->replica_identity_columns;
merging_columns = table_structure->replica_identity_columns->columns;
order_by_expression->name = "tuple";
order_by_expression->arguments = std::make_shared<ASTExpressionList>();
@@ -524,7 +524,7 @@ ASTPtr StorageMaterializedPostgreSQL::getCreateNestedTableQuery(
storage_metadata.setConstraints(constraints);
setInMemoryMetadata(storage_metadata);
return create_table_query;
return std::move(create_table_query);
}

View File

@@ -45,13 +45,13 @@ ColumnsDescription TableFunctionPostgreSQL::getActualTableStructure(ContextPtr c
{
const bool use_nulls = context->getSettingsRef().external_table_functions_use_nulls;
auto connection_holder = connection_pool->get();
auto columns = fetchPostgreSQLTableStructure(
connection_holder->get(), configuration->table, configuration->schema, use_nulls).columns;
auto columns_info = fetchPostgreSQLTableStructure(
connection_holder->get(), configuration->table, configuration->schema, use_nulls).physical_columns;
if (!columns)
if (!columns_info)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table structure not returned");
return ColumnsDescription{*columns};
return ColumnsDescription{columns_info->columns};
}

View File

@@ -626,6 +626,53 @@ def test_table_override(started_cluster):
drop_postgres_table(cursor, table_name)
def test_table_schema_changes_2(started_cluster):
drop_materialized_db()
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
table_name = "test_table"
create_postgres_table(cursor, table_name, template=postgres_table_template_2);
instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number, number, number from numbers(25)")
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
settings=["materialized_postgresql_allow_automatic_update = 1, materialized_postgresql_tables_list='test_table'"])
instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, number, number, number from numbers(25, 25)")
check_tables_are_synchronized(table_name);
cursor.execute(f"ALTER TABLE {table_name} DROP COLUMN value1")
cursor.execute(f"ALTER TABLE {table_name} DROP COLUMN value2")
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN value1 Text")
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN value2 Text")
cursor.execute(f"ALTER TABLE {table_name} DROP COLUMN value3")
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN value3 Text")
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN value4 Text")
cursor.execute(f"UPDATE {table_name} SET value3 = 'kek' WHERE key%2=0")
check_tables_are_synchronized(table_name);
instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, toString(number), toString(number), toString(number), toString(number) from numbers(50, 25)")
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN value5 Integer")
cursor.execute(f"ALTER TABLE {table_name} DROP COLUMN value2")
instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, toString(number), toString(number), toString(number), number from numbers(75, 25)")
check_tables_are_synchronized(table_name);
instance.restart_clickhouse()
check_tables_are_synchronized(table_name);
cursor.execute(f"ALTER TABLE {table_name} DROP COLUMN value5")
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN value5 Text")
instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, toString(number), toString(number), toString(number), toString(number) from numbers(100, 25)")
check_tables_are_synchronized(table_name);
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN value6 Text")
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN value7 Integer")
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN value8 Integer")
cursor.execute(f"ALTER TABLE {table_name} DROP COLUMN value5")
instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, toString(number), toString(number), toString(number), toString(number), number, number from numbers(125, 25)")
check_tables_are_synchronized(table_name);
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")