Mirror of https://github.com/ClickHouse/ClickHouse.git
in-memory parts: fix reading of nested
Commit e1970f6d28 (parent fa05641b06)
IMergeTreeReader.cpp
@@ -248,7 +248,23 @@ void IMergeTreeReader::performRequiredConversions(Columns & res_columns)
     }
 }
 
-void IMergeTreeReader::checkNumberOfColumns(size_t num_columns_to_read)
+IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const String & column_name) const
+{
+    String table_name = Nested::extractTableName(column_name);
+    for (const auto & part_column : data_part->getColumns())
+    {
+        if (typeid_cast<const DataTypeArray *>(part_column.type.get()))
+        {
+            auto position = data_part->getColumnPosition(part_column.name);
+            if (position && Nested::extractTableName(part_column.name) == table_name)
+                return position;
+        }
+    }
+
+    return {};
+}
+
+void IMergeTreeReader::checkNumberOfColumns(size_t num_columns_to_read) const
 {
     if (num_columns_to_read != columns.size())
         throw Exception("invalid number of columns passed to MergeTreeReader::readRows. "
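The function added above is the core of the fix: a Nested column such as n is stored as sibling arrays (n.a, n.b, ...), and Nested::extractTableName() reduces a subcolumn name to that shared prefix, so a subcolumn missing from a part can borrow its offsets from any Array sibling that is present. A minimal standalone sketch of the same lookup, not ClickHouse code; extractTableName, PartColumn and the simplified findColumnForOffsets signature are illustrative stand-ins:

    // Sketch of the lookup performed by IMergeTreeReader::findColumnForOffsets.
    #include <iostream>
    #include <optional>
    #include <string>
    #include <vector>

    // Stand-in for Nested::extractTableName: "n.b" -> "n".
    std::string extractTableName(const std::string & column_name)
    {
        auto pos = column_name.find('.');
        return pos == std::string::npos ? column_name : column_name.substr(0, pos);
    }

    struct PartColumn
    {
        std::string name;
        bool is_array = false;   // stand-in for typeid_cast<const DataTypeArray *>
    };

    // Position of an Array sibling from the same Nested "table", or nullopt if the part has none.
    std::optional<size_t> findColumnForOffsets(const std::vector<PartColumn> & part_columns, const std::string & column_name)
    {
        std::string table_name = extractTableName(column_name);
        for (size_t i = 0; i < part_columns.size(); ++i)
            if (part_columns[i].is_array && extractTableName(part_columns[i].name) == table_name)
                return i;
        return std::nullopt;
    }

    int main()
    {
        std::vector<PartColumn> part_columns = {{"x"}, {"filter"}, {"n.a", true}};
        auto pos = findColumnForOffsets(part_columns, "n.b");   // "n.b" is absent from the part
        std::cout << (pos ? "offsets from column #" + std::to_string(*pos) : std::string("no sibling found")) << '\n';
    }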
IMergeTreeReader.h
@@ -61,7 +61,7 @@ protected:
     /// Returns actual column type in part, which can differ from table metadata.
     NameAndTypePair getColumnFromPart(const NameAndTypePair & required_column) const;
 
-    void checkNumberOfColumns(size_t columns_num_to_read);
+    void checkNumberOfColumns(size_t columns_num_to_read) const;
 
     /// avg_value_size_hints are used to reduce the number of reallocations when creating columns of variable size.
     ValueSizeMap avg_value_size_hints;
@@ -79,6 +79,9 @@ protected:
     const MergeTreeData & storage;
     MarkRanges all_mark_ranges;
 
+    using ColumnPosition = std::optional<size_t>;
+    ColumnPosition findColumnForOffsets(const String & column_name) const;
+
     friend class MergeTreeRangeReader::DelayedStream;
 
 private:
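Note that the ColumnPosition alias moved into the base class is std::optional<size_t> rather than a plain index: position 0 is a valid result, so "not found" needs a distinct state, which is what the position != std::nullopt and if (position && ...) checks in this commit rely on. A tiny standalone illustration; the findFirstMatching helper is hypothetical, not part of the codebase:

    #include <cstddef>
    #include <iostream>
    #include <optional>

    using ColumnPosition = std::optional<size_t>;

    ColumnPosition findFirstMatching(bool found_at_zero)
    {
        if (found_at_zero)
            return 0;          // a real position, even though 0 is falsy as a plain integer
        return std::nullopt;   // the part has no suitable column
    }

    int main()
    {
        std::cout << findFirstMatching(true).has_value() << ' '
                  << findFirstMatching(false).has_value() << '\n';   // prints: 1 0
    }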
MergeTreeReaderCompact.cpp
@@ -84,12 +84,11 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
         if (!position && typeid_cast<const DataTypeArray *>(type.get()))
         {
             /// If array of Nested column is missing in part,
-            /// we have to read it's offsets if they exists.
+            /// we have to read its offsets if they exist.
             position = findColumnForOffsets(name);
             read_only_offsets[i] = (position != std::nullopt);
         }
-
 
         column_positions[i] = std::move(position);
     }
@@ -168,23 +167,6 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
     return read_rows;
 }
 
-MergeTreeReaderCompact::ColumnPosition MergeTreeReaderCompact::findColumnForOffsets(const String & column_name)
-{
-    String table_name = Nested::extractTableName(column_name);
-    for (const auto & part_column : data_part->getColumns())
-    {
-        if (typeid_cast<const DataTypeArray *>(part_column.type.get()))
-        {
-            auto position = data_part->getColumnPosition(part_column.name);
-            if (position && Nested::extractTableName(part_column.name) == table_name)
-                return position;
-        }
-    }
-
-    return {};
-}
-
-
 void MergeTreeReaderCompact::readData(
     const String & name, IColumn & column, const IDataType & type,
     size_t from_mark, size_t column_position, size_t rows_to_read, bool only_offsets)
MergeTreeReaderCompact.h
@@ -40,7 +40,6 @@ private:
 
     MergeTreeMarksLoader marks_loader;
 
-    using ColumnPosition = std::optional<size_t>;
     /// Positions of columns in part structure.
     std::vector<ColumnPosition> column_positions;
     /// Should we read full column or only it's offsets
@@ -53,8 +52,6 @@ private:
 
     void readData(const String & name, IColumn & column, const IDataType & type,
         size_t from_mark, size_t column_position, size_t rows_to_read, bool only_offsets = false);
 
-    ColumnPosition findColumnForOffsets(const String & column_name);
-
 };
 
 }
MergeTreeReaderInMemory.cpp
@@ -2,6 +2,7 @@
 #include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
 #include <DataTypes/DataTypeArray.h>
+#include <DataTypes/NestedUtils.h>
 #include <Columns/ColumnArray.h>
 #include <Poco/File.h>
 
 namespace DB
@@ -24,10 +25,20 @@ MergeTreeReaderInMemory::MergeTreeReaderInMemory(
         std::move(settings_), {})
     , part_in_memory(std::move(data_part_))
 {
+    for (const auto & name_and_type : columns)
+    {
+        auto [name, type] = getColumnFromPart(name_and_type);
+        if (!part_in_memory->block.has(name) && typeid_cast<const DataTypeArray *>(type.get()))
+            if (auto offset_position = findColumnForOffsets(name))
+                positions_for_offsets[name] = *offset_position;
+    }
 }
 
-size_t MergeTreeReaderInMemory::readRows(size_t from_mark, bool /* continue_reading */, size_t max_rows_to_read, Columns & res_columns)
+size_t MergeTreeReaderInMemory::readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
 {
+    if (!continue_reading)
+        total_rows_read = 0;
+
     size_t total_marks = data_part->index_granularity.getMarksCount();
     if (from_mark >= total_marks)
         throw Exception("Mark " + toString(from_mark) + " is out of bound. Max mark: "
@@ -41,34 +52,49 @@ size_t MergeTreeReaderInMemory::readRows(size_t from_mark, bool /* continue_read
         throw Exception("Cannot read data in MergeTreeReaderInMemory. Rows already read: "
             + toString(total_rows_read) + ". Rows in part: " + toString(part_rows), ErrorCodes::CANNOT_READ_ALL_DATA);
 
+    size_t rows_to_read = std::min(max_rows_to_read, part_rows - total_rows_read);
     auto column_it = columns.begin();
-    size_t rows_read = 0;
     for (size_t i = 0; i < num_columns; ++i, ++column_it)
    {
         auto [name, type] = getColumnFromPart(*column_it);
-        if (!part_in_memory->block.has(name))
-            continue;
-
-        const auto & block_column = part_in_memory->block.getByName(name).column;
-        if (total_rows_read == 0 && part_rows <= max_rows_to_read)
-        {
-            res_columns[i] = block_column;
-            rows_read = part_rows;
-        }
-        else
+        auto offsets_it = positions_for_offsets.find(name);
+        if (offsets_it != positions_for_offsets.end())
         {
+            const auto & source_offsets = assert_cast<const ColumnArray &>(
+                *part_in_memory->block.getByPosition(offsets_it->second).column).getOffsets();
+
             if (res_columns[i] == nullptr)
                 res_columns[i] = type->createColumn();
 
             auto mutable_column = res_columns[i]->assumeMutable();
-            rows_read = std::min(max_rows_to_read, part_rows - total_rows_read);
-            mutable_column->insertRangeFrom(*block_column, total_rows_read, rows_read);
+            auto & res_offstes = assert_cast<ColumnArray &>(*mutable_column).getOffsets();
+            for (size_t row = 0; row < rows_to_read; ++row)
+                res_offstes.push_back(source_offsets[total_rows_read + row]);
+
             res_columns[i] = std::move(mutable_column);
         }
+        else if (part_in_memory->block.has(name))
+        {
+            const auto & block_column = part_in_memory->block.getByName(name).column;
+            if (rows_to_read == part_rows)
+            {
+                res_columns[i] = block_column;
+            }
+            else
+            {
+                if (res_columns[i] == nullptr)
+                    res_columns[i] = type->createColumn();
+
+                auto mutable_column = res_columns[i]->assumeMutable();
+                mutable_column->insertRangeFrom(*block_column, total_rows_read, rows_to_read);
+                res_columns[i] = std::move(mutable_column);
+            }
+        }
     }
 
-    total_rows_read += rows_read;
-    return rows_read;
+    total_rows_read += rows_to_read;
+    return rows_to_read;
 }
 
 }
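The rewritten loop handles two cases: if the requested column is one of the missing Nested subcolumns registered in positions_for_offsets, only its offsets are copied from the sibling column; otherwise the column is copied from the in-memory block as before, now driven by a single rows_to_read computed up front, while continue_reading resets total_rows_read between independent reads. A standalone sketch of just the offset slicing and bookkeeping, not the real MergeTreeReaderInMemory API; Reader and readOffsets are hypothetical stand-ins:

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <vector>

    using Offsets = std::vector<uint64_t>;

    struct Reader
    {
        Offsets source_offsets;        // offsets of the sibling Array column in the in-memory block
        size_t total_rows_read = 0;

        size_t readOffsets(bool continue_reading, size_t max_rows_to_read, Offsets & res_offsets)
        {
            if (!continue_reading)
                total_rows_read = 0;

            size_t part_rows = source_offsets.size();
            size_t rows_to_read = std::min(max_rows_to_read, part_rows - total_rows_read);

            for (size_t row = 0; row < rows_to_read; ++row)
                res_offsets.push_back(source_offsets[total_rows_read + row]);

            total_rows_read += rows_to_read;
            return rows_to_read;
        }
    };

    int main()
    {
        Reader reader{{1, 4, 9, 16, 25}};   // cumulative sizes of five arrays
        Offsets res;
        reader.readOffsets(false, 2, res);  // first call reads rows 0..1
        reader.readOffsets(true, 2, res);   // continued call reads rows 2..3
        for (auto off : res)
            std::cout << off << ' ';        // prints: 1 4 9 16
        std::cout << '\n';
    }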
MergeTreeReaderInMemory.h
@@ -29,6 +29,8 @@ public:
 private:
     size_t total_rows_read = 0;
     DataPartInMemoryPtr part_in_memory;
+
+    std::unordered_map<String, size_t> positions_for_offsets;
 };
 
 }
tests/queries/0_stateless/01130_in_memory_parts_nested.reference (new file)
@@ -0,0 +1,15 @@
+[0]
+[0,0,0]
+[0,0,0,0,0]
+[0,0,0,0,0,0,0]
+[0,0,0,0,0,0,0,0,0]
+[0]
+[0,2,4]
+[0,2,4,6,8]
+[0,2,4,6,8,10,12]
+[0,2,4,6,8,10,12,14,16]
+[0] [0]
+[0,1,2] [0,2,4]
+[0,1,2,3,4] [0,2,4,6,8]
+[0,1,2,3,4,5,6] [0,2,4,6,8,10,12]
+[0,1,2,3,4,5,6,7,8] [0,2,4,6,8,10,12,14,16]
tests/queries/0_stateless/01130_in_memory_parts_nested.sql (new file)
@@ -0,0 +1,16 @@
+-- Test 00576_nested_and_prewhere, but with in-memory parts.
+DROP TABLE IF EXISTS nested;
+
+CREATE TABLE nested (x UInt64, filter UInt8, n Nested(a UInt64)) ENGINE = MergeTree ORDER BY x
+SETTINGS min_rows_for_compact_part = 200000, min_rows_for_wide_part = 300000;
+
+INSERT INTO nested SELECT number, number % 2, range(number % 10) FROM system.numbers LIMIT 100000;
+
+ALTER TABLE nested ADD COLUMN n.b Array(UInt64);
+SELECT DISTINCT n.b FROM nested PREWHERE filter;
+
+ALTER TABLE nested ADD COLUMN n.c Array(UInt64) DEFAULT arrayMap(x -> x * 2, n.a);
+SELECT DISTINCT n.c FROM nested PREWHERE filter;
+SELECT DISTINCT n.a, n.c FROM nested PREWHERE filter;
+
+DROP TABLE nested;
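The test mirrors 00576_nested_and_prewhere but keeps the parts in memory via the part-type thresholds (100000 inserted rows is below min_rows_for_compact_part). n.b is added by ALTER after the insert, so it is absent from the existing part and has to be read using the offsets of n.a; that is why the first block of the reference output is zero-filled arrays whose lengths match range(number % 10) for the filtered rows. A small standalone sketch, not ClickHouse code and with made-up offset values, of how borrowed offsets plus default values produce that shape:

    #include <cstdint>
    #include <iostream>
    #include <vector>

    int main()
    {
        // Cumulative offsets borrowed from the sibling column n.a: array lengths 1, 3, 5.
        std::vector<uint64_t> offsets = {1, 4, 9};

        uint64_t prev = 0;
        for (auto off : offsets)
        {
            std::cout << '[';
            for (uint64_t i = prev; i < off; ++i)
                std::cout << 0 << (i + 1 == off ? "" : ",");   // missing values default to 0 for UInt64
            std::cout << "]\n";
            prev = off;
        }
        // Prints: [0] / [0,0,0] / [0,0,0,0,0] -- the same shape as the first lines of the .reference file.
    }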