remove couts

This commit is contained in:
Nikita Mikhaylov 2019-11-25 20:42:17 +03:00
parent 8f96118c14
commit ee0855be6d
4 changed files with 1 additions and 48 deletions

View File

@ -23,10 +23,7 @@ MergingSortedBlockInputStream::MergingSortedBlockInputStream(
, average_block_sizes(average_block_sizes_), source_blocks(inputs_.size()) , average_block_sizes(average_block_sizes_), source_blocks(inputs_.size())
, cursors(inputs_.size()), out_row_sources_buf(out_row_sources_buf_) , cursors(inputs_.size()), out_row_sources_buf(out_row_sources_buf_)
{ {
std::cout << "size1 " << children.size() << std::endl;
children.insert(children.end(), inputs_.begin(), inputs_.end()); children.insert(children.end(), inputs_.begin(), inputs_.end());
std::cout << "size2 " << children.size() << std::endl;
std::cout << children.at(0)->getName() << std::endl;
children.at(0)->dumpTree(std::cout, 0); children.at(0)->dumpTree(std::cout, 0);
header = children.at(0)->getHeader(); header = children.at(0)->getHeader();
num_columns = header.columns(); num_columns = header.columns();

View File

@ -48,10 +48,6 @@ SummingSortedBlockInputStream::SummingSortedBlockInputStream(
size_t max_block_size_) size_t max_block_size_)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_) : MergingSortedBlockInputStream(inputs_, description_, max_block_size_)
{ {
std::cout << "Constructor SummingSortedBlockInputStream()" << std::endl;
std::cout << "num_columns " << num_columns << std::endl;
std::cout << "inputs size " << inputs_.size() << std::endl;
std::cout << StackTrace().toString() << std::endl;
current_row.resize(num_columns); current_row.resize(num_columns);
/// name of nested structure -> the column numbers that refer to it. /// name of nested structure -> the column numbers that refer to it.
@ -64,15 +60,11 @@ SummingSortedBlockInputStream::SummingSortedBlockInputStream(
*/ */
for (size_t i = 0; i < num_columns; ++i) for (size_t i = 0; i < num_columns; ++i)
{ {
std::cout << "Constructor for loop" << std::endl;
const ColumnWithTypeAndName & column = header.safeGetByPosition(i); const ColumnWithTypeAndName & column = header.safeGetByPosition(i);
std::cout << "column name " << column.name << std::endl;
std::cout << header.dumpNames() << std::endl;
/// Discover nested Maps and find columns for summation /// Discover nested Maps and find columns for summation
if (typeid_cast<const DataTypeArray *>(column.type.get())) if (typeid_cast<const DataTypeArray *>(column.type.get()))
{ {
std::cout << "typeid_cast<const DataTypeArray *>(column.type.get())" << std::endl;
const auto map_name = Nested::extractTableName(column.name); const auto map_name = Nested::extractTableName(column.name);
/// if nested table name ends with `Map` it is a possible candidate for special handling /// if nested table name ends with `Map` it is a possible candidate for special handling
if (map_name == column.name || !endsWith(map_name, "Map")) if (map_name == column.name || !endsWith(map_name, "Map"))
@ -89,7 +81,6 @@ SummingSortedBlockInputStream::SummingSortedBlockInputStream(
if (!column.type->isSummable() && !is_agg_func) if (!column.type->isSummable() && !is_agg_func)
{ {
column_numbers_not_to_aggregate.push_back(i); column_numbers_not_to_aggregate.push_back(i);
std::cout << "!column.type->isSummable() && !is_agg_func" << std::endl;
continue; continue;
} }
@ -115,13 +106,11 @@ SummingSortedBlockInputStream::SummingSortedBlockInputStream(
} }
columns_to_aggregate.emplace_back(std::move(desc)); columns_to_aggregate.emplace_back(std::move(desc));
std::cout << "columns_to_aggregate" << std::endl;
} }
else else
{ {
// Column is not going to be summed, use last value // Column is not going to be summed, use last value
column_numbers_not_to_aggregate.push_back(i); column_numbers_not_to_aggregate.push_back(i);
std::cout << "column_numbers_not_to_aggregate" << std::endl;
} }
} }
} }
@ -209,13 +198,11 @@ SummingSortedBlockInputStream::SummingSortedBlockInputStream(
void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & merged_columns) void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & merged_columns)
{ {
std::cout << "SummingSortedBlockInputStream::insertCurrentRowIfNeeded" << std::endl;
if (columns_to_aggregate.empty()) if (columns_to_aggregate.empty())
current_row_is_zero = false; current_row_is_zero = false;
for (auto & desc : columns_to_aggregate) for (auto & desc : columns_to_aggregate)
{ {
std::cout << "MergedColumnName " << desc.merged_column->getName() << std::endl;
// Do not insert if the aggregation state hasn't been created // Do not insert if the aggregation state hasn't been created
if (desc.created) if (desc.created)
{ {
@ -258,7 +245,6 @@ void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & me
/// (at this moment we need rollback only cols from columns_to_aggregate) /// (at this moment we need rollback only cols from columns_to_aggregate)
if (current_row_is_zero) if (current_row_is_zero)
{ {
std::cout << "current_row_is_zero" << std::endl;
for (auto & desc : columns_to_aggregate) for (auto & desc : columns_to_aggregate)
desc.merged_column->popBack(1); desc.merged_column->popBack(1);
@ -270,13 +256,11 @@ void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & me
/// Update per-block and per-group flags /// Update per-block and per-group flags
++merged_rows; ++merged_rows;
std::cout << "insertCurrentRowIfNeeded merged_rows " << merged_rows << std::endl;
} }
Block SummingSortedBlockInputStream::readImpl() Block SummingSortedBlockInputStream::readImpl()
{ {
std::cout << "SummingSortedBlockInputStream::readImpl" << std::endl;
if (finished) if (finished)
return Block(); return Block();
@ -287,20 +271,15 @@ Block SummingSortedBlockInputStream::readImpl()
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR); throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
if (merged_columns.empty()) if (merged_columns.empty())
{
std::cout << "merged_columns.empty()" << std::endl;
return {}; return {};
}
/// Update aggregation result columns for current block /// Update aggregation result columns for current block
for (auto & desc : columns_to_aggregate) for (auto & desc : columns_to_aggregate)
{ {
std::cout << "readImpl() in for loop" << std::endl;
// Wrap aggregated columns in a tuple to match function signature // Wrap aggregated columns in a tuple to match function signature
if (!desc.is_agg_func_type && isTuple(desc.function->getReturnType())) if (!desc.is_agg_func_type && isTuple(desc.function->getReturnType()))
{ {
std::cout << "!desc.is_agg_func_type && isTuple(desc.function->getReturnType())" << std::endl;
size_t tuple_size = desc.column_numbers.size(); size_t tuple_size = desc.column_numbers.size();
MutableColumns tuple_columns(tuple_size); MutableColumns tuple_columns(tuple_size);
for (size_t i = 0; i < tuple_size; ++i) for (size_t i = 0; i < tuple_size; ++i)
@ -309,19 +288,13 @@ Block SummingSortedBlockInputStream::readImpl()
desc.merged_column = ColumnTuple::create(std::move(tuple_columns)); desc.merged_column = ColumnTuple::create(std::move(tuple_columns));
} }
else else
{
std::cout << "else" << std::endl;
desc.merged_column = header.safeGetByPosition(desc.column_numbers[0]).column->cloneEmpty(); desc.merged_column = header.safeGetByPosition(desc.column_numbers[0]).column->cloneEmpty();
}
} }
std::cout << "queue_without_collation.size() " << queue_without_collation.size() << std::endl;
merge(merged_columns, queue_without_collation); merge(merged_columns, queue_without_collation);
Block res = header.cloneWithColumns(std::move(merged_columns)); Block res = header.cloneWithColumns(std::move(merged_columns));
std::cout << "result rows count " << res.rows() << std::endl;
/// Place aggregation results into block. /// Place aggregation results into block.
for (auto & desc : columns_to_aggregate) for (auto & desc : columns_to_aggregate)
{ {
@ -342,26 +315,19 @@ Block SummingSortedBlockInputStream::readImpl()
void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue) void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
{ {
std::cout << "SummingSortedBlockInputStream::merge(...)" << std::endl;
merged_rows = 0; merged_rows = 0;
/// Take the rows in needed order and put them in `merged_columns` until rows no more than `max_block_size` /// Take the rows in needed order and put them in `merged_columns` until rows no more than `max_block_size`
while (!queue.empty()) while (!queue.empty())
{ {
std::cout << "while loop" << std::endl;
SortCursor current = queue.top(); SortCursor current = queue.top();
setPrimaryKeyRef(next_key, current); setPrimaryKeyRef(next_key, current);
if (next_key.empty())
std::cout << "next_key empty()" << std::endl;
bool key_differs; bool key_differs;
if (current_key.empty()) /// The first key encountered. if (current_key.empty()) /// The first key encountered.
{ {
std::cout << "current_key is empty" << std::endl;
key_differs = true; key_differs = true;
current_row_is_zero = true; current_row_is_zero = true;
} }
@ -402,7 +368,6 @@ void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
/// We have only columns_to_aggregate. The status of current row will be determined /// We have only columns_to_aggregate. The status of current row will be determined
/// in 'insertCurrentRowIfNeeded' method on the values of aggregate functions. /// in 'insertCurrentRowIfNeeded' method on the values of aggregate functions.
current_row_is_zero = true; current_row_is_zero = true;
std::cout << "maps_to_sum.empty() true" << std::endl;
} }
else else
{ {
@ -440,7 +405,6 @@ void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
/// If it is zero, and without it the output stream will be empty, we will write it anyway. /// If it is zero, and without it the output stream will be empty, we will write it anyway.
insertCurrentRowIfNeeded(merged_columns); insertCurrentRowIfNeeded(merged_columns);
finished = true; finished = true;
std::cout << "merged rows " << merged_rows << std::endl;
} }

View File

@ -1,6 +1,5 @@
#include <Processors/ISource.h> #include <Processors/ISource.h>
namespace DB namespace DB
{ {

View File

@ -577,9 +577,6 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
{ {
/// Add columns needed to calculate the sorting expression and the sign. /// Add columns needed to calculate the sorting expression and the sign.
std::vector<String> add_columns = data.sorting_key_expr->getRequiredColumns(); std::vector<String> add_columns = data.sorting_key_expr->getRequiredColumns();
std::cout << "std::vector<String> add_columns" << std::endl;
for (auto column: add_columns)
std::cout << column << std::endl;
column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end()); column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
if (!data.merging_params.sign_column.empty()) if (!data.merging_params.sign_column.empty())
@ -1115,12 +1112,8 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
BlockInputStreams streams; BlockInputStreams streams;
streams.reserve(num_streams); streams.reserve(num_streams);
for (size_t i = 0; i < num_streams; ++i) { for (size_t i = 0; i < num_streams; ++i)
std::cout << "labmda for loop № " << i << std::endl;
std::cout << pipes[i].getHeader().dumpStructure() << std::endl;
streams.emplace_back(std::make_shared<TreeExecutorBlockInputStream>(std::move(pipes[i]))); streams.emplace_back(std::make_shared<TreeExecutorBlockInputStream>(std::move(pipes[i])));
}
pipes.clear(); pipes.clear();
return streams; return streams;