Added support for external tables.

This commit is contained in:
Vitaly Baranov 2020-11-03 14:47:34 +03:00
parent 23842e7ac6
commit a0e384b0c0
3 changed files with 169 additions and 12 deletions

View File

@ -7,6 +7,7 @@
#include <Common/SettingsChanges.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <DataStreams/AsynchronousBlockInputStream.h>
#include <DataTypes/DataTypeFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Interpreters/executeQuery.h>
@ -24,6 +25,7 @@
#include <Poco/FileStream.h>
#include <Poco/StreamCopier.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <ext/range.h>
#include <grpc++/security/server_credentials.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
@ -161,6 +163,8 @@ namespace
str.append(str.empty() ? "" : ", ").append("query_id: ").append(query_info.query_id());
if (!query_info.input_data().empty())
str.append(str.empty() ? "" : ", ").append("input_data: ").append(std::to_string(query_info.input_data().size())).append(" bytes");
if (query_info.external_tables_size())
str.append(str.empty() ? "" : ", ").append("external tables: ").append(std::to_string(query_info.external_tables_size()));
return str;
}
@ -459,6 +463,7 @@ namespace
void processInput();
void initializeBlockInputStream(const Block & header);
void createExternalTables();
void generateOutput();
void generateOutputWithProcessors();
@ -705,6 +710,14 @@ namespace
if (output_format.empty())
output_format = query_context->getDefaultFormat();
/// Set callback to create and fill external tables
query_context->setExternalTablesInitializer([this] (Context & context)
{
if (&context != &*query_context)
throw Exception("Unexpected context in external tables initializer", ErrorCodes::LOGICAL_ERROR);
createExternalTables();
});
/// Set callbacks to execute function input().
query_context->setInputInitializer([this] (Context & context, const StoragePtr & input_storage)
{
@ -803,8 +816,8 @@ namespace
readQueryInfo();
if (!query_info.query().empty() || !query_info.query_id().empty() || !query_info.settings().empty()
|| !query_info.database().empty() || !query_info.input_data_delimiter().empty() || !query_info.output_format().empty()
|| !query_info.user_name().empty() || !query_info.password().empty() || !query_info.quota().empty()
|| !query_info.session_id().empty())
|| query_info.external_tables_size() || !query_info.user_name().empty() || !query_info.password().empty()
|| !query_info.quota().empty() || !query_info.session_id().empty())
{
throw Exception("Extra query infos can be used only to add more input data. "
"Only the following fields can be set: input_data, next_query_info, cancel",
@ -842,6 +855,103 @@ namespace
}
}
void Call::createExternalTables()
{
while (true)
{
for (const auto & external_table : query_info.external_tables())
{
String name = external_table.name();
if (name.empty())
name = "_data";
auto temporary_id = StorageID::createEmpty();
temporary_id.table_name = name;
/// If such a table does not exist, create it.
StoragePtr storage;
if (auto resolved = query_context->tryResolveStorageID(temporary_id, Context::ResolveExternal))
{
storage = DatabaseCatalog::instance().getTable(resolved, *query_context);
}
else
{
NamesAndTypesList columns;
for (size_t column_idx : ext::range(external_table.columns_size()))
{
const auto & name_and_type = external_table.columns(column_idx);
NameAndTypePair column;
column.name = name_and_type.name();
if (column.name.empty())
column.name = "_" + std::to_string(column_idx + 1);
column.type = DataTypeFactory::instance().get(name_and_type.type());
columns.emplace_back(std::move(column));
}
auto temporary_table = TemporaryTableHolder(*query_context, ColumnsDescription{columns}, {});
storage = temporary_table.getTable();
query_context->addExternalTable(temporary_id.table_name, std::move(temporary_table));
}
if (!external_table.data().empty())
{
/// The data will be written directly to the table.
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
auto out_stream = storage->write(ASTPtr(), metadata_snapshot, *query_context);
ReadBufferFromMemory data(external_table.data().data(), external_table.data().size());
String format = external_table.format();
if (format.empty())
format = "TabSeparated";
Context * external_table_context = &*query_context;
std::optional<Context> temp_context;
if (!external_table.settings().empty())
{
temp_context = *query_context;
external_table_context = &*temp_context;
SettingsChanges settings_changes;
for (const auto & [key, value] : external_table.settings())
settings_changes.push_back({key, value});
external_table_context->checkSettingsConstraints(settings_changes);
external_table_context->applySettingsChanges(settings_changes);
}
auto in_stream = external_table_context->getInputFormat(
format, data, metadata_snapshot->getSampleBlock(), external_table_context->getSettings().max_insert_block_size);
in_stream->readPrefix();
out_stream->writePrefix();
while (auto block = in_stream->read())
out_stream->write(block);
in_stream->readSuffix();
out_stream->writeSuffix();
}
}
if (!query_info.input_data().empty())
{
/// External tables must be created before executing query,
/// so all external tables must be send no later sending any input data.
break;
}
if (!query_info.next_query_info())
break;
if (!isInputStreaming(call_type))
throw Exception("next_query_info is allowed to be set only for streaming input", ErrorCodes::INVALID_GRPC_QUERY_INFO);
readQueryInfo();
if (!query_info.query().empty() || !query_info.query_id().empty() || !query_info.settings().empty()
|| !query_info.database().empty() || !query_info.input_data_delimiter().empty()
|| !query_info.output_format().empty() || !query_info.user_name().empty() || !query_info.password().empty()
|| !query_info.quota().empty() || !query_info.session_id().empty())
{
throw Exception("Extra query infos can be used only to add more data to input or more external tables. "
"Only the following fields can be set: input_data, external_tables, next_query_info, cancel",
ErrorCodes::INVALID_GRPC_QUERY_INFO);
}
if (isQueryCancelled())
break;
LOG_DEBUG(log, "Received extra QueryInfo: external tables: {}", query_info.external_tables_size());
}
}
void Call::generateOutput()
{
if (io.pipeline.initialized())

View File

@ -2,6 +2,19 @@ syntax = "proto3";
package clickhouse.grpc;
// Name and data type of a single column of an external table.
message NameAndType {
// Column name. If empty, the server names the column "_<N>", where N is its 1-based position.
string name = 1;
// Name of the column's data type, e.g. "UInt64" or "String".
string type = 2;
}
// A temporary table sent along with a query so that the query can read from it.
message ExternalTable {
// Table name. If empty, the server uses "_data".
string name = 1;
// Columns used to create the table when no external table with this name exists yet.
repeated NameAndType columns = 2;
// Rows to write to the table, serialized in `format`. May be empty.
string data = 3;
// Format of `data`. If empty, the server uses "TabSeparated".
string format = 4;
// Settings applied to a copy of the query context that is used to parse `data`.
map<string, string> settings = 5;
}
message QueryInfo {
string query = 1;
string query_id = 2;
@ -10,14 +23,15 @@ message QueryInfo {
string input_data = 5;
string input_data_delimiter = 6;
string output_format = 7;
string user_name = 8;
string password = 9;
string quota = 10;
string session_id = 11;
bool session_check = 12;
uint32 session_timeout = 13;
bool cancel = 14;
bool next_query_info = 15;
repeated ExternalTable external_tables = 8;
string user_name = 9;
string password = 10;
string quota = 11;
string session_id = 12;
bool session_check = 13;
uint32 session_timeout = 14;
bool cancel = 15;
bool next_query_info = 16;
}
enum LogsLevel {

View File

@ -41,7 +41,7 @@ def create_channel():
main_channel = channel
return channel
def query_common(query_text, settings={}, input_data=[], input_data_delimiter='', output_format='TabSeparated', query_id='123', session_id='', stream_output=False, channel=None):
def query_common(query_text, settings={}, input_data=[], input_data_delimiter='', output_format='TabSeparated', external_tables=[], query_id='123', session_id='', stream_output=False, channel=None):
if type(input_data) == str:
input_data = [input_data]
if not channel:
@ -50,7 +50,8 @@ def query_common(query_text, settings={}, input_data=[], input_data_delimiter=''
def query_info():
input_data_part = input_data.pop(0) if input_data else ''
return clickhouse_grpc_pb2.QueryInfo(query=query_text, settings=settings, input_data=input_data_part, input_data_delimiter=input_data_delimiter,
output_format=output_format, query_id=query_id, session_id=session_id, next_query_info=bool(input_data))
output_format=output_format, external_tables=external_tables, query_id=query_id, session_id=session_id,
next_query_info=bool(input_data))
def send_query_info():
yield query_info()
while input_data:
@ -263,6 +264,38 @@ def test_input_function():
query("INSERT INTO t SELECT col1 * col2 FROM input('col1 UInt8, col2 UInt8') FORMAT CSV 20,10\n", input_data="15,15\n")
assert query("SELECT a FROM t ORDER BY a") == "20\n88\n120\n143\n200\n225\n"
def test_external_table():
    """External tables sent with a query can be selected from, including an unnamed table with unnamed columns."""
    user_columns = [
        clickhouse_grpc_pb2.NameAndType(name='UserID', type='UInt64'),
        clickhouse_grpc_pb2.NameAndType(name='UserName', type='String'),
    ]

    # A single table in explicitly specified TabSeparated format.
    first_table = clickhouse_grpc_pb2.ExternalTable(
        name='ext1', columns=user_columns, data='1\tAlex\n2\tBen\n3\tCarl\n', format='TabSeparated')
    single_result = query("SELECT * FROM ext1 ORDER BY UserID", external_tables=[first_table])
    assert single_result == ("1\tAlex\n"
                             "2\tBen\n"
                             "3\tCarl\n")

    # Two tables in different formats used by the same query.
    second_table = clickhouse_grpc_pb2.ExternalTable(
        name='ext2', columns=user_columns, data='4,Daniel\n5,Ethan\n', format='CSV')
    union_result = query("SELECT * FROM (SELECT * FROM ext1 UNION ALL SELECT * FROM ext2) ORDER BY UserID",
                         external_tables=[first_table, second_table])
    assert union_result == ("1\tAlex\n"
                            "2\tBen\n"
                            "3\tCarl\n"
                            "4\tDaniel\n"
                            "5\tEthan\n")

    # Neither the table nor its columns are named: the query refers to them as _data, _1 and _2.
    anonymous_columns = [
        clickhouse_grpc_pb2.NameAndType(type='UInt64'),
        clickhouse_grpc_pb2.NameAndType(type='String'),
    ]
    anonymous_table = clickhouse_grpc_pb2.ExternalTable(columns=anonymous_columns, data='6\tGeorge\n7\tFred\n')
    anonymous_result = query("SELECT * FROM _data ORDER BY _2", external_tables=[anonymous_table])
    assert anonymous_result == ("7\tFred\n"
                                "6\tGeorge\n")
def test_external_table_streaming():
    """Data for one external table can arrive in several QueryInfo messages of a streaming-input call."""
    user_columns = [
        clickhouse_grpc_pb2.NameAndType(name='UserID', type='UInt64'),
        clickhouse_grpc_pb2.NameAndType(name='UserName', type='String'),
    ]

    def query_infos():
        # The first message carries the query, declares the table and announces that another message follows.
        first_part = clickhouse_grpc_pb2.ExternalTable(name='exts', columns=user_columns, data='1\tAlex\n2\tBen\n3\tCarl\n')
        yield clickhouse_grpc_pb2.QueryInfo(query="SELECT * FROM exts ORDER BY UserID",
                                            external_tables=[first_part],
                                            next_query_info=True)
        # The second message only adds more rows to the same table.
        second_part = clickhouse_grpc_pb2.ExternalTable(name='exts', data='4\tDaniel\n5\tEthan\n')
        yield clickhouse_grpc_pb2.QueryInfo(external_tables=[second_part])

    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
    response = stub.ExecuteQueryWithStreamInput(query_infos())
    assert response.output == ("1\tAlex\n"
                               "2\tBen\n"
                               "3\tCarl\n"
                               "4\tDaniel\n"
                               "5\tEthan\n")
def test_simultaneous_queries_same_channel():
threads=[]
try: