Create result columns as MutableColumns at the begining

Use IColumn* to manipulate columns on specific positions in result columns
This commit is contained in:
Igor Nikonov 2023-03-16 17:44:29 +00:00
parent b369366b19
commit 4904a9fb7a
4 changed files with 77 additions and 162 deletions

View File

@ -107,39 +107,4 @@ void FillingRow::initFromDefaults(size_t from_pos)
row[i] = getFillDescription(i).fill_from;
}
void insertFromFillingRow(MutableColumns & filling_columns, MutableColumns & interpolate_columns, MutableColumns & other_columns,
const FillingRow & filling_row, const Block & interpolate_block)
{
for (size_t i = 0, size = filling_columns.size(); i < size; ++i)
{
if (filling_row[i].isNull())
{
filling_columns[i]->insertDefault();
}
else
{
filling_columns[i]->insert(filling_row[i]);
}
}
if (size_t size = interpolate_block.columns())
{
Columns columns = interpolate_block.getColumns();
for (size_t i = 0; i < size; ++i)
interpolate_columns[i]->insertFrom(*columns[i]->convertToFullColumnIfConst(), 0);
}
else
for (const auto & interpolate_column : interpolate_columns)
interpolate_column->insertDefault();
for (const auto & other_column : other_columns)
other_column->insertDefault();
}
void copyRowFromColumns(MutableColumns & dest, const Columns & source, size_t row_num)
{
for (size_t i = 0, size = source.size(); i < size; ++i)
dest[i]->insertFrom(*source[i], row_num);
}
}

View File

@ -39,8 +39,4 @@ private:
SortDescription sort_description;
};
void insertFromFillingRow(MutableColumns & filling_columns, MutableColumns & interpolate_columns, MutableColumns & other_columns,
const FillingRow & filling_row, const Block & interpolate_block);
void copyRowFromColumns(MutableColumns & dest, const Columns & source, size_t row_num);
}

View File

