Better semantic of sharing columns: development [#CLICKHOUSE-2].

Alexey Milovidov 2017-12-17 08:21:04 +03:00
parent 5f4a536f75
commit d497d010d7
7 changed files with 39 additions and 8 deletions

View File

@@ -370,7 +370,7 @@ void ColumnArray::insertRangeFrom(const IColumn & src, size_t start, size_t length)
size_t nested_offset = src_concrete.offsetAt(start);
size_t nested_length = src_concrete.getOffsets()[start + length - 1] - nested_offset;
insertRangeFrom(src_concrete.getData(), nested_offset, nested_length);
getData().insertRangeFrom(src_concrete.getData(), nested_offset, nested_length);
Offsets & cur_offsets = getOffsets();
const Offsets & src_offsets = src_concrete.getOffsets();
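For orientation: the changed line routes the nested-range copy through the array's data column, getData().insertRangeFrom(...), instead of re-entering ColumnArray::insertRangeFrom on the array itself. A minimal, self-contained sketch of what such a nested copy involves, using a hypothetical SimpleArrayColumn rather than ClickHouse's real ColumnArray/IColumn types:

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

// Simplified stand-in for an array column: a flat data vector plus running offsets.
// Illustrative sketch only, not ClickHouse's real column hierarchy.
struct SimpleArrayColumn
{
    std::vector<int64_t> data;      // flattened elements of all arrays
    std::vector<size_t> offsets;    // offsets[i] = end of the i-th array in data

    size_t offsetAt(size_t i) const { return i == 0 ? 0 : offsets[i - 1]; }

    void insertRangeFrom(const SimpleArrayColumn & src, size_t start, size_t length)
    {
        // Copy the flat slice of nested data backing rows [start, start + length).
        size_t nested_offset = src.offsetAt(start);
        size_t nested_length = src.offsets[start + length - 1] - nested_offset;
        data.insert(data.end(),
                    src.data.begin() + nested_offset,
                    src.data.begin() + nested_offset + nested_length);

        // Append offsets rebased onto the destination's last offset.
        size_t prev_max_offset = offsets.empty() ? 0 : offsets.back();
        for (size_t i = 0; i < length; ++i)
            offsets.push_back(src.offsets[start + i] - nested_offset + prev_max_offset);
    }
};

int main()
{
    SimpleArrayColumn src{{1, 2, 3, 4, 5}, {2, 3, 5}};   // arrays [1,2], [3], [4,5]
    SimpleArrayColumn dst;
    dst.insertRangeFrom(src, 1, 2);                       // copy arrays [3] and [4,5]
    std::cout << dst.data.size() << " elements, " << dst.offsets.size() << " rows\n";
}

Rebasing the appended offsets keeps the flattened data and the per-row boundaries consistent after the copy, which is the work the nested data column does once the call is delegated to it.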

View File

@@ -88,9 +88,13 @@ Block ColumnGathererStream::readImpl()
if (!source_to_fully_copy && row_sources_buf.eof())
    return Block();
MutableColumnPtr output_column = column.column->cloneEmpty();
output_block = Block{column.cloneEmpty()};
MutableColumnPtr output_column = output_block.getByPosition(0).column->mutate();
output_column->gather(*this);
return { ColumnWithTypeAndName(std::move(output_column), column.type, column.name) };
if (!output_column->empty())
    output_block.getByPosition(0).column = std::move(output_column);
return output_block;
}
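The rewritten readImpl builds the output block up front from the column sample, takes a mutable handle to its single column, gathers rows into it, and publishes the column back into the block only if something was gathered. Below is a minimal sketch of that mutate-then-publish pattern, assuming a toy copy-on-write column; the Column type and mutate() helper are illustrative stand-ins, not ClickHouse's COW<IColumn> machinery:

#include <iostream>
#include <memory>
#include <vector>

// A block slot holds an immutable (shared) column pointer; to modify it you take a
// mutable handle, work on it, and move it back into the slot.
struct Column
{
    std::vector<int> values;
};

using ColumnPtr = std::shared_ptr<const Column>;
using MutableColumnPtr = std::shared_ptr<Column>;

/// Obtain a column we are allowed to modify: reuse the object if we are the only owner,
/// otherwise make a private copy (copy-on-write).
MutableColumnPtr mutate(ColumnPtr && ptr)
{
    if (ptr.use_count() == 1)
        return std::const_pointer_cast<Column>(std::move(ptr));
    return std::make_shared<Column>(*ptr);
}

int main()
{
    ColumnPtr shared_col = std::make_shared<Column>(Column{{1, 2, 3}});
    ColumnPtr block_slot = shared_col;                     // the column is now shared

    MutableColumnPtr working = mutate(std::move(block_slot));
    working->values.push_back(4);                          // gather/append into the copy

    if (!working->values.empty())
        block_slot = std::move(working);                   // publish the result back into the block

    std::cout << shared_col->values.size() << " vs " << block_slot->values.size() << "\n";  // 3 vs 4
}

The original shared column stays untouched; only the block's own slot sees the gathered data, which is the sharing semantic the commit title refers to.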

View File

@@ -289,7 +289,23 @@ Block SummingSortedBlockInputStream::readImpl()
}
merge(merged_columns, queue);
return header.cloneWithColumns(std::move(merged_columns));
Block res = header.cloneWithColumns(std::move(merged_columns));
/// Place aggregation results into block.
for (auto & desc : columns_to_aggregate)
{
    if (checkDataType<DataTypeTuple>(desc.function->getReturnType().get()))
    {
        /// Unpack tuple into block.
        size_t tuple_size = desc.column_numbers.size();
        for (size_t i = 0; i < tuple_size; ++i)
            res.getByPosition(desc.column_numbers[i]).column = static_cast<const ColumnTuple &>(*desc.merged_column).getColumnPtr(i);
    }
    else
        res.getByPosition(desc.column_numbers[0]).column = std::move(desc.merged_column);
}
return res;
}
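The added loop places each aggregate's merged result back into the output block. When an aggregate's return type is a tuple (for example a sumMap-style function yielding keys and values), the tuple's element columns are scattered to their destination positions one by one; otherwise the single merged column is moved into place. A simplified sketch of that placement step with toy Block/Column stand-ins (none of these are ClickHouse classes):

#include <iostream>
#include <memory>
#include <string>
#include <vector>

struct Column
{
    std::string name;
    std::vector<std::shared_ptr<Column>> tuple_elements;   // non-empty if this is a tuple column
};

using ColumnPtr = std::shared_ptr<Column>;
using Block = std::vector<ColumnPtr>;                      // columns addressed by position

struct AggregateDesc
{
    std::vector<size_t> column_numbers;   // destination positions in the block
    ColumnPtr merged_column;              // result produced by the aggregate
    bool returns_tuple;                   // e.g. a function returning (keys, values)
};

void placeResults(Block & block, std::vector<AggregateDesc> & descs)
{
    for (auto & desc : descs)
    {
        if (desc.returns_tuple)
        {
            // One aggregate produced several columns: scatter them to their positions.
            for (size_t i = 0; i < desc.column_numbers.size(); ++i)
                block[desc.column_numbers[i]] = desc.merged_column->tuple_elements[i];
        }
        else
            block[desc.column_numbers[0]] = std::move(desc.merged_column);
    }
}

int main()
{
    Block block(3);
    auto keys = std::make_shared<Column>(Column{"keys", {}});
    auto vals = std::make_shared<Column>(Column{"values", {}});
    auto tuple = std::make_shared<Column>(Column{"tuple", {keys, vals}});

    std::vector<AggregateDesc> descs{
        {{1, 2}, tuple, true},
        {{0}, std::make_shared<Column>(Column{"sum", {}}), false}};
    placeResults(block, descs);
    std::cout << block[0]->name << ", " << block[1]->name << ", " << block[2]->name << "\n";  // sum, keys, values
}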

View File

@@ -1219,7 +1219,17 @@ Block Aggregator::prepareBlockAndFill(
res.getByPosition(i).column = std::move(key_columns[i]);
for (size_t i = 0; i < params.aggregates_size; ++i)
    res.getByPosition(i + params.keys_size).column = std::move(final ? final_aggregate_columns[i] : aggregate_columns[i]);
{
    if (final)
    {
        res.getByPosition(i + params.keys_size).type = aggregate_functions[i]->getReturnType();
        res.getByPosition(i + params.keys_size).column = std::move(final_aggregate_columns[i]);
    }
    else
    {
        res.getByPosition(i + params.keys_size).column = std::move(aggregate_columns[i]);
    }
}
/// Change the size of the columns-constants in the block.
size_t columns = sample.columns();
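In the finalized branch the destination slot now receives the aggregate function's return type together with the finalized column, while the non-final branch keeps moving the aggregation-state column and leaves the header's type untouched. The toy sketch below shows only that branching; strings stand in for real types and columns, and all names are illustrative:

#include <iostream>
#include <string>
#include <vector>

struct ColumnWithTypeAndName
{
    std::string type;
    std::string column;   // pretend payload
    std::string name;
};

using Block = std::vector<ColumnWithTypeAndName>;

void fillAggregateColumns(Block & res, size_t keys_size, bool final,
                          const std::vector<std::string> & return_types,
                          std::vector<std::string> & final_columns,
                          std::vector<std::string> & state_columns)
{
    for (size_t i = 0; i < return_types.size(); ++i)
    {
        auto & slot = res[i + keys_size];
        if (final)
        {
            slot.type = return_types[i];               // e.g. UInt64 for count()
            slot.column = std::move(final_columns[i]); // finalized values
        }
        else
        {
            slot.column = std::move(state_columns[i]); // serialized aggregation states
        }
    }
}

int main()
{
    Block res{{"String", "", "key"}, {"AggregateFunction(count)", "", "count()"}};
    std::vector<std::string> return_types{"UInt64"};
    std::vector<std::string> finals{"finalized counts"};
    std::vector<std::string> states{"serialized states"};

    fillAggregateColumns(res, 1, /*final=*/true, return_types, finals, states);
    std::cout << res[1].type << ": " << res[1].column << "\n";   // UInt64: finalized counts
}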

View File

@@ -111,7 +111,7 @@ struct AggregationMethodOneNumber
const FieldType * vec;
/** Called at the start of each block processing.
* Sets the variables needed for the other methods called in internal loops.
* Sets the variables needed for the other methods called in inner loops.
*/
void init(ColumnRawPtrs & key_columns)
{
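For context, the surrounding struct caches a raw pointer to the key column's data in init() so the per-row code in the inner loops does nothing but indexed access. A stripped-down sketch of that init-once, read-per-row pattern with simplified types (this is not the real AggregationMethodOneNumber):

#include <cstdint>
#include <iostream>
#include <vector>

struct AggregationMethodOneNumberSketch
{
    const uint64_t * vec = nullptr;

    /// Called at the start of each block processing; caches what the inner loop needs.
    void init(const std::vector<uint64_t> & key_column)
    {
        vec = key_column.data();
    }

    /// Called once per row in the inner loop.
    uint64_t getKey(size_t row) const { return vec[row]; }
};

int main()
{
    std::vector<uint64_t> keys{10, 20, 30};
    AggregationMethodOneNumberSketch method;
    method.init(keys);                       // once per block

    uint64_t sum = 0;
    for (size_t row = 0; row < keys.size(); ++row)
        sum += method.getKey(row);           // inner loop: cheap pointer access
    std::cout << sum << "\n";                // 60
}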

View File

@@ -33,7 +33,7 @@ struct SetMethodOneNumber
const FieldType * vec;
/** Called at the start of each block processing.
* Sets the variables required for the other methods called in internal loops.
* Sets the variables required for the other methods called in inner loops.
*/
void init(const ColumnRawPtrs & key_columns)
{

View File

@@ -76,7 +76,6 @@ size_t MergeTreeReader::readRows(size_t from_mark, bool continue_reading, size_t
size_t read_rows = 0;
try
{
/// Pointers to offset columns that are common to the nested data structure columns.
/// If append is true, then the value will be equal to nullptr and will be used only to
/// check that the offsets column has been already read.
@@ -135,6 +134,8 @@ size_t MergeTreeReader::readRows(size_t from_mark, bool continue_reading, size_t
if (column->size())
    res.getByName(it.name).column = std::move(column);
else
    res.erase(it.name);
}
/// NOTE: positions for all streams must be kept in sync. In particular, even if for some streams there are no rows to be read,
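The second hunk publishes a freshly read column into the result block only when rows were actually read for it, and erases the placeholder otherwise; the trailing NOTE stresses that stream positions must still advance consistently either way. A small self-contained sketch of the publish-or-erase step itself, using a plain std::map as a stand-in for Block (all names are illustrative):

#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

using Column = std::vector<int>;
using ColumnPtr = std::shared_ptr<Column>;
using Block = std::map<std::string, ColumnPtr>;   // column name -> column

void commitReadColumns(Block & res, std::vector<std::pair<std::string, ColumnPtr>> & read)
{
    for (auto & [name, column] : read)
    {
        if (!column->empty())
            res[name] = std::move(column);   // publish the freshly read data
        else
            res.erase(name);                 // nothing was read: drop the placeholder column
    }
}

int main()
{
    Block res{{"a", std::make_shared<Column>()}, {"b", std::make_shared<Column>()}};
    std::vector<std::pair<std::string, ColumnPtr>> read{
        {"a", std::make_shared<Column>(Column{1, 2, 3})},
        {"b", std::make_shared<Column>()}};

    commitReadColumns(res, read);
    std::cout << "columns left: " << res.size() << "\n";   // columns left: 1
}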