This commit is contained in:
Evgeniy Gatov 2015-12-14 22:00:54 +03:00
commit 9e041bb485
30 changed files with 1097 additions and 136 deletions

View File

@ -77,6 +77,9 @@
M(ExternalAggregationCompressedBytes) \
M(ExternalAggregationUncompressedBytes) \
\
M(SlowRead) \
M(ReadBackoff) \
\
M(END)
namespace ProfileEvents

View File

@ -0,0 +1,55 @@
#pragma once
#include <memory>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/WriteBufferFromFile.h>
namespace DB
{
/** Позволяет запустить команду,
* читать её stdout, stderr, писать в stdin,
* дождаться завершения.
*
* Реализация похожа на функцию popen из POSIX (посмотреть можно в исходниках libc).
*
* Наиболее важное отличие: использует vfork вместо fork.
* Это сделано, потому что fork не работает (с ошибкой о нехватке памяти),
* при некоторых настройках overcommit-а, если размер адресного пространства процесса больше половины количества доступной памяти.
* Также, изменение memory map-ов - довольно ресурсоёмкая операция.
*
* Второе отличие - позволяет работать одновременно и с stdin, и с stdout, и с stderr запущенного процесса,
* а также узнать код и статус завершения.
*/
class ShellCommand
{
private:
pid_t pid;
ShellCommand(pid_t pid, int in_fd, int out_fd, int err_fd)
: pid(pid), in(in_fd), out(out_fd), err(err_fd) {};
static std::unique_ptr<ShellCommand> executeImpl(const char * filename, char * const argv[]);
public:
WriteBufferFromFile in; /// Если команда читает из stdin, то не забудьте вызвать in.close() после записи туда всех данных.
ReadBufferFromFile out;
ReadBufferFromFile err;
/// Выполнить команду с использованием /bin/sh -c
static std::unique_ptr<ShellCommand> execute(const std::string & command);
/// Выполнить исполняемый файл с указаннами аргументами. arguments - без argv[0].
static std::unique_ptr<ShellCommand> executeDirect(const std::string & path, const std::vector<std::string> & arguments);
/// Подождать завершения процесса, кинуть исключение, если код не 0 или если процесс был завершён не самостоятельно.
void wait();
/// Подождать завершения процесса, узнать код возврата. Кинуть исключение, если процесс был завершён не самостоятельно.
int tryWait();
};
}

View File

@ -299,6 +299,14 @@ namespace ErrorCodes
RECEIVED_EMPTY_DATA = 295,
NO_REMOTE_SHARD_FOUND = 296,
SHARD_HAS_NO_CONNECTIONS = 297,
CANNOT_PIPE = 298,
CANNOT_FORK = 299,
CANNOT_DLSYM = 300,
CANNOT_CREATE_CHILD_PROCESS = 301,
CHILD_WAS_NOT_EXITED_NORMALLY = 302,
CANNOT_SELECT = 303,
CANNOT_WAITPID = 304,
TABLE_WAS_NOT_DROPPED = 305,
KEEPER_EXCEPTION = 999,
POCO_EXCEPTION = 1000,

View File

@ -4,6 +4,7 @@
#include <DB/Interpreters/Aggregator.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Common/ConcurrentBoundedQueue.h>
#include <condition_variable>
namespace DB
@ -11,9 +12,47 @@ namespace DB
/** Доагрегирует потоки блоков, держа в оперативной памяти только по одному или несколько (до merging_threads) блоков из каждого источника.
* Это экономит оперативку в случае использования двухуровневой агрегации, где в каждом потоке будет до 256 блоков с частями результата.
* Это экономит оперативку в случае использования двухуровневой агрегации, где в каждом источнике будет до 256 блоков с частями результата.
*
* Агрегатные функции в блоках не должны быть финализированы, чтобы их состояния можно было объединить.
*
* Используется для решения двух задач:
*
* 1. Внешняя агрегация со сбросом данных на диск.
* Частично агрегированные данные (предварительно разбитые на 256 корзин) сброшены в какое-то количество файлов на диске.
* Нужно читать их и мерджить по корзинам - держа в оперативке одновременно только несколько корзин из каждого файла.
*
* 2. Слияние результатов агрегации при распределённой обработке запроса.
* С разных серверов приезжают частично агрегированные данные, которые могут быть разбиты, а могут быть не разбиты на 256 корзин,
* и эти корзины отдаются нам по сети с каждого сервера последовательно, друг за другом.
* Надо так же читать и мерджить по корзинам.
*
* Суть работы:
*
* Есть какое-то количество источников. Они отдают блоки с частично агрегированными данными.
* Каждый источник может отдать одну из следующих последовательностей блоков:
* 1. "неразрезанный" блок с bucket_num = -1;
* 2. "разрезанные" (two_level) блоки с bucket_num от 0 до 255;
* В обоих случаях, может ещё присутствовать блок "переполнений" (overflows) с bucket_num = -1 и is_overflows = true;
*
* Исходим из соглашения, что разрезанные блоки всегда передаются в порядке bucket_num.
* То есть, если a < b, то блок с bucket_num = a идёт раньше bucket_num = b.
* Это нужно для экономного по памяти слияния
* - чтобы не надо было читать блоки наперёд, а идти по всем последовательностям по возрастанию bucket_num.
*
* При этом, не все bucket_num из диапазона 0..255 могут присутствовать.
* Блок переполнений может присутствовать в любом порядке относительно других блоков (но он может быть только один).
*
* Необходимо объединить эти последовательности блоков и отдать результат в виде последовательности с такими же свойствами.
* То есть, на выходе, если в последовательности есть "разрезанные" блоки, то они должны идти в порядке bucket_num.
*
* Мердж можно осуществлять с использованием нескольких (merging_threads) потоков.
* Для этого, получение набора блоков для следующего bucket_num надо делать последовательно,
* а затем, когда мы имеем несколько полученных наборов, их объединение можно делать параллельно.
*
* При получении следующих блоков из разных источников,
* данные из источников можно также читать в несколько потоков (reading_threads)
* для оптимальной работы при наличии быстрой сети или дисков (откуда эти блоки читаются).
*/
class MergingAggregatedMemoryEfficientBlockInputStream : public IProfilingBlockInputStream
{
@ -22,7 +61,7 @@ public:
BlockInputStreams inputs_, const Aggregator::Params & params, bool final_,
size_t reading_threads_, size_t merging_threads_);
~MergingAggregatedMemoryEfficientBlockInputStream();
~MergingAggregatedMemoryEfficientBlockInputStream() override;
String getName() const override { return "MergingAggregatedMemoryEfficient"; }
@ -31,20 +70,34 @@ public:
/// Отправляет запрос (инициирует вычисления) раньше, чем read.
void readPrefix() override;
/// Вызывается либо после того, как всё прочитано, либо после cancel-а.
void readSuffix() override;
/** Отличается от реализации по-умолчанию тем, что пытается остановить все источники,
* пропуская отвалившиеся по эксепшену.
*/
void cancel() override;
protected:
Block readImpl() override;
private:
static constexpr size_t NUM_BUCKETS = 256;
Aggregator aggregator;
bool final;
size_t reading_threads;
size_t merging_threads;
bool started = false;
bool all_read = false;
volatile bool has_two_level = false;
volatile bool has_overflows = false;
int current_bucket_num = -1;
Logger * log = &Logger::get("MergingAggregatedMemoryEfficientBlockInputStream");
struct Input
{
BlockInputStreamPtr stream;
@ -68,31 +121,34 @@ private:
std::unique_ptr<boost::threadpool::pool> reading_pool;
/// Для параллельного мерджа.
struct OutputData
{
Block block;
std::exception_ptr exception;
OutputData() {}
OutputData(Block && block_) : block(std::move(block_)) {}
OutputData(std::exception_ptr && exception_) : exception(std::move(exception_)) {}
};
struct ParallelMergeData
{
boost::threadpool::pool pool;
/// Сейчас один из мерджащих потоков получает следующие блоки для мерджа. Эта операция должна делаться последовательно.
std::mutex get_next_blocks_mutex;
ConcurrentBoundedQueue<OutputData> result_queue;
bool exhausted = false; /// Данных больше нет.
bool finish = false; /// Нужно завершить работу раньше, чем данные закончились.
std::atomic<size_t> active_threads;
ParallelMergeData(size_t max_threads) : pool(max_threads), result_queue(max_threads), active_threads(max_threads) {}
std::exception_ptr exception;
/// Следует отдавать блоки стого в порядке ключа (bucket_num).
/// Если значение - пустой блок - то нужно дождаться его мерджа.
/// (Такое значение означает обещание, что здесь будут данные. Это важно, потому что данные нужно отдавать в порядке ключа - bucket_num)
std::map<int, Block> merged_blocks;
std::mutex merged_blocks_mutex;
/// Событие, с помощью которого мерджащие потоки говорят главному потоку, что новый блок готов.
std::condition_variable merged_blocks_changed;
/// Событие, с помощью которого главный поток говорят мерджащим потокам, что можно обработать следующую группу блоков.
std::condition_variable have_space;
ParallelMergeData(size_t max_threads) : pool(max_threads) {}
};
std::unique_ptr<ParallelMergeData> parallel_merge_data;
void mergeThread(MemoryTracker * memory_tracker);
void finalize();
};
}

