Added missed modifications

This commit is contained in:
palasonicq 2019-05-28 21:30:10 +03:00
parent c873ed39bf
commit 0823e397ea
10 changed files with 237 additions and 46 deletions

View File

@ -830,7 +830,7 @@ private:
connection->forceConnected();
/// INSERT query for which data transfer is needed (not an INSERT SELECT) is processed separately.
if (insert && !insert->select)
if (insert && (!insert->select || insert->input_function))
processInsertQuery();
else
processOrdinaryQuery();

View File

@ -185,6 +185,43 @@ void TCPHandler::runImpl()
state.maybe_compressed_in.reset(); /// For more accurate accounting by MemoryTracker.
});
/// Send structure of columns to client for function input()
query_context->setInputInitializer([this] (Context & context, const StoragePtr & input_storage)
{
if (&context != &*query_context)
throw Exception("Unexpected context in Input initializer", ErrorCodes::LOGICAL_ERROR);
state.need_receive_data_for_input = true;
/// Send ColumnsDescription for input storage.
if (client_revision >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA
&& query_context->getSettingsRef().input_format_defaults_for_omitted_fields)
{
sendTableColumns(input_storage->getColumns());
}
/// Send block to the client - input storage structure.
state.input_header = input_storage->getSampleBlock();
sendData(state.input_header);
});
query_context->setInputBlocksReaderCallback([&global_settings, this] (Context & context) -> Block
{
if (&context != &*query_context)
throw Exception("Unexpected context in InputBlocksReader", ErrorCodes::LOGICAL_ERROR);
size_t poll_interval;
int receive_timeout;
std::tie(poll_interval, receive_timeout) = getReadTimeouts(global_settings);
if (!readDataNext(poll_interval, receive_timeout))
{
state.block_in.reset();
state.maybe_compressed_in.reset();
return Block();
};
return state.block_for_input;
});
customizeContext(*query_context);
bool may_have_embedded_data = client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_SUPPORT_EMBEDDED_DATA;
@ -200,6 +237,13 @@ void TCPHandler::runImpl()
/// Does the request require receiving data from the client?
if (state.need_receive_data_for_insert)
processInsertQuery(global_settings);
else if (state.need_receive_data_for_input)
{
/// This is a special case for input(): all the work of reading data from the client is done in callbacks.
/// state.io.in is NullAndDoCopyBlockInputStream so read it once.
state.io.in->read();
state.io.onFinish();
}
else
processOrdinaryQuery();
@ -304,7 +348,50 @@ void TCPHandler::runImpl()
}
void TCPHandler::readData(const Settings & global_settings)
bool TCPHandler::readDataNext(const size_t & poll_interval, const int & receive_timeout)
{
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
/// We are waiting for a packet from the client. Thus, every `POLL_INTERVAL` seconds check whether we need to shut down.
while (true)
{
if (static_cast<ReadBufferFromPocoSocket &>(*in).poll(poll_interval))
break;
/// Do we need to shut down?
if (server.isCancelled())
return false;
/** Have we waited for data for too long?
* If we periodically poll, the receive_timeout of the socket itself does not work.
* Therefore, an additional check is added.
*/
double elapsed = watch.elapsedSeconds();
if (elapsed > receive_timeout)
{
std::stringstream ss;
ss << "Timeout exceeded while receiving data from client.";
ss << " Waited for " << static_cast<size_t>(elapsed) << " seconds,";
ss << " timeout is " << receive_timeout << " seconds.";
throw Exception(ss.str(), ErrorCodes::SOCKET_TIMEOUT);
}
}
/// If client disconnected.
if (in->eof())
return false;
/// We accept and process data. And if they are over, then we leave.
if (!receivePacket())
return false;
sendLogs();
return true;
}
std::tuple<size_t, int> TCPHandler::getReadTimeouts(const Settings & global_settings)
{
const auto receive_timeout = query_context->getSettingsRef().receive_timeout.value;
@ -314,48 +401,21 @@ void TCPHandler::readData(const Settings & global_settings)
constexpr size_t min_poll_interval = 5000; // 5 ms
size_t poll_interval = std::max(min_poll_interval, std::min(default_poll_interval, current_poll_interval));
return std::make_tuple(poll_interval, receive_timeout.totalSeconds());
}
void TCPHandler::readData(const Settings & global_settings)
{
size_t poll_interval;
int receive_timeout;
std::tie(poll_interval, receive_timeout) = getReadTimeouts(global_settings);
sendLogs();
while (true)
{
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
/// We are waiting for a packet from the client. Thus, every `POLL_INTERVAL` seconds check whether we need to shut down.
while (true)
{
if (static_cast<ReadBufferFromPocoSocket &>(*in).poll(poll_interval))
break;
/// Do we need to shut down?
if (server.isCancelled())
return;
/** Have we waited for data for too long?
* If we periodically poll, the receive_timeout of the socket itself does not work.
* Therefore, an additional check is added.
*/
double elapsed = watch.elapsedSeconds();
if (elapsed > receive_timeout.totalSeconds())
{
std::stringstream ss;
ss << "Timeout exceeded while receiving data from client.";
ss << " Waited for " << static_cast<size_t>(elapsed) << " seconds,";
ss << " timeout is " << receive_timeout.totalSeconds() << " seconds.";
throw Exception(ss.str(), ErrorCodes::SOCKET_TIMEOUT);
}
}
/// If client disconnected.
if (in->eof())
if (!readDataNext(poll_interval, receive_timeout))
return;
/// We accept and process data. And if they are over, then we leave.
if (!receivePacket())
break;
sendLogs();
}
}
@ -716,7 +776,7 @@ bool TCPHandler::receiveData()
{
/// If there is an insert request, then the data should be written directly to `state.io.out`.
/// Otherwise, we write the blocks in the temporary `external_table_name` table.
if (!state.need_receive_data_for_insert)
if (!state.need_receive_data_for_insert && !state.need_receive_data_for_input)
{
StoragePtr storage;
/// If such a table does not exist, create it.
@ -730,7 +790,9 @@ bool TCPHandler::receiveData()
/// The data will be written directly to the table.
state.io.out = storage->write(ASTPtr(), *query_context);
}
if (block)
if (state.need_receive_data_for_input)
state.block_for_input = block;
else
state.io.out->write(block);
return true;
}
@ -751,6 +813,8 @@ void TCPHandler::initBlockInput()
Block header;
if (state.io.out)
header = state.io.out->getHeader();
else if (state.need_receive_data_for_input)
header = state.input_header;
state.block_in = std::make_shared<NativeBlockInputStream>(
*state.maybe_compressed_in,

View File

@ -58,6 +58,13 @@ struct QueryState
/// Request requires data from the client (INSERT, but not INSERT SELECT).
bool need_receive_data_for_insert = false;
/// Request requires data from client for function input()
bool need_receive_data_for_input = false;
/// temporary place for incoming data block for input()
Block block_for_input;
/// sample block from StorageInput
Block input_header;
/// To output progress, the difference after the previous sending of progress.
Progress progress;
@ -135,7 +142,9 @@ private:
bool receivePacket();
void receiveQuery();
bool receiveData();
bool readDataNext(const size_t & poll_interval, const int & receive_timeout);
void readData(const Settings & global_settings);
std::tuple<size_t, int> getReadTimeouts(const Settings & global_settings);
/// Process INSERT query
void processInsertQuery(const Settings & global_settings);

View File

@ -51,7 +51,7 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
res_stream = context.getInputFormat(format, *input_buffer_contacenated, header, context.getSettings().max_insert_block_size);
if (context.getSettingsRef().input_format_defaults_for_omitted_fields)
if (context.getSettingsRef().input_format_defaults_for_omitted_fields && !ast_insert_query->input_function)
{
StoragePtr storage = context.getTable(ast_insert_query->database, ast_insert_query->table);
auto column_defaults = storage->getColumns().getDefaults();

View File

@ -1944,6 +1944,51 @@ void Context::initializeExternalTablesIfSet()
}
/// Stores the initializer callback for input().
/// Throws LOGICAL_ERROR if an initializer has already been stored.
void Context::setInputInitializer(InputInitializer && initializer)
{
    const bool already_set = static_cast<bool>(input_initializer_callback);
    if (already_set)
        throw Exception("Input initializer is already set", ErrorCodes::LOGICAL_ERROR);

    input_initializer_callback = std::move(initializer);
}
/// Invokes the stored input initializer with this context and the given storage,
/// then clears it, so the next call without a new setInputInitializer() will throw.
/// Throws LOGICAL_ERROR if no initializer is stored.
void Context::initializeInput(const StoragePtr & input_storage)
{
    if (input_initializer_callback == nullptr)
        throw Exception("Input initializer is not set", ErrorCodes::LOGICAL_ERROR);

    input_initializer_callback(*this, input_storage);

    /// The initializer is one-shot: drop it after it has run.
    input_initializer_callback = nullptr;
}
/// Stores the callback used to read data blocks for input().
/// Throws LOGICAL_ERROR if a reader has already been stored.
void Context::setInputBlocksReaderCallback(InputBlocksReader && reader)
{
    const bool already_set = static_cast<bool>(input_blocks_reader);
    if (already_set)
        throw Exception("Input blocks reader is already set", ErrorCodes::LOGICAL_ERROR);

    input_blocks_reader = std::move(reader);
}
/// Returns a copy of the stored blocks reader callback (empty if none was set or it was reset).
InputBlocksReader Context::getInputBlocksReaderCallback() const
{
    InputBlocksReader reader_copy = input_blocks_reader;
    return reader_copy;
}
/// Clears both input() callbacks: the initializer and the blocks reader.
/// Clearing a callback that is already empty has no effect, so no checks are needed.
void Context::resetInputCallbacks()
{
    input_initializer_callback = nullptr;
    input_blocks_reader = nullptr;
}
SessionCleaner::~SessionCleaner()
{
try

View File

@ -95,6 +95,11 @@ using TableAndCreateASTs = std::map<String, TableAndCreateAST>;
/// Callback for external tables initializer
using ExternalTablesInitializer = std::function<void(Context &)>;
/// Callback for initializing input()
using InputInitializer = std::function<void(Context &, const StoragePtr &)>;
/// Callback for reading blocks of data from client for function input()
using InputBlocksReader = std::function<Block(Context &)>;
/// An empty interface for an arbitrary object that may be attached by a shared pointer
/// to query context, when using ClickHouse as a library.
struct IHostContext
@ -119,6 +124,9 @@ private:
ClientInfo client_info;
ExternalTablesInitializer external_tables_initializer_callback;
InputInitializer input_initializer_callback;
InputBlocksReader input_blocks_reader;
std::shared_ptr<QuotaForIntervals> quota; /// Current quota. By default - empty quota, that have no limits.
String current_database;
Settings settings; /// Setting for query execution.
@ -200,6 +208,17 @@ public:
/// This method is called in executeQuery() and will call the external tables initializer.
void initializeExternalTablesIfSet();
/// When input() is present we have to send columns structure to client
void setInputInitializer(InputInitializer && initializer);
/// This method is called in StorageInput::read while executing query
void initializeInput(const StoragePtr & input_storage);
/// Callback for reading data blocks from the client one by one for function input()
void setInputBlocksReaderCallback(InputBlocksReader && reader);
/// Get callback for reading data for input()
InputBlocksReader getInputBlocksReaderCallback() const;
void resetInputCallbacks();
ClientInfo & getClientInfo() { return client_info; }
const ClientInfo & getClientInfo() const { return client_info; }

View File

@ -21,6 +21,8 @@
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Storages/StorageInput.h>
#include <Interpreters/Quota.h>
#include <Interpreters/InterpreterFactory.h>
#include <Interpreters/ProcessList.h>
@ -146,7 +148,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
Context & context,
bool internal,
QueryProcessingStage::Enum stage,
bool has_query_tail)
bool has_query_tail,
ReadBuffer * istr)
{
time_t current_time = time(nullptr);
@ -221,6 +224,24 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// Load external tables if they were provided
context.initializeExternalTablesIfSet();
/// Prepare Input storage before executing interpreter.
auto * insert_query = ast->as<ASTInsertQuery>();
if (insert_query && insert_query->input_function)
{
/// If we already got a buffer with data then initialize input stream.
if (istr)
{
StoragePtr storage = context.executeTableFunction(insert_query->input_function);
auto * input_storage = dynamic_cast<StorageInput *>(storage.get());
BlockInputStreamPtr input_stream = std::make_shared<InputStreamFromASTInsertQuery>(ast, istr,
input_storage->getSampleBlock(), context);
input_storage->setInputStream(input_stream);
}
}
else
/// reset Input callbacks if query is not INSERT SELECT
context.resetInputCallbacks();
auto interpreter = InterpreterFactory::get(ast, context, stage);
res = interpreter->execute();
if (auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter))
@ -434,7 +455,8 @@ BlockIO executeQuery(
bool may_have_embedded_data)
{
BlockIO streams;
std::tie(std::ignore, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context, internal, stage, !may_have_embedded_data);
std::tie(std::ignore, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context,
internal, stage, !may_have_embedded_data, nullptr);
return streams;
}
@ -485,7 +507,7 @@ void executeQuery(
ASTPtr ast;
BlockIO streams;
std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete, may_have_tail);
std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete, may_have_tail, &istr);
try
{

View File

@ -17,6 +17,7 @@ public:
ASTPtr columns;
String format;
ASTPtr select;
ASTPtr input_function;
ASTPtr table_function;
ASTPtr settings_ast;
@ -40,6 +41,7 @@ public:
if (columns) { res->columns = columns->clone(); res->children.push_back(res->columns); }
if (select) { res->select = select->clone(); res->children.push_back(res->select); }
if (input_function) { res->input_function = input_function->clone(); res->children.push_back(res->input_function); }
if (table_function) { res->table_function = table_function->clone(); res->children.push_back(res->table_function); }
if (settings_ast) { res->settings_ast = settings_ast->clone(); res->children.push_back(res->settings_ast); }

View File

@ -9,6 +9,7 @@
#include <Parsers/ParserInsertQuery.h>
#include <Parsers/ParserSetQuery.h>
#include <Parsers/ASTFunction.h>
#include <Common/typeid_cast.h>
namespace DB
@ -19,6 +20,24 @@ namespace ErrorCodes
extern const int SYNTAX_ERROR;
}
void tryFindInputFunction(const ASTPtr & ast, ASTPtr & input_function)
{
if (!ast)
return;
for (const auto & child : ast->children)
tryFindInputFunction(child, input_function);
if (const auto * table_function = ast->as<ASTFunction>())
{
if (table_function->name == "input")
{
if (input_function)
throw Exception("You can use 'input()' function only once per request.", ErrorCodes::SYNTAX_ERROR);
input_function = ast;
}
}
}
bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
@ -44,6 +63,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ASTPtr select;
ASTPtr table_function;
ASTPtr settings_ast;
ASTPtr input_function;
/// Insertion data
const char * data = nullptr;
@ -97,6 +117,13 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
pos = before_select;
ParserSelectWithUnionQuery select_p;
select_p.parse(pos, select, expected);
/// Check if we have INSERT SELECT FROM input().
tryFindInputFunction(select, input_function);
/// FORMAT section is required if we have input() in SELECT part
if (input_function)
if (!s_format.ignore(pos, expected) || !name_p.parse(pos, format, expected))
return false;
}
else
{
@ -155,6 +182,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
query->columns = columns;
query->select = select;
query->input_function = input_function;
query->settings_ast = settings_ast;
query->data = data != end ? data : nullptr;
query->end = end;

View File

@ -12,6 +12,7 @@ void registerTableFunctionNumbers(TableFunctionFactory & factory);
void registerTableFunctionCatBoostPool(TableFunctionFactory & factory);
void registerTableFunctionFile(TableFunctionFactory & factory);
void registerTableFunctionURL(TableFunctionFactory & factory);
void registerTableFunctionInput(TableFunctionFactory & factory);
#if USE_HDFS
void registerTableFunctionHDFS(TableFunctionFactory & factory);
@ -38,6 +39,7 @@ void registerTableFunctions()
registerTableFunctionCatBoostPool(factory);
registerTableFunctionFile(factory);
registerTableFunctionURL(factory);
registerTableFunctionInput(factory);
#if USE_HDFS
registerTableFunctionHDFS(factory);