@ -264,28 +264,7 @@ IProcessor::Status FillingTransform::prepare()
return ISimpleTransform::prepare();
}
void FillingTransform::initFillingRow(const Columns & input_columns)
{
for (size_t i = 0, size = filling_row.size(); i < size; ++i)
{
auto current_value = (*input_columns[i])[0];
const auto & fill_from = filling_row.getFillDescription(i).fill_from;
if (!fill_from.isNull() && !equals(current_value, fill_from))
{
filling_row.initFromDefaults(i);
if (less(fill_from, current_value, filling_row.getDirection(i)))
{
// interpolate();
// insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
}
break;
}
filling_row[i] = current_value;
}
}
void FillingTransform::interpolate(const std::vector<const IColumn *> & result_columns, Block & interpolate_block)
void FillingTransform::interpolate(const MutableColumns & result_columns, Block & interpolate_block)
{
if (interpolate_description)
{
@ -297,7 +276,7 @@ void FillingTransform::interpolate(const std::vector<const IColumn *> & result_c
for (const auto & [col_pos, name_type] : input_positions)
{
MutableColumnPtr column = name_type.type->createColumn();
const auto * res_column = result_columns[col_pos];
const auto * res_column = result_columns[col_pos].get();
// auto [res_columns, pos] = res_map[col_pos];
// size_t size = (*res_columns)[pos]->size();
size_t size = res_column->size();
@ -324,35 +303,66 @@ void FillingTransform::interpolate(const std::vector<const IColumn *> & result_c
}
}
using MutableColumnRefs = std::vector<const IColumn *>;
using MutableColumnRawPtrs = std::vector<IColumn*>;
static void insertFromFillingRow(const MutableColumnRawPtrs & filling_columns, const MutableColumnRawPtrs & interpolate_columns, const MutableColumnRawPtrs & other_columns,
const FillingRow & filling_row, const Block & interpolate_block)
{
for (size_t i = 0, size = filling_columns.size(); i < size; ++i)
{
if (filling_row[i].isNull())
{
filling_columns[i]->insertDefault();
}
else
{
filling_columns[i]->insert(filling_row[i]);
}
}
if (size_t size = interpolate_block.columns())
{
Columns columns = interpolate_block.getColumns();
for (size_t i = 0; i < size; ++i)
interpolate_columns[i]->insertFrom(*columns[i]->convertToFullColumnIfConst(), 0);
}
else
for (auto * interpolate_column : interpolate_columns)
interpolate_column->insertDefault();
for (auto * other_column : other_columns)
other_column->insertDefault();
}
static void copyRowFromColumns(const MutableColumnRawPtrs & dest, const Columns & source, size_t row_num)
{
for (size_t i = 0, size = source.size(); i < size; ++i)
dest[i]->insertFrom(*source[i], row_num);
}
void FillingTransform::transform(Chunk & chunk)
{
LOG_DEBUG(&Poco::Logger::get(__PRETTY_FUNCTION__), "Input columns={} rows={}", chunk.getNumColumns(), chunk.getNumRows());
if (!chunk.hasRows() && !generate_suffix)
return;
Columns old_fill_columns;
Columns old_interpolate_columns;
Columns old_other_columns;
MutableColumns res_fill_columns;
MutableColumns res_interpolate_columns;
MutableColumns res_other_columns;
MutableColumnRawPtrs res_fill_columns;
MutableColumnRawPtrs res_interpolate_columns;
MutableColumnRawPtrs res_other_columns;
MutableColumns result_columns;
MutableColumnRefs result_columns(input.getHeader().columns());
// std::vector<std::pair<MutableColumns *, size_t>> res_map;
// res_map.resize(input.getHeader().columns());
auto init_columns_by_positions = [&result_columns](const Columns & old_columns, Columns & new_columns,
MutableColumns & new_mutable_columns, const Positions & positions)
auto init_columns_by_positions = [](const Columns & old_columns, Columns & new_columns,
const MutableColumns& output_columns, MutableColumnRawPtrs & new_mutable_columns, const Positions & positions)
{
for (size_t pos : positions)
{
auto old_column = old_columns[pos]->convertToFullColumnIfConst();
new_columns.push_back(old_column);
// res_map[pos] = {&new_mutable_columns, new_mutable_columns.size()};
new_mutable_columns.push_back(old_column->cloneEmpty()->assumeMutable());
result_columns[pos] = &new_mutable_columns.back()->assumeMutableRef();
new_mutable_columns.push_back(output_columns[pos].get());
}
};
@ -361,50 +371,18 @@ void FillingTransform::transform(Chunk & chunk)
auto interpolate = [&]()
{
this->interpolate(result_columns, interpolate_block);
// if (interpolate_description)
// {
// interpolate_block.clear();
// if (!input_positions.empty())
// {
// /// populate calculation block with required columns with values from previous row
// for (const auto & [col_pos, name_type] : input_positions)
// {
// MutableColumnPtr column = name_type.type->createColumn();
// auto * res_column = result_columns[col_pos];
// // auto [res_columns, pos] = res_map[col_pos];
// // size_t size = (*res_columns)[pos]->size();
// size_t size = res_column->size();
// if (size == 0) /// this is the first row in current chunk
// {
// /// take value from last row of previous chunk if exists, else use default
// if (last_row.size() > col_pos && !last_row[col_pos]->empty())
// column->insertFrom(*last_row[col_pos], 0);
// else
// column->insertDefault();
// }
// else /// take value from previous row of current chunk
// column->insertFrom(*res_column, size - 1);
// interpolate_block.insert({std::move(column), name_type.type, name_type.name});
// }
// interpolate_actions->execute(interpolate_block);
// }
// else /// all INTERPOLATE expressions are constants
// {
// size_t n = 1;
// interpolate_actions->execute(interpolate_block, n);
// }
// }
};
if (generate_suffix)
{
const auto & empty_columns = input.getHeader().getColumns();
init_columns_by_positions(empty_columns, old_fill_columns, res_fill_columns, fill_column_positions);
init_columns_by_positions(empty_columns, old_interpolate_columns, res_interpolate_columns, interpolate_column_positions);
init_columns_by_positions(empty_columns, old_other_columns, res_other_columns, other_column_positions);
for (const auto & column : empty_columns)
result_columns.push_back(column->convertToFullColumnIfConst()->cloneEmpty()->assumeMutable());
LOG_DEBUG(&Poco::Logger::get(__PRETTY_FUNCTION__), "Result columns size: {}", result_columns.size());
init_columns_by_positions(empty_columns, old_fill_columns, result_columns, res_fill_columns, fill_column_positions);
init_columns_by_positions(empty_columns, old_interpolate_columns, result_columns, res_interpolate_columns, interpolate_column_positions);
init_columns_by_positions(empty_columns, old_other_columns, result_columns, res_other_columns, other_column_positions);
if (first)
filling_row.initFromDefaults();
@ -422,16 +400,21 @@ void FillingTransform::transform(Chunk & chunk)
interpolate();
}
setResultColumns(chunk, res_fill_columns, res_interpolate_columns, res_other_columns);
size_t num_output_rows = result_columns[0]->size();
chunk.setColumns(std::move(result_columns), num_output_rows);
LOG_DEBUG(&Poco::Logger::get(__PRETTY_FUNCTION__), "Rows: filling={}", res_fill_columns[0]->size());
LOG_DEBUG(&Poco::Logger::get(__PRETTY_FUNCTION__), "Output(suffix) columns={} rows={}", chunk.getNumColumns(), chunk.getNumRows());
return;
}
const size_t num_rows = chunk.getNumRows();
auto old_columns = chunk.detachColumns();
for (const auto & column : old_columns)
result_columns.push_back(column->convertToFullColumnIfConst()->cloneEmpty()->assumeMutable());
init_columns_by_positions(old_columns, old_fill_columns, res_fill_columns, fill_column_positions);
init_columns_by_positions(old_columns, old_interpolate_columns, res_interpolate_columns, interpolate_column_positions);
init_columns_by_positions(old_columns, old_other_columns, res_other_columns, other_column_positions);
init_columns_by_positions(old_columns, old_fill_columns, result_columns, res_fill_columns, fill_column_positions);
init_columns_by_positions(old_columns, old_interpolate_columns, result_columns, res_interpolate_columns, interpolate_column_positions);
init_columns_by_positions(old_columns, old_other_columns, result_columns, res_other_columns, other_column_positions);
if (first)
{
@ -490,54 +473,27 @@ void FillingTransform::transform(Chunk & chunk)
copyRowFromColumns(res_other_columns, old_other_columns, row_ind);
}
saveLastRow(res_fill_columns, res_interpolate_columns, res_other_columns);
setResultColumns(chunk, res_fill_columns, res_interpolate_columns, res_other_columns);
saveLastRow(result_columns);
size_t num_output_rows = result_columns[0]->size();
chunk.setColumns(std::move(result_columns), num_output_rows);
LOG_DEBUG(&Poco::Logger::get(__PRETTY_FUNCTION__), "Rows: filling={}", res_fill_columns[0]->size());
LOG_DEBUG(&Poco::Logger::get(__PRETTY_FUNCTION__), "Output columns={} rows={}", chunk.getNumColumns(), chunk.getNumRows());
}
void FillingTransform::setResultColumns(Chunk & chunk, MutableColumns & fill_columns, MutableColumns & interpolate_columns, MutableColumns & other_columns) const
{
MutableColumns result_columns(fill_columns.size() + interpolate_columns.size() + other_columns.size());
/// fill_columns always non-empty.
size_t num_rows = fill_columns[0]->size();
for (size_t i = 0, size = fill_columns.size(); i < size; ++i)
result_columns[fill_column_positions[i]] = std::move(fill_columns[i]);
for (size_t i = 0, size = interpolate_columns.size(); i < size; ++i)
result_columns[interpolate_column_positions[i]] = std::move(interpolate_columns[i]);
for (size_t i = 0, size = other_columns.size(); i < size; ++i)
result_columns[other_column_positions[i]] = std::move(other_columns[i]);
chunk.setColumns(std::move(result_columns), num_rows);
}
void FillingTransform::saveLastRow(const MutableColumns & fill_columns, const MutableColumns & interpolate_columns, const MutableColumns & other_columns)
void FillingTransform::saveLastRow(const MutableColumns & result_columns)
{
last_row.clear();
last_row.resize(fill_columns.size() + interpolate_columns.size() + other_columns.size());
last_row.resize(result_columns.size());
size_t num_rows = fill_columns[0]->size();
size_t num_rows = result_columns[0]->size();
if (num_rows == 0)
return;
for (size_t i = 0, size = fill_columns.size(); i < size; ++i)
for (size_t i = 0, size = result_columns.size(); i < size; ++i)
{
auto column = fill_columns[i]->cloneEmpty();
column->insertFrom(*fill_columns[i], num_rows - 1);
last_row[fill_column_positions[i]] = std::move(column);
}
for (size_t i = 0, size = interpolate_columns.size(); i < size; ++i)
{
auto column = interpolate_columns[i]->cloneEmpty();
column->insertFrom(*interpolate_columns[i], num_rows - 1);
last_row[interpolate_column_positions[i]] = std::move(column);
}
for (size_t i = 0, size = other_columns.size(); i < size; ++i)
{
auto column = other_columns[i]->cloneEmpty();
column->insertFrom(*other_columns[i], num_rows - 1);
last_row[other_column_positions[i]] = std::move(column);
auto column = result_columns[i]->cloneEmpty();
column->insertFrom(*result_columns[i], num_rows - 1);
last_row[i] = std::move(column);
}
}

View File

@ -28,10 +28,8 @@ protected:
void transform(Chunk & Chunk) override;
private:
void setResultColumns(Chunk & chunk, MutableColumns & fill_columns, MutableColumns & interpolate_columns, MutableColumns & other_columns) const;
void saveLastRow(const MutableColumns & fill_columns, const MutableColumns & interpolate_columns, const MutableColumns & other_columns);
void initFillingRow(const Columns & input_columns);
void interpolate(const std::vector<const IColumn *> & result_columns, Block & interpolate_block);
void saveLastRow(const MutableColumns & result_columns);
void interpolate(const MutableColumns& result_columns, Block & interpolate_block);
const SortDescription sort_description; /// Contains only columns with WITH FILL.
const InterpolateDescriptionPtr interpolate_description; /// Contains INTERPOLATE columns