View File

@ -18,6 +18,12 @@
#include <DB/Interpreters/AggregationCommon.h>
#include <DB/Functions/NumberTraits.h>
#include <DB/Functions/FunctionsConditional.h>
#include <DB/AggregateFunctions/IAggregateFunction.h>
#include <DB/AggregateFunctions/AggregateFunctionFactory.h>
#include <DB/Parsers/ExpressionListParsers.h>
#include <DB/Parsers/parseQuery.h>
#include <DB/Parsers/ASTExpressionList.h>
#include <DB/Parsers/ASTLiteral.h>
#include <ext/range.hpp>
@ -47,6 +53,8 @@ namespace DB
* - для кортежей из элементов на соответствующих позициях в нескольких массивах.
*
* emptyArrayToSingle(arr) - заменить пустые массивы на массивы из одного элемента со значением "по-умолчанию".
*
* arrayReduce('agg', arr1, ...) - применить агрегатную функцию agg к массивам arr1...
*/
@ -2320,6 +2328,183 @@ private:
};
/** Применяет к массиву агрегатную функцию и возвращает её результат.
* Также может быть применена к нескольким массивам одинаковых размеров, если агрегатная функция принимает несколько аргументов.
*/
class FunctionArrayReduce : public IFunction
{
public:
static constexpr auto name = "arrayReduce";
static IFunction * create(const Context & context) { return new FunctionArrayReduce; }
/// Получить имя функции.
String getName() const override
{
return name;
}
void getReturnTypeAndPrerequisites(
const ColumnsWithTypeAndName & arguments,
DataTypePtr & out_return_type,
std::vector<ExpressionAction> & out_prerequisites) override
{
/// Первый аргумент - константная строка с именем агрегатной функции (возможно, с параметрами в скобках, например: "quantile(0.99)").
if (arguments.size() < 2)
throw Exception("Number of arguments for function " + getName() + " doesn't match: passed "
+ toString(arguments.size()) + ", should be at least 2.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const ColumnConstString * aggregate_function_name_column = typeid_cast<const ColumnConstString *>(arguments[0].column.get());
if (!aggregate_function_name_column)
throw Exception("First argument for function " + getName() + " must be constant string: name of aggregate function.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
DataTypes argument_types(arguments.size() - 1);
for (size_t i = 1, size = arguments.size(); i < size; ++i)
{
const DataTypeArray * arg = typeid_cast<const DataTypeArray *>(arguments[i].type.get());
if (!arg)
throw Exception("Argument " + toString(i) + " for function " + getName() + " must be array.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
argument_types[i - 1] = arg->getNestedType()->clone();
}
if (!aggregate_function)
{
const String & aggregate_function_name_with_params = aggregate_function_name_column->getData();
if (aggregate_function_name_with_params.empty())
throw Exception("First argument for function " + getName() + " (name of aggregate function) cannot be empty.",
ErrorCodes::BAD_ARGUMENTS);
bool has_parameters = ')' == aggregate_function_name_with_params.back();
String aggregate_function_name = aggregate_function_name_with_params;
String parameters;
Array params_row;
if (has_parameters)
{
size_t pos = aggregate_function_name_with_params.find('(');
if (pos == std::string::npos || pos + 2 >= aggregate_function_name_with_params.size())
throw Exception("First argument for function " + getName() + " doesn't look like aggregate function name.",
ErrorCodes::BAD_ARGUMENTS);
aggregate_function_name = aggregate_function_name_with_params.substr(0, pos);
parameters = aggregate_function_name_with_params.substr(pos + 1, aggregate_function_name_with_params.size() - pos - 2);
if (aggregate_function_name.empty())
throw Exception("First argument for function " + getName() + " doesn't look like aggregate function name.",
ErrorCodes::BAD_ARGUMENTS);
ParserExpressionList params_parser(false);
ASTPtr args_ast = parseQuery(params_parser,
parameters.data(), parameters.data() + parameters.size(),
"parameters of aggregate function");
ASTExpressionList & args_list = typeid_cast<ASTExpressionList &>(*args_ast);
if (args_list.children.empty())
throw Exception("Incorrect list of parameters to aggregate function "
+ aggregate_function_name, ErrorCodes::BAD_ARGUMENTS);
params_row.reserve(args_list.children.size());
for (const auto & child : args_list.children)
{
const ASTLiteral * lit = typeid_cast<const ASTLiteral *>(child.get());
if (!lit)
throw Exception("Parameters to aggregate functions must be literals",
ErrorCodes::PARAMETERS_TO_AGGREGATE_FUNCTIONS_MUST_BE_LITERALS);
params_row.push_back(lit->value);
}
}
aggregate_function = AggregateFunctionFactory().get(aggregate_function_name, argument_types);
/// Потому что владение состояниями агрегатных функций никуда не отдаётся.
if (aggregate_function->isState())
throw Exception("Using aggregate function with -State modifier in function arrayReduce is not supported", ErrorCodes::BAD_ARGUMENTS);
if (has_parameters)
aggregate_function->setParameters(params_row);
aggregate_function->setArguments(argument_types);
}
out_return_type = aggregate_function->getReturnType();
}
void execute(Block & block, const ColumnNumbers & arguments, size_t result) override
{
IAggregateFunction & agg_func = *aggregate_function.get();
std::unique_ptr<char[]> place_holder { new char[agg_func.sizeOfData()] };
AggregateDataPtr place = place_holder.get();
size_t rows = block.rowsInFirstColumn();
/// Агрегатные функции не поддерживают константные столбцы. Поэтому, материализуем их.
std::vector<ColumnPtr> materialized_columns;
std::vector<const IColumn *> aggregate_arguments_vec(arguments.size() - 1);
for (size_t i = 0, size = arguments.size() - 1; i < size; ++i)
{
const IColumn * col = block.unsafeGetByPosition(arguments[i + 1]).column.get();
if (const ColumnArray * arr = typeid_cast<const ColumnArray *>(col))
{
aggregate_arguments_vec[i] = arr->getDataPtr().get();
}
else if (const ColumnConstArray * arr = typeid_cast<const ColumnConstArray *>(col))
{
materialized_columns.emplace_back(arr->convertToFullColumn());
aggregate_arguments_vec[i] = typeid_cast<const ColumnArray &>(*materialized_columns.back().get()).getDataPtr().get();
}
else
throw Exception("Illegal column " + col->getName() + " as argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN);
}
const IColumn ** aggregate_arguments = aggregate_arguments_vec.data();
const ColumnArray::Offsets_t & offsets = typeid_cast<const ColumnArray &>(!materialized_columns.empty()
? *materialized_columns.front().get()
: *block.unsafeGetByPosition(arguments[1]).column.get()).getOffsets();
ColumnPtr result_holder = block.getByPosition(result).type->createColumn();
block.getByPosition(result).column = result_holder;
IColumn & res_col = *result_holder.get();
ColumnArray::Offset_t current_offset = 0;
for (size_t i = 0; i < rows; ++i)
{
agg_func.create(place);
ColumnArray::Offset_t next_offset = offsets[i];
try
{
for (size_t j = current_offset; j < next_offset; ++j)
agg_func.add(place, aggregate_arguments, j);
agg_func.insertResultInto(place, res_col);
}
catch (...)
{
agg_func.destroy(place);
throw;
}
agg_func.destroy(place);
current_offset = next_offset;
}
}
private:
AggregateFunctionPtr aggregate_function;
};
struct NameHas { static constexpr auto name = "has"; };
struct NameIndexOf { static constexpr auto name = "indexOf"; };
struct NameCountEqual { static constexpr auto name = "countEqual"; };

View File

@ -587,17 +587,27 @@ public:
+ toString(arguments.size()) + ", should be 1 or 2.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (typeid_cast<const DataTypeDateTime *>(&*arguments[0]) == nullptr)
if (typeid_cast<const DataTypeDate *>(&*arguments[0]) != nullptr)
{
if (arguments.size() != 1)
if (arguments.size() > 1)
throw Exception("Number of arguments for function " + getName() + " doesn't match: passed "
+ toString(arguments.size()) + ", should be 1.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
else if ((arguments.size()) == 2 && typeid_cast<const DataTypeString *>(&*arguments[1]) == nullptr)
else if (typeid_cast<const DataTypeDateTime *>(&*arguments[0]) != nullptr)
{
/// Ничего не делаем.
}
else
throw Exception{
"Illegal type " + arguments[0]->getName() + " of argument 1 of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
if ((arguments.size() == 2) && (typeid_cast<const DataTypeString *>(&*arguments[1]) == nullptr))
{
throw Exception{
"Illegal type " + arguments[1]->getName() + " of argument of function " + getName(),
"Illegal type " + arguments[1]->getName() + " of argument 2 of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}

View File

@ -1,5 +1,6 @@
#pragma once
#include <memory>
#include <DB/IO/createReadBufferFromFileBase.h>
#include <DB/IO/CompressedReadBufferBase.h>
#include <DB/IO/UncompressedCache.h>
@ -23,8 +24,7 @@ private:
size_t estimated_size;
size_t aio_threshold;
/// SharedPtr - для ленивой инициализации (только в случае кэш-промаха).
Poco::SharedPtr<ReadBufferFromFileBase> file_in;
std::unique_ptr<ReadBufferFromFileBase> file_in;
size_t file_pos;
/// Кусок данных из кэша, или кусок считанных данных, который мы положим в кэш.
@ -34,8 +34,11 @@ private:
{
if (!file_in)
{
file_in = createReadBufferFromFileBase(path, estimated_size, aio_threshold, buf_size);
file_in.reset(createReadBufferFromFileBase(path, estimated_size, aio_threshold, buf_size));
compressed_in = &*file_in;
if (profile_callback)
file_in->setProfileCallback(profile_callback, clock_type);
}
}
@ -81,6 +84,11 @@ private:
return true;
}
/// Передаётся в file_in.
ReadBufferFromFileBase::ProfileCallback profile_callback;
clockid_t clock_type;
public:
CachedCompressedReadBuffer(
const std::string & path_, UncompressedCache * cache_, size_t estimated_size_, size_t aio_threshold_,
@ -115,6 +123,13 @@ public:
bytes -= offset();
}
}
void setProfileCallback(const ReadBufferFromFileBase::ProfileCallback & profile_callback_, clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE)
{
profile_callback = profile_callback_;
clock_type = clock_type_;
}
};
}

View File

@ -120,6 +120,11 @@ public:
return bytes_read;
}
void setProfileCallback(const ReadBufferFromFileBase::ProfileCallback & profile_callback_, clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE)
{
file_in.setProfileCallback(profile_callback_, clock_type_);
}
};
}

View File

@ -28,9 +28,28 @@ public:
throwFromErrno("Cannot open file " + file_name, errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE);
}
/// Использовать уже открытый файл.
ReadBufferFromFile(int fd, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1,
char * existing_memory = nullptr, size_t alignment = 0)
: ReadBufferFromFileDescriptor(fd, buf_size, existing_memory, alignment), file_name("(fd = " + toString(fd) + ")")
{
}
virtual ~ReadBufferFromFile()
{
close(fd);
if (fd < 0)
return;
::close(fd);
}
/// Закрыть файл раньше вызова деструктора.
void close()
{
if (0 != ::close(fd))
throw Exception("Cannot close file", ErrorCodes::CANNOT_CLOSE_FILE);
fd = -1;
}
virtual std::string getFileName()

View File

@ -19,7 +19,27 @@ public:
virtual std::string getFileName() const = 0;
virtual int getFD() const = 0;
/// Есть возможность получать информацию о времени каждого чтения.
struct ProfileInfo
{
size_t bytes_requested;
size_t bytes_read;
size_t nanoseconds;
};
using ProfileCallback = std::function<void(ProfileInfo)>;
/// CLOCK_MONOTONIC_COARSE более чем достаточно для отслеживания долгих чтений - например, залипаний на секунды.
void setProfileCallback(const ProfileCallback & profile_callback_, clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE)
{
profile_callback = profile_callback_;
clock_type = clock_type_;
}
protected:
ProfileCallback profile_callback;
clockid_t clock_type;
virtual off_t doSeek(off_t off, int whence) = 0;
};

