This commit is contained in:
kssenii 2023-12-14 14:01:27 +01:00
parent fdfd1481df
commit a4a264c785
3 changed files with 66 additions and 39 deletions

View File

@ -138,15 +138,35 @@ MaterializedPostgreSQLConsumer::StorageData::Buffer::Buffer(
columns_ast.children.emplace_back(std::make_shared<ASTIdentifier>(name)); columns_ast.children.emplace_back(std::make_shared<ASTIdentifier>(name));
} }
MaterializedPostgreSQLConsumer::StorageData::Buffer & MaterializedPostgreSQLConsumer::StorageData::getBuffer() MaterializedPostgreSQLConsumer::StorageData::Buffer & MaterializedPostgreSQLConsumer::StorageData::getLastBuffer()
{ {
if (!buffer) if (!buffers.empty())
{ {
throw Exception(ErrorCodes::LOGICAL_ERROR, "Data buffer not initialized for {}", throw Exception(ErrorCodes::LOGICAL_ERROR, "No data buffer for {}",
storage->getStorageID().getNameForLogs()); storage->getStorageID().getNameForLogs());
} }
return *buffer; return *buffers.back();
}
MaterializedPostgreSQLConsumer::StorageData::BufferPtr MaterializedPostgreSQLConsumer::StorageData::popBuffer()
{
if (buffers.empty())
return nullptr;
auto buffer = std::move(buffers.front());
buffers.pop_front();
return buffer;
}
void MaterializedPostgreSQLConsumer::StorageData::addBuffer(BufferPtr buffer)
{
buffers.push_back(std::move(buffer));
}
void MaterializedPostgreSQLConsumer::StorageData::returnBuffer(BufferPtr buffer)
{
buffers.push_front(std::move(buffer));
} }
void MaterializedPostgreSQLConsumer::StorageData::Buffer::assertInsertIsPossible(size_t col_idx) const void MaterializedPostgreSQLConsumer::StorageData::Buffer::assertInsertIsPossible(size_t col_idx) const
@ -163,7 +183,7 @@ void MaterializedPostgreSQLConsumer::StorageData::Buffer::assertInsertIsPossible
void MaterializedPostgreSQLConsumer::insertValue(StorageData & storage_data, const std::string & value, size_t column_idx) void MaterializedPostgreSQLConsumer::insertValue(StorageData & storage_data, const std::string & value, size_t column_idx)
{ {
auto & buffer = storage_data.getBuffer(); auto & buffer = storage_data.getLastBuffer();
buffer.assertInsertIsPossible(column_idx); buffer.assertInsertIsPossible(column_idx);
const auto & column_type_and_name = buffer.sample_block.getByPosition(column_idx); const auto & column_type_and_name = buffer.sample_block.getByPosition(column_idx);
@ -203,7 +223,7 @@ void MaterializedPostgreSQLConsumer::insertValue(StorageData & storage_data, con
void MaterializedPostgreSQLConsumer::insertDefaultValue(StorageData & storage_data, size_t column_idx) void MaterializedPostgreSQLConsumer::insertDefaultValue(StorageData & storage_data, size_t column_idx)
{ {
auto & buffer = storage_data.getBuffer(); auto & buffer = storage_data.getLastBuffer();
buffer.assertInsertIsPossible(column_idx); buffer.assertInsertIsPossible(column_idx);
const auto & column_type_and_name = buffer.sample_block.getByPosition(column_idx); const auto & column_type_and_name = buffer.sample_block.getByPosition(column_idx);
@ -346,7 +366,7 @@ void MaterializedPostgreSQLConsumer::readTupleData(
} }
} }
auto & columns = storage_data.getBuffer().columns; auto & columns = storage_data.getLastBuffer().columns;
switch (type) switch (type)
{ {
case PostgreSQLQuery::INSERT: case PostgreSQLQuery::INSERT:
@ -637,7 +657,7 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
} }
} }
storage_data.setBuffer(std::make_unique<StorageData::Buffer>(std::move(columns), description)); storage_data.addBuffer(std::make_unique<StorageData::Buffer>(std::move(columns), description));
tables_to_sync.insert(table_name); tables_to_sync.insert(table_name);
break; break;
} }
@ -660,9 +680,10 @@ void MaterializedPostgreSQLConsumer::syncTables()
{ {
auto table_name = *tables_to_sync.begin(); auto table_name = *tables_to_sync.begin();
auto & storage_data = storages.find(table_name)->second; auto & storage_data = storages.find(table_name)->second;
auto & buffer = storage_data.getBuffer();
Block result_rows = buffer.sample_block.cloneWithColumns(std::move(buffer.columns));
while (auto buffer = storage_data.popBuffer())
{
Block result_rows = buffer->sample_block.cloneWithColumns(std::move(buffer->columns));
try try
{ {
if (result_rows.rows()) if (result_rows.rows())
@ -674,7 +695,7 @@ void MaterializedPostgreSQLConsumer::syncTables()
auto insert = std::make_shared<ASTInsertQuery>(); auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = storage->getStorageID(); insert->table_id = storage->getStorageID();
insert->columns = std::make_shared<ASTExpressionList>(buffer.columns_ast); insert->columns = std::make_shared<ASTExpressionList>(buffer->columns_ast);
InterpreterInsertQuery interpreter(insert, insert_context, true); InterpreterInsertQuery interpreter(insert, insert_context, true);
auto io = interpreter.execute(); auto io = interpreter.execute();
@ -692,11 +713,12 @@ void MaterializedPostgreSQLConsumer::syncTables()
catch (...) catch (...)
{ {
/// Retry this buffer later. /// Retry this buffer later.
buffer.columns = result_rows.mutateColumns(); buffer->columns = result_rows.mutateColumns();
storage_data.returnBuffer(std::move(buffer));
throw; throw;
} }
}
storage_data.setBuffer(nullptr);
tables_to_sync.erase(tables_to_sync.begin()); tables_to_sync.erase(tables_to_sync.begin());
} }

View File

@ -46,7 +46,7 @@ private:
const Names column_names; const Names column_names;
const ArrayInfo array_info; const ArrayInfo array_info;
struct Buffer struct Buffer : private boost::noncopyable
{ {
Block sample_block; Block sample_block;
MutableColumns columns; MutableColumns columns;
@ -56,13 +56,18 @@ private:
void assertInsertIsPossible(size_t col_idx) const; void assertInsertIsPossible(size_t col_idx) const;
}; };
using BufferPtr = std::unique_ptr<Buffer>;
Buffer & getBuffer(); Buffer & getLastBuffer();
void setBuffer(std::unique_ptr<Buffer> buffer_) { buffer = std::move(buffer_); } BufferPtr popBuffer();
void addBuffer(BufferPtr buffer);
void returnBuffer(BufferPtr buffer);
private: private:
std::unique_ptr<Buffer> buffer; std::deque<BufferPtr> buffers;
}; };
using Storages = std::unordered_map<String, StorageData>; using Storages = std::unordered_map<String, StorageData>;

View File

@ -431,7 +431,7 @@ def test_many_concurrent_queries(started_cluster):
# random update / delete query # random update / delete query
cursor.execute(query_pool[query_id].format(random_table_name)) cursor.execute(query_pool[query_id].format(random_table_name))
print("table {} query {} ok".format(random_table_name, query_id)) print("Executing for table {} query: {}".format(random_table_name, query_pool[query_id]))
# allow some thread to do inserts (not to violate key constraints) # allow some thread to do inserts (not to violate key constraints)
if thread_id < 5: if thread_id < 5: