This commit is contained in:
kssenii 2022-01-09 00:37:11 +03:00
parent 61f85e9afc
commit 6639a93735
3 changed files with 92 additions and 102 deletions

View File

@ -30,7 +30,7 @@ MaterializedPostgreSQLConsumer::MaterializedPostgreSQLConsumer(
const size_t max_block_size_, const size_t max_block_size_,
bool schema_as_a_part_of_table_name_, bool schema_as_a_part_of_table_name_,
bool allow_automatic_update_, bool allow_automatic_update_,
Storages storages_, StorageInfos storages_info_,
const String & name_for_logger) const String & name_for_logger)
: log(&Poco::Logger::get("PostgreSQLReplicaConsumer(" + name_for_logger + ")")) : log(&Poco::Logger::get("PostgreSQLReplicaConsumer(" + name_for_logger + ")"))
, context(context_) , context(context_)
@ -42,7 +42,6 @@ MaterializedPostgreSQLConsumer::MaterializedPostgreSQLConsumer(
, max_block_size(max_block_size_) , max_block_size(max_block_size_)
, schema_as_a_part_of_table_name(schema_as_a_part_of_table_name_) , schema_as_a_part_of_table_name(schema_as_a_part_of_table_name_)
, allow_automatic_update(allow_automatic_update_) , allow_automatic_update(allow_automatic_update_)
, storages(storages_)
{ {
final_lsn = start_lsn; final_lsn = start_lsn;
auto tx = std::make_shared<pqxx::nontransaction>(connection->getRef()); auto tx = std::make_shared<pqxx::nontransaction>(connection->getRef());
@ -50,24 +49,24 @@ MaterializedPostgreSQLConsumer::MaterializedPostgreSQLConsumer(
LOG_TRACE(log, "Starting replication. LSN: {} (last: {})", getLSNValue(current_lsn), getLSNValue(final_lsn)); LOG_TRACE(log, "Starting replication. LSN: {} (last: {})", getLSNValue(current_lsn), getLSNValue(final_lsn));
tx->commit(); tx->commit();
for (const auto & [table_name, storage_info] : storages) for (const auto & [table_name, storage_info] : storages_info_)
buffers.emplace(table_name, Buffer(storage_info.storage, storage_info.attributes)); storages.emplace(table_name, storage_info);
} }
MaterializedPostgreSQLConsumer::Buffer::Buffer(StoragePtr storage, const PostgreSQLTableStructure::Attributes & attributes_) MaterializedPostgreSQLConsumer::StorageData::StorageData(const StorageInfo & storage_info)
: storage(storage_info.storage), buffer(storage_info.storage->getInMemoryMetadataPtr(), storage_info.attributes)
{
auto table_id = storage_info.storage->getStorageID();
LOG_TRACE(&Poco::Logger::get("StorageMaterializedPostgreSQL"), "New buffer for table {}.{} ({}), structure: {}",
table_id.database_name, table_id.table_name, toString(table_id.uuid), buffer.description.sample_block.dumpStructure());
}
MaterializedPostgreSQLConsumer::StorageData::Buffer::Buffer(
StorageMetadataPtr storage_metadata, const PostgreSQLTableStructure::Attributes & attributes_)
: attributes(attributes_) : attributes(attributes_)
{ {
createEmptyBuffer(storage);
if (attributes.size() + 2 != getColumnsNum()) /// +2 because sign and version columns
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Columns number mismatch. Attributes: {}, buffer: {}", attributes.size(), getColumnsNum());
}
void MaterializedPostgreSQLConsumer::Buffer::createEmptyBuffer(StoragePtr storage)
{
const auto storage_metadata = storage->getInMemoryMetadataPtr();
const Block sample_block = storage_metadata->getSampleBlock(); const Block sample_block = storage_metadata->getSampleBlock();
/// Need to clear type, because in description.init() the types are appended (emplace_back) /// Need to clear type, because in description.init() the types are appended (emplace_back)
@ -78,13 +77,13 @@ void MaterializedPostgreSQLConsumer::Buffer::createEmptyBuffer(StoragePtr storag
const auto & storage_columns = storage_metadata->getColumns().getAllPhysical(); const auto & storage_columns = storage_metadata->getColumns().getAllPhysical();
auto insert_columns = std::make_shared<ASTExpressionList>(); auto insert_columns = std::make_shared<ASTExpressionList>();
auto table_id = storage->getStorageID(); auto columns_num = description.sample_block.columns();
LOG_TRACE(&Poco::Logger::get("MaterializedPostgreSQLBuffer"), "New buffer for table {}.{} ({}), structure: {}", assert(columns_num == storage_columns.size());
table_id.database_name, table_id.table_name, toString(table_id.uuid), sample_block.dumpStructure()); if (attributes.size() + 2 != columns_num) /// +2 because sign and version columns
throw Exception(ErrorCodes::LOGICAL_ERROR, "Columns number mismatch. Attributes: {}, buffer: {}",
attributes.size(), columns_num);
assert(description.sample_block.columns() == storage_columns.size());
size_t idx = 0; size_t idx = 0;
for (const auto & column : storage_columns) for (const auto & column : storage_columns)
{ {
if (description.types[idx].first == ExternalResultDescription::ValueType::vtArray) if (description.types[idx].first == ExternalResultDescription::ValueType::vtArray)
@ -98,7 +97,7 @@ void MaterializedPostgreSQLConsumer::Buffer::createEmptyBuffer(StoragePtr storag
} }
void MaterializedPostgreSQLConsumer::insertValue(Buffer & buffer, const std::string & value, size_t column_idx) void MaterializedPostgreSQLConsumer::insertValue(StorageData::Buffer & buffer, const std::string & value, size_t column_idx)
{ {
const auto & sample = buffer.description.sample_block.getByPosition(column_idx); const auto & sample = buffer.description.sample_block.getByPosition(column_idx);
bool is_nullable = buffer.description.types[column_idx].second; bool is_nullable = buffer.description.types[column_idx].second;
@ -132,7 +131,7 @@ void MaterializedPostgreSQLConsumer::insertValue(Buffer & buffer, const std::str
} }
void MaterializedPostgreSQLConsumer::insertDefaultValue(Buffer & buffer, size_t column_idx) void MaterializedPostgreSQLConsumer::insertDefaultValue(StorageData::Buffer & buffer, size_t column_idx)
{ {
const auto & sample = buffer.description.sample_block.getByPosition(column_idx); const auto & sample = buffer.description.sample_block.getByPosition(column_idx);
insertDefaultPostgreSQLValue(*buffer.columns[column_idx], *sample.column); insertDefaultPostgreSQLValue(*buffer.columns[column_idx], *sample.column);
@ -203,7 +202,7 @@ Int8 MaterializedPostgreSQLConsumer::readInt8(const char * message, size_t & pos
void MaterializedPostgreSQLConsumer::readTupleData( void MaterializedPostgreSQLConsumer::readTupleData(
Buffer & buffer, const char * message, size_t & pos, [[maybe_unused]] size_t size, PostgreSQLQuery type, bool old_value) StorageData::Buffer & buffer, const char * message, size_t & pos, [[maybe_unused]] size_t size, PostgreSQLQuery type, bool old_value)
{ {
Int16 num_columns = readInt16(message, pos, size); Int16 num_columns = readInt16(message, pos, size);
@ -231,7 +230,7 @@ void MaterializedPostgreSQLConsumer::readTupleData(
static constexpr Int32 sanity_check_max_col_len = 1024 * 8 * 2; /// *2 -- just in case. static constexpr Int32 sanity_check_max_col_len = 1024 * 8 * 2; /// *2 -- just in case.
if (unlikely(col_len > sanity_check_max_col_len)) if (unlikely(col_len > sanity_check_max_col_len))
throw Exception(ErrorCodes::POSTGRESQL_REPLICATION_INTERNAL_ERROR, throw Exception(ErrorCodes::POSTGRESQL_REPLICATION_INTERNAL_ERROR,
"Column legth is suspiciously long: {}", col_len); "Column length is suspiciously long: {}", col_len);
String value; String value;
for (Int32 i = 0; i < col_len; ++i) for (Int32 i = 0; i < col_len; ++i)
@ -316,11 +315,10 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
return; return;
Int8 new_tuple = readInt8(replication_message, pos, size); Int8 new_tuple = readInt8(replication_message, pos, size);
auto buffer = buffers.find(table_name); auto & buffer = storages.find(table_name)->second.buffer;
assert(buffer != buffers.end());
if (new_tuple) if (new_tuple)
readTupleData(buffer->second, replication_message, pos, size, PostgreSQLQuery::INSERT); readTupleData(buffer, replication_message, pos, size, PostgreSQLQuery::INSERT);
break; break;
} }
@ -337,8 +335,7 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
if (!isSyncAllowed(relation_id, table_name)) if (!isSyncAllowed(relation_id, table_name))
return; return;
auto buffer = buffers.find(table_name); auto & buffer = storages.find(table_name)->second.buffer;
assert(buffer != buffers.end());
auto proccess_identifier = [&](Int8 identifier) -> bool auto proccess_identifier = [&](Int8 identifier) -> bool
{ {
@ -353,13 +350,13 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
/// it is much more efficient to use replica identity index, but support all possible cases. /// it is much more efficient to use replica identity index, but support all possible cases.
case 'O': case 'O':
{ {
readTupleData(buffer->second, replication_message, pos, size, PostgreSQLQuery::UPDATE, true); readTupleData(buffer, replication_message, pos, size, PostgreSQLQuery::UPDATE, true);
break; break;
} }
case 'N': case 'N':
{ {
/// New row. /// New row.
readTupleData(buffer->second, replication_message, pos, size, PostgreSQLQuery::UPDATE); readTupleData(buffer, replication_message, pos, size, PostgreSQLQuery::UPDATE);
read_next = false; read_next = false;
break; break;
} }
@ -393,10 +390,8 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
/// 0 or 1 if replica identity is set to full. For now only default replica identity is supported (with primary keys). /// 0 or 1 if replica identity is set to full. For now only default replica identity is supported (with primary keys).
readInt8(replication_message, pos, size); readInt8(replication_message, pos, size);
auto buffer = buffers.find(table_name); auto & buffer = storages.find(table_name)->second.buffer;
assert(buffer != buffers.end()); readTupleData(buffer, replication_message, pos, size, PostgreSQLQuery::DELETE);
readTupleData(buffer->second, replication_message, pos, size, PostgreSQLQuery::DELETE);
break; break;
} }
case 'C': // Commit case 'C': // Commit
@ -430,7 +425,8 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
if (!isSyncAllowed(relation_id, relation_name)) if (!isSyncAllowed(relation_id, relation_name))
return; return;
if (storages.find(table_name) == storages.end()) auto storage_iter = storages.find(table_name);
if (storage_iter == storages.end())
{ {
/// FIXME: This can happen if we created a publication with this table but then got an exception that this /// FIXME: This can happen if we created a publication with this table but then got an exception that this
/// table has primary key or something else. /// table has primary key or something else.
@ -442,15 +438,7 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
return; return;
} }
auto buffer_iter = buffers.find(table_name); auto & buffer = storage_iter->second.buffer;
if (buffer_iter == buffers.end())
{
/// Must never happen if the previous check for storage existence passed, but just in case.
LOG_ERROR(log, "No buffer found for table `{}`.", table_name);
markTableAsSkipped(relation_id, table_name);
return;
}
const auto & buffer = buffer_iter->second;
/// 'd' - default (primary key if any) /// 'd' - default (primary key if any)
/// 'n' - nothing /// 'n' - nothing
@ -517,19 +505,19 @@ void MaterializedPostgreSQLConsumer::syncTables()
{ {
for (const auto & table_name : tables_to_sync) for (const auto & table_name : tables_to_sync)
{ {
auto & buffer = buffers.find(table_name)->second; auto & storage_data = storages.find(table_name)->second;
Block result_rows = buffer.description.sample_block.cloneWithColumns(std::move(buffer.columns)); Block result_rows = storage_data.buffer.description.sample_block.cloneWithColumns(std::move(storage_data.buffer.columns));
if (result_rows.rows()) if (result_rows.rows())
{ {
auto storage = storages.find(table_name)->second.storage; auto storage = storage_data.storage;
auto insert_context = Context::createCopy(context); auto insert_context = Context::createCopy(context);
insert_context->setInternalQuery(true); insert_context->setInternalQuery(true);
auto insert = std::make_shared<ASTInsertQuery>(); auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = storage->getStorageID(); insert->table_id = storage->getStorageID();
insert->columns = buffer.columns_ast; insert->columns = storage_data.buffer.columns_ast;
InterpreterInsertQuery interpreter(insert, insert_context, true); InterpreterInsertQuery interpreter(insert, insert_context, true);
auto io = interpreter.execute(); auto io = interpreter.execute();
@ -542,7 +530,7 @@ void MaterializedPostgreSQLConsumer::syncTables()
CompletedPipelineExecutor executor(io.pipeline); CompletedPipelineExecutor executor(io.pipeline);
executor.execute(); executor.execute();
buffer.columns = buffer.description.sample_block.cloneEmptyColumns(); storage_data.buffer.columns = storage_data.buffer.description.sample_block.cloneEmptyColumns();
} }
} }
@ -629,11 +617,12 @@ void MaterializedPostgreSQLConsumer::markTableAsSkipped(Int32 relation_id, const
{ {
/// Empty lsn string means - continue waiting for valid lsn. /// Empty lsn string means - continue waiting for valid lsn.
skip_list.insert({relation_id, ""}); skip_list.insert({relation_id, ""});
auto storage_iter = storages.find(relation_name);
if (storages.count(relation_name)) if (storage_iter != storages.end())
{ {
/// Clear table buffer. /// Clear table buffer.
auto & buffer = buffers.find(relation_name)->second; auto & buffer = storage_iter->second.buffer;
buffer.columns = buffer.description.sample_block.cloneEmptyColumns(); buffer.columns = buffer.description.sample_block.cloneEmptyColumns();
/// Erase cached schema identifiers. It will be updated again once table is allowed back into replication stream /// Erase cached schema identifiers. It will be updated again once table is allowed back into replication stream
/// and it receives first data after update. /// and it receives first data after update.
@ -653,9 +642,6 @@ void MaterializedPostgreSQLConsumer::addNested(
/// Cache new pointer to replacingMergeTree table. /// Cache new pointer to replacingMergeTree table.
storages.emplace(postgres_table_name, nested_storage_info); storages.emplace(postgres_table_name, nested_storage_info);
/// Add new in-memory buffer.
buffers.emplace(postgres_table_name, Buffer(nested_storage_info.storage, nested_storage_info.attributes));
/// Replication consumer will read WAL and check for currently processed table whether it is allowed to start applying /// Replication consumer will read WAL and check for currently processed table whether it is allowed to start applying
/// changes to this table. /// changes to this table.
waiting_list[postgres_table_name] = table_start_lsn; waiting_list[postgres_table_name] = table_start_lsn;
@ -667,10 +653,6 @@ void MaterializedPostgreSQLConsumer::updateNested(const String & table_name, Sto
/// Cache new pointer to replacingMergeTree table. /// Cache new pointer to replacingMergeTree table.
storages.emplace(table_name, nested_storage_info); storages.emplace(table_name, nested_storage_info);
/// Create a new empty buffer (with updated metadata), where data is first loaded before syncing into actual table.
auto & buffer = buffers.find(table_name)->second;
buffer.createEmptyBuffer(nested_storage_info.storage);
/// Set start position to valid lsn. Before it was an empty string. Further read for table allowed, if it has a valid lsn. /// Set start position to valid lsn. Before it was an empty string. Further read for table allowed, if it has a valid lsn.
skip_list[table_id] = table_start_lsn; skip_list[table_id] = table_start_lsn;
} }
@ -679,7 +661,6 @@ void MaterializedPostgreSQLConsumer::updateNested(const String & table_name, Sto
void MaterializedPostgreSQLConsumer::removeNested(const String & postgres_table_name) void MaterializedPostgreSQLConsumer::removeNested(const String & postgres_table_name)
{ {
storages.erase(postgres_table_name); storages.erase(postgres_table_name);
buffers.erase(postgres_table_name);
deleted_tables.insert(postgres_table_name); deleted_tables.insert(postgres_table_name);
} }

View File

@ -23,12 +23,44 @@ struct StorageInfo
StorageInfo(StoragePtr storage_, const PostgreSQLTableStructure::Attributes & attributes_) StorageInfo(StoragePtr storage_, const PostgreSQLTableStructure::Attributes & attributes_)
: storage(storage_), attributes(attributes_) {} : storage(storage_), attributes(attributes_) {}
}; };
using StorageInfos = std::unordered_map<String, StorageInfo>;
class MaterializedPostgreSQLConsumer class MaterializedPostgreSQLConsumer
{ {
public: private:
using Storages = std::unordered_map<String, StorageInfo>; struct StorageData
{
/// In-memory buffer for one table: rows are collected here before being synced into the actual table.
struct Buffer
{
    /// Sample block and per-column value types that incoming values are inserted against.
    ExternalResultDescription description;

    /// Column data collected so far; syncTables() moves it into a block and re-creates it empty.
    MutableColumns columns;

    /// Columns list passed to the insert query in syncTables().
    std::shared_ptr<ASTExpressionList> columns_ast;

    /// Required by insertPostgreSQLValue() in order to parse arrays.
    std::unordered_map<size_t, PostgreSQLArrayInfo> array_info;

    /// Table attributes, kept to validate ddl.
    PostgreSQLTableStructure::Attributes attributes;

    Buffer(StorageMetadataPtr storage_metadata, const PostgreSQLTableStructure::Attributes & attributes_);

    /// Number of columns in the sample block of this buffer.
    size_t getColumnsNum() const { return description.sample_block.columns(); }
};
StoragePtr storage;
Buffer buffer;
explicit StorageData(const StorageInfo & storage_info);
StorageData(const StorageData & other) = delete;
};
using Storages = std::unordered_map<String, StorageData>;
public:
MaterializedPostgreSQLConsumer( MaterializedPostgreSQLConsumer(
ContextPtr context_, ContextPtr context_,
std::shared_ptr<postgres::Connection> connection_, std::shared_ptr<postgres::Connection> connection_,
@ -38,7 +70,7 @@ public:
size_t max_block_size_, size_t max_block_size_,
bool schema_as_a_part_of_table_name_, bool schema_as_a_part_of_table_name_,
bool allow_automatic_update_, bool allow_automatic_update_,
Storages storages_, StorageInfos storages_,
const String & name_for_logger); const String & name_for_logger);
bool consume(std::vector<std::pair<Int32, String>> & skipped_tables); bool consume(std::vector<std::pair<Int32, String>> & skipped_tables);
@ -65,34 +97,8 @@ private:
bool isSyncAllowed(Int32 relation_id, const String & relation_name); bool isSyncAllowed(Int32 relation_id, const String & relation_name);
struct Buffer static void insertDefaultValue(StorageData::Buffer & buffer, size_t column_idx);
{ void insertValue(StorageData::Buffer & buffer, const std::string & value, size_t column_idx);
ExternalResultDescription description;
MutableColumns columns;
/// Needed to pass to insert query columns list in syncTables().
std::shared_ptr<ASTExpressionList> columns_ast;
/// Needed for insertPostgreSQLValue() method to parse array
std::unordered_map<size_t, PostgreSQLArrayInfo> array_info;
PostgreSQLTableStructure::Attributes attributes;
Buffer(StoragePtr storage, const PostgreSQLTableStructure::Attributes & attributes_);
void createEmptyBuffer(StoragePtr storage);
size_t getColumnsNum() const
{
const auto & sample_block = description.sample_block;
return sample_block.columns();
}
};
using Buffers = std::unordered_map<String, Buffer>;
static void insertDefaultValue(Buffer & buffer, size_t column_idx);
void insertValue(Buffer & buffer, const std::string & value, size_t column_idx);
enum class PostgreSQLQuery enum class PostgreSQLQuery
{ {
@ -101,7 +107,7 @@ private:
DELETE DELETE
}; };
void readTupleData(Buffer & buffer, const char * message, size_t & pos, size_t size, PostgreSQLQuery type, bool old_value = false); void readTupleData(StorageData::Buffer & buffer, const char * message, size_t & pos, size_t size, PostgreSQLQuery type, bool old_value = false);
template<typename T> template<typename T>
static T unhexN(const char * message, size_t pos, size_t n); static T unhexN(const char * message, size_t pos, size_t n);
@ -144,19 +150,11 @@ private:
/// Holds `postgres_table_name` set. /// Holds `postgres_table_name` set.
std::unordered_set<std::string> tables_to_sync; std::unordered_set<std::string> tables_to_sync;
/// `postgres_table_name` -> ReplacingMergeTree table. /// `postgres_table_name` -> StorageData.
Storages storages; Storages storages;
/// `postgres_table_name` -> In-memory buffer.
Buffers buffers;
std::unordered_map<Int32, String> relation_id_to_name; std::unordered_map<Int32, String> relation_id_to_name;
struct TableAttributes
{
Int16 number_of_columns;
PostgreSQLTableStructure::Attributes column_identifiers;
};
/// `postgres_relation_id` -> `start_lsn` /// `postgres_relation_id` -> `start_lsn`
/// skip_list contains relation ids for tables on which ddl was performed, which can break synchronization. /// skip_list contains relation ids for tables on which ddl was performed, which can break synchronization.
/// These breaking changes are detected in the replication stream in the corresponding replication message, and the table is added to the skip list. /// These breaking changes are detected in the replication stream in the corresponding replication message, and the table is added to the skip list.

View File

@ -659,7 +659,18 @@ def test_table_schema_changes_2(started_cluster):
cursor.execute(f"ALTER TABLE {table_name} DROP COLUMN value2") cursor.execute(f"ALTER TABLE {table_name} DROP COLUMN value2")
instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, toString(number), toString(number), toString(number), number from numbers(75, 25)") instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, toString(number), toString(number), toString(number), number from numbers(75, 25)")
check_tables_are_synchronized(table_name); check_tables_are_synchronized(table_name);
# TODO: ADD RESTART instance.restart_clickhouse()
check_tables_are_synchronized(table_name);
cursor.execute(f"ALTER TABLE {table_name} DROP COLUMN value5")
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN value5 Text")
instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, toString(number), toString(number), toString(number), toString(number) from numbers(100, 25)")
check_tables_are_synchronized(table_name);
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN value6 Text")
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN value7 Integer")
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN value8 Integer")
cursor.execute(f"ALTER TABLE {table_name} DROP COLUMN value5")
instance.query(f"INSERT INTO postgres_database.{table_name} SELECT number, toString(number), toString(number), toString(number), toString(number), number, number from numbers(125, 25)")
check_tables_are_synchronized(table_name);
if __name__ == '__main__': if __name__ == '__main__':