View File

@ -2,8 +2,12 @@
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <experimental/optional>
#include <DB/Common/ProfileEvents.h>
#include <DB/Common/Stopwatch.h>
#include <DB/Common/Exception.h>
#include <DB/Core/ErrorCodes.h>
@ -32,6 +36,10 @@ protected:
{
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorRead);
std::experimental::optional<Stopwatch> watch;
if (profile_callback)
watch.emplace(clock_type);
ssize_t res = ::read(fd, internal_buffer.begin(), internal_buffer.size());
if (!res)
break;
@ -41,6 +49,15 @@ protected:
if (res > 0)
bytes_read += res;
if (profile_callback)
{
ProfileInfo info;
info.bytes_requested = internal_buffer.size();
info.bytes_read = res;
info.nanoseconds = watch->elapsed();
profile_callback(info);
}
}
pos_in_file += bytes_read;
@ -76,7 +93,9 @@ public:
return pos_in_file - (working_buffer.end() - pos);
}
private:
/// Если offset такой маленький, что мы не выйдем за пределы буфера, настоящий seek по файлу не делается.
off_t doSeek(off_t offset, int whence) override
{
@ -108,6 +127,23 @@ private:
return res;
}
}
/// При условии, что файловый дескриптор позволяет использовать select, проверяет в течение таймаута, есть ли данные для чтения.
bool poll(size_t timeout_microseconds)
{
fd_set fds;
FD_ZERO(&fds);
FD_SET(fd, &fds);
timeval timeout = { time_t(timeout_microseconds / 1000000), time_t(timeout_microseconds % 1000000) };
int res = select(1, &fds, 0, 0, &timeout);
if (-1 == res)
throwFromErrno("Cannot select", ErrorCodes::CANNOT_SELECT);
return res > 0;
}
};
}

View File

