Merge pull request #28835 from kitaisreal/shell-command-source-fix-logging

ShellCommandSource fix logging
This commit is contained in:
Maksim Kita 2021-09-10 12:27:19 +03:00 committed by GitHub
commit 2e535cf6fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 5 additions and 10 deletions

View File

@ -52,14 +52,12 @@ public:
const std::string & format,
const Block & sample_block,
std::unique_ptr<ShellCommand> && command_,
Poco::Logger * log_,
std::vector<SendDataTask> && send_data_tasks = {},
const ShellCommandSourceConfiguration & configuration_ = {},
std::shared_ptr<ProcessPool> process_pool_ = nullptr)
: SourceWithProgress(sample_block)
, command(std::move(command_))
, configuration(configuration_)
, log(log_)
, process_pool(process_pool_)
{
for (auto && send_data_task : send_data_tasks)
@ -133,7 +131,6 @@ protected:
}
catch (...)
{
tryLogCurrentException(log);
command = nullptr;
throw;
}
@ -176,8 +173,6 @@ private:
size_t current_read_rows = 0;
Poco::Logger * log;
std::shared_ptr<ProcessPool> process_pool;
QueryPipeline pipeline;

View File

@ -72,7 +72,7 @@ Pipe ExecutableDictionarySource::loadAll()
ShellCommand::Config config(configuration.command);
auto process = ShellCommand::execute(config);
Pipe pipe(std::make_unique<ShellCommandSource>(context, configuration.format, sample_block, std::move(process), log));
Pipe pipe(std::make_unique<ShellCommandSource>(context, configuration.format, sample_block, std::move(process)));
return pipe;
}
@ -91,7 +91,7 @@ Pipe ExecutableDictionarySource::loadUpdatedAll()
LOG_TRACE(log, "loadUpdatedAll {}", command_with_update_field);
ShellCommand::Config config(command_with_update_field);
auto process = ShellCommand::execute(config);
Pipe pipe(std::make_unique<ShellCommandSource>(context, configuration.format, sample_block, std::move(process), log));
Pipe pipe(std::make_unique<ShellCommandSource>(context, configuration.format, sample_block, std::move(process)));
return pipe;
}
@ -126,7 +126,7 @@ Pipe ExecutableDictionarySource::getStreamForBlock(const Block & block)
}};
std::vector<ShellCommandSource::SendDataTask> tasks = {std::move(task)};
Pipe pipe(std::make_unique<ShellCommandSource>(context, configuration.format, sample_block, std::move(process), log, std::move(tasks)));
Pipe pipe(std::make_unique<ShellCommandSource>(context, configuration.format, sample_block, std::move(process), std::move(tasks)));
if (configuration.implicit_key)
pipe.addTransform(std::make_shared<TransformWithAdditionalColumns>(block, pipe.getHeader()));

View File

@ -120,7 +120,7 @@ Pipe ExecutablePoolDictionarySource::getStreamForBlock(const Block & block)
ShellCommandSourceConfiguration command_configuration;
command_configuration.read_fixed_number_of_rows = true;
command_configuration.number_of_rows_to_read = rows_to_read;
Pipe pipe(std::make_unique<ShellCommandSource>(context, configuration.format, sample_block, std::move(process), log, std::move(tasks), command_configuration, process_pool));
Pipe pipe(std::make_unique<ShellCommandSource>(context, configuration.format, sample_block, std::move(process), std::move(tasks), command_configuration, process_pool));
if (configuration.implicit_key)
pipe.addTransform(std::make_shared<TransformWithAdditionalColumns>(block, pipe.getHeader()));

View File

@ -183,7 +183,7 @@ Pipe StorageExecutable::read(
configuration.read_number_of_rows_from_process_output = true;
}
Pipe pipe(std::make_unique<ShellCommandSource>(context, format, std::move(sample_block), std::move(process), log, std::move(tasks), configuration, process_pool));
Pipe pipe(std::make_unique<ShellCommandSource>(context, format, std::move(sample_block), std::move(process), std::move(tasks), configuration, process_pool));
return pipe;
}