dbms: Server: fixes [#METR-19266]

This commit is contained in:
Alexey Arno 2016-10-20 13:13:07 +03:00
parent 6c40d9b51e
commit 3792a9f302
2 changed files with 25 additions and 9 deletions

View File

@ -78,6 +78,11 @@ protected:
size_t aio_threshold; size_t aio_threshold;
CompressionMethod compression_method; CompressionMethod compression_method;
private:
/// Internal version of writeData.
void writeDataImpl(const String & name, const IDataType & type, const IColumn & column,
OffsetColumns & offset_columns, size_t level, bool write_array_data);
}; };

View File

@ -99,7 +99,19 @@ void IMergedBlockOutputStream::addStream(const String & path, const String & nam
/// Записать данные одного столбца. /// Записать данные одного столбца.
void IMergedBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column, OffsetColumns & offset_columns, size_t level) void IMergedBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column, OffsetColumns & offset_columns, size_t level)
{ {
writeDataImpl(name, type, column, offset_columns, level, false);
}
void IMergedBlockOutputStream::writeDataImpl(const String & name, const IDataType & type,
const IColumn & column, OffsetColumns & offset_columns, size_t level, bool write_array_data)
{
/// NOTE: the parameter write_array_data indicates whether we call this method
/// to write the contents of an array. This is to cope with the fact that
/// serialization of arrays for the MergeTree engine slightly differs from
/// what the other engines do.
size_t size = column.size(); size_t size = column.size();
const DataTypeArray * type_arr;
if (type.isNullable()) if (type.isNullable())
{ {
@ -120,9 +132,7 @@ void IMergedBlockOutputStream::writeData(const String & name, const IDataType &
/// Если есть index_offset, то первая засечка идёт не сразу, а после этого количества строк. /// Если есть index_offset, то первая засечка идёт не сразу, а после этого количества строк.
if (prev_mark == 0 && index_offset != 0) if (prev_mark == 0 && index_offset != 0)
{
limit = index_offset; limit = index_offset;
}
else else
{ {
limit = storage.index_granularity; limit = storage.index_granularity;
@ -144,9 +154,9 @@ void IMergedBlockOutputStream::writeData(const String & name, const IDataType &
} }
/// Then write data. /// Then write data.
writeData(name, nested_type, nested_col, offset_columns, level); writeDataImpl(name, nested_type, nested_col, offset_columns, level, write_array_data);
} }
else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type)) else if (!write_array_data && ((type_arr = typeid_cast<const DataTypeArray *>(&type)) != nullptr))
{ {
/// Для массивов требуется сначала сериализовать размеры, а потом значения. /// Для массивов требуется сначала сериализовать размеры, а потом значения.
String size_name = DataTypeNested::extractNestedTableName(name) String size_name = DataTypeNested::extractNestedTableName(name)
@ -165,9 +175,7 @@ void IMergedBlockOutputStream::writeData(const String & name, const IDataType &
/// Если есть index_offset, то первая засечка идёт не сразу, а после этого количества строк. /// Если есть index_offset, то первая засечка идёт не сразу, а после этого количества строк.
if (prev_mark == 0 && index_offset != 0) if (prev_mark == 0 && index_offset != 0)
{
limit = index_offset; limit = index_offset;
}
else else
{ {
limit = storage.index_granularity; limit = storage.index_granularity;
@ -189,7 +197,12 @@ void IMergedBlockOutputStream::writeData(const String & name, const IDataType &
} }
} }
writeData(name, *type_arr->getNestedType(), typeid_cast<const ColumnArray &>(column).getData(), offset_columns, level + 1); if (type_arr->getNestedType()->isNullable())
writeDataImpl(name, *type_arr->getNestedType(),
typeid_cast<const ColumnArray &>(column).getData(), offset_columns,
level + 1, true);
else
writeDataImpl(name, type, column, offset_columns, level + 1, true);
} }
else else
{ {
@ -202,9 +215,7 @@ void IMergedBlockOutputStream::writeData(const String & name, const IDataType &
/// Если есть index_offset, то первая засечка идёт не сразу, а после этого количества строк. /// Если есть index_offset, то первая засечка идёт не сразу, а после этого количества строк.
if (prev_mark == 0 && index_offset != 0) if (prev_mark == 0 && index_offset != 0)
{
limit = index_offset; limit = index_offset;
}
else else
{ {
limit = storage.index_granularity; limit = storage.index_granularity;