@ -32,8 +32,18 @@ public:
throwFromErrno("Cannot open file " + file_name, errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE);
}
/// Использовать уже открытый файл.
WriteBufferFromFile(int fd, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1, mode_t mode = 0666,
char * existing_memory = nullptr, size_t alignment = 0)
: WriteBufferFromFileDescriptor(fd, buf_size, existing_memory, alignment), file_name("(fd = " + toString(fd) + ")")
{
}
~WriteBufferFromFile()
{
if (fd < 0)
return;
try
{
next();
@ -43,7 +53,18 @@ public:
tryLogCurrentException(__PRETTY_FUNCTION__);
}
close(fd);
::close(fd);
}
/// Закрыть файл раньше вызова деструктора.
void close()
{
next();
if (0 != ::close(fd))
throw Exception("Cannot close file", ErrorCodes::CANNOT_CLOSE_FILE);
fd = -1;
}
/** fsync() transfers ("flushes") all modified in-core data of (i.e., modified buffer cache pages for) the file

View File

@ -67,6 +67,7 @@ public:
{
try
{
if (fd >= 0)
next();
}
catch (...)

View File

@ -176,8 +176,18 @@ struct Settings
M(SettingUInt64, select_sequential_consistency, 0) \
/** Максимальное количество различных шардов и максимальное количество реплик одного шарда в функции remote. */ \
M(SettingUInt64, table_function_remote_max_addresses, 1000) \
/** Маскимальное количество потоков при распределённой обработке одного запроса **/ \
/** Маскимальное количество потоков при распределённой обработке одного запроса */ \
M(SettingUInt64, max_distributed_processing_threads, 8) \
\
/** Настройки понижения числа потоков в случае медленных чтений. */ \
/** Обращать внимания только на чтения, занявшие не меньше такого количества времени. */ \
M(SettingMilliseconds, read_backoff_min_latency_ms, 1000) \
/** Считать события, когда пропускная способность меньше стольки байт в секунду. */ \
M(SettingUInt64, read_backoff_max_throughput, 1048576) \
/** Не обращать внимания на событие, если от предыдущего прошло меньше стольки-то времени. */ \
M(SettingMilliseconds, read_backoff_min_interval_between_events_ms, 1000) \
/** Количество событий, после которого количество потоков будет уменьшено. */ \
M(SettingUInt64, read_backoff_min_events, 2) \
/// Всевозможные ограничения на выполнение запроса.
Limits limits;

View File

@ -48,19 +48,64 @@ using MergeTreeReadTaskPtr = std::unique_ptr<MergeTreeReadTask>;
/** Provides read tasks for MergeTreeThreadBlockInputStream`s in fine-grained batches, allowing for more
* uniform distribution of work amongst multiple threads. All parts and their ranges are divided into `threads`
* workloads with at most `sum_marks / threads` marks. Then, threads are performing reads from these workloads
* in "sequential" manner, requesting work in small batches. As soon as some thread some thread has exhausted
* in "sequential" manner, requesting work in small batches. As soon as some thread has exhausted
* it's workload, it either is signaled that no more work is available (`do_not_steal_tasks == false`) or
* continues taking small batches from other threads' workloads (`do_not_steal_tasks == true`).
*/
class MergeTreeReadPool
{
public:
/** Пул может динамически уменьшать количество потоков, если чтения происходят медленно.
* Настройки порогов для такого уменьшения.
*/
struct BackoffSettings
{
/// Обращать внимания только на чтения, занявшие не меньше такого количества времени. Если выставлено в 0 - значит backoff выключен.
size_t min_read_latency_ms = 1000;
/// Считать события, когда пропускная способность меньше стольки байт в секунду.
size_t max_throughput = 1048576;
/// Не обращать внимания на событие, если от предыдущего прошло меньше стольки-то времени.
size_t min_interval_between_events_ms = 1000;
/// Количество событий, после которого количество потоков будет уменьшено.
size_t min_events = 2;
/// Константы выше приведены лишь в качестве примера.
BackoffSettings(const Settings & settings)
: min_read_latency_ms(settings.read_backoff_min_latency_ms.totalMilliseconds()),
max_throughput(settings.read_backoff_max_throughput),
min_interval_between_events_ms(settings.read_backoff_min_interval_between_events_ms.totalMilliseconds()),
min_events(settings.read_backoff_min_events)
{
}
BackoffSettings() : min_read_latency_ms(0) {}
};
BackoffSettings backoff_settings;
private:
/** Состояние для отслеживания скорости чтений.
*/
struct BackoffState
{
size_t current_threads;
Stopwatch time_since_prev_event {CLOCK_MONOTONIC_COARSE};
size_t num_events = 0;
BackoffState(size_t threads) : current_threads(threads) {}
};
BackoffState backoff_state;
public:
MergeTreeReadPool(
const std::size_t threads, const std::size_t sum_marks, const std::size_t min_marks_for_concurrent_read,
RangesInDataParts parts, MergeTreeData & data, const ExpressionActionsPtr & prewhere_actions,
const String & prewhere_column_name, const bool check_columns, const Names & column_names,
const BackoffSettings & backoff_settings,
const bool do_not_steal_tasks = false)
: data{data}, column_names{column_names}, do_not_steal_tasks{do_not_steal_tasks}
: backoff_settings{backoff_settings}, backoff_state{threads},
data{data}, column_names{column_names}, do_not_steal_tasks{do_not_steal_tasks}
{
const auto per_part_sum_marks = fillPerPartInfo(parts, prewhere_actions, prewhere_column_name, check_columns);
fillPerThreadInfo(threads, sum_marks, per_part_sum_marks, parts, min_marks_for_concurrent_read);
@ -73,6 +118,10 @@ public:
{
const std::lock_guard<std::mutex> lock{mutex};
/// Если количество потоков было уменьшено из-за backoff, то не будем отдавать задачи для более чем backoff_state.current_threads потоков.
if (thread >= backoff_state.current_threads)
return nullptr;
if (remaining_thread_tasks.empty())
return nullptr;
@ -150,6 +199,50 @@ public:
per_part_remove_prewhere_column[part_idx], per_part_should_reorder[part_idx]);
}
/** Каждый обработчик задач может вызвать этот метод, передав в него информацию о скорости чтения.
* Если скорость чтения слишком низкая, то пул может принять решение уменьшить число потоков - не отдавать больше задач в некоторые потоки.
* Это позволяет бороться с чрезмерной нагрузкой на дисковую подсистему в случаях, когда чтения осуществляются не из page cache.
*/
void profileFeedback(const ReadBufferFromFileBase::ProfileInfo info)
{
if (backoff_settings.min_read_latency_ms == 0 || do_not_steal_tasks)
return;
if (info.nanoseconds < backoff_settings.min_read_latency_ms * 1000000)
return;
std::lock_guard<std::mutex> lock(mutex);
if (backoff_state.current_threads <= 1)
return;
size_t throughput = info.bytes_read * 1000000000 / info.nanoseconds;
if (throughput >= backoff_settings.max_throughput)
return;
if (backoff_state.time_since_prev_event.elapsed() < backoff_settings.min_interval_between_events_ms * 1000000)
return;
backoff_state.time_since_prev_event.restart();
++backoff_state.num_events;
ProfileEvents::increment(ProfileEvents::SlowRead);
LOG_DEBUG(log, std::fixed << std::setprecision(3)
<< "Slow read, event №" << backoff_state.num_events
<< ": read " << info.bytes_read << " bytes in " << info.nanoseconds / 1000000000.0 << " sec., "
<< info.bytes_read * 1000.0 / info.nanoseconds << " MB/s.");
if (backoff_state.num_events < backoff_settings.min_events)
return;
backoff_state.num_events = 0;
--backoff_state.current_threads;
ProfileEvents::increment(ProfileEvents::ReadBackoff);
LOG_DEBUG(log, "Will lower number of threads to " << backoff_state.current_threads);
}
private:
std::vector<std::size_t> fillPerPartInfo(
RangesInDataParts & parts, const ExpressionActionsPtr & prewhere_actions, const String & prewhere_column_name,
@ -381,7 +474,7 @@ private:
std::vector<std::unique_ptr<Poco::ScopedReadRWLock>> per_part_columns_lock;
MergeTreeData & data;
Names column_names;
const bool do_not_steal_tasks;
bool do_not_steal_tasks;
std::vector<NameSet> per_part_column_name_set;
std::vector<NamesAndTypesList> per_part_columns;
std::vector<NamesAndTypesList> per_part_pre_columns;
@ -414,6 +507,8 @@ private:
std::set<std::size_t> remaining_thread_tasks;
mutable std::mutex mutex;
Logger * log = &Logger::get("MergeTreeReadPool");
};
using MergeTreeReadPoolPtr = std::shared_ptr<MergeTreeReadPool>;

View File

@ -24,14 +24,17 @@ namespace DB
class MergeTreeReader
{
using OffsetColumns = std::map<std::string, ColumnPtr>;
using ValueSizeMap = std::map<std::string, double>;
public:
using ValueSizeMap = std::map<std::string, double>;
MergeTreeReader(const String & path, /// Путь к куску
const MergeTreeData::DataPartPtr & data_part, const NamesAndTypesList & columns,
UncompressedCache * uncompressed_cache, MarkCache * mark_cache,
MergeTreeData & storage, const MarkRanges & all_mark_ranges,
size_t aio_threshold, size_t max_read_buffer_size, const ValueSizeMap & avg_value_size_hints = ValueSizeMap{})
size_t aio_threshold, size_t max_read_buffer_size, const ValueSizeMap & avg_value_size_hints = ValueSizeMap{},
const ReadBufferFromFileBase::ProfileCallback & profile_callback = ReadBufferFromFileBase::ProfileCallback{},
clockid_t clock_type = CLOCK_MONOTONIC_COARSE)
: avg_value_size_hints(avg_value_size_hints), path(path), data_part(data_part), columns(columns),
uncompressed_cache(uncompressed_cache), mark_cache(mark_cache), storage(storage),
all_mark_ranges(all_mark_ranges), aio_threshold(aio_threshold), max_read_buffer_size(max_read_buffer_size)
@ -42,7 +45,7 @@ public:
throw Exception("Part " + path + " is missing", ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART);
for (const NameAndTypePair & column : columns)
addStream(column.name, *column.type, all_mark_ranges);
addStream(column.name, *column.type, all_mark_ranges, profile_callback, clock_type);
}
catch (...)
{
@ -151,7 +154,8 @@ private:
Stream(
const String & path_prefix_, UncompressedCache * uncompressed_cache, MarkCache * mark_cache,
const MarkRanges & all_mark_ranges, size_t aio_threshold, size_t max_read_buffer_size)
const MarkRanges & all_mark_ranges, size_t aio_threshold, size_t max_read_buffer_size,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
: path_prefix(path_prefix_)
{
loadMarks(mark_cache);
@ -205,12 +209,20 @@ private:
{
cached_buffer = std::make_unique<CachedCompressedReadBuffer>(
path_prefix + ".bin", uncompressed_cache, estimated_size, aio_threshold, buffer_size);
if (profile_callback)
cached_buffer->setProfileCallback(profile_callback, clock_type);
data_buffer = cached_buffer.get();
}
else
{
non_cached_buffer = std::make_unique<CompressedReadBufferFromFile>(
path_prefix + ".bin", estimated_size, aio_threshold, buffer_size);
if (profile_callback)
non_cached_buffer->setProfileCallback(profile_callback, clock_type);
data_buffer = non_cached_buffer.get();
}
}
@ -289,7 +301,9 @@ private:
size_t aio_threshold;
size_t max_read_buffer_size;
void addStream(const String & name, const IDataType & type, const MarkRanges & all_mark_ranges, size_t level = 0)
void addStream(const String & name, const IDataType & type, const MarkRanges & all_mark_ranges,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type,
size_t level = 0)
{
String escaped_column_name = escapeForFileName(name);
@ -310,14 +324,14 @@ private:
if (!streams.count(size_name))
streams.emplace(size_name, std::make_unique<Stream>(
path + escaped_size_name, uncompressed_cache, mark_cache,
all_mark_ranges, aio_threshold, max_read_buffer_size));
all_mark_ranges, aio_threshold, max_read_buffer_size, profile_callback, clock_type));
addStream(name, *type_arr->getNestedType(), all_mark_ranges, level + 1);
addStream(name, *type_arr->getNestedType(), all_mark_ranges, profile_callback, clock_type, level + 1);
}
else
streams.emplace(name, std::make_unique<Stream>(
path + escaped_column_name, uncompressed_cache, mark_cache,
all_mark_ranges, aio_threshold, max_read_buffer_size));
all_mark_ranges, aio_threshold, max_read_buffer_size, profile_callback, clock_type));
}

