2017-04-01 09:19:00 +00:00
|
|
|
#include <Core/Defines.h>
|
2012-08-26 11:14:52 +00:00
|
|
|
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <IO/ReadHelpers.h>
|
|
|
|
#include <IO/VarInt.h>
|
|
|
|
#include <IO/CompressedReadBufferFromFile.h>
|
2011-08-19 19:18:15 +00:00
|
|
|
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Columns/ColumnArray.h>
|
|
|
|
#include <Columns/ColumnNullable.h>
|
|
|
|
#include <Columns/ColumnsNumber.h>
|
|
|
|
#include <DataTypes/DataTypeArray.h>
|
|
|
|
#include <DataTypes/DataTypeNullable.h>
|
|
|
|
#include <DataTypes/DataTypesNumber.h>
|
|
|
|
#include <DataTypes/DataTypeFactory.h>
|
2017-07-13 20:58:19 +00:00
|
|
|
#include <Common/typeid_cast.h>
|
2017-07-12 18:41:08 +00:00
|
|
|
#include <ext/range.h>
|
2012-08-26 11:14:52 +00:00
|
|
|
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <DataStreams/NativeBlockInputStream.h>
|
2011-08-19 19:18:15 +00:00
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2016-01-11 21:46:36 +00:00
|
|
|
/// Error codes used by the exceptions thrown in this file.
/// They are only declared here (extern); the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int CANNOT_READ_ALL_DATA;
    extern const int INCORRECT_INDEX;
    extern const int LOGICAL_ERROR;
}
|
|
|
|
|
2015-08-16 07:01:41 +00:00
|
|
|
/** Creates a stream that reads blocks from `istr_`.
  *
  * istr_            - buffer to read from; when use_index_ is set it must actually be
  *                    a CompressedReadBufferFromFile, because reading by index needs seek.
  * server_revision_ - stored as is; readImpl reads the additional block info only when it is > 0.
  * use_index_       - if true, block dimensions and column positions are taken from the index range
  *                    [index_block_it_, index_block_end_) instead of the data stream.
  *
  * Throws LOGICAL_ERROR if the index is requested but istr_ is not a CompressedReadBufferFromFile.
  */
