Update MaterializedPostgreSQLConsumer.cpp

This commit is contained in:
Kseniia Sumarokova 2022-08-24 12:24:44 +02:00 committed by GitHub
parent 4486f26d51
commit de9d254290
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -45,7 +45,7 @@ MaterializedPostgreSQLConsumer::MaterializedPostgreSQLConsumer(
, schema_as_a_part_of_table_name(schema_as_a_part_of_table_name_) , schema_as_a_part_of_table_name(schema_as_a_part_of_table_name_)
, allow_automatic_update(allow_automatic_update_) , allow_automatic_update(allow_automatic_update_)
{ {
commited = false; committed = false;
final_lsn = start_lsn; final_lsn = start_lsn;
auto tx = std::make_shared<pqxx::nontransaction>(connection->getRef()); auto tx = std::make_shared<pqxx::nontransaction>(connection->getRef());
current_lsn = advanceLSN(tx); current_lsn = advanceLSN(tx);
@ -444,7 +444,7 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
pos += unused_flags_len + commit_lsn_len + transaction_end_lsn_len + transaction_commit_timestamp_len; pos += unused_flags_len + commit_lsn_len + transaction_end_lsn_len + transaction_commit_timestamp_len;
final_lsn = current_lsn; final_lsn = current_lsn;
commited = true; committed = true;
break; break;
} }
case 'R': // Relation case 'R': // Relation
@ -622,7 +622,7 @@ String MaterializedPostgreSQLConsumer::advanceLSN(std::shared_ptr<pqxx::nontrans
final_lsn = result[0][0].as<std::string>(); final_lsn = result[0][0].as<std::string>();
LOG_TRACE(log, "Advanced LSN up to: {}", getLSNValue(final_lsn)); LOG_TRACE(log, "Advanced LSN up to: {}", getLSNValue(final_lsn));
commited = false; committed = false;
return final_lsn; return final_lsn;
} }
@ -799,7 +799,7 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
} }
catch (const pqxx::broken_connection &) catch (const pqxx::broken_connection &)
{ {
LOG_DEBUG(log, "Connection was brocken"); LOG_DEBUG(log, "Connection was broken");
connection->tryUpdateConnection(); connection->tryUpdateConnection();
return false; return false;
} }
@ -836,7 +836,7 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
{ {
syncTables(); syncTables();
} }
else if (commited) else if (committed)
{ {
updateLsn(); updateLsn();
} }