clickhouse: fixed arrays in TinyLog [#CONV-2944].

This commit is contained in:
Michael Kolupaev 2013-03-07 17:40:24 +00:00
parent 6f79845d57
commit 961e3c9c87
2 changed files with 8 additions and 7 deletions

View File

@ -47,7 +47,7 @@ private:
FileStreams streams;
void addStream(const String & name, const IDataType & type, size_t level = 0);
void readData(const String & name, const IDataType & type, IColumn & column, size_t level = 0);
void readData(const String & name, const IDataType & type, IColumn & column, size_t limit, size_t level = 0);
};

View File

@ -57,7 +57,7 @@ Block TinyLogBlockInputStream::readImpl()
column.name = *it;
column.type = storage.getDataTypeByName(*it);
column.column = column.type->createColumn();
readData(*it, *column.type, *column.column);
readData(*it, *column.type, *column.column, block_size);
if (column.column->size())
res.insert(column);
@ -88,7 +88,7 @@ void TinyLogBlockInputStream::addStream(const String & name, const IDataType & t
}
void TinyLogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, size_t level)
void TinyLogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, size_t level, size_t limit)
{
/// Для массивов требуется сначала десериализовать размеры, а потом значения.
if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type))
@ -96,19 +96,20 @@ void TinyLogBlockInputStream::readData(const String & name, const IDataType & ty
type_arr->deserializeOffsets(
column,
streams[name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + Poco::NumberFormatter::format(level)]->compressed,
block_size);
limit);
if (column.size())
{
IColumn & nested_column = dynamic_cast<ColumnArray &>(column).getData();
readData(name, *type_arr->getNestedType(), nested_column, level + 1);
size_t nested_limit = dynamic_cast<ColumnArray &>(column).getOffsets()[column.size() - 1];
readData(name, *type_arr->getNestedType(), nested_column, nested_limit, level + 1);
if (nested_column.size() != dynamic_cast<ColumnArray &>(column).getOffsets()[column.size() - 1])
if (nested_column.size() != nested_limit)
throw Exception("Cannot read array data for all offsets", ErrorCodes::CANNOT_READ_ALL_DATA);
}
}
else
type.deserializeBinary(column, streams[name]->compressed, block_size);
type.deserializeBinary(column, streams[name]->compressed, limit);
}