From 42acb1dc29bd8e6272e38a8bc33ca9577ff011d6 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Thu, 31 Mar 2022 13:26:32 +0000 Subject: [PATCH] fix inserts to columns of type Object in partitioned tables --- src/DataTypes/ObjectUtils.cpp | 16 ++++++------- src/DataTypes/ObjectUtils.h | 2 +- .../MergeTree/MergeTreeDataWriter.cpp | 24 ++++++++++++------- src/Storages/MergeTree/MergeTreeDataWriter.h | 9 +++---- src/Storages/MergeTree/MergeTreeSink.cpp | 2 ++ .../MergeTree/ReplicatedMergeTreeSink.cpp | 4 +++- src/Storages/StorageMemory.cpp | 3 +-- .../01825_type_json_partitions.reference | 2 ++ .../01825_type_json_partitions.sql | 13 ++++++++++ 9 files changed, 49 insertions(+), 26 deletions(-) create mode 100644 tests/queries/0_stateless/01825_type_json_partitions.reference create mode 100644 tests/queries/0_stateless/01825_type_json_partitions.sql diff --git a/src/DataTypes/ObjectUtils.cpp b/src/DataTypes/ObjectUtils.cpp index 9004a5296e0..cbabc71a965 100644 --- a/src/DataTypes/ObjectUtils.cpp +++ b/src/DataTypes/ObjectUtils.cpp @@ -128,22 +128,21 @@ static auto extractVector(const std::vector & vec) return res; } -void convertObjectsToTuples(NamesAndTypesList & columns_list, Block & block, const NamesAndTypesList & extended_storage_columns) +void convertObjectsToTuples(Block & block, const NamesAndTypesList & extended_storage_columns) { std::unordered_map storage_columns_map; for (const auto & [name, type] : extended_storage_columns) storage_columns_map[name] = type; - for (auto & name_type : columns_list) + for (auto & column : block) { - if (!isObject(name_type.type)) + if (!isObject(column.type)) continue; - auto & column = block.getByName(name_type.name); if (!isObject(column.type)) throw Exception(ErrorCodes::TYPE_MISMATCH, "Type for column '{}' mismatch in columns list and in block. In list: {}, in block: {}", - name_type.name, name_type.type->getName(), column.type->getName()); + column.name, column.type->getName(), column.type->getName()); const auto & column_object = assert_cast(*column.column); const auto & subcolumns = column_object.getSubcolumns(); @@ -151,7 +150,7 @@ void convertObjectsToTuples(NamesAndTypesList & columns_list, Block & block, con if (!column_object.isFinalized()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot convert to tuple column '{}' from type {}. Column should be finalized first", - name_type.name, name_type.type->getName()); + column.name, column.type->getName()); PathsInData tuple_paths; DataTypes tuple_types; @@ -164,12 +163,11 @@ void convertObjectsToTuples(NamesAndTypesList & columns_list, Block & block, con tuple_columns.emplace_back(entry->data.getFinalizedColumnPtr()); } - auto it = storage_columns_map.find(name_type.name); + auto it = storage_columns_map.find(column.name); if (it == storage_columns_map.end()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Column '{}' not found in storage", name_type.name); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Column '{}' not found in storage", column.name); std::tie(column.column, column.type) = unflattenTuple(tuple_paths, tuple_types, tuple_columns); - name_type.type = column.type; /// Check that constructed Tuple type and type in storage are compatible. getLeastCommonTypeForObject({column.type, it->second}, true); diff --git a/src/DataTypes/ObjectUtils.h b/src/DataTypes/ObjectUtils.h index 199a048c8cd..1dbeac2b244 100644 --- a/src/DataTypes/ObjectUtils.h +++ b/src/DataTypes/ObjectUtils.h @@ -38,7 +38,7 @@ DataTypePtr getDataTypeByColumn(const IColumn & column); /// Converts Object types and columns to Tuples in @columns_list and @block /// and checks that types are consistent with types in @extended_storage_columns. -void convertObjectsToTuples(NamesAndTypesList & columns_list, Block & block, const NamesAndTypesList & extended_storage_columns); +void convertObjectsToTuples(Block & block, const NamesAndTypesList & extended_storage_columns); /// Checks that each path is not the prefix of any other path. void checkObjectHasNoAmbiguosPaths(const PathsInData & paths); diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp index 4805a273c70..fc05e293684 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -145,7 +145,7 @@ void MergeTreeDataWriter::TemporaryPart::finalize() } BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts( - const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) + const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) { BlocksWithPartition result; if (!block || !block.rows()) @@ -282,16 +282,12 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart( { TemporaryPart temp_part; Block & block = block_with_partition.block; + auto columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames()); - auto storage_snapshot = data.getStorageSnapshot(metadata_snapshot); - if (!storage_snapshot->object_columns.empty()) - { - auto extended_storage_columns = storage_snapshot->getColumns( - GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects()); - - convertObjectsToTuples(columns, block, extended_storage_columns); - } + for (auto & column : columns) + if (isObject(column.type)) + column.type = block.getByName(column.name).type; static const String TMP_PREFIX = "tmp_insert_"; @@ -466,6 +462,16 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart( return temp_part; } +void MergeTreeDataWriter::deduceTypesOfObjectColumns(const StorageSnapshotPtr & storage_snapshot, Block & block) +{ + if (!storage_snapshot->object_columns.empty()) + { + auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects(); + auto storage_columns = storage_snapshot->getColumns(options); + convertObjectsToTuples(block, storage_columns); + } +} + MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl( const String & part_name, MergeTreeDataPartType part_type, diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.h b/src/Storages/MergeTree/MergeTreeDataWriter.h index ae46a94ccd7..33742d7e52a 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.h +++ b/src/Storages/MergeTree/MergeTreeDataWriter.h @@ -42,14 +42,12 @@ public: */ static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context); - /** All rows must correspond to same partition. - * Returns part with unique name starting with 'tmp_', yet not added to MergeTreeData. - */ - MergeTreeData::MutableDataPartPtr writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, bool optimize_on_insert); + void deduceTypesOfObjectColumns(const StorageSnapshotPtr & storage_snapshot, Block & block); /// This structure contains not completely written temporary part. /// Some writes may happen asynchronously, e.g. for blob storages. /// You should call finalize() to wait until all data is written. + struct TemporaryPart { MergeTreeData::MutableDataPartPtr part; @@ -65,6 +63,9 @@ public: void finalize(); }; + /** All rows must correspond to same partition. + * Returns part with unique name starting with 'tmp_', yet not added to MergeTreeData. + */ TemporaryPart writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, ContextPtr context); /// For insertion. diff --git a/src/Storages/MergeTree/MergeTreeSink.cpp b/src/Storages/MergeTree/MergeTreeSink.cpp index 97bbfc17e9d..7a4ecae24b3 100644 --- a/src/Storages/MergeTree/MergeTreeSink.cpp +++ b/src/Storages/MergeTree/MergeTreeSink.cpp @@ -50,7 +50,9 @@ struct MergeTreeSink::DelayedChunk void MergeTreeSink::consume(Chunk chunk) { auto block = getHeader().cloneWithColumns(chunk.detachColumns()); + auto storage_snapshot = storage.getStorageSnapshot(metadata_snapshot); + storage.writer.deduceTypesOfObjectColumns(storage_snapshot, block); auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context); using DelayedPartitions = std::vector; diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index 550c586f7de..63fa2071056 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -150,7 +150,8 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk) if (quorum) checkQuorumPrecondition(zookeeper); - const Settings & settings = context->getSettingsRef(); + auto storage_snapshot = storage.getStorageSnapshot(metadata_snapshot); + storage.writer.deduceTypesOfObjectColumns(storage_snapshot, block); auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context); using DelayedPartitions = std::vector; @@ -158,6 +159,7 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk) size_t streams = 0; bool support_parallel_write = false; + const Settings & settings = context->getSettingsRef(); for (auto & current_block : part_blocks) { diff --git a/src/Storages/StorageMemory.cpp b/src/Storages/StorageMemory.cpp index 30be297194a..a371ac1ccf8 100644 --- a/src/Storages/StorageMemory.cpp +++ b/src/Storages/StorageMemory.cpp @@ -137,11 +137,10 @@ public: storage_snapshot->metadata->check(block, true); if (!storage_snapshot->object_columns.empty()) { - auto columns = storage_snapshot->metadata->getColumns().getAllPhysical().filter(block.getNames()); auto extended_storage_columns = storage_snapshot->getColumns( GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects()); - convertObjectsToTuples(columns, block, extended_storage_columns); + convertObjectsToTuples(block, extended_storage_columns); } if (storage.compress) diff --git a/tests/queries/0_stateless/01825_type_json_partitions.reference b/tests/queries/0_stateless/01825_type_json_partitions.reference new file mode 100644 index 00000000000..5a7ba251572 --- /dev/null +++ b/tests/queries/0_stateless/01825_type_json_partitions.reference @@ -0,0 +1,2 @@ +{"id":1,"obj":{"k1":"v1","k2":""}} +{"id":2,"obj":{"k1":"","k2":"v2"}} diff --git a/tests/queries/0_stateless/01825_type_json_partitions.sql b/tests/queries/0_stateless/01825_type_json_partitions.sql new file mode 100644 index 00000000000..2cb9bca7702 --- /dev/null +++ b/tests/queries/0_stateless/01825_type_json_partitions.sql @@ -0,0 +1,13 @@ +DROP TABLE IF EXISTS t_json_partitions; + +SET allow_experimental_object_type = 1; +SET output_format_json_named_tuples_as_objects = 1; + +CREATE TABLE t_json_partitions (id UInt32, obj JSON) +ENGINE MergeTree ORDER BY id PARTITION BY id; + +INSERT INTO t_json_partitions FORMAT JSONEachRow {"id": 1, "obj": {"k1": "v1"}} {"id": 2, "obj": {"k2": "v2"}}; + +SELECT * FROM t_json_partitions ORDER BY id FORMAT JSONEachRow; + +DROP TABLE t_json_partitions;