experiments

This commit is contained in:
Nikita Mikhaylov 2019-11-18 13:14:37 +03:00
parent b380f8d53c
commit 8eebcad52d
2 changed files with 26 additions and 0 deletions

View File

@ -377,6 +377,12 @@ public:
if (unlikely(this->c_end == this->c_end_of_storage))
this->reserveForNextSize(std::forward<TAllocatorParams>(allocator_params)...);
if (this->c_end == nullptr)
std::cout << "this->c_end == nullptr" << std::endl;
if (this->c_end_of_storage == nullptr)
std::cout << "this->c_end_of_storage == nullptr" << std::endl;
new (t_end()) T(std::forward<U>(x));
this->c_end += this->byte_size(1);
}

View File

@ -48,6 +48,8 @@ SummingSortedBlockInputStream::SummingSortedBlockInputStream(
size_t max_block_size_)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_)
{
std::cout << "Constructor SummingSortedBlockInputStream()" << std::endl;
std::cout << "num_columns " << num_columns << std::endl;
current_row.resize(num_columns);
/// name of nested structure -> the column numbers that refer to it.
@ -60,11 +62,14 @@ SummingSortedBlockInputStream::SummingSortedBlockInputStream(
*/
for (size_t i = 0; i < num_columns; ++i)
{
std::cout << "Constructor for loop" << std::endl;
const ColumnWithTypeAndName & column = header.safeGetByPosition(i);
std::cout << "column name " << column.name << std::endl;
/// Discover nested Maps and find columns for summation
if (typeid_cast<const DataTypeArray *>(column.type.get()))
{
std::cout << "typeid_cast<const DataTypeArray *>(column.type.get())" << std::endl;
const auto map_name = Nested::extractTableName(column.name);
/// if nested table name ends with `Map` it is a possible candidate for special handling
if (map_name == column.name || !endsWith(map_name, "Map"))
@ -81,6 +86,7 @@ SummingSortedBlockInputStream::SummingSortedBlockInputStream(
if (!column.type->isSummable() && !is_agg_func)
{
column_numbers_not_to_aggregate.push_back(i);
std::cout << "!column.type->isSummable() && !is_agg_func" << std::endl;
continue;
}
@ -106,11 +112,13 @@ SummingSortedBlockInputStream::SummingSortedBlockInputStream(
}
columns_to_aggregate.emplace_back(std::move(desc));
std::cout << "columns_to_aggregate" << std::endl;
}
else
{
// Column is not going to be summed, use last value
column_numbers_not_to_aggregate.push_back(i);
std::cout << "column_numbers_not_to_aggregate" << std::endl;
}
}
}
@ -258,6 +266,7 @@ void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & me
Block SummingSortedBlockInputStream::readImpl()
{
std::cout << "SummingSortedBlockInputStream::readImpl" << std::endl;
if (finished)
return Block();
@ -268,14 +277,20 @@ Block SummingSortedBlockInputStream::readImpl()
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
if (merged_columns.empty())
{
std::cout << "merged_columns.empty()" << std::endl;
return {};
}
/// Update aggregation result columns for current block
for (auto & desc : columns_to_aggregate)
{
std::cout << "readImpl() in for loop" << std::endl;
// Wrap aggregated columns in a tuple to match function signature
if (!desc.is_agg_func_type && isTuple(desc.function->getReturnType()))
{
std::cout << "!desc.is_agg_func_type && isTuple(desc.function->getReturnType())" << std::endl;
size_t tuple_size = desc.column_numbers.size();
MutableColumns tuple_columns(tuple_size);
for (size_t i = 0; i < tuple_size; ++i)
@ -284,7 +299,11 @@ Block SummingSortedBlockInputStream::readImpl()
desc.merged_column = ColumnTuple::create(std::move(tuple_columns));
}
else
{
std::cout << "else" << std::endl;
desc.merged_column = header.safeGetByPosition(desc.column_numbers[0]).column->cloneEmpty();
}
}
merge(merged_columns, queue_without_collation);
@ -310,6 +329,7 @@ Block SummingSortedBlockInputStream::readImpl()
void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
{
std::cout << "SummingSortedBlockInputStream::merge(...)" << std::endl;
merged_rows = 0;
/// Take the rows in needed order and put them in `merged_columns` until rows no more than `max_block_size`