View File

@ -102,6 +102,9 @@ private:
const auto path = storage.getFullPath() + task->data_part->name + '/';
/// Позволяет пулу уменьшать количество потоков в случае слишком медленных чтений.
auto profile_callback = [this](ReadBufferFromFileBase::ProfileInfo info) { pool->profileFeedback(info); };
if (!reader)
{
if (use_uncompressed_cache)
@ -111,13 +114,13 @@ private:
reader = std::make_unique<MergeTreeReader>(
path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(),
storage, task->mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
storage, task->mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size, MergeTreeReader::ValueSizeMap{}, profile_callback);
if (prewhere_actions)
pre_reader = std::make_unique<MergeTreeReader>(
path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(),
owned_mark_cache.get(), storage, task->mark_ranges, min_bytes_to_use_direct_io,
max_read_buffer_size);
max_read_buffer_size, MergeTreeReader::ValueSizeMap{}, profile_callback);
}
else
{
@ -125,13 +128,13 @@ private:
reader = std::make_unique<MergeTreeReader>(
path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(),
storage, task->mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size,
reader->getAvgValueSizeHints());
reader->getAvgValueSizeHints(), profile_callback);
if (prewhere_actions)
pre_reader = std::make_unique<MergeTreeReader>(
path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(),
owned_mark_cache.get(), storage, task->mark_ranges, min_bytes_to_use_direct_io,
max_read_buffer_size, pre_reader->getAvgValueSizeHints());
max_read_buffer_size, pre_reader->getAvgValueSizeHints(), profile_callback);
}
return true;

View File

