From 0dbbca5b3e6ff8959604bfb3c69b097261c147b1 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Wed, 11 May 2022 19:33:45 +0000 Subject: [PATCH 01/12] fix filling of missed Nested columns with multiple levels --- .../Serializations/SerializationArray.cpp | 38 +++++----- .../Serializations/SerializationArray.h | 1 + src/Interpreters/inplaceBlockConversions.cpp | 73 ++++++++++++++----- src/Interpreters/inplaceBlockConversions.h | 1 + src/Storages/MergeTree/IMergeTreeReader.cpp | 6 +- src/Storages/StorageMemory.cpp | 2 +- .../0_stateless/01825_type_json_17.reference | 7 ++ .../0_stateless/01825_type_json_17.sql | 18 +++++ 8 files changed, 108 insertions(+), 38 deletions(-) create mode 100644 tests/queries/0_stateless/01825_type_json_17.reference create mode 100644 tests/queries/0_stateless/01825_type_json_17.sql diff --git a/src/DataTypes/Serializations/SerializationArray.cpp b/src/DataTypes/Serializations/SerializationArray.cpp index 30ee5e98b74..aebfb1b27b2 100644 --- a/src/DataTypes/Serializations/SerializationArray.cpp +++ b/src/DataTypes/Serializations/SerializationArray.cpp @@ -132,29 +132,29 @@ namespace offset_values.resize(i); } +} - ColumnPtr arraySizesToOffsets(const IColumn & column) - { - const auto & column_sizes = assert_cast(column); - MutableColumnPtr column_offsets = column_sizes.cloneEmpty(); - - if (column_sizes.empty()) - return column_offsets; - - const auto & sizes_data = column_sizes.getData(); - auto & offsets_data = assert_cast(*column_offsets).getData(); - - offsets_data.resize(sizes_data.size()); - - IColumn::Offset prev_offset = 0; - for (size_t i = 0, size = sizes_data.size(); i < size; ++i) - { - prev_offset += sizes_data[i]; - offsets_data[i] = prev_offset; - } +ColumnPtr arraySizesToOffsets(const IColumn & column) +{ + const auto & column_sizes = assert_cast(column); + MutableColumnPtr column_offsets = column_sizes.cloneEmpty(); + if (column_sizes.empty()) return column_offsets; + + const auto & sizes_data = column_sizes.getData(); + auto & offsets_data = assert_cast(*column_offsets).getData(); + + offsets_data.resize(sizes_data.size()); + + IColumn::Offset prev_offset = 0; + for (size_t i = 0, size = sizes_data.size(); i < size; ++i) + { + prev_offset += sizes_data[i]; + offsets_data[i] = prev_offset; } + + return column_offsets; } ColumnPtr arrayOffsetsToSizes(const IColumn & column) diff --git a/src/DataTypes/Serializations/SerializationArray.h b/src/DataTypes/Serializations/SerializationArray.h index 3769f8a4513..9179988bf10 100644 --- a/src/DataTypes/Serializations/SerializationArray.h +++ b/src/DataTypes/Serializations/SerializationArray.h @@ -80,5 +80,6 @@ private: }; ColumnPtr arrayOffsetsToSizes(const IColumn & column); +ColumnPtr arraySizesToOffsets(const IColumn & column); } diff --git a/src/Interpreters/inplaceBlockConversions.cpp b/src/Interpreters/inplaceBlockConversions.cpp index 1bde6fe5a8c..bc40d64de16 100644 --- a/src/Interpreters/inplaceBlockConversions.cpp +++ b/src/Interpreters/inplaceBlockConversions.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -198,6 +199,9 @@ static bool arrayHasNoElementsRead(const IColumn & column) if (!size) return false; + if (const auto * nested_array = typeid_cast(&column_array->getData())) + return arrayHasNoElementsRead(*nested_array); + size_t data_size = column_array->getData().size(); if (data_size) return false; @@ -210,6 +214,7 @@ void fillMissingColumns( Columns & res_columns, size_t num_rows, const NamesAndTypesList & requested_columns, + const NamesAndTypesList 
& available_columns, StorageMetadataPtr metadata_snapshot) { size_t num_columns = requested_columns.size(); @@ -224,26 +229,35 @@ void fillMissingColumns( /// First, collect offset columns for all arrays in the block. std::unordered_map offset_columns; - auto requested_column = requested_columns.begin(); - for (size_t i = 0; i < num_columns; ++i, ++requested_column) + auto available_column = available_columns.begin(); + for (size_t i = 0; i < num_columns; ++i, ++available_column) { if (res_columns[i] == nullptr) continue; - if (const auto * array = typeid_cast(res_columns[i].get())) + auto serialization = IDataType::getSerialization(*available_column); + auto name_in_storage = Nested::extractTableName(available_column->name); + + ISerialization::SubstreamPath path; + serialization->enumerateStreams(path, [&](const auto & subpath) { - String offsets_name = Nested::extractTableName(requested_column->name); - auto & offsets_column = offset_columns[offsets_name]; + if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes) + return; + + auto subname = ISerialization::getSubcolumnNameForStream(subpath); + auto & offsets_column = offset_columns[Nested::concatenateName(name_in_storage, subname)]; /// If for some reason multiple offsets columns are present for the same nested data structure, /// choose the one that is not empty. + /// TODO: more optimal if (!offsets_column || offsets_column->empty()) - offsets_column = array->getOffsetsPtr(); - } + offsets_column = arraySizesToOffsets(*subpath.back().data.column); + + }, {serialization, available_column->type, res_columns[i], nullptr}); } /// insert default values only for columns without default expressions - requested_column = requested_columns.begin(); + auto requested_column = requested_columns.begin(); for (size_t i = 0; i < num_columns; ++i, ++requested_column) { const auto & [name, type] = *requested_column; @@ -256,19 +270,44 @@ void fillMissingColumns( if (metadata_snapshot && metadata_snapshot->getColumns().hasDefault(name)) continue; - String offsets_name = Nested::extractTableName(name); - auto offset_it = offset_columns.find(offsets_name); + std::vector current_offsets; + bool has_all_offsets = true; + const auto * array_type = typeid_cast(type.get()); - if (offset_it != offset_columns.end() && array_type) + if (array_type) { - const auto & nested_type = array_type->getNestedType(); - ColumnPtr offsets_column = offset_it->second; - size_t nested_rows = typeid_cast(*offsets_column).getData().back(); + auto serialization = IDataType::getSerialization(*requested_column); + auto name_in_storage = Nested::extractTableName(requested_column->name); - ColumnPtr nested_column = - nested_type->createColumnConstWithDefaultValue(nested_rows)->convertToFullColumnIfConst(); + ISerialization::SubstreamPath path; + serialization->enumerateStreams(path, [&](const auto & subpath) + { + if (!has_all_offsets) + return; - res_columns[i] = ColumnArray::create(nested_column, offsets_column); + if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes) + return; + + auto subname = ISerialization::getSubcolumnNameForStream(subpath); + auto it = offset_columns.find(Nested::concatenateName(name_in_storage, subname)); + if (it != offset_columns.end()) + current_offsets.emplace_back(it->second); + else + has_all_offsets = false; + + }, {serialization, type, nullptr, nullptr}); + } + + if (array_type && has_all_offsets) + { + assert(!current_offsets.empty()); + auto scalar_type = getBaseTypeOfArray(type); + + 
size_t data_size = assert_cast(*current_offsets.back()).getData().back(); + res_columns[i] = scalar_type->createColumnConstWithDefaultValue(data_size)->convertToFullColumnIfConst(); + + for (auto it = current_offsets.rbegin(); it != current_offsets.rend(); ++it) + res_columns[i] = ColumnArray::create(res_columns[i], *it); } else { diff --git a/src/Interpreters/inplaceBlockConversions.h b/src/Interpreters/inplaceBlockConversions.h index b3113ddfa5c..70187d5aace 100644 --- a/src/Interpreters/inplaceBlockConversions.h +++ b/src/Interpreters/inplaceBlockConversions.h @@ -43,6 +43,7 @@ void fillMissingColumns( Columns & res_columns, size_t num_rows, const NamesAndTypesList & requested_columns, + const NamesAndTypesList & available_columns, StorageMetadataPtr metadata_snapshot); } diff --git a/src/Storages/MergeTree/IMergeTreeReader.cpp b/src/Storages/MergeTree/IMergeTreeReader.cpp index 3a823345dda..4eff1653d1e 100644 --- a/src/Storages/MergeTree/IMergeTreeReader.cpp +++ b/src/Storages/MergeTree/IMergeTreeReader.cpp @@ -66,7 +66,11 @@ void IMergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_e { try { - DB::fillMissingColumns(res_columns, num_rows, columns, metadata_snapshot); + NamesAndTypesList available_columns; + for (const auto & column : columns) + available_columns.push_back(getColumnFromPart(column)); + + DB::fillMissingColumns(res_columns, num_rows, columns, available_columns, metadata_snapshot); should_evaluate_missing_defaults = std::any_of( res_columns.begin(), res_columns.end(), [](const auto & column) { return column == nullptr; }); } diff --git a/src/Storages/StorageMemory.cpp b/src/Storages/StorageMemory.cpp index e7911125383..20ca84452e7 100644 --- a/src/Storages/StorageMemory.cpp +++ b/src/Storages/StorageMemory.cpp @@ -91,7 +91,7 @@ protected: ++name_and_type; } - fillMissingColumns(columns, src.rows(), column_names_and_types, /*metadata_snapshot=*/ nullptr); + fillMissingColumns(columns, src.rows(), column_names_and_types, column_names_and_types, /*metadata_snapshot=*/ nullptr); assert(std::all_of(columns.begin(), columns.end(), [](const auto & column) { return column != nullptr; })); return Chunk(std::move(columns), src.rows()); diff --git a/tests/queries/0_stateless/01825_type_json_17.reference b/tests/queries/0_stateless/01825_type_json_17.reference new file mode 100644 index 00000000000..96e58224f32 --- /dev/null +++ b/tests/queries/0_stateless/01825_type_json_17.reference @@ -0,0 +1,7 @@ +Tuple(arr Nested(k1 Nested(k2 String, k3 String, k4 Int8), k5 Tuple(k6 String)), id Int8) +{"obj":{"arr":[{"k1":[{"k2":"aaa","k3":"bbb","k4":0},{"k2":"ccc","k3":"","k4":0}],"k5":{"k6":""}}],"id":1}} +{"obj":{"arr":[{"k1":[{"k2":"","k3":"ddd","k4":10},{"k2":"","k3":"","k4":20}],"k5":{"k6":"foo"}}],"id":2}} +[['bbb','']] [['aaa','ccc']] +[['ddd','']] [['','']] +[[0,0]] +[[10,20]] diff --git a/tests/queries/0_stateless/01825_type_json_17.sql b/tests/queries/0_stateless/01825_type_json_17.sql new file mode 100644 index 00000000000..b34357f8ef1 --- /dev/null +++ b/tests/queries/0_stateless/01825_type_json_17.sql @@ -0,0 +1,18 @@ +-- Tags: no-fasttest + +DROP TABLE IF EXISTS t_json_17; +SET allow_experimental_object_type = 1; +SET output_format_json_named_tuples_as_objects = 1; + +CREATE TABLE t_json_17(obj JSON) +ENGINE = MergeTree ORDER BY tuple(); + +INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 1, "arr": [{"k1": [{"k2": "aaa", "k3": "bbb"}, {"k2": "ccc"}]}]} +INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 2, "arr": [{"k1": [{"k3": "ddd", "k4": 10}, {"k4": 20}], 
"k5": {"k6": "foo"}}]} + +SELECT toTypeName(obj) FROM t_json_17 LIMIT 1; +SELECT obj FROM t_json_17 ORDER BY obj.id FORMAT JSONEachRow; +SELECT obj.arr.k1.k3, obj.arr.k1.k2 FROM t_json_17 ORDER BY obj.id; +SELECT obj.arr.k1.k4 FROM t_json_17 ORDER BY obj.id; + +DROP TABLE IF EXISTS t_json_17; From 56fffc86810773e0399de5147b08ced18b120822 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Thu, 12 May 2022 17:21:30 +0000 Subject: [PATCH 02/12] fix filling of missing columns --- src/Interpreters/inplaceBlockConversions.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Interpreters/inplaceBlockConversions.cpp b/src/Interpreters/inplaceBlockConversions.cpp index bc40d64de16..61d51da4eab 100644 --- a/src/Interpreters/inplaceBlockConversions.cpp +++ b/src/Interpreters/inplaceBlockConversions.cpp @@ -232,7 +232,7 @@ void fillMissingColumns( auto available_column = available_columns.begin(); for (size_t i = 0; i < num_columns; ++i, ++available_column) { - if (res_columns[i] == nullptr) + if (res_columns[i] == nullptr || isColumnConst(*res_columns[i])) continue; auto serialization = IDataType::getSerialization(*available_column); From 1523c9c9e5f714daff08d7811890effb25e4a87b Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Fri, 17 Jun 2022 01:10:52 +0000 Subject: [PATCH 03/12] fix filling of empty Nested + small refactoring --- src/Columns/ColumnArray.cpp | 6 +- .../CompressionFactoryAdditions.cpp | 4 +- src/DataTypes/IDataType.cpp | 29 +++-- src/DataTypes/IDataType.h | 2 + .../Serializations/ISerialization.cpp | 35 ++--- src/DataTypes/Serializations/ISerialization.h | 44 ++++++- .../Serializations/SerializationArray.cpp | 123 +++++++++--------- .../Serializations/SerializationArray.h | 5 +- .../SerializationLowCardinality.cpp | 27 ++-- .../SerializationLowCardinality.h | 2 +- .../Serializations/SerializationMap.cpp | 15 +-- .../Serializations/SerializationMap.h | 2 +- .../Serializations/SerializationNamed.cpp | 12 +- .../Serializations/SerializationNamed.h | 2 +- .../Serializations/SerializationNullable.cpp | 40 +++--- .../Serializations/SerializationNullable.h | 2 +- .../Serializations/SerializationSparse.cpp | 39 +++--- .../Serializations/SerializationSparse.h | 2 +- .../Serializations/SerializationTuple.cpp | 15 +-- .../Serializations/SerializationTuple.h | 2 +- .../Serializations/SerializationWrapper.cpp | 4 +- .../Serializations/SerializationWrapper.h | 2 +- src/Interpreters/InterpreterDescribeQuery.cpp | 2 +- src/Interpreters/inplaceBlockConversions.cpp | 25 ++-- src/Storages/ColumnsDescription.cpp | 2 +- .../MergeTreeDataPartWriterCompact.cpp | 3 +- .../MergeTree/MergeTreeDataPartWriterWide.cpp | 30 ++--- .../MergeTree/MergeTreeDataPartWriterWide.h | 9 +- .../MergeTree/MergeTreeReaderCompact.cpp | 23 ++-- .../MergeTree/MergeTreeReaderWide.cpp | 2 +- .../System/StorageSystemPartsColumns.cpp | 4 +- 31 files changed, 264 insertions(+), 250 deletions(-) diff --git a/src/Columns/ColumnArray.cpp b/src/Columns/ColumnArray.cpp index 24da9644335..7bdb66a9cc5 100644 --- a/src/Columns/ColumnArray.cpp +++ b/src/Columns/ColumnArray.cpp @@ -50,13 +50,15 @@ ColumnArray::ColumnArray(MutableColumnPtr && nested_column, MutableColumnPtr && if (!offsets_concrete) throw Exception("offsets_column must be a ColumnUInt64", ErrorCodes::LOGICAL_ERROR); - if (!offsets_concrete->empty() && data) + if (!offsets_concrete->empty() && data && !data->empty()) { Offset last_offset = offsets_concrete->getData().back(); /// This will also prevent possible overflow in offset. 
if (data->size() != last_offset) - throw Exception("offsets_column has data inconsistent with nested_column", ErrorCodes::LOGICAL_ERROR); + throw Exception(ErrorCodes::LOGICAL_ERROR, + "offsets_column has data ({}) inconsistent with nested_column ({})", + data->size(), last_offset); } /** NOTE diff --git a/src/Compression/CompressionFactoryAdditions.cpp b/src/Compression/CompressionFactoryAdditions.cpp index d87d0f8b4ee..3e215076871 100644 --- a/src/Compression/CompressionFactoryAdditions.cpp +++ b/src/Compression/CompressionFactoryAdditions.cpp @@ -116,8 +116,8 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST( } }; - ISerialization::SubstreamPath path; - column_type->getDefaultSerialization()->enumerateStreams(path, callback, column_type); + auto serialization = column_type->getDefaultSerialization(); + serialization->enumerateStreams(callback, column_type); if (!result_codec) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find any substream with data type for type {}. It's a bug", column_type->getName()); diff --git a/src/DataTypes/IDataType.cpp b/src/DataTypes/IDataType.cpp index f2bb878a533..8229946cac8 100644 --- a/src/DataTypes/IDataType.cpp +++ b/src/DataTypes/IDataType.cpp @@ -84,18 +84,20 @@ void IDataType::forEachSubcolumn( { for (size_t i = 0; i < subpath.size(); ++i) { - if (!subpath[i].visited && ISerialization::hasSubcolumnForPath(subpath, i + 1)) + size_t prefix_len = i + 1; + if (!subpath[i].visited && ISerialization::hasSubcolumnForPath(subpath, prefix_len)) { - auto name = ISerialization::getSubcolumnNameForStream(subpath, i + 1); - auto subdata = ISerialization::createFromPath(subpath, i); + auto name = ISerialization::getSubcolumnNameForStream(subpath, prefix_len); + auto subdata = ISerialization::createFromPath(subpath, prefix_len); callback(subpath, name, subdata); } subpath[i].visited = true; } }; - SubstreamPath path; - data.serialization->enumerateStreams(path, callback_with_data, data); + ISerialization::EnumerateStreamsSettings settings; + settings.position_independent_encoding = false; + data.serialization->enumerateStreams(settings, callback_with_data, data); } template @@ -118,33 +120,38 @@ Ptr IDataType::getForSubcolumn( return res; } +bool IDataType::hasSubcolumn(const String & subcolumn_name) const +{ + return tryGetSubcolumnType(subcolumn_name) != nullptr; +} + DataTypePtr IDataType::tryGetSubcolumnType(const String & subcolumn_name) const { - SubstreamData data = { getDefaultSerialization(), getPtr(), nullptr, nullptr }; + auto data = SubstreamData(getDefaultSerialization()).withType(getPtr()); return getForSubcolumn(subcolumn_name, data, &SubstreamData::type, false); } DataTypePtr IDataType::getSubcolumnType(const String & subcolumn_name) const { - SubstreamData data = { getDefaultSerialization(), getPtr(), nullptr, nullptr }; + auto data = SubstreamData(getDefaultSerialization()).withType(getPtr()); return getForSubcolumn(subcolumn_name, data, &SubstreamData::type, true); } ColumnPtr IDataType::tryGetSubcolumn(const String & subcolumn_name, const ColumnPtr & column) const { - SubstreamData data = { getDefaultSerialization(), nullptr, column, nullptr }; + auto data = SubstreamData(getDefaultSerialization()).withColumn(column); return getForSubcolumn(subcolumn_name, data, &SubstreamData::column, false); } ColumnPtr IDataType::getSubcolumn(const String & subcolumn_name, const ColumnPtr & column) const { - SubstreamData data = { getDefaultSerialization(), nullptr, column, nullptr }; + auto data = 
SubstreamData(getDefaultSerialization()).withColumn(column); return getForSubcolumn(subcolumn_name, data, &SubstreamData::column, true); } SerializationPtr IDataType::getSubcolumnSerialization(const String & subcolumn_name, const SerializationPtr & serialization) const { - SubstreamData data = { serialization, nullptr, nullptr, nullptr }; + auto data = SubstreamData(serialization); return getForSubcolumn(subcolumn_name, data, &SubstreamData::serialization, true); } @@ -154,7 +161,7 @@ Names IDataType::getSubcolumnNames() const forEachSubcolumn([&](const auto &, const auto & name, const auto &) { res.push_back(name); - }, { getDefaultSerialization(), nullptr, nullptr, nullptr }); + }, SubstreamData(getDefaultSerialization())); return res; } diff --git a/src/DataTypes/IDataType.h b/src/DataTypes/IDataType.h index 420ef61a13f..9161fd7dc7b 100644 --- a/src/DataTypes/IDataType.h +++ b/src/DataTypes/IDataType.h @@ -79,6 +79,8 @@ public: /// Data type id. It's used for runtime type checks. virtual TypeIndex getTypeId() const = 0; + bool hasSubcolumn(const String & subcolumn_name) const; + DataTypePtr tryGetSubcolumnType(const String & subcolumn_name) const; DataTypePtr getSubcolumnType(const String & subcolumn_name) const; diff --git a/src/DataTypes/Serializations/ISerialization.cpp b/src/DataTypes/Serializations/ISerialization.cpp index 7df4a956c1a..b9b3b9ac6af 100644 --- a/src/DataTypes/Serializations/ISerialization.cpp +++ b/src/DataTypes/Serializations/ISerialization.cpp @@ -73,24 +73,24 @@ String ISerialization::SubstreamPath::toString() const } void ISerialization::enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const { - path.push_back(Substream::Regular); - path.back().data = data; - callback(path); - path.pop_back(); + settings.path.push_back(Substream::Regular); + settings.path.back().data = data; + callback(settings.path); + settings.path.pop_back(); } -void ISerialization::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const +void ISerialization::enumerateStreams( + const StreamCallback & callback, + const DataTypePtr & type, + const ColumnPtr & column) const { - enumerateStreams(path, callback, {getPtr(), nullptr, nullptr, nullptr}); -} - -void ISerialization::enumerateStreams(SubstreamPath & path, const StreamCallback & callback, const DataTypePtr & type) const -{ - enumerateStreams(path, callback, {getPtr(), type, nullptr, nullptr}); + EnumerateStreamsSettings settings; + auto data = SubstreamData(getPtr()).withType(type).withColumn(column); + enumerateStreams(settings, callback, data); } void ISerialization::serializeBinaryBulk(const IColumn & column, WriteBuffer &, size_t, size_t) const @@ -184,7 +184,7 @@ String ISerialization::getFileNameForStream(const NameAndTypePair & column, cons return getFileNameForStream(column.getNameInStorage(), path); } -static size_t isOffsetsOfNested(const ISerialization::SubstreamPath & path) +bool isOffsetsOfNested(const ISerialization::SubstreamPath & path) { if (path.empty()) return false; @@ -288,10 +288,13 @@ bool ISerialization::hasSubcolumnForPath(const SubstreamPath & path, size_t pref ISerialization::SubstreamData ISerialization::createFromPath(const SubstreamPath & path, size_t prefix_len) { - assert(prefix_len < path.size()); + assert(prefix_len <= path.size()); + if (prefix_len == 0) + return {}; - SubstreamData res = path[prefix_len].data; - for (ssize_t i = static_cast(prefix_len) - 1; i >= 0; --i) + ssize_t last_elem 
= prefix_len - 1; + auto res = path[last_elem].data; + for (ssize_t i = last_elem - 1; i >= 0; --i) { const auto & creator = path[i].creator; if (creator) diff --git a/src/DataTypes/Serializations/ISerialization.h b/src/DataTypes/Serializations/ISerialization.h index b5d2082631e..1193c15b939 100644 --- a/src/DataTypes/Serializations/ISerialization.h +++ b/src/DataTypes/Serializations/ISerialization.h @@ -101,6 +101,30 @@ public: struct SubstreamData { + SubstreamData() = default; + SubstreamData(SerializationPtr serialization_) + : serialization(std::move(serialization_)) + { + } + + SubstreamData & withType(DataTypePtr type_) + { + type = std::move(type_); + return *this; + } + + SubstreamData & withColumn(ColumnPtr column_) + { + column = std::move(column_); + return *this; + } + + SubstreamData & withSerializationInfo(SerializationInfoPtr serialization_info_) + { + serialization_info = std::move(serialization_info_); + return *this; + } + SerializationPtr serialization; DataTypePtr type; ColumnPtr column; @@ -164,16 +188,22 @@ public: using StreamCallback = std::function; + struct EnumerateStreamsSettings + { + SubstreamPath path; + bool position_independent_encoding = true; + }; + virtual void enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const; - void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const; - void enumerateStreams(const StreamCallback & callback, SubstreamPath && path) const { enumerateStreams(callback, path); } - void enumerateStreams(const StreamCallback & callback) const { enumerateStreams(callback, {}); } - - void enumerateStreams(SubstreamPath & path, const StreamCallback & callback, const DataTypePtr & type) const; + /// Enumerate streams with default settings. 
+ void enumerateStreams( + const StreamCallback & callback, + const DataTypePtr & type = nullptr, + const ColumnPtr & column = nullptr) const; using OutputStreamGetter = std::function; using InputStreamGetter = std::function; @@ -375,4 +405,6 @@ State * ISerialization::checkAndGetState(const StatePtr & state) const return state_concrete; } +bool isOffsetsOfNested(const ISerialization::SubstreamPath & path); + } diff --git a/src/DataTypes/Serializations/SerializationArray.cpp b/src/DataTypes/Serializations/SerializationArray.cpp index aebfb1b27b2..85182c3a111 100644 --- a/src/DataTypes/Serializations/SerializationArray.cpp +++ b/src/DataTypes/Serializations/SerializationArray.cpp @@ -132,53 +132,53 @@ namespace offset_values.resize(i); } -} -ColumnPtr arraySizesToOffsets(const IColumn & column) -{ - const auto & column_sizes = assert_cast(column); - MutableColumnPtr column_offsets = column_sizes.cloneEmpty(); + ColumnPtr arraySizesToOffsets(const IColumn & column) + { + const auto & column_sizes = assert_cast(column); + MutableColumnPtr column_offsets = column_sizes.cloneEmpty(); + + if (column_sizes.empty()) + return column_offsets; + + const auto & sizes_data = column_sizes.getData(); + auto & offsets_data = assert_cast(*column_offsets).getData(); + + offsets_data.resize(sizes_data.size()); + + IColumn::Offset prev_offset = 0; + for (size_t i = 0, size = sizes_data.size(); i < size; ++i) + { + prev_offset += sizes_data[i]; + offsets_data[i] = prev_offset; + } - if (column_sizes.empty()) return column_offsets; - - const auto & sizes_data = column_sizes.getData(); - auto & offsets_data = assert_cast(*column_offsets).getData(); - - offsets_data.resize(sizes_data.size()); - - IColumn::Offset prev_offset = 0; - for (size_t i = 0, size = sizes_data.size(); i < size; ++i) - { - prev_offset += sizes_data[i]; - offsets_data[i] = prev_offset; } - return column_offsets; -} + ColumnPtr arrayOffsetsToSizes(const IColumn & column) + { + const auto & column_offsets = assert_cast(column); + MutableColumnPtr column_sizes = column_offsets.cloneEmpty(); -ColumnPtr arrayOffsetsToSizes(const IColumn & column) -{ - const auto & column_offsets = assert_cast(column); - MutableColumnPtr column_sizes = column_offsets.cloneEmpty(); + if (column_offsets.empty()) + return column_sizes; + + const auto & offsets_data = column_offsets.getData(); + auto & sizes_data = assert_cast(*column_sizes).getData(); + + sizes_data.resize(offsets_data.size()); + + IColumn::Offset prev_offset = 0; + for (size_t i = 0, size = offsets_data.size(); i < size; ++i) + { + auto current_offset = offsets_data[i]; + sizes_data[i] = current_offset - prev_offset; + prev_offset = current_offset; + } - if (column_offsets.empty()) return column_sizes; - - const auto & offsets_data = column_offsets.getData(); - auto & sizes_data = assert_cast(*column_sizes).getData(); - - sizes_data.resize(offsets_data.size()); - - IColumn::Offset prev_offset = 0; - for (size_t i = 0, size = offsets_data.size(); i < size; ++i) - { - auto current_offset = offsets_data[i]; - sizes_data[i] = current_offset - prev_offset; - prev_offset = current_offset; } - - return column_sizes; } DataTypePtr SerializationArray::SubcolumnCreator::create(const DataTypePtr & prev) const @@ -197,41 +197,42 @@ ColumnPtr SerializationArray::SubcolumnCreator::create(const ColumnPtr & prev) c } void SerializationArray::enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const { const auto * type_array = 
data.type ? &assert_cast(*data.type) : nullptr; const auto * column_array = data.column ? &assert_cast(*data.column) : nullptr; - auto offsets_column = column_array ? column_array->getOffsetsPtr() : nullptr; + auto offsets = column_array ? column_array->getOffsetsPtr() : nullptr; - path.push_back(Substream::ArraySizes); - path.back().data = - { + auto offsets_serialization = std::make_shared( std::make_shared>(), - "size" + std::to_string(getArrayLevel(path)), false), - data.type ? std::make_shared() : nullptr, - offsets_column ? arrayOffsetsToSizes(*offsets_column) : nullptr, - data.serialization_info, - }; + "size" + std::to_string(getArrayLevel(settings.path)), false); - callback(path); + auto offsets_column = offsets && !settings.position_independent_encoding + ? arrayOffsetsToSizes(*offsets) + : offsets; - path.back() = Substream::ArrayElements; - path.back().data = data; - path.back().creator = std::make_shared(offsets_column); + settings.path.push_back(Substream::ArraySizes); + settings.path.back().data = SubstreamData(offsets_serialization) + .withType(type_array ? std::make_shared() : nullptr) + .withColumn(std::move(offsets_column)) + .withSerializationInfo(data.serialization_info); - SubstreamData next_data = - { - nested, - type_array ? type_array->getNestedType() : nullptr, - column_array ? column_array->getDataPtr() : nullptr, - data.serialization_info, - }; + callback(settings.path); - nested->enumerateStreams(path, callback, next_data); - path.pop_back(); + settings.path.back() = Substream::ArrayElements; + settings.path.back().data = data; + settings.path.back().creator = std::make_shared(offsets); + + auto next_data = SubstreamData(nested) + .withType(type_array ? type_array->getNestedType() : nullptr) + .withColumn(column_array ? column_array->getDataPtr() : nullptr) + .withSerializationInfo(data.serialization_info); + + nested->enumerateStreams(settings, callback, next_data); + settings.path.pop_back(); } void SerializationArray::serializeBinaryBulkStatePrefix( diff --git a/src/DataTypes/Serializations/SerializationArray.h b/src/DataTypes/Serializations/SerializationArray.h index 9179988bf10..84e37acbaad 100644 --- a/src/DataTypes/Serializations/SerializationArray.h +++ b/src/DataTypes/Serializations/SerializationArray.h @@ -36,7 +36,7 @@ public: */ void enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const override; @@ -79,7 +79,4 @@ private: }; }; -ColumnPtr arrayOffsetsToSizes(const IColumn & column); -ColumnPtr arraySizesToOffsets(const IColumn & column); - } diff --git a/src/DataTypes/Serializations/SerializationLowCardinality.cpp b/src/DataTypes/Serializations/SerializationLowCardinality.cpp index c79f588e46c..2473a74327d 100644 --- a/src/DataTypes/Serializations/SerializationLowCardinality.cpp +++ b/src/DataTypes/Serializations/SerializationLowCardinality.cpp @@ -41,30 +41,25 @@ SerializationLowCardinality::SerializationLowCardinality(const DataTypePtr & dic } void SerializationLowCardinality::enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const { const auto * column_lc = data.column ? &getColumnLowCardinality(*data.column) : nullptr; - SubstreamData dict_data = - { - dict_inner_serialization, - data.type ? dictionary_type : nullptr, - column_lc ? 
column_lc->getDictionary().getNestedColumn() : nullptr, - data.serialization_info, - }; + settings.path.push_back(Substream::DictionaryKeys); + settings.path.back().data = SubstreamData(dict_inner_serialization) + .withType(data.type ? dictionary_type : nullptr) + .withColumn(column_lc ? column_lc->getDictionary().getNestedColumn() : nullptr) + .withSerializationInfo(data.serialization_info); - path.push_back(Substream::DictionaryKeys); - path.back().data = dict_data; + dict_inner_serialization->enumerateStreams(settings, callback, settings.path.back().data); - dict_inner_serialization->enumerateStreams(path, callback, dict_data); + settings.path.back() = Substream::DictionaryIndexes; + settings.path.back().data = data; - path.back() = Substream::DictionaryIndexes; - path.back().data = data; - - callback(path); - path.pop_back(); + callback(settings.path); + settings.path.pop_back(); } struct KeysSerializationVersion diff --git a/src/DataTypes/Serializations/SerializationLowCardinality.h b/src/DataTypes/Serializations/SerializationLowCardinality.h index 0a3597e86c7..860920a2422 100644 --- a/src/DataTypes/Serializations/SerializationLowCardinality.h +++ b/src/DataTypes/Serializations/SerializationLowCardinality.h @@ -18,7 +18,7 @@ public: explicit SerializationLowCardinality(const DataTypePtr & dictionary_type); void enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const override; diff --git a/src/DataTypes/Serializations/SerializationMap.cpp b/src/DataTypes/Serializations/SerializationMap.cpp index ea22070b5b1..e46bb480d14 100644 --- a/src/DataTypes/Serializations/SerializationMap.cpp +++ b/src/DataTypes/Serializations/SerializationMap.cpp @@ -257,19 +257,16 @@ void SerializationMap::deserializeTextCSV(IColumn & column, ReadBuffer & istr, c } void SerializationMap::enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const { - SubstreamData next_data = - { - nested, - data.type ? assert_cast(*data.type).getNestedType() : nullptr, - data.column ? assert_cast(*data.column).getNestedColumnPtr() : nullptr, - data.serialization_info, - }; + auto next_data = SubstreamData(nested) + .withType(data.type ? assert_cast(*data.type).getNestedType() : nullptr) + .withColumn(data.column ? 
assert_cast(*data.column).getNestedColumnPtr() : nullptr) + .withSerializationInfo(data.serialization_info); - nested->enumerateStreams(path, callback, next_data); + nested->enumerateStreams(settings, callback, next_data); } void SerializationMap::serializeBinaryBulkStatePrefix( diff --git a/src/DataTypes/Serializations/SerializationMap.h b/src/DataTypes/Serializations/SerializationMap.h index 93b3e179499..42f99ca7991 100644 --- a/src/DataTypes/Serializations/SerializationMap.h +++ b/src/DataTypes/Serializations/SerializationMap.h @@ -32,7 +32,7 @@ public: void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override; void enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const override; diff --git a/src/DataTypes/Serializations/SerializationNamed.cpp b/src/DataTypes/Serializations/SerializationNamed.cpp index 097e9cedfbe..4dac4b3a922 100644 --- a/src/DataTypes/Serializations/SerializationNamed.cpp +++ b/src/DataTypes/Serializations/SerializationNamed.cpp @@ -4,16 +4,16 @@ namespace DB { void SerializationNamed::enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const { - addToPath(path); - path.back().data = data; - path.back().creator = std::make_shared(name, escape_delimiter); + addToPath(settings.path); + settings.path.back().data = data; + settings.path.back().creator = std::make_shared(name, escape_delimiter); - nested_serialization->enumerateStreams(path, callback, data); - path.pop_back(); + nested_serialization->enumerateStreams(settings, callback, data); + settings.path.pop_back(); } void SerializationNamed::serializeBinaryBulkStatePrefix( diff --git a/src/DataTypes/Serializations/SerializationNamed.h b/src/DataTypes/Serializations/SerializationNamed.h index 343b96c16e3..2a2c7c0dfc7 100644 --- a/src/DataTypes/Serializations/SerializationNamed.h +++ b/src/DataTypes/Serializations/SerializationNamed.h @@ -26,7 +26,7 @@ public: const String & getElementName() const { return name; } void enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const override; diff --git a/src/DataTypes/Serializations/SerializationNullable.cpp b/src/DataTypes/Serializations/SerializationNullable.cpp index a6273deaa30..47780f67800 100644 --- a/src/DataTypes/Serializations/SerializationNullable.cpp +++ b/src/DataTypes/Serializations/SerializationNullable.cpp @@ -38,38 +38,34 @@ ColumnPtr SerializationNullable::SubcolumnCreator::create(const ColumnPtr & prev } void SerializationNullable::enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const { const auto * type_nullable = data.type ? &assert_cast(*data.type) : nullptr; const auto * column_nullable = data.column ? &assert_cast(*data.column) : nullptr; - path.push_back(Substream::NullMap); - path.back().data = - { - std::make_shared(std::make_shared>(), "null", false), - type_nullable ? std::make_shared() : nullptr, - column_nullable ? column_nullable->getNullMapColumnPtr() : nullptr, - data.serialization_info, - }; + auto null_map_serialization = std::make_shared(std::make_shared>(), "null", false); - callback(path); + settings.path.push_back(Substream::NullMap); + settings.path.back().data = SubstreamData(null_map_serialization) + .withType(type_nullable ? 
std::make_shared() : nullptr) + .withColumn(column_nullable ? column_nullable->getNullMapColumnPtr() : nullptr) + .withSerializationInfo(data.serialization_info); - path.back() = Substream::NullableElements; - path.back().creator = std::make_shared(path.back().data.column); - path.back().data = data; + callback(settings.path); - SubstreamData next_data = - { - nested, - type_nullable ? type_nullable->getNestedType() : nullptr, - column_nullable ? column_nullable->getNestedColumnPtr() : nullptr, - data.serialization_info, - }; + settings.path.back() = Substream::NullableElements; + settings.path.back().creator = std::make_shared(settings.path.back().data.column); + settings.path.back().data = data; - nested->enumerateStreams(path, callback, next_data); - path.pop_back(); + auto next_data = SubstreamData(nested) + .withType(type_nullable ? type_nullable->getNestedType() : nullptr) + .withColumn(column_nullable ? column_nullable->getNestedColumnPtr() : nullptr) + .withSerializationInfo(data.serialization_info); + + nested->enumerateStreams(settings, callback, next_data); + settings.path.pop_back(); } void SerializationNullable::serializeBinaryBulkStatePrefix( diff --git a/src/DataTypes/Serializations/SerializationNullable.h b/src/DataTypes/Serializations/SerializationNullable.h index e6e0e4f33c2..ea3958065e7 100644 --- a/src/DataTypes/Serializations/SerializationNullable.h +++ b/src/DataTypes/Serializations/SerializationNullable.h @@ -14,7 +14,7 @@ public: explicit SerializationNullable(const SerializationPtr & nested_) : nested(nested_) {} void enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const override; diff --git a/src/DataTypes/Serializations/SerializationSparse.cpp b/src/DataTypes/Serializations/SerializationSparse.cpp index 64db248c5fc..db194007af9 100644 --- a/src/DataTypes/Serializations/SerializationSparse.cpp +++ b/src/DataTypes/Serializations/SerializationSparse.cpp @@ -148,39 +148,32 @@ ColumnPtr SerializationSparse::SubcolumnCreator::create(const ColumnPtr & prev) } void SerializationSparse::enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const { const auto * column_sparse = data.column ? &assert_cast(*data.column) : nullptr; - size_t column_size = column_sparse ? column_sparse->size() : 0; - path.push_back(Substream::SparseOffsets); - path.back().data = - { - std::make_shared>(), - data.type ? std::make_shared() : nullptr, - column_sparse ? column_sparse->getOffsetsPtr() : nullptr, - data.serialization_info, - }; + settings.path.push_back(Substream::SparseOffsets); + settings.path.back().data = SubstreamData(std::make_shared>()) + .withType(data.type ? std::make_shared() : nullptr) + .withColumn(column_sparse ? column_sparse->getOffsetsPtr() : nullptr) + .withSerializationInfo(data.serialization_info); - callback(path); + callback(settings.path); - path.back() = Substream::SparseElements; - path.back().creator = std::make_shared(path.back().data.column, column_size); - path.back().data = data; + settings.path.back() = Substream::SparseElements; + settings.path.back().creator = std::make_shared(settings.path.back().data.column, column_size); + settings.path.back().data = data; - SubstreamData next_data = - { - nested, - data.type, - column_sparse ? 
column_sparse->getValuesPtr() : nullptr, - data.serialization_info, - }; + auto next_data = SubstreamData(nested) + .withType(data.type) + .withColumn(column_sparse ? column_sparse->getValuesPtr() : nullptr) + .withSerializationInfo(data.serialization_info); - nested->enumerateStreams(path, callback, next_data); - path.pop_back(); + nested->enumerateStreams(settings, callback, next_data); + settings.path.pop_back(); } void SerializationSparse::serializeBinaryBulkStatePrefix( diff --git a/src/DataTypes/Serializations/SerializationSparse.h b/src/DataTypes/Serializations/SerializationSparse.h index 54ab4853360..dc2f63c5a05 100644 --- a/src/DataTypes/Serializations/SerializationSparse.h +++ b/src/DataTypes/Serializations/SerializationSparse.h @@ -28,7 +28,7 @@ public: Kind getKind() const override { return Kind::SPARSE; } virtual void enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const override; diff --git a/src/DataTypes/Serializations/SerializationTuple.cpp b/src/DataTypes/Serializations/SerializationTuple.cpp index 8dc15fc9841..437324d96fd 100644 --- a/src/DataTypes/Serializations/SerializationTuple.cpp +++ b/src/DataTypes/Serializations/SerializationTuple.cpp @@ -283,7 +283,7 @@ void SerializationTuple::deserializeTextCSV(IColumn & column, ReadBuffer & istr, } void SerializationTuple::enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const { @@ -293,15 +293,12 @@ void SerializationTuple::enumerateStreams( for (size_t i = 0; i < elems.size(); ++i) { - SubstreamData next_data = - { - elems[i], - type_tuple ? type_tuple->getElement(i) : nullptr, - column_tuple ? column_tuple->getColumnPtr(i) : nullptr, - info_tuple ? info_tuple->getElementInfo(i) : nullptr, - }; + auto next_data = SubstreamData(elems[i]) + .withType(type_tuple ? type_tuple->getElement(i) : nullptr) + .withColumn(column_tuple ? column_tuple->getColumnPtr(i) : nullptr) + .withSerializationInfo(info_tuple ? info_tuple->getElementInfo(i) : nullptr); - elems[i]->enumerateStreams(path, callback, next_data); + elems[i]->enumerateStreams(settings, callback, next_data); } } diff --git a/src/DataTypes/Serializations/SerializationTuple.h b/src/DataTypes/Serializations/SerializationTuple.h index e82d8473645..d1caeb73dad 100644 --- a/src/DataTypes/Serializations/SerializationTuple.h +++ b/src/DataTypes/Serializations/SerializationTuple.h @@ -34,7 +34,7 @@ public: /** Each sub-column in a tuple is serialized in separate stream. 
*/ void enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const override; diff --git a/src/DataTypes/Serializations/SerializationWrapper.cpp b/src/DataTypes/Serializations/SerializationWrapper.cpp index 271c53dfcf1..7c50c1c6e26 100644 --- a/src/DataTypes/Serializations/SerializationWrapper.cpp +++ b/src/DataTypes/Serializations/SerializationWrapper.cpp @@ -5,11 +5,11 @@ namespace DB { void SerializationWrapper::enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const { - nested_serialization->enumerateStreams(path, callback, data); + nested_serialization->enumerateStreams(settings, callback, data); } void SerializationWrapper::serializeBinaryBulkStatePrefix( diff --git a/src/DataTypes/Serializations/SerializationWrapper.h b/src/DataTypes/Serializations/SerializationWrapper.h index 43fc7e9914a..d010c6b5314 100644 --- a/src/DataTypes/Serializations/SerializationWrapper.h +++ b/src/DataTypes/Serializations/SerializationWrapper.h @@ -21,7 +21,7 @@ public: Kind getKind() const override { return nested_serialization->getKind(); } void enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const override; diff --git a/src/Interpreters/InterpreterDescribeQuery.cpp b/src/Interpreters/InterpreterDescribeQuery.cpp index 9919b1272bd..0524feea1f6 100644 --- a/src/Interpreters/InterpreterDescribeQuery.cpp +++ b/src/Interpreters/InterpreterDescribeQuery.cpp @@ -163,7 +163,7 @@ BlockIO InterpreterDescribeQuery::execute() res_columns[6]->insertDefault(); res_columns[7]->insert(1u); - }, { type->getDefaultSerialization(), type, nullptr, nullptr }); + }, ISerialization::SubstreamData(type->getDefaultSerialization()).withType(type)); } } diff --git a/src/Interpreters/inplaceBlockConversions.cpp b/src/Interpreters/inplaceBlockConversions.cpp index 61d51da4eab..e4f1b46fc91 100644 --- a/src/Interpreters/inplaceBlockConversions.cpp +++ b/src/Interpreters/inplaceBlockConversions.cpp @@ -199,11 +199,11 @@ static bool arrayHasNoElementsRead(const IColumn & column) if (!size) return false; - if (const auto * nested_array = typeid_cast(&column_array->getData())) + const auto & array_data = column_array->getData(); + if (const auto * nested_array = typeid_cast(&array_data)) return arrayHasNoElementsRead(*nested_array); - size_t data_size = column_array->getData().size(); - if (data_size) + if (!array_data.empty()) return false; size_t last_offset = column_array->getOffsets()[size - 1]; @@ -238,8 +238,7 @@ void fillMissingColumns( auto serialization = IDataType::getSerialization(*available_column); auto name_in_storage = Nested::extractTableName(available_column->name); - ISerialization::SubstreamPath path; - serialization->enumerateStreams(path, [&](const auto & subpath) + serialization->enumerateStreams([&](const auto & subpath) { if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes) return; @@ -247,16 +246,15 @@ void fillMissingColumns( auto subname = ISerialization::getSubcolumnNameForStream(subpath); auto & offsets_column = offset_columns[Nested::concatenateName(name_in_storage, subname)]; - /// If for some reason multiple offsets columns are present for the same nested data structure, - /// choose the one that is not empty. 
- /// TODO: more optimal + /// If for some reason multiple offsets columns are present + /// for the same nested data structure, choose the one that is not empty. if (!offsets_column || offsets_column->empty()) - offsets_column = arraySizesToOffsets(*subpath.back().data.column); + offsets_column = subpath.back().data.column; - }, {serialization, available_column->type, res_columns[i], nullptr}); + }, available_column->type, res_columns[i]); } - /// insert default values only for columns without default expressions + /// Insert default values only for columns without default expressions. auto requested_column = requested_columns.begin(); for (size_t i = 0; i < num_columns; ++i, ++requested_column) { @@ -279,8 +277,7 @@ void fillMissingColumns( auto serialization = IDataType::getSerialization(*requested_column); auto name_in_storage = Nested::extractTableName(requested_column->name); - ISerialization::SubstreamPath path; - serialization->enumerateStreams(path, [&](const auto & subpath) + serialization->enumerateStreams([&](const auto & subpath) { if (!has_all_offsets) return; @@ -295,7 +292,7 @@ void fillMissingColumns( else has_all_offsets = false; - }, {serialization, type, nullptr, nullptr}); + }, type); } if (array_type && has_all_offsets) diff --git a/src/Storages/ColumnsDescription.cpp b/src/Storages/ColumnsDescription.cpp index 7a43ae7af4b..e9f4c1b92aa 100644 --- a/src/Storages/ColumnsDescription.cpp +++ b/src/Storages/ColumnsDescription.cpp @@ -780,7 +780,7 @@ void ColumnsDescription::addSubcolumns(const String & name_in_storage, const Dat "Cannot add subcolumn {}: column with this name already exists", subcolumn.name); subcolumns.get<0>().insert(std::move(subcolumn)); - }, {type_in_storage->getDefaultSerialization(), type_in_storage, nullptr, nullptr}); + }, ISerialization::SubstreamData(type_in_storage->getDefaultSerialization()).withType(type_in_storage)); } void ColumnsDescription::removeSubcolumns(const String & name_in_storage) diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp index a4786570fcb..bbdd2734c98 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp @@ -67,8 +67,7 @@ void MergeTreeDataPartWriterCompact::addStreams(const NameAndTypePair & column, compressed_streams.emplace(stream_name, stream); }; - ISerialization::SubstreamPath path; - data_part->getSerialization(column)->enumerateStreams(path, callback, column.type); + data_part->getSerialization(column)->enumerateStreams(callback, column.type); } namespace diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index 6610b8fc06b..7c08e127ab4 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -119,8 +119,7 @@ void MergeTreeDataPartWriterWide::addStreams( settings.query_write_settings); }; - ISerialization::SubstreamPath path; - data_part->getSerialization(column)->enumerateStreams(path, callback, column.type); + data_part->getSerialization(column)->enumerateStreams(callback, column.type); } @@ -254,10 +253,9 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm void MergeTreeDataPartWriterWide::writeSingleMark( const NameAndTypePair & column, WrittenOffsetColumns & offset_columns, - size_t number_of_rows, - ISerialization::SubstreamPath & path) + size_t number_of_rows) { - 
StreamsWithMarks marks = getCurrentMarksForColumn(column, offset_columns, path); + StreamsWithMarks marks = getCurrentMarksForColumn(column, offset_columns); for (const auto & mark : marks) flushMarkToFile(mark, number_of_rows); } @@ -273,8 +271,7 @@ void MergeTreeDataPartWriterWide::flushMarkToFile(const StreamNameAndMark & stre StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn( const NameAndTypePair & column, - WrittenOffsetColumns & offset_columns, - ISerialization::SubstreamPath & path) + WrittenOffsetColumns & offset_columns) { StreamsWithMarks result; data_part->getSerialization(column)->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path) @@ -299,7 +296,7 @@ StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn( stream_with_mark.mark.offset_in_decompressed_block = stream.compressed.offset(); result.push_back(stream_with_mark); - }, path); + }); return result; } @@ -327,7 +324,7 @@ void MergeTreeDataPartWriterWide::writeSingleGranule( return; column_streams[stream_name]->compressed.nextIfAtEnd(); - }, serialize_settings.path); + }); } /// Column must not be empty. (column.size() !== 0) @@ -365,7 +362,7 @@ void MergeTreeDataPartWriterWide::writeColumn( { if (last_non_written_marks.contains(name)) throw Exception(ErrorCodes::LOGICAL_ERROR, "We have to add new mark for column, but already have non written mark. Current mark {}, total marks {}, offset {}", getCurrentMark(), index_granularity.getMarksCount(), rows_written_in_last_mark); - last_non_written_marks[name] = getCurrentMarksForColumn(name_and_type, offset_columns, serialize_settings.path); + last_non_written_marks[name] = getCurrentMarksForColumn(name_and_type, offset_columns); } writeSingleGranule( @@ -389,7 +386,7 @@ void MergeTreeDataPartWriterWide::writeColumn( } } - serialization->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path) + serialization->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path) { bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes; if (is_offsets) @@ -397,7 +394,7 @@ void MergeTreeDataPartWriterWide::writeColumn( String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path); offset_columns.insert(stream_name); } - }, serialize_settings.path); + }); } @@ -551,7 +548,7 @@ void MergeTreeDataPartWriterWide::fillDataChecksums(IMergeTreeDataPart::Checksum } if (write_final_mark) - writeFinalMark(*it, offset_columns, serialize_settings.path); + writeFinalMark(*it, offset_columns); } } @@ -616,10 +613,9 @@ void MergeTreeDataPartWriterWide::finish(bool sync) void MergeTreeDataPartWriterWide::writeFinalMark( const NameAndTypePair & column, - WrittenOffsetColumns & offset_columns, - ISerialization::SubstreamPath & path) + WrittenOffsetColumns & offset_columns) { - writeSingleMark(column, offset_columns, 0, path); + writeSingleMark(column, offset_columns, 0); /// Memoize information about offsets data_part->getSerialization(column)->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path) { @@ -629,7 +625,7 @@ void MergeTreeDataPartWriterWide::writeFinalMark( String stream_name = ISerialization::getFileNameForStream(column, substream_path); offset_columns.insert(stream_name); } - }, path); + }); } static void fillIndexGranularityImpl( diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h index b82fcd652ae..8292a41b902 100644 --- 
a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h @@ -60,8 +60,7 @@ private: /// Take offsets from column and return as MarkInCompressed file with stream name StreamsWithMarks getCurrentMarksForColumn( const NameAndTypePair & column, - WrittenOffsetColumns & offset_columns, - ISerialization::SubstreamPath & path); + WrittenOffsetColumns & offset_columns); /// Write mark to disk using stream and rows count void flushMarkToFile( @@ -72,13 +71,11 @@ private: void writeSingleMark( const NameAndTypePair & column, WrittenOffsetColumns & offset_columns, - size_t number_of_rows, - ISerialization::SubstreamPath & path); + size_t number_of_rows); void writeFinalMark( const NameAndTypePair & column, - WrittenOffsetColumns & offset_columns, - ISerialization::SubstreamPath & path); + WrittenOffsetColumns & offset_columns); void addStreams( const NameAndTypePair & column, diff --git a/src/Storages/MergeTree/MergeTreeReaderCompact.cpp b/src/Storages/MergeTree/MergeTreeReaderCompact.cpp index b943c3c8718..6e7a080f418 100644 --- a/src/Storages/MergeTree/MergeTreeReaderCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderCompact.cpp @@ -49,22 +49,24 @@ MergeTreeReaderCompact::MergeTreeReaderCompact( column_positions.resize(columns_num); read_only_offsets.resize(columns_num); + auto name_and_type = columns.begin(); for (size_t i = 0; i < columns_num; ++i, ++name_and_type) { - if (name_and_type->isSubcolumn()) + auto column_from_part = getColumnFromPart(*name_and_type); + auto position = data_part->getColumnPosition(column_from_part.getNameInStorage()); + bool is_array = isArray(column_from_part.type); + + if (column_from_part.isSubcolumn()) { auto storage_column_from_part = getColumnFromPart( - {name_and_type->getNameInStorage(), name_and_type->getTypeInStorage()}); + {column_from_part.getNameInStorage(), column_from_part.getTypeInStorage()}); - if (!storage_column_from_part.type->tryGetSubcolumnType(name_and_type->getSubcolumnName())) - continue; + auto subcolumn_name = column_from_part.getSubcolumnName(); + if (!storage_column_from_part.type->hasSubcolumn(subcolumn_name)) + position.reset(); } - - auto column_from_part = getColumnFromPart(*name_and_type); - - auto position = data_part->getColumnPosition(column_from_part.getNameInStorage()); - if (!position && typeid_cast(column_from_part.type.get())) + else if (!position && is_array) { /// If array of Nested column is missing in part, /// we have to read its offsets if they exist. 
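The "read only its offsets" path above relies on the sizes/offsets conversions that this series moves around (arraySizesToOffsets and arrayOffsetsToSizes). As a rough standalone sketch, with std::vector standing in for ClickHouse's ColumnUInt64 and illustrative names, the pair reduces to a running sum and its inverse:

    #include <cassert>
    #include <cstdint>
    #include <vector>

    using Values = std::vector<uint64_t>;

    // Running sum: per-row sizes {2, 0, 3} become cumulative
    // offsets {2, 2, 5}, matching the loop in arraySizesToOffsets.
    Values sizesToOffsets(const Values & sizes)
    {
        Values offsets(sizes.size());
        uint64_t prev = 0;
        for (size_t i = 0; i < sizes.size(); ++i)
        {
            prev += sizes[i];
            offsets[i] = prev;
        }
        return offsets;
    }

    // Inverse: difference of consecutive offsets, matching arrayOffsetsToSizes.
    Values offsetsToSizes(const Values & offsets)
    {
        Values sizes(offsets.size());
        uint64_t prev = 0;
        for (size_t i = 0; i < offsets.size(); ++i)
        {
            sizes[i] = offsets[i] - prev;
            prev = offsets[i];
        }
        return sizes;
    }

    int main()
    {
        Values sizes{2, 0, 3};
        assert(offsetsToSizes(sizesToOffsets(sizes)) == sizes);
        return 0;
    }

The same conversion is what SerializationArray::enumerateStreams applies conditionally via EnumerateStreamsSettings::position_independent_encoding earlier in this series.
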
@@ -221,7 +223,8 @@ void MergeTreeReaderCompact::readData( auto buffer_getter = [&](const ISerialization::SubstreamPath & substream_path) -> ReadBuffer * { - if (only_offsets && (substream_path.size() != 1 || substream_path[0].type != ISerialization::Substream::ArraySizes)) + bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes; + if (only_offsets && !is_offsets) return nullptr; return data_buffer; diff --git a/src/Storages/MergeTree/MergeTreeReaderWide.cpp b/src/Storages/MergeTree/MergeTreeReaderWide.cpp index 0f5cf8de669..fa059506954 100644 --- a/src/Storages/MergeTree/MergeTreeReaderWide.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderWide.cpp @@ -15,7 +15,6 @@ namespace DB namespace { - using OffsetColumns = std::map; constexpr auto DATA_FILE_EXTENSION = ".bin"; } @@ -291,6 +290,7 @@ void MergeTreeReaderWide::readData( /* seek_to_start = */false, substream_path, streams, name_and_type, from_mark, seek_to_mark, current_task_last_mark, cache); }; + deserialize_settings.continuous_reading = continue_reading; auto & deserialize_state = deserialize_binary_bulk_state_map[name]; diff --git a/src/Storages/System/StorageSystemPartsColumns.cpp b/src/Storages/System/StorageSystemPartsColumns.cpp index 7f648054da2..5ffca62d9c3 100644 --- a/src/Storages/System/StorageSystemPartsColumns.cpp +++ b/src/Storages/System/StorageSystemPartsColumns.cpp @@ -242,7 +242,7 @@ void StorageSystemPartsColumns::processNextStorage( IDataType::forEachSubcolumn([&](const auto & subpath, const auto & name, const auto & data) { /// We count only final subcolumns, which are represented by files on disk - /// and skip intermediate suibcolumns of types Tuple and Nested. + /// and skip intermediate subcolumns of types Tuple and Nested. 
                if (isTuple(data.type) || isNested(data.type))
                    return;
@@ -270,7 +270,7 @@ void StorageSystemPartsColumns::processNextStorage(
 
                 subcolumn_data_uncompressed_bytes.push_back(size.data_uncompressed);
                 subcolumn_marks_bytes.push_back(size.marks);
 
-            }, { serialization, column.type, nullptr, nullptr });
+            }, ISerialization::SubstreamData(serialization).withType(column.type));
 
             if (columns_mask[src_index++])
                 columns[res_index++]->insert(subcolumn_names);

From 464818c142766787854d64fcad41869b71d37460 Mon Sep 17 00:00:00 2001
From: Anton Popov
Date: Fri, 2 Sep 2022 15:05:58 +0000
Subject: [PATCH 04/12] try to fix filling of missed Nested columns with
 multiple levels
---
 src/Storages/MergeTree/IMergeTreeReader.cpp   | 39 ++++++++---
 src/Storages/MergeTree/IMergeTreeReader.h     |  2 +-
 .../MergeTree/MergeTreeReaderCompact.cpp      | 68 ++++++++++---------
 .../MergeTree/MergeTreeReaderCompact.h        |  1 +
 .../MergeTree/MergeTreeReaderInMemory.cpp     |  8 +--
 5 files changed, 74 insertions(+), 44 deletions(-)

diff --git a/src/Storages/MergeTree/IMergeTreeReader.cpp b/src/Storages/MergeTree/IMergeTreeReader.cpp
index 36a1e5e4b55..b77975ff9ad 100644
--- a/src/Storages/MergeTree/IMergeTreeReader.cpp
+++ b/src/Storages/MergeTree/IMergeTreeReader.cpp
@@ -204,17 +204,40 @@ void IMergeTreeReader::performRequiredConversions(Columns & res_columns) const
     }
 }
 
-IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const String & column_name) const
+IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const NameAndTypePair & required_column) const
 {
-    String table_name = Nested::extractTableName(column_name);
+    auto get_offset_streams = [](const auto & serialization, const auto & name_in_storage)
+    {
+        NameSet offset_streams;
+        serialization->enumerateStreams([&](const auto & subpath)
+        {
+            if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes)
+                return;
+
+            auto subname = ISerialization::getSubcolumnNameForStream(subpath);
+            auto full_name = Nested::concatenateName(name_in_storage, subname);
+            offset_streams.insert(full_name);
+        });
+
+        return offset_streams;
+    };
+
+    auto required_name_in_storage = Nested::extractTableName(required_column.getNameInStorage());
+    auto required_offset_streams = get_offset_streams(getSerializationInPart(required_column), required_name_in_storage);
+
     for (const auto & part_column : data_part->getColumns())
     {
-        if (typeid_cast<const DataTypeArray *>(part_column.type.get()))
-        {
-            auto position = data_part->getColumnPosition(part_column.getNameInStorage());
-            if (position && Nested::extractTableName(part_column.name) == table_name)
-                return position;
-        }
+        auto name_in_storage = Nested::extractTableName(part_column.name);
+        if (name_in_storage != required_name_in_storage)
+            continue;
+
+        auto offset_streams = get_offset_streams(data_part->getSerialization(part_column.name), name_in_storage);
+
+        bool has_all_offsets = std::all_of(required_offset_streams.begin(), required_offset_streams.end(),
+            [&](const auto & stream_name) { return offset_streams.contains(stream_name); });
+
+        if (has_all_offsets)
+            return data_part->getColumnPosition(part_column.name);
     }
 
     return {};
diff --git a/src/Storages/MergeTree/IMergeTreeReader.h b/src/Storages/MergeTree/IMergeTreeReader.h
index 453563522a5..fa1bf988fbf 100644
--- a/src/Storages/MergeTree/IMergeTreeReader.h
+++ b/src/Storages/MergeTree/IMergeTreeReader.h
@@ -91,7 +91,7 @@ protected:
     MarkRanges all_mark_ranges;
 
     using ColumnPosition = std::optional<size_t>;
-    ColumnPosition findColumnForOffsets(const String & column_name) const;
+    ColumnPosition findColumnForOffsets(const NameAndTypePair & column) const;
 
 private:
     /// Alter conversions, which must be applied on fly if required
diff --git a/src/Storages/MergeTree/MergeTreeReaderCompact.cpp b/src/Storages/MergeTree/MergeTreeReaderCompact.cpp
index 28ec2eff56f..87e0d75a871 100644
--- a/src/Storages/MergeTree/MergeTreeReaderCompact.cpp
+++ b/src/Storages/MergeTree/MergeTreeReaderCompact.cpp
@@ -46,37 +46,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
 {
     try
     {
-        size_t columns_num = columns_to_read.size();
-
-        column_positions.resize(columns_num);
-        read_only_offsets.resize(columns_num);
-
-        for (size_t i = 0; i < columns_num; ++i)
-        {
-            const auto & column_to_read = columns_to_read[i];
-
-            auto position = data_part->getColumnPosition(column_to_read.getNameInStorage());
-            bool is_array = isArray(column_to_read.type);
-
-            if (column_to_read.isSubcolumn())
-            {
-                auto storage_column_from_part = getColumnInPart(
-                    {column_to_read.getNameInStorage(), column_to_read.getTypeInStorage()});
-
-                auto subcolumn_name = column_to_read.getSubcolumnName();
-                if (!storage_column_from_part.type->hasSubcolumn(subcolumn_name))
-                    position.reset();
-            }
-            else if (!position && is_array)
-            {
-                /// If array of Nested column is missing in part,
-                /// we have to read its offsets if they exist.
-                position = findColumnForOffsets(column_to_read.name);
-                read_only_offsets[i] = (position != std::nullopt);
-            }
-
-            column_positions[i] = std::move(position);
-        }
+        fillColumnPositions();
 
         /// Do not use max_read_buffer_size, but try to lower buffer size with maximal size of granule to avoid reading much data.
         auto buffer_size = getReadBufferSize(data_part, marks_loader, column_positions, all_mark_ranges);
@@ -139,6 +109,42 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
     }
 }
 
+void MergeTreeReaderCompact::fillColumnPositions()
+{
+    size_t columns_num = columns_to_read.size();
+
+    column_positions.resize(columns_num);
+    read_only_offsets.resize(columns_num);
+
+    for (size_t i = 0; i < columns_num; ++i)
+    {
+        const auto & column_to_read = columns_to_read[i];
+
+        auto position = data_part->getColumnPosition(column_to_read.getNameInStorage());
+        bool is_array = isArray(column_to_read.type);
+
+        if (column_to_read.isSubcolumn())
+        {
+            auto storage_column_from_part = getColumnInPart(
+                {column_to_read.getNameInStorage(), column_to_read.getTypeInStorage()});
+
+            auto subcolumn_name = column_to_read.getSubcolumnName();
+            if (!storage_column_from_part.type->hasSubcolumn(subcolumn_name))
+                position.reset();
+        }
+
+        if (!position && is_array)
+        {
+            /// If array of Nested column is missing in part,
+            /// we have to read its offsets if they exist.
+            position = findColumnForOffsets(column_to_read);
+            read_only_offsets[i] = (position != std::nullopt);
+        }
+
+        column_positions[i] = std::move(position);
+    }
+}
+
 size_t MergeTreeReaderCompact::readRows(
     size_t from_mark, size_t current_task_last_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
 {
diff --git a/src/Storages/MergeTree/MergeTreeReaderCompact.h b/src/Storages/MergeTree/MergeTreeReaderCompact.h
index aa0eb949aa1..4c5f1231063 100644
--- a/src/Storages/MergeTree/MergeTreeReaderCompact.h
+++ b/src/Storages/MergeTree/MergeTreeReaderCompact.h
@@ -39,6 +39,7 @@ public:
 
 private:
     bool isContinuousReading(size_t mark, size_t column_position);
+    void fillColumnPositions();
 
     ReadBuffer * data_buffer;
     CompressedReadBufferBase * compressed_data_buffer;
diff --git a/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp b/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp
index 766c28c99b9..568e1cecf02 100644
--- a/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp
+++ b/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp
@@ -32,13 +32,13 @@ MergeTreeReaderInMemory::MergeTreeReaderInMemory(
         {})
     , part_in_memory(std::move(data_part_))
 {
-    for (const auto & [name, type] : columns_to_read)
+    for (const auto & column_to_read : columns_to_read)
     {
         /// If array of Nested column is missing in part,
         /// we have to read its offsets if they exist.
-        if (!part_in_memory->block.has(name) && typeid_cast<const DataTypeArray *>(type.get()))
-            if (auto offset_position = findColumnForOffsets(name))
-                positions_for_offsets[name] = *offset_position;
+        if (!part_in_memory->block.has(column_to_read.name) && typeid_cast<const DataTypeArray *>(column_to_read.type.get()))
+            if (auto offset_position = findColumnForOffsets(column_to_read))
+                positions_for_offsets[column_to_read.name] = *offset_position;
     }
 }

From dd776eb3d58c239a4b8c6db5817576c0ac446ca5 Mon Sep 17 00:00:00 2001
From: Anton Popov
Date: Fri, 2 Sep 2022 16:18:50 +0000
Subject: [PATCH 05/12] fix enumerateStreams
---
 src/DataTypes/Serializations/SerializationLowCardinality.cpp | 5 +++--
 src/DataTypes/Serializations/SerializationNullable.cpp       | 5 +++--
 src/DataTypes/Serializations/SerializationSparse.cpp         | 5 +++--
 3 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/src/DataTypes/Serializations/SerializationLowCardinality.cpp b/src/DataTypes/Serializations/SerializationLowCardinality.cpp
index 528cd13ddf3..dfe0188c8e7 100644
--- a/src/DataTypes/Serializations/SerializationLowCardinality.cpp
+++ b/src/DataTypes/Serializations/SerializationLowCardinality.cpp
@@ -48,12 +48,13 @@ void SerializationLowCardinality::enumerateStreams(
     const auto * column_lc = data.column ? &getColumnLowCardinality(*data.column) : nullptr;
 
     settings.path.push_back(Substream::DictionaryKeys);
-    settings.path.back().data = SubstreamData(dict_inner_serialization)
+    auto dict_data = SubstreamData(dict_inner_serialization)
         .withType(data.type ? dictionary_type : nullptr)
         .withColumn(column_lc ? column_lc->getDictionary().getNestedColumn() : nullptr)
         .withSerializationInfo(data.serialization_info);
 
-    dict_inner_serialization->enumerateStreams(settings, callback, settings.path.back().data);
+    settings.path.back().data = dict_data;
+    dict_inner_serialization->enumerateStreams(settings, callback, dict_data);
 
     settings.path.back() = Substream::DictionaryIndexes;
     settings.path.back().data = data;
diff --git a/src/DataTypes/Serializations/SerializationNullable.cpp b/src/DataTypes/Serializations/SerializationNullable.cpp
index 47780f67800..560b73bc827 100644
--- a/src/DataTypes/Serializations/SerializationNullable.cpp
+++ b/src/DataTypes/Serializations/SerializationNullable.cpp
@@ -48,15 +48,16 @@ void SerializationNullable::enumerateStreams(
     auto null_map_serialization = std::make_shared<SerializationNamed>(std::make_shared<SerializationNumber<UInt8>>(), "null", false);
 
     settings.path.push_back(Substream::NullMap);
-    settings.path.back().data = SubstreamData(null_map_serialization)
+    auto null_map_data = SubstreamData(null_map_serialization)
         .withType(type_nullable ? std::make_shared<DataTypeUInt8>() : nullptr)
         .withColumn(column_nullable ? column_nullable->getNullMapColumnPtr() : nullptr)
         .withSerializationInfo(data.serialization_info);
 
+    settings.path.back().data = null_map_data;
     callback(settings.path);
 
     settings.path.back() = Substream::NullableElements;
-    settings.path.back().creator = std::make_shared<SubcolumnCreator>(settings.path.back().data.column);
+    settings.path.back().creator = std::make_shared<SubcolumnCreator>(null_map_data.column);
     settings.path.back().data = data;
 
     auto next_data = SubstreamData(nested)
diff --git a/src/DataTypes/Serializations/SerializationSparse.cpp b/src/DataTypes/Serializations/SerializationSparse.cpp
index 70f9a03f510..855bdfa1b3e 100644
--- a/src/DataTypes/Serializations/SerializationSparse.cpp
+++ b/src/DataTypes/Serializations/SerializationSparse.cpp
@@ -156,15 +156,16 @@ void SerializationSparse::enumerateStreams(
     size_t column_size = column_sparse ? column_sparse->size() : 0;
 
     settings.path.push_back(Substream::SparseOffsets);
-    settings.path.back().data = SubstreamData(std::make_shared<SerializationNumber<UInt64>>())
+    auto offsets_data = SubstreamData(std::make_shared<SerializationNumber<UInt64>>())
         .withType(data.type ? std::make_shared<DataTypeUInt64>() : nullptr)
         .withColumn(column_sparse ? column_sparse->getOffsetsPtr() : nullptr)
        .withSerializationInfo(data.serialization_info);
 
+    settings.path.back().data = offsets_data;
     callback(settings.path);
 
     settings.path.back() = Substream::SparseElements;
-    settings.path.back().creator = std::make_shared<SubcolumnCreator>(settings.path.back().data.column, column_size);
+    settings.path.back().creator = std::make_shared<SubcolumnCreator>(offsets_data.column, column_size);
     settings.path.back().data = data;
 
     auto next_data = SubstreamData(nested)

From 25e3bebd9d3cb64d8e42172f6f46ec489261109f Mon Sep 17 00:00:00 2001
From: Azat Khuzhin
Date: Mon, 29 Aug 2022 20:36:13 +0200
Subject: [PATCH 06/12] Rework core collecting on CI (eliminate gcore usage)

gcore is a gdb command that internally uses gdb to dump the core. However,
with proper configuration of limits (core_dump.size_limit) it should not be
required, although some issues are possible:
- non-standard kernel.core_pattern
- sanitizers

So yes, gcore is more "universal" (you don't need to configure any
`kernel.core_pattern`), but it is ad-hoc, and it has a drawback: **it does
not work when gdb fails**. For example, gdb may fail with
`Dwarf Error: DW_FORM_strx1 found in non-DWO CU` in the case of DWARF-5 [1].

[1]: https://github.com/ClickHouse/ClickHouse/pull/40772#issuecomment-1236331323

Let's try to switch to a more native way.
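The `core.%e.%p-%P` pattern names dumps `core.<executable>.<pid>-<tid>`. A throwaway crasher — purely illustrative, not part of the patch — can verify that the pattern and limits are picked up before the real binaries run, assuming a Linux host with `ulimit -c unlimited` and the sysctl above applied:

``` cpp
#include <cstdlib>

/// Deliberately aborts so the kernel writes a core dump; with the pattern
/// above the file lands in the working directory under a name like
/// `core.a.out.12345-12345` (exact pid/tid depend on the run).
int main()
{
    std::abort();
}
```

Compiling with `g++ crasher.cpp`, running `./a.out`, and checking `ls core.*` confirms the setup without involving gdb at all.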
Signed-off-by: Azat Khuzhin
---
 docker/test/fuzzer/run-fuzzer.sh | 23 +++++++++++++++++++++--
 docker/test/stress/run.sh        | 20 +++++++++++++++++---
 tests/ci/ast_fuzzer_check.py     |  6 +++++-
 tests/ci/stress_check.py         |  2 +-
 4 files changed, 44 insertions(+), 7 deletions(-)

diff --git a/docker/test/fuzzer/run-fuzzer.sh b/docker/test/fuzzer/run-fuzzer.sh
index 11ddb0bd2d3..93e38260395 100755
--- a/docker/test/fuzzer/run-fuzzer.sh
+++ b/docker/test/fuzzer/run-fuzzer.sh
@@ -1,8 +1,15 @@
 #!/bin/bash
 # shellcheck disable=SC2086,SC2001,SC2046,SC2030,SC2031
-set -eux
+set -x
+
+# core.COMM.PID-TID
+sysctl kernel.core_pattern='core.%e.%p-%P'
+
+set -e
+set -u
 set -o pipefail
+
 trap "exit" INT TERM
 # The watchdog is in the separate process group, so we have to kill it separately
 # if the script terminates earlier.
@@ -87,6 +94,19 @@ function configure
     # TODO figure out which ones are needed
     cp -av --dereference "$repo_dir"/tests/config/config.d/listen.xml db/config.d
     cp -av --dereference "$script_dir"/query-fuzzer-tweaks-users.xml db/users.d
+
+    cat > db/config.d/core.xml <<EOL
+<clickhouse>
+    <core_dump>
+        <size_limit>107374182400</size_limit>
+    </core_dump>
+    <!-- NOTE: no need to configure core_path,
+         since clickhouse is not started as daemon (via clickhouse start)
+    -->
+    <core_path>$PWD</core_path>
+</clickhouse>
+EOL
 }
 
 function watchdog
@@ -180,7 +200,6 @@ handle SIGUSR2 nostop noprint pass
 handle SIG$RTMIN nostop noprint pass
 info signals
 continue
-gcore
 backtrace full
 thread apply all backtrace full
 info registers
diff --git a/docker/test/stress/run.sh b/docker/test/stress/run.sh
index f8ecdf1aa21..a28785084f1 100755
--- a/docker/test/stress/run.sh
+++ b/docker/test/stress/run.sh
@@ -5,6 +5,9 @@
 
 set -x
 
+# core.COMM.PID-TID
+sysctl kernel.core_pattern='core.%e.%p-%P'
+
 # Thread Fuzzer allows to check more permutations of possible thread scheduling
 # and find more potential issues.
 
@@ -99,6 +102,19 @@ EOL
+EOL
+
+    cat > /etc/clickhouse-server/config.d/core.xml <<EOL
+<clickhouse>
+    <core_dump>
+        <size_limit>107374182400</size_limit>
+    </core_dump>
+    <!-- NOTE: no need to configure core_path,
+         since clickhouse is not started as daemon (via clickhouse start)
+    -->
+    <core_path>$PWD</core_path>
+</clickhouse>
 EOL
 }
 
@@ -155,7 +171,6 @@ handle SIGUSR2 nostop noprint pass
 handle SIG$RTMIN nostop noprint pass
 info signals
 continue
-gcore
 backtrace full
 thread apply all backtrace full
 info registers
@@ -467,8 +482,7 @@ done
 clickhouse-local --structure "test String, res String" -q "SELECT 'failure', test FROM table WHERE res != 'OK' order by (lower(test) like '%hung%'), rowNumberInAllBlocks() LIMIT 1" < /test_output/test_results.tsv > /test_output/check_status.tsv
 [ -s /test_output/check_status.tsv ] || echo -e "success\tNo errors found" > /test_output/check_status.tsv
 
-# Core dumps (see gcore)
-# Default filename is 'core.PROCESS_ID'
+# Core dumps
 for core in core.*; do
     pigz $core
     mv $core.gz /test_output/
diff --git a/tests/ci/ast_fuzzer_check.py b/tests/ci/ast_fuzzer_check.py
index 9f3ddbe9932..0e41aaf8fba 100644
--- a/tests/ci/ast_fuzzer_check.py
+++ b/tests/ci/ast_fuzzer_check.py
@@ -29,7 +29,11 @@ IMAGE_NAME = "clickhouse/fuzzer"
 
 def get_run_command(pr_number, sha, download_url, workspace_path, image):
     return (
-        f"docker run --network=host --volume={workspace_path}:/workspace "
+        f"docker run "
+        # For sysctl
+        "--privileged "
+        "--network=host "
+        f"--volume={workspace_path}:/workspace "
         "--cap-add syslog --cap-add sys_admin --cap-add=SYS_PTRACE "
         f'-e PR_TO_TEST={pr_number} -e SHA_TO_TEST={sha} -e BINARY_URL_TO_DOWNLOAD="{download_url}" '
         f"{image}"
diff --git a/tests/ci/stress_check.py b/tests/ci/stress_check.py
index e644eef3bc8..8f310eaa99d 100644
--- a/tests/ci/stress_check.py
+++ b/tests/ci/stress_check.py
@@ -33,7 +33,7 @@ def get_run_command(
         "docker run --cap-add=SYS_PTRACE "
         # a static link, don't use S3_URL or S3_DOWNLOAD
         "-e S3_URL='https://s3.amazonaws.com/clickhouse-datasets' "
-        # For dmesg
+        # For dmesg and sysctl
         "--privileged "
         f"--volume={build_path}:/package_folder "
         f"--volume={result_folder}:/test_output "

From f76c1482bd148d5282222eff468bccc350421b3a Mon Sep 17 00:00:00 2001
From: Anton Popov
Date: Tue, 6 Sep 2022 13:56:32 +0000
Subject: [PATCH 07/12] try to fix filling of missed Nested columns with
 multiple levels
---
 src/Interpreters/inplaceBlockConversions.cpp  | 179 ++++++++++--------
 src/Interpreters/inplaceBlockConversions.h    |   2 +
 src/Storages/MergeTree/IMergeTreeReader.cpp   |  37 ++--
 src/Storages/MergeTree/IMergeTreeReader.h     |   2 +
 .../MergeTree/MergeTreeReaderCompact.cpp      |   2 +
 .../MergeTree/MergeTreeReaderWide.cpp         |  13 ++
 src/Storages/StorageMemory.cpp                |   2 +-
 .../0_stateless/01825_type_json_17.reference  |  20 ++
 .../0_stateless/01825_type_json_17.sql        |  32 +++-
 9 files changed, 191 insertions(+), 98 deletions(-)

diff --git a/src/Interpreters/inplaceBlockConversions.cpp b/src/Interpreters/inplaceBlockConversions.cpp
index e4f1b46fc91..116cd91c7cf 100644
--- a/src/Interpreters/inplaceBlockConversions.cpp
+++ b/src/Interpreters/inplaceBlockConversions.cpp
@@ -188,49 +188,13 @@ ActionsDAGPtr evaluateMissingDefaults(
     return createExpressions(header, expr_list, save_unneeded_columns, context);
 }
 
-static bool arrayHasNoElementsRead(const IColumn & column)
+static std::unordered_map<String, ColumnPtr> collectOffsetsColumns(
+    const NamesAndTypesList & available_columns, const Columns & res_columns)
 {
-    const auto * column_array = typeid_cast<const ColumnArray *>(&column);
+    std::unordered_map<String, ColumnPtr> offsets_columns;
 
-    if (!column_array)
-        return false;
-
-    size_t size = column_array->size();
-    if (!size)
-        return false;
-
-    const auto & array_data = column_array->getData();
-    if (const auto * nested_array = typeid_cast<const ColumnArray *>(&array_data))
-        return arrayHasNoElementsRead(*nested_array);
-
-    if (!array_data.empty())
-        return false;
-
-    size_t last_offset = column_array->getOffsets()[size - 1];
-    return last_offset != 0;
-}
-
-void fillMissingColumns(
-    Columns & res_columns,
-    size_t num_rows,
-    const NamesAndTypesList & requested_columns,
-    const NamesAndTypesList & available_columns,
-    StorageMetadataPtr metadata_snapshot)
-{
-    size_t num_columns = requested_columns.size();
-    if (num_columns != res_columns.size())
-        throw Exception(ErrorCodes::LOGICAL_ERROR,
-            "Invalid number of columns passed to fillMissingColumns. Expected {}, got {}",
-            num_columns, res_columns.size());
-
-    /// For a missing column of a nested data structure we must create not a column of empty
-    /// arrays, but a column of arrays of correct length.
-
-    /// First, collect offset columns for all arrays in the block.
-
-    std::unordered_map<String, ColumnPtr> offset_columns;
     auto available_column = available_columns.begin();
-    for (size_t i = 0; i < num_columns; ++i, ++available_column)
+    for (size_t i = 0; i < available_columns.size(); ++i, ++available_column)
     {
         if (res_columns[i] == nullptr || isColumnConst(*res_columns[i]))
             continue;
@@ -243,75 +207,122 @@ void fillMissingColumns(
             if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes)
                 return;
 
-            auto subname = ISerialization::getSubcolumnNameForStream(subpath);
-            auto & offsets_column = offset_columns[Nested::concatenateName(name_in_storage, subname)];
+            const auto & current_offsets_column = subpath.back().data.column;
 
             /// If for some reason multiple offsets columns are present
             /// for the same nested data structure, choose the one that is not empty.
-            if (!offsets_column || offsets_column->empty())
-                offsets_column = subpath.back().data.column;
+            if (current_offsets_column && !current_offsets_column->empty())
+            {
+                auto subname = ISerialization::getSubcolumnNameForStream(subpath);
+                auto full_name = Nested::concatenateName(name_in_storage, subname);
+                auto & offsets_column = offsets_columns[full_name];
+
+                if (!offsets_column)
+                    offsets_column = current_offsets_column;
+
+            #ifndef NDEBUG
+                const auto & offsets_data = assert_cast<const ColumnUInt64 &>(*offsets_column).getData();
+                const auto & current_offsets_data = assert_cast<const ColumnUInt64 &>(*current_offsets_column).getData();
+
+                if (offsets_data != current_offsets_data)
+                    throw Exception(ErrorCodes::LOGICAL_ERROR,
+                        "Found non-equal columns with offsets (sizes: {} and {}) for stream {}",
+                        offsets_data.size(), current_offsets_data.size(), full_name);
+            #endif
+            }
         }, available_column->type, res_columns[i]);
     }
 
+    return offsets_columns;
+}
+
+void fillMissingColumns(
+    Columns & res_columns,
+    size_t num_rows,
+    const NamesAndTypesList & requested_columns,
+    const NamesAndTypesList & available_columns,
+    const NameSet & partially_read_columns,
+    StorageMetadataPtr metadata_snapshot)
+{
+    size_t num_columns = requested_columns.size();
+    if (num_columns != res_columns.size())
+        throw Exception(ErrorCodes::LOGICAL_ERROR,
+            "Invalid number of columns passed to fillMissingColumns. Expected {}, got {}",
+            num_columns, res_columns.size());
+
+    /// For a missing column of a nested data structure
+    /// we must create not a column of empty arrays,
+    /// but a column of arrays of correct length.
+
+    /// First, collect offset columns for all arrays in the block.
+    auto offset_columns = collectOffsetsColumns(available_columns, res_columns);
+
     /// Insert default values only for columns without default expressions.
     auto requested_column = requested_columns.begin();
     for (size_t i = 0; i < num_columns; ++i, ++requested_column)
     {
         const auto & [name, type] = *requested_column;
 
-        if (res_columns[i] && arrayHasNoElementsRead(*res_columns[i]))
+        if (res_columns[i] && partially_read_columns.contains(name))
             res_columns[i] = nullptr;
 
-        if (res_columns[i] == nullptr)
+        if (res_columns[i])
+            continue;
+
+        if (metadata_snapshot && metadata_snapshot->getColumns().hasDefault(name))
+            continue;
+
+        std::vector<ColumnPtr> current_offsets;
+        size_t num_dimensions = 0;
+
+        if (const auto * array_type = typeid_cast<const DataTypeArray *>(type.get()))
         {
-            if (metadata_snapshot && metadata_snapshot->getColumns().hasDefault(name))
-                continue;
+            num_dimensions = getNumberOfDimensions(*array_type);
+            current_offsets.resize(num_dimensions);
 
-            std::vector<ColumnPtr> current_offsets;
-            bool has_all_offsets = true;
+            auto serialization = IDataType::getSerialization(*requested_column);
+            auto name_in_storage = Nested::extractTableName(requested_column->name);
 
-            const auto * array_type = typeid_cast<const DataTypeArray *>(type.get());
-            if (array_type)
+            serialization->enumerateStreams([&](const auto & subpath)
             {
-                auto serialization = IDataType::getSerialization(*requested_column);
-                auto name_in_storage = Nested::extractTableName(requested_column->name);
+                if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes)
+                    return;
 
-                serialization->enumerateStreams([&](const auto & subpath)
+                size_t level = ISerialization::getArrayLevel(subpath);
+                assert(level < num_dimensions);
+
+                auto subname = ISerialization::getSubcolumnNameForStream(subpath);
+                auto it = offset_columns.find(Nested::concatenateName(name_in_storage, subname));
+                if (it != offset_columns.end())
+                    current_offsets[level] = it->second;
+            });
+
+            for (size_t j = 0; j < num_dimensions; ++j)
+            {
+                if (!current_offsets[j])
                 {
-                    if (!has_all_offsets)
-                        return;
-
-                    if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes)
-                        return;
-
-                    auto subname = ISerialization::getSubcolumnNameForStream(subpath);
-                    auto it = offset_columns.find(Nested::concatenateName(name_in_storage, subname));
-                    if (it != offset_columns.end())
-                        current_offsets.emplace_back(it->second);
-                    else
-                        has_all_offsets = false;
-
-                }, type);
+                    current_offsets.resize(j);
+                    break;
+                }
             }
+        }
 
-            if (array_type && has_all_offsets)
-            {
-                assert(!current_offsets.empty());
-                auto scalar_type = getBaseTypeOfArray(type);
+        if (!current_offsets.empty())
+        {
+            size_t num_empty_dimensions = num_dimensions - current_offsets.size();
+            auto scalar_type = createArrayOfType(getBaseTypeOfArray(type), num_empty_dimensions);
 
-                size_t data_size = assert_cast<const ColumnUInt64 &>(*current_offsets.back()).getData().back();
-                res_columns[i] = scalar_type->createColumnConstWithDefaultValue(data_size)->convertToFullColumnIfConst();
+            size_t data_size = assert_cast<const ColumnUInt64 &>(*current_offsets.back()).getData().back();
+            res_columns[i] = scalar_type->createColumnConstWithDefaultValue(data_size)->convertToFullColumnIfConst();
 
-                for (auto it = current_offsets.rbegin(); it != current_offsets.rend(); ++it)
-                    res_columns[i] = ColumnArray::create(res_columns[i], *it);
-            }
-            else
-            {
-                /// We must turn a constant column into a full column because the interpreter could infer
-                /// that it is constant everywhere but in some blocks (from other parts) it can be a full column.
-                res_columns[i] = type->createColumnConstWithDefaultValue(num_rows)->convertToFullColumnIfConst();
-            }
+            for (auto it = current_offsets.rbegin(); it != current_offsets.rend(); ++it)
+                res_columns[i] = ColumnArray::create(res_columns[i], *it);
+        }
+        else
+        {
+            /// We must turn a constant column into a full column because the interpreter could infer
+            /// that it is constant everywhere but in some blocks (from other parts) it can be a full column.
+            res_columns[i] = type->createColumnConstWithDefaultValue(num_rows)->convertToFullColumnIfConst();
         }
     }
 }
diff --git a/src/Interpreters/inplaceBlockConversions.h b/src/Interpreters/inplaceBlockConversions.h
index 70187d5aace..bea44bf6db9 100644
--- a/src/Interpreters/inplaceBlockConversions.h
+++ b/src/Interpreters/inplaceBlockConversions.h
@@ -1,5 +1,6 @@
 #pragma once
 
+#include <Core/Names.h>
 #include
 #include
 
@@ -44,6 +45,7 @@ void fillMissingColumns(
     size_t num_rows,
     const NamesAndTypesList & requested_columns,
     const NamesAndTypesList & available_columns,
+    const NameSet & partially_read_columns,
     StorageMetadataPtr metadata_snapshot);
 
 }
diff --git a/src/Storages/MergeTree/IMergeTreeReader.cpp b/src/Storages/MergeTree/IMergeTreeReader.cpp
index b77975ff9ad..73f9cc8b75d 100644
--- a/src/Storages/MergeTree/IMergeTreeReader.cpp
+++ b/src/Storages/MergeTree/IMergeTreeReader.cpp
@@ -65,7 +65,7 @@ void IMergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_e
     try
     {
         NamesAndTypesList available_columns(columns_to_read.begin(), columns_to_read.end());
-        DB::fillMissingColumns(res_columns, num_rows, requested_columns, available_columns, metadata_snapshot);
+        DB::fillMissingColumns(res_columns, num_rows, requested_columns, available_columns, partially_read_columns, metadata_snapshot);
 
         should_evaluate_missing_defaults = std::any_of(
             res_columns.begin(), res_columns.end(), [](const auto & column) { return column == nullptr; });
@@ -206,9 +206,9 @@ void IMergeTreeReader::performRequiredConversions(Columns & res_columns) const
 
 IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const NameAndTypePair & required_column) const
 {
-    auto get_offset_streams = [](const auto & serialization, const auto & name_in_storage)
+    auto get_offsets_streams = [](const auto & serialization, const auto & name_in_storage)
     {
-        NameSet offset_streams;
+        Names offsets_streams;
         serialization->enumerateStreams([&](const auto & subpath)
         {
             if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes)
                 return;
 
             auto subname = ISerialization::getSubcolumnNameForStream(subpath);
             auto full_name = Nested::concatenateName(name_in_storage, subname);
-            offset_streams.insert(full_name);
+            offsets_streams.push_back(full_name);
         });
 
-        return offset_streams;
+        return offsets_streams;
     };
 
     auto required_name_in_storage = Nested::extractTableName(required_column.getNameInStorage());
-    auto required_offset_streams = get_offset_streams(getSerializationInPart(required_column), required_name_in_storage);
+    auto required_offsets_streams = get_offsets_streams(getSerializationInPart(required_column), required_name_in_storage);
+
+    size_t max_matched_streams = 0;
+    ColumnPosition position;
+
+    /// Find column that has maximal number of matching
+    /// offsets columns with required_column.
     for (const auto & part_column : data_part->getColumns())
     {
         auto name_in_storage = Nested::extractTableName(part_column.name);
         if (name_in_storage != required_name_in_storage)
             continue;
 
-        auto offset_streams = get_offset_streams(data_part->getSerialization(part_column.name), name_in_storage);
+        auto offsets_streams = get_offsets_streams(data_part->getSerialization(part_column.name), name_in_storage);
+        NameSet offsets_streams_set(offsets_streams.begin(), offsets_streams.end());
 
-        bool has_all_offsets = std::all_of(required_offset_streams.begin(), required_offset_streams.end(),
-            [&](const auto & stream_name) { return offset_streams.contains(stream_name); });
+        size_t i = 0;
+        for (; i < required_offsets_streams.size(); ++i)
+        {
+            if (!offsets_streams_set.contains(required_offsets_streams[i]))
+                break;
+        }
 
-        if (has_all_offsets)
-            return data_part->getColumnPosition(part_column.name);
+        if (i && (!position || i > max_matched_streams))
+        {
+            max_matched_streams = i;
+            position = data_part->getColumnPosition(part_column.name);
+        }
     }
 
-    return {};
+    return position;
 }
 
 void IMergeTreeReader::checkNumberOfColumns(size_t num_columns_to_read) const
diff --git a/src/Storages/MergeTree/IMergeTreeReader.h b/src/Storages/MergeTree/IMergeTreeReader.h
index fa1bf988fbf..c2fab33bcf9 100644
--- a/src/Storages/MergeTree/IMergeTreeReader.h
+++ b/src/Storages/MergeTree/IMergeTreeReader.h
@@ -93,6 +93,8 @@ protected:
     using ColumnPosition = std::optional<size_t>;
     ColumnPosition findColumnForOffsets(const NameAndTypePair & column) const;
 
+    NameSet partially_read_columns;
+
 private:
     /// Alter conversions, which must be applied on fly if required
     MergeTreeData::AlterConversions alter_conversions;
diff --git a/src/Storages/MergeTree/MergeTreeReaderCompact.cpp b/src/Storages/MergeTree/MergeTreeReaderCompact.cpp
index 87e0d75a871..440879a9340 100644
--- a/src/Storages/MergeTree/MergeTreeReaderCompact.cpp
+++ b/src/Storages/MergeTree/MergeTreeReaderCompact.cpp
@@ -142,6 +142,8 @@ void MergeTreeReaderCompact::fillColumnPositions()
         }
 
         column_positions[i] = std::move(position);
+        if (read_only_offsets[i])
+            partially_read_columns.insert(column_to_read.name);
     }
 }
 
diff --git a/src/Storages/MergeTree/MergeTreeReaderWide.cpp b/src/Storages/MergeTree/MergeTreeReaderWide.cpp
index abc3816c386..f1124c674c1 100644
--- a/src/Storages/MergeTree/MergeTreeReaderWide.cpp
+++ b/src/Storages/MergeTree/MergeTreeReaderWide.cpp
@@ -159,12 +159,18 @@ void MergeTreeReaderWide::addStreams(
     const ReadBufferFromFileBase::ProfileCallback & profile_callback,
     clockid_t clock_type)
 {
+    bool has_any_stream = false;
+    bool has_all_streams = true;
+
     ISerialization::StreamCallback callback = [&] (const ISerialization::SubstreamPath & substream_path)
     {
         String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path);
 
         if (streams.contains(stream_name))
+        {
+            has_any_stream = true;
             return;
+        }
 
         bool data_file_exists = data_part->checksums.files.contains(stream_name + DATA_FILE_EXTENSION);
 
        /** If data file is missing then we will not try to open it.
          * It is necessary since it allows to add new column to structure of the table without creating new files for old parts.
          */
         if (!data_file_exists)
+        {
+            has_all_streams = false;
             return;
+        }
 
+        has_any_stream = true;
         bool is_lc_dict = substream_path.size() > 1 && substream_path[substream_path.size() - 2].type == ISerialization::Substream::Type::DictionaryKeys;
 
         streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
@@ -185,6 +195,9 @@ void MergeTreeReaderWide::addStreams(
     };
 
     serialization->enumerateStreams(callback);
+
+    if (has_any_stream && !has_all_streams)
+        partially_read_columns.insert(name_and_type.name);
 }
 
diff --git a/src/Storages/StorageMemory.cpp b/src/Storages/StorageMemory.cpp
index d8a46b07102..e4dbfe15095 100644
--- a/src/Storages/StorageMemory.cpp
+++ b/src/Storages/StorageMemory.cpp
@@ -95,7 +95,7 @@ protected:
             ++name_and_type;
         }
 
-        fillMissingColumns(columns, src.rows(), column_names_and_types, column_names_and_types, /*metadata_snapshot=*/ nullptr);
+        fillMissingColumns(columns, src.rows(), column_names_and_types, column_names_and_types, {}, nullptr);
         assert(std::all_of(columns.begin(), columns.end(), [](const auto & column) { return column != nullptr; }));
 
         return Chunk(std::move(columns), src.rows());
diff --git a/tests/queries/0_stateless/01825_type_json_17.reference b/tests/queries/0_stateless/01825_type_json_17.reference
index 96e58224f32..0f97bfed5bc 100644
--- a/tests/queries/0_stateless/01825_type_json_17.reference
+++ b/tests/queries/0_stateless/01825_type_json_17.reference
@@ -3,5 +3,25 @@ Tuple(arr Nested(k1 Nested(k2 String, k3 String, k4 Int8), k5 Tuple(k6 String)),
 {"obj":{"arr":[{"k1":[{"k2":"","k3":"ddd","k4":10},{"k2":"","k3":"","k4":20}],"k5":{"k6":"foo"}}],"id":2}}
 [['bbb','']] [['aaa','ccc']]
 [['ddd','']] [['','']]
+1
 [[0,0]] [[10,20]]
+Tuple(arr Nested(k1 Nested(k2 String, k3 Nested(k4 Int8))), id Int8)
+{"obj":{"arr":[{"k1":[{"k2":"aaa","k3":[]}]}],"id":1}}
+{"obj":{"arr":[{"k1":[{"k2":"bbb","k3":[{"k4":10}]},{"k2":"ccc","k3":[{"k4":20}]}]}],"id":2}}
+[['aaa']] [[[]]]
+[['bbb','ccc']] [[[10],[20]]]
+1
+[[[]]]
+[[[10],[20]]]
+Tuple(arr Nested(k1 Nested(k2 String, k4 Nested(k5 Int8)), k3 String), id Int8)
+{"obj":{"arr":[{"k1":[],"k3":"qqq"},{"k1":[],"k3":"www"}],"id":1}}
+{"obj":{"arr":[{"k1":[{"k2":"aaa","k4":[]}],"k3":"eee"}],"id":2}}
+{"obj":{"arr":[{"k1":[{"k2":"bbb","k4":[{"k5":10}]},{"k2":"ccc","k4":[{"k5":20}]}],"k3":"rrr"}],"id":3}}
+['qqq','www'] [[],[]] [[],[]]
+['eee'] [['aaa']] [[[]]]
+['rrr'] [['bbb','ccc']] [[[10],[20]]]
+1
+[[],[]]
+[[[]]]
+[[[10],[20]]]
diff --git a/tests/queries/0_stateless/01825_type_json_17.sql b/tests/queries/0_stateless/01825_type_json_17.sql
index b34357f8ef1..e3c0c83322b 100644
--- a/tests/queries/0_stateless/01825_type_json_17.sql
+++ b/tests/queries/0_stateless/01825_type_json_17.sql
@@ -7,12 +7,42 @@ SET output_format_json_named_tuples_as_objects = 1;
 CREATE TABLE t_json_17(obj JSON)
 ENGINE = MergeTree ORDER BY tuple();
 
+DROP FUNCTION IF EXISTS hasValidSizes17;
+CREATE FUNCTION hasValidSizes17 AS (arr1, arr2) -> length(arr1) = length(arr2) AND arrayAll((x, y) -> length(x) = length(y), arr1, arr2);
+
+SYSTEM STOP MERGES t_json_17;
+
 INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 1, "arr": [{"k1": [{"k2": "aaa", "k3": "bbb"}, {"k2": "ccc"}]}]}
 INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 2, "arr": [{"k1": [{"k3": "ddd", "k4": 10}, {"k4": 20}], "k5": {"k6": "foo"}}]}
 
 SELECT toTypeName(obj) FROM t_json_17 LIMIT 1;
 SELECT obj FROM t_json_17 ORDER BY obj.id FORMAT JSONEachRow;
 SELECT obj.arr.k1.k3, obj.arr.k1.k2 FROM t_json_17 ORDER BY obj.id;
+SELECT sum(hasValidSizes17(obj.arr.k1.k3, obj.arr.k1.k2)) == count() FROM t_json_17;
 SELECT obj.arr.k1.k4 FROM t_json_17 ORDER BY obj.id;
 
-DROP TABLE IF EXISTS t_json_17;
+TRUNCATE TABLE t_json_17;
+
+INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 1, "arr": [{"k1": [{"k2": "aaa"}]}]}
+INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 2, "arr": [{"k1": [{"k2": "bbb", "k3": [{"k4": 10}]}, {"k2": "ccc", "k3": [{"k4": 20}]}]}]}
+
+SELECT toTypeName(obj) FROM t_json_17 LIMIT 1;
+SELECT obj FROM t_json_17 ORDER BY obj.id FORMAT JSONEachRow;
+SELECT obj.arr.k1.k2, obj.arr.k1.k3.k4 FROM t_json_17 ORDER BY obj.id;
+SELECT sum(hasValidSizes17(obj.arr.k1.k2, obj.arr.k1.k3.k4)) == count() FROM t_json_17;
+SELECT obj.arr.k1.k3.k4 FROM t_json_17 ORDER BY obj.id;
+
+TRUNCATE TABLE t_json_17;
+
+INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 1, "arr": [{"k3": "qqq"}, {"k3": "www"}]}
+INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 2, "arr": [{"k1": [{"k2": "aaa"}], "k3": "eee"}]}
+INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 3, "arr": [{"k1": [{"k2": "bbb", "k4": [{"k5": 10}]}, {"k2": "ccc", "k4": [{"k5": 20}]}], "k3": "rrr"}]}
+
+SELECT toTypeName(obj) FROM t_json_17 LIMIT 1;
+SELECT obj FROM t_json_17 ORDER BY obj.id FORMAT JSONEachRow;
+SELECT obj.arr.k3, obj.arr.k1.k2, obj.arr.k1.k4.k5 FROM t_json_17 ORDER BY obj.id;
+SELECT sum(hasValidSizes17(obj.arr.k1.k2, obj.arr.k1.k4.k5)) == count() FROM t_json_17;
+SELECT obj.arr.k1.k4.k5 FROM t_json_17 ORDER BY obj.id;
+
+DROP FUNCTION hasValidSizes17;
+DROP TABLE t_json_17;

From ce9b76f41649269cc05a680fbb26f1bf690d520f Mon Sep 17 00:00:00 2001
From: Anton Popov
Date: Tue, 6 Sep 2022 15:01:47 +0000
Subject: [PATCH 08/12] fix Nested in in-memory parts
---
 src/Storages/MergeTree/MergeTreeReaderInMemory.cpp | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp b/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp
index 568e1cecf02..06f5785c868 100644
--- a/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp
+++ b/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp
@@ -37,8 +37,13 @@ MergeTreeReaderInMemory::MergeTreeReaderInMemory(
         /// If array of Nested column is missing in part,
         /// we have to read its offsets if they exist.
         if (!part_in_memory->block.has(column_to_read.name) && typeid_cast<const DataTypeArray *>(column_to_read.type.get()))
-            if (auto offset_position = findColumnForOffsets(column_to_read))
-                positions_for_offsets[column_to_read.name] = *offset_position;
+        {
+            if (auto offsets_position = findColumnForOffsets(column_to_read))
+            {
+                positions_for_offsets[column_to_read.name] = *offsets_position;
+                partially_read_columns.insert(column_to_read.name);
+            }
+        }
     }
 }

From 9def86779508bc4b086b57a1874825ca755e2293 Mon Sep 17 00:00:00 2001
From: Anton Popov
Date: Tue, 6 Sep 2022 17:38:51 +0000
Subject: [PATCH 09/12] fix reading of subcolumns from in-memory parts
---
 src/Storages/MergeTree/MergeTreeReaderInMemory.cpp | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp b/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp
index 06f5785c868..ad425a10d30 100644
--- a/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp
+++ b/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp
@@ -36,7 +36,8 @@ MergeTreeReaderInMemory::MergeTreeReaderInMemory(
     {
         /// If array of Nested column is missing in part,
         /// we have to read its offsets if they exist.
-        if (!part_in_memory->block.has(column_to_read.name) && typeid_cast<const DataTypeArray *>(column_to_read.type.get()))
+        if (typeid_cast<const DataTypeArray *>(column_to_read.type.get())
+            && !tryGetColumnFromBlock(part_in_memory->block, column_to_read))
         {
             if (auto offsets_position = findColumnForOffsets(column_to_read))
             {

From 5a790b15b46b304a57779a4c197788ee4bbf27ce Mon Sep 17 00:00:00 2001
From: Anton Popov
Date: Thu, 8 Sep 2022 00:20:11 +0000
Subject: [PATCH 10/12] try to fix filling of missed Nested columns with
 multiple levels
---
 src/Interpreters/inplaceBlockConversions.cpp | 23 ++++++++------------
 src/Storages/MergeTree/IMergeTreeReader.cpp  |  6 ++++-
 2 files changed, 14 insertions(+), 15 deletions(-)

diff --git a/src/Interpreters/inplaceBlockConversions.cpp b/src/Interpreters/inplaceBlockConversions.cpp
index 116cd91c7cf..a4791690f4e 100644
--- a/src/Interpreters/inplaceBlockConversions.cpp
+++ b/src/Interpreters/inplaceBlockConversions.cpp
@@ -200,23 +200,19 @@ static std::unordered_map<String, ColumnPtr> collectOffsetsColumns(
             continue;
 
         auto serialization = IDataType::getSerialization(*available_column);
-        auto name_in_storage = Nested::extractTableName(available_column->name);
-
         serialization->enumerateStreams([&](const auto & subpath)
         {
             if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes)
                 return;
 
+            auto stream_name = ISerialization::getFileNameForStream(*available_column, subpath);
             const auto & current_offsets_column = subpath.back().data.column;
 
             /// If for some reason multiple offsets columns are present
             /// for the same nested data structure, choose the one that is not empty.
             if (current_offsets_column && !current_offsets_column->empty())
             {
-                auto subname = ISerialization::getSubcolumnNameForStream(subpath);
-                auto full_name = Nested::concatenateName(name_in_storage, subname);
-                auto & offsets_column = offsets_columns[full_name];
-
+                auto & offsets_column = offsets_columns[stream_name];
                 if (!offsets_column)
                     offsets_column = current_offsets_column;
 
@@ -227,7 +223,7 @@ static std::unordered_map<String, ColumnPtr> collectOffsetsColumns(
                 if (offsets_data != current_offsets_data)
                     throw Exception(ErrorCodes::LOGICAL_ERROR,
                         "Found non-equal columns with offsets (sizes: {} and {}) for stream {}",
-                        offsets_data.size(), current_offsets_data.size(), full_name);
+                        offsets_data.size(), current_offsets_data.size(), stream_name);
             #endif
             }
         }, available_column->type, res_columns[i]);
@@ -255,7 +251,7 @@ void fillMissingColumns(
     /// but a column of arrays of correct length.
 
     /// First, collect offset columns for all arrays in the block.
-    auto offset_columns = collectOffsetsColumns(available_columns, res_columns);
+    auto offsets_columns = collectOffsetsColumns(available_columns, res_columns);
 
     /// Insert default values only for columns without default expressions.
     auto requested_column = requested_columns.begin();
@@ -275,14 +271,13 @@ void fillMissingColumns(
         std::vector<ColumnPtr> current_offsets;
         size_t num_dimensions = 0;
 
-        if (const auto * array_type = typeid_cast<const DataTypeArray *>(type.get()))
+        const auto * array_type = typeid_cast<const DataTypeArray *>(type.get());
+        if (array_type && !offsets_columns.empty())
         {
             num_dimensions = getNumberOfDimensions(*array_type);
             current_offsets.resize(num_dimensions);
 
             auto serialization = IDataType::getSerialization(*requested_column);
-            auto name_in_storage = Nested::extractTableName(requested_column->name);
-
             serialization->enumerateStreams([&](const auto & subpath)
             {
                 if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes)
                     return;
 
                 size_t level = ISerialization::getArrayLevel(subpath);
                 assert(level < num_dimensions);
 
-                auto subname = ISerialization::getSubcolumnNameForStream(subpath);
-                auto it = offset_columns.find(Nested::concatenateName(name_in_storage, subname));
-                if (it != offset_columns.end())
+                auto stream_name = ISerialization::getFileNameForStream(*requested_column, subpath);
+                auto it = offsets_columns.find(stream_name);
+                if (it != offsets_columns.end())
                     current_offsets[level] = it->second;
             });
diff --git a/src/Storages/MergeTree/IMergeTreeReader.cpp b/src/Storages/MergeTree/IMergeTreeReader.cpp
index 73f9cc8b75d..521de3515cf 100644
--- a/src/Storages/MergeTree/IMergeTreeReader.cpp
+++ b/src/Storages/MergeTree/IMergeTreeReader.cpp
@@ -65,7 +65,11 @@ void IMergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_e
     try
     {
         NamesAndTypesList available_columns(columns_to_read.begin(), columns_to_read.end());
-        DB::fillMissingColumns(res_columns, num_rows, requested_columns, available_columns, partially_read_columns, metadata_snapshot);
+        DB::fillMissingColumns(
+            res_columns, num_rows,
+            Nested::convertToSubcolumns(requested_columns),
+            Nested::convertToSubcolumns(available_columns),
+            partially_read_columns, metadata_snapshot);
 
         should_evaluate_missing_defaults = std::any_of(
             res_columns.begin(), res_columns.end(), [](const auto & column) { return column == nullptr; });

From fab1b40928ed0ffe6b2e0de120f3612d7e572b29 Mon Sep 17 00:00:00 2001
From: Vincent Bernat
Date: Thu, 8 Sep 2022 15:21:59 +0200
Subject: [PATCH 11/12] docs: mention SYNC modifier for DROP and DETACH
 statements
---
 docs/en/sql-reference/statements/detach.md |  4 +++-
 docs/en/sql-reference/statements/drop.md   | 10 +++++-----
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/docs/en/sql-reference/statements/detach.md b/docs/en/sql-reference/statements/detach.md
index c7089b0714d..aa87b1ef613 100644
--- a/docs/en/sql-reference/statements/detach.md
+++ b/docs/en/sql-reference/statements/detach.md
@@ -10,7 +10,7 @@ Makes the server "forget" about the existence of a table, a materialized view, o
 **Syntax**
 
 ``` sql
-DETACH TABLE|VIEW|DICTIONARY [IF EXISTS] [db.]name [ON CLUSTER cluster] [PERMANENTLY]
+DETACH TABLE|VIEW|DICTIONARY [IF EXISTS] [db.]name [ON CLUSTER cluster] [PERMANENTLY] [SYNC]
 ```
 
 Detaching does not delete the data or metadata of a table, a materialized view or a dictionary. If an entity was not detached `PERMANENTLY`, on the next server launch the server will read the metadata and recall the table/view/dictionary again. If an entity was detached `PERMANENTLY`, there will be no automatic recall.
@@ -24,6 +24,8 @@ Note that you can not detach permanently the table which is already detached (te
 
 Also you can not [DROP](../../sql-reference/statements/drop#drop-table) the detached table, or [CREATE TABLE](../../sql-reference/statements/create/table.md) with the same name as detached permanently, or replace it with the other table with [RENAME TABLE](../../sql-reference/statements/rename.md) query.
 
+The `SYNC` modifier executes the action without delay.
+
 **Example**
 
 Creating a table:
diff --git a/docs/en/sql-reference/statements/drop.md b/docs/en/sql-reference/statements/drop.md
index 28d379421f1..8a83a8fae1d 100644
--- a/docs/en/sql-reference/statements/drop.md
+++ b/docs/en/sql-reference/statements/drop.md
@@ -6,7 +6,7 @@ sidebar_label: DROP
 
 # DROP Statements
 
-Deletes existing entity. If the `IF EXISTS` clause is specified, these queries do not return an error if the entity does not exist.
+Deletes existing entity. If the `IF EXISTS` clause is specified, these queries do not return an error if the entity does not exist. If the `SYNC` modifier is specified, the entity is dropped without delay.
 
 ## DROP DATABASE
 
@@ -15,7 +15,7 @@ Deletes all tables inside the `db` database, then deletes the `db` database itse
 Syntax:
 
 ``` sql
-DROP DATABASE [IF EXISTS] db [ON CLUSTER cluster]
+DROP DATABASE [IF EXISTS] db [ON CLUSTER cluster] [SYNC]
 ```
 
 ## DROP TABLE
 
@@ -25,7 +25,7 @@ Deletes the table.
 Syntax:
 
 ``` sql
-DROP [TEMPORARY] TABLE [IF EXISTS] [db.]name [ON CLUSTER cluster]
+DROP [TEMPORARY] TABLE [IF EXISTS] [db.]name [ON CLUSTER cluster] [SYNC]
 ```
 
 ## DROP DICTIONARY
 
@@ -35,7 +35,7 @@ Deletes the dictionary.
 Syntax:
 
 ``` sql
-DROP DICTIONARY [IF EXISTS] [db.]name
+DROP DICTIONARY [IF EXISTS] [db.]name [SYNC]
 ```
 
 ## DROP USER
@@ -95,7 +95,7 @@ Deletes a view. Views can be deleted by a `DROP TABLE` command as well but `DROP
 Syntax:
 
 ``` sql
-DROP VIEW [IF EXISTS] [db.]name [ON CLUSTER cluster]
+DROP VIEW [IF EXISTS] [db.]name [ON CLUSTER cluster] [SYNC]
 ```
 
 ## DROP FUNCTION

From 9d717d62e141e4db2a33a161c41548e79c2b26cd Mon Sep 17 00:00:00 2001
From: DanRoscigno
Date: Fri, 9 Sep 2022 14:56:25 -0400
Subject: [PATCH 12/12] fix slug
---
 docs/zh/development/tests.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/zh/development/tests.md b/docs/zh/development/tests.md
index e6d5cf66de9..6f1118e5e63 100644
--- a/docs/zh/development/tests.md
+++ b/docs/zh/development/tests.md
@@ -1,5 +1,5 @@
 ---
-slug: /en/development/tests
+slug: /zh/development/tests
 sidebar_position: 70
 sidebar_label: Testing
 title: ClickHouse Testing
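Taken together, the series lets `fillMissingColumns` rebuild a missing subcolumn of a multi-level `Nested` structure from whichever offsets streams were recovered, one offsets column per array level, wrapping the innermost default values outwards. A standalone sketch of the two-level case, using plain STL containers with illustrative values rather than ClickHouse's `ColumnArray`/`ColumnPtr` types:

``` cpp
#include <cstddef>
#include <iostream>
#include <vector>

using Offsets = std::vector<size_t>;

/// Split a flat vector into slices according to cumulative offsets,
/// mirroring how an array column wraps a data column with offsets.
template <typename T>
std::vector<std::vector<T>> wrap(const std::vector<T> & data, const Offsets & offsets)
{
    std::vector<std::vector<T>> result(offsets.size());
    size_t prev = 0;
    for (size_t i = 0; i < offsets.size(); ++i)
    {
        result[i].assign(data.begin() + prev, data.begin() + offsets[i]);
        prev = offsets[i];
    }
    return result;
}

int main()
{
    /// Offsets recovered per array level for a missing column like
    /// `obj.arr.k1.k4.k5` (values are made up for the example):
    Offsets level0{1, 3};     /// row 0 has 1 `arr` element, row 1 has 2
    Offsets level1{2, 2, 3};  /// those 3 elements hold 2, 0 and 1 `k1` entries

    /// The innermost data column: one default value per scalar.
    std::vector<int> scalars(level1.back(), 0);

    /// Wrap from the innermost level outwards, like the loop over
    /// `current_offsets.rbegin() .. rend()` in fillMissingColumns.
    auto level1_arrays = wrap(scalars, level1);
    auto rows = wrap(level1_arrays, level0);

    for (const auto & row : rows)
    {
        for (const auto & arr : row)
            std::cout << '[' << arr.size() << ']';
        std::cout << '\n';   /// prints: [2]  then  [0][1]
    }
}
```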