Fixed error with external sorting when constants are present [#METR-21787].

This commit is contained in:
Alexey Milovidov 2016-07-25 23:57:05 +03:00
parent c954c26038
commit e1411af2ed
5 changed files with 143 additions and 28 deletions

View File

@ -17,17 +17,17 @@
namespace DB
{
/** Соединяет поток сортированных по отдельности блоков в сортированный целиком поток.
* Если данных для сортировки слишком много - может использовать внешнюю сортировку, с помощью временных файлов.
/** Merges a stream of blocks, each sorted separately, into a stream of blocks that is sorted as a whole.
* If there is too much data to sort, external sorting with temporary files can be used.
*/
/** Часть реализации. Сливает набор готовых (уже прочитанных откуда-то) блоков.
* Возвращает результат слияния в виде потока блоков не более max_merged_block_size строк.
/** Part of the implementation. Merges an array of ready (already read from somewhere) blocks.
* Returns the result of the merge as a stream of blocks with at most 'max_merged_block_size' rows in each.
*/
class MergeSortingBlocksBlockInputStream : public IProfilingBlockInputStream
{
public:
/// limit - если не 0, то можно выдать только первые limit строк в сортированном порядке.
/// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
MergeSortingBlocksBlockInputStream(Blocks & blocks_, SortDescription & description_,
size_t max_merged_block_size_, size_t limit_ = 0);
@ -52,8 +52,8 @@ private:
std::priority_queue<SortCursor> queue;
std::priority_queue<SortCursorWithCollation> queue_with_collation;
/** Делаем поддержку двух разных курсоров - с Collation и без.
* Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций.
/** Two different cursors are supported - with and without Collation.
* Templates are used (instead of virtual functions in SortCursor) for zero-overhead.
*/
template <typename TSortCursor>
Block mergeImpl(std::priority_queue<TSortCursor> & queue);
@ -63,7 +63,7 @@ private:
class MergeSortingBlockInputStream : public IProfilingBlockInputStream
{
public:
/// limit - если не 0, то можно выдать только первые limit строк в сортированном порядке.
/// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
MergeSortingBlockInputStream(BlockInputStreamPtr input_, SortDescription & description_,
size_t max_merged_block_size_, size_t limit_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_)
@ -104,10 +104,15 @@ private:
size_t sum_bytes_in_blocks = 0;
std::unique_ptr<IBlockInputStream> impl;
/// Всё ниже - для внешней сортировки.
/// Before the operation, constant columns are removed from the blocks; afterwards, they are placed back
/// (to avoid excessive virtual function calls and because constants cannot be serialized in Native format for temporary files).
/// The original block structure is saved here.
Block sample_block;
/// Everything below is for external sorting.
std::vector<std::unique_ptr<Poco::TemporaryFile>> temporary_files;
/// Для чтения сброшенных во временный файл данных.
/// For reading data from temporary file.
struct TemporaryFileStream
{
ReadBufferFromFile file_in;

View File

@ -10,26 +10,81 @@ namespace DB
{
/** Drops every constant column from 'block', in place.
  * Columns are examined left to right; the non-constant ones keep their relative order.
  */
static void removeConstantsFromBlock(Block & block)
{
    size_t remaining = block.columns();

    for (size_t pos = 0; pos < remaining;)
    {
        const bool is_constant = block.unsafeGetByPosition(pos).column->isConst();

        if (!is_constant)
        {
            ++pos;
            continue;
        }

        /// After erasing, the next column shifts into 'pos', so the position is not advanced.
        block.erase(pos);
        --remaining;
    }
}
static void removeConstantsFromSortDescription(const Block & sample_block, SortDescription & description)
{
description.erase(std::remove_if(description.begin(), description.end(),
[&](const SortColumnDescription & elem)
{
if (!elem.column_name.empty())
return sample_block.getByName(elem.column_name).column->isConst();
else
return sample_block.getByPosition(elem.column_number).column->isConst();
}), description.end());
}
/** Puts constant columns back into 'block' after they were stripped from it.
  * 'sample_block' must have the structure the block had before its constants were removed:
  * each constant column of 'sample_block' is inserted into 'block' at the same position,
  * resized to block.rowsInFirstColumn() rows.
  */
static void enrichBlockWithConstants(Block & block, const Block & sample_block)
{
    const size_t rows = block.rowsInFirstColumn();
    const size_t columns = sample_block.columns();

    for (size_t pos = 0; pos != columns; ++pos)
    {
        const auto & sample_column = sample_block.unsafeGetByPosition(pos);

        if (!sample_column.column->isConst())
            continue;

        block.insert(pos, {sample_column.column->cloneResized(rows), sample_column.type, sample_column.name});
    }
}
Block MergeSortingBlockInputStream::readImpl()
{
/** Алгоритм:
* - читать в оперативку блоки из источника;
* - когда их становится слишком много и если возможна внешняя сортировка
* - слить блоки вместе в сортированный поток и записать его во временный файл;
* - в конце, слить вместе все сортированные потоки из временных файлов, а также из накопившихся в оперативке блоков.
/** Algorithm:
* - read to memory blocks from source stream;
* - if too much of them and if external sorting is enabled,
* - merge all blocks to sorted stream and write it to temporary file;
* - at the end, merge all sorted streams from temporary files and also from rest of blocks in memory.
*/
/// Ещё не прочитали блоки.
/// If source blocks have not been read yet.
if (!impl)
{
while (Block block = children.back()->read())
{
if (!sample_block)
{
sample_block = block.cloneEmpty();
removeConstantsFromSortDescription(sample_block, description);
}
removeConstantsFromBlock(block);
blocks.push_back(block);
sum_bytes_in_blocks += block.bytes();
/** Если блоков стало слишком много и возможна внешняя сортировка,
* то сольём вместе те блоки, которые успели накопиться, и сбросим сортированный поток во временный (сжатый) файл.
* NOTE. Возможно - проверка наличия свободного места на жёстком диске.
/** If too much of them and if external sorting is enabled,
* will merge blocks that we have in memory at this moment and write merged stream to temporary (compressed) file.
* NOTE. It's possible to check free space in filesystem.
*/
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
{
@ -42,7 +97,7 @@ Block MergeSortingBlockInputStream::readImpl()
LOG_INFO(log, "Sorting and writing part of data into temporary file " + path);
ProfileEvents::increment(ProfileEvents::ExternalSortWritePart);
copyData(block_in, block_out, &is_cancelled); /// NOTE. Возможно, ограничение на потребление места на дисках.
copyData(block_in, block_out, &is_cancelled); /// NOTE. Possibly limit disk usage.
LOG_INFO(log, "Done writing part of data into temporary file " + path);
blocks.clear();
@ -59,28 +114,31 @@ Block MergeSortingBlockInputStream::readImpl()
}
else
{
/// Если были сброшены временные данные в файлы.
/// If there were temporary files.
ProfileEvents::increment(ProfileEvents::ExternalSortMerge);
LOG_INFO(log, "There are " << temporary_files.size() << " temporary sorted parts to merge.");
/// Сформируем сортированные потоки для слияния.
/// Create sorted streams to merge.
for (const auto & file : temporary_files)
{
temporary_inputs.emplace_back(std::make_unique<TemporaryFileStream>(file->path()));
inputs_to_merge.emplace_back(temporary_inputs.back()->block_in);
}
/// Оставшиеся в оперативке блоки.
/// Rest of blocks in memory.
if (!blocks.empty())
inputs_to_merge.emplace_back(std::make_shared<MergeSortingBlocksBlockInputStream>(blocks, description, max_merged_block_size, limit));
/// Будем сливать эти потоки.
/// Will merge those sorted streams.
impl = std::make_unique<MergingSortedBlockInputStream>(inputs_to_merge, description, max_merged_block_size, limit);
}
}
return impl->read();
Block res = impl->read();
if (res)
enrichBlockWithConstants(res, sample_block);
return res;
}
@ -142,7 +200,7 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue<TSortCur
for (size_t i = 0; i < num_columns; ++i) /// TODO: reserve
merged_columns.push_back(merged.getByPosition(i).column.get());
/// Вынимаем строки в нужном порядке и кладём в merged.
/// Take rows from queue in right order and push to 'merged'.
size_t merged_rows = 0;
while (!queue.empty())
{

View File

@ -81,13 +81,21 @@ void MergingSortedBlockInputStream::init(Block & merged_block, ColumnPlainPtrs &
size_t dst_columns = merged_block.columns();
if (src_columns != dst_columns)
throw Exception("Merging blocks has different number of columns", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
throw Exception("Merging blocks has different number of columns ("
+ toString(src_columns) + " and " + toString(dst_columns) + ")",
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
for (size_t i = 0; i < src_columns; ++i)
{
if (shared_block_ptr->getByPosition(i).name != merged_block.getByPosition(i).name
|| shared_block_ptr->getByPosition(i).type->getName() != merged_block.getByPosition(i).type->getName()
|| shared_block_ptr->getByPosition(i).column->getName() != merged_block.getByPosition(i).column->getName())
throw Exception("Merging blocks has different names or types of columns", ErrorCodes::BLOCKS_HAS_DIFFERENT_STRUCTURE);
{
throw Exception("Merging blocks has different names or types of columns:\n"
+ shared_block_ptr->dumpStructure() + "\nand\n" + merged_block.dumpStructure(),
ErrorCodes::BLOCKS_HAS_DIFFERENT_STRUCTURE);
}
}
}
for (size_t i = 0; i < num_columns; ++i)

View File

@ -0,0 +1,40 @@
999990 Hello
999991 Hello
999992 Hello
999993 Hello
999994 Hello
999995 Hello
999996 Hello
999997 Hello
999998 Hello
999999 Hello
999990 Hello
999991 Hello
999992 Hello
999993 Hello
999994 Hello
999995 Hello
999996 Hello
999997 Hello
999998 Hello
999999 Hello
999990 Hello
999991 Hello
999992 Hello
999993 Hello
999994 Hello
999995 Hello
999996 Hello
999997 Hello
999998 Hello
999999 Hello
999990 Hello
999991 Hello
999992 Hello
999993 Hello
999994 Hello
999995 Hello
999996 Hello
999997 Hello
999998 Hello
999999 Hello

View File

@ -0,0 +1,4 @@
SELECT number, 'Hello' AS k FROM (SELECT number FROM system.numbers LIMIT 1000000) ORDER BY number LIMIT 999990, 100 SETTINGS max_bytes_before_external_sort = 1000000;
SELECT number, 'Hello' AS k FROM (SELECT number FROM system.numbers LIMIT 1000000) ORDER BY number, k LIMIT 999990, 100 SETTINGS max_bytes_before_external_sort = 1000000;
SELECT number, 'Hello' AS k FROM (SELECT number FROM system.numbers LIMIT 1000000) ORDER BY k, number, k LIMIT 999990, 100 SETTINGS max_bytes_before_external_sort = 1000000;
SELECT number, 'Hello' AS k FROM (SELECT number FROM system.numbers LIMIT 1000000) ORDER BY number, k, number LIMIT 999990, 100 SETTINGS max_bytes_before_external_sort = 1000000;