@ -238,6 +238,21 @@ private:
}
/// Стоит ли сделать хоть что-нибудь ради праздника.
bool isNewYearMode()
{
time_t current_time = time(0);
/// Плохо быть навязчивым.
if (current_time % 3 != 0)
return false;
mysqlxx::Date now(current_time);
return (now.month() == 12 && now.day() >= 20)
|| (now.month() == 1 && now.day() <= 5);
}
int mainImpl(const std::vector<std::string> & args)
{
/** Будем работать в batch режиме, если выполнено одно из следующих условий:
@ -305,7 +320,7 @@ private:
loop();
std::cout << "Bye." << std::endl;
std::cout << (isNewYearMode() ? "Happy new year." : "Bye.") << std::endl;
return 0;
}

View File

@ -0,0 +1,193 @@
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <fcntl.h>
#include <dlfcn.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Common/Exception.h>
#include <DB/Common/ShellCommand.h>
#include <DB/IO/WriteBufferFromVector.h>
namespace
{
struct Pipe
{
union
{
int fds[2];
struct
{
int read_fd;
int write_fd;
};
};
Pipe()
{
if (0 != pipe2(fds, O_CLOEXEC))
DB::throwFromErrno("Cannot create pipe", DB::ErrorCodes::CANNOT_PIPE);
}
~Pipe()
{
if (read_fd >= 0)
close(read_fd);
if (write_fd >= 0)
close(write_fd);
}
};
/// По этим кодам возврата из дочернего процесса мы узнаем (наверняка) об ошибках при его создании.
enum class ReturnCodes : int
{
CANNOT_DUP_STDIN = 42, /// Значение не принципиально, но выбрано так, чтобы редко конфликтовать с кодом возврата программы.
CANNOT_DUP_STDOUT = 43,
CANNOT_DUP_STDERR = 44,
CANNOT_EXEC = 45,
};
}
namespace DB
{
std::unique_ptr<ShellCommand> ShellCommand::executeImpl(const char * filename, char * const argv[])
{
/** Тут написано, что при обычном вызове vfork, есть шанс deadlock-а в многопоточных программах,
* из-за резолвинга символов в shared-библиотеке:
* http://www.oracle.com/technetwork/server-storage/solaris10/subprocess-136439.html
* Поэтому, отделим резолвинг символа от вызова.
*/
static void * real_vfork = dlsym(RTLD_DEFAULT, "vfork");
if (!real_vfork)
throwFromErrno("Cannot find symbol vfork in myself", ErrorCodes::CANNOT_DLSYM);
Pipe pipe_stdin;
Pipe pipe_stdout;
Pipe pipe_stderr;
pid_t pid = reinterpret_cast<pid_t(*)()>(real_vfork)();
if (-1 == pid)
throwFromErrno("Cannot vfork", ErrorCodes::CANNOT_FORK);
if (0 == pid)
{
/// Находимся в свежесозданном процессе.
/// Почему _exit а не exit? Потому что exit вызывает atexit и деструкторы thread local storage.
/// А там куча мусора (в том числе, например, блокируется mutex). А это нельзя делать после vfork - происходит deadlock.
/// Заменяем файловые дескрипторы на концы наших пайпов.
if (STDIN_FILENO != dup2(pipe_stdin.read_fd, STDIN_FILENO))
_exit(int(ReturnCodes::CANNOT_DUP_STDIN));
if (STDOUT_FILENO != dup2(pipe_stdout.write_fd, STDOUT_FILENO))
_exit(int(ReturnCodes::CANNOT_DUP_STDOUT));
if (STDERR_FILENO != dup2(pipe_stderr.write_fd, STDERR_FILENO))
_exit(int(ReturnCodes::CANNOT_DUP_STDERR));
execv(filename, argv);
/// Если процесс запущен, то execv не возвращает сюда.
_exit(int(ReturnCodes::CANNOT_EXEC));
}
std::unique_ptr<ShellCommand> res(new ShellCommand(pid, pipe_stdin.write_fd, pipe_stdout.read_fd, pipe_stderr.read_fd));
/// Теперь владение файловыми дескрипторами передано в результат.
pipe_stdin.write_fd = -1;
pipe_stdout.read_fd = -1;
pipe_stderr.read_fd = -1;
return res;
}
std::unique_ptr<ShellCommand> ShellCommand::execute(const std::string & command)
{
/// Аргументы в неконстантных кусках памяти (как требуется для execv).
/// Причём, их копирование должно быть совершено раньше вызова vfork, чтобы после vfork делать минимум вещей.
std::vector<char> argv0("sh", "sh" + strlen("sh") + 1);
std::vector<char> argv1("-c", "-c" + strlen("-c") + 1);
std::vector<char> argv2(command.data(), command.data() + command.size() + 1);
char * const argv[] = { argv0.data(), argv1.data(), argv2.data(), nullptr };
return executeImpl("/bin/sh", argv);
}
std::unique_ptr<ShellCommand> ShellCommand::executeDirect(const std::string & path, const std::vector<std::string> & arguments)
{
size_t argv_sum_size = path.size() + 1;
for (const auto & arg : arguments)
argv_sum_size += arg.size() + 1;
std::vector<char *> argv(arguments.size() + 2);
std::vector<char> argv_data(argv_sum_size);
WriteBuffer writer(argv_data.data(), argv_sum_size);
argv[0] = writer.position();
writer.write(path.data(), path.size() + 1);
for (size_t i = 0, size = arguments.size(); i < size; ++i)
{
argv[i + 1] = writer.position();
writer.write(arguments[i].data(), arguments[i].size() + 1);
}
argv[arguments.size() + 1] = nullptr;
return executeImpl(path.data(), argv.data());
}
int ShellCommand::tryWait()
{
int status = 0;
if (-1 == waitpid(pid, &status, 0))
throwFromErrno("Cannot waitpid", ErrorCodes::CANNOT_WAITPID);
if (WIFEXITED(status))
return WEXITSTATUS(status);
if (WIFSIGNALED(status))
throw Exception("Child process was terminated by signal " + toString(WTERMSIG(status)), ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY);
if (WIFSTOPPED(status))
throw Exception("Child process was stopped by signal " + toString(WSTOPSIG(status)), ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY);
throw Exception("Child process was not exited normally by unknown reason", ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY);
}
void ShellCommand::wait()
{
int retcode = tryWait();
if (retcode != EXIT_SUCCESS)
{
switch (retcode)
{
case int(ReturnCodes::CANNOT_DUP_STDIN):
throw Exception("Cannot dup2 stdin of child process", ErrorCodes::CANNOT_CREATE_CHILD_PROCESS);
case int(ReturnCodes::CANNOT_DUP_STDOUT):
throw Exception("Cannot dup2 stdout of child process", ErrorCodes::CANNOT_CREATE_CHILD_PROCESS);
case int(ReturnCodes::CANNOT_DUP_STDERR):
throw Exception("Cannot dup2 stderr of child process", ErrorCodes::CANNOT_CREATE_CHILD_PROCESS);
case int(ReturnCodes::CANNOT_EXEC):
throw Exception("Cannot execv in child process", ErrorCodes::CANNOT_CREATE_CHILD_PROCESS);
default:
throw Exception("Child process was exited with return code " + toString(retcode), ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY);
}
}
}
}

View File

@ -0,0 +1,49 @@
#include <iostream>
#include <DB/Common/ShellCommand.h>
#include <DB/IO/copyData.h>
#include <DB/IO/WriteBufferFromFileDescriptor.h>
#include <DB/IO/ReadBufferFromString.h>
using namespace DB;
int main(int arg, char ** argv)
try
{
{
auto command = ShellCommand::execute("echo 'Hello, world!'");
WriteBufferFromFileDescriptor out(STDOUT_FILENO);
copyData(command->out, out);
command->wait();
}
{
auto command = ShellCommand::executeDirect("/bin/echo", {"Hello, world!"});
WriteBufferFromFileDescriptor out(STDOUT_FILENO);
copyData(command->out, out);
command->wait();
}
{
auto command = ShellCommand::execute("cat");
String in_str = "Hello, world!\n";
ReadBufferFromString in(in_str);
copyData(in, command->in);
command->in.close();
WriteBufferFromFileDescriptor out(STDOUT_FILENO);
copyData(command->out, out);
command->wait();
}
}
catch (...)
{
std::cerr << getCurrentExceptionMessage(false) << "\n";
return 1;
}

View File

