Merge pull request #35806 from CurtizJ/dynamic-columns-8

Fix inserts to columns of type `Object` in partitioned tables
Anton Popov, 2022-04-04 16:16:54 +02:00 (committed by GitHub)
commit d08d4a2437
9 changed files with 51 additions and 26 deletions

src/DataTypes/ObjectUtils.cpp

@@ -128,22 +128,21 @@ static auto extractVector(const std::vector<Tuple> & vec)
return res;
}
void convertObjectsToTuples(NamesAndTypesList & columns_list, Block & block, const NamesAndTypesList & extended_storage_columns)
void convertObjectsToTuples(Block & block, const NamesAndTypesList & extended_storage_columns)
{
std::unordered_map<String, DataTypePtr> storage_columns_map;
for (const auto & [name, type] : extended_storage_columns)
storage_columns_map[name] = type;
for (auto & name_type : columns_list)
for (auto & column : block)
{
if (!isObject(name_type.type))
if (!isObject(column.type))
continue;
auto & column = block.getByName(name_type.name);
if (!isObject(column.type))
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Type for column '{}' mismatch in columns list and in block. In list: {}, in block: {}",
name_type.name, name_type.type->getName(), column.type->getName());
column.name, column.type->getName(), column.type->getName());
const auto & column_object = assert_cast<const ColumnObject &>(*column.column);
const auto & subcolumns = column_object.getSubcolumns();
@@ -151,7 +150,7 @@ void convertObjectsToTuples(NamesAndTypesList & columns_list, Block & block, con
if (!column_object.isFinalized())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot convert to tuple column '{}' from type {}. Column should be finalized first",
name_type.name, name_type.type->getName());
column.name, column.type->getName());
PathsInData tuple_paths;
DataTypes tuple_types;
@@ -164,12 +163,11 @@ void convertObjectsToTuples(NamesAndTypesList & columns_list, Block & block, con
tuple_columns.emplace_back(entry->data.getFinalizedColumnPtr());
}
auto it = storage_columns_map.find(name_type.name);
auto it = storage_columns_map.find(column.name);
if (it == storage_columns_map.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Column '{}' not found in storage", name_type.name);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Column '{}' not found in storage", column.name);
std::tie(column.column, column.type) = unflattenTuple(tuple_paths, tuple_types, tuple_columns);
name_type.type = column.type;
/// Check that constructed Tuple type and type in storage are compatible.
getLeastCommonTypeForObject({column.type, it->second}, true);

src/DataTypes/ObjectUtils.h

@@ -38,7 +38,7 @@ DataTypePtr getDataTypeByColumn(const IColumn & column);
/// Converts Object types and columns to Tuples in @columns_list and @block
/// and checks that types are consistent with types in @extended_storage_columns.
void convertObjectsToTuples(NamesAndTypesList & columns_list, Block & block, const NamesAndTypesList & extended_storage_columns);
void convertObjectsToTuples(Block & block, const NamesAndTypesList & extended_storage_columns);
/// Checks that each path is not the prefix of any other path.
void checkObjectHasNoAmbiguosPaths(const PathsInData & paths);
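
Note on the refactor above: the conversion no longer needs a separate NamesAndTypesList, because everything it used from that list is already carried by the block itself (the old code looked the column up with block.getByName anyway). The standalone sketch below is not part of the commit and not ClickHouse code; ToyColumn, ToyBlock, toyConvertObjectsToTuples and the plain std::map standing in for the extended storage columns are invented for illustration. It only models the new calling convention: walk the block, skip non-Object columns, build a tuple type from the deduced subcolumn paths, and validate the column name against the storage-side columns.

#include <iostream>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-ins for Block / ColumnWithTypeAndName; not ClickHouse types.
struct ToyColumn
{
    std::string name;
    std::string type;                         // e.g. "Object('json')" or "UInt32"
    std::vector<std::string> subcolumn_paths; // paths deduced while parsing the Object column
};
using ToyBlock = std::vector<ToyColumn>;

static bool isObjectType(const std::string & type) { return type.rfind("Object", 0) == 0; }

// Models the new signature: only the block and the storage columns are needed.
void toyConvertObjectsToTuples(ToyBlock & block, const std::map<std::string, std::string> & storage_columns)
{
    for (auto & column : block)
    {
        if (!isObjectType(column.type))
            continue;

        // Build a named-tuple type from the subcolumn paths of this Object column.
        std::string tuple_type = "Tuple(";
        for (size_t i = 0; i < column.subcolumn_paths.size(); ++i)
            tuple_type += (i ? ", " : "") + column.subcolumn_paths[i] + " String";
        tuple_type += ")";

        if (storage_columns.find(column.name) == storage_columns.end())
            throw std::runtime_error("Column '" + column.name + "' not found in storage");

        column.type = tuple_type; // the real code also unflattens the column data itself
    }
}

int main()
{
    ToyBlock block = {
        {"id", "UInt32", {}},
        {"obj", "Object('json')", {"k1", "k2"}},
    };
    std::map<std::string, std::string> storage_columns = {{"id", "UInt32"}, {"obj", "Object('json')"}};

    toyConvertObjectsToTuples(block, storage_columns);
    for (const auto & column : block)
        std::cout << column.name << " : " << column.type << '\n'; // obj : Tuple(k1 String, k2 String)
}

In the real function the column data is unflattened into the tuple as well (unflattenTuple in the diff), and the constructed type is additionally checked for compatibility with the storage type via getLeastCommonTypeForObject.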

src/Storages/MergeTree/MergeTreeDataWriter.cpp

@@ -145,7 +145,7 @@ void MergeTreeDataWriter::TemporaryPart::finalize()
}
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
BlocksWithPartition result;
if (!block || !block.rows())
@@ -282,16 +282,12 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart(
{
TemporaryPart temp_part;
Block & block = block_with_partition.block;
auto columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames());
auto storage_snapshot = data.getStorageSnapshot(metadata_snapshot);
if (!storage_snapshot->object_columns.empty())
{
auto extended_storage_columns = storage_snapshot->getColumns(
GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects());
convertObjectsToTuples(columns, block, extended_storage_columns);
}
for (auto & column : columns)
if (isObject(column.type))
column.type = block.getByName(column.name).type;
static const String TMP_PREFIX = "tmp_insert_";
@@ -466,6 +462,16 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart(
return temp_part;
}
void MergeTreeDataWriter::deduceTypesOfObjectColumns(const StorageSnapshotPtr & storage_snapshot, Block & block)
{
if (!storage_snapshot->object_columns.empty())
{
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects();
auto storage_columns = storage_snapshot->getColumns(options);
convertObjectsToTuples(block, storage_columns);
}
}
MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
const String & part_name,
MergeTreeDataPartType part_type,
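
The new deduceTypesOfObjectColumns above is essentially the Object-to-Tuple conversion hoisted out of writeTempPart: it now runs once per insert block and does nothing when the table has no Object columns. The sketch below is a simplified standalone model, not the MergeTree API; ToyStorageSnapshot and toyDeduceTypesOfObjectColumns are invented names, and for brevity the extended tuple type is taken directly from the snapshot, whereas the real helper deduces it from the column data and only validates it against storage.

#include <iostream>
#include <string>
#include <vector>

// Hypothetical stand-ins for Block and StorageSnapshot; not the real MergeTree types.
struct ToyColumn { std::string name; std::string type; };
using ToyBlock = std::vector<ToyColumn>;

struct ToyStorageSnapshot
{
    // Plays the role of storage_snapshot->object_columns: the Object columns known
    // to the storage, together with their current extended (tuple) types.
    std::vector<ToyColumn> object_columns;
};

// Assumed shape of the helper: run once per insert block before any part is written.
void toyDeduceTypesOfObjectColumns(const ToyStorageSnapshot & snapshot, ToyBlock & block)
{
    if (snapshot.object_columns.empty())
        return; // cheap early-out, mirroring the object_columns.empty() check in the diff

    for (auto & column : block)
        for (const auto & object_column : snapshot.object_columns)
            if (column.name == object_column.name)
                column.type = object_column.type; // stands in for convertObjectsToTuples()
}

int main()
{
    ToyStorageSnapshot snapshot{{{"obj", "Tuple(k1 String, k2 String)"}}};
    ToyBlock block = {{"id", "UInt32"}, {"obj", "Object('json')"}};

    toyDeduceTypesOfObjectColumns(snapshot, block);
    std::cout << block[1].type << '\n'; // Tuple(k1 String, k2 String)
}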

src/Storages/MergeTree/MergeTreeDataWriter.h

@@ -42,14 +42,12 @@ public:
*/
static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context);
/** All rows must correspond to same partition.
* Returns part with unique name starting with 'tmp_', yet not added to MergeTreeData.
*/
MergeTreeData::MutableDataPartPtr writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, bool optimize_on_insert);
static void deduceTypesOfObjectColumns(const StorageSnapshotPtr & storage_snapshot, Block & block);
/// This structure contains not completely written temporary part.
/// Some writes may happen asynchronously, e.g. for blob storages.
/// You should call finalize() to wait until all data is written.
struct TemporaryPart
{
MergeTreeData::MutableDataPartPtr part;
@@ -65,6 +63,9 @@ public:
void finalize();
};
/** All rows must correspond to same partition.
* Returns part with unique name starting with 'tmp_', yet not added to MergeTreeData.
*/
TemporaryPart writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, ContextPtr context);
/// For insertion.
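
The comment on TemporaryPart above describes a deferred-finalization contract: writeTempPart may return while some writes (for example to blob storage) are still in flight, and the caller must call finalize() before relying on the part. A minimal standalone sketch of that pattern, with invented names (ToyTemporaryPart, toyWriteTempPart) and std::future standing in for the real write streams:

#include <future>
#include <iostream>
#include <vector>

// Invented stand-in for a not-yet-complete part: it carries the pending writes
// and exposes finalize() to wait for all of them, mirroring the comment above.
struct ToyTemporaryPart
{
    std::vector<std::future<void>> pending_writes;

    void finalize()
    {
        for (auto & write : pending_writes)
            write.get(); // block until every asynchronous write has completed
        pending_writes.clear();
    }
};

ToyTemporaryPart toyWriteTempPart()
{
    ToyTemporaryPart part;
    // Kick off an asynchronous write, e.g. an upload to blob storage.
    part.pending_writes.push_back(std::async(std::launch::async, [] {
        std::cout << "writing data asynchronously\n";
    }));
    return part; // may be returned before the write has finished
}

int main()
{
    auto part = toyWriteTempPart();
    part.finalize(); // the caller must wait before the part can be used
    std::cout << "part is fully written\n";
}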

src/Storages/MergeTree/MergeTreeSink.cpp

@@ -50,7 +50,9 @@ struct MergeTreeSink::DelayedChunk
void MergeTreeSink::consume(Chunk chunk)
{
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
auto storage_snapshot = storage.getStorageSnapshot(metadata_snapshot);
storage.writer.deduceTypesOfObjectColumns(storage_snapshot, block);
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
using DelayedPartitions = std::vector<MergeTreeSink::DelayedChunk::Partition>;
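
The two added lines in MergeTreeSink::consume (mirrored below in ReplicatedMergeTreeSink::consume) are the heart of the fix: Object types are now deduced for the whole insert block before splitBlockIntoParts groups rows by partition key. If deduction instead happened per part, each partition would infer a tuple type from only its own rows, so a single INSERT spanning several partitions could describe the same Object column with different tuple types; deducing once up front guarantees one consistent type for the whole insert. The sketch below is a standalone model of that ordering with invented types (ToyRow, toyDeduceObjectPaths, toySplitBlockIntoParts); only the "deduce once, then split" idea is taken from the diff.

#include <iostream>
#include <map>
#include <set>
#include <string>
#include <vector>

// A toy "row": a partition key plus a JSON-like object with arbitrary keys.
struct ToyRow
{
    int partition_key;
    std::map<std::string, std::string> object;
};
using ToyBlock = std::vector<ToyRow>;

// Step 1 (the new call in consume()): deduce one set of object paths from the WHOLE block,
// so every partition ends up with the same tuple schema for the Object column.
std::set<std::string> toyDeduceObjectPaths(const ToyBlock & block)
{
    std::set<std::string> paths;
    for (const auto & row : block)
        for (const auto & kv : row.object)
            paths.insert(kv.first);
    return paths;
}

// Step 2: split the block into per-partition sub-blocks, as splitBlockIntoParts does.
std::map<int, ToyBlock> toySplitBlockIntoParts(const ToyBlock & block)
{
    std::map<int, ToyBlock> parts;
    for (const auto & row : block)
        parts[row.partition_key].push_back(row);
    return parts;
}

int main()
{
    // Mirrors the new test: two rows with different JSON keys going to different partitions.
    ToyBlock block = {
        {1, {{"k1", "v1"}}},
        {2, {{"k2", "v2"}}},
    };

    auto paths = toyDeduceObjectPaths(block);   // deduced once for the whole block: {k1, k2}
    auto parts = toySplitBlockIntoParts(block);

    for (const auto & [partition, rows] : parts)
    {
        std::cout << "partition " << partition << " uses paths {";
        for (const auto & path : paths)
            std::cout << ' ' << path;
        std::cout << " } for " << rows.size() << " row(s)\n";
    }
    // Had deduction run after the split, partition 1 would see only {k1} and partition 2
    // only {k2}, i.e. two different tuple types for the same Object column.
}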

src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp

@@ -150,7 +150,8 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
if (quorum)
checkQuorumPrecondition(zookeeper);
const Settings & settings = context->getSettingsRef();
auto storage_snapshot = storage.getStorageSnapshot(metadata_snapshot);
storage.writer.deduceTypesOfObjectColumns(storage_snapshot, block);
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
using DelayedPartitions = std::vector<ReplicatedMergeTreeSink::DelayedChunk::Partition>;
@@ -158,6 +159,7 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
size_t streams = 0;
bool support_parallel_write = false;
const Settings & settings = context->getSettingsRef();
for (auto & current_block : part_blocks)
{

src/Storages/StorageMemory.cpp

@@ -137,11 +137,10 @@ public:
storage_snapshot->metadata->check(block, true);
if (!storage_snapshot->object_columns.empty())
{
auto columns = storage_snapshot->metadata->getColumns().getAllPhysical().filter(block.getNames());
auto extended_storage_columns = storage_snapshot->getColumns(
GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects());
convertObjectsToTuples(columns, block, extended_storage_columns);
convertObjectsToTuples(block, extended_storage_columns);
}
if (storage.compress)

tests/queries/0_stateless: new test reference file

@@ -0,0 +1,2 @@
{"id":1,"obj":{"k1":"v1","k2":""}}
{"id":2,"obj":{"k1":"","k2":"v2"}}

tests/queries/0_stateless: new test SQL file

@@ -0,0 +1,15 @@
-- Tags: no-fasttest
DROP TABLE IF EXISTS t_json_partitions;
SET allow_experimental_object_type = 1;
SET output_format_json_named_tuples_as_objects = 1;
CREATE TABLE t_json_partitions (id UInt32, obj JSON)
ENGINE MergeTree ORDER BY id PARTITION BY id;
INSERT INTO t_json_partitions FORMAT JSONEachRow {"id": 1, "obj": {"k1": "v1"}} {"id": 2, "obj": {"k2": "v2"}};
SELECT * FROM t_json_partitions ORDER BY id FORMAT JSONEachRow;
DROP TABLE t_json_partitions;