#pragma once #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int LOGICAL_ERROR; } /** Соединяет несколько сортированных потоков в один. * При этом, для каждой группы идущих подряд одинаковых значений первичного ключа (столбцов, по которым сортируются данные), * схлопывает их в одну строку, суммируя все числовые столбцы кроме первичного ключа. * Если во всех числовых столбцах кроме первичного ключа получился ноль, то удаляет строчку. */ class SummingSortedBlockInputStream : public MergingSortedBlockInputStream { public: SummingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_, /// Список столбцов, которых нужно суммировать. Если пустое - берутся все числовые столбцы, не входящие в description. const Names & column_names_to_sum_, size_t max_block_size_) : MergingSortedBlockInputStream(inputs_, description_, max_block_size_), column_names_to_sum(column_names_to_sum_) { } String getName() const override { return "SummingSorted"; } String getID() const override; protected: /// Может возвращаться на 1 больше записей, чем max_block_size. Block readImpl() override; private: Logger * log = &Logger::get("SummingSortedBlockInputStream"); /// Прочитали до конца. bool finished = false; /// Столбцы с какими номерами надо суммировать. Names column_names_to_sum; /// Если задано - преобразуется в column_numbers_to_sum при инициализации. ColumnNumbers column_numbers_to_sum; /** Таблица может иметь вложенные таблицы, обрабатываемые особым образом. * Если название вложенной таблицы заканчинвается на `Map` и она содержит не менее двух столбцов, * удовлетворяющих следующим критериям: * - первый столбец - числовой ((U)IntN, Date, DateTime), назовем его условно key, * - остальные столбцы - арифметические ((U)IntN, Float32/64), условно (values...). * Такая вложенная таблица воспринимается как отображение key => (values...) и при слиянии * ее строк выполняется слияние элементов двух множеств по key со сложением соответствующих (values...). * Пример: * [(1, 100)] + [(2, 150)] -> [(1, 100), (2, 150)] * [(1, 100)] + [(1, 150)] -> [(1, 250)] * [(1, 100)] + [(1, 150), (2, 150)] -> [(1, 250), (2, 150)] * [(1, 100), (2, 150)] + [(1, -100)] -> [(2, 150)] */ /// Хранит номера столбца-ключа и столбцов-значений struct map_description { std::size_t key_col_num; std::vector val_col_nums; }; /// Найденные вложенные Map таблицы std::vector maps_to_sum; Row current_key; /// Текущий первичный ключ. Row next_key; /// Первичный ключ следующей строки. Row current_row; bool current_row_is_zero = false; /// Текущая строчка просуммировалась в ноль, и её следует удалить. bool output_is_non_empty = false; /// Отдали ли мы наружу хоть одну строку. /** Делаем поддержку двух разных курсоров - с Collation и без. * Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций. */ template void merge(ColumnPlainPtrs & merged_columns, std::priority_queue & queue); /// Вставить в результат просуммированную строку для текущей группы. void insertCurrentRow(ColumnPlainPtrs & merged_columns); /** Для вложенных Map выполняется слияние по ключу с выбрасыванием строк вложенных массивов, в которых * все элементы - нулевые. */ template bool mergeMaps(Row & row, TSortCursor & cursor); /** Прибавить строчку под курсором к row. * Возвращает false, если результат получился нулевым. */ template bool addRow(Row & row, TSortCursor & cursor); }; }