diff --git a/src/Columns/ColumnArray.h b/src/Columns/ColumnArray.h index f77268a8be6..df52880d6e4 100644 --- a/src/Columns/ColumnArray.h +++ b/src/Columns/ColumnArray.h @@ -192,6 +192,13 @@ public: bool hasDynamicStructure() const override { return getData().hasDynamicStructure(); } void takeDynamicStructureFromSourceColumns(const Columns & source_columns) override; + bool dynamicStructureEquals(const IColumn & rhs) const override + { + if (const auto * rhs_concrete = typeid_cast(&rhs)) + return data->dynamicStructureEquals(*rhs_concrete->data); + return false; + } + private: WrappedPtr data; WrappedPtr offsets; diff --git a/src/Columns/ColumnDynamic.cpp b/src/Columns/ColumnDynamic.cpp index 5a837a62761..09a05e52c90 100644 --- a/src/Columns/ColumnDynamic.cpp +++ b/src/Columns/ColumnDynamic.cpp @@ -1153,6 +1153,15 @@ void ColumnDynamic::prepareVariantsForSquashing(const Columns & source_columns) } } +bool ColumnDynamic::dynamicStructureEquals(const IColumn & rhs) const +{ + if (const auto * rhs_concrete = typeid_cast(&rhs)) + return max_dynamic_types == rhs_concrete->max_dynamic_types && global_max_dynamic_types == rhs_concrete->global_max_dynamic_types + && variant_info.variant_name == rhs_concrete->variant_info.variant_name + && variant_column->dynamicStructureEquals(*rhs_concrete->variant_column); + return false; +} + void ColumnDynamic::takeDynamicStructureFromSourceColumns(const Columns & source_columns) { if (!empty()) diff --git a/src/Columns/ColumnDynamic.h b/src/Columns/ColumnDynamic.h index 17b0d80e5eb..9e8b1f79321 100644 --- a/src/Columns/ColumnDynamic.h +++ b/src/Columns/ColumnDynamic.h @@ -367,6 +367,7 @@ public: bool addNewVariant(const DataTypePtr & new_variant) { return addNewVariant(new_variant, new_variant->getName()); } bool hasDynamicStructure() const override { return true; } + bool dynamicStructureEquals(const IColumn & rhs) const override; void takeDynamicStructureFromSourceColumns(const Columns & source_columns) override; const StatisticsPtr & getStatistics() const { return statistics; } diff --git a/src/Columns/ColumnMap.cpp b/src/Columns/ColumnMap.cpp index 536da4d06d0..4e81191939f 100644 --- a/src/Columns/ColumnMap.cpp +++ b/src/Columns/ColumnMap.cpp @@ -330,6 +330,13 @@ bool ColumnMap::structureEquals(const IColumn & rhs) const return false; } +bool ColumnMap::dynamicStructureEquals(const IColumn & rhs) const +{ + if (const auto * rhs_map = typeid_cast(&rhs)) + return nested->dynamicStructureEquals(*rhs_map->nested); + return false; +} + ColumnPtr ColumnMap::compress() const { auto compressed = nested->compress(); diff --git a/src/Columns/ColumnMap.h b/src/Columns/ColumnMap.h index 39d15a586b9..8cb0b1680a7 100644 --- a/src/Columns/ColumnMap.h +++ b/src/Columns/ColumnMap.h @@ -120,6 +120,7 @@ public: ColumnPtr compress() const override; bool hasDynamicStructure() const override { return nested->hasDynamicStructure(); } + bool dynamicStructureEquals(const IColumn & rhs) const override; void takeDynamicStructureFromSourceColumns(const Columns & source_columns) override; }; diff --git a/src/Columns/ColumnObject.cpp b/src/Columns/ColumnObject.cpp index 3577ab1ec82..8e0182c7276 100644 --- a/src/Columns/ColumnObject.cpp +++ b/src/Columns/ColumnObject.cpp @@ -1299,6 +1299,31 @@ void ColumnObject::prepareForSquashing(const std::vector & source_col } } +bool ColumnObject::dynamicStructureEquals(const IColumn & rhs) const +{ + const auto * rhs_object = typeid_cast(&rhs); + if (!rhs_object || typed_paths.size() != rhs_object->typed_paths.size() + || global_max_dynamic_paths != rhs_object->global_max_dynamic_paths || max_dynamic_types != rhs_object->max_dynamic_types + || dynamic_paths.size() != rhs_object->dynamic_paths.size()) + return false; + + for (const auto & [path, column] : typed_paths) + { + auto it = rhs_object->typed_paths.find(path); + if (it == rhs_object->typed_paths.end() || !it->second->dynamicStructureEquals(*column)) + return false; + } + + for (const auto & [path, column] : dynamic_paths) + { + auto it = rhs_object->dynamic_paths.find(path); + if (it == rhs_object->dynamic_paths.end() || !it->second->dynamicStructureEquals(*column)) + return false; + } + + return true; +} + void ColumnObject::takeDynamicStructureFromSourceColumns(const DB::Columns & source_columns) { if (!empty()) diff --git a/src/Columns/ColumnObject.h b/src/Columns/ColumnObject.h index c7f282d9079..d5370625115 100644 --- a/src/Columns/ColumnObject.h +++ b/src/Columns/ColumnObject.h @@ -172,6 +172,7 @@ public: bool isFinalized() const override; bool hasDynamicStructure() const override { return true; } + bool dynamicStructureEquals(const IColumn & rhs) const override; void takeDynamicStructureFromSourceColumns(const Columns & source_columns) override; const PathToColumnMap & getTypedPaths() const { return typed_paths; } @@ -221,6 +222,7 @@ public: void setDynamicPaths(const std::vector & paths); void setMaxDynamicPaths(size_t max_dynamic_paths_); + void setGlobalMaxDynamicPaths(size_t global_max_dynamic_paths_); void setStatistics(const StatisticsPtr & statistics_) { statistics = statistics_; } void serializePathAndValueIntoSharedData(ColumnString * shared_data_paths, ColumnString * shared_data_values, std::string_view path, const IColumn & column, size_t n); diff --git a/src/Columns/ColumnTuple.cpp b/src/Columns/ColumnTuple.cpp index e741eb51c68..42acfdc85be 100644 --- a/src/Columns/ColumnTuple.cpp +++ b/src/Columns/ColumnTuple.cpp @@ -727,6 +727,26 @@ bool ColumnTuple::hasDynamicStructure() const return false; } +bool ColumnTuple::dynamicStructureEquals(const IColumn & rhs) const +{ + if (const auto * rhs_tuple = typeid_cast(&rhs)) + { + const size_t tuple_size = columns.size(); + if (tuple_size != rhs_tuple->columns.size()) + return false; + + for (size_t i = 0; i < tuple_size; ++i) + if (!columns[i]->dynamicStructureEquals(*rhs_tuple->columns[i])) + return false; + + return true; + } + else + { + return false; + } +} + void ColumnTuple::takeDynamicStructureFromSourceColumns(const Columns & source_columns) { std::vector nested_source_columns; diff --git a/src/Columns/ColumnTuple.h b/src/Columns/ColumnTuple.h index 6968294aef9..2539c27c441 100644 --- a/src/Columns/ColumnTuple.h +++ b/src/Columns/ColumnTuple.h @@ -138,6 +138,7 @@ public: ColumnPtr & getColumnPtr(size_t idx) { return columns[idx]; } bool hasDynamicStructure() const override; + bool dynamicStructureEquals(const IColumn & rhs) const override; void takeDynamicStructureFromSourceColumns(const Columns & source_columns) override; /// Empty tuple needs a public method to manage its size. diff --git a/src/Columns/ColumnVariant.cpp b/src/Columns/ColumnVariant.cpp index c6511695f5c..a18dffd8360 100644 --- a/src/Columns/ColumnVariant.cpp +++ b/src/Columns/ColumnVariant.cpp @@ -1376,6 +1376,23 @@ bool ColumnVariant::structureEquals(const IColumn & rhs) const return true; } +bool ColumnVariant::dynamicStructureEquals(const IColumn & rhs) const +{ + const auto * rhs_variant = typeid_cast(&rhs); + if (!rhs_variant) + return false; + + const size_t num_variants = variants.size(); + if (num_variants != rhs_variant->variants.size()) + return false; + + for (size_t i = 0; i < num_variants; ++i) + if (!variants[i]->dynamicStructureEquals(rhs_variant->getVariantByGlobalDiscriminator(globalDiscriminatorByLocal(i)))) + return false; + + return true; +} + ColumnPtr ColumnVariant::compress() const { ColumnPtr local_discriminators_compressed = local_discriminators->compress(); diff --git a/src/Columns/ColumnVariant.h b/src/Columns/ColumnVariant.h index 925eab74af8..2084de4fae7 100644 --- a/src/Columns/ColumnVariant.h +++ b/src/Columns/ColumnVariant.h @@ -327,6 +327,7 @@ public: void extend(const std::vector & old_to_new_global_discriminators, std::vector> && new_variants_and_discriminators); bool hasDynamicStructure() const override; + bool dynamicStructureEquals(const IColumn & rhs) const override; void takeDynamicStructureFromSourceColumns(const Columns & source_columns) override; private: diff --git a/src/Columns/IColumn.h b/src/Columns/IColumn.h index e4fe233ffdf..7131765f99c 100644 --- a/src/Columns/IColumn.h +++ b/src/Columns/IColumn.h @@ -590,6 +590,9 @@ public: /// Checks if column has dynamic subcolumns. virtual bool hasDynamicStructure() const { return false; } + + /// For columns with dynamic subcolumns checks if columns have equal dynamic structure. + [[nodiscard]] virtual bool dynamicStructureEquals(const IColumn & rhs) const { return structureEquals(rhs); } /// For columns with dynamic subcolumns this method takes dynamic structure from source columns /// and creates proper resulting dynamic structure in advance for merge of these source columns. virtual void takeDynamicStructureFromSourceColumns(const std::vector & /*source_columns*/) {} diff --git a/src/DataTypes/Serializations/SerializationDynamic.cpp b/src/DataTypes/Serializations/SerializationDynamic.cpp index 18a75918499..b00668fa8a4 100644 --- a/src/DataTypes/Serializations/SerializationDynamic.cpp +++ b/src/DataTypes/Serializations/SerializationDynamic.cpp @@ -26,8 +26,8 @@ namespace ErrorCodes struct SerializeBinaryBulkStateDynamic : public ISerialization::SerializeBinaryBulkState { - SerializationDynamic::DynamicStructureSerializationVersion structure_version; - size_t max_dynamic_types; + SerializationDynamic::DynamicSerializationVersion structure_version; + size_t num_dynamic_types; DataTypePtr variant_type; Names variant_names; SerializationPtr variant_serialization; @@ -81,14 +81,14 @@ void SerializationDynamic::enumerateStreams( settings.path.pop_back(); } -SerializationDynamic::DynamicStructureSerializationVersion::DynamicStructureSerializationVersion(UInt64 version) : value(static_cast(version)) +SerializationDynamic::DynamicSerializationVersion::DynamicSerializationVersion(UInt64 version) : value(static_cast(version)) { checkVersion(version); } -void SerializationDynamic::DynamicStructureSerializationVersion::checkVersion(UInt64 version) +void SerializationDynamic::DynamicSerializationVersion::checkVersion(UInt64 version) { - if (version != VariantTypeName) + if (version != V1 && version != V2) throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid version for Dynamic structure serialization."); } @@ -108,22 +108,17 @@ void SerializationDynamic::serializeBinaryBulkStatePrefix( throw Exception(ErrorCodes::LOGICAL_ERROR, "Missing stream for Dynamic column structure during serialization of binary bulk state prefix"); /// Write structure serialization version. - UInt64 structure_version = DynamicStructureSerializationVersion::Value::VariantTypeName; + UInt64 structure_version = DynamicSerializationVersion::Value::V2; writeBinaryLittleEndian(structure_version, *stream); auto dynamic_state = std::make_shared(structure_version); - dynamic_state->max_dynamic_types = column_dynamic.getMaxDynamicTypes(); - /// Write max_dynamic_types parameter, because it can differ from the max_dynamic_types - /// that is specified in the Dynamic type (we could decrease it before merge). - writeVarUInt(dynamic_state->max_dynamic_types, *stream); - dynamic_state->variant_type = variant_info.variant_type; dynamic_state->variant_names = variant_info.variant_names; const auto & variant_column = column_dynamic.getVariantColumn(); - /// Write information about variants. - size_t num_variants = dynamic_state->variant_names.size() - 1; /// Don't write shared variant, Dynamic column should always have it. - writeVarUInt(num_variants, *stream); + /// Write information about dynamic types. + dynamic_state->num_dynamic_types = dynamic_state->variant_names.size() - 1; /// -1 for SharedVariant + writeVarUInt(dynamic_state->num_dynamic_types, *stream); if (settings.data_types_binary_encoding) { const auto & variants = assert_cast(*dynamic_state->variant_type).getVariants(); @@ -251,22 +246,25 @@ ISerialization::DeserializeBinaryBulkStatePtr SerializationDynamic::deserializeD UInt64 structure_version; readBinaryLittleEndian(structure_version, *structure_stream); auto structure_state = std::make_shared(structure_version); - /// Read max_dynamic_types parameter. - readVarUInt(structure_state->max_dynamic_types, *structure_stream); + if (structure_state->structure_version.value == DynamicSerializationVersion::Value::V1) + { + /// Skip max_dynamic_types parameter in V1 serialization version. + size_t max_dynamic_types; + readVarUInt(max_dynamic_types, *structure_stream); + } /// Read information about variants. DataTypes variants; - size_t num_variants; - readVarUInt(num_variants, *structure_stream); - variants.reserve(num_variants + 1); /// +1 for shared variant. + readVarUInt(structure_state->num_dynamic_types, *structure_stream); + variants.reserve(structure_state->num_dynamic_types + 1); /// +1 for shared variant. if (settings.data_types_binary_encoding) { - for (size_t i = 0; i != num_variants; ++i) + for (size_t i = 0; i != structure_state->num_dynamic_types; ++i) variants.push_back(decodeDataType(*structure_stream)); } else { String data_type_name; - for (size_t i = 0; i != num_variants; ++i) + for (size_t i = 0; i != structure_state->num_dynamic_types; ++i) { readStringBinary(data_type_name, *structure_stream); variants.push_back(DataTypeFactory::instance().get(data_type_name)); @@ -364,9 +362,6 @@ void SerializationDynamic::serializeBinaryBulkWithMultipleStreamsAndCountTotalSi if (!variant_info.variant_type->equals(*dynamic_state->variant_type)) throw Exception(ErrorCodes::LOGICAL_ERROR, "Mismatch of internal columns of Dynamic. Expected: {}, Got: {}", dynamic_state->variant_type->getName(), variant_info.variant_type->getName()); - if (column_dynamic.getMaxDynamicTypes() != dynamic_state->max_dynamic_types) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Mismatch of max_dynamic_types parameter of Dynamic. Expected: {}, Got: {}", dynamic_state->max_dynamic_types, column_dynamic.getMaxDynamicTypes()); - settings.path.push_back(Substream::DynamicData); assert_cast(*dynamic_state->variant_serialization) .serializeBinaryBulkWithMultipleStreamsAndUpdateVariantStatistics( @@ -424,7 +419,7 @@ void SerializationDynamic::deserializeBinaryBulkWithMultipleStreams( if (mutable_column->empty()) { - column_dynamic.setMaxDynamicPaths(structure_state->max_dynamic_types); + column_dynamic.setMaxDynamicPaths(structure_state->num_dynamic_types); column_dynamic.setVariantType(structure_state->variant_type); column_dynamic.setStatistics(structure_state->statistics); } diff --git a/src/DataTypes/Serializations/SerializationDynamic.h b/src/DataTypes/Serializations/SerializationDynamic.h index f34b5d0e770..ac98bbbc8b5 100644 --- a/src/DataTypes/Serializations/SerializationDynamic.h +++ b/src/DataTypes/Serializations/SerializationDynamic.h @@ -16,18 +16,28 @@ public: { } - struct DynamicStructureSerializationVersion + struct DynamicSerializationVersion { enum Value { - VariantTypeName = 1, + /// V1 serialization: + /// - DynamicStructure stream: + /// + /// + /// + /// (only in MergeTree serialization) + /// (only in MergeTree serialization) + /// - DynamicData stream: contains the data of nested Variant column. + V1 = 1, + /// V2 serialization: the same as V1 but without max_dynamic_types parameter in DynamicStructure stream. + V2 = 2, }; Value value; static void checkVersion(UInt64 version); - explicit DynamicStructureSerializationVersion(UInt64 version); + explicit DynamicSerializationVersion(UInt64 version); }; void enumerateStreams( @@ -113,9 +123,9 @@ private: struct DeserializeBinaryBulkStateDynamicStructure : public ISerialization::DeserializeBinaryBulkState { - DynamicStructureSerializationVersion structure_version; + DynamicSerializationVersion structure_version; DataTypePtr variant_type; - size_t max_dynamic_types; + size_t num_dynamic_types; ColumnDynamic::StatisticsPtr statistics; explicit DeserializeBinaryBulkStateDynamicStructure(UInt64 structure_version_) diff --git a/src/DataTypes/Serializations/SerializationObject.cpp b/src/DataTypes/Serializations/SerializationObject.cpp index 760f6ce750d..b3ac2c52d70 100644 --- a/src/DataTypes/Serializations/SerializationObject.cpp +++ b/src/DataTypes/Serializations/SerializationObject.cpp @@ -68,14 +68,13 @@ SerializationObject::ObjectSerializationVersion::ObjectSerializationVersion(UInt void SerializationObject::ObjectSerializationVersion::checkVersion(UInt64 version) { - if (version != BASIC) + if (version != V1 && version != V2) throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid version for Object structure serialization."); } struct SerializeBinaryBulkStateObject: public ISerialization::SerializeBinaryBulkState { SerializationObject::ObjectSerializationVersion serialization_version; - size_t max_dynamic_paths; std::vector sorted_dynamic_paths; std::unordered_map typed_path_states; std::unordered_map dynamic_path_states; @@ -193,13 +192,10 @@ void SerializationObject::serializeBinaryBulkStatePrefix( throw Exception(ErrorCodes::LOGICAL_ERROR, "Missing stream for Object column structure during serialization of binary bulk state prefix"); /// Write serialization version. - UInt64 serialization_version = ObjectSerializationVersion::Value::BASIC; + UInt64 serialization_version = ObjectSerializationVersion::Value::V2; writeBinaryLittleEndian(serialization_version, *stream); auto object_state = std::make_shared(serialization_version); - object_state->max_dynamic_paths = column_object.getMaxDynamicPaths(); - /// Write max_dynamic_paths parameter. - writeVarUInt(object_state->max_dynamic_paths, *stream); /// Write all dynamic paths in sorted order. object_state->sorted_dynamic_paths.reserve(dynamic_paths.size()); for (const auto & [path, _] : dynamic_paths) @@ -353,8 +349,13 @@ ISerialization::DeserializeBinaryBulkStatePtr SerializationObject::deserializeOb UInt64 serialization_version; readBinaryLittleEndian(serialization_version, *structure_stream); auto structure_state = std::make_shared(serialization_version); - /// Read max_dynamic_paths parameter. - readVarUInt(structure_state->max_dynamic_paths, *structure_stream); + if (structure_state->structure_version.value == ObjectSerializationVersion::Value::V1) + { + /// Skip max_dynamic_paths parameter in V1 serialization version. + size_t max_dynamic_paths; + readVarUInt(max_dynamic_paths, *structure_stream); + } + /// Read the sorted list of dynamic paths. size_t dynamic_paths_size; readVarUInt(dynamic_paths_size, *structure_stream); @@ -411,9 +412,6 @@ void SerializationObject::serializeBinaryBulkWithMultipleStreams( const auto & shared_data = column_object.getSharedDataPtr(); auto * object_state = checkAndGetState(state); - if (column_object.getMaxDynamicPaths() != object_state->max_dynamic_paths) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Mismatch of max_dynamic_paths parameter of Object. Expected: {}, Got: {}", object_state->max_dynamic_paths, column_object.getMaxDynamicPaths()); - if (column_object.getDynamicPaths().size() != object_state->sorted_dynamic_paths.size()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Mismatch of number of dynamic paths in Object. Expected: {}, Got: {}", object_state->sorted_dynamic_paths.size(), column_object.getDynamicPaths().size()); @@ -538,7 +536,7 @@ void SerializationObject::deserializeBinaryBulkWithMultipleStreams( /// If it's a new object column, set dynamic paths and statistics. if (column_object.empty()) { - column_object.setMaxDynamicPaths(structure_state->max_dynamic_paths); + column_object.setMaxDynamicPaths(structure_state->sorted_dynamic_paths.size()); column_object.setDynamicPaths(structure_state->sorted_dynamic_paths); column_object.setStatistics(structure_state->statistics); } diff --git a/src/DataTypes/Serializations/SerializationObject.h b/src/DataTypes/Serializations/SerializationObject.h index 62ff9849f45..ba66dd6470e 100644 --- a/src/DataTypes/Serializations/SerializationObject.h +++ b/src/DataTypes/Serializations/SerializationObject.h @@ -19,7 +19,20 @@ public: { enum Value { - BASIC = 0, + /// V1 serialization: + /// - ObjectStructure stream: + /// + /// + /// + /// (only in MergeTree serialization) + /// (only in MergeTree serialization) + /// - ObjectData stream: + /// - ObjectTypedPath stream for each column in typed paths + /// - ObjectDynamicPath stream for each column in dynamic paths + /// - ObjectSharedData stream shared data column. + V1 = 0, + /// V2 serialization: the same as V1 but without max_dynamic_paths parameter in ObjectStructure stream. + V2 = 2, }; Value value; @@ -82,7 +95,6 @@ private: struct DeserializeBinaryBulkStateObjectStructure : public ISerialization::DeserializeBinaryBulkState { ObjectSerializationVersion structure_version; - size_t max_dynamic_paths; std::vector sorted_dynamic_paths; std::unordered_set dynamic_paths; /// Paths statistics. Map (dynamic path) -> (number of non-null values in this path). diff --git a/src/Functions/FunctionsConversion.cpp b/src/Functions/FunctionsConversion.cpp index ed13e581759..a7098e85ea0 100644 --- a/src/Functions/FunctionsConversion.cpp +++ b/src/Functions/FunctionsConversion.cpp @@ -83,6 +83,7 @@ namespace Setting extern const SettingsBool input_format_ipv4_default_on_conversion_error; extern const SettingsBool input_format_ipv6_default_on_conversion_error; extern const SettingsBool precise_float_parsing; + extern const SettingsBool cast_to_json_disable_dynamic_subcolumns; } namespace ErrorCodes @@ -4056,9 +4057,7 @@ private: { return [this](ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, const ColumnNullable * nullable_source, size_t input_rows_count) { - auto res = ConvertImplGenericFromString::execute(arguments, result_type, nullable_source, input_rows_count, context)->assumeMutable(); - res->finalize(); - return res; + return ConvertImplGenericFromString::execute(arguments, result_type, nullable_source, input_rows_count, context)->assumeMutable(); }; } diff --git a/src/Storages/AlterCommands.cpp b/src/Storages/AlterCommands.cpp index 460d74e68bf..0d7d3295e0a 100644 --- a/src/Storages/AlterCommands.cpp +++ b/src/Storages/AlterCommands.cpp @@ -1466,13 +1466,13 @@ void AlterCommands::validate(const StoragePtr & table, ContextPtr context) const "The change of data type {} of column {} to {} is not allowed. It has known bugs", old_data_type->getName(), backQuote(column_name), command.data_type->getName()); - bool has_object_type = isObject(command.data_type); - command.data_type->forEachChild([&](const IDataType & type){ has_object_type |= isObject(type); }); - if (has_object_type) - throw Exception( - ErrorCodes::BAD_ARGUMENTS, - "The change of data type {} of column {} to {} is not supported.", - old_data_type->getName(), backQuote(column_name), command.data_type->getName()); +// bool has_object_type = isObject(command.data_type); +// command.data_type->forEachChild([&](const IDataType & type){ has_object_type |= isObject(type); }); +// if (has_object_type) +// throw Exception( +// ErrorCodes::BAD_ARGUMENTS, +// "The change of data type {} of column {} to {} is not supported.", +// old_data_type->getName(), backQuote(column_name), command.data_type->getName()); } if (command.isRemovingProperty()) diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp index a859172023f..96623307c8f 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp @@ -61,22 +61,6 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact( } } -void MergeTreeDataPartWriterCompact::initDynamicStreamsIfNeeded(const Block & block) -{ - if (is_dynamic_streams_initialized) - return; - - is_dynamic_streams_initialized = true; - for (const auto & column : columns_list) - { - if (column.type->hasDynamicSubcolumns()) - { - auto compression = getCodecDescOrDefault(column.name, default_codec); - addStreams(column, block.getByName(column.name).column, compression); - } - } -} - void MergeTreeDataPartWriterCompact::addStreams(const NameAndTypePair & name_and_type, const ColumnPtr & column, const ASTPtr & effective_codec_desc) { ISerialization::StreamCallback callback = [&](const auto & substream_path) @@ -175,20 +159,25 @@ void writeColumnSingleGranule( void MergeTreeDataPartWriterCompact::write(const Block & block, const IColumn::Permutation * permutation) { - /// On first block of data initialize streams for dynamic subcolumns. - initDynamicStreamsIfNeeded(block); + Block result_block = block; + + /// During serialization columns with dynamic subcolumns (like JSON/Dynamic) must have the same dynamic structure. + /// But it may happen that they don't (for example during ALTER MODIFY COLUMN from some type to JSON/Dynamic). + /// In this case we use dynamic structure of the column from the first written block and adjust columns from + /// the next blocks so they match this dynamic structure. + initOrAdjustDynamicStructureIfNeeded(result_block); /// Fill index granularity for this block /// if it's unknown (in case of insert data or horizontal merge, /// but not in case of vertical merge) if (compute_granularity) { - size_t index_granularity_for_block = computeIndexGranularity(block); + size_t index_granularity_for_block = computeIndexGranularity(result_block); assert(index_granularity_for_block >= 1); - fillIndexGranularity(index_granularity_for_block, block.rows()); + fillIndexGranularity(index_granularity_for_block, result_block.rows()); } - Block result_block = permuteBlockIfNeeded(block, permutation); + result_block = permuteBlockIfNeeded(result_block, permutation); if (!header) header = result_block.cloneEmpty(); diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h index b440a37222d..03da9c5f754 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h @@ -48,9 +48,7 @@ private: void addToChecksums(MergeTreeDataPartChecksums & checksums); - void addStreams(const NameAndTypePair & name_and_type, const ColumnPtr & column, const ASTPtr & effective_codec_desc); - - void initDynamicStreamsIfNeeded(const Block & block); + void addStreams(const NameAndTypePair & name_and_type, const ColumnPtr & column, const ASTPtr & effective_codec_desc) override; Block header; @@ -104,8 +102,6 @@ private: /// then finally to 'marks_file'. std::unique_ptr marks_compressor; std::unique_ptr marks_source_hashing; - - bool is_dynamic_streams_initialized = false; }; } diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp index 35914d8c50a..fbf6ac769a0 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp @@ -557,6 +557,45 @@ Names MergeTreeDataPartWriterOnDisk::getSkipIndicesColumns() const return Names(skip_indexes_column_names_set.begin(), skip_indexes_column_names_set.end()); } +void MergeTreeDataPartWriterOnDisk::initOrAdjustDynamicStructureIfNeeded(Block & block) +{ + if (!is_dynamic_streams_initialized) + { + for (const auto & column : columns_list) + { + if (column.type->hasDynamicSubcolumns()) + { + /// Create all streams for dynamic subcolumns using dynamic structure from block. + auto compression = getCodecDescOrDefault(column.name, default_codec); + addStreams(column, block.getByName(column.name).column, compression); + } + } + is_dynamic_streams_initialized = true; + block_sample = block.cloneEmpty(); + } + else + { + size_t size = block.columns(); + for (size_t i = 0; i != size; ++i) + { + auto & column = block.getByPosition(i); + const auto & sample_column = block_sample.getByPosition(i); + /// Check if the dynamic structure of this column is different from the sample column. + if (column.type->hasDynamicSubcolumns() && !column.column->dynamicStructureEquals(*sample_column.column)) + { + /// We need to change the dynamic structure of the column so it matches the sample column. + /// To do it, we create empty column of this type, take dynamic structure from sample column + /// and insert data into it. Resulting column will have required dynamic structure and the content + /// of the column in current block. + auto new_column = sample_column.type->createColumn(); + new_column->takeDynamicStructureFromSourceColumns({sample_column.column}); + new_column->insertRangeFrom(*column.column, 0, column.column->size()); + column.column = std::move(new_column); + } + } + } +} + template struct MergeTreeDataPartWriterOnDisk::Stream; template struct MergeTreeDataPartWriterOnDisk::Stream; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h index 8d84442981e..69a089eda1b 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h @@ -153,6 +153,14 @@ protected: /// Get unique non ordered skip indices column. Names getSkipIndicesColumns() const; + virtual void addStreams(const NameAndTypePair & name_and_type, const ColumnPtr & column, const ASTPtr & effective_codec_desc) = 0; + + /// On first block create all required streams for columns with dynamic subcolumns and remember the block sample. + /// On each next block check if dynamic structure of the columns equals to the dynamic structure of the same + /// columns in the sample block. If for some column dynamic structure is different, adjust it so it matches + /// the structure from the sample. + void initOrAdjustDynamicStructureIfNeeded(Block & block); + const MergeTreeIndices skip_indices; const ColumnsStatistics stats; @@ -187,6 +195,10 @@ protected: size_t current_mark = 0; GinIndexStoreFactory::GinIndexStores gin_index_stores; + + bool is_dynamic_streams_initialized = false; + Block block_sample; + private: void initSkipIndices(); void initPrimaryIndex(); diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index 04e07a0588a..ba9d82fd097 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -106,23 +106,6 @@ MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide( } } -void MergeTreeDataPartWriterWide::initDynamicStreamsIfNeeded(const DB::Block & block) -{ - if (is_dynamic_streams_initialized) - return; - - is_dynamic_streams_initialized = true; - block_sample = block.cloneEmpty(); - for (const auto & column : columns_list) - { - if (column.type->hasDynamicSubcolumns()) - { - auto compression = getCodecDescOrDefault(column.name, default_codec); - addStreams(column, block_sample.getByName(column.name).column, compression); - } - } -} - void MergeTreeDataPartWriterWide::addStreams( const NameAndTypePair & name_and_type, const ColumnPtr & column, @@ -260,15 +243,20 @@ void MergeTreeDataPartWriterWide::shiftCurrentMark(const Granules & granules_wri void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Permutation * permutation) { - /// On first block of data initialize streams for dynamic subcolumns. - initDynamicStreamsIfNeeded(block); + Block block_to_write = block; + + /// During serialization columns with dynamic subcolumns (like JSON/Dynamic) must have the same dynamic structure. + /// But it may happen that they don't (for example during ALTER MODIFY COLUMN from some type to JSON/Dynamic). + /// In this case we use dynamic structure of the column from the first written block and adjust columns from + /// the next blocks so they match this dynamic structure. + initOrAdjustDynamicStructureIfNeeded(block_to_write); /// Fill index granularity for this block /// if it's unknown (in case of insert data or horizontal merge, /// but not in case of vertical part of vertical merge) if (compute_granularity) { - size_t index_granularity_for_block = computeIndexGranularity(block); + size_t index_granularity_for_block = computeIndexGranularity(block_to_write); if (rows_written_in_last_mark > 0) { size_t rows_left_in_last_mark = index_granularity.getMarkRows(getCurrentMark()) - rows_written_in_last_mark; @@ -286,11 +274,9 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm } } - fillIndexGranularity(index_granularity_for_block, block.rows()); + fillIndexGranularity(index_granularity_for_block, block_to_write.rows()); } - Block block_to_write = block; - auto granules_to_write = getGranulesToWrite(index_granularity, block_to_write.rows(), getCurrentMark(), rows_written_in_last_mark); auto offset_columns = written_offset_columns ? *written_offset_columns : WrittenOffsetColumns{}; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h index ab86ed27c7e..78dfc93c4d2 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h @@ -91,9 +91,7 @@ private: void addStreams( const NameAndTypePair & name_and_type, const ColumnPtr & column, - const ASTPtr & effective_codec_desc); - - void initDynamicStreamsIfNeeded(const Block & block); + const ASTPtr & effective_codec_desc) override; /// Method for self check (used in debug-build only). Checks that written /// data and corresponding marks are consistent. Otherwise throws logical @@ -139,10 +137,6 @@ private: /// How many rows we have already written in the current mark. /// More than zero when incoming blocks are smaller then their granularity. size_t rows_written_in_last_mark = 0; - - Block block_sample; - - bool is_dynamic_streams_initialized = false; }; } diff --git a/tests/queries/0_stateless/03246_alter_from_string_to_json.reference b/tests/queries/0_stateless/03246_alter_from_string_to_json.reference new file mode 100644 index 00000000000..a2d3a799fff --- /dev/null +++ b/tests/queries/0_stateless/03246_alter_from_string_to_json.reference @@ -0,0 +1,134 @@ +All paths: +['key0','key1','key2','key3','key4','key5'] +Shared data paths: +key2 +key3 +key4 +key5 +{"key0":"value0"} +{"key1":"value1"} +{"key0":"value2"} +{"key1":"value3"} +{"key0":"value4"} +{"key1":"value5"} +{"key0":"value6"} +{"key1":"value7"} +{"key0":"value8"} +{"key1":"value9"} +{"key2":"value300000"} +{"key3":"value300001"} +{"key2":"value300002"} +{"key3":"value300003"} +{"key2":"value300004"} +{"key3":"value300005"} +{"key2":"value300006"} +{"key3":"value300007"} +{"key2":"value300008"} +{"key3":"value300009"} +{"key4":"value600000"} +{"key5":"value600001"} +{"key4":"value600002"} +{"key5":"value600003"} +{"key4":"value600004"} +{"key5":"value600005"} +{"key4":"value600006"} +{"key5":"value600007"} +{"key4":"value600008"} +{"key5":"value600009"} +value0 \N \N \N \N \N +\N value1 \N \N \N \N +value2 \N \N \N \N \N +\N value3 \N \N \N \N +value4 \N \N \N \N \N +\N value5 \N \N \N \N +value6 \N \N \N \N \N +\N value7 \N \N \N \N +value8 \N \N \N \N \N +\N value9 \N \N \N \N +\N \N value300000 \N \N \N +\N \N \N value300001 \N \N +\N \N value300002 \N \N \N +\N \N \N value300003 \N \N +\N \N value300004 \N \N \N +\N \N \N value300005 \N \N +\N \N value300006 \N \N \N +\N \N \N value300007 \N \N +\N \N value300008 \N \N \N +\N \N \N value300009 \N \N +\N \N \N \N value600000 \N +\N \N \N \N \N value600001 +\N \N \N \N value600002 \N +\N \N \N \N \N value600003 +\N \N \N \N value600004 \N +\N \N \N \N \N value600005 +\N \N \N \N value600006 \N +\N \N \N \N \N value600007 +\N \N \N \N value600008 \N +\N \N \N \N \N value600009 +All paths: +['key0','key1','key2','key3','key4','key5'] +Shared data paths: +key2 +key3 +key4 +key5 +{"key0":"value0"} +{"key1":"value1"} +{"key0":"value2"} +{"key1":"value3"} +{"key0":"value4"} +{"key1":"value5"} +{"key0":"value6"} +{"key1":"value7"} +{"key0":"value8"} +{"key1":"value9"} +{"key2":"value300000"} +{"key3":"value300001"} +{"key2":"value300002"} +{"key3":"value300003"} +{"key2":"value300004"} +{"key3":"value300005"} +{"key2":"value300006"} +{"key3":"value300007"} +{"key2":"value300008"} +{"key3":"value300009"} +{"key4":"value600000"} +{"key5":"value600001"} +{"key4":"value600002"} +{"key5":"value600003"} +{"key4":"value600004"} +{"key5":"value600005"} +{"key4":"value600006"} +{"key5":"value600007"} +{"key4":"value600008"} +{"key5":"value600009"} +value0 \N \N \N \N \N +\N value1 \N \N \N \N +value2 \N \N \N \N \N +\N value3 \N \N \N \N +value4 \N \N \N \N \N +\N value5 \N \N \N \N +value6 \N \N \N \N \N +\N value7 \N \N \N \N +value8 \N \N \N \N \N +\N value9 \N \N \N \N +\N \N value300000 \N \N \N +\N \N \N value300001 \N \N +\N \N value300002 \N \N \N +\N \N \N value300003 \N \N +\N \N value300004 \N \N \N +\N \N \N value300005 \N \N +\N \N value300006 \N \N \N +\N \N \N value300007 \N \N +\N \N value300008 \N \N \N +\N \N \N value300009 \N \N +\N \N \N \N value600000 \N +\N \N \N \N \N value600001 +\N \N \N \N value600002 \N +\N \N \N \N \N value600003 +\N \N \N \N value600004 \N +\N \N \N \N \N value600005 +\N \N \N \N value600006 \N +\N \N \N \N \N value600007 +\N \N \N \N value600008 \N +\N \N \N \N \N value600009 diff --git a/tests/queries/0_stateless/03246_alter_from_string_to_json.sql.j2 b/tests/queries/0_stateless/03246_alter_from_string_to_json.sql.j2 new file mode 100644 index 00000000000..a13867b145d --- /dev/null +++ b/tests/queries/0_stateless/03246_alter_from_string_to_json.sql.j2 @@ -0,0 +1,32 @@ +set allow_experimental_json_type = 1; + +drop table if exists test; + +{% for create_command in ['create table test (x UInt64, json String) engine=MergeTree order by x settings min_rows_for_wide_part=100000000, min_bytes_for_wide_part=1000000000;', + 'create table test (x UInt64, json String) engine=MergeTree order by x settings min_rows_for_wide_part=1, min_bytes_for_wide_part=1;'] -%} + +{{ create_command }} + +insert into test select number, toJSONString(map('key' || multiIf(number < 300000, number % 2, number < 600000, number % 2 + 2, number % 2 + 4), 'value' || number)) from numbers(1000000); + +alter table test modify column json JSON settings mutations_sync=1; + +select 'All paths:'; +select distinctJSONPaths(json) from test; +select 'Shared data paths:'; +select distinct (arrayJoin(JSONSharedDataPaths(json))) as path from test order by path; +select json from test order by x limit 10; +select json from test order by x limit 10 offset 300000; +select json from test order by x limit 10 offset 600000; +select json.key0, json.key1, json.key2, json.key3, json.key4, json.key5 from test order by x limit 10; +select json.key0, json.key1, json.key2, json.key3, json.key4, json.key5 from test order by x limit 10 offset 300000; +select json.key0, json.key1, json.key2, json.key3, json.key4, json.key5 from test order by x limit 10 offset 600000; + +select json from test format Null; +select json from test order by x format Null; +select json.key0, json.key1, json.key2, json.key3, json.key4, json.key5 from test format Null; +select json.key0, json.key1, json.key2, json.key3, json.key4, json.key5 from test order by x format Null; + +drop table test; + +{% endfor -%} diff --git a/tests/queries/0_stateless/03247_ghdata_string_to_json_alter.reference b/tests/queries/0_stateless/03247_ghdata_string_to_json_alter.reference new file mode 100644 index 00000000000..ca2fb7e8ff9 --- /dev/null +++ b/tests/queries/0_stateless/03247_ghdata_string_to_json_alter.reference @@ -0,0 +1,12 @@ +5000 +leonardomso/33-js-concepts 3 +ytdl-org/youtube-dl 3 +Bogdanp/neko 2 +bminossi/AllVideoPocsFromHackerOne 2 +disclose/diodata 2 +Commit 182 +chipeo345 119 +phanwi346 114 +Nicholas Piggin 95 +direwolf-github 49 +2 diff --git a/tests/queries/0_stateless/03247_ghdata_string_to_json_alter.sh b/tests/queries/0_stateless/03247_ghdata_string_to_json_alter.sh new file mode 100755 index 00000000000..931d106120c --- /dev/null +++ b/tests/queries/0_stateless/03247_ghdata_string_to_json_alter.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash +# Tags: no-fasttest, no-s3-storage, long +# ^ no-s3-storage: too memory hungry + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS ghdata" +${CLICKHOUSE_CLIENT} -q "CREATE TABLE ghdata (data String) ENGINE = MergeTree ORDER BY tuple() SETTINGS index_granularity = 8192, index_granularity_bytes = '10Mi'" + +cat $CUR_DIR/data_json/ghdata_sample.json | ${CLICKHOUSE_CLIENT} \ + --max_memory_usage 10G --query "INSERT INTO ghdata FORMAT JSONAsString" + +${CLICKHOUSE_CLIENT} -q "ALTER TABLE ghdata MODIFY column data JSON SETTINGS mutations_sync=1" --allow_experimental_json_type 1 + +${CLICKHOUSE_CLIENT} -q "SELECT count() FROM ghdata WHERE NOT ignore(*)" + +${CLICKHOUSE_CLIENT} -q \ +"SELECT data.repo.name, count() AS stars FROM ghdata \ + WHERE data.type = 'WatchEvent' GROUP BY data.repo.name ORDER BY stars DESC, data.repo.name LIMIT 5" + +${CLICKHOUSE_CLIENT} --enable_analyzer=1 -q \ +"SELECT data.payload.commits[].author.name AS name, count() AS c FROM ghdata \ + ARRAY JOIN data.payload.commits[].author.name \ + GROUP BY name ORDER BY c DESC, name LIMIT 5" + +${CLICKHOUSE_CLIENT} -q "SELECT max(data.payload.pull_request.assignees[].size0) FROM ghdata" + +${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS ghdata" diff --git a/tests/queries/0_stateless/03248_string_to_json_alter_fuzz.reference b/tests/queries/0_stateless/03248_string_to_json_alter_fuzz.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/03248_string_to_json_alter_fuzz.sql b/tests/queries/0_stateless/03248_string_to_json_alter_fuzz.sql new file mode 100644 index 00000000000..87e10df9cc8 --- /dev/null +++ b/tests/queries/0_stateless/03248_string_to_json_alter_fuzz.sql @@ -0,0 +1,17 @@ +set allow_experimental_json_type=1; +set max_insert_block_size=10000; +set max_block_size=10000; + +drop table if exists test; +drop named collection if exists json_alter_fuzzer; + +create table test (json String) engine=MergeTree order by tuple(); +create named collection json_alter_fuzzer AS json_str='{}'; +insert into test select * from fuzzJSON(json_alter_fuzzer, reuse_output=true, max_output_length=128) limit 200000; +alter table test modify column json JSON settings mutations_sync=1; +select json from test format Null; +optimize table test final; +select json from test format Null; +drop named collection json_alter_fuzzer; +drop table test; +