Fix error with executable dictionary source

This commit is contained in:
Alexey Milovidov 2020-09-07 09:40:01 +03:00
parent 4a336e3814
commit 04a6965006
11 changed files with 240 additions and 50 deletions

View File

@ -11,7 +11,7 @@ stage=${stage:-}
# A variable to pass additional flags to CMake.
# Here we explicitly default it to nothing so that bash doesn't complain about
# it being undefined. Also read it as array so that we can pass an empty list
# it being undefined. Also read it as array so that we can pass an empty list
# of additional variable to cmake properly, and it doesn't generate an extra
# empty parameter.
read -ra FASTTEST_CMAKE_FLAGS <<< "${FASTTEST_CMAKE_FLAGS:-}"
@ -128,6 +128,7 @@ ln -s /usr/share/clickhouse-test/config/access_management.xml /etc/clickhouse-se
ln -s /usr/share/clickhouse-test/config/ints_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/strings_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/decimals_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/executable_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/macros.xml /etc/clickhouse-server/config.d/
ln -s /usr/share/clickhouse-test/config/disks.xml /etc/clickhouse-server/config.d/
#ln -s /usr/share/clickhouse-test/config/secure_ports.xml /etc/clickhouse-server/config.d/

View File

@ -24,6 +24,7 @@ ln -s /usr/share/clickhouse-test/config/access_management.xml /etc/clickhouse-se
ln -s /usr/share/clickhouse-test/config/ints_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/strings_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/decimals_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/executable_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/macros.xml /etc/clickhouse-server/config.d/
ln -s /usr/share/clickhouse-test/config/disks.xml /etc/clickhouse-server/config.d/
ln -s /usr/share/clickhouse-test/config/secure_ports.xml /etc/clickhouse-server/config.d/

View File

@ -24,6 +24,7 @@ ln -s /usr/share/clickhouse-test/config/access_management.xml /etc/clickhouse-se
ln -s /usr/share/clickhouse-test/config/ints_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/strings_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/decimals_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/executable_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/macros.xml /etc/clickhouse-server/config.d/
ln -s /usr/share/clickhouse-test/config/disks.xml /etc/clickhouse-server/config.d/
ln -s /usr/share/clickhouse-test/config/secure_ports.xml /etc/clickhouse-server/config.d/

View File

@ -57,6 +57,7 @@ ln -s /usr/share/clickhouse-test/config/access_management.xml /etc/clickhouse-se
ln -s /usr/share/clickhouse-test/config/ints_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/strings_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/decimals_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/executable_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/macros.xml /etc/clickhouse-server/config.d/
ln -s /usr/share/clickhouse-test/config/disks.xml /etc/clickhouse-server/config.d/
ln -s /usr/share/clickhouse-test/config/secure_ports.xml /etc/clickhouse-server/config.d/

View File

@ -84,3 +84,6 @@ target_link_libraries (procfs_metrics_provider_perf PRIVATE clickhouse_common_io
add_executable (average average.cpp)
target_link_libraries (average PRIVATE clickhouse_common_io)
add_executable (shell_command_inout shell_command_inout.cpp)
target_link_libraries (shell_command_inout PRIVATE clickhouse_common_io)

View File

@ -0,0 +1,47 @@
#include <thread>
#include <Common/ShellCommand.h>
#include <Common/Exception.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/copyData.h>
/** This example shows how we can proxy stdin to ShellCommand and obtain stdout in streaming fashion. */
int main(int argc, char ** argv)
try
{
using namespace DB;
if (argc < 2)
{
std::cerr << "Usage: shell_command_inout 'command...' < in > out\n";
return 1;
}
auto command = ShellCommand::execute(argv[1]);
ReadBufferFromFileDescriptor in(STDIN_FILENO);
WriteBufferFromFileDescriptor out(STDOUT_FILENO);
WriteBufferFromFileDescriptor err(STDERR_FILENO);
/// Background thread sends data and foreground thread receives result.
std::thread thread([&]
{
copyData(in, command->in);
command->in.close();
});
copyData(command->out, out);
copyData(command->err, err);
thread.join();
return 0;
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true) << '\n';
throw;
}

View File

