ClickHouse/dbms/include/DB/IO/ChunkedReadBuffer.h

82 lines
2.1 KiB
C
Raw Normal View History

2012-03-25 03:47:13 +00:00
#pragma once
2012-05-09 08:16:09 +00:00
#include <Poco/NumberFormatter.h>
2012-03-25 03:47:13 +00:00
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/ReadHelpers.h>
2012-05-08 11:19:00 +00:00
#include <iostream>
2012-03-25 03:47:13 +00:00
namespace DB
{
/** Считывает данные, из формата, состоящего из чанков
* (идентификатор запроса, признак последнего чанка, размер чанка, часть данных со сжатием или без).
*/
class ChunkedReadBuffer : public ReadBuffer
{
protected:
ReadBuffer & in;
bool all_read;
size_t read_in_chunk;
size_t chunk_size;
UInt64 assert_query_id;
bool nextImpl()
{
/// Если прочитали ещё не весь блок - получим следующие данные. Если следующих данных нет - ошибка.
if (read_in_chunk < chunk_size)
{
if (!in.next())
2012-05-08 11:19:00 +00:00
throw Exception("Cannot read all data from chunked input", ErrorCodes::CANNOT_READ_ALL_DATA_FROM_CHUNKED_INPUT);
2012-03-25 03:47:13 +00:00
working_buffer = in.buffer();
if (chunk_size - read_in_chunk < working_buffer.size())
{
working_buffer.resize(chunk_size - read_in_chunk);
read_in_chunk = chunk_size;
}
else
read_in_chunk += working_buffer.size();
2012-05-08 11:19:00 +00:00
in.position() += working_buffer.size();
2012-03-25 03:47:13 +00:00
}
else
{
if (all_read)
return false;
UInt64 query_id = 0;
readIntBinary(query_id, in);
2012-03-25 07:52:31 +00:00
2012-03-25 03:47:13 +00:00
if (query_id != assert_query_id)
2012-05-09 08:16:09 +00:00
throw Exception("Received data for wrong query id (expected "
+ Poco::NumberFormatter::format(assert_query_id) + ", got "
+ Poco::NumberFormatter::format(query_id) + ")", ErrorCodes::RECEIVED_DATA_FOR_WRONG_QUERY_ID);
2012-03-25 03:47:13 +00:00
/// Флаг конца.
readIntBinary(all_read, in);
/// Размер блока.
readIntBinary(chunk_size, in);
read_in_chunk = std::min(chunk_size, in.buffer().size() - in.offset());
working_buffer = Buffer(in.position(), in.position() + read_in_chunk);
in.position() += read_in_chunk;
2012-03-25 07:52:31 +00:00
if (all_read)
return false;
2012-03-25 03:47:13 +00:00
}
return true;
}
public:
ChunkedReadBuffer(ReadBuffer & in_, UInt64 assert_query_id_)
: ReadBuffer(NULL, 0), in(in_), all_read(false), read_in_chunk(0), chunk_size(0), assert_query_id(assert_query_id_) {}
};
}