Tiny fix.

This commit is contained in:
Nikolai Kochetov 2021-09-24 13:41:52 +03:00
parent 24dac111b7
commit 9d3e8fb9c3
2 changed files with 22 additions and 18 deletions

View File

@ -642,19 +642,21 @@ bool StorageKafka::streamToViews()
// It will be cancelled on underlying layer (kafka buffer)
size_t rows = 0;
PushingPipelineExecutor executor(block_io.pipeline);
in->readPrefix();
executor.start();
while (auto block = in->read())
{
rows += block.rows();
executor.push(std::move(block));
}
PushingPipelineExecutor executor(block_io.pipeline);
in->readSuffix();
executor.finish();
in->readPrefix();
executor.start();
while (auto block = in->read())
{
rows += block.rows();
executor.push(std::move(block));
}
in->readSuffix();
executor.finish();
}
bool some_stream_is_stalled = false;
for (auto & stream : streams)

View File

@ -997,13 +997,15 @@ bool StorageRabbitMQ::streamToViews()
looping_task->activateAndSchedule();
}
PushingPipelineExecutor executor(block_io.pipeline);
executor.start();
in->readPrefix();
while (auto block = in->read())
executor.push(std::move(block));
executor.finish();
in->readSuffix();
{
PushingPipelineExecutor executor(block_io.pipeline);
in->readPrefix();
executor.start();
while (auto block = in->read())
executor.push(std::move(block));
in->readSuffix();
executor.finish();
}
/* Note: sending ack() with loop running in another thread will lead to a lot of data races inside the library, but only in case
* error occurs or connection is lost while ack is being sent