@ -1,12 +1,13 @@
#include "ExecutableDictionarySource.h"
#include <future>
#include <thread>
#include <functional>
#include <ext/scope_guard.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <Interpreters/Context.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/copyData.h>
#include <Common/ShellCommand.h>
#include <Common/ThreadPool.h>
#include <common/logger_useful.h>
@ -16,6 +17,7 @@
#include "DictionaryStructure.h"
#include "registerDictionaries.h"
namespace DB
{
static const UInt64 max_block_size = 8192;
@ -31,15 +33,23 @@ namespace
/// Owns ShellCommand and calls wait for it.
class ShellCommandOwningBlockInputStream : public OwningBlockInputStream<ShellCommand>
{
private:
Poco::Logger * log;
public:
ShellCommandOwningBlockInputStream(const BlockInputStreamPtr & impl, std::unique_ptr<ShellCommand> own_)
: OwningBlockInputStream(std::move(impl), std::move(own_))
ShellCommandOwningBlockInputStream(Poco::Logger * log_, const BlockInputStreamPtr & impl, std::unique_ptr<ShellCommand> command_)
: OwningBlockInputStream(std::move(impl), std::move(command_)), log(log_)
{
}
void readSuffix() override
{
OwningBlockInputStream<ShellCommand>::readSuffix();
std::string err;
readStringUntilEOF(err, own->err);
if (!err.empty())
LOG_ERROR(log, "Having stderr: {}", err);
own->wait();
}
};
@ -80,7 +90,7 @@ BlockInputStreamPtr ExecutableDictionarySource::loadAll()
LOG_TRACE(log, "loadAll {}", toString());
auto process = ShellCommand::execute(command);
auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size);
return std::make_shared<ShellCommandOwningBlockInputStream>(input_stream, std::move(process));
return std::make_shared<ShellCommandOwningBlockInputStream>(log, input_stream, std::move(process));
}
BlockInputStreamPtr ExecutableDictionarySource::loadUpdatedAll()
@ -95,67 +105,77 @@ BlockInputStreamPtr ExecutableDictionarySource::loadUpdatedAll()
LOG_TRACE(log, "loadUpdatedAll {}", command_with_update_field);
auto process = ShellCommand::execute(command_with_update_field);
auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size);
return std::make_shared<ShellCommandOwningBlockInputStream>(input_stream, std::move(process));
return std::make_shared<ShellCommandOwningBlockInputStream>(log, input_stream, std::move(process));
}
namespace
{
/** A stream, that also runs and waits for background thread
* (that will feed data into pipe to be read from the other side of the pipe).
/** A stream, that runs child process and sends data to its stdin in background thread,
* and receives data from its stdout.
*/
class BlockInputStreamWithBackgroundThread final : public IBlockInputStream
{
public:
BlockInputStreamWithBackgroundThread(
const BlockInputStreamPtr & stream_, std::unique_ptr<ShellCommand> && command_, std::packaged_task<void()> && task_)
: stream{stream_}, command{std::move(command_)}, task(std::move(task_)), thread([this] {
task();
command->in.close();
})
const Context & context,
const std::string & format,
const Block & sample_block,
const std::string & command_str,
Poco::Logger * log_,
std::function<void(WriteBufferFromFile &)> && send_data_)
: log(log_),
command(ShellCommand::execute(command_str)),
send_data(std::move(send_data_)),
thread([this] { send_data(command->in); })
{
children.push_back(stream);
//WriteBufferFromFileDescriptor err(STDERR_FILENO);
//copyData(command->out, err);
//err.next();
//thread.join();
stream = context.getInputFormat(format, command->out, sample_block, max_block_size);
}
~BlockInputStreamWithBackgroundThread() override
{
if (thread.joinable())
{
try
{
readSuffix();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
thread.join();
}
Block getHeader() const override { return stream->getHeader(); }
Block getHeader() const override
{
return stream->getHeader();
}
private:
Block readImpl() override { return stream->read(); }
Block readImpl() override
{
return stream->read();
}
void readPrefix() override
{
stream->readPrefix();
}
void readSuffix() override
{
IBlockInputStream::readSuffix();
if (!wait_called)
{
wait_called = true;
command->wait();
}
thread.join();
/// To rethrow an exception, if any.
task.get_future().get();
stream->readSuffix();
std::string err;
readStringUntilEOF(err, command->err);
if (!err.empty())
LOG_ERROR(log, "Having stderr: {}", err);
command->wait();
}
String getName() const override { return "WithBackgroundThread"; }
Poco::Logger * log;
BlockInputStreamPtr stream;
std::unique_ptr<ShellCommand> command;
std::packaged_task<void()> task;
ThreadFromGlobalPool thread;
bool wait_called = false;
std::function<void(WriteBufferFromFile &)> send_data;
mutable ThreadFromGlobalPool thread;
};
}
@ -164,28 +184,29 @@ namespace
BlockInputStreamPtr ExecutableDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size());
auto process = ShellCommand::execute(command);
auto output_stream = context.getOutputFormat(format, process->in, sample_block);
auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size);
return std::make_shared<BlockInputStreamWithBackgroundThread>(
input_stream, std::move(process), std::packaged_task<void()>([output_stream, &ids]() mutable { formatIDs(output_stream, ids); }));
context, format, sample_block, command, log,
[&ids, this](WriteBufferFromFile & out) mutable
{
auto output_stream = context.getOutputFormat(format, out, sample_block);
formatIDs(output_stream, ids);
out.close();
});
}
BlockInputStreamPtr ExecutableDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
LOG_TRACE(log, "loadKeys {} size = {}", toString(), requested_rows.size());
auto process = ShellCommand::execute(command);
auto output_stream = context.getOutputFormat(format, process->in, sample_block);
auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size);
return std::make_shared<BlockInputStreamWithBackgroundThread>(
input_stream, std::move(process), std::packaged_task<void()>([output_stream, key_columns, &requested_rows, this]() mutable
context, format, sample_block, command, log,
[key_columns, &requested_rows, this](WriteBufferFromFile & out) mutable
{
auto output_stream = context.getOutputFormat(format, out, sample_block);
formatKeys(dict_struct, output_stream, key_columns, requested_rows);
}));
out.close();
});
}
bool ExecutableDictionarySource::isModified() const

