mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
dbms: added support for external sorting [#METR-2944].
This commit is contained in:
parent
8620b80d99
commit
41a7fc50d6
@ -1,18 +1,24 @@
|
||||
#pragma once
|
||||
|
||||
#include <queue>
|
||||
#include <Poco/TemporaryFile.h>
|
||||
|
||||
#include <Yandex/logger_useful.h>
|
||||
|
||||
#include <DB/Core/SortDescription.h>
|
||||
|
||||
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
||||
#include <DB/DataStreams/NativeBlockInputStream.h>
|
||||
|
||||
#include <DB/IO/ReadBufferFromFile.h>
|
||||
#include <DB/IO/CompressedReadBuffer.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Соединяет поток сортированных по отдельности блоков в сортированный целиком поток.
|
||||
* Если данных для сортировки слишком много - может использовать внешнюю сортировку, с помощью временных файлов.
|
||||
*/
|
||||
|
||||
/** Часть реализации. Сливает набор готовых (уже прочитанных откуда-то) блоков.
|
||||
@ -58,8 +64,11 @@ class MergeSortingBlockInputStream : public IProfilingBlockInputStream
|
||||
{
|
||||
public:
|
||||
/// limit - если не 0, то можно выдать только первые limit строк в сортированном порядке.
|
||||
MergeSortingBlockInputStream(BlockInputStreamPtr input_, SortDescription & description_, size_t limit_ = 0)
|
||||
: description(description_), limit(limit_)
|
||||
MergeSortingBlockInputStream(BlockInputStreamPtr input_, SortDescription & description_,
|
||||
size_t max_merged_block_size_, size_t limit_,
|
||||
size_t max_bytes_before_external_sort_, const std::string & tmp_path_, const DataTypeFactory & data_type_factory_)
|
||||
: description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_),
|
||||
max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_), data_type_factory(data_type_factory_)
|
||||
{
|
||||
children.push_back(input_);
|
||||
}
|
||||
@ -83,12 +92,36 @@ protected:
|
||||
|
||||
private:
|
||||
SortDescription description;
|
||||
size_t max_merged_block_size;
|
||||
size_t limit;
|
||||
|
||||
size_t max_bytes_before_external_sort;
|
||||
const std::string tmp_path;
|
||||
const DataTypeFactory & data_type_factory;
|
||||
|
||||
Logger * log = &Logger::get("MergeSortingBlockInputStream");
|
||||
|
||||
Blocks blocks;
|
||||
std::unique_ptr<MergeSortingBlocksBlockInputStream> impl;
|
||||
size_t sum_bytes_in_blocks = 0;
|
||||
std::unique_ptr<IBlockInputStream> impl;
|
||||
|
||||
/// Всё ниже - для внешней сортировки.
|
||||
std::vector<std::unique_ptr<Poco::TemporaryFile>> temporary_files;
|
||||
|
||||
/// Для чтения сброшенных во временный файл данных.
|
||||
struct TemporaryFileStream
|
||||
{
|
||||
ReadBufferFromFile file_in;
|
||||
CompressedReadBuffer compressed_in;
|
||||
BlockInputStreamPtr block_in;
|
||||
|
||||
TemporaryFileStream(const std::string & path, const DataTypeFactory & data_type_factory)
|
||||
: file_in(path), compressed_in(file_in), block_in(new NativeBlockInputStream(compressed_in, data_type_factory)) {}
|
||||
};
|
||||
|
||||
std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
|
||||
|
||||
BlockInputStreams inputs_to_merge;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -80,6 +80,7 @@ struct ContextShared
|
||||
int interserver_io_port; /// и порт,
|
||||
|
||||
String path; /// Путь к директории с данными, со слешем на конце.
|
||||
String tmp_path; /// Путь ко временным файлам, возникающим при обработке запроса.
|
||||
Databases databases; /// Список БД и таблиц в них.
|
||||
TableFunctionFactory table_function_factory; /// Табличные функции.
|
||||
AggregateFunctionFactory aggregate_function_factory; /// Агрегатные функции.
|
||||
@ -187,7 +188,9 @@ private:
|
||||
|
||||
public:
|
||||
String getPath() const;
|
||||
String getTemporaryPath() const;
|
||||
void setPath(const String & path);
|
||||
void setTemporaryPath(const String & path);
|
||||
|
||||
/** Забрать список пользователей, квот и профилей настроек из этого конфига.
|
||||
* Список пользователей полностью заменяется.
|
||||
|
@ -36,6 +36,7 @@ struct Limits
|
||||
M(SettingUInt64, max_rows_to_sort, 0) \
|
||||
M(SettingUInt64, max_bytes_to_sort, 0) \
|
||||
M(SettingOverflowMode<false>, sort_overflow_mode, OverflowMode::THROW) \
|
||||
M(SettingUInt64, max_bytes_before_external_sort, 0) \
|
||||
\
|
||||
/** Ограничение на размер результата. \
|
||||
* Проверяются также для подзапросов и на удалённых серверах. \
|
||||
@ -44,7 +45,7 @@ struct Limits
|
||||
M(SettingUInt64, max_result_bytes, 0) \
|
||||
M(SettingOverflowMode<false>, result_overflow_mode, OverflowMode::THROW) \
|
||||
\
|
||||
/* TODO: Проверять также при merge стадии сортировки, при слиянии и финализации агрегатных функций. */ \
|
||||
/* TODO: Проверять также при слиянии и финализации агрегатных функций. */ \
|
||||
M(SettingSeconds, max_execution_time, 0) \
|
||||
M(SettingOverflowMode<false>, timeout_overflow_mode, OverflowMode::THROW) \
|
||||
\
|
||||
@ -145,7 +146,7 @@ struct Limits
|
||||
|
||||
private:
|
||||
friend struct Settings;
|
||||
|
||||
|
||||
/// Записать все настройки в буфер. (В отличие от соответствующего метода в Settings, пустая строка на конце не пишется).
|
||||
void serialize(WriteBuffer & buf) const
|
||||
{
|
||||
|
@ -1,4 +1,9 @@
|
||||
#include <DB/DataStreams/MergeSortingBlockInputStream.h>
|
||||
#include <DB/DataStreams/MergingSortedBlockInputStream.h>
|
||||
#include <DB/DataStreams/NativeBlockOutputStream.h>
|
||||
#include <DB/DataStreams/copyData.h>
|
||||
#include <DB/IO/WriteBufferFromFile.h>
|
||||
#include <DB/IO/CompressedWriteBuffer.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -7,21 +12,70 @@ namespace DB
|
||||
|
||||
Block MergeSortingBlockInputStream::readImpl()
|
||||
{
|
||||
/** Достаточно простой алгоритм:
|
||||
* - прочитать в оперативку все блоки;
|
||||
* - объединить их всех;
|
||||
/** Алгоритм:
|
||||
* - читать в оперативку блоки из источника;
|
||||
* - когда их становится слишком много и если возможна внешняя сортировка
|
||||
* - слить блоки вместе в сортированный поток и записать его во временный файл;
|
||||
* - в конце, слить вместе все сортированные потоки из временных файлов, а также из накопившихся в оперативке блоков.
|
||||
*/
|
||||
|
||||
/// Ещё не прочитали блоки.
|
||||
if (!impl)
|
||||
{
|
||||
while (Block block = children.back()->read())
|
||||
{
|
||||
blocks.push_back(block);
|
||||
sum_bytes_in_blocks += block.bytes();
|
||||
|
||||
if (blocks.empty() || isCancelled())
|
||||
/** Если блоков стало слишком много и возможна внешняя сортировка,
|
||||
* то сольём вместе те блоки, которые успели накопиться, и сбросим сортированный поток во временный (сжатый) файл.
|
||||
* NOTE. Возможно - проверка наличия свободного места на жёстком диске.
|
||||
*/
|
||||
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
|
||||
{
|
||||
temporary_files.emplace_back(new Poco::TemporaryFile(tmp_path));
|
||||
const std::string & path = temporary_files.back()->path();
|
||||
WriteBufferFromFile file_buf(path);
|
||||
CompressedWriteBuffer compressed_buf(file_buf);
|
||||
NativeBlockOutputStream block_out(compressed_buf);
|
||||
MergeSortingBlocksBlockInputStream block_in(blocks, description, max_merged_block_size, limit);
|
||||
|
||||
LOG_INFO(log, "Sorting and writing part of data into temporary file " + path);
|
||||
copyData(block_in, block_out); /// TODO Проверка isCancelled.
|
||||
LOG_INFO(log, "Done writing part of data into temporary file " + path);
|
||||
|
||||
blocks.clear();
|
||||
sum_bytes_in_blocks = 0;
|
||||
}
|
||||
}
|
||||
|
||||
if ((blocks.empty() && temporary_files.empty()) || isCancelled())
|
||||
return Block();
|
||||
|
||||
impl.reset(new MergeSortingBlocksBlockInputStream(blocks, description, DEFAULT_BLOCK_SIZE, limit));
|
||||
if (temporary_files.empty())
|
||||
{
|
||||
impl.reset(new MergeSortingBlocksBlockInputStream(blocks, description, max_merged_block_size, limit));
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Если были сброшены временные данные в файлы.
|
||||
|
||||
LOG_INFO(log, "There are " << temporary_files.size() << " temporary sorted parts to merge.");
|
||||
|
||||
/// Сформируем сортированные потоки для слияния.
|
||||
for (const auto & file : temporary_files)
|
||||
{
|
||||
temporary_inputs.emplace_back(new TemporaryFileStream(file->path(), data_type_factory));
|
||||
inputs_to_merge.emplace_back(temporary_inputs.back()->block_in);
|
||||
}
|
||||
|
||||
/// Оставшиеся в оперативке блоки.
|
||||
if (!blocks.empty())
|
||||
inputs_to_merge.emplace_back(new MergeSortingBlocksBlockInputStream(blocks, description, max_merged_block_size, limit));
|
||||
|
||||
/// Будем сливать эти потоки.
|
||||
impl.reset(new MergingSortedBlockInputStream(inputs_to_merge, description, max_merged_block_size, limit));
|
||||
}
|
||||
}
|
||||
|
||||
return impl->read();
|
||||
|
@ -161,7 +161,7 @@ int main(int argc, char ** argv)
|
||||
|
||||
Poco::SharedPtr<IBlockInputStream> in = table->read(column_names, 0, Context{}, Settings(), stage, argc == 2 ? atoi(argv[1]) : 1048576)[0];
|
||||
in = new PartialSortingBlockInputStream(in, sort_columns);
|
||||
in = new MergeSortingBlockInputStream(in, sort_columns);
|
||||
in = new MergeSortingBlockInputStream(in, sort_columns, DEFAULT_BLOCK_SIZE);
|
||||
//in = new LimitBlockInputStream(in, 10);
|
||||
|
||||
WriteBufferFromOStream ob(std::cout);
|
||||
|
@ -20,6 +20,12 @@ String Context::getPath() const
|
||||
return shared->path;
|
||||
}
|
||||
|
||||
String Context::getTemporaryPath() const
|
||||
{
|
||||
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
|
||||
return shared->tmp_path;
|
||||
}
|
||||
|
||||
|
||||
void Context::setPath(const String & path)
|
||||
{
|
||||
@ -27,6 +33,12 @@ void Context::setPath(const String & path)
|
||||
shared->path = path;
|
||||
}
|
||||
|
||||
void Context::setTemporaryPath(const String & path)
|
||||
{
|
||||
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
|
||||
shared->tmp_path = path;
|
||||
}
|
||||
|
||||
|
||||
void Context::setUsersConfig(ConfigurationPtr config)
|
||||
{
|
||||
|
@ -893,8 +893,11 @@ void InterpreterSelectQuery::executeOrder(BlockInputStreams & streams)
|
||||
streams.resize(1);
|
||||
}
|
||||
|
||||
/// Сливаем сортированные блоки TODO: таймаут на слияние.
|
||||
stream = maybeAsynchronous(new MergeSortingBlockInputStream(stream, order_descr, limit), is_async);
|
||||
/// Сливаем сортированные блоки.
|
||||
stream = maybeAsynchronous(new MergeSortingBlockInputStream(
|
||||
stream, order_descr, settings.max_block_size, limit,
|
||||
settings.limits.max_bytes_before_external_sort, context.getTemporaryPath(), context.getDataTypeFactory()),
|
||||
is_async);
|
||||
}
|
||||
|
||||
|
||||
|
@ -457,6 +457,12 @@ int Server::main(const std::vector<std::string> & args)
|
||||
global_context->setGlobalContext(*global_context);
|
||||
global_context->setPath(path);
|
||||
|
||||
/// Директория для временных файлов при обработке тяжёлых запросов.
|
||||
std::string tmp_path = config().getString("tmp_path", path + "tmp/");
|
||||
global_context->setTemporaryPath(tmp_path);
|
||||
Poco::File(tmp_path).createDirectories();
|
||||
/// TODO Очистка временных файлов. Проверка, что директория с временными файлами не совпадает и не содержит в себе основной path.
|
||||
|
||||
bool has_zookeeper = false;
|
||||
if (config().has("zookeeper"))
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user