ClickHouse/dbms/include/DB/IO/AsynchronousWriteBuffer.h

79 lines
1.7 KiB
C++
Raw Normal View History

#pragma once
#include <math.h>
#include <vector>
#include <DB/Common/ThreadPool.h>
#include <DB/IO/WriteBuffer.h>
namespace DB
{
/** Записывает данные асинхронно с помощью двойной буферизации.
*/
class AsynchronousWriteBuffer : public WriteBuffer
{
private:
WriteBuffer & out; /// Основной буфер, отвечает за запись данных.
std::vector<char> memory; /// Кусок памяти для дублирования буфера.
ThreadPool pool; /// Для асинхронной записи данных.
bool started; /// Была ли запущена асинхронная запись данных.
/// Менять местами основной и дублирующий буферы.
void swapBuffers()
{
buffer().swap(out.buffer());
std::swap(position(), out.position());
}
void nextImpl() override
{
if (!offset())
return;
if (started)
pool.wait();
else
started = true;
swapBuffers();
/// Данные будут записываться в отельном потоке.
pool.schedule([this] { thread(); });
}
public:
2014-04-08 07:58:53 +00:00
AsynchronousWriteBuffer(WriteBuffer & out_) : WriteBuffer(nullptr, 0), out(out_), memory(out.buffer().size()), pool(1), started(false)
{
/// Данные пишутся в дублирующий буфер.
set(memory.data(), memory.size());
}
~AsynchronousWriteBuffer() override
{
2013-11-18 17:17:45 +00:00
try
{
if (started)
pool.wait();
2013-11-18 17:17:45 +00:00
swapBuffers();
out.next();
}
catch (...)
{
2013-11-18 19:18:03 +00:00
tryLogCurrentException(__PRETTY_FUNCTION__);
2013-11-18 17:17:45 +00:00
}
}
/// То, что выполняется в отдельном потоке
void thread()
{
out.next();
}
};
}