diff --git a/src/Common/ShellCommandSettings.cpp b/src/Common/ShellCommandSettings.cpp index b1bd8f280de..951a20e949c 100644 --- a/src/Common/ShellCommandSettings.cpp +++ b/src/Common/ShellCommandSettings.cpp @@ -15,18 +15,11 @@ namespace ErrorCodes ExternalCommandStderrReaction parseExternalCommandStderrReaction(const std::string & config) { auto reaction = magic_enum::enum_cast(Poco::toUpper(config)); - if (!reaction) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown stderr_reaction: {}. Possible values are 'none', 'log' and 'throw'", config); - - return *reaction; -} - -ExternalCommandErrorExitReaction parseExternalCommandErrorExitReaction(const std::string & config) -{ - auto reaction = magic_enum::enum_cast(Poco::toUpper(config)); if (!reaction) throw Exception( - ErrorCodes::BAD_ARGUMENTS, "Unknown error_exit_reaction: {}. Possible values are 'none', 'log_first' and 'log_last'", config); + ErrorCodes::BAD_ARGUMENTS, + "Unknown stderr_reaction: {}. Possible values are 'none', 'log', 'log_first', 'log_last' and 'throw'", + config); return *reaction; } diff --git a/src/Common/ShellCommandSettings.h b/src/Common/ShellCommandSettings.h index 40625a6bbf2..4cfd862b873 100644 --- a/src/Common/ShellCommandSettings.h +++ b/src/Common/ShellCommandSettings.h @@ -8,19 +8,12 @@ namespace DB enum class ExternalCommandStderrReaction { NONE, /// Do nothing. - LOG, /// Try to log all outputs of stderr from the external command. - THROW /// Throw exception when the external command outputs something to its stderr. + LOG, /// Try to log all outputs of stderr from the external command immediately. + LOG_FIRST, /// Try to log first 1_KiB outputs of stderr from the external command after exit. + LOG_LAST, /// Same as above, but log last 1_KiB outputs. + THROW /// Immediately throw exception when the external command outputs something to its stderr. }; ExternalCommandStderrReaction parseExternalCommandStderrReaction(const std::string & config); -enum class ExternalCommandErrorExitReaction -{ - NONE, /// Do nothing. - LOG_FIRST, /// Try to log first 1_KiB outputs of stderr from the external command. - LOG_LAST /// Same as above, but log last 1_KiB outputs. -}; - -ExternalCommandErrorExitReaction parseExternalCommandErrorExitReaction(const std::string & config); - } diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp index fb6afa6f8a6..a30d8040f47 100644 --- a/src/Core/SettingsEnums.cpp +++ b/src/Core/SettingsEnums.cpp @@ -185,10 +185,8 @@ IMPLEMENT_SETTING_ENUM(S3QueueAction, ErrorCodes::BAD_ARGUMENTS, IMPLEMENT_SETTING_ENUM(ExternalCommandStderrReaction, ErrorCodes::BAD_ARGUMENTS, {{"none", ExternalCommandStderrReaction::NONE}, {"log", ExternalCommandStderrReaction::LOG}, + {"log_first", ExternalCommandStderrReaction::LOG_FIRST}, + {"log_last", ExternalCommandStderrReaction::LOG_LAST}, {"throw", ExternalCommandStderrReaction::THROW}}) -IMPLEMENT_SETTING_ENUM(ExternalCommandErrorExitReaction, ErrorCodes::BAD_ARGUMENTS, - {{"none", ExternalCommandErrorExitReaction::NONE}, - {"log_first", ExternalCommandErrorExitReaction::LOG_FIRST}, - {"log_last", ExternalCommandErrorExitReaction::LOG_LAST}}) } diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index 7ae0666378e..034e4c8c887 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -241,6 +241,4 @@ DECLARE_SETTING_ENUM(S3QueueAction) DECLARE_SETTING_ENUM(ExternalCommandStderrReaction) -DECLARE_SETTING_ENUM(ExternalCommandErrorExitReaction) - } diff --git a/src/Dictionaries/ExecutableDictionarySource.cpp b/src/Dictionaries/ExecutableDictionarySource.cpp index 88b87b4ff04..f1acd610274 100644 --- a/src/Dictionaries/ExecutableDictionarySource.cpp +++ b/src/Dictionaries/ExecutableDictionarySource.cpp @@ -266,8 +266,6 @@ void registerDictionarySourceExecutable(DictionarySourceFactory & factory) .command_read_timeout_milliseconds = config.getUInt64(settings_config_prefix + ".command_read_timeout", 10000), .command_write_timeout_milliseconds = config.getUInt64(settings_config_prefix + ".command_write_timeout", 10000), .stderr_reaction = parseExternalCommandStderrReaction(config.getString(settings_config_prefix + ".stderr_reaction", "none")), - .error_exit_reaction - = parseExternalCommandErrorExitReaction(config.getString(settings_config_prefix + ".error_exit_reaction", "none")), .check_exit_code = config.getBool(settings_config_prefix + ".check_exit_code", true), .is_executable_pool = false, .send_chunk_header = config.getBool(settings_config_prefix + ".send_chunk_header", false), diff --git a/src/Dictionaries/ExecutablePoolDictionarySource.cpp b/src/Dictionaries/ExecutablePoolDictionarySource.cpp index 6cd6d200aea..d28c73c9c52 100644 --- a/src/Dictionaries/ExecutablePoolDictionarySource.cpp +++ b/src/Dictionaries/ExecutablePoolDictionarySource.cpp @@ -233,8 +233,6 @@ void registerDictionarySourceExecutablePool(DictionarySourceFactory & factory) .command_read_timeout_milliseconds = config.getUInt64(settings_config_prefix + ".command_read_timeout", 10000), .command_write_timeout_milliseconds = config.getUInt64(settings_config_prefix + ".command_write_timeout", 10000), .stderr_reaction = parseExternalCommandStderrReaction(config.getString(settings_config_prefix + ".stderr_reaction", "none")), - .error_exit_reaction - = parseExternalCommandErrorExitReaction(config.getString(settings_config_prefix + ".error_exit_reaction", "none")), .check_exit_code = config.getBool(settings_config_prefix + ".check_exit_code", true), .pool_size = config.getUInt64(settings_config_prefix + ".pool_size", 16), .max_command_execution_time_seconds = max_command_execution_time, diff --git a/src/Functions/UserDefined/ExternalUserDefinedExecutableFunctionsLoader.cpp b/src/Functions/UserDefined/ExternalUserDefinedExecutableFunctionsLoader.cpp index 0a20e8a71e4..ca142479ff1 100644 --- a/src/Functions/UserDefined/ExternalUserDefinedExecutableFunctionsLoader.cpp +++ b/src/Functions/UserDefined/ExternalUserDefinedExecutableFunctionsLoader.cpp @@ -173,8 +173,7 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create size_t command_write_timeout_milliseconds = config.getUInt64(key_in_config + ".command_write_timeout", 10000); ExternalCommandStderrReaction stderr_reaction = parseExternalCommandStderrReaction(config.getString(key_in_config + ".stderr_reaction", "none")); - ExternalCommandErrorExitReaction error_exit_reaction - = parseExternalCommandErrorExitReaction(config.getString(key_in_config + ".error_exit_reaction", "none")); + bool check_exit_code = config.getBool(key_in_config + ".check_exit_code", true); size_t pool_size = 0; size_t max_command_execution_time = 0; @@ -243,7 +242,7 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create .command_read_timeout_milliseconds = command_read_timeout_milliseconds, .command_write_timeout_milliseconds = command_write_timeout_milliseconds, .stderr_reaction = stderr_reaction, - .error_exit_reaction = error_exit_reaction, + .check_exit_code = check_exit_code, .pool_size = pool_size, .max_command_execution_time_seconds = max_command_execution_time, .is_executable_pool = is_executable_pool, diff --git a/src/Processors/Sources/ShellCommandSource.cpp b/src/Processors/Sources/ShellCommandSource.cpp index 24ffeebe391..ef6cd5ef84c 100644 --- a/src/Processors/Sources/ShellCommandSource.cpp +++ b/src/Processors/Sources/ShellCommandSource.cpp @@ -14,6 +14,7 @@ #include #include #include +#include namespace DB @@ -106,20 +107,18 @@ public: int stdout_fd_, int stderr_fd_, size_t timeout_milliseconds_, - ExternalCommandStderrReaction stderr_reaction_, - ExternalCommandErrorExitReaction error_exit_reaction_) + ExternalCommandStderrReaction stderr_reaction_) : stdout_fd(stdout_fd_) , stderr_fd(stderr_fd_) , timeout_milliseconds(timeout_milliseconds_) , stderr_reaction(stderr_reaction_) - , error_exit_reaction(error_exit_reaction_) { makeFdNonBlocking(stdout_fd); makeFdNonBlocking(stderr_fd); #if defined(OS_LINUX) epoll.add(stdout_fd); - if (stderr_reaction != ExternalCommandStderrReaction::NONE || error_exit_reaction != ExternalCommandErrorExitReaction::NONE) + if (stderr_reaction != ExternalCommandStderrReaction::NONE) epoll.add(stderr_fd); #endif } @@ -129,8 +128,6 @@ public: size_t bytes_read = 0; #if defined(OS_LINUX) - static constexpr size_t BUFFER_SIZE = 4_KiB; - while (!bytes_read) { epoll_event events[2]; @@ -151,35 +148,26 @@ public: if (has_stderr) { - stderr_buf.resize(BUFFER_SIZE); - ssize_t res = ::read(stderr_fd, stderr_buf.data(), stderr_buf.size()); - + if (stderr_read_buf == nullptr) + stderr_read_buf.reset(new char[BUFFER_SIZE]); + ssize_t res = ::read(stderr_fd, stderr_read_buf.get(), BUFFER_SIZE); if (res > 0) { - stderr_buf.resize(res); + std::string_view str(stderr_read_buf.get(), res); if (stderr_reaction == ExternalCommandStderrReaction::THROW) - throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Executable generates stderr: {}", stderr_buf); + throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Executable generates stderr: {}", str); else if (stderr_reaction == ExternalCommandStderrReaction::LOG) LOG_WARNING( - &::Poco::Logger::get("TimeoutReadBufferFromFileDescriptor"), "Executable generates stderr: {}", stderr_buf); - - if (error_exit_reaction == ExternalCommandErrorExitReaction::LOG_FIRST) + &::Poco::Logger::get("TimeoutReadBufferFromFileDescriptor"), "Executable generates stderr: {}", str); + else if (stderr_reaction == ExternalCommandStderrReaction::LOG_FIRST) { - if (BUFFER_SIZE - error_exit_buf.size() < size_t(res)) - res = BUFFER_SIZE - error_exit_buf.size(); - + res = std::min(ssize_t(stderr_result_buf.reserve()), res); if (res > 0) - error_exit_buf.append(stderr_buf.begin(), stderr_buf.begin() + res); + stderr_result_buf.insert(stderr_result_buf.end(), str.begin(), str.begin() + res); } - else if (error_exit_reaction == ExternalCommandErrorExitReaction::LOG_LAST) + else if (stderr_reaction == ExternalCommandStderrReaction::LOG_LAST) { - if (res + error_exit_buf.size() > BUFFER_SIZE) - { - std::shift_left(error_exit_buf.begin(), error_exit_buf.end(), res + error_exit_buf.size() - BUFFER_SIZE); - error_exit_buf.resize(BUFFER_SIZE - res); - } - - error_exit_buf += stderr_buf; + stderr_result_buf.insert(stderr_result_buf.end(), str.begin(), str.begin() + res); } } } @@ -240,20 +228,33 @@ public: { tryMakeFdBlocking(stdout_fd); tryMakeFdBlocking(stderr_fd); - } - String error_exit_buf; +#if defined(OS_LINUX) + if (!stderr_result_buf.empty()) + { + String stderr_result; + stderr_result.reserve(stderr_result_buf.size()); + stderr_result.append(stderr_result_buf.begin(), stderr_result_buf.end()); + LOG_WARNING( + &::Poco::Logger::get("ShellCommandSource"), + "Executable generates stderr at the {}: {}", + stderr_reaction == ExternalCommandStderrReaction::LOG_FIRST ? "beginning" : "end", + stderr_result); + } +#endif + } private: int stdout_fd; int stderr_fd; size_t timeout_milliseconds; [[maybe_unused]] ExternalCommandStderrReaction stderr_reaction; - [[maybe_unused]] ExternalCommandErrorExitReaction error_exit_reaction; #if defined(OS_LINUX) + static constexpr size_t BUFFER_SIZE = 4_KiB; Epoll epoll; - String stderr_buf; + std::unique_ptr stderr_read_buf; + boost::circular_buffer_space_optimized stderr_result_buf{BUFFER_SIZE}; #endif }; @@ -349,7 +350,6 @@ namespace const std::string & format_, size_t command_read_timeout_milliseconds, ExternalCommandStderrReaction stderr_reaction, - ExternalCommandErrorExitReaction error_exit_reaction, bool check_exit_code_, const Block & sample_block_, std::unique_ptr && command_, @@ -363,11 +363,10 @@ namespace , sample_block(sample_block_) , command(std::move(command_)) , configuration(configuration_) - , timeout_command_out( - command->out.getFD(), command->err.getFD(), command_read_timeout_milliseconds, stderr_reaction, error_exit_reaction) + , timeout_command_out(command->out.getFD(), command->err.getFD(), command_read_timeout_milliseconds, stderr_reaction) , command_holder(std::move(command_holder_)) , process_pool(process_pool_) - , check_exit_code(check_exit_code_ || error_exit_reaction != ExternalCommandErrorExitReaction::NONE) + , check_exit_code(check_exit_code_) { for (auto && send_data_task : send_data_tasks) { @@ -416,12 +415,7 @@ namespace thread.join(); if (command_is_invalid) - { command = nullptr; - if (!timeout_command_out.error_exit_buf.empty()) - LOG_ERROR( - &::Poco::Logger::get("ShellCommandSource"), "Executable fails with stderr: {}", timeout_command_out.error_exit_buf); - } if (command_holder && process_pool) { @@ -684,7 +678,6 @@ Pipe ShellCommandSourceCoordinator::createPipe( configuration.format, configuration.command_read_timeout_milliseconds, configuration.stderr_reaction, - configuration.error_exit_reaction, configuration.check_exit_code, std::move(sample_block), std::move(process), diff --git a/src/Processors/Sources/ShellCommandSource.h b/src/Processors/Sources/ShellCommandSource.h index ec34a7cfeb3..44bd725bbe2 100644 --- a/src/Processors/Sources/ShellCommandSource.h +++ b/src/Processors/Sources/ShellCommandSource.h @@ -58,11 +58,10 @@ public: /// Reaction when external command outputs data to its stderr. ExternalCommandStderrReaction stderr_reaction = ExternalCommandStderrReaction::NONE; - /// Reaction when external command exits with non-zero code. - ExternalCommandErrorExitReaction error_exit_reaction = ExternalCommandErrorExitReaction::NONE; - /// Will throw if the command exited with - /// non-zero status code + /// non-zero status code. + /// NOTE: If executable pool is used, we cannot check exit code, + /// which makes this configuration no effect. size_t check_exit_code = false; /// Pool size valid only if executable_pool = true diff --git a/src/Storages/ExecutableSettings.h b/src/Storages/ExecutableSettings.h index ee9f018ec5a..f1ceb7ce59f 100644 --- a/src/Storages/ExecutableSettings.h +++ b/src/Storages/ExecutableSettings.h @@ -16,7 +16,7 @@ class ASTStorage; M(UInt64, command_read_timeout, 10000, "Timeout for reading data from command stdout in milliseconds.", 0) \ M(UInt64, command_write_timeout, 10000, "Timeout for writing data to command stdin in milliseconds.", 0) \ M(ExternalCommandStderrReaction, stderr_reaction, ExternalCommandStderrReaction::NONE, "Reaction when external command outputs data to its stderr.", 0) \ - M(ExternalCommandErrorExitReaction, error_exit_reaction, ExternalCommandErrorExitReaction::NONE, "Reaction when external command exits with non-zero code.", 0) \ + M(Bool, check_exit_code, true, "Throw exception if the command exited with non-zero status code.", 0) \ DECLARE_SETTINGS_TRAITS(ExecutableSettingsTraits, LIST_OF_EXECUTABLE_SETTINGS) diff --git a/src/Storages/StorageExecutable.cpp b/src/Storages/StorageExecutable.cpp index 4f4a67c254f..df03301b5e8 100644 --- a/src/Storages/StorageExecutable.cpp +++ b/src/Storages/StorageExecutable.cpp @@ -93,7 +93,7 @@ StorageExecutable::StorageExecutable( .command_read_timeout_milliseconds = settings.command_read_timeout, .command_write_timeout_milliseconds = settings.command_write_timeout, .stderr_reaction = settings.stderr_reaction, - .error_exit_reaction = settings.error_exit_reaction, + .check_exit_code = settings.check_exit_code, .pool_size = settings.pool_size, .max_command_execution_time_seconds = settings.max_command_execution_time, diff --git a/tests/integration/test_executable_user_defined_function/functions/test_function_config.xml b/tests/integration/test_executable_user_defined_function/functions/test_function_config.xml index 20a6e1d49bd..99efd99ab31 100644 --- a/tests/integration/test_executable_user_defined_function/functions/test_function_config.xml +++ b/tests/integration/test_executable_user_defined_function/functions/test_function_config.xml @@ -348,25 +348,50 @@ executable - test_function_always_error_exit_log_first_python + test_function_always_error_log_first_python String UInt64 TabSeparated input_log_error.py - log_first + log_first executable - test_function_always_error_exit_log_last_python + test_function_always_error_log_last_python String UInt64 TabSeparated input_log_error.py - log_last + log_last + + + + executable + test_function_exit_error_ignore_python + String + + UInt64 + + TabSeparated + input_exit_error.py + 0 + + + + executable + test_function_exit_error_fail_python + String + + UInt64 + + TabSeparated + input_exit_error.py + + diff --git a/tests/integration/test_executable_user_defined_function/test.py b/tests/integration/test_executable_user_defined_function/test.py index 8588ebc530e..f2d84249c6c 100644 --- a/tests/integration/test_executable_user_defined_function/test.py +++ b/tests/integration/test_executable_user_defined_function/test.py @@ -302,33 +302,38 @@ def test_executable_function_always_error_python(started_cluster): == "Key 1\n" ) assert node.contains_in_log( - "{" - + query_id - + "} TimeoutReadBufferFromFileDescriptor: Executable generates stderr: Fake error" + f"{{{query_id}}} TimeoutReadBufferFromFileDescriptor: Executable generates stderr: Fake error" ) query_id = uuid.uuid4().hex - try: + assert ( node.query( - "SELECT test_function_always_error_exit_log_first_python(1)", - query_id=query_id, - ) - assert False, "Exception have to be thrown" - except Exception as ex: - assert "DB::Exception: Child process was exited with return code 1" in str(ex) - assert node.contains_in_log( - f"{{{query_id}}} ShellCommandSource: Executable fails with stderr: {'a' * (3 * 1024)}{'b' * 1024}\n" + "SELECT test_function_always_error_log_first_python(1)", query_id=query_id ) + == "Key 1\n" + ) + assert node.contains_in_log( + f"{{{query_id}}} TimeoutReadBufferFromFileDescriptor: Executable generates stderr at the beginning: {'a' * (3 * 1024)}{'b' * 1024}\n" + ) query_id = uuid.uuid4().hex - try: + assert ( node.query( - "SELECT test_function_always_error_exit_log_last_python(1)", - query_id=query_id, + "SELECT test_function_always_error_log_last_python(1)", query_id=query_id ) + == "Key 1\n" + ) + assert node.contains_in_log( + f"{{{query_id}}} TimeoutReadBufferFromFileDescriptor: Executable generates stderr at the end: {'b' * 1024}{'c' * (3 * 1024)}\n" + ) + + assert ( + node.query("SELECT test_function_exit_error_ignore_python(1)") + == "Key 1\n" + ) + + try: + node.query("SELECT test_function_exit_error_fail_python(1)") assert False, "Exception have to be thrown" except Exception as ex: assert "DB::Exception: Child process was exited with return code 1" in str(ex) - assert node.contains_in_log( - f"{{{query_id}}} ShellCommandSource: Executable fails with stderr: {'b' * 1024}{'c' * (3 * 1024)}\n" - ) diff --git a/tests/integration/test_executable_user_defined_function/user_scripts/input_exit_error.py b/tests/integration/test_executable_user_defined_function/user_scripts/input_exit_error.py new file mode 100755 index 00000000000..036d69a8297 --- /dev/null +++ b/tests/integration/test_executable_user_defined_function/user_scripts/input_exit_error.py @@ -0,0 +1,10 @@ +#!/usr/bin/python3 + +import sys + +if __name__ == "__main__": + for line in sys.stdin: + print("Key " + line, end="") + sys.stdout.flush() + + sys.exit(1) diff --git a/tests/integration/test_executable_user_defined_function/user_scripts/input_log_error.py b/tests/integration/test_executable_user_defined_function/user_scripts/input_log_error.py index e61b51ee253..d622f44f75d 100755 --- a/tests/integration/test_executable_user_defined_function/user_scripts/input_log_error.py +++ b/tests/integration/test_executable_user_defined_function/user_scripts/input_log_error.py @@ -8,5 +8,3 @@ if __name__ == "__main__": for line in sys.stdin: print("Key " + line, end="") sys.stdout.flush() - - sys.exit(1)