diff --git a/dbms/include/DB/Common/ProfileEvents.h b/dbms/include/DB/Common/ProfileEvents.h index a58733d5773..e08401f555a 100644 --- a/dbms/include/DB/Common/ProfileEvents.h +++ b/dbms/include/DB/Common/ProfileEvents.h @@ -77,6 +77,9 @@ M(ExternalAggregationCompressedBytes) \ M(ExternalAggregationUncompressedBytes) \ \ + M(SlowRead) \ + M(ReadBackoff) \ + \ M(END) namespace ProfileEvents diff --git a/dbms/include/DB/Common/ShellCommand.h b/dbms/include/DB/Common/ShellCommand.h new file mode 100644 index 00000000000..fab59fb2913 --- /dev/null +++ b/dbms/include/DB/Common/ShellCommand.h @@ -0,0 +1,55 @@ +#pragma once + +#include +#include +#include + + +namespace DB +{ + + +/** Позволяет запустить команду, + * читать её stdout, stderr, писать в stdin, + * дождаться завершения. + * + * Реализация похожа на функцию popen из POSIX (посмотреть можно в исходниках libc). + * + * Наиболее важное отличие: использует vfork вместо fork. + * Это сделано, потому что fork не работает (с ошибкой о нехватке памяти), + * при некоторых настройках overcommit-а, если размер адресного пространства процесса больше половины количества доступной памяти. + * Также, изменение memory map-ов - довольно ресурсоёмкая операция. + * + * Второе отличие - позволяет работать одновременно и с stdin, и с stdout, и с stderr запущенного процесса, + * а также узнать код и статус завершения. + */ +class ShellCommand +{ +private: + pid_t pid; + + ShellCommand(pid_t pid, int in_fd, int out_fd, int err_fd) + : pid(pid), in(in_fd), out(out_fd), err(err_fd) {}; + + static std::unique_ptr executeImpl(const char * filename, char * const argv[]); + +public: + WriteBufferFromFile in; /// Если команда читает из stdin, то не забудьте вызвать in.close() после записи туда всех данных. + ReadBufferFromFile out; + ReadBufferFromFile err; + + /// Выполнить команду с использованием /bin/sh -c + static std::unique_ptr execute(const std::string & command); + + /// Выполнить исполняемый файл с указаннами аргументами. arguments - без argv[0]. + static std::unique_ptr executeDirect(const std::string & path, const std::vector & arguments); + + /// Подождать завершения процесса, кинуть исключение, если код не 0 или если процесс был завершён не самостоятельно. + void wait(); + + /// Подождать завершения процесса, узнать код возврата. Кинуть исключение, если процесс был завершён не самостоятельно. + int tryWait(); +}; + + +} diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h index a8445610850..34e25e7968c 100644 --- a/dbms/include/DB/Core/ErrorCodes.h +++ b/dbms/include/DB/Core/ErrorCodes.h @@ -299,6 +299,14 @@ namespace ErrorCodes RECEIVED_EMPTY_DATA = 295, NO_REMOTE_SHARD_FOUND = 296, SHARD_HAS_NO_CONNECTIONS = 297, + CANNOT_PIPE = 298, + CANNOT_FORK = 299, + CANNOT_DLSYM = 300, + CANNOT_CREATE_CHILD_PROCESS = 301, + CHILD_WAS_NOT_EXITED_NORMALLY = 302, + CANNOT_SELECT = 303, + CANNOT_WAITPID = 304, + TABLE_WAS_NOT_DROPPED = 305, KEEPER_EXCEPTION = 999, POCO_EXCEPTION = 1000, diff --git a/dbms/include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h b/dbms/include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h index 1d989db265f..175afc1e15f 100644 --- a/dbms/include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h +++ b/dbms/include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h @@ -4,6 +4,7 @@ #include #include #include +#include namespace DB @@ -11,9 +12,47 @@ namespace DB /** Доагрегирует потоки блоков, держа в оперативной памяти только по одному или несколько (до merging_threads) блоков из каждого источника. - * Это экономит оперативку в случае использования двухуровневой агрегации, где в каждом потоке будет до 256 блоков с частями результата. + * Это экономит оперативку в случае использования двухуровневой агрегации, где в каждом источнике будет до 256 блоков с частями результата. * * Агрегатные функции в блоках не должны быть финализированы, чтобы их состояния можно было объединить. + * + * Используется для решения двух задач: + * + * 1. Внешняя агрегация со сбросом данных на диск. + * Частично агрегированные данные (предварительно разбитые на 256 корзин) сброшены в какое-то количество файлов на диске. + * Нужно читать их и мерджить по корзинам - держа в оперативке одновременно только несколько корзин из каждого файла. + * + * 2. Слияние результатов агрегации при распределённой обработке запроса. + * С разных серверов приезжают частично агрегированные данные, которые могут быть разбиты, а могут быть не разбиты на 256 корзин, + * и эти корзины отдаются нам по сети с каждого сервера последовательно, друг за другом. + * Надо так же читать и мерджить по корзинам. + * + * Суть работы: + * + * Есть какое-то количество источников. Они отдают блоки с частично агрегированными данными. + * Каждый источник может отдать одну из следующих последовательностей блоков: + * 1. "неразрезанный" блок с bucket_num = -1; + * 2. "разрезанные" (two_level) блоки с bucket_num от 0 до 255; + * В обоих случаях, может ещё присутствовать блок "переполнений" (overflows) с bucket_num = -1 и is_overflows = true; + * + * Исходим из соглашения, что разрезанные блоки всегда передаются в порядке bucket_num. + * То есть, если a < b, то блок с bucket_num = a идёт раньше bucket_num = b. + * Это нужно для экономного по памяти слияния + * - чтобы не надо было читать блоки наперёд, а идти по всем последовательностям по возрастанию bucket_num. + * + * При этом, не все bucket_num из диапазона 0..255 могут присутствовать. + * Блок переполнений может присутствовать в любом порядке относительно других блоков (но он может быть только один). + * + * Необходимо объединить эти последовательности блоков и отдать результат в виде последовательности с такими же свойствами. + * То есть, на выходе, если в последовательности есть "разрезанные" блоки, то они должны идти в порядке bucket_num. + * + * Мердж можно осуществлять с использованием нескольких (merging_threads) потоков. + * Для этого, получение набора блоков для следующего bucket_num надо делать последовательно, + * а затем, когда мы имеем несколько полученных наборов, их объединение можно делать параллельно. + * + * При получении следующих блоков из разных источников, + * данные из источников можно также читать в несколько потоков (reading_threads) + * для оптимальной работы при наличии быстрой сети или дисков (откуда эти блоки читаются). */ class MergingAggregatedMemoryEfficientBlockInputStream : public IProfilingBlockInputStream { @@ -22,7 +61,7 @@ public: BlockInputStreams inputs_, const Aggregator::Params & params, bool final_, size_t reading_threads_, size_t merging_threads_); - ~MergingAggregatedMemoryEfficientBlockInputStream(); + ~MergingAggregatedMemoryEfficientBlockInputStream() override; String getName() const override { return "MergingAggregatedMemoryEfficient"; } @@ -31,20 +70,34 @@ public: /// Отправляет запрос (инициирует вычисления) раньше, чем read. void readPrefix() override; + /// Вызывается либо после того, как всё прочитано, либо после cancel-а. + void readSuffix() override; + + /** Отличается от реализации по-умолчанию тем, что пытается остановить все источники, + * пропуская отвалившиеся по эксепшену. + */ + void cancel() override; + protected: Block readImpl() override; private: + static constexpr size_t NUM_BUCKETS = 256; + Aggregator aggregator; bool final; size_t reading_threads; size_t merging_threads; bool started = false; + bool all_read = false; volatile bool has_two_level = false; volatile bool has_overflows = false; int current_bucket_num = -1; + Logger * log = &Logger::get("MergingAggregatedMemoryEfficientBlockInputStream"); + + struct Input { BlockInputStreamPtr stream; @@ -68,31 +121,34 @@ private: std::unique_ptr reading_pool; /// Для параллельного мерджа. - struct OutputData - { - Block block; - std::exception_ptr exception; - - OutputData() {} - OutputData(Block && block_) : block(std::move(block_)) {} - OutputData(std::exception_ptr && exception_) : exception(std::move(exception_)) {} - }; struct ParallelMergeData { boost::threadpool::pool pool; + /// Сейчас один из мерджащих потоков получает следующие блоки для мерджа. Эта операция должна делаться последовательно. std::mutex get_next_blocks_mutex; - ConcurrentBoundedQueue result_queue; bool exhausted = false; /// Данных больше нет. bool finish = false; /// Нужно завершить работу раньше, чем данные закончились. - std::atomic active_threads; - ParallelMergeData(size_t max_threads) : pool(max_threads), result_queue(max_threads), active_threads(max_threads) {} + std::exception_ptr exception; + /// Следует отдавать блоки стого в порядке ключа (bucket_num). + /// Если значение - пустой блок - то нужно дождаться его мерджа. + /// (Такое значение означает обещание, что здесь будут данные. Это важно, потому что данные нужно отдавать в порядке ключа - bucket_num) + std::map merged_blocks; + std::mutex merged_blocks_mutex; + /// Событие, с помощью которого мерджащие потоки говорят главному потоку, что новый блок готов. + std::condition_variable merged_blocks_changed; + /// Событие, с помощью которого главный поток говорят мерджащим потокам, что можно обработать следующую группу блоков. + std::condition_variable have_space; + + ParallelMergeData(size_t max_threads) : pool(max_threads) {} }; std::unique_ptr parallel_merge_data; void mergeThread(MemoryTracker * memory_tracker); + + void finalize(); }; } diff --git a/dbms/include/DB/Functions/FunctionsArray.h b/dbms/include/DB/Functions/FunctionsArray.h index 5a41d0d9488..9f6a5b00fa9 100644 --- a/dbms/include/DB/Functions/FunctionsArray.h +++ b/dbms/include/DB/Functions/FunctionsArray.h @@ -18,6 +18,12 @@ #include #include #include +#include +#include +#include +#include +#include +#include #include @@ -47,6 +53,8 @@ namespace DB * - для кортежей из элементов на соответствующих позициях в нескольких массивах. * * emptyArrayToSingle(arr) - заменить пустые массивы на массивы из одного элемента со значением "по-умолчанию". + * + * arrayReduce('agg', arr1, ...) - применить агрегатную функцию agg к массивам arr1... */ @@ -2320,6 +2328,183 @@ private: }; +/** Применяет к массиву агрегатную функцию и возвращает её результат. + * Также может быть применена к нескольким массивам одинаковых размеров, если агрегатная функция принимает несколько аргументов. + */ +class FunctionArrayReduce : public IFunction +{ +public: + static constexpr auto name = "arrayReduce"; + static IFunction * create(const Context & context) { return new FunctionArrayReduce; } + + /// Получить имя функции. + String getName() const override + { + return name; + } + + void getReturnTypeAndPrerequisites( + const ColumnsWithTypeAndName & arguments, + DataTypePtr & out_return_type, + std::vector & out_prerequisites) override + { + /// Первый аргумент - константная строка с именем агрегатной функции (возможно, с параметрами в скобках, например: "quantile(0.99)"). + + if (arguments.size() < 2) + throw Exception("Number of arguments for function " + getName() + " doesn't match: passed " + + toString(arguments.size()) + ", should be at least 2.", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + const ColumnConstString * aggregate_function_name_column = typeid_cast(arguments[0].column.get()); + if (!aggregate_function_name_column) + throw Exception("First argument for function " + getName() + " must be constant string: name of aggregate function.", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + DataTypes argument_types(arguments.size() - 1); + for (size_t i = 1, size = arguments.size(); i < size; ++i) + { + const DataTypeArray * arg = typeid_cast(arguments[i].type.get()); + if (!arg) + throw Exception("Argument " + toString(i) + " for function " + getName() + " must be array.", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + argument_types[i - 1] = arg->getNestedType()->clone(); + } + + if (!aggregate_function) + { + const String & aggregate_function_name_with_params = aggregate_function_name_column->getData(); + + if (aggregate_function_name_with_params.empty()) + throw Exception("First argument for function " + getName() + " (name of aggregate function) cannot be empty.", + ErrorCodes::BAD_ARGUMENTS); + + bool has_parameters = ')' == aggregate_function_name_with_params.back(); + + String aggregate_function_name = aggregate_function_name_with_params; + String parameters; + Array params_row; + + if (has_parameters) + { + size_t pos = aggregate_function_name_with_params.find('('); + if (pos == std::string::npos || pos + 2 >= aggregate_function_name_with_params.size()) + throw Exception("First argument for function " + getName() + " doesn't look like aggregate function name.", + ErrorCodes::BAD_ARGUMENTS); + + aggregate_function_name = aggregate_function_name_with_params.substr(0, pos); + parameters = aggregate_function_name_with_params.substr(pos + 1, aggregate_function_name_with_params.size() - pos - 2); + + if (aggregate_function_name.empty()) + throw Exception("First argument for function " + getName() + " doesn't look like aggregate function name.", + ErrorCodes::BAD_ARGUMENTS); + + ParserExpressionList params_parser(false); + ASTPtr args_ast = parseQuery(params_parser, + parameters.data(), parameters.data() + parameters.size(), + "parameters of aggregate function"); + + ASTExpressionList & args_list = typeid_cast(*args_ast); + + if (args_list.children.empty()) + throw Exception("Incorrect list of parameters to aggregate function " + + aggregate_function_name, ErrorCodes::BAD_ARGUMENTS); + + params_row.reserve(args_list.children.size()); + for (const auto & child : args_list.children) + { + const ASTLiteral * lit = typeid_cast(child.get()); + if (!lit) + throw Exception("Parameters to aggregate functions must be literals", + ErrorCodes::PARAMETERS_TO_AGGREGATE_FUNCTIONS_MUST_BE_LITERALS); + + params_row.push_back(lit->value); + } + } + + aggregate_function = AggregateFunctionFactory().get(aggregate_function_name, argument_types); + + /// Потому что владение состояниями агрегатных функций никуда не отдаётся. + if (aggregate_function->isState()) + throw Exception("Using aggregate function with -State modifier in function arrayReduce is not supported", ErrorCodes::BAD_ARGUMENTS); + + if (has_parameters) + aggregate_function->setParameters(params_row); + aggregate_function->setArguments(argument_types); + } + + out_return_type = aggregate_function->getReturnType(); + } + + + void execute(Block & block, const ColumnNumbers & arguments, size_t result) override + { + IAggregateFunction & agg_func = *aggregate_function.get(); + std::unique_ptr place_holder { new char[agg_func.sizeOfData()] }; + AggregateDataPtr place = place_holder.get(); + + size_t rows = block.rowsInFirstColumn(); + + /// Агрегатные функции не поддерживают константные столбцы. Поэтому, материализуем их. + std::vector materialized_columns; + + std::vector aggregate_arguments_vec(arguments.size() - 1); + + for (size_t i = 0, size = arguments.size() - 1; i < size; ++i) + { + const IColumn * col = block.unsafeGetByPosition(arguments[i + 1]).column.get(); + if (const ColumnArray * arr = typeid_cast(col)) + { + aggregate_arguments_vec[i] = arr->getDataPtr().get(); + } + else if (const ColumnConstArray * arr = typeid_cast(col)) + { + materialized_columns.emplace_back(arr->convertToFullColumn()); + aggregate_arguments_vec[i] = typeid_cast(*materialized_columns.back().get()).getDataPtr().get(); + } + else + throw Exception("Illegal column " + col->getName() + " as argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN); + + } + const IColumn ** aggregate_arguments = aggregate_arguments_vec.data(); + + const ColumnArray::Offsets_t & offsets = typeid_cast(!materialized_columns.empty() + ? *materialized_columns.front().get() + : *block.unsafeGetByPosition(arguments[1]).column.get()).getOffsets(); + + ColumnPtr result_holder = block.getByPosition(result).type->createColumn(); + block.getByPosition(result).column = result_holder; + IColumn & res_col = *result_holder.get(); + + ColumnArray::Offset_t current_offset = 0; + for (size_t i = 0; i < rows; ++i) + { + agg_func.create(place); + ColumnArray::Offset_t next_offset = offsets[i]; + + try + { + for (size_t j = current_offset; j < next_offset; ++j) + agg_func.add(place, aggregate_arguments, j); + + agg_func.insertResultInto(place, res_col); + } + catch (...) + { + agg_func.destroy(place); + throw; + } + + agg_func.destroy(place); + current_offset = next_offset; + } + } + +private: + AggregateFunctionPtr aggregate_function; +}; + + struct NameHas { static constexpr auto name = "has"; }; struct NameIndexOf { static constexpr auto name = "indexOf"; }; struct NameCountEqual { static constexpr auto name = "countEqual"; }; diff --git a/dbms/include/DB/Functions/FunctionsDateTime.h b/dbms/include/DB/Functions/FunctionsDateTime.h index 94913eb6296..5faecf01943 100644 --- a/dbms/include/DB/Functions/FunctionsDateTime.h +++ b/dbms/include/DB/Functions/FunctionsDateTime.h @@ -587,17 +587,27 @@ public: + toString(arguments.size()) + ", should be 1 or 2.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - if (typeid_cast(&*arguments[0]) == nullptr) + if (typeid_cast(&*arguments[0]) != nullptr) { - if (arguments.size() != 1) + if (arguments.size() > 1) throw Exception("Number of arguments for function " + getName() + " doesn't match: passed " + toString(arguments.size()) + ", should be 1.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); } - else if ((arguments.size()) == 2 && typeid_cast(&*arguments[1]) == nullptr) + else if (typeid_cast(&*arguments[0]) != nullptr) + { + /// Ничего не делаем. + } + else + throw Exception{ + "Illegal type " + arguments[0]->getName() + " of argument 1 of function " + getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT + }; + + if ((arguments.size() == 2) && (typeid_cast(&*arguments[1]) == nullptr)) { throw Exception{ - "Illegal type " + arguments[1]->getName() + " of argument of function " + getName(), + "Illegal type " + arguments[1]->getName() + " of argument 2 of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT }; } diff --git a/dbms/include/DB/IO/CachedCompressedReadBuffer.h b/dbms/include/DB/IO/CachedCompressedReadBuffer.h index 510e93b116a..169f3c0e5bb 100644 --- a/dbms/include/DB/IO/CachedCompressedReadBuffer.h +++ b/dbms/include/DB/IO/CachedCompressedReadBuffer.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -23,8 +24,7 @@ private: size_t estimated_size; size_t aio_threshold; - /// SharedPtr - для ленивой инициализации (только в случае кэш-промаха). - Poco::SharedPtr file_in; + std::unique_ptr file_in; size_t file_pos; /// Кусок данных из кэша, или кусок считанных данных, который мы положим в кэш. @@ -34,8 +34,11 @@ private: { if (!file_in) { - file_in = createReadBufferFromFileBase(path, estimated_size, aio_threshold, buf_size); + file_in.reset(createReadBufferFromFileBase(path, estimated_size, aio_threshold, buf_size)); compressed_in = &*file_in; + + if (profile_callback) + file_in->setProfileCallback(profile_callback, clock_type); } } @@ -81,6 +84,11 @@ private: return true; } + + /// Передаётся в file_in. + ReadBufferFromFileBase::ProfileCallback profile_callback; + clockid_t clock_type; + public: CachedCompressedReadBuffer( const std::string & path_, UncompressedCache * cache_, size_t estimated_size_, size_t aio_threshold_, @@ -115,6 +123,13 @@ public: bytes -= offset(); } } + + + void setProfileCallback(const ReadBufferFromFileBase::ProfileCallback & profile_callback_, clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE) + { + profile_callback = profile_callback_; + clock_type = clock_type_; + } }; } diff --git a/dbms/include/DB/IO/CompressedReadBufferFromFile.h b/dbms/include/DB/IO/CompressedReadBufferFromFile.h index a8aa293e679..6bfd6fb6a37 100644 --- a/dbms/include/DB/IO/CompressedReadBufferFromFile.h +++ b/dbms/include/DB/IO/CompressedReadBufferFromFile.h @@ -120,6 +120,11 @@ public: return bytes_read; } + + void setProfileCallback(const ReadBufferFromFileBase::ProfileCallback & profile_callback_, clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE) + { + file_in.setProfileCallback(profile_callback_, clock_type_); + } }; } diff --git a/dbms/include/DB/IO/ReadBufferFromFile.h b/dbms/include/DB/IO/ReadBufferFromFile.h index 69ce0fdfc7f..6fcedd4334d 100644 --- a/dbms/include/DB/IO/ReadBufferFromFile.h +++ b/dbms/include/DB/IO/ReadBufferFromFile.h @@ -14,7 +14,7 @@ class ReadBufferFromFile : public ReadBufferFromFileDescriptor { private: std::string file_name; - + public: ReadBufferFromFile(const std::string & file_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1, char * existing_memory = nullptr, size_t alignment = 0) @@ -28,9 +28,28 @@ public: throwFromErrno("Cannot open file " + file_name, errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE); } + /// Использовать уже открытый файл. + ReadBufferFromFile(int fd, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1, + char * existing_memory = nullptr, size_t alignment = 0) + : ReadBufferFromFileDescriptor(fd, buf_size, existing_memory, alignment), file_name("(fd = " + toString(fd) + ")") + { + } + virtual ~ReadBufferFromFile() { - close(fd); + if (fd < 0) + return; + + ::close(fd); + } + + /// Закрыть файл раньше вызова деструктора. + void close() + { + if (0 != ::close(fd)) + throw Exception("Cannot close file", ErrorCodes::CANNOT_CLOSE_FILE); + + fd = -1; } virtual std::string getFileName() diff --git a/dbms/include/DB/IO/ReadBufferFromFileBase.h b/dbms/include/DB/IO/ReadBufferFromFileBase.h index 9681977d08a..caf69dd8473 100644 --- a/dbms/include/DB/IO/ReadBufferFromFileBase.h +++ b/dbms/include/DB/IO/ReadBufferFromFileBase.h @@ -19,7 +19,27 @@ public: virtual std::string getFileName() const = 0; virtual int getFD() const = 0; + /// Есть возможность получать информацию о времени каждого чтения. + struct ProfileInfo + { + size_t bytes_requested; + size_t bytes_read; + size_t nanoseconds; + }; + + using ProfileCallback = std::function; + + /// CLOCK_MONOTONIC_COARSE более чем достаточно для отслеживания долгих чтений - например, залипаний на секунды. + void setProfileCallback(const ProfileCallback & profile_callback_, clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE) + { + profile_callback = profile_callback_; + clock_type = clock_type_; + } + protected: + ProfileCallback profile_callback; + clockid_t clock_type; + virtual off_t doSeek(off_t off, int whence) = 0; }; diff --git a/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h b/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h index 5aeaabf6774..c907ab7c21a 100644 --- a/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h +++ b/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h @@ -2,8 +2,12 @@ #include #include +#include + +#include #include +#include #include #include @@ -32,6 +36,10 @@ protected: { ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorRead); + std::experimental::optional watch; + if (profile_callback) + watch.emplace(clock_type); + ssize_t res = ::read(fd, internal_buffer.begin(), internal_buffer.size()); if (!res) break; @@ -41,6 +49,15 @@ protected: if (res > 0) bytes_read += res; + + if (profile_callback) + { + ProfileInfo info; + info.bytes_requested = internal_buffer.size(); + info.bytes_read = res; + info.nanoseconds = watch->elapsed(); + profile_callback(info); + } } pos_in_file += bytes_read; @@ -76,7 +93,9 @@ public: return pos_in_file - (working_buffer.end() - pos); } + private: + /// Если offset такой маленький, что мы не выйдем за пределы буфера, настоящий seek по файлу не делается. off_t doSeek(off_t offset, int whence) override { @@ -108,6 +127,23 @@ private: return res; } } + + + /// При условии, что файловый дескриптор позволяет использовать select, проверяет в течение таймаута, есть ли данные для чтения. + bool poll(size_t timeout_microseconds) + { + fd_set fds; + FD_ZERO(&fds); + FD_SET(fd, &fds); + timeval timeout = { time_t(timeout_microseconds / 1000000), time_t(timeout_microseconds % 1000000) }; + + int res = select(1, &fds, 0, 0, &timeout); + + if (-1 == res) + throwFromErrno("Cannot select", ErrorCodes::CANNOT_SELECT); + + return res > 0; + } }; } diff --git a/dbms/include/DB/IO/WriteBufferFromFile.h b/dbms/include/DB/IO/WriteBufferFromFile.h index d3a8553ac35..9c542bbdf51 100644 --- a/dbms/include/DB/IO/WriteBufferFromFile.h +++ b/dbms/include/DB/IO/WriteBufferFromFile.h @@ -18,7 +18,7 @@ class WriteBufferFromFile : public WriteBufferFromFileDescriptor { private: std::string file_name; - + public: WriteBufferFromFile(const std::string & file_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1, mode_t mode = 0666, char * existing_memory = nullptr, size_t alignment = 0) @@ -27,13 +27,23 @@ public: ProfileEvents::increment(ProfileEvents::FileOpen); fd = open(file_name.c_str(), flags == -1 ? O_WRONLY | O_TRUNC | O_CREAT : flags, mode); - + if (-1 == fd) throwFromErrno("Cannot open file " + file_name, errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE); } + /// Использовать уже открытый файл. + WriteBufferFromFile(int fd, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1, mode_t mode = 0666, + char * existing_memory = nullptr, size_t alignment = 0) + : WriteBufferFromFileDescriptor(fd, buf_size, existing_memory, alignment), file_name("(fd = " + toString(fd) + ")") + { + } + ~WriteBufferFromFile() { + if (fd < 0) + return; + try { next(); @@ -43,9 +53,20 @@ public: tryLogCurrentException(__PRETTY_FUNCTION__); } - close(fd); + ::close(fd); } - + + /// Закрыть файл раньше вызова деструктора. + void close() + { + next(); + + if (0 != ::close(fd)) + throw Exception("Cannot close file", ErrorCodes::CANNOT_CLOSE_FILE); + + fd = -1; + } + /** fsync() transfers ("flushes") all modified in-core data of (i.e., modified buffer cache pages for) the file * referred to by the file descriptor fd to the disk device (or other permanent storage device) * so that all changed information can be retrieved even after the system crashed or was rebooted. diff --git a/dbms/include/DB/IO/WriteBufferFromFileDescriptor.h b/dbms/include/DB/IO/WriteBufferFromFileDescriptor.h index 17fbb70acbc..b6c48fb547c 100644 --- a/dbms/include/DB/IO/WriteBufferFromFileDescriptor.h +++ b/dbms/include/DB/IO/WriteBufferFromFileDescriptor.h @@ -67,7 +67,8 @@ public: { try { - next(); + if (fd >= 0) + next(); } catch (...) { diff --git a/dbms/include/DB/Interpreters/Settings.h b/dbms/include/DB/Interpreters/Settings.h index 8428f6f0a97..357181a3791 100644 --- a/dbms/include/DB/Interpreters/Settings.h +++ b/dbms/include/DB/Interpreters/Settings.h @@ -176,8 +176,18 @@ struct Settings M(SettingUInt64, select_sequential_consistency, 0) \ /** Максимальное количество различных шардов и максимальное количество реплик одного шарда в функции remote. */ \ M(SettingUInt64, table_function_remote_max_addresses, 1000) \ - /** Маскимальное количество потоков при распределённой обработке одного запроса **/ \ + /** Маскимальное количество потоков при распределённой обработке одного запроса */ \ M(SettingUInt64, max_distributed_processing_threads, 8) \ + \ + /** Настройки понижения числа потоков в случае медленных чтений. */ \ + /** Обращать внимания только на чтения, занявшие не меньше такого количества времени. */ \ + M(SettingMilliseconds, read_backoff_min_latency_ms, 1000) \ + /** Считать события, когда пропускная способность меньше стольки байт в секунду. */ \ + M(SettingUInt64, read_backoff_max_throughput, 1048576) \ + /** Не обращать внимания на событие, если от предыдущего прошло меньше стольки-то времени. */ \ + M(SettingMilliseconds, read_backoff_min_interval_between_events_ms, 1000) \ + /** Количество событий, после которого количество потоков будет уменьшено. */ \ + M(SettingUInt64, read_backoff_min_events, 2) \ /// Всевозможные ограничения на выполнение запроса. Limits limits; diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeReadPool.h b/dbms/include/DB/Storages/MergeTree/MergeTreeReadPool.h index f982d7a882c..6c8469b8538 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeReadPool.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeReadPool.h @@ -48,19 +48,64 @@ using MergeTreeReadTaskPtr = std::unique_ptr; /** Provides read tasks for MergeTreeThreadBlockInputStream`s in fine-grained batches, allowing for more * uniform distribution of work amongst multiple threads. All parts and their ranges are divided into `threads` * workloads with at most `sum_marks / threads` marks. Then, threads are performing reads from these workloads - * in "sequential" manner, requesting work in small batches. As soon as some thread some thread has exhausted + * in "sequential" manner, requesting work in small batches. As soon as some thread has exhausted * it's workload, it either is signaled that no more work is available (`do_not_steal_tasks == false`) or * continues taking small batches from other threads' workloads (`do_not_steal_tasks == true`). */ class MergeTreeReadPool { +public: + /** Пул может динамически уменьшать количество потоков, если чтения происходят медленно. + * Настройки порогов для такого уменьшения. + */ + struct BackoffSettings + { + /// Обращать внимания только на чтения, занявшие не меньше такого количества времени. Если выставлено в 0 - значит backoff выключен. + size_t min_read_latency_ms = 1000; + /// Считать события, когда пропускная способность меньше стольки байт в секунду. + size_t max_throughput = 1048576; + /// Не обращать внимания на событие, если от предыдущего прошло меньше стольки-то времени. + size_t min_interval_between_events_ms = 1000; + /// Количество событий, после которого количество потоков будет уменьшено. + size_t min_events = 2; + + /// Константы выше приведены лишь в качестве примера. + BackoffSettings(const Settings & settings) + : min_read_latency_ms(settings.read_backoff_min_latency_ms.totalMilliseconds()), + max_throughput(settings.read_backoff_max_throughput), + min_interval_between_events_ms(settings.read_backoff_min_interval_between_events_ms.totalMilliseconds()), + min_events(settings.read_backoff_min_events) + { + } + + BackoffSettings() : min_read_latency_ms(0) {} + }; + + BackoffSettings backoff_settings; + +private: + /** Состояние для отслеживания скорости чтений. + */ + struct BackoffState + { + size_t current_threads; + Stopwatch time_since_prev_event {CLOCK_MONOTONIC_COARSE}; + size_t num_events = 0; + + BackoffState(size_t threads) : current_threads(threads) {} + }; + + BackoffState backoff_state; + public: MergeTreeReadPool( const std::size_t threads, const std::size_t sum_marks, const std::size_t min_marks_for_concurrent_read, RangesInDataParts parts, MergeTreeData & data, const ExpressionActionsPtr & prewhere_actions, const String & prewhere_column_name, const bool check_columns, const Names & column_names, + const BackoffSettings & backoff_settings, const bool do_not_steal_tasks = false) - : data{data}, column_names{column_names}, do_not_steal_tasks{do_not_steal_tasks} + : backoff_settings{backoff_settings}, backoff_state{threads}, + data{data}, column_names{column_names}, do_not_steal_tasks{do_not_steal_tasks} { const auto per_part_sum_marks = fillPerPartInfo(parts, prewhere_actions, prewhere_column_name, check_columns); fillPerThreadInfo(threads, sum_marks, per_part_sum_marks, parts, min_marks_for_concurrent_read); @@ -73,6 +118,10 @@ public: { const std::lock_guard lock{mutex}; + /// Если количество потоков было уменьшено из-за backoff, то не будем отдавать задачи для более чем backoff_state.current_threads потоков. + if (thread >= backoff_state.current_threads) + return nullptr; + if (remaining_thread_tasks.empty()) return nullptr; @@ -150,6 +199,50 @@ public: per_part_remove_prewhere_column[part_idx], per_part_should_reorder[part_idx]); } + /** Каждый обработчик задач может вызвать этот метод, передав в него информацию о скорости чтения. + * Если скорость чтения слишком низкая, то пул может принять решение уменьшить число потоков - не отдавать больше задач в некоторые потоки. + * Это позволяет бороться с чрезмерной нагрузкой на дисковую подсистему в случаях, когда чтения осуществляются не из page cache. + */ + void profileFeedback(const ReadBufferFromFileBase::ProfileInfo info) + { + if (backoff_settings.min_read_latency_ms == 0 || do_not_steal_tasks) + return; + + if (info.nanoseconds < backoff_settings.min_read_latency_ms * 1000000) + return; + + std::lock_guard lock(mutex); + + if (backoff_state.current_threads <= 1) + return; + + size_t throughput = info.bytes_read * 1000000000 / info.nanoseconds; + + if (throughput >= backoff_settings.max_throughput) + return; + + if (backoff_state.time_since_prev_event.elapsed() < backoff_settings.min_interval_between_events_ms * 1000000) + return; + + backoff_state.time_since_prev_event.restart(); + ++backoff_state.num_events; + + ProfileEvents::increment(ProfileEvents::SlowRead); + LOG_DEBUG(log, std::fixed << std::setprecision(3) + << "Slow read, event №" << backoff_state.num_events + << ": read " << info.bytes_read << " bytes in " << info.nanoseconds / 1000000000.0 << " sec., " + << info.bytes_read * 1000.0 / info.nanoseconds << " MB/s."); + + if (backoff_state.num_events < backoff_settings.min_events) + return; + + backoff_state.num_events = 0; + --backoff_state.current_threads; + + ProfileEvents::increment(ProfileEvents::ReadBackoff); + LOG_DEBUG(log, "Will lower number of threads to " << backoff_state.current_threads); + } + private: std::vector fillPerPartInfo( RangesInDataParts & parts, const ExpressionActionsPtr & prewhere_actions, const String & prewhere_column_name, @@ -381,7 +474,7 @@ private: std::vector> per_part_columns_lock; MergeTreeData & data; Names column_names; - const bool do_not_steal_tasks; + bool do_not_steal_tasks; std::vector per_part_column_name_set; std::vector per_part_columns; std::vector per_part_pre_columns; @@ -414,6 +507,8 @@ private: std::set remaining_thread_tasks; mutable std::mutex mutex; + + Logger * log = &Logger::get("MergeTreeReadPool"); }; using MergeTreeReadPoolPtr = std::shared_ptr; diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h b/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h index beb7786b522..59114ff2b52 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h @@ -24,14 +24,17 @@ namespace DB class MergeTreeReader { using OffsetColumns = std::map; - using ValueSizeMap = std::map; public: + using ValueSizeMap = std::map; + MergeTreeReader(const String & path, /// Путь к куску const MergeTreeData::DataPartPtr & data_part, const NamesAndTypesList & columns, UncompressedCache * uncompressed_cache, MarkCache * mark_cache, MergeTreeData & storage, const MarkRanges & all_mark_ranges, - size_t aio_threshold, size_t max_read_buffer_size, const ValueSizeMap & avg_value_size_hints = ValueSizeMap{}) + size_t aio_threshold, size_t max_read_buffer_size, const ValueSizeMap & avg_value_size_hints = ValueSizeMap{}, + const ReadBufferFromFileBase::ProfileCallback & profile_callback = ReadBufferFromFileBase::ProfileCallback{}, + clockid_t clock_type = CLOCK_MONOTONIC_COARSE) : avg_value_size_hints(avg_value_size_hints), path(path), data_part(data_part), columns(columns), uncompressed_cache(uncompressed_cache), mark_cache(mark_cache), storage(storage), all_mark_ranges(all_mark_ranges), aio_threshold(aio_threshold), max_read_buffer_size(max_read_buffer_size) @@ -42,7 +45,7 @@ public: throw Exception("Part " + path + " is missing", ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART); for (const NameAndTypePair & column : columns) - addStream(column.name, *column.type, all_mark_ranges); + addStream(column.name, *column.type, all_mark_ranges, profile_callback, clock_type); } catch (...) { @@ -151,7 +154,8 @@ private: Stream( const String & path_prefix_, UncompressedCache * uncompressed_cache, MarkCache * mark_cache, - const MarkRanges & all_mark_ranges, size_t aio_threshold, size_t max_read_buffer_size) + const MarkRanges & all_mark_ranges, size_t aio_threshold, size_t max_read_buffer_size, + const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type) : path_prefix(path_prefix_) { loadMarks(mark_cache); @@ -205,12 +209,20 @@ private: { cached_buffer = std::make_unique( path_prefix + ".bin", uncompressed_cache, estimated_size, aio_threshold, buffer_size); + + if (profile_callback) + cached_buffer->setProfileCallback(profile_callback, clock_type); + data_buffer = cached_buffer.get(); } else { non_cached_buffer = std::make_unique( path_prefix + ".bin", estimated_size, aio_threshold, buffer_size); + + if (profile_callback) + non_cached_buffer->setProfileCallback(profile_callback, clock_type); + data_buffer = non_cached_buffer.get(); } } @@ -289,7 +301,9 @@ private: size_t aio_threshold; size_t max_read_buffer_size; - void addStream(const String & name, const IDataType & type, const MarkRanges & all_mark_ranges, size_t level = 0) + void addStream(const String & name, const IDataType & type, const MarkRanges & all_mark_ranges, + const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type, + size_t level = 0) { String escaped_column_name = escapeForFileName(name); @@ -310,14 +324,14 @@ private: if (!streams.count(size_name)) streams.emplace(size_name, std::make_unique( path + escaped_size_name, uncompressed_cache, mark_cache, - all_mark_ranges, aio_threshold, max_read_buffer_size)); + all_mark_ranges, aio_threshold, max_read_buffer_size, profile_callback, clock_type)); - addStream(name, *type_arr->getNestedType(), all_mark_ranges, level + 1); + addStream(name, *type_arr->getNestedType(), all_mark_ranges, profile_callback, clock_type, level + 1); } else streams.emplace(name, std::make_unique( path + escaped_column_name, uncompressed_cache, mark_cache, - all_mark_ranges, aio_threshold, max_read_buffer_size)); + all_mark_ranges, aio_threshold, max_read_buffer_size, profile_callback, clock_type)); } diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeThreadBlockInputStream.h b/dbms/include/DB/Storages/MergeTree/MergeTreeThreadBlockInputStream.h index 8e26219f4a4..e3229fc628f 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeThreadBlockInputStream.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeThreadBlockInputStream.h @@ -102,6 +102,9 @@ private: const auto path = storage.getFullPath() + task->data_part->name + '/'; + /// Позволяет пулу уменьшать количество потоков в случае слишком медленных чтений. + auto profile_callback = [this](ReadBufferFromFileBase::ProfileInfo info) { pool->profileFeedback(info); }; + if (!reader) { if (use_uncompressed_cache) @@ -111,13 +114,13 @@ private: reader = std::make_unique( path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), - storage, task->mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size); + storage, task->mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size, MergeTreeReader::ValueSizeMap{}, profile_callback); if (prewhere_actions) pre_reader = std::make_unique( path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), storage, task->mark_ranges, min_bytes_to_use_direct_io, - max_read_buffer_size); + max_read_buffer_size, MergeTreeReader::ValueSizeMap{}, profile_callback); } else { @@ -125,13 +128,13 @@ private: reader = std::make_unique( path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), storage, task->mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size, - reader->getAvgValueSizeHints()); + reader->getAvgValueSizeHints(), profile_callback); if (prewhere_actions) pre_reader = std::make_unique( path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), storage, task->mark_ranges, min_bytes_to_use_direct_io, - max_read_buffer_size, pre_reader->getAvgValueSizeHints()); + max_read_buffer_size, pre_reader->getAvgValueSizeHints(), profile_callback); } return true; diff --git a/dbms/src/Client/Client.cpp b/dbms/src/Client/Client.cpp index af6b65607bb..d746e486248 100644 --- a/dbms/src/Client/Client.cpp +++ b/dbms/src/Client/Client.cpp @@ -238,6 +238,21 @@ private: } + /// Стоит ли сделать хоть что-нибудь ради праздника. + bool isNewYearMode() + { + time_t current_time = time(0); + + /// Плохо быть навязчивым. + if (current_time % 3 != 0) + return false; + + mysqlxx::Date now(current_time); + return (now.month() == 12 && now.day() >= 20) + || (now.month() == 1 && now.day() <= 5); + } + + int mainImpl(const std::vector & args) { /** Будем работать в batch режиме, если выполнено одно из следующих условий: @@ -305,7 +320,7 @@ private: loop(); - std::cout << "Bye." << std::endl; + std::cout << (isNewYearMode() ? "Happy new year." : "Bye.") << std::endl; return 0; } diff --git a/dbms/src/Common/ShellCommand.cpp b/dbms/src/Common/ShellCommand.cpp new file mode 100644 index 00000000000..70d62cea7c1 --- /dev/null +++ b/dbms/src/Common/ShellCommand.cpp @@ -0,0 +1,193 @@ +#include +#include +#include +#include +#include + +#include +#include +#include +#include + + +namespace +{ + struct Pipe + { + union + { + int fds[2]; + struct + { + int read_fd; + int write_fd; + }; + }; + + Pipe() + { + if (0 != pipe2(fds, O_CLOEXEC)) + DB::throwFromErrno("Cannot create pipe", DB::ErrorCodes::CANNOT_PIPE); + } + + ~Pipe() + { + if (read_fd >= 0) + close(read_fd); + if (write_fd >= 0) + close(write_fd); + } + }; + + /// По этим кодам возврата из дочернего процесса мы узнаем (наверняка) об ошибках при его создании. + enum class ReturnCodes : int + { + CANNOT_DUP_STDIN = 42, /// Значение не принципиально, но выбрано так, чтобы редко конфликтовать с кодом возврата программы. + CANNOT_DUP_STDOUT = 43, + CANNOT_DUP_STDERR = 44, + CANNOT_EXEC = 45, + }; +} + + +namespace DB +{ + + +std::unique_ptr ShellCommand::executeImpl(const char * filename, char * const argv[]) +{ + /** Тут написано, что при обычном вызове vfork, есть шанс deadlock-а в многопоточных программах, + * из-за резолвинга символов в shared-библиотеке: + * http://www.oracle.com/technetwork/server-storage/solaris10/subprocess-136439.html + * Поэтому, отделим резолвинг символа от вызова. + */ + static void * real_vfork = dlsym(RTLD_DEFAULT, "vfork"); + + if (!real_vfork) + throwFromErrno("Cannot find symbol vfork in myself", ErrorCodes::CANNOT_DLSYM); + + Pipe pipe_stdin; + Pipe pipe_stdout; + Pipe pipe_stderr; + + pid_t pid = reinterpret_cast(real_vfork)(); + + if (-1 == pid) + throwFromErrno("Cannot vfork", ErrorCodes::CANNOT_FORK); + + if (0 == pid) + { + /// Находимся в свежесозданном процессе. + + /// Почему _exit а не exit? Потому что exit вызывает atexit и деструкторы thread local storage. + /// А там куча мусора (в том числе, например, блокируется mutex). А это нельзя делать после vfork - происходит deadlock. + + /// Заменяем файловые дескрипторы на концы наших пайпов. + if (STDIN_FILENO != dup2(pipe_stdin.read_fd, STDIN_FILENO)) + _exit(int(ReturnCodes::CANNOT_DUP_STDIN)); + + if (STDOUT_FILENO != dup2(pipe_stdout.write_fd, STDOUT_FILENO)) + _exit(int(ReturnCodes::CANNOT_DUP_STDOUT)); + + if (STDERR_FILENO != dup2(pipe_stderr.write_fd, STDERR_FILENO)) + _exit(int(ReturnCodes::CANNOT_DUP_STDERR)); + + execv(filename, argv); + /// Если процесс запущен, то execv не возвращает сюда. + + _exit(int(ReturnCodes::CANNOT_EXEC)); + } + + std::unique_ptr res(new ShellCommand(pid, pipe_stdin.write_fd, pipe_stdout.read_fd, pipe_stderr.read_fd)); + + /// Теперь владение файловыми дескрипторами передано в результат. + pipe_stdin.write_fd = -1; + pipe_stdout.read_fd = -1; + pipe_stderr.read_fd = -1; + + return res; +} + + +std::unique_ptr ShellCommand::execute(const std::string & command) +{ + /// Аргументы в неконстантных кусках памяти (как требуется для execv). + /// Причём, их копирование должно быть совершено раньше вызова vfork, чтобы после vfork делать минимум вещей. + std::vector argv0("sh", "sh" + strlen("sh") + 1); + std::vector argv1("-c", "-c" + strlen("-c") + 1); + std::vector argv2(command.data(), command.data() + command.size() + 1); + + char * const argv[] = { argv0.data(), argv1.data(), argv2.data(), nullptr }; + + return executeImpl("/bin/sh", argv); +} + + +std::unique_ptr ShellCommand::executeDirect(const std::string & path, const std::vector & arguments) +{ + size_t argv_sum_size = path.size() + 1; + for (const auto & arg : arguments) + argv_sum_size += arg.size() + 1; + + std::vector argv(arguments.size() + 2); + std::vector argv_data(argv_sum_size); + WriteBuffer writer(argv_data.data(), argv_sum_size); + + argv[0] = writer.position(); + writer.write(path.data(), path.size() + 1); + + for (size_t i = 0, size = arguments.size(); i < size; ++i) + { + argv[i + 1] = writer.position(); + writer.write(arguments[i].data(), arguments[i].size() + 1); + } + + argv[arguments.size() + 1] = nullptr; + + return executeImpl(path.data(), argv.data()); +} + + +int ShellCommand::tryWait() +{ + int status = 0; + if (-1 == waitpid(pid, &status, 0)) + throwFromErrno("Cannot waitpid", ErrorCodes::CANNOT_WAITPID); + + if (WIFEXITED(status)) + return WEXITSTATUS(status); + + if (WIFSIGNALED(status)) + throw Exception("Child process was terminated by signal " + toString(WTERMSIG(status)), ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY); + + if (WIFSTOPPED(status)) + throw Exception("Child process was stopped by signal " + toString(WSTOPSIG(status)), ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY); + + throw Exception("Child process was not exited normally by unknown reason", ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY); +} + + +void ShellCommand::wait() +{ + int retcode = tryWait(); + + if (retcode != EXIT_SUCCESS) + { + switch (retcode) + { + case int(ReturnCodes::CANNOT_DUP_STDIN): + throw Exception("Cannot dup2 stdin of child process", ErrorCodes::CANNOT_CREATE_CHILD_PROCESS); + case int(ReturnCodes::CANNOT_DUP_STDOUT): + throw Exception("Cannot dup2 stdout of child process", ErrorCodes::CANNOT_CREATE_CHILD_PROCESS); + case int(ReturnCodes::CANNOT_DUP_STDERR): + throw Exception("Cannot dup2 stderr of child process", ErrorCodes::CANNOT_CREATE_CHILD_PROCESS); + case int(ReturnCodes::CANNOT_EXEC): + throw Exception("Cannot execv in child process", ErrorCodes::CANNOT_CREATE_CHILD_PROCESS); + default: + throw Exception("Child process was exited with return code " + toString(retcode), ErrorCodes::CHILD_WAS_NOT_EXITED_NORMALLY); + } + } +} + + +} diff --git a/dbms/src/Common/tests/shell_command_test.cpp b/dbms/src/Common/tests/shell_command_test.cpp new file mode 100644 index 00000000000..6fcdeae6513 --- /dev/null +++ b/dbms/src/Common/tests/shell_command_test.cpp @@ -0,0 +1,49 @@ +#include +#include +#include +#include +#include + +using namespace DB; + + +int main(int arg, char ** argv) +try +{ + { + auto command = ShellCommand::execute("echo 'Hello, world!'"); + + WriteBufferFromFileDescriptor out(STDOUT_FILENO); + copyData(command->out, out); + + command->wait(); + } + + { + auto command = ShellCommand::executeDirect("/bin/echo", {"Hello, world!"}); + + WriteBufferFromFileDescriptor out(STDOUT_FILENO); + copyData(command->out, out); + + command->wait(); + } + + { + auto command = ShellCommand::execute("cat"); + + String in_str = "Hello, world!\n"; + ReadBufferFromString in(in_str); + copyData(in, command->in); + command->in.close(); + + WriteBufferFromFileDescriptor out(STDOUT_FILENO); + copyData(command->out, out); + + command->wait(); + } +} +catch (...) +{ + std::cerr << getCurrentExceptionMessage(false) << "\n"; + return 1; +} diff --git a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp index 44e6ad4ccd0..34282986528 100644 --- a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp @@ -34,6 +34,53 @@ void MergingAggregatedMemoryEfficientBlockInputStream::readPrefix() } +void MergingAggregatedMemoryEfficientBlockInputStream::readSuffix() +{ + if (!all_read && !is_cancelled.load(std::memory_order_seq_cst)) + throw Exception("readSuffix called before all data is read", ErrorCodes::LOGICAL_ERROR); + + finalize(); + + for (size_t i = 0; i < children.size(); ++i) + children[i]->readSuffix(); +} + + +void MergingAggregatedMemoryEfficientBlockInputStream::cancel() +{ + bool old_val = false; + if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed)) + return; + + if (parallel_merge_data) + { + std::unique_lock lock(parallel_merge_data->merged_blocks_mutex); + + parallel_merge_data->finish = true; + parallel_merge_data->merged_blocks_changed.notify_one(); + } + + for (auto & input : inputs) + { + if (IProfilingBlockInputStream * child = dynamic_cast(input.stream.get())) + { + try + { + child->cancel(); + } + catch (...) + { + /** Если не удалось попросить остановиться одного или несколько источников. + * (например, разорвано соединение при распределённой обработке запроса) + * - то пофиг. + */ + LOG_ERROR(log, "Exception while cancelling " << child->getName()); + } + } + } +} + + void MergingAggregatedMemoryEfficientBlockInputStream::start() { if (started) @@ -73,6 +120,23 @@ void MergingAggregatedMemoryEfficientBlockInputStream::start() for (auto & task : tasks) task.get_future().get(); } + + if (merging_threads > 1) + { + /** Создадим несколько потоков. Каждый из них в цикле будет доставать следующий набор блоков для мерджа, + * затем мерджить их и класть результат в очередь, откуда мы будем читать готовые результаты. + */ + parallel_merge_data.reset(new ParallelMergeData(merging_threads)); + + auto & pool = parallel_merge_data->pool; + + /** Создаём потоки, которые будут получать и мерджить данные. + */ + + for (size_t i = 0; i < merging_threads; ++i) + pool.schedule(std::bind(&MergingAggregatedMemoryEfficientBlockInputStream::mergeThread, + this, current_memory_tracker)); + } } @@ -88,55 +152,74 @@ Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl() } else { - /** Создадим несколько потоков. Каждый из них в цикле будет доставать следующий набор блоков для мерджа, - * затем мерджить их и класть результат в очередь, откуда мы будем читать готовые результаты. - */ + Block res; - if (!parallel_merge_data) + while (true) { - parallel_merge_data.reset(new ParallelMergeData(merging_threads)); + std::unique_lock lock(parallel_merge_data->merged_blocks_mutex); - auto & pool = parallel_merge_data->pool; + if (parallel_merge_data->exception) + std::rethrow_exception(parallel_merge_data->exception); - /** Создаём потоки, которые будут получать и мерджить данные. - */ + if (parallel_merge_data->finish) + break; - for (size_t i = 0; i < merging_threads; ++i) - pool.schedule(std::bind(&MergingAggregatedMemoryEfficientBlockInputStream::mergeThread, - this, current_memory_tracker)); + if (!parallel_merge_data->merged_blocks.empty()) + { + auto it = parallel_merge_data->merged_blocks.begin(); + + if (it->second) + { + res.swap(it->second); + parallel_merge_data->merged_blocks.erase(it); + parallel_merge_data->have_space.notify_one(); + break; + } + } + else if (parallel_merge_data->exhausted) + break; + + parallel_merge_data->merged_blocks_changed.wait(lock); } - OutputData res; - parallel_merge_data->result_queue.pop(res); + if (!res) + all_read = true; - if (res.exception) - std::rethrow_exception(res.exception); - - if (!res.block) - parallel_merge_data->pool.wait(); - - return res.block; + return res; } } MergingAggregatedMemoryEfficientBlockInputStream::~MergingAggregatedMemoryEfficientBlockInputStream() { + try + { + if (!all_read) + cancel(); + + finalize(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } +} + + +void MergingAggregatedMemoryEfficientBlockInputStream::finalize() +{ + if (!started) + return; + + LOG_TRACE(log, "Waiting for threads to finish"); + if (reading_pool) reading_pool->wait(); if (parallel_merge_data) - { - LOG_TRACE((&Logger::get("MergingAggregatedMemoryEfficientBlockInputStream")), "Waiting for threads to finish"); - - { - std::lock_guard lock(parallel_merge_data->get_next_blocks_mutex); - parallel_merge_data->finish = true; - } - - parallel_merge_data->result_queue.clear(); parallel_merge_data->pool.wait(); - } + + LOG_TRACE(log, "Waited for threads to finish"); } @@ -147,7 +230,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(MemoryTracker try { - while (true) + while (!parallel_merge_data->finish) { /** Получение следующих блоков делается в одном пуле потоков, а мердж - в другом. * Это весьма сложное взаимодействие. @@ -157,6 +240,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(MemoryTracker * - один из merging_threads выполняет слияние этой группы блоков; */ BlocksToMerge blocks_to_merge; + int output_order = -1; { std::lock_guard lock(parallel_merge_data->get_next_blocks_mutex); @@ -168,32 +252,53 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(MemoryTracker if (!blocks_to_merge || blocks_to_merge->empty()) { + std::unique_lock lock(parallel_merge_data->merged_blocks_mutex); + parallel_merge_data->exhausted = true; + parallel_merge_data->merged_blocks_changed.notify_one(); break; } + + output_order = blocks_to_merge->front().info.is_overflows + ? NUM_BUCKETS /// Блоки "переполнений" отдаются функцией getNextBlocksToMerge позже всех остальных. + : blocks_to_merge->front().info.bucket_num; + + { + std::unique_lock lock(parallel_merge_data->merged_blocks_mutex); + + while (parallel_merge_data->merged_blocks.size() >= merging_threads) + parallel_merge_data->have_space.wait(lock); + + /** Кладём пустой блок, что означает обещание его заполнить. + * Основной поток должен возвращать результаты строго в порядке output_order, поэтому это важно. + */ + parallel_merge_data->merged_blocks[output_order]; + } } Block res = aggregator.mergeBlocks(*blocks_to_merge, final); { - std::lock_guard lock(parallel_merge_data->get_next_blocks_mutex); + std::lock_guard lock(parallel_merge_data->merged_blocks_mutex); if (parallel_merge_data->finish) break; - parallel_merge_data->result_queue.push(OutputData(std::move(res))); + parallel_merge_data->merged_blocks[output_order] = res; + parallel_merge_data->merged_blocks_changed.notify_one(); } } } catch (...) { - parallel_merge_data->result_queue.push(std::current_exception()); - return; - } + { + std::lock_guard lock(parallel_merge_data->merged_blocks_mutex); + parallel_merge_data->exception = std::current_exception(); + parallel_merge_data->merged_blocks_changed.notify_one(); + } - /// Последний поток при выходе сообщает, что данных больше нет. - if (0 == --parallel_merge_data->active_threads) - parallel_merge_data->result_queue.push(Block()); + cancel(); + } } @@ -217,9 +322,6 @@ MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregate * Это дополнительные данные для строк, не прошедших через max_rows_to_group_by. * Они должны объединяться друг с другом отдельно. */ - - constexpr size_t NUM_BUCKETS = 256; - ++current_bucket_num; /// Получить из источника следующий блок с номером корзины не больше current_bucket_num. diff --git a/dbms/src/Functions/FunctionsArray.cpp b/dbms/src/Functions/FunctionsArray.cpp index 4210e77480a..2313f60cd37 100644 --- a/dbms/src/Functions/FunctionsArray.cpp +++ b/dbms/src/Functions/FunctionsArray.cpp @@ -29,6 +29,7 @@ void registerFunctionsArray(FunctionFactory & factory) factory.registerFunction(); factory.registerFunction(); factory.registerFunction(); + factory.registerFunction(); } } diff --git a/dbms/src/IO/ReadBufferAIO.cpp b/dbms/src/IO/ReadBufferAIO.cpp index a4513c0d6f3..10de3a7117f 100644 --- a/dbms/src/IO/ReadBufferAIO.cpp +++ b/dbms/src/IO/ReadBufferAIO.cpp @@ -1,11 +1,15 @@ #include #include +#include #include #include #include #include +#include + + namespace DB { @@ -61,6 +65,10 @@ bool ReadBufferAIO::nextImpl() if (is_eof) return false; + std::experimental::optional watch; + if (profile_callback) + watch.emplace(clock_type); + if (!is_aio) { synchronousRead(); @@ -69,6 +77,15 @@ bool ReadBufferAIO::nextImpl() else receive(); + if (profile_callback) + { + ProfileInfo info; + info.bytes_requested = requested_byte_count; + info.bytes_read = bytes_read; + info.nanoseconds = watch->elapsed(); + profile_callback(info); + } + is_started = true; /// Если конец файла только что достигнут, больше ничего не делаем. diff --git a/dbms/src/Interpreters/Compiler.cpp b/dbms/src/Interpreters/Compiler.cpp index 42a19112e5e..d5790d4f66e 100644 --- a/dbms/src/Interpreters/Compiler.cpp +++ b/dbms/src/Interpreters/Compiler.cpp @@ -1,10 +1,9 @@ -#include - #include #include #include #include +#include #include #include @@ -155,35 +154,6 @@ SharedLibraryPtr Compiler::getOrCount( } -struct Pipe : private boost::noncopyable -{ - FILE * f; - - Pipe(const std::string & command) - { - errno = 0; - f = popen(command.c_str(), "r"); - - if (!f) - throwFromErrno("Cannot popen"); - } - - ~Pipe() - { - try - { - errno = 0; - if (f && -1 == pclose(f)) - throwFromErrno("Cannot pclose"); - } - catch (...) - { - tryLogCurrentException("Pipe"); - } - } -}; - - void Compiler::compile( HashedKey hashed_key, std::string file_name, @@ -231,18 +201,14 @@ void Compiler::compile( std::string compile_result; { - Pipe pipe(command.str()); - - int pipe_fd = fileno(pipe.f); - if (-1 == pipe_fd) - throwFromErrno("Cannot fileno"); + auto process = ShellCommand::execute(command.str()); { - ReadBufferFromFileDescriptor command_output(pipe_fd); WriteBufferFromString res(compile_result); - - copyData(command_output, res); + copyData(process->out, res); } + + process->wait(); } if (!compile_result.empty()) diff --git a/dbms/src/Interpreters/InterpreterDropQuery.cpp b/dbms/src/Interpreters/InterpreterDropQuery.cpp index 47571e67dde..39ddf16a3c3 100644 --- a/dbms/src/Interpreters/InterpreterDropQuery.cpp +++ b/dbms/src/Interpreters/InterpreterDropQuery.cpp @@ -72,7 +72,7 @@ BlockIO InterpreterDropQuery::execute() String current_table_name = table->getTableName(); /// Удаляем информацию о таблице из оперативки - context.detachTable(database_name, current_table_name); + StoragePtr detached = context.detachTable(database_name, current_table_name); /// Удаляем данные таблицы if (!drop.detach) @@ -81,12 +81,40 @@ BlockIO InterpreterDropQuery::execute() String current_metadata_path = metadata_path + escapeForFileName(current_table_name) + ".sql"; /// Для таблиц типа ChunkRef, файла с метаданными не существует. - if (Poco::File(current_metadata_path).exists()) - Poco::File(current_metadata_path).remove(); + bool metadata_file_exists = Poco::File(current_metadata_path).exists(); + if (metadata_file_exists) + { + if (Poco::File(current_metadata_path + ".bak").exists()) + Poco::File(current_metadata_path + ".bak").remove(); + + Poco::File(current_metadata_path).renameTo(current_metadata_path + ".bak"); + } + + try + { + table->drop(); + } + catch (const Exception & e) + { + /// Такая ошибка означает, что таблицу невозможно удалить, и данные пока ещё консистентны. Можно вернуть таблицу на место. + /// NOTE Таблица будет оставаться в состоянии после shutdown - не производить всевозможной фоновой работы. + if (e.code() == ErrorCodes::TABLE_WAS_NOT_DROPPED) + { + if (metadata_file_exists) + Poco::File(current_metadata_path + ".bak").renameTo(current_metadata_path); + + context.addTable(database_name, current_table_name, detached); + throw; + } + else + throw; + } - table->drop(); table->is_dropped = true; + if (metadata_file_exists) + Poco::File(current_metadata_path + ".bak").remove(); + if (Poco::File(current_data_path).exists()) Poco::File(current_data_path).remove(true); } @@ -115,6 +143,7 @@ BlockIO InterpreterDropQuery::execute() return {}; } + void InterpreterDropQuery::dropDetachedTable(String database_name, StoragePtr table, Context & context) { table->shutdown(); diff --git a/dbms/src/Interpreters/loadMetadata.cpp b/dbms/src/Interpreters/loadMetadata.cpp index 4311b917089..7ea0a0fe978 100644 --- a/dbms/src/Interpreters/loadMetadata.cpp +++ b/dbms/src/Interpreters/loadMetadata.cpp @@ -111,6 +111,12 @@ static void loadTable(Context & context, const String & path, const Table & tabl } +static bool endsWith(const String & s, const char * suffix) +{ + return s.size() >= strlen(suffix) && 0 == s.compare(s.size() - strlen(suffix), strlen(suffix), suffix); +} + + void loadMetadata(Context & context) { Logger * log = &Logger::get("loadMetadata"); @@ -153,11 +159,15 @@ void loadMetadata(Context & context) if (jt.name().at(0) == '.') continue; - /// Файлы имеют имена вида table_name.sql - if (jt.name().compare(jt.name().size() - 4, 4, ".sql")) - throw Exception("Incorrect file extension: " + jt.name() + " in metadata directory " + it->path(), ErrorCodes::INCORRECT_FILE_NAME); + /// Есть файлы .sql.bak - пропускаем. + if (endsWith(jt.name(), ".sql.bak")) + continue; - file_names.push_back(jt.name()); + /// Нужные файлы имеют имена вида table_name.sql + if (endsWith(jt.name(), ".sql")) + file_names.push_back(jt.name()); + else + throw Exception("Incorrect file extension: " + jt.name() + " in metadata directory " + it->path(), ErrorCodes::INCORRECT_FILE_NAME); } /** Таблицы быстрее грузятся, если их грузить в сортированном (по именам) порядке. diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 646e33468b6..8e3519813b4 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -528,7 +528,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads( MergeTreeReadPoolPtr pool = std::make_shared( threads, sum_marks, min_marks_for_concurrent_read, parts, data, prewhere_actions, prewhere_column, true, - column_names); + column_names, MergeTreeReadPool::BackoffSettings(settings)); /// Оценим общее количество строк - для прогресс-бара. const std::size_t total_rows = data.index_granularity * sum_marks; @@ -669,7 +669,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal MergeTreeReadPoolPtr pool = std::make_shared( parts.size(), sum_marks, min_marks_for_read_task, parts, data, prewhere_actions, prewhere_column, true, - column_names, true); + column_names, MergeTreeReadPool::BackoffSettings{}, true); /// Оценим общее количество строк - для прогресс-бара. const std::size_t total_rows = data.index_granularity * sum_marks; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 6785968ef49..59c21d6cfa8 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -3058,6 +3058,9 @@ void StorageReplicatedMergeTree::drop() auto zookeeper = getZooKeeper(); + if (zookeeper->expired()) + throw Exception("Table was not dropped because ZooKeeper session has been expired.", ErrorCodes::TABLE_WAS_NOT_DROPPED); + LOG_INFO(log, "Removing replica " << replica_path); replica_is_active_node = nullptr; zookeeper->tryRemoveRecursive(replica_path); diff --git a/dbms/tests/queries/0_stateless/00291_array_reduce.reference b/dbms/tests/queries/0_stateless/00291_array_reduce.reference new file mode 100644 index 00000000000..c102aaf0487 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00291_array_reduce.reference @@ -0,0 +1,13 @@ +2 4 4 3 +[nan,nan] [] +[0,0] [0] +[0.5,0.9] [0,1] +[1,1.8] [0,1,2] +[1.5,2.7] [0,1,2,3] +[2,3.6] [0,1,2,3,4] +[2.5,4.5] [0,1,2,3,4,5] +[3,5.4] [0,1,2,3,4,5,6] +[3.5,6.3] [0,1,2,3,4,5,6,7] +[4,7.2] [0,1,2,3,4,5,6,7,8] +[4.5,8.1] [0,1,2,3,4,5,6,7,8,9] +[5,9] [0,1,2,3,4,5,6,7,8,9,10] diff --git a/dbms/tests/queries/0_stateless/00291_array_reduce.sql b/dbms/tests/queries/0_stateless/00291_array_reduce.sql new file mode 100644 index 00000000000..c11909247de --- /dev/null +++ b/dbms/tests/queries/0_stateless/00291_array_reduce.sql @@ -0,0 +1,7 @@ +SELECT + arrayReduce('uniq', [1, 2, 1]) AS a, + arrayReduce('uniq', [1, 2, 2, 1], ['hello', 'world', '', '']) AS b, + arrayReduce('uniqUpTo(5)', [1, 2, 2, 1], materialize(['hello', 'world', '', ''])) AS c, + arrayReduce('uniqExactIf', [1, 2, 3, 4], [1, 0, 1, 1]) AS d; + +SELECT arrayReduce('quantiles(0.5, 0.9)', range(number) AS r), r FROM system.numbers LIMIT 12;