View File

@ -14,6 +14,7 @@ namespace DB
/// Allows loading dictionaries from executable
class ExecutableDictionarySource final : public IDictionarySource
{
friend class BlockInputStreamWithBackgroundThread;
public:
ExecutableDictionarySource(
const DictionaryStructure & dict_struct_,

View File

@ -0,0 +1,108 @@
<dictionaries>
<dictionary>
<name>executable_complex</name>
<source>
<executable>
<format>JSONEachRow</format>
<command>cd /; clickhouse-local --input-format JSONEachRow --output-format JSONEachRow --structure 'x UInt64, y UInt64' --query "SELECT x, y, x + y AS a, x * y AS b FROM table"</command>
</executable>
</source>
<lifetime>0</lifetime>
<layout>
<complex_key_cache>
<size_in_cells>1000</size_in_cells>
</complex_key_cache>
</layout>
<structure>
<key>
<attribute>
<name>x</name>
<type>UInt64</type>
</attribute>
<attribute>
<name>y</name>
<type>UInt64</type>
</attribute>
</key>
<attribute>
<name>a</name>
<type>UInt64</type>
<null_value>0</null_value>
</attribute>
<attribute>
<name>b</name>
<type>UInt64</type>
<null_value>0</null_value>
</attribute>
</structure>
</dictionary>
<dictionary>
<name>executable_simple</name>
<source>
<executable>
<format>JSONEachRow</format>
<command>cd /; clickhouse-local --input-format JSONEachRow --output-format JSONEachRow --structure 'x UInt64' --query "SELECT x, x + x AS a, x * x AS b FROM table"</command>
</executable>
</source>
<lifetime>0</lifetime>
<layout>
<cache>
<size_in_cells>1000</size_in_cells>
</cache>
</layout>
<structure>
<id>
<name>x</name>
</id>
<attribute>
<name>a</name>
<type>UInt64</type>
<null_value>0</null_value>
</attribute>
<attribute>
<name>b</name>
<type>UInt64</type>
<null_value>0</null_value>
</attribute>
</structure>
</dictionary>
<dictionary>
<name>executable_complex_direct</name>
<source>
<executable>
<format>JSONEachRow</format>
<command>cd /; clickhouse-local --input-format JSONEachRow --output-format JSONEachRow --structure 'x UInt64, y UInt64' --query "SELECT x, y, x + y AS a, x * y AS b FROM table"</command>
</executable>
</source>
<lifetime>0</lifetime>
<layout>
<complex_key_direct />
</layout>
<structure>
<key>
<attribute>
<name>x</name>
<type>UInt64</type>
</attribute>
<attribute>
<name>y</name>
<type>UInt64</type>
</attribute>
</key>
<attribute>
<name>a</name>
<type>UInt64</type>
<null_value>0</null_value>
</attribute>
<attribute>
<name>b</name>
<type>UInt64</type>
<null_value>0</null_value>
</attribute>
</structure>
</dictionary>
</dictionaries>

View File

@ -0,0 +1,3 @@
999999 1999998 999998000001
999999 1999998 999998000001
999999 1999998 999998000001

View File

@ -0,0 +1,3 @@
SELECT number, dictGet('executable_complex', 'a', (number, number)) AS a, dictGet('executable_complex', 'b', (number, number)) AS b FROM numbers(1000000) WHERE number = 999999;
SELECT number, dictGet('executable_complex_direct', 'a', (number, number)) AS a, dictGet('executable_complex_direct', 'b', (number, number)) AS b FROM numbers(1000000) WHERE number = 999999;
SELECT number, dictGet('executable_simple', 'a', number) AS a, dictGet('executable_simple', 'b', number) AS b FROM numbers(1000000) WHERE number = 999999;