2011-09-04 01:42:14 +00:00
|
|
|
|
#pragma once
|
|
|
|
|
|
2015-01-07 15:30:11 +00:00
|
|
|
|
#include <queue>
|
2015-01-07 17:19:23 +00:00
|
|
|
|
#include <Poco/TemporaryFile.h>
|
2015-01-07 15:30:11 +00:00
|
|
|
|
|
2012-09-05 19:51:09 +00:00
|
|
|
|
#include <Yandex/logger_useful.h>
|
|
|
|
|
|
2011-09-04 01:42:14 +00:00
|
|
|
|
#include <DB/Core/SortDescription.h>
|
|
|
|
|
|
2011-09-04 21:23:19 +00:00
|
|
|
|
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
2015-01-07 17:19:23 +00:00
|
|
|
|
#include <DB/DataStreams/NativeBlockInputStream.h>
|
|
|
|
|
|
|
|
|
|
#include <DB/IO/ReadBufferFromFile.h>
|
|
|
|
|
#include <DB/IO/CompressedReadBuffer.h>
|
2011-09-04 01:42:14 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
/** Соединяет поток сортированных по отдельности блоков в сортированный целиком поток.
|
2015-01-07 17:19:23 +00:00
|
|
|
|
* Если данных для сортировки слишком много - может использовать внешнюю сортировку, с помощью временных файлов.
|
2011-09-04 01:42:14 +00:00
|
|
|
|
*/
|
2015-01-07 15:30:11 +00:00
|
|
|
|
|
|
|
|
|
/** Часть реализации. Сливает набор готовых (уже прочитанных откуда-то) блоков.
|
|
|
|
|
* Возвращает результат слияния в виде потока блоков не более max_merged_block_size строк.
|
|
|
|
|
*/
|
|
|
|
|
class MergeSortingBlocksBlockInputStream : public IProfilingBlockInputStream
|
|
|
|
|
{
|
|
|
|
|
public:
|
|
|
|
|
/// limit - если не 0, то можно выдать только первые limit строк в сортированном порядке.
|
|
|
|
|
MergeSortingBlocksBlockInputStream(Blocks & blocks_, SortDescription & description_,
|
|
|
|
|
size_t max_merged_block_size_, size_t limit_ = 0);
|
|
|
|
|
|
|
|
|
|
String getName() const override { return "MergeSortingBlocksBlockInputStream"; }
|
|
|
|
|
String getID() const override { return getName(); }
|
|
|
|
|
|
|
|
|
|
protected:
|
|
|
|
|
Block readImpl() override;
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
Blocks & blocks;
|
|
|
|
|
SortDescription description;
|
|
|
|
|
size_t max_merged_block_size;
|
|
|
|
|
size_t limit;
|
|
|
|
|
size_t total_merged_rows = 0;
|
|
|
|
|
|
|
|
|
|
using CursorImpls = std::vector<SortCursorImpl>;
|
|
|
|
|
CursorImpls cursors;
|
|
|
|
|
|
|
|
|
|
bool has_collation = false;
|
|
|
|
|
|
|
|
|
|
std::priority_queue<SortCursor> queue;
|
|
|
|
|
std::priority_queue<SortCursorWithCollation> queue_with_collation;
|
|
|
|
|
|
|
|
|
|
/** Делаем поддержку двух разных курсоров - с Collation и без.
|
|
|
|
|
* Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций.
|
|
|
|
|
*/
|
|
|
|
|
template <typename TSortCursor>
|
|
|
|
|
Block mergeImpl(std::priority_queue<TSortCursor> & queue);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
2011-09-04 21:23:19 +00:00
|
|
|
|
class MergeSortingBlockInputStream : public IProfilingBlockInputStream
|
2011-09-04 01:42:14 +00:00
|
|
|
|
{
|
|
|
|
|
public:
|
2013-09-16 05:44:47 +00:00
|
|
|
|
/// limit - если не 0, то можно выдать только первые limit строк в сортированном порядке.
|
2015-01-07 17:19:23 +00:00
|
|
|
|
MergeSortingBlockInputStream(BlockInputStreamPtr input_, SortDescription & description_,
|
|
|
|
|
size_t max_merged_block_size_, size_t limit_,
|
|
|
|
|
size_t max_bytes_before_external_sort_, const std::string & tmp_path_, const DataTypeFactory & data_type_factory_)
|
|
|
|
|
: description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_),
|
|
|
|
|
max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_), data_type_factory(data_type_factory_)
|
2011-09-04 21:23:19 +00:00
|
|
|
|
{
|
2013-05-04 04:05:15 +00:00
|
|
|
|
children.push_back(input_);
|
2011-09-04 21:23:19 +00:00
|
|
|
|
}
|
2011-09-04 01:42:14 +00:00
|
|
|
|
|
2014-11-08 23:52:18 +00:00
|
|
|
|
String getName() const override { return "MergeSortingBlockInputStream"; }
|
2011-09-04 01:42:14 +00:00
|
|
|
|
|
2014-11-08 23:52:18 +00:00
|
|
|
|
String getID() const override
|
2013-05-03 10:20:53 +00:00
|
|
|
|
{
|
|
|
|
|
std::stringstream res;
|
2013-05-04 05:20:07 +00:00
|
|
|
|
res << "MergeSorting(" << children.back()->getID();
|
2014-11-08 23:52:18 +00:00
|
|
|
|
|
2013-05-03 10:20:53 +00:00
|
|
|
|
for (size_t i = 0; i < description.size(); ++i)
|
|
|
|
|
res << ", " << description[i].getID();
|
2014-11-08 23:52:18 +00:00
|
|
|
|
|
2013-05-03 10:20:53 +00:00
|
|
|
|
res << ")";
|
|
|
|
|
return res.str();
|
|
|
|
|
}
|
|
|
|
|
|
2012-10-20 02:10:47 +00:00
|
|
|
|
protected:
|
2014-11-08 23:52:18 +00:00
|
|
|
|
Block readImpl() override;
|
2012-10-20 02:10:47 +00:00
|
|
|
|
|
2011-09-04 01:42:14 +00:00
|
|
|
|
private:
|
|
|
|
|
SortDescription description;
|
2015-01-07 17:19:23 +00:00
|
|
|
|
size_t max_merged_block_size;
|
2013-09-16 05:44:47 +00:00
|
|
|
|
size_t limit;
|
2011-09-04 01:42:14 +00:00
|
|
|
|
|
2015-01-07 17:19:23 +00:00
|
|
|
|
size_t max_bytes_before_external_sort;
|
|
|
|
|
const std::string tmp_path;
|
|
|
|
|
const DataTypeFactory & data_type_factory;
|
|
|
|
|
|
2015-01-07 15:30:11 +00:00
|
|
|
|
Logger * log = &Logger::get("MergeSortingBlockInputStream");
|
2014-11-08 23:52:18 +00:00
|
|
|
|
|
2015-01-07 15:30:11 +00:00
|
|
|
|
Blocks blocks;
|
2015-01-07 17:19:23 +00:00
|
|
|
|
size_t sum_bytes_in_blocks = 0;
|
|
|
|
|
std::unique_ptr<IBlockInputStream> impl;
|
|
|
|
|
|
|
|
|
|
/// Всё ниже - для внешней сортировки.
|
|
|
|
|
std::vector<std::unique_ptr<Poco::TemporaryFile>> temporary_files;
|
|
|
|
|
|
|
|
|
|
/// Для чтения сброшенных во временный файл данных.
|
|
|
|
|
struct TemporaryFileStream
|
|
|
|
|
{
|
|
|
|
|
ReadBufferFromFile file_in;
|
|
|
|
|
CompressedReadBuffer compressed_in;
|
|
|
|
|
BlockInputStreamPtr block_in;
|
|
|
|
|
|
|
|
|
|
TemporaryFileStream(const std::string & path, const DataTypeFactory & data_type_factory)
|
|
|
|
|
: file_in(path), compressed_in(file_in), block_in(new NativeBlockInputStream(compressed_in, data_type_factory)) {}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
|
|
|
|
|
|
|
|
|
|
BlockInputStreams inputs_to_merge;
|
2011-09-04 01:42:14 +00:00
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
}
|