NativeBlockInputStream::NativeBlockInputStream(
    ReadBuffer & istr_, UInt64 server_revision_,
    bool use_index_,
    IndexForNativeFormat::Blocks::const_iterator index_block_it_,
    IndexForNativeFormat::Blocks::const_iterator index_block_end_)
    : istr(istr_), server_revision(server_revision_),
    use_index(use_index_), index_block_it(index_block_it_), index_block_end(index_block_end_)
{
    if (!use_index)
        return;

    istr_concrete = typeid_cast<CompressedReadBufferFromFile *>(&istr);
    if (!istr_concrete)
        throw Exception("When need to use index for NativeBlockInputStream, istr must be CompressedReadBufferFromFile.", ErrorCodes::LOGICAL_ERROR);

    /// An empty index range is legal: readImpl returns an empty block in that case.
    /// The end iterator must not be dereferenced, so the column iterator is set up
    /// only when there is at least one block in the index.
    if (index_block_it != index_block_end)
        index_column_it = index_block_it->columns.begin();
}
|
|
|
|
|
|
|
|
|
2017-07-12 18:41:08 +00:00
|
|
|
void NativeBlockInputStream::readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint)
|
2012-08-26 11:14:52 +00:00
|
|
|
{
|
2017-08-07 07:31:16 +00:00
|
|
|
IDataType::InputStreamGetter input_stream_getter = [&] (const IDataType::SubstreamPath & path) { return &istr; };
|
|
|
|
type.deserializeBinaryBulkWithMultipleStreams(column, input_stream_getter, rows, avg_value_size_hint, false, {});
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
if (column.size() != rows)
|
|
|
|
throw Exception("Cannot read all data in NativeBlockInputStream.", ErrorCodes::CANNOT_READ_ALL_DATA);
|
2012-08-26 11:14:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2011-09-04 21:23:19 +00:00
|
|
|
/** Reads the next block.
  *
  * Returns an empty block when there is nothing more to read: the index range is exhausted
  * (index mode) or the input buffer is at EOF (plain mode).
  *
  * Throws:
  *  CANNOT_READ_ALL_DATA - the input ended while the index still lists blocks,
  *                         or a column could not be read completely (from readData);
  *  INCORRECT_INDEX      - the index does not match the data (wrong column name or type,
  *                         or the number of column entries differs from the declared number of columns).
  */
Block NativeBlockInputStream::readImpl()
{
    Block res;

    const DataTypeFactory & data_type_factory = DataTypeFactory::instance();

    if (use_index && index_block_it == index_block_end)
        return res;

    if (istr.eof())
    {
        if (use_index)
            throw Exception("Input doesn't contain all data for index.", ErrorCodes::CANNOT_READ_ALL_DATA);

        return res;
    }

    /// Additional information about the block.
    if (server_revision > 0)
        res.info.read(istr);

    /// Dimensions
    size_t columns = 0;
    size_t rows = 0;

    if (!use_index)
    {
        readVarUInt(columns, istr);
        readVarUInt(rows, istr);
    }
    else
    {
        columns = index_block_it->num_columns;
        rows = index_block_it->num_rows;
    }

    for (size_t i = 0; i < columns; ++i)
    {
        if (use_index)
        {
            /// The declared number of columns may exceed the number of column entries
            /// actually stored for the block; never dereference the end iterator.
            if (index_column_it == index_block_it->columns.end())
                throw Exception("Inconsistent index: fewer column entries than declared columns", ErrorCodes::INCORRECT_INDEX);

            /// If the current position is what is required, the real seek does not occur.
            istr_concrete->seek(index_column_it->location.offset_in_compressed_file, index_column_it->location.offset_in_decompressed_block);
        }

        ColumnWithTypeAndName column;

        /// Name
        readBinary(column.name, istr);

        /// Type
        String type_name;
        readBinary(type_name, istr);
        column.type = data_type_factory.get(type_name);

        if (use_index)
        {
            /// Index allows to do more checks.
            if (index_column_it->name != column.name)
                throw Exception("Index points to column with wrong name: corrupted index or data", ErrorCodes::INCORRECT_INDEX);
            if (index_column_it->type != type_name)
                throw Exception("Index points to column with wrong type: corrupted index or data", ErrorCodes::INCORRECT_INDEX);
        }

        /// Data
        column.column = column.type->createColumn();

        /// Hints are sized by a previously seen block (see updateAvgValueSizeHints);
        /// the current block may have more columns, so fall back to 0 (no hint) past the end.
        double avg_value_size_hint = i < avg_value_size_hints.size() ? avg_value_size_hints[i] : 0;
        if (rows)    /// If no rows, nothing to read.
            readData(*column.type, *column.column, istr, rows, avg_value_size_hint);

        res.insert(std::move(column));

        if (use_index)
            ++index_column_it;
    }

    if (use_index)
    {
        if (index_column_it != index_block_it->columns.end())
            throw Exception("Inconsistent index: not all columns were read", ErrorCodes::INCORRECT_INDEX);

        /// Advance to the next block of the index and rewind the column iterator to its first column.
        ++index_block_it;
        if (index_block_it != index_block_end)
            index_column_it = index_block_it->columns.begin();
    }

    return res;
}
|
|
|
|
|
2017-07-12 18:41:08 +00:00
|
|
|
void NativeBlockInputStream::updateAvgValueSizeHints(const Block & block)
|
|
|
|
{
|
|
|
|
auto rows = block.rows();
|
|
|
|
if (rows < 10)
|
|
|
|
return;
|
|
|
|
|
2017-07-13 16:49:09 +00:00
|
|
|
avg_value_size_hints.resize_fill(block.columns(), 0);
|
2017-07-12 18:41:08 +00:00
|
|
|
|
|
|
|
for (auto idx : ext::range(0, block.columns()))
|
|
|
|
{
|
|
|
|
auto & avg_value_size_hint = avg_value_size_hints[idx];
|
2017-07-13 18:54:17 +00:00
|
|
|
IDataType::updateAvgValueSizeHint(*block.getByPosition(idx).column, avg_value_size_hint);
|
2017-07-12 18:41:08 +00:00
|
|
|
}
|
|
|
|
}
|
2015-08-16 07:01:41 +00:00
|
|
|
|
|
|
|
/** Reads the whole index from `istr` (until EOF) and appends one entry per block to `blocks`.
  *
  * For each block only the entries of columns listed in `required_columns` are kept,
  * and the block's num_columns is replaced by the number of kept columns.
  *
  * Throws INCORRECT_INDEX if a block lacks some of the required columns
  * or lists a required column more than once.
  */
void IndexForNativeFormat::read(ReadBuffer & istr, const NameSet & required_columns)
{
    while (!istr.eof())
    {
        blocks.emplace_back();
        IndexOfBlockForNativeFormat & block = blocks.back();

        readVarUInt(block.num_columns, istr);
        readVarUInt(block.num_rows, istr);

        if (block.num_columns < required_columns.size())
            throw Exception("Index contain less than required columns", ErrorCodes::INCORRECT_INDEX);

        /// Names of the required columns already met in this block.
        /// Comparing only counts is not enough: with required {a, b}, an index block listing
        /// "a" twice and no "b" would have the expected number of columns and pass unnoticed.
        /// NOTE(review): relies on NameSet::insert returning {iterator, inserted}
        /// like the standard set containers - confirm against the NameSet typedef.
        NameSet found_columns;

        for (size_t i = 0; i < block.num_columns; ++i)
        {
            IndexOfOneColumnForNativeFormat column_index;

            /// All four fields are always read, even for columns that are skipped afterwards.
            readBinary(column_index.name, istr);
            readBinary(column_index.type, istr);
            readBinary(column_index.location.offset_in_compressed_file, istr);
            readBinary(column_index.location.offset_in_decompressed_block, istr);

            if (!required_columns.count(column_index.name))
                continue;

            /// Must be done before the move below, which leaves column_index.name unspecified.
            if (!found_columns.insert(column_index.name).second)
                throw Exception("Index contain duplicate columns", ErrorCodes::INCORRECT_INDEX);

            block.columns.push_back(std::move(column_index));
        }

        /// Kept columns are distinct and all required, so only "too few" remains possible here.
        if (block.columns.size() < required_columns.size())
            throw Exception("Index contain less than required columns", ErrorCodes::INCORRECT_INDEX);

        block.num_columns = block.columns.size();
    }
}
|
|
|
|
|
2011-08-19 19:18:15 +00:00
|
|
|
}
|