This commit is contained in:
kssenii 2023-07-18 12:20:56 +02:00
parent c9e752fdc5
commit 0cfd12aba4
2 changed files with 45 additions and 21 deletions

View File

@ -11,6 +11,7 @@
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <QueryPipeline/QueryPipeline.h>
#include <QueryPipeline/Pipe.h>
#include "Common/Exception.h"
#include <Common/SettingsChanges.h>
@ -22,6 +23,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int POSTGRESQL_REPLICATION_INTERNAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int TOO_MANY_PARTS;
}
MaterializedPostgreSQLConsumer::MaterializedPostgreSQLConsumer(
@ -556,34 +558,49 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
void MaterializedPostgreSQLConsumer::syncTables()
{
for (const auto & table_name : tables_to_sync)
while (!tables_to_sync.empty())
{
auto table_name = *tables_to_sync.begin();
auto & storage_data = storages.find(table_name)->second;
Block result_rows = storage_data.buffer.description.sample_block.cloneWithColumns(std::move(storage_data.buffer.columns));
storage_data.buffer.columns = storage_data.buffer.description.sample_block.cloneEmptyColumns();
if (result_rows.rows())
try
{
auto storage = storage_data.storage;
if (result_rows.rows())
{
auto storage = storage_data.storage;
auto insert_context = Context::createCopy(context);
insert_context->setInternalQuery(true);
auto insert_context = Context::createCopy(context);
insert_context->setInternalQuery(true);
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = storage->getStorageID();
insert->columns = storage_data.buffer.columns_ast;
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = storage->getStorageID();
insert->columns = storage_data.buffer.columns_ast;
InterpreterInsertQuery interpreter(insert, insert_context, true);
auto io = interpreter.execute();
auto input = std::make_shared<SourceFromSingleChunk>(
result_rows.cloneEmpty(), Chunk(result_rows.getColumns(), result_rows.rows()));
InterpreterInsertQuery interpreter(insert, insert_context, true);
auto io = interpreter.execute();
auto input = std::make_shared<SourceFromSingleChunk>(
result_rows.cloneEmpty(), Chunk(result_rows.getColumns(), result_rows.rows()));
assertBlocksHaveEqualStructure(input->getPort().getHeader(), io.pipeline.getHeader(), "postgresql replica table sync");
io.pipeline.complete(Pipe(std::move(input)));
assertBlocksHaveEqualStructure(input->getPort().getHeader(), io.pipeline.getHeader(), "postgresql replica table sync");
io.pipeline.complete(Pipe(std::move(input)));
CompletedPipelineExecutor executor(io.pipeline);
executor.execute();
CompletedPipelineExecutor executor(io.pipeline);
executor.execute();
}
}
catch (DB::Exception & e)
{
if (e.code() == ErrorCodes::TOO_MANY_PARTS)
{
/// Retry this buffer later.
storage_data.buffer.columns = result_rows.mutateColumns();
}
throw;
}
tables_to_sync.erase(tables_to_sync.begin());
}
LOG_DEBUG(log, "Table sync end for {} tables, last lsn: {} = {}, (attempted lsn {})", tables_to_sync.size(), current_lsn, getLSNValue(current_lsn), getLSNValue(final_lsn));
@ -735,8 +752,12 @@ void MaterializedPostgreSQLConsumer::setSetting(const SettingChange & setting)
/// Read binary changes from the replication slot via the COPY command (starting from the current LSN in the slot).
bool MaterializedPostgreSQLConsumer::consume()
{
bool slot_empty = true;
if (!tables_to_sync.empty())
{
syncTables();
}
bool slot_empty = true;
try
{
auto tx = std::make_shared<pqxx::nontransaction>(connection->getRef());

View File

@ -545,7 +545,9 @@ def test_database_with_multiple_non_default_schemas_2(started_cluster):
clickhouse_postgres_db = f"clickhouse_postgres_db{i}"
create_postgres_schema(cursor, schema_name)
pg_manager.create_clickhouse_postgres_db(
database_name=clickhouse_postgres_db, schema_name=schema_name, postgres_database="postgres_database",
database_name=clickhouse_postgres_db,
schema_name=schema_name,
postgres_database="postgres_database",
)
for ti in range(NUM_TABLES):
table_name = f"postgresql_replica_{ti}"
@ -695,15 +697,16 @@ def test_too_many_parts(started_cluster):
time.sleep(1)
print(f"wait sync try {i}")
if instance2.contains_in_log("DB::Exception: Too many parts"):
num = num - 1
break
assert num == int(
instance2.query("SELECT count() FROM test_database.test_table")
)
) or num - 1 == int(instance2.query("SELECT count() FROM test_database.test_table"))
assert instance2.contains_in_log("DB::Exception: Too many parts")
print(num)
assert num == int(instance2.query("SELECT count() FROM test_database.test_table"))
assert num == int(
instance2.query("SELECT count() FROM test_database.test_table")
) or num - 1 == int(instance2.query("SELECT count() FROM test_database.test_table"))
instance2.query("SYSTEM START MERGES")
check_tables_are_synchronized(