try to fix filling of missing Nested columns with multiple levels

Anton Popov 2022-09-06 13:56:32 +00:00
parent dd776eb3d5
commit f76c1482bd
9 changed files with 191 additions and 98 deletions
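
The gist of the change: instead of relying on arrayHasNoElementsRead, the readers now report partially read columns explicitly, and a missing Nested subcolumn is rebuilt from the offsets collected from a sibling subcolumn of the same structure, wrapping default values level by level from the innermost known offsets outwards. A minimal standalone sketch of that shaping step (plain std::vector offsets instead of ClickHouse's IColumn; all names below are illustrative, not part of the patch):

// Sketch only: mirrors the wrapping logic of the new fillMissingColumns,
// not actual ClickHouse code.
#include <cstdint>
#include <iostream>
#include <vector>

// ClickHouse-style offsets: offsets[i] is the end of row i in the flattened
// data of the next nesting level.
using Offsets = std::vector<uint64_t>;

int main()
{
    // Offsets collected from an existing sibling subcolumn, e.g. obj.arr.k1.k2:
    // level 0 -> rows of 1 and 2 elements, level 1 -> elements of 2, 1 and 2 sub-elements.
    std::vector<Offsets> collected = {{1, 3}, {2, 3, 5}};

    // A missing subcolumn of the same Nested structure gets as many default
    // scalars as the innermost known offsets require...
    size_t data_size = collected.back().back();
    std::cout << "default scalars: " << data_size << '\n';

    // ...and is then wrapped into arrays level by level, innermost offsets
    // first (the ColumnArray::create loop over current_offsets.rbegin()).
    for (auto it = collected.rbegin(); it != collected.rend(); ++it)
        std::cout << "wrap with offsets of " << it->size() << " rows\n";
}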

@@ -188,49 +188,13 @@ ActionsDAGPtr evaluateMissingDefaults(
return createExpressions(header, expr_list, save_unneeded_columns, context);
}
static bool arrayHasNoElementsRead(const IColumn & column)
static std::unordered_map<String, ColumnPtr> collectOffsetsColumns(
const NamesAndTypesList & available_columns, const Columns & res_columns)
{
const auto * column_array = typeid_cast<const ColumnArray *>(&column);
std::unordered_map<String, ColumnPtr> offsets_columns;
if (!column_array)
return false;
size_t size = column_array->size();
if (!size)
return false;
const auto & array_data = column_array->getData();
if (const auto * nested_array = typeid_cast<const ColumnArray *>(&array_data))
return arrayHasNoElementsRead(*nested_array);
if (!array_data.empty())
return false;
size_t last_offset = column_array->getOffsets()[size - 1];
return last_offset != 0;
}
void fillMissingColumns(
Columns & res_columns,
size_t num_rows,
const NamesAndTypesList & requested_columns,
const NamesAndTypesList & available_columns,
StorageMetadataPtr metadata_snapshot)
{
size_t num_columns = requested_columns.size();
if (num_columns != res_columns.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Invalid number of columns passed to fillMissingColumns. Expected {}, got {}",
num_columns, res_columns.size());
/// For a missing column of a nested data structure we must create not a column of empty
/// arrays, but a column of arrays of correct length.
/// First, collect offset columns for all arrays in the block.
std::unordered_map<String, ColumnPtr> offset_columns;
auto available_column = available_columns.begin();
for (size_t i = 0; i < num_columns; ++i, ++available_column)
for (size_t i = 0; i < available_columns.size(); ++i, ++available_column)
{
if (res_columns[i] == nullptr || isColumnConst(*res_columns[i]))
continue;
@@ -243,75 +207,122 @@ void fillMissingColumns(
if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes)
return;
auto subname = ISerialization::getSubcolumnNameForStream(subpath);
auto & offsets_column = offset_columns[Nested::concatenateName(name_in_storage, subname)];
const auto & current_offsets_column = subpath.back().data.column;
/// If for some reason multiple offsets columns are present
/// for the same nested data structure, choose the one that is not empty.
if (!offsets_column || offsets_column->empty())
offsets_column = subpath.back().data.column;
if (current_offsets_column && !current_offsets_column->empty())
{
auto subname = ISerialization::getSubcolumnNameForStream(subpath);
auto full_name = Nested::concatenateName(name_in_storage, subname);
auto & offsets_column = offsets_columns[full_name];
if (!offsets_column)
offsets_column = current_offsets_column;
#ifndef NDEBUG
const auto & offsets_data = assert_cast<const ColumnUInt64 &>(*offsets_column).getData();
const auto & current_offsets_data = assert_cast<const ColumnUInt64 &>(*current_offsets_column).getData();
if (offsets_data != current_offsets_data)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Found non-equal columns with offsets (sizes: {} and {}) for stream {}",
offsets_data.size(), current_offsets_data.size(), full_name);
#endif
}
}, available_column->type, res_columns[i]);
}
return offsets_columns;
}
void fillMissingColumns(
Columns & res_columns,
size_t num_rows,
const NamesAndTypesList & requested_columns,
const NamesAndTypesList & available_columns,
const NameSet & partially_read_columns,
StorageMetadataPtr metadata_snapshot)
{
size_t num_columns = requested_columns.size();
if (num_columns != res_columns.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Invalid number of columns passed to fillMissingColumns. Expected {}, got {}",
num_columns, res_columns.size());
/// For a missing column of a nested data structure
/// we must create not a column of empty arrays,
/// but a column of arrays of correct length.
/// First, collect offset columns for all arrays in the block.
auto offset_columns = collectOffsetsColumns(available_columns, res_columns);
/// Insert default values only for columns without default expressions.
auto requested_column = requested_columns.begin();
for (size_t i = 0; i < num_columns; ++i, ++requested_column)
{
const auto & [name, type] = *requested_column;
if (res_columns[i] && arrayHasNoElementsRead(*res_columns[i]))
if (res_columns[i] && partially_read_columns.contains(name))
res_columns[i] = nullptr;
if (res_columns[i] == nullptr)
if (res_columns[i])
continue;
if (metadata_snapshot && metadata_snapshot->getColumns().hasDefault(name))
continue;
std::vector<ColumnPtr> current_offsets;
size_t num_dimensions = 0;
if (const auto * array_type = typeid_cast<const DataTypeArray *>(type.get()))
{
if (metadata_snapshot && metadata_snapshot->getColumns().hasDefault(name))
continue;
num_dimensions = getNumberOfDimensions(*array_type);
current_offsets.resize(num_dimensions);
std::vector<ColumnPtr> current_offsets;
bool has_all_offsets = true;
auto serialization = IDataType::getSerialization(*requested_column);
auto name_in_storage = Nested::extractTableName(requested_column->name);
const auto * array_type = typeid_cast<const DataTypeArray *>(type.get());
if (array_type)
serialization->enumerateStreams([&](const auto & subpath)
{
auto serialization = IDataType::getSerialization(*requested_column);
auto name_in_storage = Nested::extractTableName(requested_column->name);
if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes)
return;
serialization->enumerateStreams([&](const auto & subpath)
size_t level = ISerialization::getArrayLevel(subpath);
assert(level < num_dimensions);
auto subname = ISerialization::getSubcolumnNameForStream(subpath);
auto it = offset_columns.find(Nested::concatenateName(name_in_storage, subname));
if (it != offset_columns.end())
current_offsets[level] = it->second;
});
for (size_t j = 0; j < num_dimensions; ++j)
{
if (!current_offsets[j])
{
if (!has_all_offsets)
return;
if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes)
return;
auto subname = ISerialization::getSubcolumnNameForStream(subpath);
auto it = offset_columns.find(Nested::concatenateName(name_in_storage, subname));
if (it != offset_columns.end())
current_offsets.emplace_back(it->second);
else
has_all_offsets = false;
}, type);
current_offsets.resize(j);
break;
}
}
}
if (array_type && has_all_offsets)
{
assert(!current_offsets.empty());
auto scalar_type = getBaseTypeOfArray(type);
if (!current_offsets.empty())
{
size_t num_empty_dimensions = num_dimensions - current_offsets.size();
auto scalar_type = createArrayOfType(getBaseTypeOfArray(type), num_empty_dimensions);
size_t data_size = assert_cast<const ColumnUInt64 &>(*current_offsets.back()).getData().back();
res_columns[i] = scalar_type->createColumnConstWithDefaultValue(data_size)->convertToFullColumnIfConst();
size_t data_size = assert_cast<const ColumnUInt64 &>(*current_offsets.back()).getData().back();
res_columns[i] = scalar_type->createColumnConstWithDefaultValue(data_size)->convertToFullColumnIfConst();
for (auto it = current_offsets.rbegin(); it != current_offsets.rend(); ++it)
res_columns[i] = ColumnArray::create(res_columns[i], *it);
}
else
{
/// We must turn a constant column into a full column because the interpreter could infer
/// that it is constant everywhere but in some blocks (from other parts) it can be a full column.
res_columns[i] = type->createColumnConstWithDefaultValue(num_rows)->convertToFullColumnIfConst();
}
for (auto it = current_offsets.rbegin(); it != current_offsets.rend(); ++it)
res_columns[i] = ColumnArray::create(res_columns[i], *it);
}
else
{
/// We must turn a constant column into a full column because the interpreter could infer
/// that it is constant everywhere but in some blocks (from other parts) it can be a full column.
res_columns[i] = type->createColumnConstWithDefaultValue(num_rows)->convertToFullColumnIfConst();
}
}
}

@@ -1,5 +1,6 @@
#pragma once
#include <Core/Names.h>
#include <Interpreters/Context_fwd.h>
#include <Common/COW.h>
@@ -44,6 +45,7 @@ void fillMissingColumns(
size_t num_rows,
const NamesAndTypesList & requested_columns,
const NamesAndTypesList & available_columns,
const NameSet & partially_read_columns,
StorageMetadataPtr metadata_snapshot);
}

@@ -65,7 +65,7 @@ void IMergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_e
try
{
NamesAndTypesList available_columns(columns_to_read.begin(), columns_to_read.end());
DB::fillMissingColumns(res_columns, num_rows, requested_columns, available_columns, metadata_snapshot);
DB::fillMissingColumns(res_columns, num_rows, requested_columns, available_columns, partially_read_columns, metadata_snapshot);
should_evaluate_missing_defaults = std::any_of(
res_columns.begin(), res_columns.end(), [](const auto & column) { return column == nullptr; });
@@ -206,9 +206,9 @@ void IMergeTreeReader::performRequiredConversions(Columns & res_columns) const
IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const NameAndTypePair & required_column) const
{
auto get_offset_streams = [](const auto & serialization, const auto & name_in_storage)
auto get_offsets_streams = [](const auto & serialization, const auto & name_in_storage)
{
NameSet offset_streams;
Names offsets_streams;
serialization->enumerateStreams([&](const auto & subpath)
{
if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes)
@@ -216,31 +216,44 @@ IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const Na
auto subname = ISerialization::getSubcolumnNameForStream(subpath);
auto full_name = Nested::concatenateName(name_in_storage, subname);
offset_streams.insert(full_name);
offsets_streams.push_back(full_name);
});
return offset_streams;
return offsets_streams;
};
auto required_name_in_storage = Nested::extractTableName(required_column.getNameInStorage());
auto required_offset_streams = get_offset_streams(getSerializationInPart(required_column), required_name_in_storage);
auto required_offsets_streams = get_offsets_streams(getSerializationInPart(required_column), required_name_in_storage);
size_t max_matched_streams = 0;
ColumnPosition position;
/// Find the column that has the maximal number of matching
/// offsets columns with required_column.
for (const auto & part_column : data_part->getColumns())
{
auto name_in_storage = Nested::extractTableName(part_column.name);
if (name_in_storage != required_name_in_storage)
continue;
auto offset_streams = get_offset_streams(data_part->getSerialization(part_column.name), name_in_storage);
auto offsets_streams = get_offsets_streams(data_part->getSerialization(part_column.name), name_in_storage);
NameSet offsets_streams_set(offsets_streams.begin(), offsets_streams.end());
bool has_all_offsets = std::all_of(required_offset_streams.begin(), required_offset_streams.end(),
[&](const auto & stream_name) { return offset_streams.contains(stream_name); });
size_t i = 0;
for (; i < required_offsets_streams.size(); ++i)
{
if (!offsets_streams_set.contains(required_offsets_streams[i]))
break;
}
if (has_all_offsets)
return data_part->getColumnPosition(part_column.name);
if (i && (!position || i > max_matched_streams))
{
max_matched_streams = i;
position = data_part->getColumnPosition(part_column.name);
}
}
return {};
return position;
}
void IMergeTreeReader::checkNumberOfColumns(size_t num_columns_to_read) const

@@ -93,6 +93,8 @@ protected:
using ColumnPosition = std::optional<size_t>;
ColumnPosition findColumnForOffsets(const NameAndTypePair & column) const;
NameSet partially_read_columns;
private:
/// Alter conversions, which must be applied on fly if required
MergeTreeData::AlterConversions alter_conversions;

@@ -142,6 +142,8 @@ void MergeTreeReaderCompact::fillColumnPositions()
}
column_positions[i] = std::move(position);
if (read_only_offsets[i])
partially_read_columns.insert(column_to_read.name);
}
}

@@ -159,12 +159,18 @@ void MergeTreeReaderWide::addStreams(
const ReadBufferFromFileBase::ProfileCallback & profile_callback,
clockid_t clock_type)
{
bool has_any_stream = false;
bool has_all_streams = true;
ISerialization::StreamCallback callback = [&] (const ISerialization::SubstreamPath & substream_path)
{
String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path);
if (streams.contains(stream_name))
{
has_any_stream = true;
return;
}
bool data_file_exists = data_part->checksums.files.contains(stream_name + DATA_FILE_EXTENSION);
@@ -172,8 +178,12 @@ void MergeTreeReaderWide::addStreams(
* It is necessary since it allows adding a new column to the table structure without creating new files for old parts.
*/
if (!data_file_exists)
{
has_all_streams = false;
return;
}
has_any_stream = true;
bool is_lc_dict = substream_path.size() > 1 && substream_path[substream_path.size() - 2].type == ISerialization::Substream::Type::DictionaryKeys;
streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
@@ -185,6 +195,9 @@
};
serialization->enumerateStreams(callback);
if (has_any_stream && !has_all_streams)
partially_read_columns.insert(name_and_type.name);
}

@@ -95,7 +95,7 @@ protected:
++name_and_type;
}
fillMissingColumns(columns, src.rows(), column_names_and_types, column_names_and_types, /*metadata_snapshot=*/ nullptr);
fillMissingColumns(columns, src.rows(), column_names_and_types, column_names_and_types, {}, nullptr);
assert(std::all_of(columns.begin(), columns.end(), [](const auto & column) { return column != nullptr; }));
return Chunk(std::move(columns), src.rows());

@@ -3,5 +3,25 @@ Tuple(arr Nested(k1 Nested(k2 String, k3 String, k4 Int8), k5 Tuple(k6 String)),
{"obj":{"arr":[{"k1":[{"k2":"","k3":"ddd","k4":10},{"k2":"","k3":"","k4":20}],"k5":{"k6":"foo"}}],"id":2}}
[['bbb','']] [['aaa','ccc']]
[['ddd','']] [['','']]
1
[[0,0]]
[[10,20]]
Tuple(arr Nested(k1 Nested(k2 String, k3 Nested(k4 Int8))), id Int8)
{"obj":{"arr":[{"k1":[{"k2":"aaa","k3":[]}]}],"id":1}}
{"obj":{"arr":[{"k1":[{"k2":"bbb","k3":[{"k4":10}]},{"k2":"ccc","k3":[{"k4":20}]}]}],"id":2}}
[['aaa']] [[[]]]
[['bbb','ccc']] [[[10],[20]]]
1
[[[]]]
[[[10],[20]]]
Tuple(arr Nested(k1 Nested(k2 String, k4 Nested(k5 Int8)), k3 String), id Int8)
{"obj":{"arr":[{"k1":[],"k3":"qqq"},{"k1":[],"k3":"www"}],"id":1}}
{"obj":{"arr":[{"k1":[{"k2":"aaa","k4":[]}],"k3":"eee"}],"id":2}}
{"obj":{"arr":[{"k1":[{"k2":"bbb","k4":[{"k5":10}]},{"k2":"ccc","k4":[{"k5":20}]}],"k3":"rrr"}],"id":3}}
['qqq','www'] [[],[]] [[],[]]
['eee'] [['aaa']] [[[]]]
['rrr'] [['bbb','ccc']] [[[10],[20]]]
1
[[],[]]
[[[]]]
[[[10],[20]]]

@@ -7,12 +7,42 @@ SET output_format_json_named_tuples_as_objects = 1;
CREATE TABLE t_json_17(obj JSON)
ENGINE = MergeTree ORDER BY tuple();
DROP FUNCTION IF EXISTS hasValidSizes17;
CREATE FUNCTION hasValidSizes17 AS (arr1, arr2) -> length(arr1) = length(arr2) AND arrayAll((x, y) -> length(x) = length(y), arr1, arr2);
SYSTEM STOP MERGES t_json_17;
INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 1, "arr": [{"k1": [{"k2": "aaa", "k3": "bbb"}, {"k2": "ccc"}]}]}
INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 2, "arr": [{"k1": [{"k3": "ddd", "k4": 10}, {"k4": 20}], "k5": {"k6": "foo"}}]}
SELECT toTypeName(obj) FROM t_json_17 LIMIT 1;
SELECT obj FROM t_json_17 ORDER BY obj.id FORMAT JSONEachRow;
SELECT obj.arr.k1.k3, obj.arr.k1.k2 FROM t_json_17 ORDER BY obj.id;
SELECT sum(hasValidSizes17(obj.arr.k1.k3, obj.arr.k1.k2)) == count() FROM t_json_17;
SELECT obj.arr.k1.k4 FROM t_json_17 ORDER BY obj.id;
DROP TABLE IF EXISTS t_json_17;
TRUNCATE TABLE t_json_17;
INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 1, "arr": [{"k1": [{"k2": "aaa"}]}]}
INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 2, "arr": [{"k1": [{"k2": "bbb", "k3": [{"k4": 10}]}, {"k2": "ccc", "k3": [{"k4": 20}]}]}]}
SELECT toTypeName(obj) FROM t_json_17 LIMIT 1;
SELECT obj FROM t_json_17 ORDER BY obj.id FORMAT JSONEachRow;
SELECT obj.arr.k1.k2, obj.arr.k1.k3.k4 FROM t_json_17 ORDER BY obj.id;
SELECT sum(hasValidSizes17(obj.arr.k1.k2, obj.arr.k1.k3.k4)) == count() FROM t_json_17;
SELECT obj.arr.k1.k3.k4 FROM t_json_17 ORDER BY obj.id;
TRUNCATE TABLE t_json_17;
INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 1, "arr": [{"k3": "qqq"}, {"k3": "www"}]}
INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 2, "arr": [{"k1": [{"k2": "aaa"}], "k3": "eee"}]}
INSERT INTO t_json_17 FORMAT JSONAsObject {"id": 3, "arr": [{"k1": [{"k2": "bbb", "k4": [{"k5": 10}]}, {"k2": "ccc", "k4": [{"k5": 20}]}], "k3": "rrr"}]}
SELECT toTypeName(obj) FROM t_json_17 LIMIT 1;
SELECT obj FROM t_json_17 ORDER BY obj.id FORMAT JSONEachRow;
SELECT obj.arr.k3, obj.arr.k1.k2, obj.arr.k1.k4.k5 FROM t_json_17 ORDER BY obj.id;
SELECT sum(hasValidSizes17(obj.arr.k1.k2, obj.arr.k1.k4.k5)) == count() FROM t_json_17;
SELECT obj.arr.k1.k4.k5 FROM t_json_17 ORDER BY obj.id;
DROP FUNCTION hasValidSizes17;
DROP TABLE t_json_17;