ClickHouse/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp

218 lines
6.6 KiB
C++
Raw Normal View History

2013-10-01 18:09:31 +00:00
#include <DB/DataStreams/SummingSortedBlockInputStream.h>
2015-04-08 16:38:38 +00:00
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/DataTypes/DataTypeArray.h>
2015-04-09 17:01:02 +00:00
#include <boost/range/iterator_range_core.hpp>
2013-10-01 18:09:31 +00:00
namespace DB
{
void SummingSortedBlockInputStream::insertCurrentRow(ColumnPlainPtrs & merged_columns)
{
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insert(current_row[i]);
}
2013-11-30 18:43:59 +00:00
2013-10-01 18:09:31 +00:00
2015-04-08 16:38:38 +00:00
namespace
{
bool endsWith(const std::string & s, const std::string & suffix)
{
return s.size() >= suffix.size() && 0 == strncmp(s.data() + s.size() - suffix.size(), suffix.data(), suffix.size());
}
bool isInPrimaryKey(const SortDescription & description, const std::string & name, const std::size_t number)
{
for (auto & desc : description)
if (desc.column_name == name || (desc.column_name.empty() && desc.column_number == number))
return true;
return false;
}
}
2013-10-01 18:09:31 +00:00
Block SummingSortedBlockInputStream::readImpl()
{
2015-01-18 08:25:56 +00:00
if (finished)
2013-10-01 18:09:31 +00:00
return Block();
2014-11-22 02:22:30 +00:00
2013-10-01 18:09:31 +00:00
if (children.size() == 1)
return children[0]->read();
Block merged_block;
ColumnPlainPtrs merged_columns;
2014-11-22 02:22:30 +00:00
2013-10-01 18:09:31 +00:00
init(merged_block, merged_columns);
if (merged_columns.empty())
return Block();
/// Дополнительная инициализация.
if (current_row.empty())
{
current_row.resize(num_columns);
current_key.resize(description.size());
next_key.resize(description.size());
2015-04-08 16:38:38 +00:00
std::unordered_map<std::string, std::vector<std::size_t>> discovered_maps;
2014-11-22 02:22:30 +00:00
/** Заполним номера столбцов, которые должны быть просуммированы.
* Это могут быть только числовые столбцы, не входящие в ключ сортировки.
* Если задан непустой список column_names_to_sum, то берём только эти столбцы.
* Часть столбцов из column_names_to_sum может быть не найдена. Это игнорируется.
*/
2013-10-01 18:09:31 +00:00
for (size_t i = 0; i < num_columns; ++i)
{
ColumnWithTypeAndName & column = merged_block.getByPosition(i);
2013-10-01 18:09:31 +00:00
/// Discover nested Maps and find columns for summation
if (typeid_cast<const DataTypeArray *>(column.type.get()))
2015-04-08 16:38:38 +00:00
{
const auto map_name = DataTypeNested::extractNestedTableName(column.name);
/// if nested table name ends with `Map` it is a possible candidate for special handling
2015-04-08 16:38:38 +00:00
if (map_name == column.name || !endsWith(map_name, "Map"))
continue;
discovered_maps[map_name].emplace_back(i);
}
else
{
/// Оставляем только числовые типы. При чём, даты и даты-со-временем здесь такими не считаются.
if (!column.type->isNumeric() || column.type->getName() == "Date" ||
column.type->getName() == "DateTime")
continue;
/// Входят ли в PK?
if (isInPrimaryKey(description, column.name, i))
continue;
if (column_names_to_sum.empty()
|| column_names_to_sum.end() !=
std::find(column_names_to_sum.begin(), column_names_to_sum.end(), column.name))
{
column_numbers_to_sum.push_back(i);
}
}
}
/// select actual nested Maps from list of candidates
2015-04-08 16:38:38 +00:00
for (const auto & map : discovered_maps)
{
/// map should contain at least two elements (key -> value)
2015-04-09 17:01:02 +00:00
if (map.second.size() < 2)
2015-04-08 16:38:38 +00:00
continue;
/// check type of key
2015-04-08 16:38:38 +00:00
const auto key_num = map.second.front();
auto & key_col = merged_block.getByPosition(key_num);
/// skip maps, whose members are part of primary key
2015-04-08 16:38:38 +00:00
if (isInPrimaryKey(description, key_col.name, key_num))
2013-10-01 18:09:31 +00:00
continue;
2015-04-08 16:38:38 +00:00
auto & key_nested_type = static_cast<const DataTypeArray *>(key_col.type.get())->getNestedType();
/// key can only be integral
if (!key_nested_type->isNumeric() || key_nested_type->getName() == "Float32" || key_nested_type->getName() == "Float64")
continue;
2013-10-01 18:09:31 +00:00
2015-04-09 17:01:02 +00:00
/// check each value type (skip the first column number which is for key)
auto correct_types = true;
for (auto & value_num : boost::make_iterator_range(std::next(map.second.begin()), map.second.end()))
{
auto & value_col = merged_block.getByPosition(value_num);
/// skip maps, whose members are part of primary key
if (isInPrimaryKey(description, value_col.name, value_num))
{
correct_types = false;
break;
}
2013-10-01 18:09:31 +00:00
2015-04-09 17:01:02 +00:00
auto & value_nested_type = static_cast<const DataTypeArray *>(value_col.type.get())->getNestedType();
/// value can be any arithmetic type except date and datetime
if (!value_nested_type->isNumeric() || value_nested_type->getName() == "Date" ||
value_nested_type->getName() == "DateTime")
{
correct_types = false;
break;
}
}
2015-04-08 16:38:38 +00:00
2015-04-09 17:01:02 +00:00
if (correct_types)
maps_to_sum.push_back({ key_num, { std::next(map.second.begin()), map.second.end() } });
2013-10-01 18:09:31 +00:00
}
}
if (has_collation)
merge(merged_columns, queue_with_collation);
2013-10-01 18:09:31 +00:00
else
merge(merged_columns, queue);
2013-10-01 18:09:31 +00:00
return merged_block;
}
2013-11-30 18:43:59 +00:00
2013-10-01 18:09:31 +00:00
template<class TSortCursor>
void SummingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue)
2014-11-22 02:22:30 +00:00
{
2013-10-01 18:09:31 +00:00
size_t merged_rows = 0;
2014-11-22 02:22:30 +00:00
2013-10-01 18:09:31 +00:00
/// Вынимаем строки в нужном порядке и кладём в merged_block, пока строк не больше max_block_size
while (!queue.empty())
{
TSortCursor current = queue.top();
setPrimaryKey(next_key, current);
bool key_differs = next_key != current_key;
/// если накопилось достаточно строк и последняя посчитана полностью
if (key_differs && merged_rows >= max_block_size)
return;
queue.pop();
if (key_differs)
2013-10-01 18:09:31 +00:00
{
/// Запишем данные для предыдущей группы.
2014-05-07 11:44:14 +00:00
if (!current_key[0].isNull() && !current_row_is_zero)
2013-10-01 20:38:01 +00:00
{
++merged_rows;
2014-05-07 11:44:14 +00:00
output_is_non_empty = true;
2013-10-01 20:38:01 +00:00
insertCurrentRow(merged_columns);
}
2013-10-01 18:09:31 +00:00
current_key = std::move(next_key);
2013-10-01 18:09:31 +00:00
next_key.resize(description.size());
setRow(current_row, current);
2013-11-30 18:43:59 +00:00
current_row_is_zero = false;
2013-10-01 18:09:31 +00:00
}
else
{
2013-11-30 18:43:59 +00:00
current_row_is_zero = !addRow(current_row, current);
2013-10-01 18:09:31 +00:00
}
if (!current->isLast())
{
current->next();
queue.push(current);
}
else
{
/// Достаём из соответствующего источника следующий блок, если есть.
fetchNextBlock(current, queue);
}
}
2014-05-07 11:44:14 +00:00
/// Запишем данные для последней группы, если она ненулевая.
/// Если она нулевая, и без нее выходной поток окажется пустым, запишем ее все равно.
if (!current_row_is_zero || !output_is_non_empty)
{
++merged_rows;
insertCurrentRow(merged_columns);
}
2013-10-01 18:09:31 +00:00
2015-01-18 08:25:56 +00:00
finished = true;
2013-10-01 18:09:31 +00:00
}
}