Fix array writing (#1314)

* changed MergedBlockOutputStream [#CLICKHOUSE-3341]

* fix build [#CLICKHOUSE-3341]

* fix build [#CLICKHOUSE-3341]

* fix build [#CLICKHOUSE-3341]

* fix build [#CLICKHOUSE-3341]

* fix build [#CLICKHOUSE-3341]

* fix build [#CLICKHOUSE-3341]

* fix build [#CLICKHOUSE-3341]

* fix build [#CLICKHOUSE-3341]

* fix build [#CLICKHOUSE-3341]

* disabled checkNoMultidimensionalArrays [#CLICKHOUSE-3341]

* fix IMergedBlockOutputStream::writeDataImpl [#CLICKHOUSE-3341]

* fix IMergedBlockOutputStream::writeDataImpl [#CLICKHOUSE-3341]

* fix IMergedBlockOutputStream::writeDataImpl [#CLICKHOUSE-3341]

* fix IMergedBlockOutputStream::writeDataImpl [#CLICKHOUSE-3341]

* added test [#CLICKHOUSE-3341]

* fixed test [#CLICKHOUSE-3341]

* refactoring and comments [#CLICKHOUSE-3341]

* fix build [#CLICKHOUSE-3341]

* Update ColumnArray.h
KochetovNicolai 2017-10-12 23:13:26 +03:00 committed by alexey-milovidov
parent 3de5e465d0
commit 83925f1d5e
7 changed files with 221 additions and 111 deletions


@@ -910,6 +910,23 @@ ColumnPtr ColumnArray::replicateTuple(const Offsets_t & replicate_offsets) const
}
ColumnPtr ColumnArray::getLengthsColumn() const
{
const auto & offsets_data = getOffsets();
size_t size = offsets_data.size();
auto column = std::make_shared<ColumnVector<ColumnArray::Offset_t>>(offsets->size());
auto & data = column->getData();
if (size)
data[0] = offsets_data[0];
for (size_t i = 1; i < size; ++i)
data[i] = offsets_data[i] - offsets_data[i - 1];
return column;
}
void ColumnArray::gather(ColumnGathererStream & gatherer)
{
gatherer.gather(*this);
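
The new getLengthsColumn above is a prefix difference over the cumulative offsets. Below is a minimal standalone sketch of the same computation, using plain std::vector instead of ClickHouse's ColumnVector; the helper name offsetsToLengths is illustrative, not part of the codebase. For offsets {2, 5, 6} it returns the sizes {2, 3, 1}.

#include <cstddef>
#include <cstdint>
#include <vector>

/// Turn cumulative end offsets into per-row array sizes.
std::vector<uint64_t> offsetsToLengths(const std::vector<uint64_t> & offsets)
{
    std::vector<uint64_t> lengths(offsets.size());
    if (!offsets.empty())
        lengths[0] = offsets[0];
    for (size_t i = 1; i < offsets.size(); ++i)
        lengths[i] = offsets[i] - offsets[i - 1];
    return lengths;
}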


@@ -80,6 +80,9 @@ public:
return scatterImpl<ColumnArray>(num_columns, selector);
}
/// Creates and returns a column with array sizes.
ColumnPtr getLengthsColumn() const;
void gather(ColumnGathererStream & gatherer_stream) override;
private:


@@ -108,9 +108,6 @@ MergeTreeData::MergeTreeData(
parts_clean_callback(parts_clean_callback_ ? parts_clean_callback_ : [this](){ clearOldParts(); }),
log_name(log_name_), log(&Logger::get(log_name + " (Data)"))
{
checkNoMultidimensionalArrays(*columns, attach);
checkNoMultidimensionalArrays(materialized_columns, attach);
merging_params.check(*columns);
if (!primary_expr_ast && merging_params.mode != MergingParams::Unsorted)


@@ -114,23 +114,23 @@ void IMergedBlockOutputStream::addStream(
void IMergedBlockOutputStream::writeData(
const String & name,
const IDataType & type,
const IColumn & column,
const DataTypePtr & type,
const ColumnPtr & column,
OffsetColumns & offset_columns,
size_t level,
bool skip_offsets)
{
writeDataImpl(name, type, column, offset_columns, level, false, skip_offsets);
writeDataImpl(name, type, column, nullptr, offset_columns, level, skip_offsets);
}
void IMergedBlockOutputStream::writeDataImpl(
const String & name,
const IDataType & type,
const IColumn & column,
const DataTypePtr & type,
const ColumnPtr & column,
const ColumnPtr & offsets,
OffsetColumns & offset_columns,
size_t level,
bool write_array_data,
bool skip_offsets)
{
/// NOTE: the parameter write_array_data indicates whether we call this method
@@ -138,131 +138,124 @@ void IMergedBlockOutputStream::writeDataImpl(
/// serialization of arrays for the MergeTree engine slightly differs from
/// what the other engines do.
size_t size = column.size();
const DataTypeArray * type_arr = nullptr;
if (type.isNullable())
if (type->isNullable())
{
/// First write to the null map.
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type);
const IDataType & nested_type = *(nullable_type.getNestedType());
const auto & nullable_type = static_cast<const DataTypeNullable &>(*type);
const auto & nested_type = nullable_type.getNestedType();
const ColumnNullable & nullable_col = static_cast<const ColumnNullable &>(column);
const IColumn & nested_col = *(nullable_col.getNestedColumn());
const auto & nullable_col = static_cast<const ColumnNullable &>(*column);
const auto & nested_col = nullable_col.getNestedColumn();
std::string filename = name + NULL_MAP_EXTENSION;
ColumnStream & stream = *column_streams[filename];
auto null_map_type = std::make_shared<DataTypeUInt8>();
size_t prev_mark = 0;
while (prev_mark < size)
{
size_t limit = 0;
/// If there is `index_offset`, then the first mark goes not immediately, but after this number of rows.
if (prev_mark == 0 && index_offset != 0)
limit = index_offset;
else
{
limit = storage.index_granularity;
/// There could already be enough data to compress into the new block.
if (stream.compressed.offset() >= min_compress_block_size)
stream.compressed.next();
writeIntBinary(stream.plain_hashing.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
}
DataTypeUInt8{}.serializeBinaryBulk(nullable_col.getNullMapConcreteColumn(), stream.compressed, prev_mark, limit);
/// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one.
stream.compressed.nextIfAtEnd();
prev_mark += limit;
}
writeColumn(nullable_col.getNullMapColumn(), null_map_type, stream, offsets);
/// Then write data.
writeDataImpl(name, nested_type, nested_col, offset_columns, level, write_array_data, false);
writeDataImpl(name, nested_type, nested_col, offsets, offset_columns, level, skip_offsets);
}
else if (!write_array_data && ((type_arr = typeid_cast<const DataTypeArray *>(&type)) != nullptr))
else if (auto type_arr = typeid_cast<const DataTypeArray *>(type.get()))
{
/// For arrays, you first need to serialize dimensions, and then values.
String size_name = DataTypeNested::extractNestedTableName(name)
+ ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
const auto & column_array = typeid_cast<const ColumnArray &>(*column);
ColumnPtr next_level_offsets;
ColumnPtr lengths_column;
auto offsets_data_type = std::make_shared<DataTypeNumber<ColumnArray::Offset_t>>();
if (offsets)
{
/// Have offsets from prev level. Calculate offsets for next level.
next_level_offsets = offsets->clone();
const auto & array_offsets = column_array.getOffsets();
auto & next_level_offsets_column = typeid_cast<ColumnArray::ColumnOffsets_t &>(*next_level_offsets);
auto & next_level_offsets_data = next_level_offsets_column.getData();
for (auto & offset : next_level_offsets_data)
offset = offset ? array_offsets[offset - 1] : 0;
/// Calculate lengths of arrays and write them as a new array.
lengths_column = column_array.getLengthsColumn();
}
if (!skip_offsets && offset_columns.count(size_name) == 0)
{
offset_columns.insert(size_name);
ColumnStream & stream = *column_streams[size_name];
size_t prev_mark = 0;
while (prev_mark < size)
{
size_t limit = 0;
/// If there is `index_offset`, the first mark goes not immediately, but after this number of rows.
if (prev_mark == 0 && index_offset != 0)
limit = index_offset;
else
{
limit = storage.index_granularity;
/// There could already be enough data to compress into the new block.
if (stream.compressed.offset() >= min_compress_block_size)
stream.compressed.next();
writeIntBinary(stream.plain_hashing.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
}
type_arr->serializeOffsets(column, stream.compressed, prev_mark, limit);
/// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one.
stream.compressed.nextIfAtEnd();
prev_mark += limit;
}
if (offsets)
writeColumn(lengths_column, offsets_data_type, stream, offsets);
else
writeColumn(column, type, stream, nullptr);
}
if (type_arr->getNestedType()->isNullable())
writeDataImpl(name, *type_arr->getNestedType(),
typeid_cast<const ColumnArray &>(column).getData(), offset_columns,
level + 1, true, false);
else
writeDataImpl(name, type, column, offset_columns, level + 1, true, false);
writeDataImpl(name, type_arr->getNestedType(), column_array.getDataPtr(),
offsets ? next_level_offsets : column_array.getOffsetsColumn(),
offset_columns, level + 1, skip_offsets);
}
else
{
ColumnStream & stream = *column_streams[name];
writeColumn(column, type, stream, offsets);
}
}
size_t prev_mark = 0;
while (prev_mark < size)
void IMergedBlockOutputStream::writeColumn(
const ColumnPtr & column,
const DataTypePtr & type,
IMergedBlockOutputStream::ColumnStream & stream,
ColumnPtr offsets)
{
std::shared_ptr<DataTypeArray> array_type_holder;
DataTypeArray * array_type;
ColumnPtr array_column;
if (offsets)
{
array_type_holder = std::make_shared<DataTypeArray>(type);
array_type = array_type_holder.get();
array_column = std::make_shared<ColumnArray>(column, offsets);
}
else
array_type = typeid_cast<DataTypeArray *>(type.get());
size_t size = offsets ? offsets->size() : column->size();
size_t prev_mark = 0;
while (prev_mark < size)
{
size_t limit = 0;
/// If there is `index_offset`, then the first mark goes not immediately, but after this number of rows.
if (prev_mark == 0 && index_offset != 0)
limit = index_offset;
else
{
size_t limit = 0;
limit = storage.index_granularity;
/// If there is `index_offset`, then the first mark goes not immediately, but after this number of rows.
if (prev_mark == 0 && index_offset != 0)
limit = index_offset;
else
{
limit = storage.index_granularity;
/// There could already be enough data to compress into the new block.
if (stream.compressed.offset() >= min_compress_block_size)
stream.compressed.next();
/// There could already be enough data to compress into the new block.
if (stream.compressed.offset() >= min_compress_block_size)
stream.compressed.next();
writeIntBinary(stream.plain_hashing.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
}
type.serializeBinaryBulk(column, stream.compressed, prev_mark, limit);
/// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one.
stream.compressed.nextIfAtEnd();
prev_mark += limit;
writeIntBinary(stream.plain_hashing.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
}
if (offsets)
array_type->serializeBinaryBulk(*array_column, stream.compressed, prev_mark, limit);
else if (array_type)
array_type->serializeOffsets(*column, stream.compressed, prev_mark, limit);
else
type->serializeBinaryBulk(*column, stream.compressed, prev_mark, limit);
/// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one.
stream.compressed.nextIfAtEnd();
prev_mark += limit;
}
}
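
The step that makes multidimensional arrays work in writeDataImpl above is translating the offsets of the previous nesting level through the current array column's own offsets, so that each level is serialized against offsets into its nested data. Below is a minimal sketch of that composition, assuming cumulative end offsets as in ColumnArray; the function name composeOffsets and the use of std::vector are illustrative, not ClickHouse API.

#include <cstdint>
#include <vector>

/// prev_level_offsets index rows of the current Array column;
/// array_offsets are that column's own cumulative offsets.
/// Example: prev_level_offsets {2, 3} (outer rows holding 2 and 1 inner arrays)
/// and array_offsets {1, 3, 6} (inner arrays of sizes 1, 2, 3)
/// give {3, 6}: the two outer rows cover 3 and 3 innermost elements.
std::vector<uint64_t> composeOffsets(
    const std::vector<uint64_t> & prev_level_offsets,
    const std::vector<uint64_t> & array_offsets)
{
    std::vector<uint64_t> next_level_offsets = prev_level_offsets;
    for (auto & offset : next_level_offsets)
        offset = offset ? array_offsets[offset - 1] : 0;
    return next_level_offsets;
}
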
@@ -514,18 +507,18 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
auto primary_column_it = primary_columns_name_to_position.find(it.name);
if (primary_columns_name_to_position.end() != primary_column_it)
{
writeData(column.name, *column.type, *primary_columns[primary_column_it->second].column, offset_columns, 0, false);
writeData(column.name, column.type, primary_columns[primary_column_it->second].column, offset_columns, 0, false);
}
else
{
/// We rearrange the columns that are not included in the primary key here; the result is then released to save RAM.
ColumnPtr permutted_column = column.column->permute(*permutation, 0);
writeData(column.name, *column.type, *permutted_column, offset_columns, 0, false);
writeData(column.name, column.type, permutted_column, offset_columns, 0, false);
}
}
else
{
writeData(column.name, *column.type, *column.column, offset_columns, 0, false);
writeData(column.name, column.type, column.column, offset_columns, 0, false);
}
}
@@ -591,7 +584,7 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
for (size_t i = 0; i < block.columns(); ++i)
{
const ColumnWithTypeAndName & column = block.safeGetByPosition(i);
writeData(column.name, *column.type, *column.column, offset_columns, 0, skip_offsets);
writeData(column.name, column.type, column.column, offset_columns, 0, skip_offsets);
}
size_t written_for_last_mark = (storage.index_granularity - index_offset + rows) % storage.index_granularity;
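
The per-column write paths above share one chunking rule, now centralized in writeColumn: the first chunk after a block boundary has index_offset rows (those rows still complete the last, not yet full mark of the previously written block), and every further chunk has index_granularity rows and is preceded by writing a new mark. The sketch below extracts just that loop control into a standalone helper; splitByMarks is a made-up name, not a ClickHouse API, and the last chunk's limit may exceed the remaining rows, exactly as in the original loop, where the bulk serializers simply stop at the end of the column.

#include <cstddef>
#include <utility>
#include <vector>

/// Returns (start, limit) pairs as produced by the mark-writing loop.
std::vector<std::pair<size_t, size_t>> splitByMarks(size_t rows, size_t index_granularity, size_t index_offset)
{
    std::vector<std::pair<size_t, size_t>> chunks;
    size_t prev_mark = 0;
    while (prev_mark < rows)
    {
        size_t limit = (prev_mark == 0 && index_offset != 0) ? index_offset : index_granularity;
        chunks.emplace_back(prev_mark, limit);
        prev_mark += limit;
    }
    return chunks;
}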


@@ -66,8 +66,8 @@ protected:
size_t level, const String & filename, bool skip_offsets);
/// Write data of one column.
void writeData(const String & name, const IDataType & type, const IColumn & column, OffsetColumns & offset_columns,
size_t level, bool skip_offsets);
void writeData(const String & name, const DataTypePtr & type, const ColumnPtr & column,
OffsetColumns & offset_columns, size_t level, bool skip_offsets);
MergeTreeData & storage;
@@ -85,8 +85,11 @@ protected:
private:
/// Internal version of writeData.
void writeDataImpl(const String & name, const IDataType & type, const IColumn & column,
OffsetColumns & offset_columns, size_t level, bool write_array_data, bool skip_offsets);
void writeDataImpl(const String & name, const DataTypePtr & type, const ColumnPtr & column,
const ColumnPtr & offsets, OffsetColumns & offset_columns, size_t level, bool skip_offsets);
/// Writes column data into stream.
/// If type is Array, writes offsets only. To write array data, unpack array column and use offsets argument.
void writeColumn(const ColumnPtr & column, const DataTypePtr & type, ColumnStream & stream, ColumnPtr offsets);
};


@@ -0,0 +1,60 @@
2017-10-02 [0,42]
2017-10-02 [1,42]
2017-10-02 [2,42]
2017-10-02 [3,42]
2017-10-02 [4,42]
2017-10-02 [5,42]
2017-10-02 [6,42]
2017-10-02 [7,42]
2017-10-02 [8,42]
2017-10-02 [9,42]
2017-10-02 \N
2017-10-02 1
2017-10-02 \N
2017-10-02 3
2017-10-02 \N
2017-10-02 5
2017-10-02 \N
2017-10-02 7
2017-10-02 \N
2017-10-02 9
2017-10-02 [NULL,0,NULL]
2017-10-02 [1,1,NULL]
2017-10-02 [NULL,2,NULL]
2017-10-02 [3,3,NULL]
2017-10-02 [NULL,4,NULL]
2017-10-02 [5,5,NULL]
2017-10-02 [NULL,6,NULL]
2017-10-02 [7,7,NULL]
2017-10-02 [NULL,8,NULL]
2017-10-02 [9,9,NULL]
2017-10-02 [[0],[1,2]]
2017-10-02 [[1],[2,3]]
2017-10-02 [[2],[3,4]]
2017-10-02 [[3],[4,5]]
2017-10-02 [[4],[5,6]]
2017-10-02 [[5],[6,7]]
2017-10-02 [[6],[7,8]]
2017-10-02 [[7],[8,9]]
2017-10-02 [[8],[9,10]]
2017-10-02 [[9],[10,11]]
2017-10-02 [[1,NULL,0],[3,NULL,0]]
2017-10-02 [[1,NULL,1],[3,NULL,1]]
2017-10-02 [[1,NULL,2],[3,NULL,2]]
2017-10-02 [[1,NULL,3],[3,NULL,3]]
2017-10-02 [[1,NULL,4],[3,NULL,4]]
2017-10-02 [[1,NULL,5],[3,NULL,5]]
2017-10-02 [[1,NULL,6],[3,NULL,6]]
2017-10-02 [[1,NULL,7],[3,NULL,7]]
2017-10-02 [[1,NULL,8],[3,NULL,8]]
2017-10-02 [[1,NULL,9],[3,NULL,9]]
2017-10-02 [[[0]],[[1],[2,3]]]
2017-10-02 [[[1]],[[2],[3,4]]]
2017-10-02 [[[2]],[[3],[4,5]]]
2017-10-02 [[[3]],[[4],[5,6]]]
2017-10-02 [[[4]],[[5],[6,7]]]
2017-10-02 [[[5]],[[6],[7,8]]]
2017-10-02 [[[6]],[[7],[8,9]]]
2017-10-02 [[[7]],[[8],[9,10]]]
2017-10-02 [[[8]],[[9],[10,11]]]
2017-10-02 [[[9]],[[10],[11,12]]]


@@ -0,0 +1,37 @@
create database if not exists test;
drop table if exists test.test_ins_arr;
create table test.test_ins_arr (date Date, val Array(UInt64)) engine = MergeTree(date, (date), 8192);
insert into test.test_ins_arr select toDate('2017-10-02'), [number, 42] from system.numbers limit 10000;
select * from test.test_ins_arr limit 10;
drop table test.test_ins_arr;
drop table if exists test.test_ins_null;
create table test.test_ins_null (date Date, val Nullable(UInt64)) engine = MergeTree(date, (date), 8192);
insert into test.test_ins_null select toDate('2017-10-02'), if(number % 2, number, Null) from system.numbers limit 10000;
select * from test.test_ins_null limit 10;
drop table test.test_ins_null;
drop table if exists test.test_ins_arr_null;
create table test.test_ins_arr_null (date Date, val Array(Nullable(UInt64))) engine = MergeTree(date, (date), 8192);
insert into test.test_ins_arr_null select toDate('2017-10-02'), [if(number % 2, number, Null), number, Null] from system.numbers limit 10000;
select * from test.test_ins_arr_null limit 10;
drop table test.test_ins_arr_null;
drop table if exists test.test_ins_arr_arr;
create table test.test_ins_arr_arr (date Date, val Array(Array(UInt64))) engine = MergeTree(date, (date), 8192);
insert into test.test_ins_arr_arr select toDate('2017-10-02'), [[number],[number + 1, number + 2]] from system.numbers limit 10000;
select * from test.test_ins_arr_arr limit 10;
drop table test.test_ins_arr_arr;
drop table if exists test.test_ins_arr_arr_null;
create table test.test_ins_arr_arr_null (date Date, val Array(Array(Nullable(UInt64)))) engine = MergeTree(date, (date), 8192);
insert into test.test_ins_arr_arr_null select toDate('2017-10-02'), [[1, Null, number], [3, Null, number]] from system.numbers limit 10000;
select * from test.test_ins_arr_arr_null limit 10;
drop table test.test_ins_arr_arr_null;
drop table if exists test.test_ins_arr_arr_arr;
create table test.test_ins_arr_arr_arr (date Date, val Array(Array(Array(UInt64)))) engine = MergeTree(date, (date), 8192);
insert into test.test_ins_arr_arr_arr select toDate('2017-10-02'), [[[number]],[[number + 1], [number + 2, number + 3]]] from system.numbers limit 10000;
select * from test.test_ins_arr_arr_arr limit 10;
drop table test.test_ins_arr_arr_arr;