Merge pull request #72285 from Avogar/read-column-once-compact-subcolumns

Bring back optimization for reading subcolumns of single column in Compact parts
Pavel Kruglov 2024-12-10 13:08:50 +00:00 committed by GitHub
commit bba8f7e807
5 changed files with 69 additions and 21 deletions
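The optimization being restored: in a Compact part, reading N subcolumns of one storage column used to deserialize that column N times per granule; with this change the full column is deserialized once, cached for the duration of the granule, and every requested subcolumn is extracted from the cached copy. Below is a minimal, self-contained C++ sketch of that idea; the struct and helper names (Column, deserializeFullColumn, extractSubcolumn, SubcolumnRequest) are hypothetical stand-ins, not the actual MergeTreeReaderCompact interfaces.

#include <map>
#include <memory>
#include <string>
#include <vector>

/// Stand-in for IColumn / ColumnPtr.
struct Column { std::string storage_name; };
using ColumnPtr = std::shared_ptr<const Column>;

/// Hypothetical helpers: deserialize a whole column for one granule,
/// and extract a single subcolumn from an already deserialized column.
ColumnPtr deserializeFullColumn(const std::string & storage_name, size_t /*rows_to_read*/)
{
    return std::make_shared<const Column>(Column{storage_name});
}

ColumnPtr extractSubcolumn(const ColumnPtr & full_column, const std::string & /*subcolumn_name*/)
{
    return full_column; /// placeholder for type_in_storage->getSubcolumn(...)
}

struct SubcolumnRequest
{
    std::string name_in_storage;  /// e.g. "data"
    std::string subcolumn_name;   /// e.g. "k1"
};

/// Read all requested subcolumns for one granule, deserializing every
/// distinct storage column only once instead of once per subcolumn.
std::vector<ColumnPtr> readSubcolumns(const std::vector<SubcolumnRequest> & requests, size_t rows_to_read)
{
    std::map<std::string, ColumnPtr> columns_cache_for_subcolumns; /// lives for one granule only
    std::vector<ColumnPtr> result;
    result.reserve(requests.size());

    for (const auto & request : requests)
    {
        auto it = columns_cache_for_subcolumns.find(request.name_in_storage);
        if (it == columns_cache_for_subcolumns.end())
        {
            auto full_column = deserializeFullColumn(request.name_in_storage, rows_to_read);
            it = columns_cache_for_subcolumns.emplace(request.name_in_storage, full_column).first;
        }
        result.push_back(extractSubcolumn(it->second, request.subcolumn_name));
    }
    return result;
}

In the actual change, the cache is the columns_cache_for_subcolumns map threaded through MergeTreeReaderCompact::readData in the diff below.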

View File

@@ -81,8 +81,10 @@ protected:
 /// avg_value_size_hints are used to reduce the number of reallocations when creating columns of variable size.
 ValueSizeMap avg_value_size_hints;
-/// Stores states for IDataType::deserializeBinaryBulk
+/// Stores states for IDataType::deserializeBinaryBulk for regular columns.
 DeserializeBinaryBulkStateMap deserialize_binary_bulk_state_map;
+/// The same as above, but for subcolumns.
+DeserializeBinaryBulkStateMap deserialize_binary_bulk_state_map_for_subcolumns;
 /// Actual column names and types of columns in part,
 /// which may differ from table metadata.
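Note on the new member: deserialize_binary_bulk_state_map_for_subcolumns is keyed by the column's name in storage (see the readPrefix change below), so all subcolumns of one storage column share a single deserialization state, while deserialize_binary_bulk_state_map continues to hold one state per regular requested column.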

View File

@@ -148,7 +148,9 @@ void MergeTreeReaderCompact::readData(
 ColumnPtr & column,
 size_t rows_to_read,
 const InputStreamGetter & getter,
-ISerialization::SubstreamsCache & cache)
+ISerialization::SubstreamsCache & cache,
+std::unordered_map<String, ColumnPtr> & columns_cache_for_subcolumns,
+const ColumnNameLevel & name_level_for_offsets)
 {
 try
 {
@@ -171,10 +173,22 @@
 const auto & type_in_storage = name_and_type.getTypeInStorage();
 const auto & name_in_storage = name_and_type.getNameInStorage();
+auto cache_for_subcolumns_it = columns_cache_for_subcolumns.find(name_in_storage);
+if (!name_level_for_offsets.has_value() && cache_for_subcolumns_it != columns_cache_for_subcolumns.end())
+{
+auto subcolumn = type_in_storage->getSubcolumn(name_and_type.getSubcolumnName(), cache_for_subcolumns_it->second);
+/// TODO: Avoid extra copying.
+if (column->empty())
+column = IColumn::mutate(subcolumn);
+else
+column->assumeMutable()->insertRangeFrom(*subcolumn, 0, subcolumn->size());
+}
+else
+{
 auto serialization = getSerializationInPart({name_in_storage, type_in_storage});
 ColumnPtr temp_column = type_in_storage->createColumn(*serialization);
-serialization->deserializeBinaryBulkWithMultipleStreams(temp_column, rows_to_read, deserialize_settings, deserialize_binary_bulk_state_map[name], nullptr);
+serialization->deserializeBinaryBulkWithMultipleStreams(temp_column, rows_to_read, deserialize_settings, deserialize_binary_bulk_state_map_for_subcolumns[name_in_storage], nullptr);
 auto subcolumn = type_in_storage->getSubcolumn(name_and_type.getSubcolumnName(), temp_column);
 /// TODO: Avoid extra copying.
@@ -182,6 +196,10 @@
 column = subcolumn;
 else
 column->assumeMutable()->insertRangeFrom(*subcolumn, 0, subcolumn->size());
+if (!name_level_for_offsets.has_value())
+columns_cache_for_subcolumns[name_in_storage] = temp_column;
+}
 }
 else
 {
@@ -227,15 +245,23 @@ void MergeTreeReaderCompact::readPrefix(
 serialization_for_prefix->deserializeBinaryBulkStatePrefix(deserialize_settings, state_for_prefix, nullptr);
 }
-SerializationPtr serialization;
-if (name_and_type.isSubcolumn())
-serialization = getSerializationInPart({name_and_type.getNameInStorage(), name_and_type.getTypeInStorage()});
-else
-serialization = getSerializationInPart(name_and_type);
 deserialize_settings.getter = buffer_getter;
 deserialize_settings.object_and_dynamic_read_statistics = true;
-serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, deserialize_binary_bulk_state_map[name_and_type.name], nullptr);
+if (name_and_type.isSubcolumn())
+{
+/// For subcolumns of the same column we need to deserialize prefix only once.
+if (deserialize_binary_bulk_state_map_for_subcolumns.contains(name_and_type.getNameInStorage()))
+return;
+auto serialization = getSerializationInPart({name_and_type.getNameInStorage(), name_and_type.getTypeInStorage()});
+serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, deserialize_binary_bulk_state_map_for_subcolumns[name_and_type.getNameInStorage()], nullptr);
+}
+else
+{
+auto serialization = getSerializationInPart(name_and_type);
+serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, deserialize_binary_bulk_state_map[name_and_type.getNameInStorage()], nullptr);
+}
 }
 catch (Exception & e)
 {
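The readPrefix change keeps one prefix state per storage column for subcolumn reads: the first subcolumn of a column deserializes the prefix into deserialize_binary_bulk_state_map_for_subcolumns, and later subcolumns of the same column return early. A rough sketch of that bookkeeping with simplified types (DeserializeState, readStatePrefix and PrefixReader are illustrative stand-ins for ISerialization's state machinery, not the real API):

#include <map>
#include <memory>
#include <string>

/// Stand-in for ISerialization::DeserializeBinaryBulkStatePtr.
struct DeserializeState { /* stream prefixes, object/dynamic statistics, ... */ };
using DeserializeStatePtr = std::shared_ptr<DeserializeState>;

/// Hypothetical stand-in for serialization->deserializeBinaryBulkStatePrefix(...).
DeserializeStatePtr readStatePrefix(const std::string & /*column_name*/)
{
    return std::make_shared<DeserializeState>();
}

struct PrefixReader
{
    /// Regular columns: one state per requested column.
    std::map<std::string, DeserializeStatePtr> state_map;
    /// Subcolumns: one shared state per storage column.
    std::map<std::string, DeserializeStatePtr> state_map_for_subcolumns;

    void readPrefix(const std::string & name_in_storage, bool is_subcolumn)
    {
        if (is_subcolumn)
        {
            /// For subcolumns of the same storage column, read the prefix only once.
            if (state_map_for_subcolumns.contains(name_in_storage))
                return;
            state_map_for_subcolumns[name_in_storage] = readStatePrefix(name_in_storage);
        }
        else
        {
            state_map[name_in_storage] = readStatePrefix(name_in_storage);
        }
    }
};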

View File

@@ -45,7 +45,9 @@ protected:
 ColumnPtr & column,
 size_t rows_to_read,
 const InputStreamGetter & getter,
-ISerialization::SubstreamsCache & cache);
+ISerialization::SubstreamsCache & cache,
+std::unordered_map<String, ColumnPtr> & columns_cache_for_subcolumns,
+const ColumnNameLevel & name_level_for_offsets);
 void readPrefix(
 const NameAndTypePair & name_and_type,

View File

@@ -25,10 +25,18 @@ try
 while (read_rows < max_rows_to_read)
 {
 size_t rows_to_read = data_part_info_for_read->getIndexGranularity().getMarkRows(from_mark);
 deserialize_binary_bulk_state_map.clear();
+deserialize_binary_bulk_state_map_for_subcolumns.clear();
 /// Use cache to avoid reading the column with the same name twice.
 /// It may happen if there are empty array Nested in the part.
 ISerialization::SubstreamsCache cache;
+/// If we need to read multiple subcolumns from a single column in storage,
+/// we will read this column only once and then reuse it to extract all subcolumns.
+/// We cannot use SubstreamsCache for it, because we may also read the full column itself
+/// and it might not be empty inside res_columns (and SubstreamsCache contains the whole columns).
+/// TODO: refactor the code so that we first read all full columns and then extract all subcolumns from them.
+std::unordered_map<String, ColumnPtr> columns_cache_for_subcolumns;
 for (size_t pos = 0; pos < num_columns; ++pos)
 {
@@ -56,7 +64,7 @@
 };
 readPrefix(columns_to_read[pos], buffer_getter, buffer_getter_for_prefix, columns_for_offsets[pos]);
-readData(columns_to_read[pos], column, rows_to_read, buffer_getter, cache);
+readData(columns_to_read[pos], column, rows_to_read, buffer_getter, cache, columns_cache_for_subcolumns, columns_for_offsets[pos]);
 }
 ++from_mark;
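Both the state maps and columns_cache_for_subcolumns are scoped to a single mark: they are cleared or recreated at the top of the per-granule loop, so a cached column never outlives the rows_to_read it was deserialized for. The cache is also bypassed whenever columns_for_offsets / name_level_for_offsets is set for a column, presumably because in that case only the offsets of a missing Nested column are read and the temporary column would not be a complete column that other subcolumn reads could reuse.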

View File

@@ -0,0 +1,10 @@
+<test>
+<settings>
+<allow_experimental_json_type>1</allow_experimental_json_type>
+</settings>
+<create_query>CREATE TABLE t_json (data JSON) ENGINE = MergeTree ORDER BY tuple() SETTINGS min_rows_for_wide_part=1000000000, min_bytes_for_wide_part=100000000000</create_query>
+<fill_query>INSERT INTO t_json SELECT toJSONString(map(number % 10, repeat('a', number % 100))) FROM numbers(10000000)</fill_query>
+<query>SELECT data.k0, data.k1, data.k2, data.k3, data.k4, data.k5, data.k6, data.k7, data.k8, data.k9 FROM t_json FORMAT Null</query>
+<drop_query>DROP TABLE IF EXISTS t_json</drop_query>
+</test>
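The new performance test targets exactly this code path: min_rows_for_wide_part and min_bytes_for_wide_part are set high enough that the part stays Compact, and the query reads ten subcolumns (data.k0 through data.k9) of the single JSON column, so with this change the data column is deserialized once per granule instead of once per requested subcolumn.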