diff --git a/docker/test/fuzzer/run-fuzzer.sh b/docker/test/fuzzer/run-fuzzer.sh
index 11ddb0bd2d3..93e38260395 100755
--- a/docker/test/fuzzer/run-fuzzer.sh
+++ b/docker/test/fuzzer/run-fuzzer.sh
@@ -1,8 +1,15 @@
 #!/bin/bash
 # shellcheck disable=SC2086,SC2001,SC2046,SC2030,SC2031
 
-set -eux
+set -x
+
+# core.COMM.PID-TID
+sysctl kernel.core_pattern='core.%e.%p-%P'
+
+set -e
+set -u
 set -o pipefail
+
 trap "exit" INT TERM
 # The watchdog is in the separate process group, so we have to kill it separately
 # if the script terminates earlier.
@@ -87,6 +94,19 @@ function configure
     # TODO figure out which ones are needed
     cp -av --dereference "$repo_dir"/tests/config/config.d/listen.xml db/config.d
     cp -av --dereference "$script_dir"/query-fuzzer-tweaks-users.xml db/users.d
+
+    cat > db/config.d/core.xml <<EOL
+<clickhouse>
+    <core_dump>
+        <!-- 100GiB -->
+        <size_limit>107374182400</size_limit>
+    </core_dump>
+    <!-- NOTE: no need to configure core_path,
+         since clickhouse is not started as daemon (via clickhouse start)
+    -->
+    <core_path>$PWD</core_path>
+</clickhouse>
+EOL
 }
 
 function watchdog
@@ -180,7 +200,6 @@ handle SIGUSR2 nostop noprint pass
 handle SIG$RTMIN nostop noprint pass
 info signals
 continue
-gcore
 backtrace full
 thread apply all backtrace full
 info registers
diff --git a/docker/test/stress/run.sh b/docker/test/stress/run.sh
index 43dbe08d765..ac7de9c07a2 100755
--- a/docker/test/stress/run.sh
+++ b/docker/test/stress/run.sh
@@ -8,6 +8,9 @@ dmesg --clear
 
 set -x
 
+# core.COMM.PID-TID
+sysctl kernel.core_pattern='core.%e.%p-%P'
+
 # Thread Fuzzer allows to check more permutations of possible thread scheduling
 # and find more potential issues.
 
@@ -104,6 +107,19 @@ EOL
 EOL
 
+    cat > /etc/clickhouse-server/config.d/core.xml <<EOL
+<clickhouse>
+    <core_dump>
+        <!-- 100GiB -->
+        <size_limit>107374182400</size_limit>
+    </core_dump>
+    <!-- NOTE: no need to configure core_path,
+         since clickhouse is not started as daemon (via clickhouse start)
+    -->
+    <core_path>$PWD</core_path>
+</clickhouse>
+EOL
 }
@@ -160,7 +176,6 @@ handle SIGUSR2 nostop noprint pass
 handle SIG$RTMIN nostop noprint pass
 info signals
 continue
-gcore
 backtrace full
 thread apply all backtrace full
 info registers
@@ -504,8 +519,7 @@ done
 clickhouse-local --structure "test String, res String" -q "SELECT 'failure', test FROM table WHERE res != 'OK' order by (lower(test) like '%hung%'), rowNumberInAllBlocks() LIMIT 1" < /test_output/test_results.tsv > /test_output/check_status.tsv
 [ -s /test_output/check_status.tsv ] || echo -e "success\tNo errors found" > /test_output/check_status.tsv
 
-# Core dumps (see gcore)
-# Default filename is 'core.PROCESS_ID'
+# Core dumps
 for core in core.*; do
     pigz $core
     mv $core.gz /test_output/
diff --git a/docs/en/sql-reference/statements/detach.md b/docs/en/sql-reference/statements/detach.md
index c7089b0714d..aa87b1ef613 100644
--- a/docs/en/sql-reference/statements/detach.md
+++ b/docs/en/sql-reference/statements/detach.md
@@ -10,7 +10,7 @@ Makes the server "forget" about the existence of a table, a materialized view, o
 **Syntax**
 
 ``` sql
-DETACH TABLE|VIEW|DICTIONARY [IF EXISTS] [db.]name [ON CLUSTER cluster] [PERMANENTLY]
+DETACH TABLE|VIEW|DICTIONARY [IF EXISTS] [db.]name [ON CLUSTER cluster] [PERMANENTLY] [SYNC]
 ```
 
 Detaching does not delete the data or metadata of a table, a materialized view or a dictionary. If an entity was not detached `PERMANENTLY`, on the next server launch the server will read the metadata and recall the table/view/dictionary again. If an entity was detached `PERMANENTLY`, there will be no automatic recall.
@@ -24,6 +24,8 @@ Note that you can not detach permanently the table which is already detached (te
 Also you can not [DROP](../../sql-reference/statements/drop#drop-table) the detached table, or [CREATE TABLE](../../sql-reference/statements/create/table.md) with the same name as detached permanently, or replace it with the other table with [RENAME TABLE](../../sql-reference/statements/rename.md) query.
 
+The `SYNC` modifier executes the action without delay. + **Example** Creating a table: diff --git a/docs/en/sql-reference/statements/drop.md b/docs/en/sql-reference/statements/drop.md index 28d379421f1..8a83a8fae1d 100644 --- a/docs/en/sql-reference/statements/drop.md +++ b/docs/en/sql-reference/statements/drop.md @@ -6,7 +6,7 @@ sidebar_label: DROP # DROP Statements -Deletes existing entity. If the `IF EXISTS` clause is specified, these queries do not return an error if the entity does not exist. +Deletes existing entity. If the `IF EXISTS` clause is specified, these queries do not return an error if the entity does not exist. If the `SYNC` modifier is specified, the entity is dropped without delay. ## DROP DATABASE @@ -15,7 +15,7 @@ Deletes all tables inside the `db` database, then deletes the `db` database itse Syntax: ``` sql -DROP DATABASE [IF EXISTS] db [ON CLUSTER cluster] +DROP DATABASE [IF EXISTS] db [ON CLUSTER cluster] [SYNC] ``` ## DROP TABLE @@ -25,7 +25,7 @@ Deletes the table. Syntax: ``` sql -DROP [TEMPORARY] TABLE [IF EXISTS] [db.]name [ON CLUSTER cluster] +DROP [TEMPORARY] TABLE [IF EXISTS] [db.]name [ON CLUSTER cluster] [SYNC] ``` ## DROP DICTIONARY @@ -35,7 +35,7 @@ Deletes the dictionary. Syntax: ``` sql -DROP DICTIONARY [IF EXISTS] [db.]name +DROP DICTIONARY [IF EXISTS] [db.]name [SYNC] ``` ## DROP USER @@ -95,7 +95,7 @@ Deletes a view. Views can be deleted by a `DROP TABLE` command as well but `DROP Syntax: ``` sql -DROP VIEW [IF EXISTS] [db.]name [ON CLUSTER cluster] +DROP VIEW [IF EXISTS] [db.]name [ON CLUSTER cluster] [SYNC] ``` ## DROP FUNCTION diff --git a/docs/zh/development/tests.md b/docs/zh/development/tests.md index e6d5cf66de9..6f1118e5e63 100644 --- a/docs/zh/development/tests.md +++ b/docs/zh/development/tests.md @@ -1,5 +1,5 @@ --- -slug: /en/development/tests +slug: /zh/development/tests sidebar_position: 70 sidebar_label: Testing title: ClickHouse Testing diff --git a/src/Columns/ColumnArray.cpp b/src/Columns/ColumnArray.cpp index 93bcc3eb611..fabdc03ace9 100644 --- a/src/Columns/ColumnArray.cpp +++ b/src/Columns/ColumnArray.cpp @@ -50,7 +50,7 @@ ColumnArray::ColumnArray(MutableColumnPtr && nested_column, MutableColumnPtr && if (!offsets_concrete) throw Exception("offsets_column must be a ColumnUInt64", ErrorCodes::LOGICAL_ERROR); - if (!offsets_concrete->empty() && data) + if (!offsets_concrete->empty() && data && !data->empty()) { Offset last_offset = offsets_concrete->getData().back(); diff --git a/src/Compression/CompressionFactoryAdditions.cpp b/src/Compression/CompressionFactoryAdditions.cpp index d87d0f8b4ee..3e215076871 100644 --- a/src/Compression/CompressionFactoryAdditions.cpp +++ b/src/Compression/CompressionFactoryAdditions.cpp @@ -116,8 +116,8 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST( } }; - ISerialization::SubstreamPath path; - column_type->getDefaultSerialization()->enumerateStreams(path, callback, column_type); + auto serialization = column_type->getDefaultSerialization(); + serialization->enumerateStreams(callback, column_type); if (!result_codec) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find any substream with data type for type {}. 
It's a bug", column_type->getName()); diff --git a/src/DataTypes/IDataType.cpp b/src/DataTypes/IDataType.cpp index cb3bab5c653..0c29c263fe7 100644 --- a/src/DataTypes/IDataType.cpp +++ b/src/DataTypes/IDataType.cpp @@ -84,18 +84,20 @@ void IDataType::forEachSubcolumn( { for (size_t i = 0; i < subpath.size(); ++i) { - if (!subpath[i].visited && ISerialization::hasSubcolumnForPath(subpath, i + 1)) + size_t prefix_len = i + 1; + if (!subpath[i].visited && ISerialization::hasSubcolumnForPath(subpath, prefix_len)) { - auto name = ISerialization::getSubcolumnNameForStream(subpath, i + 1); - auto subdata = ISerialization::createFromPath(subpath, i); + auto name = ISerialization::getSubcolumnNameForStream(subpath, prefix_len); + auto subdata = ISerialization::createFromPath(subpath, prefix_len); callback(subpath, name, subdata); } subpath[i].visited = true; } }; - SubstreamPath path; - data.serialization->enumerateStreams(path, callback_with_data, data); + ISerialization::EnumerateStreamsSettings settings; + settings.position_independent_encoding = false; + data.serialization->enumerateStreams(settings, callback_with_data, data); } template @@ -118,33 +120,38 @@ Ptr IDataType::getForSubcolumn( return res; } +bool IDataType::hasSubcolumn(const String & subcolumn_name) const +{ + return tryGetSubcolumnType(subcolumn_name) != nullptr; +} + DataTypePtr IDataType::tryGetSubcolumnType(const String & subcolumn_name) const { - SubstreamData data = { getDefaultSerialization(), getPtr(), nullptr, nullptr }; + auto data = SubstreamData(getDefaultSerialization()).withType(getPtr()); return getForSubcolumn(subcolumn_name, data, &SubstreamData::type, false); } DataTypePtr IDataType::getSubcolumnType(const String & subcolumn_name) const { - SubstreamData data = { getDefaultSerialization(), getPtr(), nullptr, nullptr }; + auto data = SubstreamData(getDefaultSerialization()).withType(getPtr()); return getForSubcolumn(subcolumn_name, data, &SubstreamData::type, true); } ColumnPtr IDataType::tryGetSubcolumn(const String & subcolumn_name, const ColumnPtr & column) const { - SubstreamData data = { getDefaultSerialization(), nullptr, column, nullptr }; + auto data = SubstreamData(getDefaultSerialization()).withColumn(column); return getForSubcolumn(subcolumn_name, data, &SubstreamData::column, false); } ColumnPtr IDataType::getSubcolumn(const String & subcolumn_name, const ColumnPtr & column) const { - SubstreamData data = { getDefaultSerialization(), nullptr, column, nullptr }; + auto data = SubstreamData(getDefaultSerialization()).withColumn(column); return getForSubcolumn(subcolumn_name, data, &SubstreamData::column, true); } SerializationPtr IDataType::getSubcolumnSerialization(const String & subcolumn_name, const SerializationPtr & serialization) const { - SubstreamData data = { serialization, nullptr, nullptr, nullptr }; + auto data = SubstreamData(serialization); return getForSubcolumn(subcolumn_name, data, &SubstreamData::serialization, true); } @@ -154,7 +161,7 @@ Names IDataType::getSubcolumnNames() const forEachSubcolumn([&](const auto &, const auto & name, const auto &) { res.push_back(name); - }, { getDefaultSerialization(), nullptr, nullptr, nullptr }); + }, SubstreamData(getDefaultSerialization())); return res; } diff --git a/src/DataTypes/IDataType.h b/src/DataTypes/IDataType.h index 0174ca426c2..c93128ced95 100644 --- a/src/DataTypes/IDataType.h +++ b/src/DataTypes/IDataType.h @@ -79,6 +79,8 @@ public: /// Data type id. It's used for runtime type checks. 
virtual TypeIndex getTypeId() const = 0; + bool hasSubcolumn(const String & subcolumn_name) const; + DataTypePtr tryGetSubcolumnType(const String & subcolumn_name) const; DataTypePtr getSubcolumnType(const String & subcolumn_name) const; diff --git a/src/DataTypes/Serializations/ISerialization.cpp b/src/DataTypes/Serializations/ISerialization.cpp index e6e6fdba5dc..da0142a5d57 100644 --- a/src/DataTypes/Serializations/ISerialization.cpp +++ b/src/DataTypes/Serializations/ISerialization.cpp @@ -73,24 +73,24 @@ String ISerialization::SubstreamPath::toString() const } void ISerialization::enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const { - path.push_back(Substream::Regular); - path.back().data = data; - callback(path); - path.pop_back(); + settings.path.push_back(Substream::Regular); + settings.path.back().data = data; + callback(settings.path); + settings.path.pop_back(); } -void ISerialization::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const +void ISerialization::enumerateStreams( + const StreamCallback & callback, + const DataTypePtr & type, + const ColumnPtr & column) const { - enumerateStreams(path, callback, {getPtr(), nullptr, nullptr, nullptr}); -} - -void ISerialization::enumerateStreams(SubstreamPath & path, const StreamCallback & callback, const DataTypePtr & type) const -{ - enumerateStreams(path, callback, {getPtr(), type, nullptr, nullptr}); + EnumerateStreamsSettings settings; + auto data = SubstreamData(getPtr()).withType(type).withColumn(column); + enumerateStreams(settings, callback, data); } void ISerialization::serializeBinaryBulk(const IColumn & column, WriteBuffer &, size_t, size_t) const @@ -184,7 +184,7 @@ String ISerialization::getFileNameForStream(const NameAndTypePair & column, cons return getFileNameForStream(column.getNameInStorage(), path); } -static size_t isOffsetsOfNested(const ISerialization::SubstreamPath & path) +bool isOffsetsOfNested(const ISerialization::SubstreamPath & path) { if (path.empty()) return false; @@ -287,10 +287,13 @@ bool ISerialization::hasSubcolumnForPath(const SubstreamPath & path, size_t pref ISerialization::SubstreamData ISerialization::createFromPath(const SubstreamPath & path, size_t prefix_len) { - assert(prefix_len < path.size()); + assert(prefix_len <= path.size()); + if (prefix_len == 0) + return {}; - SubstreamData res = path[prefix_len].data; - for (ssize_t i = static_cast(prefix_len) - 1; i >= 0; --i) + ssize_t last_elem = prefix_len - 1; + auto res = path[last_elem].data; + for (ssize_t i = last_elem - 1; i >= 0; --i) { const auto & creator = path[i].creator; if (creator) diff --git a/src/DataTypes/Serializations/ISerialization.h b/src/DataTypes/Serializations/ISerialization.h index b5d2082631e..1193c15b939 100644 --- a/src/DataTypes/Serializations/ISerialization.h +++ b/src/DataTypes/Serializations/ISerialization.h @@ -101,6 +101,30 @@ public: struct SubstreamData { + SubstreamData() = default; + SubstreamData(SerializationPtr serialization_) + : serialization(std::move(serialization_)) + { + } + + SubstreamData & withType(DataTypePtr type_) + { + type = std::move(type_); + return *this; + } + + SubstreamData & withColumn(ColumnPtr column_) + { + column = std::move(column_); + return *this; + } + + SubstreamData & withSerializationInfo(SerializationInfoPtr serialization_info_) + { + serialization_info = std::move(serialization_info_); + return *this; + } + SerializationPtr serialization; 
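The chained `withType`/`withColumn`/`withSerializationInfo` setters introduced above let call sites fill only the fields they actually have, instead of brace-initializing `SubstreamData` with explicit `nullptr` placeholders. A minimal compilable sketch of the idiom, using `std::string` stand-ins for ClickHouse's `SerializationPtr`/`DataTypePtr`/`ColumnPtr` smart pointers (the stub types are illustrative, not the real classes):

```cpp
#include <iostream>
#include <memory>
#include <string>

// Stub stand-ins for ClickHouse's SerializationPtr, DataTypePtr and ColumnPtr.
using SerializationPtr = std::shared_ptr<std::string>;
using DataTypePtr = std::shared_ptr<std::string>;
using ColumnPtr = std::shared_ptr<std::string>;

struct SubstreamData
{
    SubstreamData() = default;
    SubstreamData(SerializationPtr serialization_)
        : serialization(std::move(serialization_)) {}

    // Each setter fills one optional field and returns *this,
    // so call sites mention only the fields they actually have.
    SubstreamData & withType(DataTypePtr type_) { type = std::move(type_); return *this; }
    SubstreamData & withColumn(ColumnPtr column_) { column = std::move(column_); return *this; }

    SerializationPtr serialization;
    DataTypePtr type;
    ColumnPtr column;
};

int main()
{
    auto data = SubstreamData(std::make_shared<std::string>("serialization"))
        .withType(std::make_shared<std::string>("UInt64"));

    // `column` was never set, so it stays null instead of requiring an
    // explicit nullptr at the call site, as the old aggregate init did.
    std::cout << *data.serialization << ", " << *data.type
              << ", column set: " << std::boolalpha << (data.column != nullptr) << '\n';
}
```

Because each setter returns `*this`, a temporary can be configured in a single expression, which is the shape call sites such as `SubstreamData(getDefaultSerialization()).withType(getPtr())` in `IDataType.cpp` rely on.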
DataTypePtr type; ColumnPtr column; @@ -164,16 +188,22 @@ public: using StreamCallback = std::function; + struct EnumerateStreamsSettings + { + SubstreamPath path; + bool position_independent_encoding = true; + }; + virtual void enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const; - void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const; - void enumerateStreams(const StreamCallback & callback, SubstreamPath && path) const { enumerateStreams(callback, path); } - void enumerateStreams(const StreamCallback & callback) const { enumerateStreams(callback, {}); } - - void enumerateStreams(SubstreamPath & path, const StreamCallback & callback, const DataTypePtr & type) const; + /// Enumerate streams with default settings. + void enumerateStreams( + const StreamCallback & callback, + const DataTypePtr & type = nullptr, + const ColumnPtr & column = nullptr) const; using OutputStreamGetter = std::function; using InputStreamGetter = std::function; @@ -375,4 +405,6 @@ State * ISerialization::checkAndGetState(const StatePtr & state) const return state_concrete; } +bool isOffsetsOfNested(const ISerialization::SubstreamPath & path); + } diff --git a/src/DataTypes/Serializations/SerializationArray.cpp b/src/DataTypes/Serializations/SerializationArray.cpp index 625f2dce0b0..48980febd9d 100644 --- a/src/DataTypes/Serializations/SerializationArray.cpp +++ b/src/DataTypes/Serializations/SerializationArray.cpp @@ -155,30 +155,30 @@ namespace return column_offsets; } -} -ColumnPtr arrayOffsetsToSizes(const IColumn & column) -{ - const auto & column_offsets = assert_cast(column); - MutableColumnPtr column_sizes = column_offsets.cloneEmpty(); - - if (column_offsets.empty()) - return column_sizes; - - const auto & offsets_data = column_offsets.getData(); - auto & sizes_data = assert_cast(*column_sizes).getData(); - - sizes_data.resize(offsets_data.size()); - - IColumn::Offset prev_offset = 0; - for (size_t i = 0, size = offsets_data.size(); i < size; ++i) + ColumnPtr arrayOffsetsToSizes(const IColumn & column) { - auto current_offset = offsets_data[i]; - sizes_data[i] = current_offset - prev_offset; - prev_offset = current_offset; - } + const auto & column_offsets = assert_cast(column); + MutableColumnPtr column_sizes = column_offsets.cloneEmpty(); - return column_sizes; + if (column_offsets.empty()) + return column_sizes; + + const auto & offsets_data = column_offsets.getData(); + auto & sizes_data = assert_cast(*column_sizes).getData(); + + sizes_data.resize(offsets_data.size()); + + IColumn::Offset prev_offset = 0; + for (size_t i = 0, size = offsets_data.size(); i < size; ++i) + { + auto current_offset = offsets_data[i]; + sizes_data[i] = current_offset - prev_offset; + prev_offset = current_offset; + } + + return column_sizes; + } } DataTypePtr SerializationArray::SubcolumnCreator::create(const DataTypePtr & prev) const @@ -197,41 +197,42 @@ ColumnPtr SerializationArray::SubcolumnCreator::create(const ColumnPtr & prev) c } void SerializationArray::enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const { const auto * type_array = data.type ? &assert_cast(*data.type) : nullptr; const auto * column_array = data.column ? &assert_cast(*data.column) : nullptr; - auto offsets_column = column_array ? column_array->getOffsetsPtr() : nullptr; + auto offsets = column_array ? 
column_array->getOffsetsPtr() : nullptr; - path.push_back(Substream::ArraySizes); - path.back().data = - { + auto offsets_serialization = std::make_shared( std::make_shared>(), - "size" + std::to_string(getArrayLevel(path)), false), - data.type ? std::make_shared() : nullptr, - offsets_column ? arrayOffsetsToSizes(*offsets_column) : nullptr, - data.serialization_info, - }; + "size" + std::to_string(getArrayLevel(settings.path)), false); - callback(path); + auto offsets_column = offsets && !settings.position_independent_encoding + ? arrayOffsetsToSizes(*offsets) + : offsets; - path.back() = Substream::ArrayElements; - path.back().data = data; - path.back().creator = std::make_shared(offsets_column); + settings.path.push_back(Substream::ArraySizes); + settings.path.back().data = SubstreamData(offsets_serialization) + .withType(type_array ? std::make_shared() : nullptr) + .withColumn(std::move(offsets_column)) + .withSerializationInfo(data.serialization_info); - SubstreamData next_data = - { - nested, - type_array ? type_array->getNestedType() : nullptr, - column_array ? column_array->getDataPtr() : nullptr, - data.serialization_info, - }; + callback(settings.path); - nested->enumerateStreams(path, callback, next_data); - path.pop_back(); + settings.path.back() = Substream::ArrayElements; + settings.path.back().data = data; + settings.path.back().creator = std::make_shared(offsets); + + auto next_data = SubstreamData(nested) + .withType(type_array ? type_array->getNestedType() : nullptr) + .withColumn(column_array ? column_array->getDataPtr() : nullptr) + .withSerializationInfo(data.serialization_info); + + nested->enumerateStreams(settings, callback, next_data); + settings.path.pop_back(); } void SerializationArray::serializeBinaryBulkStatePrefix( diff --git a/src/DataTypes/Serializations/SerializationArray.h b/src/DataTypes/Serializations/SerializationArray.h index 3769f8a4513..84e37acbaad 100644 --- a/src/DataTypes/Serializations/SerializationArray.h +++ b/src/DataTypes/Serializations/SerializationArray.h @@ -36,7 +36,7 @@ public: */ void enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const override; @@ -79,6 +79,4 @@ private: }; }; -ColumnPtr arrayOffsetsToSizes(const IColumn & column); - } diff --git a/src/DataTypes/Serializations/SerializationLowCardinality.cpp b/src/DataTypes/Serializations/SerializationLowCardinality.cpp index 8e19c5a740b..dfe0188c8e7 100644 --- a/src/DataTypes/Serializations/SerializationLowCardinality.cpp +++ b/src/DataTypes/Serializations/SerializationLowCardinality.cpp @@ -41,30 +41,26 @@ SerializationLowCardinality::SerializationLowCardinality(const DataTypePtr & dic } void SerializationLowCardinality::enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const { const auto * column_lc = data.column ? &getColumnLowCardinality(*data.column) : nullptr; - SubstreamData dict_data = - { - dict_inner_serialization, - data.type ? dictionary_type : nullptr, - column_lc ? column_lc->getDictionary().getNestedColumn() : nullptr, - data.serialization_info, - }; + settings.path.push_back(Substream::DictionaryKeys); + auto dict_data = SubstreamData(dict_inner_serialization) + .withType(data.type ? dictionary_type : nullptr) + .withColumn(column_lc ? 
column_lc->getDictionary().getNestedColumn() : nullptr) + .withSerializationInfo(data.serialization_info); - path.push_back(Substream::DictionaryKeys); - path.back().data = dict_data; + settings.path.back().data = dict_data; + dict_inner_serialization->enumerateStreams(settings, callback, dict_data); - dict_inner_serialization->enumerateStreams(path, callback, dict_data); + settings.path.back() = Substream::DictionaryIndexes; + settings.path.back().data = data; - path.back() = Substream::DictionaryIndexes; - path.back().data = data; - - callback(path); - path.pop_back(); + callback(settings.path); + settings.path.pop_back(); } struct KeysSerializationVersion diff --git a/src/DataTypes/Serializations/SerializationLowCardinality.h b/src/DataTypes/Serializations/SerializationLowCardinality.h index 96e3a297d6a..cc090f2044e 100644 --- a/src/DataTypes/Serializations/SerializationLowCardinality.h +++ b/src/DataTypes/Serializations/SerializationLowCardinality.h @@ -18,7 +18,7 @@ public: explicit SerializationLowCardinality(const DataTypePtr & dictionary_type); void enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const override; diff --git a/src/DataTypes/Serializations/SerializationMap.cpp b/src/DataTypes/Serializations/SerializationMap.cpp index ea22070b5b1..e46bb480d14 100644 --- a/src/DataTypes/Serializations/SerializationMap.cpp +++ b/src/DataTypes/Serializations/SerializationMap.cpp @@ -257,19 +257,16 @@ void SerializationMap::deserializeTextCSV(IColumn & column, ReadBuffer & istr, c } void SerializationMap::enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const { - SubstreamData next_data = - { - nested, - data.type ? assert_cast(*data.type).getNestedType() : nullptr, - data.column ? assert_cast(*data.column).getNestedColumnPtr() : nullptr, - data.serialization_info, - }; + auto next_data = SubstreamData(nested) + .withType(data.type ? assert_cast(*data.type).getNestedType() : nullptr) + .withColumn(data.column ? 
assert_cast(*data.column).getNestedColumnPtr() : nullptr) + .withSerializationInfo(data.serialization_info); - nested->enumerateStreams(path, callback, next_data); + nested->enumerateStreams(settings, callback, next_data); } void SerializationMap::serializeBinaryBulkStatePrefix( diff --git a/src/DataTypes/Serializations/SerializationMap.h b/src/DataTypes/Serializations/SerializationMap.h index 93b3e179499..42f99ca7991 100644 --- a/src/DataTypes/Serializations/SerializationMap.h +++ b/src/DataTypes/Serializations/SerializationMap.h @@ -32,7 +32,7 @@ public: void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override; void enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const override; diff --git a/src/DataTypes/Serializations/SerializationNamed.cpp b/src/DataTypes/Serializations/SerializationNamed.cpp index 097e9cedfbe..4dac4b3a922 100644 --- a/src/DataTypes/Serializations/SerializationNamed.cpp +++ b/src/DataTypes/Serializations/SerializationNamed.cpp @@ -4,16 +4,16 @@ namespace DB { void SerializationNamed::enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const { - addToPath(path); - path.back().data = data; - path.back().creator = std::make_shared(name, escape_delimiter); + addToPath(settings.path); + settings.path.back().data = data; + settings.path.back().creator = std::make_shared(name, escape_delimiter); - nested_serialization->enumerateStreams(path, callback, data); - path.pop_back(); + nested_serialization->enumerateStreams(settings, callback, data); + settings.path.pop_back(); } void SerializationNamed::serializeBinaryBulkStatePrefix( diff --git a/src/DataTypes/Serializations/SerializationNamed.h b/src/DataTypes/Serializations/SerializationNamed.h index 343b96c16e3..2a2c7c0dfc7 100644 --- a/src/DataTypes/Serializations/SerializationNamed.h +++ b/src/DataTypes/Serializations/SerializationNamed.h @@ -26,7 +26,7 @@ public: const String & getElementName() const { return name; } void enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const override; diff --git a/src/DataTypes/Serializations/SerializationNullable.cpp b/src/DataTypes/Serializations/SerializationNullable.cpp index a6273deaa30..560b73bc827 100644 --- a/src/DataTypes/Serializations/SerializationNullable.cpp +++ b/src/DataTypes/Serializations/SerializationNullable.cpp @@ -38,38 +38,35 @@ ColumnPtr SerializationNullable::SubcolumnCreator::create(const ColumnPtr & prev } void SerializationNullable::enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const { const auto * type_nullable = data.type ? &assert_cast(*data.type) : nullptr; const auto * column_nullable = data.column ? &assert_cast(*data.column) : nullptr; - path.push_back(Substream::NullMap); - path.back().data = - { - std::make_shared(std::make_shared>(), "null", false), - type_nullable ? std::make_shared() : nullptr, - column_nullable ? column_nullable->getNullMapColumnPtr() : nullptr, - data.serialization_info, - }; + auto null_map_serialization = std::make_shared(std::make_shared>(), "null", false); - callback(path); + settings.path.push_back(Substream::NullMap); + auto null_map_data = SubstreamData(null_map_serialization) + .withType(type_nullable ? 
std::make_shared() : nullptr) + .withColumn(column_nullable ? column_nullable->getNullMapColumnPtr() : nullptr) + .withSerializationInfo(data.serialization_info); - path.back() = Substream::NullableElements; - path.back().creator = std::make_shared(path.back().data.column); - path.back().data = data; + settings.path.back().data = null_map_data; + callback(settings.path); - SubstreamData next_data = - { - nested, - type_nullable ? type_nullable->getNestedType() : nullptr, - column_nullable ? column_nullable->getNestedColumnPtr() : nullptr, - data.serialization_info, - }; + settings.path.back() = Substream::NullableElements; + settings.path.back().creator = std::make_shared(null_map_data.column); + settings.path.back().data = data; - nested->enumerateStreams(path, callback, next_data); - path.pop_back(); + auto next_data = SubstreamData(nested) + .withType(type_nullable ? type_nullable->getNestedType() : nullptr) + .withColumn(column_nullable ? column_nullable->getNestedColumnPtr() : nullptr) + .withSerializationInfo(data.serialization_info); + + nested->enumerateStreams(settings, callback, next_data); + settings.path.pop_back(); } void SerializationNullable::serializeBinaryBulkStatePrefix( diff --git a/src/DataTypes/Serializations/SerializationNullable.h b/src/DataTypes/Serializations/SerializationNullable.h index e6e0e4f33c2..ea3958065e7 100644 --- a/src/DataTypes/Serializations/SerializationNullable.h +++ b/src/DataTypes/Serializations/SerializationNullable.h @@ -14,7 +14,7 @@ public: explicit SerializationNullable(const SerializationPtr & nested_) : nested(nested_) {} void enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const override; diff --git a/src/DataTypes/Serializations/SerializationSparse.cpp b/src/DataTypes/Serializations/SerializationSparse.cpp index 6fa40e460c5..855bdfa1b3e 100644 --- a/src/DataTypes/Serializations/SerializationSparse.cpp +++ b/src/DataTypes/Serializations/SerializationSparse.cpp @@ -148,39 +148,33 @@ ColumnPtr SerializationSparse::SubcolumnCreator::create(const ColumnPtr & prev) } void SerializationSparse::enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const { const auto * column_sparse = data.column ? &assert_cast(*data.column) : nullptr; - size_t column_size = column_sparse ? column_sparse->size() : 0; - path.push_back(Substream::SparseOffsets); - path.back().data = - { - std::make_shared>(), - data.type ? std::make_shared() : nullptr, - column_sparse ? column_sparse->getOffsetsPtr() : nullptr, - data.serialization_info, - }; + settings.path.push_back(Substream::SparseOffsets); + auto offsets_data = SubstreamData(std::make_shared>()) + .withType(data.type ? std::make_shared() : nullptr) + .withColumn(column_sparse ? column_sparse->getOffsetsPtr() : nullptr) + .withSerializationInfo(data.serialization_info); - callback(path); + settings.path.back().data = offsets_data; + callback(settings.path); - path.back() = Substream::SparseElements; - path.back().creator = std::make_shared(path.back().data.column, column_size); - path.back().data = data; + settings.path.back() = Substream::SparseElements; + settings.path.back().creator = std::make_shared(offsets_data.column, column_size); + settings.path.back().data = data; - SubstreamData next_data = - { - nested, - data.type, - column_sparse ? 
column_sparse->getValuesPtr() : nullptr, - data.serialization_info, - }; + auto next_data = SubstreamData(nested) + .withType(data.type) + .withColumn(column_sparse ? column_sparse->getValuesPtr() : nullptr) + .withSerializationInfo(data.serialization_info); - nested->enumerateStreams(path, callback, next_data); - path.pop_back(); + nested->enumerateStreams(settings, callback, next_data); + settings.path.pop_back(); } void SerializationSparse::serializeBinaryBulkStatePrefix( diff --git a/src/DataTypes/Serializations/SerializationSparse.h b/src/DataTypes/Serializations/SerializationSparse.h index 54ab4853360..dc2f63c5a05 100644 --- a/src/DataTypes/Serializations/SerializationSparse.h +++ b/src/DataTypes/Serializations/SerializationSparse.h @@ -28,7 +28,7 @@ public: Kind getKind() const override { return Kind::SPARSE; } virtual void enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const override; diff --git a/src/DataTypes/Serializations/SerializationTuple.cpp b/src/DataTypes/Serializations/SerializationTuple.cpp index 8138b15c9af..5663ff86dd6 100644 --- a/src/DataTypes/Serializations/SerializationTuple.cpp +++ b/src/DataTypes/Serializations/SerializationTuple.cpp @@ -283,7 +283,7 @@ void SerializationTuple::deserializeTextCSV(IColumn & column, ReadBuffer & istr, } void SerializationTuple::enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const { @@ -293,15 +293,12 @@ void SerializationTuple::enumerateStreams( for (size_t i = 0; i < elems.size(); ++i) { - SubstreamData next_data = - { - elems[i], - type_tuple ? type_tuple->getElement(i) : nullptr, - column_tuple ? column_tuple->getColumnPtr(i) : nullptr, - info_tuple ? info_tuple->getElementInfo(i) : nullptr, - }; + auto next_data = SubstreamData(elems[i]) + .withType(type_tuple ? type_tuple->getElement(i) : nullptr) + .withColumn(column_tuple ? column_tuple->getColumnPtr(i) : nullptr) + .withSerializationInfo(info_tuple ? info_tuple->getElementInfo(i) : nullptr); - elems[i]->enumerateStreams(path, callback, next_data); + elems[i]->enumerateStreams(settings, callback, next_data); } } diff --git a/src/DataTypes/Serializations/SerializationTuple.h b/src/DataTypes/Serializations/SerializationTuple.h index e82d8473645..d1caeb73dad 100644 --- a/src/DataTypes/Serializations/SerializationTuple.h +++ b/src/DataTypes/Serializations/SerializationTuple.h @@ -34,7 +34,7 @@ public: /** Each sub-column in a tuple is serialized in separate stream. 
*/ void enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const override; diff --git a/src/DataTypes/Serializations/SerializationWrapper.cpp b/src/DataTypes/Serializations/SerializationWrapper.cpp index 271c53dfcf1..7c50c1c6e26 100644 --- a/src/DataTypes/Serializations/SerializationWrapper.cpp +++ b/src/DataTypes/Serializations/SerializationWrapper.cpp @@ -5,11 +5,11 @@ namespace DB { void SerializationWrapper::enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const { - nested_serialization->enumerateStreams(path, callback, data); + nested_serialization->enumerateStreams(settings, callback, data); } void SerializationWrapper::serializeBinaryBulkStatePrefix( diff --git a/src/DataTypes/Serializations/SerializationWrapper.h b/src/DataTypes/Serializations/SerializationWrapper.h index 43fc7e9914a..d010c6b5314 100644 --- a/src/DataTypes/Serializations/SerializationWrapper.h +++ b/src/DataTypes/Serializations/SerializationWrapper.h @@ -21,7 +21,7 @@ public: Kind getKind() const override { return nested_serialization->getKind(); } void enumerateStreams( - SubstreamPath & path, + EnumerateStreamsSettings & settings, const StreamCallback & callback, const SubstreamData & data) const override; diff --git a/src/Interpreters/InterpreterDescribeQuery.cpp b/src/Interpreters/InterpreterDescribeQuery.cpp index 9919b1272bd..0524feea1f6 100644 --- a/src/Interpreters/InterpreterDescribeQuery.cpp +++ b/src/Interpreters/InterpreterDescribeQuery.cpp @@ -163,7 +163,7 @@ BlockIO InterpreterDescribeQuery::execute() res_columns[6]->insertDefault(); res_columns[7]->insert(1u); - }, { type->getDefaultSerialization(), type, nullptr, nullptr }); + }, ISerialization::SubstreamData(type->getDefaultSerialization()).withType(type)); } } diff --git a/src/Interpreters/inplaceBlockConversions.cpp b/src/Interpreters/inplaceBlockConversions.cpp index 1bde6fe5a8c..a4791690f4e 100644 --- a/src/Interpreters/inplaceBlockConversions.cpp +++ b/src/Interpreters/inplaceBlockConversions.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -187,29 +188,56 @@ ActionsDAGPtr evaluateMissingDefaults( return createExpressions(header, expr_list, save_unneeded_columns, context); } -static bool arrayHasNoElementsRead(const IColumn & column) +static std::unordered_map collectOffsetsColumns( + const NamesAndTypesList & available_columns, const Columns & res_columns) { - const auto * column_array = typeid_cast(&column); + std::unordered_map offsets_columns; - if (!column_array) - return false; + auto available_column = available_columns.begin(); + for (size_t i = 0; i < available_columns.size(); ++i, ++available_column) + { + if (res_columns[i] == nullptr || isColumnConst(*res_columns[i])) + continue; - size_t size = column_array->size(); - if (!size) - return false; + auto serialization = IDataType::getSerialization(*available_column); + serialization->enumerateStreams([&](const auto & subpath) + { + if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes) + return; - size_t data_size = column_array->getData().size(); - if (data_size) - return false; + auto stream_name = ISerialization::getFileNameForStream(*available_column, subpath); + const auto & current_offsets_column = subpath.back().data.column; - size_t last_offset = column_array->getOffsets()[size - 1]; - return last_offset != 0; + /// If for 
some reason multiple offsets columns are present + /// for the same nested data structure, choose the one that is not empty. + if (current_offsets_column && !current_offsets_column->empty()) + { + auto & offsets_column = offsets_columns[stream_name]; + if (!offsets_column) + offsets_column = current_offsets_column; + + #ifndef NDEBUG + const auto & offsets_data = assert_cast(*offsets_column).getData(); + const auto & current_offsets_data = assert_cast(*current_offsets_column).getData(); + + if (offsets_data != current_offsets_data) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Found non-equal columns with offsets (sizes: {} and {}) for stream {}", + offsets_data.size(), current_offsets_data.size(), stream_name); + #endif + } + }, available_column->type, res_columns[i]); + } + + return offsets_columns; } void fillMissingColumns( Columns & res_columns, size_t num_rows, const NamesAndTypesList & requested_columns, + const NamesAndTypesList & available_columns, + const NameSet & partially_read_columns, StorageMetadataPtr metadata_snapshot) { size_t num_columns = requested_columns.size(); @@ -218,65 +246,79 @@ void fillMissingColumns( "Invalid number of columns passed to fillMissingColumns. Expected {}, got {}", num_columns, res_columns.size()); - /// For a missing column of a nested data structure we must create not a column of empty - /// arrays, but a column of arrays of correct length. + /// For a missing column of a nested data structure + /// we must create not a column of empty arrays, + /// but a column of arrays of correct length. /// First, collect offset columns for all arrays in the block. + auto offsets_columns = collectOffsetsColumns(available_columns, res_columns); - std::unordered_map offset_columns; + /// Insert default values only for columns without default expressions. auto requested_column = requested_columns.begin(); for (size_t i = 0; i < num_columns; ++i, ++requested_column) - { - if (res_columns[i] == nullptr) - continue; - - if (const auto * array = typeid_cast(res_columns[i].get())) - { - String offsets_name = Nested::extractTableName(requested_column->name); - auto & offsets_column = offset_columns[offsets_name]; - - /// If for some reason multiple offsets columns are present for the same nested data structure, - /// choose the one that is not empty. 
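As the hunks above show, `fillMissingColumns` now takes offsets gathered by `collectOffsetsColumns` and materializes a missing `Nested` subcolumn with the row sizes implied by a sibling subcolumn, rather than producing a column of empty arrays. A simplified standalone sketch of that construction; the `ArrayColumn` model here is a hypothetical stand-in, not ClickHouse's `ColumnArray`:

```cpp
#include <cassert>
#include <cstdint>
#include <iostream>
#include <vector>

using Offsets = std::vector<uint64_t>;

// Hypothetical simplified model: an "array column" is offsets + flat data.
struct ArrayColumn
{
    Offsets offsets;            // cumulative end positions, one per row
    std::vector<int64_t> data;  // flattened elements
};

// Build a defaulted nested column whose row lengths match the offsets
// borrowed from a sibling subcolumn of the same Nested structure.
static ArrayColumn makeDefaultArrayColumn(const Offsets & offsets)
{
    ArrayColumn res;
    res.offsets = offsets;
    // The flat size is the last offset; fill with default values (0).
    res.data.assign(offsets.empty() ? 0 : offsets.back(), 0);
    return res;
}

int main()
{
    // Rows of sizes 2, 0 and 3 -> cumulative offsets {2, 2, 5}.
    Offsets offsets{2, 2, 5};
    ArrayColumn filled = makeDefaultArrayColumn(offsets);

    assert(filled.data.size() == 5);
    for (size_t row = 0, prev = 0; row < offsets.size(); ++row)
    {
        std::cout << "row " << row << " size " << offsets[row] - prev << '\n';
        prev = offsets[row];
    }
}
```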
- if (!offsets_column || offsets_column->empty()) - offsets_column = array->getOffsetsPtr(); - } - } - - /// insert default values only for columns without default expressions - requested_column = requested_columns.begin(); - for (size_t i = 0; i < num_columns; ++i, ++requested_column) { const auto & [name, type] = *requested_column; - if (res_columns[i] && arrayHasNoElementsRead(*res_columns[i])) + if (res_columns[i] && partially_read_columns.contains(name)) res_columns[i] = nullptr; - if (res_columns[i] == nullptr) + if (res_columns[i]) + continue; + + if (metadata_snapshot && metadata_snapshot->getColumns().hasDefault(name)) + continue; + + std::vector current_offsets; + size_t num_dimensions = 0; + + const auto * array_type = typeid_cast(type.get()); + if (array_type && !offsets_columns.empty()) { - if (metadata_snapshot && metadata_snapshot->getColumns().hasDefault(name)) - continue; + num_dimensions = getNumberOfDimensions(*array_type); + current_offsets.resize(num_dimensions); - String offsets_name = Nested::extractTableName(name); - auto offset_it = offset_columns.find(offsets_name); - const auto * array_type = typeid_cast(type.get()); - if (offset_it != offset_columns.end() && array_type) + auto serialization = IDataType::getSerialization(*requested_column); + serialization->enumerateStreams([&](const auto & subpath) { - const auto & nested_type = array_type->getNestedType(); - ColumnPtr offsets_column = offset_it->second; - size_t nested_rows = typeid_cast(*offsets_column).getData().back(); + if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes) + return; - ColumnPtr nested_column = - nested_type->createColumnConstWithDefaultValue(nested_rows)->convertToFullColumnIfConst(); + size_t level = ISerialization::getArrayLevel(subpath); + assert(level < num_dimensions); - res_columns[i] = ColumnArray::create(nested_column, offsets_column); - } - else + auto stream_name = ISerialization::getFileNameForStream(*requested_column, subpath); + auto it = offsets_columns.find(stream_name); + if (it != offsets_columns.end()) + current_offsets[level] = it->second; + }); + + for (size_t j = 0; j < num_dimensions; ++j) { - /// We must turn a constant column into a full column because the interpreter could infer - /// that it is constant everywhere but in some blocks (from other parts) it can be a full column. - res_columns[i] = type->createColumnConstWithDefaultValue(num_rows)->convertToFullColumnIfConst(); + if (!current_offsets[j]) + { + current_offsets.resize(j); + break; + } } } + + if (!current_offsets.empty()) + { + size_t num_empty_dimensions = num_dimensions - current_offsets.size(); + auto scalar_type = createArrayOfType(getBaseTypeOfArray(type), num_empty_dimensions); + + size_t data_size = assert_cast(*current_offsets.back()).getData().back(); + res_columns[i] = scalar_type->createColumnConstWithDefaultValue(data_size)->convertToFullColumnIfConst(); + + for (auto it = current_offsets.rbegin(); it != current_offsets.rend(); ++it) + res_columns[i] = ColumnArray::create(res_columns[i], *it); + } + else + { + /// We must turn a constant column into a full column because the interpreter could infer + /// that it is constant everywhere but in some blocks (from other parts) it can be a full column. 
+ res_columns[i] = type->createColumnConstWithDefaultValue(num_rows)->convertToFullColumnIfConst(); + } } } diff --git a/src/Interpreters/inplaceBlockConversions.h b/src/Interpreters/inplaceBlockConversions.h index b3113ddfa5c..bea44bf6db9 100644 --- a/src/Interpreters/inplaceBlockConversions.h +++ b/src/Interpreters/inplaceBlockConversions.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -43,6 +44,8 @@ void fillMissingColumns( Columns & res_columns, size_t num_rows, const NamesAndTypesList & requested_columns, + const NamesAndTypesList & available_columns, + const NameSet & partially_read_columns, StorageMetadataPtr metadata_snapshot); } diff --git a/src/Storages/ColumnsDescription.cpp b/src/Storages/ColumnsDescription.cpp index 1a90312e076..d2490858a72 100644 --- a/src/Storages/ColumnsDescription.cpp +++ b/src/Storages/ColumnsDescription.cpp @@ -780,7 +780,7 @@ void ColumnsDescription::addSubcolumns(const String & name_in_storage, const Dat "Cannot add subcolumn {}: column with this name already exists", subcolumn.name); subcolumns.get<0>().insert(std::move(subcolumn)); - }, {type_in_storage->getDefaultSerialization(), type_in_storage, nullptr, nullptr}); + }, ISerialization::SubstreamData(type_in_storage->getDefaultSerialization()).withType(type_in_storage)); } void ColumnsDescription::removeSubcolumns(const String & name_in_storage) diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 1bc73c82dbe..e9d900c6d54 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -445,11 +445,11 @@ void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns, const column_name_to_position.clear(); column_name_to_position.reserve(new_columns.size()); size_t pos = 0; - for (const auto & column : columns) - column_name_to_position.emplace(column.name, pos++); for (const auto & column : columns) { + column_name_to_position.emplace(column.name, pos++); + auto it = serialization_infos.find(column.name); auto serialization = it == serialization_infos.end() ? 
IDataType::getSerialization(column) @@ -461,7 +461,7 @@ void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns, const { auto full_name = Nested::concatenateName(column.name, subname); serializations.emplace(full_name, subdata.serialization); - }, {serialization, nullptr, nullptr, nullptr}); + }, ISerialization::SubstreamData(serialization)); } columns_description = ColumnsDescription(columns); @@ -1352,7 +1352,6 @@ bool IMergeTreeDataPart::assertHasValidVersionMetadata() const } } - void IMergeTreeDataPart::appendFilesOfColumns(Strings & files) { files.push_back("columns.txt"); diff --git a/src/Storages/MergeTree/IMergeTreeReader.cpp b/src/Storages/MergeTree/IMergeTreeReader.cpp index 851b0378e6f..8711664d531 100644 --- a/src/Storages/MergeTree/IMergeTreeReader.cpp +++ b/src/Storages/MergeTree/IMergeTreeReader.cpp @@ -63,7 +63,13 @@ void IMergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_e { try { - DB::fillMissingColumns(res_columns, num_rows, requested_columns, metadata_snapshot); + NamesAndTypesList available_columns(columns_to_read.begin(), columns_to_read.end()); + DB::fillMissingColumns( + res_columns, num_rows, + Nested::convertToSubcolumns(requested_columns), + Nested::convertToSubcolumns(available_columns), + partially_read_columns, metadata_snapshot); + should_evaluate_missing_defaults = std::any_of( res_columns.begin(), res_columns.end(), [](const auto & column) { return column == nullptr; }); } @@ -201,20 +207,56 @@ void IMergeTreeReader::performRequiredConversions(Columns & res_columns) const } } -IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const String & column_name) const +IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const NameAndTypePair & required_column) const { - String table_name = Nested::extractTableName(column_name); + auto get_offsets_streams = [](const auto & serialization, const auto & name_in_storage) + { + Names offsets_streams; + serialization->enumerateStreams([&](const auto & subpath) + { + if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes) + return; + + auto subname = ISerialization::getSubcolumnNameForStream(subpath); + auto full_name = Nested::concatenateName(name_in_storage, subname); + offsets_streams.push_back(full_name); + }); + + return offsets_streams; + }; + + auto required_name_in_storage = Nested::extractTableName(required_column.getNameInStorage()); + auto required_offsets_streams = get_offsets_streams(getSerializationInPart(required_column), required_name_in_storage); + + size_t max_matched_streams = 0; + ColumnPosition position; + + /// Find column that has maximal number of matching + /// offsets columns with required_column. 
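The loop below chooses the part column that shares the longest prefix of offsets streams with the required column, so a `Nested` subcolumn missing from a part borrows sizes from the closest matching sibling. A self-contained sketch of the same selection logic; the stream names are illustrative only:

```cpp
#include <iostream>
#include <string>
#include <unordered_set>
#include <vector>

// For each candidate, count how many of the required offsets streams it
// provides (stopping at the first miss, mirroring the prefix loop below)
// and keep the candidate with the most matches.
static int bestCandidate(
    const std::vector<std::string> & required_streams,
    const std::vector<std::vector<std::string>> & candidates)
{
    size_t max_matched = 0;
    int best = -1;

    for (size_t c = 0; c < candidates.size(); ++c)
    {
        std::unordered_set<std::string> streams(candidates[c].begin(), candidates[c].end());

        size_t i = 0;
        for (; i < required_streams.size(); ++i)
            if (!streams.count(required_streams[i]))
                break;

        if (i && (best < 0 || i > max_matched))
        {
            max_matched = i;
            best = static_cast<int>(c);
        }
    }
    return best;
}

int main()
{
    std::vector<std::string> required{"n.size0", "n.a.size1"};
    std::vector<std::vector<std::string>> candidates{
        {"n.size0"},               // matches 1 stream
        {"n.size0", "n.a.size1"},  // matches 2 streams -> best
        {"n.b.size1"},             // matches 0 streams
    };
    std::cout << "best candidate: " << bestCandidate(required, candidates) << '\n';
}
```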
for (const auto & part_column : data_part_info_for_read->getColumns()) { - if (typeid_cast(part_column.type.get())) + auto name_in_storage = Nested::extractTableName(part_column.name); + if (name_in_storage != required_name_in_storage) + continue; + + auto offsets_streams = get_offsets_streams(data_part_info_for_read->getSerialization(part_column), name_in_storage); + NameSet offsets_streams_set(offsets_streams.begin(), offsets_streams.end()); + + size_t i = 0; + for (; i < required_offsets_streams.size(); ++i) { - auto position = data_part_info_for_read->getColumnPosition(part_column.getNameInStorage()); - if (position && Nested::extractTableName(part_column.name) == table_name) - return position; + if (!offsets_streams_set.contains(required_offsets_streams[i])) + break; + } + + if (i && (!position || i > max_matched_streams)) + { + max_matched_streams = i; + position = data_part_info_for_read->getColumnPosition(part_column.name); } } - return {}; + return position; } void IMergeTreeReader::checkNumberOfColumns(size_t num_columns_to_read) const diff --git a/src/Storages/MergeTree/IMergeTreeReader.h b/src/Storages/MergeTree/IMergeTreeReader.h index f88f916908f..16db13692aa 100644 --- a/src/Storages/MergeTree/IMergeTreeReader.h +++ b/src/Storages/MergeTree/IMergeTreeReader.h @@ -92,7 +92,9 @@ protected: MarkRanges all_mark_ranges; using ColumnPosition = std::optional; - ColumnPosition findColumnForOffsets(const String & column_name) const; + ColumnPosition findColumnForOffsets(const NameAndTypePair & column) const; + + NameSet partially_read_columns; private: /// Alter conversions, which must be applied on fly if required diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp index 771248b99c6..44fe50815da 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp @@ -66,8 +66,7 @@ void MergeTreeDataPartWriterCompact::addStreams(const NameAndTypePair & column, compressed_streams.emplace(stream_name, stream); }; - ISerialization::SubstreamPath path; - data_part->getSerialization(column.name)->enumerateStreams(path, callback, column.type); + data_part->getSerialization(column.name)->enumerateStreams(callback, column.type); } namespace diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index 3d4aa0a7707..99bf188f03c 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -121,7 +121,7 @@ void MergeTreeDataPartWriterWide::addStreams( }; ISerialization::SubstreamPath path; - data_part->getSerialization(column.name)->enumerateStreams(path, callback, column.type); + data_part->getSerialization(column.name)->enumerateStreams(callback, column.type); } @@ -255,10 +255,9 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm void MergeTreeDataPartWriterWide::writeSingleMark( const NameAndTypePair & column, WrittenOffsetColumns & offset_columns, - size_t number_of_rows, - ISerialization::SubstreamPath & path) + size_t number_of_rows) { - StreamsWithMarks marks = getCurrentMarksForColumn(column, offset_columns, path); + StreamsWithMarks marks = getCurrentMarksForColumn(column, offset_columns); for (const auto & mark : marks) flushMarkToFile(mark, number_of_rows); } @@ -274,8 +273,7 @@ void MergeTreeDataPartWriterWide::flushMarkToFile(const StreamNameAndMark & stre 
StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn( const NameAndTypePair & column, - WrittenOffsetColumns & offset_columns, - ISerialization::SubstreamPath & path) + WrittenOffsetColumns & offset_columns) { StreamsWithMarks result; data_part->getSerialization(column.name)->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path) @@ -300,7 +298,7 @@ StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn( stream_with_mark.mark.offset_in_decompressed_block = stream.compressed.offset(); result.push_back(stream_with_mark); - }, path); + }); return result; } @@ -328,7 +326,7 @@ void MergeTreeDataPartWriterWide::writeSingleGranule( return; column_streams[stream_name]->compressed.nextIfAtEnd(); - }, serialize_settings.path); + }); } /// Column must not be empty. (column.size() !== 0) @@ -366,7 +364,7 @@ void MergeTreeDataPartWriterWide::writeColumn( { if (last_non_written_marks.contains(name)) throw Exception(ErrorCodes::LOGICAL_ERROR, "We have to add new mark for column, but already have non written mark. Current mark {}, total marks {}, offset {}", getCurrentMark(), index_granularity.getMarksCount(), rows_written_in_last_mark); - last_non_written_marks[name] = getCurrentMarksForColumn(name_and_type, offset_columns, serialize_settings.path); + last_non_written_marks[name] = getCurrentMarksForColumn(name_and_type, offset_columns); } writeSingleGranule( @@ -390,7 +388,7 @@ void MergeTreeDataPartWriterWide::writeColumn( } } - serialization->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path) + serialization->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path) { bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes; if (is_offsets) @@ -398,7 +396,7 @@ void MergeTreeDataPartWriterWide::writeColumn( String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path); offset_columns.insert(stream_name); } - }, serialize_settings.path); + }); } @@ -553,7 +551,7 @@ void MergeTreeDataPartWriterWide::fillDataChecksums(IMergeTreeDataPart::Checksum } if (write_final_mark) - writeFinalMark(*it, offset_columns, serialize_settings.path); + writeFinalMark(*it, offset_columns); } } @@ -618,10 +616,9 @@ void MergeTreeDataPartWriterWide::finish(bool sync) void MergeTreeDataPartWriterWide::writeFinalMark( const NameAndTypePair & column, - WrittenOffsetColumns & offset_columns, - ISerialization::SubstreamPath & path) + WrittenOffsetColumns & offset_columns) { - writeSingleMark(column, offset_columns, 0, path); + writeSingleMark(column, offset_columns, 0); /// Memoize information about offsets data_part->getSerialization(column.name)->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path) { @@ -631,7 +628,7 @@ void MergeTreeDataPartWriterWide::writeFinalMark( String stream_name = ISerialization::getFileNameForStream(column, substream_path); offset_columns.insert(stream_name); } - }, path); + }); } static void fillIndexGranularityImpl( diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h index a3517f3aa88..08815d9930a 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h @@ -61,8 +61,7 @@ private: /// Take offsets from column and return as MarkInCompressed file with stream name StreamsWithMarks getCurrentMarksForColumn( const NameAndTypePair & column, - WrittenOffsetColumns & 
offset_columns, - ISerialization::SubstreamPath & path); + WrittenOffsetColumns & offset_columns); /// Write mark to disk using stream and rows count void flushMarkToFile( @@ -73,13 +72,11 @@ private: void writeSingleMark( const NameAndTypePair & column, WrittenOffsetColumns & offset_columns, - size_t number_of_rows, - ISerialization::SubstreamPath & path); + size_t number_of_rows); void writeFinalMark( const NameAndTypePair & column, - WrittenOffsetColumns & offset_columns, - ISerialization::SubstreamPath & path); + WrittenOffsetColumns & offset_columns); void addStreams( const NameAndTypePair & column, diff --git a/src/Storages/MergeTree/MergeTreeReaderCompact.cpp b/src/Storages/MergeTree/MergeTreeReaderCompact.cpp index 413e6838665..d59da08aa6c 100644 --- a/src/Storages/MergeTree/MergeTreeReaderCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderCompact.cpp @@ -46,35 +46,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact( { try { - size_t columns_num = columns_to_read.size(); - - column_positions.resize(columns_num); - read_only_offsets.resize(columns_num); - - for (size_t i = 0; i < columns_num; ++i) - { - const auto & column_to_read = columns_to_read[i]; - - if (column_to_read.isSubcolumn()) - { - auto storage_column_from_part = getColumnInPart( - {column_to_read.getNameInStorage(), column_to_read.getTypeInStorage()}); - - if (!storage_column_from_part.type->tryGetSubcolumnType(column_to_read.getSubcolumnName())) - continue; - } - - auto position = data_part_info_for_read->getColumnPosition(column_to_read.getNameInStorage()); - if (!position && typeid_cast(column_to_read.type.get())) - { - /// If array of Nested column is missing in part, - /// we have to read its offsets if they exist. - position = findColumnForOffsets(column_to_read.name); - read_only_offsets[i] = (position != std::nullopt); - } - - column_positions[i] = std::move(position); - } + fillColumnPositions(); /// Do not use max_read_buffer_size, but try to lower buffer size with maximal size of granule to avoid reading much data. auto buffer_size = getReadBufferSize(*data_part_info_for_read, marks_loader, column_positions, all_mark_ranges); @@ -137,6 +109,44 @@ MergeTreeReaderCompact::MergeTreeReaderCompact( } } +void MergeTreeReaderCompact::fillColumnPositions() +{ + size_t columns_num = columns_to_read.size(); + + column_positions.resize(columns_num); + read_only_offsets.resize(columns_num); + + for (size_t i = 0; i < columns_num; ++i) + { + const auto & column_to_read = columns_to_read[i]; + + auto position = data_part_info_for_read->getColumnPosition(column_to_read.getNameInStorage()); + bool is_array = isArray(column_to_read.type); + + if (column_to_read.isSubcolumn()) + { + auto storage_column_from_part = getColumnInPart( + {column_to_read.getNameInStorage(), column_to_read.getTypeInStorage()}); + + auto subcolumn_name = column_to_read.getSubcolumnName(); + if (!storage_column_from_part.type->hasSubcolumn(subcolumn_name)) + position.reset(); + } + + if (!position && is_array) + { + /// If array of Nested column is missing in part, + /// we have to read its offsets if they exist. 
+ position = findColumnForOffsets(column_to_read); + read_only_offsets[i] = (position != std::nullopt); + } + + column_positions[i] = std::move(position); + if (read_only_offsets[i]) + partially_read_columns.insert(column_to_read.name); + } +} + size_t MergeTreeReaderCompact::readRows( size_t from_mark, size_t current_task_last_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns) { @@ -214,7 +224,8 @@ void MergeTreeReaderCompact::readData( auto buffer_getter = [&](const ISerialization::SubstreamPath & substream_path) -> ReadBuffer * { - if (only_offsets && (substream_path.size() != 1 || substream_path[0].type != ISerialization::Substream::ArraySizes)) + bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes; + if (only_offsets && !is_offsets) return nullptr; return data_buffer; diff --git a/src/Storages/MergeTree/MergeTreeReaderCompact.h b/src/Storages/MergeTree/MergeTreeReaderCompact.h index afc360adc51..953455b7e26 100644 --- a/src/Storages/MergeTree/MergeTreeReaderCompact.h +++ b/src/Storages/MergeTree/MergeTreeReaderCompact.h @@ -39,6 +39,7 @@ public: private: bool isContinuousReading(size_t mark, size_t column_position); + void fillColumnPositions(); ReadBuffer * data_buffer; CompressedReadBufferBase * compressed_data_buffer; diff --git a/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp b/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp index c392199fa9e..3b3a6b95cff 100644 --- a/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp @@ -33,13 +33,19 @@ MergeTreeReaderInMemory::MergeTreeReaderInMemory( {}) , part_in_memory(std::move(data_part_)) { - for (const auto & [name, type] : columns_to_read) + for (const auto & column_to_read : columns_to_read) { /// If array of Nested column is missing in part, /// we have to read its offsets if they exist. 
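In `readData` above, the compact reader now classifies a stream as offsets by looking only at the last element of the substream path, whereas the old check required the path to be exactly one `ArraySizes` element and so missed offsets nested below other substreams. A reduced sketch of the two predicates; the `SubstreamType` enum is a simplified stand-in for `ISerialization::Substream`:

```cpp
#include <iostream>
#include <vector>

// Simplified stand-in for ISerialization::Substream path elements.
enum class SubstreamType { Regular, ArraySizes, ArrayElements, NullMap };
using SubstreamPath = std::vector<SubstreamType>;

// Old check: offsets had to be the sole element of the path.
static bool isOffsetsOld(const SubstreamPath & path)
{
    return path.size() == 1 && path[0] == SubstreamType::ArraySizes;
}

// New check: only the last element matters, so offsets living under
// nested substreams (e.g. inside another array) are still detected.
static bool isOffsetsNew(const SubstreamPath & path)
{
    return !path.empty() && path.back() == SubstreamType::ArraySizes;
}

int main()
{
    SubstreamPath top_level{SubstreamType::ArraySizes};
    SubstreamPath nested{SubstreamType::ArrayElements, SubstreamType::ArraySizes};

    std::cout << std::boolalpha
              << isOffsetsOld(top_level) << ' ' << isOffsetsNew(top_level) << '\n'  // true true
              << isOffsetsOld(nested) << ' ' << isOffsetsNew(nested) << '\n';       // false true
}
```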
-        if (!part_in_memory->block.has(name) && typeid_cast<const DataTypeArray *>(type.get()))
-            if (auto offset_position = findColumnForOffsets(name))
-                positions_for_offsets[name] = *offset_position;
+        if (typeid_cast<const DataTypeArray *>(column_to_read.type.get())
+            && !tryGetColumnFromBlock(part_in_memory->block, column_to_read))
+        {
+            if (auto offsets_position = findColumnForOffsets(column_to_read))
+            {
+                positions_for_offsets[column_to_read.name] = *offsets_position;
+                partially_read_columns.insert(column_to_read.name);
+            }
+        }
     }
 }
diff --git a/src/Storages/MergeTree/MergeTreeReaderWide.cpp b/src/Storages/MergeTree/MergeTreeReaderWide.cpp
index 5a048e8bc1a..22f07e26473 100644
--- a/src/Storages/MergeTree/MergeTreeReaderWide.cpp
+++ b/src/Storages/MergeTree/MergeTreeReaderWide.cpp
@@ -16,7 +16,6 @@ namespace DB
 
 namespace
 {
-    using OffsetColumns = std::map<std::string, ColumnPtr>;
     constexpr auto DATA_FILE_EXTENSION = ".bin";
 }
 
@@ -160,12 +159,18 @@ void MergeTreeReaderWide::addStreams(
     const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
 {
+    bool has_any_stream = false;
+    bool has_all_streams = true;
+
     ISerialization::StreamCallback callback = [&] (const ISerialization::SubstreamPath & substream_path)
     {
         String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path);
 
         if (streams.contains(stream_name))
+        {
+            has_any_stream = true;
             return;
+        }
 
         bool data_file_exists = data_part_info_for_read->getChecksums().files.contains(stream_name + DATA_FILE_EXTENSION);
 
@@ -173,8 +178,12 @@ void MergeTreeReaderWide::addStreams(
          * It is necessary since it allows to add new column to structure of the table without creating new files for old parts.
          */
        if (!data_file_exists)
+        {
+            has_all_streams = false;
             return;
+        }
 
+        has_any_stream = true;
         bool is_lc_dict = substream_path.size() > 1 && substream_path[substream_path.size() - 2].type == ISerialization::Substream::Type::DictionaryKeys;
 
         streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
@@ -186,6 +195,9 @@ void MergeTreeReaderWide::addStreams(
     };
 
     serialization->enumerateStreams(callback);
+
+    if (has_any_stream && !has_all_streams)
+        partially_read_columns.insert(name_and_type.name);
 }
 
 
@@ -283,6 +295,7 @@ void MergeTreeReaderWide::readData(
             /* seek_to_start = */false, substream_path, streams, name_and_type, from_mark, seek_to_mark, current_task_last_mark, cache);
     };
 
+    deserialize_settings.continuous_reading = continue_reading;
     auto & deserialize_state = deserialize_binary_bulk_state_map[name_and_type.name];
 
diff --git a/src/Storages/StorageMemory.cpp b/src/Storages/StorageMemory.cpp
index f3f1162287f..e4dbfe15095 100644
--- a/src/Storages/StorageMemory.cpp
+++ b/src/Storages/StorageMemory.cpp
@@ -95,7 +95,7 @@ protected:
             ++name_and_type;
         }
 
-        fillMissingColumns(columns, src.rows(), column_names_and_types, /*metadata_snapshot=*/ nullptr);
+        fillMissingColumns(columns, src.rows(), column_names_and_types, column_names_and_types, {}, nullptr);
         assert(std::all_of(columns.begin(), columns.end(), [](const auto & column) { return column != nullptr; }));
 
         return Chunk(std::move(columns), src.rows());
diff --git a/src/Storages/System/StorageSystemPartsColumns.cpp b/src/Storages/System/StorageSystemPartsColumns.cpp
index 87a5afe2439..8837c11970d 100644
--- a/src/Storages/System/StorageSystemPartsColumns.cpp
+++ b/src/Storages/System/StorageSystemPartsColumns.cpp
@@ -242,7 +242,7 @@ void StorageSystemPartsColumns::processNextStorage(
 
             IDataType::forEachSubcolumn([&](const auto & subpath, const auto & name, const auto & data)
             {
                 /// We count only final subcolumns, which are represented by files on disk
-                /// and skip intermediate suibcolumns of types Tuple and Nested.
+                /// and skip intermediate subcolumns of types Tuple and Nested.
                 if (isTuple(data.type) || isNested(data.type))
                     return;
 
@@ -270,7 +270,7 @@ void StorageSystemPartsColumns::processNextStorage(
 
                     subcolumn_data_uncompressed_bytes.push_back(size.data_uncompressed);
                     subcolumn_marks_bytes.push_back(size.marks);
-                }, { serialization, column.type, nullptr, nullptr });
+                }, ISerialization::SubstreamData(serialization).withType(column.type));
 
                 if (columns_mask[src_index++])
                     columns[res_index++]->insert(subcolumn_names);
diff --git a/tests/ci/ast_fuzzer_check.py b/tests/ci/ast_fuzzer_check.py
index f3939dc89ad..8f94ef4a915 100644
--- a/tests/ci/ast_fuzzer_check.py
+++ b/tests/ci/ast_fuzzer_check.py
@@ -29,7 +29,11 @@ IMAGE_NAME = "clickhouse/fuzzer"
 
 def get_run_command(pr_number, sha, download_url, workspace_path, image):
     return (
-        f"docker run --network=host --volume={workspace_path}:/workspace "
+        f"docker run "
+        # For sysctl
+        "--privileged "
+        "--network=host "
+        f"--volume={workspace_path}:/workspace "
         "--cap-add syslog --cap-add sys_admin --cap-add=SYS_PTRACE "
         f'-e PR_TO_TEST={pr_number} -e SHA_TO_TEST={sha} -e BINARY_URL_TO_DOWNLOAD="{download_url}" '
         f"{image}"
diff --git a/tests/ci/stress_check.py b/tests/ci/stress_check.py
index e644eef3bc8..8f310eaa99d 100644
--- a/tests/ci/stress_check.py
+++ b/tests/ci/stress_check.py
@@ -33,7 +33,7 @@ def get_run_command(
         "docker run --cap-add=SYS_PTRACE "
         # a static link, don't use S3_URL or S3_DOWNLOAD
         "-e S3_URL='https://s3.amazonaws.com/clickhouse-datasets' "
-        # For dmesg
+        # For dmesg and sysctl
         "--privileged "
         f"--volume={build_path}:/package_folder "
         f"--volume={result_folder}:/test_output "
diff --git a/tests/queries/0_stateless/01825_type_json_17.reference b/tests/queries/0_stateless/01825_type_json_17.reference
new file mode 100644
index 00000000000..0f97bfed5bc
--- /dev/null
+++ b/tests/queries/0_stateless/01825_type_json_17.reference
@@ -0,0 +1,27 @@
+Tuple(arr Nested(k1 Nested(k2 String, k3 String, k4 Int8), k5 Tuple(k6 String)), id Int8)
+{"obj":{"arr":[{"k1":[{"k2":"aaa","k3":"bbb","k4":0},{"k2":"ccc","k3":"","k4":0}],"k5":{"k6":""}}],"id":1}}
+{"obj":{"arr":[{"k1":[{"k2":"","k3":"ddd","k4":10},{"k2":"","k3":"","k4":20}],"k5":{"k6":"foo"}}],"id":2}}
+[['bbb','']] [['aaa','ccc']]
+[['ddd','']] [['','']]
+1
+[[0,0]]
+[[10,20]]
+Tuple(arr Nested(k1 Nested(k2 String, k3 Nested(k4 Int8))), id Int8)
+{"obj":{"arr":[{"k1":[{"k2":"aaa","k3":[]}]}],"id":1}}
+{"obj":{"arr":[{"k1":[{"k2":"bbb","k3":[{"k4":10}]},{"k2":"ccc","k3":[{"k4":20}]}]}],"id":2}}
+[['aaa']] [[[]]]
+[['bbb','ccc']] [[[10],[20]]]
+1
+[[[]]]
+[[[10],[20]]]
+Tuple(arr Nested(k1 Nested(k2 String, k4 Nested(k5 Int8)), k3 String), id Int8)
+{"obj":{"arr":[{"k1":[],"k3":"qqq"},{"k1":[],"k3":"www"}],"id":1}}
+{"obj":{"arr":[{"k1":[{"k2":"aaa","k4":[]}],"k3":"eee"}],"id":2}}
+{"obj":{"arr":[{"k1":[{"k2":"bbb","k4":[{"k5":10}]},{"k2":"ccc","k4":[{"k5":20}]}],"k3":"rrr"}],"id":3}}
+['qqq','www'] [[],[]] [[],[]]
+['eee'] [['aaa']] [[[]]]
+['rrr'] [['bbb','ccc']] [[[10],[20]]]
+1
+[[],[]]
+[[[]]]
+[[[10],[20]]]
diff --git a/tests/queries/0_stateless/01825_type_json_17.sql b/tests/queries/0_stateless/01825_type_json_17.sql
new file mode 100644
index 00000000000..e3c0c83322b
--- /dev/null
+++ b/tests/queries/0_stateless/01825_type_json_17.sql
@@ -0,0 +1,48 @@
+-- Tags: no-fasttest
+
+DROP TABLE IF EXISTS t_json_17;
+SET allow_experimental_object_type = 1;
+SET output_format_json_named_tuples_as_objects = 1;
+
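+-- Merges are stopped, so every INSERT below creates its own part with its own
+-- inferred object type; hasValidSizes17 checks that sibling subcolumns, some of
+-- which are missing in some parts, are still read with matching array sizes.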
+CREATE TABLE t_json_17(obj JSON)
+ENGINE = MergeTree ORDER BY tuple();
+
+DROP FUNCTION IF EXISTS hasValidSizes17;
+CREATE FUNCTION hasValidSizes17 AS (arr1, arr2) -> length(arr1) = length(arr2) AND arrayAll((x, y) -> length(x) = length(y), arr1, arr2);
+
+SYSTEM STOP MERGES t_json_17;
+
+INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 1, "arr": [{"k1": [{"k2": "aaa", "k3": "bbb"}, {"k2": "ccc"}]}]}
+INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 2, "arr": [{"k1": [{"k3": "ddd", "k4": 10}, {"k4": 20}], "k5": {"k6": "foo"}}]}
+
+SELECT toTypeName(obj) FROM t_json_17 LIMIT 1;
+SELECT obj FROM t_json_17 ORDER BY obj.id FORMAT JSONEachRow;
+SELECT obj.arr.k1.k3, obj.arr.k1.k2 FROM t_json_17 ORDER BY obj.id;
+SELECT sum(hasValidSizes17(obj.arr.k1.k3, obj.arr.k1.k2)) == count() FROM t_json_17;
+SELECT obj.arr.k1.k4 FROM t_json_17 ORDER BY obj.id;
+
+TRUNCATE TABLE t_json_17;
+
+INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 1, "arr": [{"k1": [{"k2": "aaa"}]}]}
+INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 2, "arr": [{"k1": [{"k2": "bbb", "k3": [{"k4": 10}]}, {"k2": "ccc", "k3": [{"k4": 20}]}]}]}
+
+SELECT toTypeName(obj) FROM t_json_17 LIMIT 1;
+SELECT obj FROM t_json_17 ORDER BY obj.id FORMAT JSONEachRow;
+SELECT obj.arr.k1.k2, obj.arr.k1.k3.k4 FROM t_json_17 ORDER BY obj.id;
+SELECT sum(hasValidSizes17(obj.arr.k1.k2, obj.arr.k1.k3.k4)) == count() FROM t_json_17;
+SELECT obj.arr.k1.k3.k4 FROM t_json_17 ORDER BY obj.id;
+
+TRUNCATE TABLE t_json_17;
+
+INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 1, "arr": [{"k3": "qqq"}, {"k3": "www"}]}
+INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 2, "arr": [{"k1": [{"k2": "aaa"}], "k3": "eee"}]}
+INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 3, "arr": [{"k1": [{"k2": "bbb", "k4": [{"k5": 10}]}, {"k2": "ccc", "k4": [{"k5": 20}]}], "k3": "rrr"}]}
+
+SELECT toTypeName(obj) FROM t_json_17 LIMIT 1;
+SELECT obj FROM t_json_17 ORDER BY obj.id FORMAT JSONEachRow;
+SELECT obj.arr.k3, obj.arr.k1.k2, obj.arr.k1.k4.k5 FROM t_json_17 ORDER BY obj.id;
+SELECT sum(hasValidSizes17(obj.arr.k1.k2, obj.arr.k1.k4.k5)) == count() FROM t_json_17;
+SELECT obj.arr.k1.k4.k5 FROM t_json_17 ORDER BY obj.id;
+
+DROP FUNCTION hasValidSizes17;
+DROP TABLE t_json_17;
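The same offsets-based recovery applies to plain Nested columns outside of the JSON type. Below is a minimal sketch of that scenario, with hypothetical table and column names (not part of this patch): a subcolumn added to a Nested group after a part was written is read back as default values, sized by the shared offsets of its sibling.

-- Hypothetical illustration only; not included in the patch.
DROP TABLE IF EXISTS t_nested_offsets;
CREATE TABLE t_nested_offsets (id UInt64, n Nested(a UInt64))
ENGINE = MergeTree ORDER BY id;

SYSTEM STOP MERGES t_nested_offsets; -- keep the existing part as it is
INSERT INTO t_nested_offsets VALUES (1, [10, 20]);

ALTER TABLE t_nested_offsets ADD COLUMN `n.b` Array(String);

-- `n.b` has no data streams in the old part, so the reader takes the shared
-- offsets of `n.a` and fills defaults: the expected result is ['',''].
SELECT n.b FROM t_nested_offsets;

DROP TABLE t_nested_offsets;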