2012-03-25 03:47:13 +00:00
|
|
|
|
#pragma once
|
|
|
|
|
|
|
|
|
|
#include <DB/IO/WriteBuffer.h>
|
|
|
|
|
#include <DB/IO/WriteHelpers.h>
|
|
|
|
|
|
2012-03-25 07:52:31 +00:00
|
|
|
|
#include <iostream>
|
|
|
|
|
|
2012-03-25 03:47:13 +00:00
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** Записывает данные в формат, состоящий из чанков
|
|
|
|
|
* (идентификатор запроса, признак последнего чанка, размер чанка, часть данных со сжатием или без).
|
|
|
|
|
* Нельзя использовать out напрямую.
|
|
|
|
|
*/
|
|
|
|
|
class ChunkedWriteBuffer : public WriteBuffer
|
|
|
|
|
{
|
|
|
|
|
protected:
|
|
|
|
|
WriteBuffer & out;
|
|
|
|
|
UInt64 query_id;
|
|
|
|
|
|
|
|
|
|
inline size_t headerSize() { return sizeof(query_id) + sizeof(bool) + sizeof(size_t); }
|
|
|
|
|
|
|
|
|
|
void checkBufferSize()
|
|
|
|
|
{
|
|
|
|
|
if (out.buffer().end() - out.position() < 2 * static_cast<int>(headerSize()))
|
|
|
|
|
throw Exception("Too small remaining buffer size to write chunked data", ErrorCodes::TOO_SMALL_BUFFER_SIZE);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void nextImpl()
|
|
|
|
|
{
|
2012-05-08 11:19:00 +00:00
|
|
|
|
/* std::cerr << out.offset() << std::endl;
|
2012-03-25 07:52:31 +00:00
|
|
|
|
std::cerr << query_id << std::endl;
|
2012-05-08 11:19:00 +00:00
|
|
|
|
std::cerr << offset() << std::endl;*/
|
2012-03-25 07:52:31 +00:00
|
|
|
|
|
2012-03-25 03:47:13 +00:00
|
|
|
|
checkBufferSize();
|
|
|
|
|
|
|
|
|
|
writeIntBinary(query_id, out);
|
|
|
|
|
writeIntBinary(false, out);
|
|
|
|
|
writeIntBinary(offset(), out);
|
|
|
|
|
|
2012-05-08 11:19:00 +00:00
|
|
|
|
// std::cerr << out.offset() << std::endl;
|
2012-03-25 07:52:31 +00:00
|
|
|
|
|
2012-03-25 03:47:13 +00:00
|
|
|
|
out.position() = position();
|
|
|
|
|
out.next();
|
|
|
|
|
working_buffer = Buffer(out.buffer().begin() + headerSize(), out.buffer().end());
|
|
|
|
|
pos = working_buffer.begin();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public:
|
|
|
|
|
ChunkedWriteBuffer(WriteBuffer & out_, UInt64 query_id_)
|
|
|
|
|
: WriteBuffer(out_.position() + headerSize(), out_.buffer().size() - out_.offset() - headerSize()), out(out_), query_id(query_id_)
|
|
|
|
|
{
|
|
|
|
|
checkBufferSize();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void finish()
|
|
|
|
|
{
|
|
|
|
|
next();
|
|
|
|
|
|
|
|
|
|
writeIntBinary(query_id, out);
|
|
|
|
|
writeIntBinary(true, out);
|
|
|
|
|
writeIntBinary(static_cast<size_t>(0), out);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|