diff --git a/dbms/src/DataTypes/DataTypeArray.cpp b/dbms/src/DataTypes/DataTypeArray.cpp index bd56b692dcc..88ebe65202a 100644 --- a/dbms/src/DataTypes/DataTypeArray.cpp +++ b/dbms/src/DataTypes/DataTypeArray.cpp @@ -234,7 +234,8 @@ void DataTypeArray::deserializeBinaryBulkWithMultipleStreams( /// Check consistency between offsets and elements subcolumns. /// But if elements column is empty - it's ok for columns of Nested types that was added by ALTER. if (!nested_column.empty() && nested_column.size() != last_offset) - throw Exception("Cannot read all array values", ErrorCodes::CANNOT_READ_ALL_DATA); + throw Exception("Cannot read all array values: read just " + toString(nested_column.size()) + " of " + toString(last_offset), + ErrorCodes::CANNOT_READ_ALL_DATA); } diff --git a/dbms/src/DataTypes/IDataType.cpp b/dbms/src/DataTypes/IDataType.cpp index 9fc2cc7d88e..68fe74d18f0 100644 --- a/dbms/src/DataTypes/IDataType.cpp +++ b/dbms/src/DataTypes/IDataType.cpp @@ -85,7 +85,13 @@ String IDataType::getFileNameForStream(const String & column_name, const IDataTy else if (elem.type == Substream::ArrayElements) ++array_level; else if (elem.type == Substream::TupleElement) - stream_name += "." + escapeForFileName(elem.tuple_element_name); + { + /// For compatibility reasons, we use %2E instead of dot. + /// Because nested data may be represented not by Array of Tuple, + /// but by separate Array columns with names in a form of a.b, + /// and name is encoded as a whole. + stream_name += "%2E" + escapeForFileName(elem.tuple_element_name); + } } return stream_name; } diff --git a/dbms/src/DataTypes/NestedUtils.cpp b/dbms/src/DataTypes/NestedUtils.cpp index 7481cfdebe6..6791772975e 100644 --- a/dbms/src/DataTypes/NestedUtils.cpp +++ b/dbms/src/DataTypes/NestedUtils.cpp @@ -6,7 +6,10 @@ #include #include #include -#include + +#include +#include +#include #include @@ -98,6 +101,57 @@ NamesAndTypesList flatten(const NamesAndTypesList & names_and_types) return res; } + +Block flatten(const Block & block) +{ + Block res; + + for (const auto & elem : block) + { + if (const DataTypeArray * type_arr = typeid_cast(elem.type.get())) + { + if (const DataTypeTuple * type_tuple = typeid_cast(type_arr->getNestedType().get())) + { + const DataTypes & element_types = type_tuple->getElements(); + const Strings & names = type_tuple->getElementNames(); + size_t tuple_size = element_types.size(); + + bool is_const = elem.column->isColumnConst(); + const ColumnArray * column_array; + if (is_const) + column_array = typeid_cast(&static_cast(*elem.column).getDataColumn()); + else + column_array = typeid_cast(elem.column.get()); + + const ColumnPtr & column_offsets = column_array->getOffsetsPtr(); + + const ColumnTuple & column_tuple = typeid_cast(column_array->getData()); + const Columns & element_columns = column_tuple.getColumns(); + + for (size_t i = 0; i < tuple_size; ++i) + { + String nested_name = concatenateName(elem.name, names[i]); + ColumnPtr column_array_of_element = ColumnArray::create(element_columns[i], column_offsets); + + res.insert(ColumnWithTypeAndName( + is_const + ? ColumnConst::create(std::move(column_array_of_element), block.rows()) + : std::move(column_array_of_element), + std::make_shared(element_types[i]), + nested_name)); + } + } + else + res.insert(elem); + } + else + res.insert(elem); + } + + return res; +} + + NamesAndTypesList collect(const NamesAndTypesList & names_and_types) { NamesAndTypesList res; diff --git a/dbms/src/DataTypes/NestedUtils.h b/dbms/src/DataTypes/NestedUtils.h index 28eef61d345..3bbc8845be2 100644 --- a/dbms/src/DataTypes/NestedUtils.h +++ b/dbms/src/DataTypes/NestedUtils.h @@ -1,5 +1,6 @@ #pragma once +#include #include @@ -17,6 +18,7 @@ namespace Nested /// Replace Array(Tuple(...)) columns to a multiple of Array columns in a form of `column_name.element_name`. NamesAndTypesList flatten(const NamesAndTypesList & names_and_types); + Block flatten(const Block & block); /// Collect Array columns in a form of `column_name.element_name` to single Array(Tuple(...)) column. NamesAndTypesList collect(const NamesAndTypesList & names_and_types); diff --git a/dbms/src/Storages/StorageLog.cpp b/dbms/src/Storages/StorageLog.cpp index b6c049b4dc8..4eb08c039f8 100644 --- a/dbms/src/Storages/StorageLog.cpp +++ b/dbms/src/Storages/StorageLog.cpp @@ -11,10 +11,7 @@ #include #include -#include -#include #include -#include #include #include @@ -105,7 +102,7 @@ private: using FileStreams = std::map; FileStreams streams; - void readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read, bool read_offsets = true); + void readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read); }; @@ -192,34 +189,13 @@ Block LogBlockInputStream::readImpl() /// How many rows to read for the next block. size_t max_rows_to_read = std::min(block_size, rows_limit - rows_read); - /// Pointers to offset columns, shared for columns from nested data structures - using OffsetColumns = std::map; - OffsetColumns offset_columns; - for (const auto & name_type : columns) { - MutableColumnPtr column; - - bool read_offsets = true; - - /// For nested structures, remember pointers to columns with offsets - if (const DataTypeArray * type_arr = typeid_cast(name_type.type.get())) - { - String nested_name = Nested::extractTableName(name_type.name); - - if (offset_columns.count(nested_name) == 0) - offset_columns[nested_name] = ColumnArray::ColumnOffsets::create(); - else - read_offsets = false; /// on previous iterations the offsets were already read by `readData` - - column = ColumnArray::create(type_arr->getNestedType()->createColumn(), offset_columns[nested_name]); - } - else - column = name_type.type->createColumn(); + MutableColumnPtr column = name_type.type->createColumn(); try { - readData(name_type.name, *name_type.type, *column, max_rows_to_read, read_offsets); + readData(name_type.name, *name_type.type, *column, max_rows_to_read); } catch (Exception & e) { @@ -243,17 +219,14 @@ Block LogBlockInputStream::readImpl() streams.clear(); } - return res; + return Nested::flatten(res); } -void LogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read, bool with_offsets) +void LogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read) { IDataType::InputStreamGetter stream_getter = [&] (const IDataType::SubstreamPath & path) -> ReadBuffer * { - if (!with_offsets && !path.empty() && path.back().type == IDataType::Substream::ArraySizes) - return nullptr; - String stream_name = IDataType::getFileNameForStream(name, path); const auto & file_it = storage.files.find(stream_name); @@ -529,7 +502,7 @@ BlockInputStreams StorageLog::read( processed_stage = QueryProcessingStage::FetchColumns; loadMarks(); - NamesAndTypesList columns = getColumnsList().addTypes(column_names); + NamesAndTypesList columns = Nested::collect(getColumnsList().addTypes(column_names)); std::shared_lock lock(rwlock); diff --git a/dbms/src/Storages/StorageTinyLog.cpp b/dbms/src/Storages/StorageTinyLog.cpp index 17e40ce3560..986cbcc97a7 100644 --- a/dbms/src/Storages/StorageTinyLog.cpp +++ b/dbms/src/Storages/StorageTinyLog.cpp @@ -17,9 +17,7 @@ #include #include -#include #include -#include #include #include @@ -87,7 +85,7 @@ private: using FileStreams = std::map>; FileStreams streams; - void readData(const String & name, const IDataType & type, IColumn & column, size_t limit, bool read_offsets = true); + void readData(const String & name, const IDataType & type, IColumn & column, size_t limit); }; @@ -179,34 +177,13 @@ Block TinyLogBlockInputStream::readImpl() return res; } - /// Pointers to offset columns, shared for columns from nested data structures - using OffsetColumns = std::map; - OffsetColumns offset_columns; - for (const auto & name_type : columns) { - MutableColumnPtr column; - - bool read_offsets = true; - - /// For nested structures, remember pointers to columns with offsets - if (const DataTypeArray * type_arr = typeid_cast(name_type.type.get())) - { - String nested_name = Nested::extractTableName(name_type.name); - - if (offset_columns.count(nested_name) == 0) - offset_columns[nested_name] = ColumnArray::ColumnOffsets::create(); - else - read_offsets = false; /// on previous iterations, the offsets were already calculated by `readData` - - column = ColumnArray::create(type_arr->getNestedType()->createColumn(), offset_columns[nested_name]); - } - else - column = name_type.type->createColumn(); + MutableColumnPtr column = name_type.type->createColumn(); try { - readData(name_type.name, *name_type.type, *column, block_size, read_offsets); + readData(name_type.name, *name_type.type, *column, block_size); } catch (Exception & e) { @@ -224,17 +201,14 @@ Block TinyLogBlockInputStream::readImpl() streams.clear(); } - return res; + return Nested::flatten(res); } -void TinyLogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, size_t limit, bool with_offsets) +void TinyLogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, size_t limit) { IDataType::InputStreamGetter stream_getter = [&] (const IDataType::SubstreamPath & path) -> ReadBuffer * { - if (!with_offsets && !path.empty() && path.back().type == IDataType::Substream::ArraySizes) - return nullptr; - String stream_name = IDataType::getFileNameForStream(name, path); if (!streams.count(stream_name)) @@ -253,10 +227,11 @@ void TinyLogBlockOutputStream::writeData(const String & name, const IDataType & { String stream_name = IDataType::getFileNameForStream(name, path); + if (!written_streams.insert(stream_name).second) + return nullptr; + if (!streams.count(stream_name)) streams[stream_name] = std::make_unique(storage.files[stream_name].data_file.path(), storage.max_compress_block_size); - else if (!written_streams.insert(stream_name).second) - return nullptr; return &streams[stream_name]->compressed; }; @@ -378,7 +353,7 @@ BlockInputStreams StorageTinyLog::read( check(column_names); processed_stage = QueryProcessingStage::FetchColumns; return BlockInputStreams(1, std::make_shared( - max_block_size, getColumnsList().addTypes(column_names), *this, context.getSettingsRef().max_read_buffer_size)); + max_block_size, Nested::collect(getColumnsList().addTypes(column_names)), *this, context.getSettingsRef().max_read_buffer_size)); } diff --git a/dbms/tests/queries/0_stateless/00554_nested_and_table_engines.reference b/dbms/tests/queries/0_stateless/00554_nested_and_table_engines.reference new file mode 100644 index 00000000000..d3d37ce8c1f --- /dev/null +++ b/dbms/tests/queries/0_stateless/00554_nested_and_table_engines.reference @@ -0,0 +1,30 @@ +1 [2,3] ['Hello','World'] +4 [5] ['Goodbye'] +1 [2,3] +4 [5] +[2,3] ['Hello','World'] +[5] ['Goodbye'] +1 [2,3] ['Hello','World'] +4 [5] ['Goodbye'] +1 [2,3] +4 [5] +[2,3] ['Hello','World'] +[5] ['Goodbye'] +1 [2,3] ['Hello','World'] +4 [5] ['Goodbye'] +1 [2,3] +4 [5] +[2,3] ['Hello','World'] +[5] ['Goodbye'] +1 [2,3] ['Hello','World'] +4 [5] ['Goodbye'] +1 [2,3] +4 [5] +[2,3] ['Hello','World'] +[5] ['Goodbye'] +1 [2,3] ['Hello','World'] +4 [5] ['Goodbye'] +1 [2,3] +4 [5] +[2,3] ['Hello','World'] +[5] ['Goodbye'] diff --git a/dbms/tests/queries/0_stateless/00554_nested_and_table_engines.sql b/dbms/tests/queries/0_stateless/00554_nested_and_table_engines.sql new file mode 100644 index 00000000000..892bb210e63 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00554_nested_and_table_engines.sql @@ -0,0 +1,61 @@ +DROP TABLE IF EXISTS test.nested; + +CREATE TABLE test.nested (x UInt8, n Nested(a UInt64, b String)) ENGINE = TinyLog; + +INSERT INTO test.nested VALUES (1, [2, 3], ['Hello', 'World']); +INSERT INTO test.nested VALUES (4, [5], ['Goodbye']); + +SELECT * FROM test.nested ORDER BY x; +SELECT x, n.a FROM test.nested ORDER BY x; +SELECT n.a, n.b FROM test.nested ORDER BY n.a; + + +DROP TABLE IF EXISTS test.nested; + +CREATE TABLE test.nested (x UInt8, n Nested(a UInt64, b String)) ENGINE = Log; + +INSERT INTO test.nested VALUES (1, [2, 3], ['Hello', 'World']); +INSERT INTO test.nested VALUES (4, [5], ['Goodbye']); + +SELECT * FROM test.nested ORDER BY x; +SELECT x, n.a FROM test.nested ORDER BY x; +SELECT n.a, n.b FROM test.nested ORDER BY n.a; + + +DROP TABLE IF EXISTS test.nested; + +CREATE TABLE test.nested (x UInt8, n Nested(a UInt64, b String)) ENGINE = StripeLog; + +INSERT INTO test.nested VALUES (1, [2, 3], ['Hello', 'World']); +INSERT INTO test.nested VALUES (4, [5], ['Goodbye']); + +SELECT * FROM test.nested ORDER BY x; +SELECT x, n.a FROM test.nested ORDER BY x; +SELECT n.a, n.b FROM test.nested ORDER BY n.a; + + +DROP TABLE IF EXISTS test.nested; + +CREATE TABLE test.nested (x UInt8, n Nested(a UInt64, b String)) ENGINE = Memory; + +INSERT INTO test.nested VALUES (1, [2, 3], ['Hello', 'World']); +INSERT INTO test.nested VALUES (4, [5], ['Goodbye']); + +SELECT * FROM test.nested ORDER BY x; +SELECT x, n.a FROM test.nested ORDER BY x; +SELECT n.a, n.b FROM test.nested ORDER BY n.a; + + +DROP TABLE IF EXISTS test.nested; + +CREATE TABLE test.nested (x UInt8, n Nested(a UInt64, b String)) ENGINE = MergeTree ORDER BY x; + +INSERT INTO test.nested VALUES (1, [2, 3], ['Hello', 'World']); +INSERT INTO test.nested VALUES (4, [5], ['Goodbye']); + +SELECT * FROM test.nested ORDER BY x; +SELECT x, n.a FROM test.nested ORDER BY x; +SELECT n.a, n.b FROM test.nested ORDER BY n.a; + + +DROP TABLE test.nested;