fix reading of empty Nested(Array(...))

This commit is contained in:
Anton Popov 2023-08-07 12:32:12 +00:00
parent 991abde851
commit 981da23144
21 changed files with 44 additions and 46 deletions

View File

@ -89,7 +89,7 @@ public:
virtual MergeTreeReaderPtr getReader(
const NamesAndTypesList & columns_,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
const MarkRanges & mark_ranges,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,

View File

@ -24,7 +24,7 @@ namespace ErrorCodes
IMergeTreeReader::IMergeTreeReader(
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
const NamesAndTypesList & columns_,
const StorageMetadataPtr & metadata_snapshot_,
const StorageSnapshotPtr & storage_snapshot_,
UncompressedCache * uncompressed_cache_,
MarkCache * mark_cache_,
const MarkRanges & all_mark_ranges_,
@ -35,7 +35,7 @@ IMergeTreeReader::IMergeTreeReader(
, uncompressed_cache(uncompressed_cache_)
, mark_cache(mark_cache_)
, settings(settings_)
, metadata_snapshot(metadata_snapshot_)
, storage_snapshot(storage_snapshot_)
, all_mark_ranges(all_mark_ranges_)
, alter_conversions(data_part_info_for_read->getAlterConversions())
/// For wide parts convert plain arrays of Nested to subcolumns
@ -71,7 +71,7 @@ void IMergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_e
res_columns, num_rows,
Nested::convertToSubcolumns(requested_columns),
Nested::convertToSubcolumns(available_columns),
partially_read_columns, metadata_snapshot);
partially_read_columns, storage_snapshot->metadata);
should_evaluate_missing_defaults = std::any_of(
res_columns.begin(), res_columns.end(), [](const auto & column) { return column == nullptr; });
@ -110,7 +110,10 @@ void IMergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns
}
auto dag = DB::evaluateMissingDefaults(
additional_columns, requested_columns, metadata_snapshot->getColumns(), data_part_info_for_read->getContext());
additional_columns, requested_columns,
storage_snapshot->metadata->getColumns(),
data_part_info_for_read->getContext());
if (dag)
{
dag->addMaterializingOutputActions();

View File

@ -24,7 +24,7 @@ public:
IMergeTreeReader(
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
const NamesAndTypesList & columns_,
const StorageMetadataPtr & metadata_snapshot_,
const StorageSnapshotPtr & storage_snapshot_,
UncompressedCache * uncompressed_cache_,
MarkCache * mark_cache_,
const MarkRanges & all_mark_ranges_,
@ -92,7 +92,7 @@ protected:
MergeTreeReaderSettings settings;
StorageMetadataPtr metadata_snapshot;
StorageSnapshotPtr storage_snapshot;
MarkRanges all_mark_ranges;
/// Position and level (of nesting).

View File

@ -191,7 +191,6 @@ ChunkAndProgress IMergeTreeSelectAlgorithm::read()
}
void IMergeTreeSelectAlgorithm::initializeMergeTreeReadersForCurrentTask(
const StorageMetadataPtr & metadata_snapshot,
const IMergeTreeReader::ValueSizeMap & value_size_map,
const ReadBufferFromFileBase::ProfileCallback & profile_callback)
{
@ -206,7 +205,7 @@ void IMergeTreeSelectAlgorithm::initializeMergeTreeReadersForCurrentTask(
else
{
reader = task->data_part->getReader(
task->task_columns.columns, metadata_snapshot, task->mark_ranges,
task->task_columns.columns, storage_snapshot, task->mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(),
task->alter_conversions, reader_settings, value_size_map, profile_callback);
}
@ -222,8 +221,8 @@ void IMergeTreeSelectAlgorithm::initializeMergeTreeReadersForCurrentTask(
{
initializeMergeTreePreReadersForPart(
task->data_part, task->alter_conversions,
task->task_columns, metadata_snapshot,
task->mark_ranges, value_size_map, profile_callback);
task->task_columns, task->mark_ranges,
value_size_map, profile_callback);
}
}
@ -231,18 +230,17 @@ void IMergeTreeSelectAlgorithm::initializeMergeTreeReadersForPart(
const MergeTreeData::DataPartPtr & data_part,
const AlterConversionsPtr & alter_conversions,
const MergeTreeReadTaskColumns & task_columns,
const StorageMetadataPtr & metadata_snapshot,
const MarkRanges & mark_ranges,
const IMergeTreeReader::ValueSizeMap & value_size_map,
const ReadBufferFromFileBase::ProfileCallback & profile_callback)
{
reader = data_part->getReader(
task_columns.columns, metadata_snapshot, mark_ranges,
task_columns.columns, storage_snapshot, mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(),
alter_conversions, reader_settings, value_size_map, profile_callback);
initializeMergeTreePreReadersForPart(
data_part, alter_conversions, task_columns, metadata_snapshot,
data_part, alter_conversions, task_columns,
mark_ranges, value_size_map, profile_callback);
}
@ -250,7 +248,6 @@ void IMergeTreeSelectAlgorithm::initializeMergeTreePreReadersForPart(
const MergeTreeData::DataPartPtr & data_part,
const AlterConversionsPtr & alter_conversions,
const MergeTreeReadTaskColumns & task_columns,
const StorageMetadataPtr & metadata_snapshot,
const MarkRanges & mark_ranges,
const IMergeTreeReader::ValueSizeMap & value_size_map,
const ReadBufferFromFileBase::ProfileCallback & profile_callback)
@ -262,7 +259,7 @@ void IMergeTreeSelectAlgorithm::initializeMergeTreePreReadersForPart(
{
pre_reader_for_step.push_back(
data_part->getReader(
{LightweightDeleteDescription::FILTER_COLUMN}, metadata_snapshot,
{LightweightDeleteDescription::FILTER_COLUMN}, storage_snapshot,
mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(),
alter_conversions, reader_settings, value_size_map, profile_callback));
}
@ -271,7 +268,7 @@ void IMergeTreeSelectAlgorithm::initializeMergeTreePreReadersForPart(
{
pre_reader_for_step.push_back(
data_part->getReader(
pre_columns_per_step, metadata_snapshot, mark_ranges,
pre_columns_per_step, storage_snapshot, mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(),
alter_conversions, reader_settings, value_size_map, profile_callback));
}

View File

@ -120,7 +120,6 @@ protected:
/// Sets up data readers for each step of prewhere and where
void initializeMergeTreeReadersForCurrentTask(
const StorageMetadataPtr & metadata_snapshot,
const IMergeTreeReader::ValueSizeMap & value_size_map,
const ReadBufferFromFileBase::ProfileCallback & profile_callback);
@ -128,7 +127,6 @@ protected:
const MergeTreeData::DataPartPtr & data_part,
const AlterConversionsPtr & alter_conversions,
const MergeTreeReadTaskColumns & task_columns,
const StorageMetadataPtr & metadata_snapshot,
const MarkRanges & mark_ranges,
const IMergeTreeReader::ValueSizeMap & value_size_map,
const ReadBufferFromFileBase::ProfileCallback & profile_callback);
@ -207,7 +205,6 @@ private:
const MergeTreeData::DataPartPtr & data_part,
const AlterConversionsPtr & alter_conversions,
const MergeTreeReadTaskColumns & task_columns,
const StorageMetadataPtr & metadata_snapshot,
const MarkRanges & mark_ranges,
const IMergeTreeReader::ValueSizeMap & value_size_map,
const ReadBufferFromFileBase::ProfileCallback & profile_callback);

View File

@ -30,7 +30,7 @@ MergeTreeDataPartCompact::MergeTreeDataPartCompact(
IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartCompact::getReader(
const NamesAndTypesList & columns_to_read,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
const MarkRanges & mark_ranges,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,
@ -43,7 +43,7 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartCompact::getReader(
auto * load_marks_threadpool = reader_settings.read_settings.load_marks_asynchronously ? &read_info->getContext()->getLoadMarksThreadpool() : nullptr;
return std::make_unique<MergeTreeReaderCompact>(
read_info, columns_to_read, metadata_snapshot, uncompressed_cache,
read_info, columns_to_read, storage_snapshot, uncompressed_cache,
mark_cache, mark_ranges, reader_settings, load_marks_threadpool,
avg_value_size_hints, profile_callback);
}

View File

@ -30,7 +30,7 @@ public:
MergeTreeReaderPtr getReader(
const NamesAndTypesList & columns,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
const MarkRanges & mark_ranges,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,

View File

@ -32,7 +32,7 @@ MergeTreeDataPartInMemory::MergeTreeDataPartInMemory(
IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartInMemory::getReader(
const NamesAndTypesList & columns_to_read,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
const MarkRanges & mark_ranges,
UncompressedCache * /* uncompressed_cache */,
MarkCache * /* mark_cache */,
@ -45,7 +45,7 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartInMemory::getReader(
auto ptr = std::static_pointer_cast<const MergeTreeDataPartInMemory>(shared_from_this());
return std::make_unique<MergeTreeReaderInMemory>(
read_info, ptr, columns_to_read, metadata_snapshot, mark_ranges, reader_settings);
read_info, ptr, columns_to_read, storage_snapshot, mark_ranges, reader_settings);
}
IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartInMemory::getWriter(

View File

@ -19,7 +19,7 @@ public:
MergeTreeReaderPtr getReader(
const NamesAndTypesList & columns,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
const MarkRanges & mark_ranges,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,

View File

@ -29,7 +29,7 @@ MergeTreeDataPartWide::MergeTreeDataPartWide(
IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartWide::getReader(
const NamesAndTypesList & columns_to_read,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
const MarkRanges & mark_ranges,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,
@ -41,7 +41,7 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartWide::getReader(
auto read_info = std::make_shared<LoadedMergeTreeDataPartInfoForReader>(shared_from_this(), alter_conversions);
return std::make_unique<MergeTreeReaderWide>(
read_info, columns_to_read,
metadata_snapshot, uncompressed_cache,
storage_snapshot, uncompressed_cache,
mark_cache, mark_ranges, reader_settings,
avg_value_size_hints, profile_callback);
}

View File

@ -25,7 +25,7 @@ public:
MergeTreeReaderPtr getReader(
const NamesAndTypesList & columns,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
const MarkRanges & mark_ranges,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,

View File

@ -97,7 +97,7 @@ std::future<MergeTreeReaderPtr> MergeTreePrefetchedReadPool::createPrefetchedRea
Priority priority) const
{
auto reader = data_part.getReader(
columns, storage_snapshot->metadata, required_ranges,
columns, storage_snapshot, required_ranges,
uncompressed_cache, mark_cache, alter_conversions, reader_settings,
IMergeTreeReader::ValueSizeMap{}, profile_callback);

View File

@ -17,7 +17,7 @@ namespace ErrorCodes
MergeTreeReaderCompact::MergeTreeReaderCompact(
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
NamesAndTypesList columns_,
const StorageMetadataPtr & metadata_snapshot_,
const StorageSnapshotPtr & storage_snapshot_,
UncompressedCache * uncompressed_cache_,
MarkCache * mark_cache_,
MarkRanges mark_ranges_,
@ -29,7 +29,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
: IMergeTreeReader(
data_part_info_for_read_,
columns_,
metadata_snapshot_,
storage_snapshot_,
uncompressed_cache_,
mark_cache_,
mark_ranges_,
@ -166,8 +166,11 @@ void MergeTreeReaderCompact::fillColumnPositions()
name_in_storage = alter_conversions->getColumnNewName(name_in_storage);
if (!storage_columns_with_collected_nested)
storage_columns_with_collected_nested = ColumnsDescription(
Nested::collect(metadata_snapshot->getColumns().getAllPhysical()));
{
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects();
auto storage_columns_list = Nested::collect(storage_snapshot->getColumns(options));
storage_columns_with_collected_nested = ColumnsDescription(std::move(storage_columns_list));
}
column_to_read_with_subcolumns = storage_columns_with_collected_nested
->getColumnOrSubcolumn(

View File

@ -21,7 +21,7 @@ public:
MergeTreeReaderCompact(
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
NamesAndTypesList columns_,
const StorageMetadataPtr & metadata_snapshot_,
const StorageSnapshotPtr & storage_snapshot_,
UncompressedCache * uncompressed_cache_,
MarkCache * mark_cache_,
MarkRanges mark_ranges_,

View File

@ -19,13 +19,13 @@ MergeTreeReaderInMemory::MergeTreeReaderInMemory(
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
DataPartInMemoryPtr data_part_,
NamesAndTypesList columns_,
const StorageMetadataPtr & metadata_snapshot_,
const StorageSnapshotPtr & storage_snapshot_,
MarkRanges mark_ranges_,
MergeTreeReaderSettings settings_)
: IMergeTreeReader(
data_part_info_for_read_,
columns_,
metadata_snapshot_,
storage_snapshot_,
nullptr,
nullptr,
mark_ranges_,

View File

@ -18,7 +18,7 @@ public:
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
DataPartInMemoryPtr data_part_,
NamesAndTypesList columns_,
const StorageMetadataPtr & metadata_snapshot_,
const StorageSnapshotPtr & storage_snapshot_,
MarkRanges mark_ranges_,
MergeTreeReaderSettings settings_);

View File

@ -24,7 +24,7 @@ namespace
MergeTreeReaderWide::MergeTreeReaderWide(
MergeTreeDataPartInfoForReaderPtr data_part_info_,
NamesAndTypesList columns_,
const StorageMetadataPtr & metadata_snapshot_,
const StorageSnapshotPtr & storage_snapshot_,
UncompressedCache * uncompressed_cache_,
MarkCache * mark_cache_,
MarkRanges mark_ranges_,
@ -35,7 +35,7 @@ MergeTreeReaderWide::MergeTreeReaderWide(
: IMergeTreeReader(
data_part_info_,
columns_,
metadata_snapshot_,
storage_snapshot_,
uncompressed_cache_,
mark_cache_,
mark_ranges_,

View File

@ -17,7 +17,7 @@ public:
MergeTreeReaderWide(
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
NamesAndTypesList columns_,
const StorageMetadataPtr & metadata_snapshot_,
const StorageSnapshotPtr & storage_snapshot_,
UncompressedCache * uncompressed_cache_,
MarkCache * mark_cache_,
MarkRanges mark_ranges_,

View File

@ -65,7 +65,7 @@ void MergeTreeSelectAlgorithm::initializeReaders()
initializeMergeTreeReadersForPart(
data_part, alter_conversions, task_columns,
storage_snapshot->getMetadataForQuery(), all_mark_ranges, {}, {});
all_mark_ranges, /*value_size_map=*/ {}, /*profile_callback=*/ {});
}

View File

@ -150,7 +150,7 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
mark_ranges.emplace(MarkRanges{MarkRange(0, data_part->getMarksCount())});
reader = data_part->getReader(
columns_for_reader, storage_snapshot->metadata,
columns_for_reader, storage_snapshot,
*mark_ranges, /* uncompressed_cache = */ nullptr,
mark_cache.get(), alter_conversions, reader_settings, {}, {});
}

View File

@ -45,8 +45,6 @@ void MergeTreeThreadSelectAlgorithm::finalizeNewTask()
/// Allows the pool to reduce the number of threads when reads are too slow.
auto profile_callback = [this](ReadBufferFromFileBase::ProfileInfo info_) { pool->profileFeedback(info_); };
const auto & metadata_snapshot = storage_snapshot->metadata;
IMergeTreeReader::ValueSizeMap value_size_map;
if (reader && part_name != last_read_part_name)
@ -57,7 +55,7 @@ void MergeTreeThreadSelectAlgorithm::finalizeNewTask()
/// task->reader.valid() means there is a prefetched reader for this task, use it.
const bool init_new_readers = !reader || task->reader.valid() || part_name != last_read_part_name;
if (init_new_readers)
initializeMergeTreeReadersForCurrentTask(metadata_snapshot, value_size_map, profile_callback);
initializeMergeTreeReadersForCurrentTask(value_size_map, profile_callback);
last_read_part_name = part_name;
}