From 6639a9373560612feb18227b6f278b6c5bbe9787 Mon Sep 17 00:00:00 2001 From: kssenii Date: Sun, 9 Jan 2022 00:37:11 +0300 Subject: [PATCH] Better --- .../MaterializedPostgreSQLConsumer.cpp | 101 +++++++----------- .../MaterializedPostgreSQLConsumer.h | 80 +++++++------- .../test.py | 13 ++- 3 files changed, 92 insertions(+), 102 deletions(-) diff --git a/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp b/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp index 731b3c36889..f633e3ba20c 100644 --- a/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp +++ b/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp @@ -30,7 +30,7 @@ MaterializedPostgreSQLConsumer::MaterializedPostgreSQLConsumer( const size_t max_block_size_, bool schema_as_a_part_of_table_name_, bool allow_automatic_update_, - Storages storages_, + StorageInfos storages_info_, const String & name_for_logger) : log(&Poco::Logger::get("PostgreSQLReplicaConsumer(" + name_for_logger + ")")) , context(context_) @@ -42,7 +42,6 @@ MaterializedPostgreSQLConsumer::MaterializedPostgreSQLConsumer( , max_block_size(max_block_size_) , schema_as_a_part_of_table_name(schema_as_a_part_of_table_name_) , allow_automatic_update(allow_automatic_update_) - , storages(storages_) { final_lsn = start_lsn; auto tx = std::make_shared(connection->getRef()); @@ -50,24 +49,24 @@ MaterializedPostgreSQLConsumer::MaterializedPostgreSQLConsumer( LOG_TRACE(log, "Starting replication. LSN: {} (last: {})", getLSNValue(current_lsn), getLSNValue(final_lsn)); tx->commit(); - for (const auto & [table_name, storage_info] : storages) - buffers.emplace(table_name, Buffer(storage_info.storage, storage_info.attributes)); + for (const auto & [table_name, storage_info] : storages_info_) + storages.emplace(table_name, storage_info); } -MaterializedPostgreSQLConsumer::Buffer::Buffer(StoragePtr storage, const PostgreSQLTableStructure::Attributes & attributes_) +MaterializedPostgreSQLConsumer::StorageData::StorageData(const StorageInfo & storage_info) + : storage(storage_info.storage), buffer(storage_info.storage->getInMemoryMetadataPtr(), storage_info.attributes) +{ + auto table_id = storage_info.storage->getStorageID(); + LOG_TRACE(&Poco::Logger::get("StorageMaterializedPostgreSQL"), "New buffer for table {}.{} ({}), structure: {}", + table_id.database_name, table_id.table_name, toString(table_id.uuid), buffer.description.sample_block.dumpStructure()); +} + + +MaterializedPostgreSQLConsumer::StorageData::Buffer::Buffer( + StorageMetadataPtr storage_metadata, const PostgreSQLTableStructure::Attributes & attributes_) : attributes(attributes_) { - createEmptyBuffer(storage); - if (attributes.size() + 2 != getColumnsNum()) /// +2 because sign and version columns - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Columns number mismatch. Attributes: {}, buffer: {}", attributes.size(), getColumnsNum()); -} - - -void MaterializedPostgreSQLConsumer::Buffer::createEmptyBuffer(StoragePtr storage) -{ - const auto storage_metadata = storage->getInMemoryMetadataPtr(); const Block sample_block = storage_metadata->getSampleBlock(); /// Need to clear type, because in description.init() the types are appended (emplace_back) @@ -78,13 +77,13 @@ void MaterializedPostgreSQLConsumer::Buffer::createEmptyBuffer(StoragePtr storag const auto & storage_columns = storage_metadata->getColumns().getAllPhysical(); auto insert_columns = std::make_shared(); - auto table_id = storage->getStorageID(); - LOG_TRACE(&Poco::Logger::get("MaterializedPostgreSQLBuffer"), "New buffer for table {}.{} ({}), structure: {}", - table_id.database_name, table_id.table_name, toString(table_id.uuid), sample_block.dumpStructure()); + auto columns_num = description.sample_block.columns(); + assert(columns_num == storage_columns.size()); + if (attributes.size() + 2 != columns_num) /// +2 because sign and version columns + throw Exception(ErrorCodes::LOGICAL_ERROR, "Columns number mismatch. Attributes: {}, buffer: {}", + attributes.size(), columns_num); - assert(description.sample_block.columns() == storage_columns.size()); size_t idx = 0; - for (const auto & column : storage_columns) { if (description.types[idx].first == ExternalResultDescription::ValueType::vtArray) @@ -98,7 +97,7 @@ void MaterializedPostgreSQLConsumer::Buffer::createEmptyBuffer(StoragePtr storag } -void MaterializedPostgreSQLConsumer::insertValue(Buffer & buffer, const std::string & value, size_t column_idx) +void MaterializedPostgreSQLConsumer::insertValue(StorageData::Buffer & buffer, const std::string & value, size_t column_idx) { const auto & sample = buffer.description.sample_block.getByPosition(column_idx); bool is_nullable = buffer.description.types[column_idx].second; @@ -132,7 +131,7 @@ void MaterializedPostgreSQLConsumer::insertValue(Buffer & buffer, const std::str } -void MaterializedPostgreSQLConsumer::insertDefaultValue(Buffer & buffer, size_t column_idx) +void MaterializedPostgreSQLConsumer::insertDefaultValue(StorageData::Buffer & buffer, size_t column_idx) { const auto & sample = buffer.description.sample_block.getByPosition(column_idx); insertDefaultPostgreSQLValue(*buffer.columns[column_idx], *sample.column); @@ -203,7 +202,7 @@ Int8 MaterializedPostgreSQLConsumer::readInt8(const char * message, size_t & pos void MaterializedPostgreSQLConsumer::readTupleData( - Buffer & buffer, const char * message, size_t & pos, [[maybe_unused]] size_t size, PostgreSQLQuery type, bool old_value) + StorageData::Buffer & buffer, const char * message, size_t & pos, [[maybe_unused]] size_t size, PostgreSQLQuery type, bool old_value) { Int16 num_columns = readInt16(message, pos, size); @@ -231,7 +230,7 @@ void MaterializedPostgreSQLConsumer::readTupleData( static constexpr Int32 sanity_check_max_col_len = 1024 * 8 * 2; /// *2 -- just in case. if (unlikely(col_len > sanity_check_max_col_len)) throw Exception(ErrorCodes::POSTGRESQL_REPLICATION_INTERNAL_ERROR, - "Column legth is suspiciously long: {}", col_len); + "Column length is suspiciously long: {}", col_len); String value; for (Int32 i = 0; i < col_len; ++i) @@ -316,11 +315,10 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl return; Int8 new_tuple = readInt8(replication_message, pos, size); - auto buffer = buffers.find(table_name); - assert(buffer != buffers.end()); + auto & buffer = storages.find(table_name)->second.buffer; if (new_tuple) - readTupleData(buffer->second, replication_message, pos, size, PostgreSQLQuery::INSERT); + readTupleData(buffer, replication_message, pos, size, PostgreSQLQuery::INSERT); break; } @@ -337,8 +335,7 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl if (!isSyncAllowed(relation_id, table_name)) return; - auto buffer = buffers.find(table_name); - assert(buffer != buffers.end()); + auto & buffer = storages.find(table_name)->second.buffer; auto proccess_identifier = [&](Int8 identifier) -> bool { @@ -353,13 +350,13 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl /// it is much more efficient to use replica identity index, but support all possible cases. case 'O': { - readTupleData(buffer->second, replication_message, pos, size, PostgreSQLQuery::UPDATE, true); + readTupleData(buffer, replication_message, pos, size, PostgreSQLQuery::UPDATE, true); break; } case 'N': { /// New row. - readTupleData(buffer->second, replication_message, pos, size, PostgreSQLQuery::UPDATE); + readTupleData(buffer, replication_message, pos, size, PostgreSQLQuery::UPDATE); read_next = false; break; } @@ -393,10 +390,8 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl /// 0 or 1 if replica identity is set to full. For now only default replica identity is supported (with primary keys). readInt8(replication_message, pos, size); - auto buffer = buffers.find(table_name); - assert(buffer != buffers.end()); - - readTupleData(buffer->second, replication_message, pos, size, PostgreSQLQuery::DELETE); + auto & buffer = storages.find(table_name)->second.buffer; + readTupleData(buffer, replication_message, pos, size, PostgreSQLQuery::DELETE); break; } case 'C': // Commit @@ -430,7 +425,8 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl if (!isSyncAllowed(relation_id, relation_name)) return; - if (storages.find(table_name) == storages.end()) + auto storage_iter = storages.find(table_name); + if (storage_iter == storages.end()) { /// FIXME: This can happen if we created a publication with this table but then got an exception that this /// table has primary key or something else. @@ -442,15 +438,7 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl return; } - auto buffer_iter = buffers.find(table_name); - if (buffer_iter == buffers.end()) - { - /// Must never happen if previous check for storage existance passed, but just in case. - LOG_ERROR(log, "No buffer found for table `{}`.", table_name); - markTableAsSkipped(relation_id, table_name); - return; - } - const auto & buffer = buffer_iter->second; + auto & buffer = storage_iter->second.buffer; /// 'd' - default (primary key if any) /// 'n' - nothing @@ -517,19 +505,19 @@ void MaterializedPostgreSQLConsumer::syncTables() { for (const auto & table_name : tables_to_sync) { - auto & buffer = buffers.find(table_name)->second; - Block result_rows = buffer.description.sample_block.cloneWithColumns(std::move(buffer.columns)); + auto & storage_data = storages.find(table_name)->second; + Block result_rows = storage_data.buffer.description.sample_block.cloneWithColumns(std::move(storage_data.buffer.columns)); if (result_rows.rows()) { - auto storage = storages.find(table_name)->second.storage; + auto storage = storage_data.storage; auto insert_context = Context::createCopy(context); insert_context->setInternalQuery(true); auto insert = std::make_shared(); insert->table_id = storage->getStorageID(); - insert->columns = buffer.columns_ast; + insert->columns = storage_data.buffer.columns_ast; InterpreterInsertQuery interpreter(insert, insert_context, true); auto io = interpreter.execute(); @@ -542,7 +530,7 @@ void MaterializedPostgreSQLConsumer::syncTables() CompletedPipelineExecutor executor(io.pipeline); executor.execute(); - buffer.columns = buffer.description.sample_block.cloneEmptyColumns(); + storage_data.buffer.columns = storage_data.buffer.description.sample_block.cloneEmptyColumns(); } } @@ -629,11 +617,12 @@ void MaterializedPostgreSQLConsumer::markTableAsSkipped(Int32 relation_id, const { /// Empty lsn string means - continue waiting for valid lsn. skip_list.insert({relation_id, ""}); + auto storage_iter = storages.find(relation_name); - if (storages.count(relation_name)) + if (storage_iter != storages.end()) { /// Clear table buffer. - auto & buffer = buffers.find(relation_name)->second; + auto & buffer = storage_iter->second.buffer; buffer.columns = buffer.description.sample_block.cloneEmptyColumns(); /// Erase cached schema identifiers. It will be updated again once table is allowed back into replication stream /// and it receives first data after update. @@ -653,9 +642,6 @@ void MaterializedPostgreSQLConsumer::addNested( /// Cache new pointer to replacingMergeTree table. storages.emplace(postgres_table_name, nested_storage_info); - /// Add new in-memory buffer. - buffers.emplace(postgres_table_name, Buffer(nested_storage_info.storage, nested_storage_info.attributes)); - /// Replication consumer will read wall and check for currently processed table whether it is allowed to start applying /// changes to this table. waiting_list[postgres_table_name] = table_start_lsn; @@ -667,10 +653,6 @@ void MaterializedPostgreSQLConsumer::updateNested(const String & table_name, Sto /// Cache new pointer to replacingMergeTree table. storages.emplace(table_name, nested_storage_info); - /// Create a new empty buffer (with updated metadata), where data is first loaded before syncing into actual table. - auto & buffer = buffers.find(table_name)->second; - buffer.createEmptyBuffer(nested_storage_info.storage); - /// Set start position to valid lsn. Before it was an empty string. Further read for table allowed, if it has a valid lsn. skip_list[table_id] = table_start_lsn; } @@ -679,7 +661,6 @@ void MaterializedPostgreSQLConsumer::updateNested(const String & table_name, Sto void MaterializedPostgreSQLConsumer::removeNested(const String & postgres_table_name) { storages.erase(postgres_table_name); - buffers.erase(postgres_table_name); deleted_tables.insert(postgres_table_name); } diff --git a/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.h b/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.h index 6d35f7fcdfd..f37cb3bffef 100644 --- a/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.h +++ b/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.h @@ -23,12 +23,44 @@ struct StorageInfo StorageInfo(StoragePtr storage_, const PostgreSQLTableStructure::Attributes & attributes_) : storage(storage_), attributes(attributes_) {} }; +using StorageInfos = std::unordered_map; class MaterializedPostgreSQLConsumer { -public: - using Storages = std::unordered_map; +private: + struct StorageData + { + struct Buffer + { + ExternalResultDescription description; + MutableColumns columns; + /// Needed to pass to insert query columns list in syncTables(). + std::shared_ptr columns_ast; + /// Needed for insertPostgreSQLValue() method to parse array + std::unordered_map array_info; + /// To validate ddl. + PostgreSQLTableStructure::Attributes attributes; + + Buffer(StorageMetadataPtr storage_metadata, const PostgreSQLTableStructure::Attributes & attributes_); + + size_t getColumnsNum() const + { + const auto & sample_block = description.sample_block; + return sample_block.columns(); + } + }; + + StoragePtr storage; + Buffer buffer; + + explicit StorageData(const StorageInfo & storage_info); + StorageData(const StorageData & other) = delete; + }; + + using Storages = std::unordered_map; + +public: MaterializedPostgreSQLConsumer( ContextPtr context_, std::shared_ptr connection_, @@ -38,7 +70,7 @@ public: size_t max_block_size_, bool schema_as_a_part_of_table_name_, bool allow_automatic_update_, - Storages storages_, + StorageInfos storages_, const String & name_for_logger); bool consume(std::vector> & skipped_tables); @@ -65,34 +97,8 @@ private: bool isSyncAllowed(Int32 relation_id, const String & relation_name); - struct Buffer - { - ExternalResultDescription description; - MutableColumns columns; - - /// Needed to pass to insert query columns list in syncTables(). - std::shared_ptr columns_ast; - - /// Needed for insertPostgreSQLValue() method to parse array - std::unordered_map array_info; - - PostgreSQLTableStructure::Attributes attributes; - - Buffer(StoragePtr storage, const PostgreSQLTableStructure::Attributes & attributes_); - - void createEmptyBuffer(StoragePtr storage); - - size_t getColumnsNum() const - { - const auto & sample_block = description.sample_block; - return sample_block.columns(); - } - }; - - using Buffers = std::unordered_map; - - static void insertDefaultValue(Buffer & buffer, size_t column_idx); - void insertValue(Buffer & buffer, const std::string & value, size_t column_idx); + static void insertDefaultValue(StorageData::Buffer & buffer, size_t column_idx); + void insertValue(StorageData::Buffer & buffer, const std::string & value, size_t column_idx); enum class PostgreSQLQuery { @@ -101,7 +107,7 @@ private: DELETE }; - void readTupleData(Buffer & buffer, const char * message, size_t & pos, size_t size, PostgreSQLQuery type, bool old_value = false); + void readTupleData(StorageData::Buffer & buffer, const char * message, size_t & pos, size_t size, PostgreSQLQuery type, bool old_value = false); template static T unhexN(const char * message, size_t pos, size_t n); @@ -144,19 +150,11 @@ private: /// Holds `postgres_table_name` set. std::unordered_set tables_to_sync; - /// `postgres_table_name` -> ReplacingMergeTree table. + /// `postgres_table_name` -> StorageData. Storages storages; - /// `postgres_table_name` -> In-memory buffer. - Buffers buffers; std::unordered_map relation_id_to_name; - struct TableAttributes - { - Int16 number_of_columns; - PostgreSQLTableStructure::Attributes column_identifiers; - }; - /// `postgres_relation_id` -> `start_lsn` /// skip_list contains relation ids for tables on which ddl was performed, which can break synchronization. /// This breaking changes are detected in replication stream in according replication message and table is added to skip list. diff --git a/tests/integration/test_postgresql_replica_database_engine_2/test.py b/tests/integration/test_postgresql_replica_database_engine_2/test.py index 0991811e3a1..9a1e2cd9a38 100644 --- a/tests/integration/test_postgresql_replica_database_engine_2/test.py +++ b/tests/integration/test_postgresql_replica_database_engine_2/test.py @@ -659,7 +659,18 @@ def test_table_schema_changes_2(started_cluster): cursor.execute(f"ALTER TABLE {table_name} DROP COLUMN value2") instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, toString(number), toString(number), toString(number), number from numbers(75, 25)") check_tables_are_synchronized(table_name); - # TODO: ADD RESTART + instance.restart_clickhouse() + check_tables_are_synchronized(table_name); + cursor.execute(f"ALTER TABLE {table_name} DROP COLUMN value5") + cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN value5 Text") + instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, toString(number), toString(number), toString(number), toString(number) from numbers(100, 25)") + check_tables_are_synchronized(table_name); + cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN value6 Text") + cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN value7 Integer") + cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN value8 Integer") + cursor.execute(f"ALTER TABLE {table_name} DROP COLUMN value5") + instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, toString(number), toString(number), toString(number), toString(number), number, number from numbers(125, 25)") + check_tables_are_synchronized(table_name); if __name__ == '__main__':