This commit is contained in:
Amos Bird 2023-08-11 22:53:55 +08:00
parent e49fb4f348
commit c43bf153f5
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
15 changed files with 111 additions and 104 deletions

View File

@ -15,18 +15,11 @@ namespace ErrorCodes
ExternalCommandStderrReaction parseExternalCommandStderrReaction(const std::string & config)
{
auto reaction = magic_enum::enum_cast<ExternalCommandStderrReaction>(Poco::toUpper(config));
if (!reaction)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown stderr_reaction: {}. Possible values are 'none', 'log' and 'throw'", config);
return *reaction;
}
ExternalCommandErrorExitReaction parseExternalCommandErrorExitReaction(const std::string & config)
{
auto reaction = magic_enum::enum_cast<ExternalCommandErrorExitReaction>(Poco::toUpper(config));
if (!reaction)
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Unknown error_exit_reaction: {}. Possible values are 'none', 'log_first' and 'log_last'", config);
ErrorCodes::BAD_ARGUMENTS,
"Unknown stderr_reaction: {}. Possible values are 'none', 'log', 'log_first', 'log_last' and 'throw'",
config);
return *reaction;
}

View File

@ -8,19 +8,12 @@ namespace DB
enum class ExternalCommandStderrReaction
{
NONE, /// Do nothing.
LOG, /// Try to log all outputs of stderr from the external command.
THROW /// Throw exception when the external command outputs something to its stderr.
LOG, /// Try to log all outputs of stderr from the external command immediately.
LOG_FIRST, /// Try to log first 1_KiB outputs of stderr from the external command after exit.
LOG_LAST, /// Same as above, but log last 1_KiB outputs.
THROW /// Immediately throw exception when the external command outputs something to its stderr.
};
ExternalCommandStderrReaction parseExternalCommandStderrReaction(const std::string & config);
enum class ExternalCommandErrorExitReaction
{
NONE, /// Do nothing.
LOG_FIRST, /// Try to log first 1_KiB outputs of stderr from the external command.
LOG_LAST /// Same as above, but log last 1_KiB outputs.
};
ExternalCommandErrorExitReaction parseExternalCommandErrorExitReaction(const std::string & config);
}

View File

@ -185,10 +185,8 @@ IMPLEMENT_SETTING_ENUM(S3QueueAction, ErrorCodes::BAD_ARGUMENTS,
IMPLEMENT_SETTING_ENUM(ExternalCommandStderrReaction, ErrorCodes::BAD_ARGUMENTS,
{{"none", ExternalCommandStderrReaction::NONE},
{"log", ExternalCommandStderrReaction::LOG},
{"log_first", ExternalCommandStderrReaction::LOG_FIRST},
{"log_last", ExternalCommandStderrReaction::LOG_LAST},
{"throw", ExternalCommandStderrReaction::THROW}})
IMPLEMENT_SETTING_ENUM(ExternalCommandErrorExitReaction, ErrorCodes::BAD_ARGUMENTS,
{{"none", ExternalCommandErrorExitReaction::NONE},
{"log_first", ExternalCommandErrorExitReaction::LOG_FIRST},
{"log_last", ExternalCommandErrorExitReaction::LOG_LAST}})
}

View File

@ -241,6 +241,4 @@ DECLARE_SETTING_ENUM(S3QueueAction)
DECLARE_SETTING_ENUM(ExternalCommandStderrReaction)
DECLARE_SETTING_ENUM(ExternalCommandErrorExitReaction)
}

View File

@ -266,8 +266,6 @@ void registerDictionarySourceExecutable(DictionarySourceFactory & factory)
.command_read_timeout_milliseconds = config.getUInt64(settings_config_prefix + ".command_read_timeout", 10000),
.command_write_timeout_milliseconds = config.getUInt64(settings_config_prefix + ".command_write_timeout", 10000),
.stderr_reaction = parseExternalCommandStderrReaction(config.getString(settings_config_prefix + ".stderr_reaction", "none")),
.error_exit_reaction
= parseExternalCommandErrorExitReaction(config.getString(settings_config_prefix + ".error_exit_reaction", "none")),
.check_exit_code = config.getBool(settings_config_prefix + ".check_exit_code", true),
.is_executable_pool = false,
.send_chunk_header = config.getBool(settings_config_prefix + ".send_chunk_header", false),

View File

@ -233,8 +233,6 @@ void registerDictionarySourceExecutablePool(DictionarySourceFactory & factory)
.command_read_timeout_milliseconds = config.getUInt64(settings_config_prefix + ".command_read_timeout", 10000),
.command_write_timeout_milliseconds = config.getUInt64(settings_config_prefix + ".command_write_timeout", 10000),
.stderr_reaction = parseExternalCommandStderrReaction(config.getString(settings_config_prefix + ".stderr_reaction", "none")),
.error_exit_reaction
= parseExternalCommandErrorExitReaction(config.getString(settings_config_prefix + ".error_exit_reaction", "none")),
.check_exit_code = config.getBool(settings_config_prefix + ".check_exit_code", true),
.pool_size = config.getUInt64(settings_config_prefix + ".pool_size", 16),
.max_command_execution_time_seconds = max_command_execution_time,