@ -34,6 +34,53 @@ void MergingAggregatedMemoryEfficientBlockInputStream::readPrefix()
}
void MergingAggregatedMemoryEfficientBlockInputStream::readSuffix()
{
if (!all_read && !is_cancelled.load(std::memory_order_seq_cst))
throw Exception("readSuffix called before all data is read", ErrorCodes::LOGICAL_ERROR);
finalize();
for (size_t i = 0; i < children.size(); ++i)
children[i]->readSuffix();
}
void MergingAggregatedMemoryEfficientBlockInputStream::cancel()
{
bool old_val = false;
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;
if (parallel_merge_data)
{
std::unique_lock<std::mutex> lock(parallel_merge_data->merged_blocks_mutex);
parallel_merge_data->finish = true;
parallel_merge_data->merged_blocks_changed.notify_one();
}
for (auto & input : inputs)
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(input.stream.get()))
{
try
{
child->cancel();
}
catch (...)
{
/** Если не удалось попросить остановиться одного или несколько источников.
* (например, разорвано соединение при распределённой обработке запроса)
* - то пофиг.
*/
LOG_ERROR(log, "Exception while cancelling " << child->getName());
}
}
}
}
void MergingAggregatedMemoryEfficientBlockInputStream::start()
{
if (started)
@ -73,6 +120,23 @@ void MergingAggregatedMemoryEfficientBlockInputStream::start()
for (auto & task : tasks)
task.get_future().get();
}
if (merging_threads > 1)
{
/** Создадим несколько потоков. Каждый из них в цикле будет доставать следующий набор блоков для мерджа,
* затем мерджить их и класть результат в очередь, откуда мы будем читать готовые результаты.
*/
parallel_merge_data.reset(new ParallelMergeData(merging_threads));
auto & pool = parallel_merge_data->pool;
/** Создаём потоки, которые будут получать и мерджить данные.
*/
for (size_t i = 0; i < merging_threads; ++i)
pool.schedule(std::bind(&MergingAggregatedMemoryEfficientBlockInputStream::mergeThread,
this, current_memory_tracker));
}
}
@ -88,55 +152,74 @@ Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl()
}
else
{
/** Создадим несколько потоков. Каждый из них в цикле будет доставать следующий набор блоков для мерджа,
* затем мерджить их и класть результат в очередь, откуда мы будем читать готовые результаты.
*/
Block res;
if (!parallel_merge_data)
while (true)
{
parallel_merge_data.reset(new ParallelMergeData(merging_threads));
std::unique_lock<std::mutex> lock(parallel_merge_data->merged_blocks_mutex);
auto & pool = parallel_merge_data->pool;
if (parallel_merge_data->exception)
std::rethrow_exception(parallel_merge_data->exception);
/** Создаём потоки, которые будут получать и мерджить данные.
*/
if (parallel_merge_data->finish)
break;
for (size_t i = 0; i < merging_threads; ++i)
pool.schedule(std::bind(&MergingAggregatedMemoryEfficientBlockInputStream::mergeThread,
this, current_memory_tracker));
if (!parallel_merge_data->merged_blocks.empty())
{
auto it = parallel_merge_data->merged_blocks.begin();
if (it->second)
{
res.swap(it->second);
parallel_merge_data->merged_blocks.erase(it);
parallel_merge_data->have_space.notify_one();
break;
}
}
else if (parallel_merge_data->exhausted)
break;
parallel_merge_data->merged_blocks_changed.wait(lock);
}
OutputData res;
parallel_merge_data->result_queue.pop(res);
if (!res)
all_read = true;
if (res.exception)
std::rethrow_exception(res.exception);
if (!res.block)
parallel_merge_data->pool.wait();
return res.block;
return res;
}
}
MergingAggregatedMemoryEfficientBlockInputStream::~MergingAggregatedMemoryEfficientBlockInputStream()
{
try
{
if (!all_read)
cancel();
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void MergingAggregatedMemoryEfficientBlockInputStream::finalize()
{
if (!started)
return;
LOG_TRACE(log, "Waiting for threads to finish");
if (reading_pool)
reading_pool->wait();
if (parallel_merge_data)
{
LOG_TRACE((&Logger::get("MergingAggregatedMemoryEfficientBlockInputStream")), "Waiting for threads to finish");
{
std::lock_guard<std::mutex> lock(parallel_merge_data->get_next_blocks_mutex);
parallel_merge_data->finish = true;
}
parallel_merge_data->result_queue.clear();
parallel_merge_data->pool.wait();
}
LOG_TRACE(log, "Waited for threads to finish");
}
@ -147,7 +230,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(MemoryTracker
try
{
while (true)
while (!parallel_merge_data->finish)
{
/** Получение следующих блоков делается в одном пуле потоков, а мердж - в другом.
* Это весьма сложное взаимодействие.
@ -157,6 +240,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(MemoryTracker
* - один из merging_threads выполняет слияние этой группы блоков;
*/
BlocksToMerge blocks_to_merge;
int output_order = -1;
{
std::lock_guard<std::mutex> lock(parallel_merge_data->get_next_blocks_mutex);
@ -168,32 +252,53 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(MemoryTracker
if (!blocks_to_merge || blocks_to_merge->empty())
{
std::unique_lock<std::mutex> lock(parallel_merge_data->merged_blocks_mutex);
parallel_merge_data->exhausted = true;
parallel_merge_data->merged_blocks_changed.notify_one();
break;
}
output_order = blocks_to_merge->front().info.is_overflows
? NUM_BUCKETS /// Блоки "переполнений" отдаются функцией getNextBlocksToMerge позже всех остальных.
: blocks_to_merge->front().info.bucket_num;
{
std::unique_lock<std::mutex> lock(parallel_merge_data->merged_blocks_mutex);
while (parallel_merge_data->merged_blocks.size() >= merging_threads)
parallel_merge_data->have_space.wait(lock);
/** Кладём пустой блок, что означает обещание его заполнить.
* Основной поток должен возвращать результаты строго в порядке output_order, поэтому это важно.
*/
parallel_merge_data->merged_blocks[output_order];
}
}
Block res = aggregator.mergeBlocks(*blocks_to_merge, final);
{
std::lock_guard<std::mutex> lock(parallel_merge_data->get_next_blocks_mutex);
std::lock_guard<std::mutex> lock(parallel_merge_data->merged_blocks_mutex);
if (parallel_merge_data->finish)
break;
parallel_merge_data->result_queue.push(OutputData(std::move(res)));
parallel_merge_data->merged_blocks[output_order] = res;
parallel_merge_data->merged_blocks_changed.notify_one();
}
}
}
catch (...)
{
parallel_merge_data->result_queue.push(std::current_exception());
return;
{
std::lock_guard<std::mutex> lock(parallel_merge_data->merged_blocks_mutex);
parallel_merge_data->exception = std::current_exception();
parallel_merge_data->merged_blocks_changed.notify_one();
}
/// Последний поток при выходе сообщает, что данных больше нет.
if (0 == --parallel_merge_data->active_threads)
parallel_merge_data->result_queue.push(Block());
cancel();
}
}
@ -217,9 +322,6 @@ MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregate
* Это дополнительные данные для строк, не прошедших через max_rows_to_group_by.
* Они должны объединяться друг с другом отдельно.
*/
constexpr size_t NUM_BUCKETS = 256;
++current_bucket_num;
/// Получить из источника следующий блок с номером корзины не больше current_bucket_num.

View File

@ -29,6 +29,7 @@ void registerFunctionsArray(FunctionFactory & factory)
factory.registerFunction<FunctionEmptyArrayString>();
factory.registerFunction<FunctionEmptyArrayToSingle>();
factory.registerFunction<FunctionRange>();
factory.registerFunction<FunctionArrayReduce>();
}
}

View File

@ -1,11 +1,15 @@
#include <DB/IO/ReadBufferAIO.h>
#include <DB/Common/ProfileEvents.h>
#include <DB/Common/Stopwatch.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Core/Defines.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <experimental/optional>
namespace DB
{
@ -61,6 +65,10 @@ bool ReadBufferAIO::nextImpl()
if (is_eof)
return false;
std::experimental::optional<Stopwatch> watch;
if (profile_callback)
watch.emplace(clock_type);
if (!is_aio)
{
synchronousRead();
@ -69,6 +77,15 @@ bool ReadBufferAIO::nextImpl()
else
receive();
if (profile_callback)
{
ProfileInfo info;
info.bytes_requested = requested_byte_count;
info.bytes_read = bytes_read;
info.nanoseconds = watch->elapsed();
profile_callback(info);
}
is_started = true;
/// Если конец файла только что достигнут, больше ничего не делаем.

View File

@ -1,10 +1,9 @@
#include <stdio.h>
#include <Poco/DirectoryIterator.h>
#include <common/Revision.h>
#include <ext/unlock_guard.hpp>
#include <DB/Common/SipHash.h>
#include <DB/Common/ShellCommand.h>
#include <DB/IO/Operators.h>
#include <DB/IO/WriteBufferFromString.h>
@ -155,35 +154,6 @@ SharedLibraryPtr Compiler::getOrCount(
}
struct Pipe : private boost::noncopyable
{
FILE * f;
Pipe(const std::string & command)
{
errno = 0;
f = popen(command.c_str(), "r");
if (!f)
throwFromErrno("Cannot popen");
}
~Pipe()
{
try
{
errno = 0;
if (f && -1 == pclose(f))
throwFromErrno("Cannot pclose");
}
catch (...)
{
tryLogCurrentException("Pipe");
}
}
};
void Compiler::compile(
HashedKey hashed_key,
std::string file_name,
@ -231,18 +201,14 @@ void Compiler::compile(
std::string compile_result;
{
Pipe pipe(command.str());
int pipe_fd = fileno(pipe.f);
if (-1 == pipe_fd)
throwFromErrno("Cannot fileno");
auto process = ShellCommand::execute(command.str());
{
ReadBufferFromFileDescriptor command_output(pipe_fd);
WriteBufferFromString res(compile_result);
copyData(command_output, res);
copyData(process->out, res);
}
process->wait();
}
if (!compile_result.empty())

View File

@ -72,7 +72,7 @@ BlockIO InterpreterDropQuery::execute()
String current_table_name = table->getTableName();
/// Удаляем информацию о таблице из оперативки
context.detachTable(database_name, current_table_name);
StoragePtr detached = context.detachTable(database_name, current_table_name);
/// Удаляем данные таблицы
if (!drop.detach)
@ -81,12 +81,40 @@ BlockIO InterpreterDropQuery::execute()
String current_metadata_path = metadata_path + escapeForFileName(current_table_name) + ".sql";
/// Для таблиц типа ChunkRef, файла с метаданными не существует.
if (Poco::File(current_metadata_path).exists())
Poco::File(current_metadata_path).remove();
bool metadata_file_exists = Poco::File(current_metadata_path).exists();
if (metadata_file_exists)
{
if (Poco::File(current_metadata_path + ".bak").exists())
Poco::File(current_metadata_path + ".bak").remove();
Poco::File(current_metadata_path).renameTo(current_metadata_path + ".bak");
}
try
{
table->drop();
}
catch (const Exception & e)
{
/// Такая ошибка означает, что таблицу невозможно удалить, и данные пока ещё консистентны. Можно вернуть таблицу на место.
/// NOTE Таблица будет оставаться в состоянии после shutdown - не производить всевозможной фоновой работы.
if (e.code() == ErrorCodes::TABLE_WAS_NOT_DROPPED)
{
if (metadata_file_exists)
Poco::File(current_metadata_path + ".bak").renameTo(current_metadata_path);
context.addTable(database_name, current_table_name, detached);
throw;
}
else
throw;
}
table->is_dropped = true;
if (metadata_file_exists)
Poco::File(current_metadata_path + ".bak").remove();
if (Poco::File(current_data_path).exists())
Poco::File(current_data_path).remove(true);
}
@ -115,6 +143,7 @@ BlockIO InterpreterDropQuery::execute()
return {};
}
void InterpreterDropQuery::dropDetachedTable(String database_name, StoragePtr table, Context & context)
{
table->shutdown();

View File

@ -111,6 +111,12 @@ static void loadTable(Context & context, const String & path, const Table & tabl
}
static bool endsWith(const String & s, const char * suffix)
{
return s.size() >= strlen(suffix) && 0 == s.compare(s.size() - strlen(suffix), strlen(suffix), suffix);
}
void loadMetadata(Context & context)
{
Logger * log = &Logger::get("loadMetadata");
@ -153,11 +159,15 @@ void loadMetadata(Context & context)
if (jt.name().at(0) == '.')
continue;
/// Файлы имеют имена вида table_name.sql
if (jt.name().compare(jt.name().size() - 4, 4, ".sql"))
throw Exception("Incorrect file extension: " + jt.name() + " in metadata directory " + it->path(), ErrorCodes::INCORRECT_FILE_NAME);
/// Есть файлы .sql.bak - пропускаем.
if (endsWith(jt.name(), ".sql.bak"))
continue;
/// Нужные файлы имеют имена вида table_name.sql
if (endsWith(jt.name(), ".sql"))
file_names.push_back(jt.name());
else
throw Exception("Incorrect file extension: " + jt.name() + " in metadata directory " + it->path(), ErrorCodes::INCORRECT_FILE_NAME);
}
/** Таблицы быстрее грузятся, если их грузить в сортированном (по именам) порядке.

View File

@ -528,7 +528,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
MergeTreeReadPoolPtr pool = std::make_shared<MergeTreeReadPool>(
threads, sum_marks, min_marks_for_concurrent_read, parts, data, prewhere_actions, prewhere_column, true,
column_names);
column_names, MergeTreeReadPool::BackoffSettings(settings));
/// Оценим общее количество строк - для прогресс-бара.
const std::size_t total_rows = data.index_granularity * sum_marks;
@ -669,7 +669,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal
MergeTreeReadPoolPtr pool = std::make_shared<MergeTreeReadPool>(
parts.size(), sum_marks, min_marks_for_read_task, parts, data, prewhere_actions, prewhere_column, true,
column_names, true);
column_names, MergeTreeReadPool::BackoffSettings{}, true);
/// Оценим общее количество строк - для прогресс-бара.
const std::size_t total_rows = data.index_granularity * sum_marks;

View File

@ -3058,6 +3058,9 @@ void StorageReplicatedMergeTree::drop()
auto zookeeper = getZooKeeper();
if (zookeeper->expired())
throw Exception("Table was not dropped because ZooKeeper session has been expired.", ErrorCodes::TABLE_WAS_NOT_DROPPED);
LOG_INFO(log, "Removing replica " << replica_path);
replica_is_active_node = nullptr;
zookeeper->tryRemoveRecursive(replica_path);

View File

@ -0,0 +1,13 @@
2 4 4 3
[nan,nan] []
[0,0] [0]
[0.5,0.9] [0,1]
[1,1.8] [0,1,2]
[1.5,2.7] [0,1,2,3]
[2,3.6] [0,1,2,3,4]
[2.5,4.5] [0,1,2,3,4,5]
[3,5.4] [0,1,2,3,4,5,6]
[3.5,6.3] [0,1,2,3,4,5,6,7]
[4,7.2] [0,1,2,3,4,5,6,7,8]
[4.5,8.1] [0,1,2,3,4,5,6,7,8,9]
[5,9] [0,1,2,3,4,5,6,7,8,9,10]

View File

@ -0,0 +1,7 @@
SELECT
arrayReduce('uniq', [1, 2, 1]) AS a,
arrayReduce('uniq', [1, 2, 2, 1], ['hello', 'world', '', '']) AS b,
arrayReduce('uniqUpTo(5)', [1, 2, 2, 1], materialize(['hello', 'world', '', ''])) AS c,
arrayReduce('uniqExactIf', [1, 2, 3, 4], [1, 0, 1, 1]) AS d;
SELECT arrayReduce('quantiles(0.5, 0.9)', range(number) AS r), r FROM system.numbers LIMIT 12;