Executable dictionaries: fail if program returns non zero exit code (#CLICKHOUSE-3171)

This commit is contained in:
proller 2017-10-02 16:08:09 +03:00 committed by alexey-milovidov
parent 18af75a4f1
commit 1505ad6bac
3 changed files with 27 additions and 5 deletions

View File

@ -65,10 +65,10 @@ namespace
/// By these return codes from the child process, we learn (for sure) about errors when creating it.
enum class ReturnCodes : int
{
CANNOT_DUP_STDIN = 42, /// The value is not important, but it is chosen so that it's rare to conflict with the program return code.
CANNOT_DUP_STDOUT = 43,
CANNOT_DUP_STDERR = 44,
CANNOT_EXEC = 45,
CANNOT_DUP_STDIN = 1431655765, /// The value is not important, but it is chosen so that it's rare to conflict with the program return code.
CANNOT_DUP_STDOUT = 1431655766,
CANNOT_DUP_STDERR = 1431655767,
CANNOT_EXEC = 1431655768,
};
}

View File

@ -27,6 +27,7 @@ private:
String getID() const override { return "Owning(" + stream->getID() + ")"; }
protected:
BlockInputStreamPtr stream;
std::unique_ptr<OwnType> own;
};

View File

@ -20,6 +20,20 @@ namespace DB
static const size_t max_block_size = 8192;
class ShellCommandOwningBlockInputStream : public OwningBlockInputStream<ShellCommand>
{
public:
ShellCommandOwningBlockInputStream(const BlockInputStreamPtr & stream, std::unique_ptr<ShellCommand> own) : OwningBlockInputStream(std::move(stream), std::move(own))
{
}
void readSuffix() override
{
OwningBlockInputStream<ShellCommand>::readSuffix();
own->wait();
}
};
ExecutableDictionarySource::ExecutableDictionarySource(const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
Block & sample_block, const Context & context)
@ -47,7 +61,7 @@ BlockInputStreamPtr ExecutableDictionarySource::loadAll()
LOG_TRACE(log, "loadAll " + toString());
auto process = ShellCommand::execute(command);
auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size);
return std::make_shared<OwningBlockInputStream<ShellCommand>>(input_stream, std::move(process));
return std::make_shared<ShellCommandOwningBlockInputStream>(input_stream, std::move(process));
}
@ -86,6 +100,12 @@ private:
void readSuffix() override
{
IProfilingBlockInputStream::readSuffix();
if (!wait_called)
{
wait_called = true;
command->wait();
}
thread.join();
/// To rethrow an exception, if any.
task.get_future().get();
@ -98,6 +118,7 @@ private:
std::unique_ptr<ShellCommand> command;
std::packaged_task<void()> task;
std::thread thread;
bool wait_called = false;
};