View File

@ -173,8 +173,7 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create
size_t command_write_timeout_milliseconds = config.getUInt64(key_in_config + ".command_write_timeout", 10000);
ExternalCommandStderrReaction stderr_reaction
= parseExternalCommandStderrReaction(config.getString(key_in_config + ".stderr_reaction", "none"));
ExternalCommandErrorExitReaction error_exit_reaction
= parseExternalCommandErrorExitReaction(config.getString(key_in_config + ".error_exit_reaction", "none"));
bool check_exit_code = config.getBool(key_in_config + ".check_exit_code", true);
size_t pool_size = 0;
size_t max_command_execution_time = 0;
@ -243,7 +242,7 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create
.command_read_timeout_milliseconds = command_read_timeout_milliseconds,
.command_write_timeout_milliseconds = command_write_timeout_milliseconds,
.stderr_reaction = stderr_reaction,
.error_exit_reaction = error_exit_reaction,
.check_exit_code = check_exit_code,
.pool_size = pool_size,
.max_command_execution_time_seconds = max_command_execution_time,
.is_executable_pool = is_executable_pool,

View File

@ -14,6 +14,7 @@
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Interpreters/Context.h>
#include <boost/circular_buffer.hpp>
namespace DB
@ -106,20 +107,18 @@ public:
int stdout_fd_,
int stderr_fd_,
size_t timeout_milliseconds_,
ExternalCommandStderrReaction stderr_reaction_,
ExternalCommandErrorExitReaction error_exit_reaction_)
ExternalCommandStderrReaction stderr_reaction_)
: stdout_fd(stdout_fd_)
, stderr_fd(stderr_fd_)
, timeout_milliseconds(timeout_milliseconds_)
, stderr_reaction(stderr_reaction_)
, error_exit_reaction(error_exit_reaction_)
{
makeFdNonBlocking(stdout_fd);
makeFdNonBlocking(stderr_fd);
#if defined(OS_LINUX)
epoll.add(stdout_fd);
if (stderr_reaction != ExternalCommandStderrReaction::NONE || error_exit_reaction != ExternalCommandErrorExitReaction::NONE)
if (stderr_reaction != ExternalCommandStderrReaction::NONE)
epoll.add(stderr_fd);
#endif
}
@ -129,8 +128,6 @@ public:
size_t bytes_read = 0;
#if defined(OS_LINUX)
static constexpr size_t BUFFER_SIZE = 4_KiB;
while (!bytes_read)
{
epoll_event events[2];
@ -151,35 +148,26 @@ public:
if (has_stderr)
{
stderr_buf.resize(BUFFER_SIZE);
ssize_t res = ::read(stderr_fd, stderr_buf.data(), stderr_buf.size());
if (stderr_read_buf == nullptr)
stderr_read_buf.reset(new char[BUFFER_SIZE]);
ssize_t res = ::read(stderr_fd, stderr_read_buf.get(), BUFFER_SIZE);
if (res > 0)
{
stderr_buf.resize(res);
std::string_view str(stderr_read_buf.get(), res);
if (stderr_reaction == ExternalCommandStderrReaction::THROW)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Executable generates stderr: {}", stderr_buf);
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Executable generates stderr: {}", str);
else if (stderr_reaction == ExternalCommandStderrReaction::LOG)
LOG_WARNING(
&::Poco::Logger::get("TimeoutReadBufferFromFileDescriptor"), "Executable generates stderr: {}", stderr_buf);
if (error_exit_reaction == ExternalCommandErrorExitReaction::LOG_FIRST)
&::Poco::Logger::get("TimeoutReadBufferFromFileDescriptor"), "Executable generates stderr: {}", str);
else if (stderr_reaction == ExternalCommandStderrReaction::LOG_FIRST)
{
if (BUFFER_SIZE - error_exit_buf.size() < size_t(res))
res = BUFFER_SIZE - error_exit_buf.size();
res = std::min(ssize_t(stderr_result_buf.reserve()), res);
if (res > 0)
error_exit_buf.append(stderr_buf.begin(), stderr_buf.begin() + res);
stderr_result_buf.insert(stderr_result_buf.end(), str.begin(), str.begin() + res);
}
else if (error_exit_reaction == ExternalCommandErrorExitReaction::LOG_LAST)
else if (stderr_reaction == ExternalCommandStderrReaction::LOG_LAST)
{
if (res + error_exit_buf.size() > BUFFER_SIZE)
{
std::shift_left(error_exit_buf.begin(), error_exit_buf.end(), res + error_exit_buf.size() - BUFFER_SIZE);
error_exit_buf.resize(BUFFER_SIZE - res);
}
error_exit_buf += stderr_buf;
stderr_result_buf.insert(stderr_result_buf.end(), str.begin(), str.begin() + res);
}
}
}
@ -240,20 +228,33 @@ public:
{
tryMakeFdBlocking(stdout_fd);
tryMakeFdBlocking(stderr_fd);
}
String error_exit_buf;
#if defined(OS_LINUX)
if (!stderr_result_buf.empty())
{
String stderr_result;
stderr_result.reserve(stderr_result_buf.size());
stderr_result.append(stderr_result_buf.begin(), stderr_result_buf.end());
LOG_WARNING(
&::Poco::Logger::get("ShellCommandSource"),
"Executable generates stderr at the {}: {}",
stderr_reaction == ExternalCommandStderrReaction::LOG_FIRST ? "beginning" : "end",
stderr_result);
}
#endif
}
private:
int stdout_fd;
int stderr_fd;
size_t timeout_milliseconds;
[[maybe_unused]] ExternalCommandStderrReaction stderr_reaction;
[[maybe_unused]] ExternalCommandErrorExitReaction error_exit_reaction;
#if defined(OS_LINUX)
static constexpr size_t BUFFER_SIZE = 4_KiB;
Epoll epoll;
String stderr_buf;
std::unique_ptr<char[]> stderr_read_buf;
boost::circular_buffer_space_optimized<char> stderr_result_buf{BUFFER_SIZE};
#endif
};
@ -349,7 +350,6 @@ namespace
const std::string & format_,
size_t command_read_timeout_milliseconds,
ExternalCommandStderrReaction stderr_reaction,
ExternalCommandErrorExitReaction error_exit_reaction,
bool check_exit_code_,
const Block & sample_block_,
std::unique_ptr<ShellCommand> && command_,
@ -363,11 +363,10 @@ namespace
, sample_block(sample_block_)
, command(std::move(command_))
, configuration(configuration_)
, timeout_command_out(
command->out.getFD(), command->err.getFD(), command_read_timeout_milliseconds, stderr_reaction, error_exit_reaction)
, timeout_command_out(command->out.getFD(), command->err.getFD(), command_read_timeout_milliseconds, stderr_reaction)
, command_holder(std::move(command_holder_))
, process_pool(process_pool_)
, check_exit_code(check_exit_code_ || error_exit_reaction != ExternalCommandErrorExitReaction::NONE)
, check_exit_code(check_exit_code_)
{
for (auto && send_data_task : send_data_tasks)
{
@ -416,12 +415,7 @@ namespace
thread.join();
if (command_is_invalid)
{
command = nullptr;
if (!timeout_command_out.error_exit_buf.empty())
LOG_ERROR(
&::Poco::Logger::get("ShellCommandSource"), "Executable fails with stderr: {}", timeout_command_out.error_exit_buf);
}
if (command_holder && process_pool)
{
@ -684,7 +678,6 @@ Pipe ShellCommandSourceCoordinator::createPipe(
configuration.format,
configuration.command_read_timeout_milliseconds,
configuration.stderr_reaction,
configuration.error_exit_reaction,
configuration.check_exit_code,
std::move(sample_block),
std::move(process),

View File

@ -58,11 +58,10 @@ public:
/// Reaction when external command outputs data to its stderr.
ExternalCommandStderrReaction stderr_reaction = ExternalCommandStderrReaction::NONE;
/// Reaction when external command exits with non-zero code.
ExternalCommandErrorExitReaction error_exit_reaction = ExternalCommandErrorExitReaction::NONE;
/// Will throw if the command exited with
/// non-zero status code
/// non-zero status code.
/// NOTE: If executable pool is used, we cannot check exit code,
/// which makes this configuration no effect.
size_t check_exit_code = false;
/// Pool size valid only if executable_pool = true

View File

@ -16,7 +16,7 @@ class ASTStorage;
M(UInt64, command_read_timeout, 10000, "Timeout for reading data from command stdout in milliseconds.", 0) \
M(UInt64, command_write_timeout, 10000, "Timeout for writing data to command stdin in milliseconds.", 0) \
M(ExternalCommandStderrReaction, stderr_reaction, ExternalCommandStderrReaction::NONE, "Reaction when external command outputs data to its stderr.", 0) \
M(ExternalCommandErrorExitReaction, error_exit_reaction, ExternalCommandErrorExitReaction::NONE, "Reaction when external command exits with non-zero code.", 0) \
M(Bool, check_exit_code, true, "Throw exception if the command exited with non-zero status code.", 0) \
DECLARE_SETTINGS_TRAITS(ExecutableSettingsTraits, LIST_OF_EXECUTABLE_SETTINGS)

View File

@ -93,7 +93,7 @@ StorageExecutable::StorageExecutable(
.command_read_timeout_milliseconds = settings.command_read_timeout,
.command_write_timeout_milliseconds = settings.command_write_timeout,
.stderr_reaction = settings.stderr_reaction,
.error_exit_reaction = settings.error_exit_reaction,
.check_exit_code = settings.check_exit_code,
.pool_size = settings.pool_size,
.max_command_execution_time_seconds = settings.max_command_execution_time,

View File

@ -348,25 +348,50 @@
<function>
<type>executable</type>
<name>test_function_always_error_exit_log_first_python</name>
<name>test_function_always_error_log_first_python</name>
<return_type>String</return_type>
<argument>
<type>UInt64</type>
</argument>
<format>TabSeparated</format>
<command>input_log_error.py</command>
<error_exit_reaction>log_first</error_exit_reaction>
<stderr_reaction>log_first</stderr_reaction>
</function>
<function>
<type>executable</type>
<name>test_function_always_error_exit_log_last_python</name>
<name>test_function_always_error_log_last_python</name>
<return_type>String</return_type>
<argument>
<type>UInt64</type>
</argument>
<format>TabSeparated</format>
<command>input_log_error.py</command>
<error_exit_reaction>log_last</error_exit_reaction>
<stderr_reaction>log_last</stderr_reaction>
</function>
<function>
<type>executable</type>
<name>test_function_exit_error_ignore_python</name>
<return_type>String</return_type>
<argument>
<type>UInt64</type>
</argument>
<format>TabSeparated</format>
<command>input_exit_error.py</command>
<check_exit_code>0</check_exit_code>
</function>
<function>
<type>executable</type>
<name>test_function_exit_error_fail_python</name>
<return_type>String</return_type>
<argument>
<type>UInt64</type>
</argument>
<format>TabSeparated</format>
<command>input_exit_error.py</command>
<!-- default -->
<!-- <check_exit_code>1</check_exit_code> -->
</function>
</functions>

View File

@ -302,33 +302,38 @@ def test_executable_function_always_error_python(started_cluster):
== "Key 1\n"
)
assert node.contains_in_log(
"{"
+ query_id
+ "} <Warning> TimeoutReadBufferFromFileDescriptor: Executable generates stderr: Fake error"
f"{{{query_id}}} <Warning> TimeoutReadBufferFromFileDescriptor: Executable generates stderr: Fake error"
)
query_id = uuid.uuid4().hex
try:
assert (
node.query(
"SELECT test_function_always_error_exit_log_first_python(1)",
query_id=query_id,
)
assert False, "Exception have to be thrown"
except Exception as ex:
assert "DB::Exception: Child process was exited with return code 1" in str(ex)
assert node.contains_in_log(
f"{{{query_id}}} <Error> ShellCommandSource: Executable fails with stderr: {'a' * (3 * 1024)}{'b' * 1024}\n"
"SELECT test_function_always_error_log_first_python(1)", query_id=query_id
)
== "Key 1\n"
)
assert node.contains_in_log(
f"{{{query_id}}} <Warning> TimeoutReadBufferFromFileDescriptor: Executable generates stderr at the beginning: {'a' * (3 * 1024)}{'b' * 1024}\n"
)
query_id = uuid.uuid4().hex
try:
assert (
node.query(
"SELECT test_function_always_error_exit_log_last_python(1)",
query_id=query_id,
"SELECT test_function_always_error_log_last_python(1)", query_id=query_id
)
== "Key 1\n"
)
assert node.contains_in_log(
f"{{{query_id}}} <Warning> TimeoutReadBufferFromFileDescriptor: Executable generates stderr at the end: {'b' * 1024}{'c' * (3 * 1024)}\n"
)
assert (
node.query("SELECT test_function_exit_error_ignore_python(1)")
== "Key 1\n"
)
try:
node.query("SELECT test_function_exit_error_fail_python(1)")
assert False, "Exception have to be thrown"
except Exception as ex:
assert "DB::Exception: Child process was exited with return code 1" in str(ex)
assert node.contains_in_log(
f"{{{query_id}}} <Error> ShellCommandSource: Executable fails with stderr: {'b' * 1024}{'c' * (3 * 1024)}\n"
)

View File

@ -0,0 +1,10 @@
#!/usr/bin/python3
import sys
if __name__ == "__main__":
for line in sys.stdin:
print("Key " + line, end="")
sys.stdout.flush()
sys.exit(1)

View File

@ -8,5 +8,3 @@ if __name__ == "__main__":
for line in sys.stdin:
print("Key " + line, end="")
sys.stdout.flush()
sys.exit(1)