ClickHouse/dbms/include/DB/IO/ChunkedWriteBuffer.h

70 lines
1.7 KiB
C++
Raw Normal View History

2012-03-25 03:47:13 +00:00
#pragma once
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/WriteHelpers.h>
2012-03-25 07:52:31 +00:00
#include <iostream>
2012-03-25 03:47:13 +00:00
namespace DB
{
2012-03-25 03:47:13 +00:00
/** Записывает данные в формат, состоящий из чанков
* (идентификатор запроса, признак последнего чанка, размер чанка, часть данных со сжатием или без).
* Нельзя использовать out напрямую.
*/
class ChunkedWriteBuffer : public WriteBuffer
{
protected:
WriteBuffer & out;
UInt64 query_id;
inline size_t headerSize() { return sizeof(query_id) + sizeof(bool) + sizeof(size_t); }
void checkBufferSize()
{
if (out.buffer().end() - out.position() < 2 * static_cast<int>(headerSize()))
throw Exception("Too small remaining buffer size to write chunked data", ErrorCodes::TOO_SMALL_BUFFER_SIZE);
}
void nextImpl() override
2012-03-25 03:47:13 +00:00
{
2012-05-08 11:19:00 +00:00
/* std::cerr << out.offset() << std::endl;
2012-03-25 07:52:31 +00:00
std::cerr << query_id << std::endl;
2012-05-08 11:19:00 +00:00
std::cerr << offset() << std::endl;*/
2012-03-25 03:47:13 +00:00
checkBufferSize();
writeIntBinary(query_id, out);
writeIntBinary(false, out);
writeIntBinary(offset(), out);
2012-05-08 11:19:00 +00:00
// std::cerr << out.offset() << std::endl;
2012-03-25 07:52:31 +00:00
2012-03-25 03:47:13 +00:00
out.position() = position();
out.next();
working_buffer = Buffer(out.buffer().begin() + headerSize(), out.buffer().end());
pos = working_buffer.begin();
}
public:
ChunkedWriteBuffer(WriteBuffer & out_, UInt64 query_id_)
: WriteBuffer(out_.position() + headerSize(), out_.buffer().size() - out_.offset() - headerSize()), out(out_), query_id(query_id_)
{
checkBufferSize();
}
void finish()
{
next();
writeIntBinary(query_id, out);
writeIntBinary(true, out);
writeIntBinary(static_cast<size_t>(0), out);
}
};
}