mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 23:52:03 +00:00
Support multiquery option
This commit is contained in:
parent
a10b9ed63a
commit
e31eccfaf2
@ -1,5 +1,3 @@
|
|||||||
#include "TestHint.h"
|
|
||||||
|
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
#include <signal.h>
|
#include <signal.h>
|
||||||
@ -33,6 +31,7 @@
|
|||||||
#include <Common/PODArray.h>
|
#include <Common/PODArray.h>
|
||||||
#include <Client/IClient.h>
|
#include <Client/IClient.h>
|
||||||
#include <Client/QueryFuzzer.h>
|
#include <Client/QueryFuzzer.h>
|
||||||
|
#include <Client/TestHint.h>
|
||||||
#include <Core/QueryProcessingStage.h>
|
#include <Core/QueryProcessingStage.h>
|
||||||
#include <Core/ExternalTable.h>
|
#include <Core/ExternalTable.h>
|
||||||
#include <IO/ReadBufferFromFile.h>
|
#include <IO/ReadBufferFromFile.h>
|
||||||
@ -147,36 +146,6 @@ private:
|
|||||||
size_t insert_format_max_block_size = 0; /// Max block size when reading INSERT data.
|
size_t insert_format_max_block_size = 0; /// Max block size when reading INSERT data.
|
||||||
size_t max_client_network_bandwidth = 0; /// The maximum speed of data exchange over the network for the client in bytes per second.
|
size_t max_client_network_bandwidth = 0; /// The maximum speed of data exchange over the network for the client in bytes per second.
|
||||||
|
|
||||||
SharedContextHolder shared_context = Context::createShared();
|
|
||||||
ContextMutablePtr context = Context::createGlobal(shared_context.get());
|
|
||||||
|
|
||||||
/// Buffer that reads from stdin in batch mode.
|
|
||||||
ReadBufferFromFileDescriptor std_in{STDIN_FILENO};
|
|
||||||
|
|
||||||
/// Console output.
|
|
||||||
WriteBufferFromFileDescriptor std_out{STDOUT_FILENO};
|
|
||||||
std::unique_ptr<ShellCommand> pager_cmd;
|
|
||||||
|
|
||||||
/// The user can specify to redirect query output to a file.
|
|
||||||
std::optional<WriteBufferFromFile> out_file_buf;
|
|
||||||
BlockOutputStreamPtr block_out_stream;
|
|
||||||
|
|
||||||
/// The user could specify special file for server logs (stderr by default)
|
|
||||||
std::unique_ptr<WriteBuffer> out_logs_buf;
|
|
||||||
String server_logs_file;
|
|
||||||
BlockOutputStreamPtr logs_out_stream;
|
|
||||||
|
|
||||||
String current_profile;
|
|
||||||
|
|
||||||
/// How many rows have been read or written.
|
|
||||||
size_t processed_rows = 0;
|
|
||||||
|
|
||||||
/// The last exception that was received from the server. Is used for the
|
|
||||||
/// return code in batch mode.
|
|
||||||
std::unique_ptr<Exception> server_exception;
|
|
||||||
/// Likewise, the last exception that occurred on the client.
|
|
||||||
std::unique_ptr<Exception> client_exception;
|
|
||||||
|
|
||||||
UInt64 server_revision = 0;
|
UInt64 server_revision = 0;
|
||||||
String server_version;
|
String server_version;
|
||||||
|
|
||||||
@ -188,11 +157,21 @@ private:
|
|||||||
|
|
||||||
ConnectionParameters connection_parameters;
|
ConnectionParameters connection_parameters;
|
||||||
|
|
||||||
/// We will format query_id in interactive mode in various ways, the default is just to print Query id: ...
|
bool supportPasswordOption() const override { return true; }
|
||||||
std::vector<std::pair<String, String>> query_id_formats;
|
|
||||||
QueryProcessingStage::Enum query_processing_stage;
|
|
||||||
|
|
||||||
bool supportPasswordOption() override { return true; }
|
bool splitQueryIntoParts() const override { return true; }
|
||||||
|
|
||||||
|
void reconnectIfNeeded() override
|
||||||
|
{
|
||||||
|
if (!connection->checkConnected())
|
||||||
|
connect();
|
||||||
|
}
|
||||||
|
|
||||||
|
void setDatabase(const String & new_database) override
|
||||||
|
{
|
||||||
|
/// If the connection initiates the reconnection, it uses its variable.
|
||||||
|
connection->setDefaultDatabase(new_database);
|
||||||
|
}
|
||||||
|
|
||||||
void initialize(Poco::Util::Application & self) override
|
void initialize(Poco::Util::Application & self) override
|
||||||
{
|
{
|
||||||
@ -204,20 +183,20 @@ private:
|
|||||||
|
|
||||||
configReadClient(config(), home_path);
|
configReadClient(config(), home_path);
|
||||||
|
|
||||||
context->setApplicationType(Context::ApplicationType::CLIENT);
|
global_context->setApplicationType(Context::ApplicationType::CLIENT);
|
||||||
context->setQueryParameters(query_parameters);
|
global_context->setQueryParameters(query_parameters);
|
||||||
|
|
||||||
/// settings and limits could be specified in config file, but passed settings has higher priority
|
/// settings and limits could be specified in config file, but passed settings has higher priority
|
||||||
for (const auto & setting : context->getSettingsRef().allUnchanged())
|
for (const auto & setting : global_context->getSettingsRef().allUnchanged())
|
||||||
{
|
{
|
||||||
const auto & name = setting.getName();
|
const auto & name = setting.getName();
|
||||||
if (config().has(name))
|
if (config().has(name))
|
||||||
context->setSetting(name, config().getString(name));
|
global_context->setSetting(name, config().getString(name));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set path for format schema files
|
/// Set path for format schema files
|
||||||
if (config().has("format_schema_path"))
|
if (config().has("format_schema_path"))
|
||||||
context->setFormatSchemaPath(fs::weakly_canonical(config().getString("format_schema_path")));
|
global_context->setFormatSchemaPath(fs::weakly_canonical(config().getString("format_schema_path")));
|
||||||
|
|
||||||
/// Initialize query_id_formats if any
|
/// Initialize query_id_formats if any
|
||||||
if (config().has("query_id_formats"))
|
if (config().has("query_id_formats"))
|
||||||
@ -330,7 +309,7 @@ private:
|
|||||||
{
|
{
|
||||||
auto query_id = config().getString("query_id", "");
|
auto query_id = config().getString("query_id", "");
|
||||||
if (!query_id.empty())
|
if (!query_id.empty())
|
||||||
context->setCurrentQueryId(query_id);
|
global_context->setCurrentQueryId(query_id);
|
||||||
|
|
||||||
nonInteractive();
|
nonInteractive();
|
||||||
|
|
||||||
@ -419,7 +398,7 @@ private:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!context->getSettingsRef().use_client_time_zone)
|
if (!global_context->getSettingsRef().use_client_time_zone)
|
||||||
{
|
{
|
||||||
const auto & time_zone = connection->getServerTimezone(connection_parameters.timeouts);
|
const auto & time_zone = connection->getServerTimezone(connection_parameters.timeouts);
|
||||||
if (!time_zone.empty())
|
if (!time_zone.empty())
|
||||||
@ -521,51 +500,6 @@ private:
|
|||||||
processQueryText(text);
|
processQueryText(text);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Consumes trailing semicolons and tries to consume the same-line trailing
|
|
||||||
// comment.
|
|
||||||
static void adjustQueryEnd(const char *& this_query_end, const char * all_queries_end, int max_parser_depth)
|
|
||||||
{
|
|
||||||
// We have to skip the trailing semicolon that might be left
|
|
||||||
// after VALUES parsing or just after a normal semicolon-terminated query.
|
|
||||||
Tokens after_query_tokens(this_query_end, all_queries_end);
|
|
||||||
IParser::Pos after_query_iterator(after_query_tokens, max_parser_depth);
|
|
||||||
while (after_query_iterator.isValid() && after_query_iterator->type == TokenType::Semicolon)
|
|
||||||
{
|
|
||||||
this_query_end = after_query_iterator->end;
|
|
||||||
++after_query_iterator;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now we have to do some extra work to add the trailing
|
|
||||||
// same-line comment to the query, but preserve the leading
|
|
||||||
// comments of the next query. The trailing comment is important
|
|
||||||
// because the test hints are usually written this way, e.g.:
|
|
||||||
// select nonexistent_column; -- { serverError 12345 }.
|
|
||||||
// The token iterator skips comments and whitespace, so we have
|
|
||||||
// to find the newline in the string manually. If it's earlier
|
|
||||||
// than the next significant token, it means that the text before
|
|
||||||
// newline is some trailing whitespace or comment, and we should
|
|
||||||
// add it to our query. There are also several special cases
|
|
||||||
// that are described below.
|
|
||||||
const auto * newline = find_first_symbols<'\n'>(this_query_end, all_queries_end);
|
|
||||||
const char * next_query_begin = after_query_iterator->begin;
|
|
||||||
|
|
||||||
// We include the entire line if the next query starts after
|
|
||||||
// it. This is a generic case of trailing in-line comment.
|
|
||||||
// The "equals" condition is for case of end of input (they both equal
|
|
||||||
// all_queries_end);
|
|
||||||
if (newline <= next_query_begin)
|
|
||||||
{
|
|
||||||
assert(newline >= this_query_end);
|
|
||||||
this_query_end = newline;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// Many queries on one line, can't do anything. By the way, this
|
|
||||||
// syntax is probably going to work as expected:
|
|
||||||
// select nonexistent /* { serverError 12345 } */; select 1
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void reportQueryError() const override
|
void reportQueryError() const override
|
||||||
{
|
{
|
||||||
if (server_exception)
|
if (server_exception)
|
||||||
@ -598,287 +532,10 @@ private:
|
|||||||
assert(have_error == (client_exception || server_exception));
|
assert(have_error == (client_exception || server_exception));
|
||||||
}
|
}
|
||||||
|
|
||||||
bool processMultiQuery(const String & all_queries_text)
|
|
||||||
{
|
|
||||||
// It makes sense not to base any control flow on this, so that it is
|
|
||||||
// the same in tests and in normal usage. The only difference is that in
|
|
||||||
// normal mode we ignore the test hints.
|
|
||||||
const bool test_mode = config().has("testmode");
|
|
||||||
|
|
||||||
{
|
|
||||||
/// disable logs if expects errors
|
|
||||||
TestHint test_hint(test_mode, all_queries_text);
|
|
||||||
if (test_hint.clientError() || test_hint.serverError())
|
|
||||||
processTextAsSingleQuery("SET send_logs_level = 'fatal'");
|
|
||||||
}
|
|
||||||
|
|
||||||
bool echo_query = echo_queries;
|
|
||||||
|
|
||||||
/// Several queries separated by ';'.
|
|
||||||
/// INSERT data is ended by the end of line, not ';'.
|
|
||||||
/// An exception is VALUES format where we also support semicolon in
|
|
||||||
/// addition to end of line.
|
|
||||||
|
|
||||||
const char * this_query_begin = all_queries_text.data();
|
|
||||||
const char * all_queries_end = all_queries_text.data() + all_queries_text.size();
|
|
||||||
|
|
||||||
while (this_query_begin < all_queries_end)
|
|
||||||
{
|
|
||||||
// Remove leading empty newlines and other whitespace, because they
|
|
||||||
// are annoying to filter in query log. This is mostly relevant for
|
|
||||||
// the tests.
|
|
||||||
while (this_query_begin < all_queries_end && isWhitespaceASCII(*this_query_begin))
|
|
||||||
{
|
|
||||||
++this_query_begin;
|
|
||||||
}
|
|
||||||
if (this_query_begin >= all_queries_end)
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// If there are only comments left until the end of file, we just
|
|
||||||
// stop. The parser can't handle this situation because it always
|
|
||||||
// expects that there is some query that it can parse.
|
|
||||||
// We can get into this situation because the parser also doesn't
|
|
||||||
// skip the trailing comments after parsing a query. This is because
|
|
||||||
// they may as well be the leading comments for the next query,
|
|
||||||
// and it makes more sense to treat them as such.
|
|
||||||
{
|
|
||||||
Tokens tokens(this_query_begin, all_queries_end);
|
|
||||||
IParser::Pos token_iterator(tokens, context->getSettingsRef().max_parser_depth);
|
|
||||||
if (!token_iterator.isValid())
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try to parse the query.
|
|
||||||
const char * this_query_end = this_query_begin;
|
|
||||||
try
|
|
||||||
{
|
|
||||||
parsed_query = parseQuery(this_query_end, all_queries_end, true);
|
|
||||||
}
|
|
||||||
catch (Exception & e)
|
|
||||||
{
|
|
||||||
// Try to find test hint for syntax error. We don't know where
|
|
||||||
// the query ends because we failed to parse it, so we consume
|
|
||||||
// the entire line.
|
|
||||||
this_query_end = find_first_symbols<'\n'>(this_query_end, all_queries_end);
|
|
||||||
|
|
||||||
TestHint hint(test_mode, String(this_query_begin, this_query_end - this_query_begin));
|
|
||||||
|
|
||||||
if (hint.serverError())
|
|
||||||
{
|
|
||||||
// Syntax errors are considered as client errors
|
|
||||||
e.addMessage("\nExpected server error '{}'.", hint.serverError());
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (hint.clientError() != e.code())
|
|
||||||
{
|
|
||||||
if (hint.clientError())
|
|
||||||
e.addMessage("\nExpected client error: " + std::to_string(hint.clientError()));
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// It's expected syntax error, skip the line
|
|
||||||
this_query_begin = this_query_end;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!parsed_query)
|
|
||||||
{
|
|
||||||
if (ignore_error)
|
|
||||||
{
|
|
||||||
Tokens tokens(this_query_begin, all_queries_end);
|
|
||||||
IParser::Pos token_iterator(tokens, context->getSettingsRef().max_parser_depth);
|
|
||||||
while (token_iterator->type != TokenType::Semicolon && token_iterator.isValid())
|
|
||||||
++token_iterator;
|
|
||||||
this_query_begin = token_iterator->end;
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// INSERT queries may have the inserted data in the query text
|
|
||||||
// that follow the query itself, e.g. "insert into t format CSV 1;2".
|
|
||||||
// They need special handling. First of all, here we find where the
|
|
||||||
// inserted data ends. In multy-query mode, it is delimited by a
|
|
||||||
// newline.
|
|
||||||
// The VALUES format needs even more handling -- we also allow the
|
|
||||||
// data to be delimited by semicolon. This case is handled later by
|
|
||||||
// the format parser itself.
|
|
||||||
// We can't do multiline INSERTs with inline data, because most
|
|
||||||
// row input formats (e.g. TSV) can't tell when the input stops,
|
|
||||||
// unlike VALUES.
|
|
||||||
auto * insert_ast = parsed_query->as<ASTInsertQuery>();
|
|
||||||
if (insert_ast && insert_ast->data)
|
|
||||||
{
|
|
||||||
this_query_end = find_first_symbols<'\n'>(insert_ast->data, all_queries_end);
|
|
||||||
insert_ast->end = this_query_end;
|
|
||||||
query_to_send = all_queries_text.substr(this_query_begin - all_queries_text.data(), insert_ast->data - this_query_begin);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
query_to_send = all_queries_text.substr(this_query_begin - all_queries_text.data(), this_query_end - this_query_begin);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try to include the trailing comment with test hints. It is just
|
|
||||||
// a guess for now, because we don't yet know where the query ends
|
|
||||||
// if it is an INSERT query with inline data. We will do it again
|
|
||||||
// after we have processed the query. But even this guess is
|
|
||||||
// beneficial so that we see proper trailing comments in "echo" and
|
|
||||||
// server log.
|
|
||||||
adjustQueryEnd(this_query_end, all_queries_end, context->getSettingsRef().max_parser_depth);
|
|
||||||
|
|
||||||
// full_query is the query + inline INSERT data + trailing comments
|
|
||||||
// (the latter is our best guess for now).
|
|
||||||
full_query = all_queries_text.substr(this_query_begin - all_queries_text.data(), this_query_end - this_query_begin);
|
|
||||||
|
|
||||||
if (query_fuzzer_runs)
|
|
||||||
{
|
|
||||||
if (!processWithFuzzing(full_query))
|
|
||||||
return false;
|
|
||||||
|
|
||||||
this_query_begin = this_query_end;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now we know for sure where the query ends.
|
|
||||||
// Look for the hint in the text of query + insert data + trailing
|
|
||||||
// comments,
|
|
||||||
// e.g. insert into t format CSV 'a' -- { serverError 123 }.
|
|
||||||
// Use the updated query boundaries we just calculated.
|
|
||||||
TestHint test_hint(test_mode, std::string(this_query_begin, this_query_end - this_query_begin));
|
|
||||||
|
|
||||||
// Echo all queries if asked; makes for a more readable reference
|
|
||||||
// file.
|
|
||||||
echo_query = test_hint.echoQueries().value_or(echo_query);
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
processParsedSingleQuery(echo_query);
|
|
||||||
}
|
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
// Surprisingly, this is a client error. A server error would
|
|
||||||
// have been reported w/o throwing (see onReceiveSeverException()).
|
|
||||||
client_exception = std::make_unique<Exception>(getCurrentExceptionMessage(true), getCurrentExceptionCode());
|
|
||||||
have_error = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// For INSERTs with inline data: use the end of inline data as
|
|
||||||
// reported by the format parser (it is saved in sendData()).
|
|
||||||
// This allows us to handle queries like:
|
|
||||||
// insert into t values (1); select 1
|
|
||||||
// , where the inline data is delimited by semicolon and not by a
|
|
||||||
// newline.
|
|
||||||
if (insert_ast && insert_ast->data)
|
|
||||||
{
|
|
||||||
this_query_end = insert_ast->end;
|
|
||||||
adjustQueryEnd(this_query_end, all_queries_end, context->getSettingsRef().max_parser_depth);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check whether the error (or its absence) matches the test hints
|
|
||||||
// (or their absence).
|
|
||||||
bool error_matches_hint = true;
|
|
||||||
if (have_error)
|
|
||||||
{
|
|
||||||
if (test_hint.serverError())
|
|
||||||
{
|
|
||||||
if (!server_exception)
|
|
||||||
{
|
|
||||||
error_matches_hint = false;
|
|
||||||
fmt::print(stderr, "Expected server error code '{}' but got no server error.\n", test_hint.serverError());
|
|
||||||
}
|
|
||||||
else if (server_exception->code() != test_hint.serverError())
|
|
||||||
{
|
|
||||||
error_matches_hint = false;
|
|
||||||
std::cerr << "Expected server error code: " << test_hint.serverError() << " but got: " << server_exception->code()
|
|
||||||
<< "." << std::endl;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (test_hint.clientError())
|
|
||||||
{
|
|
||||||
if (!client_exception)
|
|
||||||
{
|
|
||||||
error_matches_hint = false;
|
|
||||||
fmt::print(stderr, "Expected client error code '{}' but got no client error.\n", test_hint.clientError());
|
|
||||||
}
|
|
||||||
else if (client_exception->code() != test_hint.clientError())
|
|
||||||
{
|
|
||||||
error_matches_hint = false;
|
|
||||||
fmt::print(
|
|
||||||
stderr, "Expected client error code '{}' but got '{}'.\n", test_hint.clientError(), client_exception->code());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!test_hint.clientError() && !test_hint.serverError())
|
|
||||||
{
|
|
||||||
// No error was expected but it still occurred. This is the
|
|
||||||
// default case w/o test hint, doesn't need additional
|
|
||||||
// diagnostics.
|
|
||||||
error_matches_hint = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
if (test_hint.clientError())
|
|
||||||
{
|
|
||||||
fmt::print(stderr, "The query succeeded but the client error '{}' was expected.\n", test_hint.clientError());
|
|
||||||
error_matches_hint = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (test_hint.serverError())
|
|
||||||
{
|
|
||||||
fmt::print(stderr, "The query succeeded but the server error '{}' was expected.\n", test_hint.serverError());
|
|
||||||
error_matches_hint = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the error is expected, force reconnect and ignore it.
|
|
||||||
if (have_error && error_matches_hint)
|
|
||||||
{
|
|
||||||
client_exception.reset();
|
|
||||||
server_exception.reset();
|
|
||||||
have_error = false;
|
|
||||||
|
|
||||||
if (!connection->checkConnected())
|
|
||||||
connect();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Report error.
|
|
||||||
if (have_error)
|
|
||||||
{
|
|
||||||
reportQueryError();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop processing queries if needed.
|
|
||||||
if (have_error && !ignore_error)
|
|
||||||
{
|
|
||||||
if (is_interactive)
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
this_query_begin = this_query_end;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Prints changed settings to stderr. Useful for debugging fuzzing failures.
|
// Prints changed settings to stderr. Useful for debugging fuzzing failures.
|
||||||
void printChangedSettings() const
|
void printChangedSettings() const
|
||||||
{
|
{
|
||||||
const auto & changes = context->getSettingsRef().changes();
|
const auto & changes = global_context->getSettingsRef().changes();
|
||||||
if (!changes.empty())
|
if (!changes.empty())
|
||||||
{
|
{
|
||||||
fmt::print(stderr, "Changed settings: ");
|
fmt::print(stderr, "Changed settings: ");
|
||||||
@ -1198,122 +855,6 @@ private:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void processParsedSingleQuery(std::optional<bool> echo_query = {})
|
|
||||||
{
|
|
||||||
// Parameters are in global variables:
|
|
||||||
// 'parsed_query' -- the query AST,
|
|
||||||
// 'query_to_send' -- the query text that is sent to server,
|
|
||||||
// 'full_query' -- for INSERT queries, contains the query and the data that
|
|
||||||
// follow it. Its memory is referenced by ASTInsertQuery::begin, end.
|
|
||||||
|
|
||||||
resetOutput();
|
|
||||||
client_exception.reset();
|
|
||||||
server_exception.reset();
|
|
||||||
have_error = false;
|
|
||||||
|
|
||||||
if (echo_query.value_or(echo_queries))
|
|
||||||
{
|
|
||||||
writeString(full_query, std_out);
|
|
||||||
writeChar('\n', std_out);
|
|
||||||
std_out.next();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (is_interactive)
|
|
||||||
{
|
|
||||||
// Generate a new query_id
|
|
||||||
context->setCurrentQueryId("");
|
|
||||||
for (const auto & query_id_format : query_id_formats)
|
|
||||||
{
|
|
||||||
writeString(query_id_format.first, std_out);
|
|
||||||
writeString(fmt::format(query_id_format.second, fmt::arg("query_id", context->getCurrentQueryId())), std_out);
|
|
||||||
writeChar('\n', std_out);
|
|
||||||
std_out.next();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
processed_rows = 0;
|
|
||||||
written_first_block = false;
|
|
||||||
progress_indication.resetProgress();
|
|
||||||
|
|
||||||
{
|
|
||||||
/// Temporarily apply query settings to context.
|
|
||||||
std::optional<Settings> old_settings;
|
|
||||||
SCOPE_EXIT_SAFE({
|
|
||||||
if (old_settings)
|
|
||||||
context->setSettings(*old_settings);
|
|
||||||
});
|
|
||||||
auto apply_query_settings = [&](const IAST & settings_ast)
|
|
||||||
{
|
|
||||||
if (!old_settings)
|
|
||||||
old_settings.emplace(context->getSettingsRef());
|
|
||||||
context->applySettingsChanges(settings_ast.as<ASTSetQuery>()->changes);
|
|
||||||
};
|
|
||||||
const auto * insert = parsed_query->as<ASTInsertQuery>();
|
|
||||||
if (insert && insert->settings_ast)
|
|
||||||
apply_query_settings(*insert->settings_ast);
|
|
||||||
/// FIXME: try to prettify this cast using `as<>()`
|
|
||||||
const auto * with_output = dynamic_cast<const ASTQueryWithOutput *>(parsed_query.get());
|
|
||||||
if (with_output && with_output->settings_ast)
|
|
||||||
apply_query_settings(*with_output->settings_ast);
|
|
||||||
|
|
||||||
if (!connection->checkConnected())
|
|
||||||
connect();
|
|
||||||
|
|
||||||
ASTPtr input_function;
|
|
||||||
if (insert && insert->select)
|
|
||||||
insert->tryFindInputFunction(input_function);
|
|
||||||
|
|
||||||
/// INSERT query for which data transfer is needed (not an INSERT SELECT or input()) is processed separately.
|
|
||||||
if (insert && (!insert->select || input_function) && !insert->watch)
|
|
||||||
{
|
|
||||||
if (input_function && insert->format.empty())
|
|
||||||
throw Exception("FORMAT must be specified for function input()", ErrorCodes::INVALID_USAGE_OF_INPUT);
|
|
||||||
processInsertQuery();
|
|
||||||
}
|
|
||||||
else
|
|
||||||
processOrdinaryQuery();
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Do not change context (current DB, settings) in case of an exception.
|
|
||||||
if (!have_error)
|
|
||||||
{
|
|
||||||
if (const auto * set_query = parsed_query->as<ASTSetQuery>())
|
|
||||||
{
|
|
||||||
/// Save all changes in settings to avoid losing them if the connection is lost.
|
|
||||||
for (const auto & change : set_query->changes)
|
|
||||||
{
|
|
||||||
if (change.name == "profile")
|
|
||||||
current_profile = change.value.safeGet<String>();
|
|
||||||
else
|
|
||||||
context->applySettingChange(change);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (const auto * use_query = parsed_query->as<ASTUseQuery>())
|
|
||||||
{
|
|
||||||
const String & new_database = use_query->database;
|
|
||||||
/// If the client initiates the reconnection, it takes the settings from the config.
|
|
||||||
config().setString("database", new_database);
|
|
||||||
/// If the connection initiates the reconnection, it uses its variable.
|
|
||||||
connection->setDefaultDatabase(new_database);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (is_interactive)
|
|
||||||
{
|
|
||||||
std::cout << std::endl << processed_rows << " rows in set. Elapsed: " << progress_indication.elapsedSeconds() << " sec. ";
|
|
||||||
|
|
||||||
/// Write final progress if it makes sense to do so.
|
|
||||||
writeFinalProgress();
|
|
||||||
|
|
||||||
std::cout << std::endl << std::endl;
|
|
||||||
}
|
|
||||||
else if (print_time_to_stderr)
|
|
||||||
{
|
|
||||||
std::cerr << progress_indication.elapsedSeconds() << "\n";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/// Convert external tables to ExternalTableData and send them using the connection.
|
/// Convert external tables to ExternalTableData and send them using the connection.
|
||||||
void sendExternalTables()
|
void sendExternalTables()
|
||||||
@ -1324,14 +865,14 @@ private:
|
|||||||
|
|
||||||
std::vector<ExternalTableDataPtr> data;
|
std::vector<ExternalTableDataPtr> data;
|
||||||
for (auto & table : external_tables)
|
for (auto & table : external_tables)
|
||||||
data.emplace_back(table.getData(context));
|
data.emplace_back(table.getData(global_context));
|
||||||
|
|
||||||
connection->sendExternalTablesData(data);
|
connection->sendExternalTablesData(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// Process the query that doesn't require transferring data blocks to the server.
|
/// Process the query that doesn't require transferring data blocks to the server.
|
||||||
void processOrdinaryQuery()
|
void processOrdinaryQuery() override
|
||||||
{
|
{
|
||||||
/// Rewrite query only when we have query parameters.
|
/// Rewrite query only when we have query parameters.
|
||||||
/// Note that if query is rewritten, comments in query are lost.
|
/// Note that if query is rewritten, comments in query are lost.
|
||||||
@ -1356,10 +897,10 @@ private:
|
|||||||
connection->sendQuery(
|
connection->sendQuery(
|
||||||
connection_parameters.timeouts,
|
connection_parameters.timeouts,
|
||||||
query_to_send,
|
query_to_send,
|
||||||
context->getCurrentQueryId(),
|
global_context->getCurrentQueryId(),
|
||||||
query_processing_stage,
|
query_processing_stage,
|
||||||
&context->getSettingsRef(),
|
&global_context->getSettingsRef(),
|
||||||
&context->getClientInfo(),
|
&global_context->getClientInfo(),
|
||||||
true);
|
true);
|
||||||
|
|
||||||
sendExternalTables();
|
sendExternalTables();
|
||||||
@ -1386,7 +927,7 @@ private:
|
|||||||
|
|
||||||
|
|
||||||
/// Process the query that requires transferring data blocks to the server.
|
/// Process the query that requires transferring data blocks to the server.
|
||||||
void processInsertQuery()
|
void processInsertQuery() override
|
||||||
{
|
{
|
||||||
const auto parsed_insert_query = parsed_query->as<ASTInsertQuery &>();
|
const auto parsed_insert_query = parsed_query->as<ASTInsertQuery &>();
|
||||||
if (!parsed_insert_query.data && (is_interactive || (!stdin_is_a_tty && std_in.eof())))
|
if (!parsed_insert_query.data && (is_interactive || (!stdin_is_a_tty && std_in.eof())))
|
||||||
@ -1395,10 +936,10 @@ private:
|
|||||||
connection->sendQuery(
|
connection->sendQuery(
|
||||||
connection_parameters.timeouts,
|
connection_parameters.timeouts,
|
||||||
query_to_send,
|
query_to_send,
|
||||||
context->getCurrentQueryId(),
|
global_context->getCurrentQueryId(),
|
||||||
query_processing_stage,
|
query_processing_stage,
|
||||||
&context->getSettingsRef(),
|
&global_context->getSettingsRef(),
|
||||||
&context->getClientInfo(),
|
&global_context->getClientInfo(),
|
||||||
true);
|
true);
|
||||||
|
|
||||||
sendExternalTables();
|
sendExternalTables();
|
||||||
@ -1416,43 +957,6 @@ private:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
ASTPtr parseQuery(const char *& pos, const char * end, bool allow_multi_statements)
|
|
||||||
{
|
|
||||||
ParserQuery parser(end);
|
|
||||||
ASTPtr res;
|
|
||||||
|
|
||||||
const auto & settings = context->getSettingsRef();
|
|
||||||
size_t max_length = 0;
|
|
||||||
if (!allow_multi_statements)
|
|
||||||
max_length = settings.max_query_size;
|
|
||||||
|
|
||||||
if (is_interactive || ignore_error)
|
|
||||||
{
|
|
||||||
String message;
|
|
||||||
res = tryParseQuery(parser, pos, end, message, true, "", allow_multi_statements, max_length, settings.max_parser_depth);
|
|
||||||
|
|
||||||
if (!res)
|
|
||||||
{
|
|
||||||
std::cerr << std::endl << message << std::endl << std::endl;
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
res = parseQueryAndMovePosition(parser, pos, end, "", allow_multi_statements, max_length, settings.max_parser_depth);
|
|
||||||
|
|
||||||
if (is_interactive)
|
|
||||||
{
|
|
||||||
std::cout << std::endl;
|
|
||||||
WriteBufferFromOStream res_buf(std::cout, 4096);
|
|
||||||
formatAST(*res, res_buf);
|
|
||||||
res_buf.next();
|
|
||||||
std::cout << std::endl << std::endl;
|
|
||||||
}
|
|
||||||
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void sendData(Block & sample, const ColumnsDescription & columns_description)
|
void sendData(Block & sample, const ColumnsDescription & columns_description)
|
||||||
{
|
{
|
||||||
/// If INSERT data must be sent.
|
/// If INSERT data must be sent.
|
||||||
@ -1492,10 +996,10 @@ private:
|
|||||||
progress_indication.updateProgress(Progress(file_progress));
|
progress_indication.updateProgress(Progress(file_progress));
|
||||||
|
|
||||||
/// Set callback to be called on file progress.
|
/// Set callback to be called on file progress.
|
||||||
progress_indication.setFileProgressCallback(context, true);
|
progress_indication.setFileProgressCallback(global_context, true);
|
||||||
|
|
||||||
/// Add callback to track reading from fd.
|
/// Add callback to track reading from fd.
|
||||||
std_in.setProgressCallback(context);
|
std_in.setProgressCallback(global_context);
|
||||||
}
|
}
|
||||||
|
|
||||||
sendDataFrom(std_in, sample, columns_description);
|
sendDataFrom(std_in, sample, columns_description);
|
||||||
@ -1522,10 +1026,10 @@ private:
|
|||||||
current_format = insert->format;
|
current_format = insert->format;
|
||||||
}
|
}
|
||||||
|
|
||||||
BlockInputStreamPtr block_input = context->getInputFormat(current_format, buf, sample, insert_format_max_block_size);
|
BlockInputStreamPtr block_input = global_context->getInputFormat(current_format, buf, sample, insert_format_max_block_size);
|
||||||
|
|
||||||
if (columns_description.hasDefaults())
|
if (columns_description.hasDefaults())
|
||||||
block_input = std::make_shared<AddingDefaultsBlockInputStream>(block_input, columns_description, context);
|
block_input = std::make_shared<AddingDefaultsBlockInputStream>(block_input, columns_description, global_context);
|
||||||
|
|
||||||
BlockInputStreamPtr async_block_input = std::make_shared<AsynchronousBlockInputStream>(block_input);
|
BlockInputStreamPtr async_block_input = std::make_shared<AsynchronousBlockInputStream>(block_input);
|
||||||
|
|
||||||
@ -1561,35 +1065,6 @@ private:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// Flush all buffers.
|
|
||||||
void resetOutput()
|
|
||||||
{
|
|
||||||
block_out_stream.reset();
|
|
||||||
logs_out_stream.reset();
|
|
||||||
|
|
||||||
if (pager_cmd)
|
|
||||||
{
|
|
||||||
pager_cmd->in.close();
|
|
||||||
pager_cmd->wait();
|
|
||||||
}
|
|
||||||
pager_cmd = nullptr;
|
|
||||||
|
|
||||||
if (out_file_buf)
|
|
||||||
{
|
|
||||||
out_file_buf->next();
|
|
||||||
out_file_buf.reset();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (out_logs_buf)
|
|
||||||
{
|
|
||||||
out_logs_buf->next();
|
|
||||||
out_logs_buf.reset();
|
|
||||||
}
|
|
||||||
|
|
||||||
std_out.next();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/// Receives and processes packets coming from server.
|
/// Receives and processes packets coming from server.
|
||||||
/// Also checks if query execution should be cancelled.
|
/// Also checks if query execution should be cancelled.
|
||||||
void receiveResult()
|
void receiveResult()
|
||||||
@ -1848,9 +1323,9 @@ private:
|
|||||||
|
|
||||||
/// It is not clear how to write progress with parallel formatting. It may increase code complexity significantly.
|
/// It is not clear how to write progress with parallel formatting. It may increase code complexity significantly.
|
||||||
if (!need_render_progress)
|
if (!need_render_progress)
|
||||||
block_out_stream = context->getOutputStreamParallelIfPossible(current_format, *out_buf, block);
|
block_out_stream = global_context->getOutputStreamParallelIfPossible(current_format, *out_buf, block);
|
||||||
else
|
else
|
||||||
block_out_stream = context->getOutputStream(current_format, *out_buf, block);
|
block_out_stream = global_context->getOutputStream(current_format, *out_buf, block);
|
||||||
|
|
||||||
block_out_stream->writePrefix();
|
block_out_stream->writePrefix();
|
||||||
}
|
}
|
||||||
@ -2203,12 +1678,12 @@ public:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
context->makeGlobalContext();
|
global_context->makeGlobalContext();
|
||||||
context->setSettings(cmd_settings);
|
global_context->setSettings(cmd_settings);
|
||||||
|
|
||||||
/// Copy settings-related program options to config.
|
/// Copy settings-related program options to config.
|
||||||
/// TODO: Is this code necessary?
|
/// TODO: Is this code necessary?
|
||||||
for (const auto & setting : context->getSettingsRef().all())
|
for (const auto & setting : global_context->getSettingsRef().all())
|
||||||
{
|
{
|
||||||
const auto & name = setting.getName();
|
const auto & name = setting.getName();
|
||||||
if (options.count(name))
|
if (options.count(name))
|
||||||
@ -2300,7 +1775,7 @@ public:
|
|||||||
{
|
{
|
||||||
std::string traceparent = options["opentelemetry-traceparent"].as<std::string>();
|
std::string traceparent = options["opentelemetry-traceparent"].as<std::string>();
|
||||||
std::string error;
|
std::string error;
|
||||||
if (!context->getClientInfo().client_trace_context.parseTraceparentHeader(traceparent, error))
|
if (!global_context->getClientInfo().client_trace_context.parseTraceparentHeader(traceparent, error))
|
||||||
{
|
{
|
||||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse OpenTelemetry traceparent '{}': {}", traceparent, error);
|
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse OpenTelemetry traceparent '{}': {}", traceparent, error);
|
||||||
}
|
}
|
||||||
@ -2308,7 +1783,7 @@ public:
|
|||||||
|
|
||||||
if (options.count("opentelemetry-tracestate"))
|
if (options.count("opentelemetry-tracestate"))
|
||||||
{
|
{
|
||||||
context->getClientInfo().client_trace_context.tracestate = options["opentelemetry-tracestate"].as<std::string>();
|
global_context->getClientInfo().client_trace_context.tracestate = options["opentelemetry-tracestate"].as<std::string>();
|
||||||
}
|
}
|
||||||
|
|
||||||
is_default_format = !config().has("vertical") && !config().has("format");
|
is_default_format = !config().has("vertical") && !config().has("format");
|
||||||
@ -2317,17 +1792,17 @@ public:
|
|||||||
else
|
else
|
||||||
format = config().getString("format", is_interactive ? "PrettyCompact" : "TabSeparated");
|
format = config().getString("format", is_interactive ? "PrettyCompact" : "TabSeparated");
|
||||||
|
|
||||||
format_max_block_size = config().getInt("format_max_block_size", context->getSettingsRef().max_block_size);
|
format_max_block_size = config().getInt("format_max_block_size", global_context->getSettingsRef().max_block_size);
|
||||||
|
|
||||||
insert_format = "Values";
|
insert_format = "Values";
|
||||||
|
|
||||||
/// Setting value from cmd arg overrides one from config
|
/// Setting value from cmd arg overrides one from config
|
||||||
if (context->getSettingsRef().max_insert_block_size.changed)
|
if (global_context->getSettingsRef().max_insert_block_size.changed)
|
||||||
insert_format_max_block_size = context->getSettingsRef().max_insert_block_size;
|
insert_format_max_block_size = global_context->getSettingsRef().max_insert_block_size;
|
||||||
else
|
else
|
||||||
insert_format_max_block_size = config().getInt("insert_format_max_block_size", context->getSettingsRef().max_insert_block_size);
|
insert_format_max_block_size = config().getInt("insert_format_max_block_size", global_context->getSettingsRef().max_insert_block_size);
|
||||||
|
|
||||||
ClientInfo & client_info = context->getClientInfo();
|
ClientInfo & client_info = global_context->getClientInfo();
|
||||||
client_info.setInitialQuery();
|
client_info.setInitialQuery();
|
||||||
client_info.quota_key = config().getString("quota_key", "");
|
client_info.quota_key = config().getString("quota_key", "");
|
||||||
}
|
}
|
||||||
|
@ -407,8 +407,6 @@ int LocalServer::childMainImpl()
|
|||||||
for (const auto & [key, value] : prompt_substitutions)
|
for (const auto & [key, value] : prompt_substitutions)
|
||||||
boost::replace_all(prompt_by_server_display_name, "{" + key + "}", value);
|
boost::replace_all(prompt_by_server_display_name, "{" + key + "}", value);
|
||||||
|
|
||||||
shared_context = Context::createShared();
|
|
||||||
global_context = Context::createGlobal(shared_context.get());
|
|
||||||
global_context->makeGlobalContext();
|
global_context->makeGlobalContext();
|
||||||
global_context->setApplicationType(Context::ApplicationType::LOCAL);
|
global_context->setApplicationType(Context::ApplicationType::LOCAL);
|
||||||
tryInitPath();
|
tryInitPath();
|
||||||
|
@ -67,6 +67,12 @@ protected:
|
|||||||
processQuery(input, e);
|
processQuery(input, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void processOrdinaryQuery() override
|
||||||
|
{
|
||||||
|
std::exception_ptr e;
|
||||||
|
processQuery(query_to_send, e);
|
||||||
|
}
|
||||||
|
|
||||||
void reportQueryError() const override;
|
void reportQueryError() const override;
|
||||||
|
|
||||||
void printHelpMessage(const OptionsDescription & options_description) override;
|
void printHelpMessage(const OptionsDescription & options_description) override;
|
||||||
@ -79,10 +85,9 @@ protected:
|
|||||||
const CommandLineOptions & options,
|
const CommandLineOptions & options,
|
||||||
const std::vector<Arguments> &) override;
|
const std::vector<Arguments> &) override;
|
||||||
|
|
||||||
bool supportPasswordOption() override { return false; }
|
bool supportPasswordOption() const override { return false; }
|
||||||
|
|
||||||
SharedContextHolder shared_context;
|
bool splitQueryIntoParts() const override { return false; }
|
||||||
ContextMutablePtr global_context;
|
|
||||||
|
|
||||||
std::optional<std::filesystem::path> temporary_directory_to_delete;
|
std::optional<std::filesystem::path> temporary_directory_to_delete;
|
||||||
|
|
||||||
|
@ -8,10 +8,11 @@
|
|||||||
# include <Common/config_version.h>
|
# include <Common/config_version.h>
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#include <common/argsToConfig.h>
|
||||||
#include <common/DateLUT.h>
|
#include <common/DateLUT.h>
|
||||||
#include <common/LocalDate.h>
|
#include <common/LocalDate.h>
|
||||||
#include <common/LineReader.h>
|
#include <common/LineReader.h>
|
||||||
#include <common/argsToConfig.h>
|
#include <common/scope_guard_safe.h>
|
||||||
|
|
||||||
#include <Common/UTF8Helpers.h>
|
#include <Common/UTF8Helpers.h>
|
||||||
#include <Common/TerminalSize.h>
|
#include <Common/TerminalSize.h>
|
||||||
@ -24,6 +25,17 @@
|
|||||||
#include <Parsers/ParserQuery.h>
|
#include <Parsers/ParserQuery.h>
|
||||||
#include <Parsers/formatAST.h>
|
#include <Parsers/formatAST.h>
|
||||||
#include <Parsers/ASTInsertQuery.h>
|
#include <Parsers/ASTInsertQuery.h>
|
||||||
|
#include <Parsers/ASTCreateQuery.h>
|
||||||
|
#include <Parsers/ASTDropQuery.h>
|
||||||
|
#include <Parsers/ASTSetQuery.h>
|
||||||
|
#include <Parsers/ASTUseQuery.h>
|
||||||
|
#include <Parsers/ASTSelectQuery.h>
|
||||||
|
#include <Parsers/ASTSelectWithUnionQuery.h>
|
||||||
|
#include <Parsers/ASTQueryWithOutput.h>
|
||||||
|
#include <Parsers/ASTLiteral.h>
|
||||||
|
#include <Parsers/ASTIdentifier.h>
|
||||||
|
|
||||||
|
#include <Client/TestHint.h>
|
||||||
|
|
||||||
#include <IO/WriteBufferFromOStream.h>
|
#include <IO/WriteBufferFromOStream.h>
|
||||||
|
|
||||||
@ -36,6 +48,7 @@ namespace DB
|
|||||||
namespace ErrorCodes
|
namespace ErrorCodes
|
||||||
{
|
{
|
||||||
extern const int UNRECOGNIZED_ARGUMENTS;
|
extern const int UNRECOGNIZED_ARGUMENTS;
|
||||||
|
extern const int INVALID_USAGE_OF_INPUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -197,6 +210,512 @@ void IClient::highlight(const String & query, std::vector<replxx::Replxx::Color>
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
|
ASTPtr IClient::parseQuery(const char *& pos, const char * end, bool allow_multi_statements) const
|
||||||
|
{
|
||||||
|
ParserQuery parser(end);
|
||||||
|
ASTPtr res;
|
||||||
|
|
||||||
|
const auto & settings = global_context->getSettingsRef();
|
||||||
|
size_t max_length = 0;
|
||||||
|
|
||||||
|
if (!allow_multi_statements)
|
||||||
|
max_length = settings.max_query_size;
|
||||||
|
|
||||||
|
if (is_interactive || ignore_error)
|
||||||
|
{
|
||||||
|
String message;
|
||||||
|
res = tryParseQuery(parser, pos, end, message, true, "", allow_multi_statements, max_length, settings.max_parser_depth);
|
||||||
|
|
||||||
|
if (!res)
|
||||||
|
{
|
||||||
|
std::cerr << std::endl << message << std::endl << std::endl;
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
res = parseQueryAndMovePosition(parser, pos, end, "", allow_multi_statements, max_length, settings.max_parser_depth);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (is_interactive)
|
||||||
|
{
|
||||||
|
std::cout << std::endl;
|
||||||
|
WriteBufferFromOStream res_buf(std::cout, 4096);
|
||||||
|
formatAST(*res, res_buf);
|
||||||
|
res_buf.next();
|
||||||
|
std::cout << std::endl << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Consumes trailing semicolons and tries to consume the same-line trailing
|
||||||
|
// comment.
|
||||||
|
static void adjustQueryEnd(const char *& this_query_end, const char * all_queries_end, int max_parser_depth)
|
||||||
|
{
|
||||||
|
// We have to skip the trailing semicolon that might be left
|
||||||
|
// after VALUES parsing or just after a normal semicolon-terminated query.
|
||||||
|
Tokens after_query_tokens(this_query_end, all_queries_end);
|
||||||
|
IParser::Pos after_query_iterator(after_query_tokens, max_parser_depth);
|
||||||
|
while (after_query_iterator.isValid() && after_query_iterator->type == TokenType::Semicolon)
|
||||||
|
{
|
||||||
|
this_query_end = after_query_iterator->end;
|
||||||
|
++after_query_iterator;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now we have to do some extra work to add the trailing
|
||||||
|
// same-line comment to the query, but preserve the leading
|
||||||
|
// comments of the next query. The trailing comment is important
|
||||||
|
// because the test hints are usually written this way, e.g.:
|
||||||
|
// select nonexistent_column; -- { serverError 12345 }.
|
||||||
|
// The token iterator skips comments and whitespace, so we have
|
||||||
|
// to find the newline in the string manually. If it's earlier
|
||||||
|
// than the next significant token, it means that the text before
|
||||||
|
// newline is some trailing whitespace or comment, and we should
|
||||||
|
// add it to our query. There are also several special cases
|
||||||
|
// that are described below.
|
||||||
|
const auto * newline = find_first_symbols<'\n'>(this_query_end, all_queries_end);
|
||||||
|
const char * next_query_begin = after_query_iterator->begin;
|
||||||
|
|
||||||
|
// We include the entire line if the next query starts after
|
||||||
|
// it. This is a generic case of trailing in-line comment.
|
||||||
|
// The "equals" condition is for case of end of input (they both equal
|
||||||
|
// all_queries_end);
|
||||||
|
if (newline <= next_query_begin)
|
||||||
|
{
|
||||||
|
assert(newline >= this_query_end);
|
||||||
|
this_query_end = newline;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Many queries on one line, can't do anything. By the way, this
|
||||||
|
// syntax is probably going to work as expected:
|
||||||
|
// select nonexistent /* { serverError 12345 } */; select 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Flush all buffers.
|
||||||
|
void IClient::resetOutput()
|
||||||
|
{
|
||||||
|
block_out_stream.reset();
|
||||||
|
logs_out_stream.reset();
|
||||||
|
|
||||||
|
if (pager_cmd)
|
||||||
|
{
|
||||||
|
pager_cmd->in.close();
|
||||||
|
pager_cmd->wait();
|
||||||
|
}
|
||||||
|
pager_cmd = nullptr;
|
||||||
|
|
||||||
|
if (out_file_buf)
|
||||||
|
{
|
||||||
|
out_file_buf->next();
|
||||||
|
out_file_buf.reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (out_logs_buf)
|
||||||
|
{
|
||||||
|
out_logs_buf->next();
|
||||||
|
out_logs_buf.reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
std_out.next();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void IClient::processParsedSingleQuery(std::optional<bool> echo_query_)
|
||||||
|
{
|
||||||
|
// Parameters are in global variables:
|
||||||
|
// 'parsed_query' -- the query AST,
|
||||||
|
// 'query_to_send' -- the query text that is sent to server,
|
||||||
|
// 'full_query' -- for INSERT queries, contains the query and the data that
|
||||||
|
// follow it. Its memory is referenced by ASTInsertQuery::begin, end.
|
||||||
|
|
||||||
|
resetOutput();
|
||||||
|
client_exception.reset();
|
||||||
|
server_exception.reset();
|
||||||
|
have_error = false;
|
||||||
|
|
||||||
|
if (echo_query_.value_or(echo_queries))
|
||||||
|
{
|
||||||
|
writeString(full_query, std_out);
|
||||||
|
writeChar('\n', std_out);
|
||||||
|
std_out.next();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (is_interactive)
|
||||||
|
{
|
||||||
|
// Generate a new query_id
|
||||||
|
global_context->setCurrentQueryId("");
|
||||||
|
for (const auto & query_id_format : query_id_formats)
|
||||||
|
{
|
||||||
|
writeString(query_id_format.first, std_out);
|
||||||
|
writeString(fmt::format(query_id_format.second, fmt::arg("query_id", global_context->getCurrentQueryId())), std_out);
|
||||||
|
writeChar('\n', std_out);
|
||||||
|
std_out.next();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
processed_rows = 0;
|
||||||
|
written_first_block = false;
|
||||||
|
progress_indication.resetProgress();
|
||||||
|
|
||||||
|
{
|
||||||
|
/// Temporarily apply query settings to context.
|
||||||
|
std::optional<Settings> old_settings;
|
||||||
|
SCOPE_EXIT_SAFE({
|
||||||
|
if (old_settings)
|
||||||
|
global_context->setSettings(*old_settings);
|
||||||
|
});
|
||||||
|
auto apply_query_settings = [&](const IAST & settings_ast)
|
||||||
|
{
|
||||||
|
if (!old_settings)
|
||||||
|
old_settings.emplace(global_context->getSettingsRef());
|
||||||
|
global_context->applySettingsChanges(settings_ast.as<ASTSetQuery>()->changes);
|
||||||
|
};
|
||||||
|
const auto * insert = parsed_query->as<ASTInsertQuery>();
|
||||||
|
if (insert && insert->settings_ast)
|
||||||
|
apply_query_settings(*insert->settings_ast);
|
||||||
|
/// FIXME: try to prettify this cast using `as<>()`
|
||||||
|
const auto * with_output = dynamic_cast<const ASTQueryWithOutput *>(parsed_query.get());
|
||||||
|
if (with_output && with_output->settings_ast)
|
||||||
|
apply_query_settings(*with_output->settings_ast);
|
||||||
|
|
||||||
|
reconnectIfNeeded();
|
||||||
|
|
||||||
|
ASTPtr input_function;
|
||||||
|
if (insert && insert->select)
|
||||||
|
insert->tryFindInputFunction(input_function);
|
||||||
|
|
||||||
|
/// INSERT query for which data transfer is needed (not an INSERT SELECT or input()) is processed separately.
|
||||||
|
if (splitQueryIntoParts() && insert && (!insert->select || input_function) && !insert->watch)
|
||||||
|
{
|
||||||
|
if (input_function && insert->format.empty())
|
||||||
|
throw Exception("FORMAT must be specified for function input()", ErrorCodes::INVALID_USAGE_OF_INPUT);
|
||||||
|
|
||||||
|
processInsertQuery();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
processOrdinaryQuery();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Do not change context (current DB, settings) in case of an exception.
|
||||||
|
if (!have_error)
|
||||||
|
{
|
||||||
|
if (const auto * set_query = parsed_query->as<ASTSetQuery>())
|
||||||
|
{
|
||||||
|
/// Save all changes in settings to avoid losing them if the connection is lost.
|
||||||
|
for (const auto & change : set_query->changes)
|
||||||
|
{
|
||||||
|
if (change.name == "profile")
|
||||||
|
current_profile = change.value.safeGet<String>();
|
||||||
|
else
|
||||||
|
global_context->applySettingChange(change);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (const auto * use_query = parsed_query->as<ASTUseQuery>())
|
||||||
|
{
|
||||||
|
const String & new_database = use_query->database;
|
||||||
|
/// If the client initiates the reconnection, it takes the settings from the config.
|
||||||
|
config().setString("database", new_database);
|
||||||
|
setDatabase(new_database);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (is_interactive)
|
||||||
|
{
|
||||||
|
std::cout << std::endl << processed_rows << " rows in set. Elapsed: " << progress_indication.elapsedSeconds() << " sec. ";
|
||||||
|
|
||||||
|
/// Write final progress if it makes sense to do so.
|
||||||
|
progress_indication.writeFinalProgress();
|
||||||
|
|
||||||
|
std::cout << std::endl << std::endl;
|
||||||
|
}
|
||||||
|
else if (print_time_to_stderr)
|
||||||
|
{
|
||||||
|
std::cerr << progress_indication.elapsedSeconds() << "\n";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
bool IClient::processMultiQuery(const String & all_queries_text)
|
||||||
|
{
|
||||||
|
// It makes sense not to base any control flow on this, so that it is
|
||||||
|
// the same in tests and in normal usage. The only difference is that in
|
||||||
|
// normal mode we ignore the test hints.
|
||||||
|
const bool test_mode = config().has("testmode");
|
||||||
|
|
||||||
|
{
|
||||||
|
/// disable logs if expects errors
|
||||||
|
TestHint test_hint(test_mode, all_queries_text);
|
||||||
|
if (test_hint.clientError() || test_hint.serverError())
|
||||||
|
processTextAsSingleQuery("SET send_logs_level = 'fatal'");
|
||||||
|
}
|
||||||
|
|
||||||
|
bool echo_query = echo_queries;
|
||||||
|
|
||||||
|
/// Several queries separated by ';'.
|
||||||
|
/// INSERT data is ended by the end of line, not ';'.
|
||||||
|
/// An exception is VALUES format where we also support semicolon in
|
||||||
|
/// addition to end of line.
|
||||||
|
|
||||||
|
const char * this_query_begin = all_queries_text.data();
|
||||||
|
const char * all_queries_end = all_queries_text.data() + all_queries_text.size();
|
||||||
|
|
||||||
|
while (this_query_begin < all_queries_end)
|
||||||
|
{
|
||||||
|
// Remove leading empty newlines and other whitespace, because they
|
||||||
|
// are annoying to filter in query log. This is mostly relevant for
|
||||||
|
// the tests.
|
||||||
|
while (this_query_begin < all_queries_end && isWhitespaceASCII(*this_query_begin))
|
||||||
|
++this_query_begin;
|
||||||
|
|
||||||
|
if (this_query_begin >= all_queries_end)
|
||||||
|
break;
|
||||||
|
|
||||||
|
// If there are only comments left until the end of file, we just
|
||||||
|
// stop. The parser can't handle this situation because it always
|
||||||
|
// expects that there is some query that it can parse.
|
||||||
|
// We can get into this situation because the parser also doesn't
|
||||||
|
// skip the trailing comments after parsing a query. This is because
|
||||||
|
// they may as well be the leading comments for the next query,
|
||||||
|
// and it makes more sense to treat them as such.
|
||||||
|
{
|
||||||
|
Tokens tokens(this_query_begin, all_queries_end);
|
||||||
|
IParser::Pos token_iterator(tokens, global_context->getSettingsRef().max_parser_depth);
|
||||||
|
if (!token_iterator.isValid())
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to parse the query.
|
||||||
|
const char * this_query_end = this_query_begin;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
parsed_query = parseQuery(this_query_end, all_queries_end, true);
|
||||||
|
}
|
||||||
|
catch (Exception & e)
|
||||||
|
{
|
||||||
|
// Try to find test hint for syntax error. We don't know where
|
||||||
|
// the query ends because we failed to parse it, so we consume
|
||||||
|
// the entire line.
|
||||||
|
this_query_end = find_first_symbols<'\n'>(this_query_end, all_queries_end);
|
||||||
|
|
||||||
|
TestHint hint(test_mode, String(this_query_begin, this_query_end - this_query_begin));
|
||||||
|
|
||||||
|
if (hint.serverError())
|
||||||
|
{
|
||||||
|
// Syntax errors are considered as client errors
|
||||||
|
e.addMessage("\nExpected server error '{}'.", hint.serverError());
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hint.clientError() != e.code())
|
||||||
|
{
|
||||||
|
if (hint.clientError())
|
||||||
|
e.addMessage("\nExpected client error: " + std::to_string(hint.clientError()));
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// It's expected syntax error, skip the line
|
||||||
|
this_query_begin = this_query_end;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!parsed_query)
|
||||||
|
{
|
||||||
|
if (ignore_error)
|
||||||
|
{
|
||||||
|
Tokens tokens(this_query_begin, all_queries_end);
|
||||||
|
IParser::Pos token_iterator(tokens, global_context->getSettingsRef().max_parser_depth);
|
||||||
|
while (token_iterator->type != TokenType::Semicolon && token_iterator.isValid())
|
||||||
|
++token_iterator;
|
||||||
|
|
||||||
|
this_query_begin = token_iterator->end;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// INSERT queries may have the inserted data in the query text
|
||||||
|
// that follow the query itself, e.g. "insert into t format CSV 1;2".
|
||||||
|
// They need special handling. First of all, here we find where the
|
||||||
|
// inserted data ends. In multy-query mode, it is delimited by a
|
||||||
|
// newline.
|
||||||
|
// The VALUES format needs even more handling -- we also allow the
|
||||||
|
// data to be delimited by semicolon. This case is handled later by
|
||||||
|
// the format parser itself.
|
||||||
|
// We can't do multiline INSERTs with inline data, because most
|
||||||
|
// row input formats (e.g. TSV) can't tell when the input stops,
|
||||||
|
// unlike VALUES.
|
||||||
|
auto * insert_ast = parsed_query->as<ASTInsertQuery>();
|
||||||
|
/// But do not split query for clickhouse-local.
|
||||||
|
if (splitQueryIntoParts() && insert_ast && insert_ast->data)
|
||||||
|
{
|
||||||
|
this_query_end = find_first_symbols<'\n'>(insert_ast->data, all_queries_end);
|
||||||
|
insert_ast->end = this_query_end;
|
||||||
|
query_to_send = all_queries_text.substr(this_query_begin - all_queries_text.data(), insert_ast->data - this_query_begin);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
query_to_send = all_queries_text.substr(this_query_begin - all_queries_text.data(), this_query_end - this_query_begin);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to include the trailing comment with test hints. It is just
|
||||||
|
// a guess for now, because we don't yet know where the query ends
|
||||||
|
// if it is an INSERT query with inline data. We will do it again
|
||||||
|
// after we have processed the query. But even this guess is
|
||||||
|
// beneficial so that we see proper trailing comments in "echo" and
|
||||||
|
// server log.
|
||||||
|
adjustQueryEnd(this_query_end, all_queries_end, global_context->getSettingsRef().max_parser_depth);
|
||||||
|
|
||||||
|
// full_query is the query + inline INSERT data + trailing comments
|
||||||
|
// (the latter is our best guess for now).
|
||||||
|
full_query = all_queries_text.substr(this_query_begin - all_queries_text.data(), this_query_end - this_query_begin);
|
||||||
|
|
||||||
|
/// TODO:
|
||||||
|
// if (query_fuzzer_runs)
|
||||||
|
// {
|
||||||
|
// if (!processWithFuzzing(full_query))
|
||||||
|
// return false;
|
||||||
|
|
||||||
|
// this_query_begin = this_query_end;
|
||||||
|
// continue;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// Now we know for sure where the query ends.
|
||||||
|
// Look for the hint in the text of query + insert data + trailing
|
||||||
|
// comments,
|
||||||
|
// e.g. insert into t format CSV 'a' -- { serverError 123 }.
|
||||||
|
// Use the updated query boundaries we just calculated.
|
||||||
|
TestHint test_hint(test_mode, std::string(this_query_begin, this_query_end - this_query_begin));
|
||||||
|
|
||||||
|
// Echo all queries if asked; makes for a more readable reference
|
||||||
|
// file.
|
||||||
|
echo_query = test_hint.echoQueries().value_or(echo_query);
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
processParsedSingleQuery(echo_query);
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
// Surprisingly, this is a client error. A server error would
|
||||||
|
// have been reported w/o throwing (see onReceiveSeverException()).
|
||||||
|
client_exception = std::make_unique<Exception>(getCurrentExceptionMessage(true), getCurrentExceptionCode());
|
||||||
|
have_error = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// For INSERTs with inline data: use the end of inline data as
|
||||||
|
// reported by the format parser (it is saved in sendData()).
|
||||||
|
// This allows us to handle queries like:
|
||||||
|
// insert into t values (1); select 1
|
||||||
|
// , where the inline data is delimited by semicolon and not by a
|
||||||
|
// newline.
|
||||||
|
/// TODO: Better way
|
||||||
|
if (splitQueryIntoParts() && insert_ast && insert_ast->data)
|
||||||
|
{
|
||||||
|
this_query_end = insert_ast->end;
|
||||||
|
adjustQueryEnd(this_query_end, all_queries_end, global_context->getSettingsRef().max_parser_depth);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check whether the error (or its absence) matches the test hints
|
||||||
|
// (or their absence).
|
||||||
|
bool error_matches_hint = true;
|
||||||
|
if (have_error)
|
||||||
|
{
|
||||||
|
if (test_hint.serverError())
|
||||||
|
{
|
||||||
|
if (!server_exception)
|
||||||
|
{
|
||||||
|
error_matches_hint = false;
|
||||||
|
fmt::print(stderr, "Expected server error code '{}' but got no server error.\n", test_hint.serverError());
|
||||||
|
}
|
||||||
|
else if (server_exception->code() != test_hint.serverError())
|
||||||
|
{
|
||||||
|
error_matches_hint = false;
|
||||||
|
std::cerr << "Expected server error code: " << test_hint.serverError() << " but got: " << server_exception->code()
|
||||||
|
<< "." << std::endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (test_hint.clientError())
|
||||||
|
{
|
||||||
|
if (!client_exception)
|
||||||
|
{
|
||||||
|
error_matches_hint = false;
|
||||||
|
fmt::print(stderr, "Expected client error code '{}' but got no client error.\n", test_hint.clientError());
|
||||||
|
}
|
||||||
|
else if (client_exception->code() != test_hint.clientError())
|
||||||
|
{
|
||||||
|
error_matches_hint = false;
|
||||||
|
fmt::print(
|
||||||
|
stderr, "Expected client error code '{}' but got '{}'.\n", test_hint.clientError(), client_exception->code());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!test_hint.clientError() && !test_hint.serverError())
|
||||||
|
{
|
||||||
|
// No error was expected but it still occurred. This is the
|
||||||
|
// default case w/o test hint, doesn't need additional
|
||||||
|
// diagnostics.
|
||||||
|
error_matches_hint = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (test_hint.clientError())
|
||||||
|
{
|
||||||
|
fmt::print(stderr, "The query succeeded but the client error '{}' was expected.\n", test_hint.clientError());
|
||||||
|
error_matches_hint = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (test_hint.serverError())
|
||||||
|
{
|
||||||
|
fmt::print(stderr, "The query succeeded but the server error '{}' was expected.\n", test_hint.serverError());
|
||||||
|
error_matches_hint = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the error is expected, force reconnect and ignore it.
|
||||||
|
if (have_error && error_matches_hint)
|
||||||
|
{
|
||||||
|
client_exception.reset();
|
||||||
|
server_exception.reset();
|
||||||
|
have_error = false;
|
||||||
|
|
||||||
|
reconnectIfNeeded();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Report error.
|
||||||
|
if (have_error)
|
||||||
|
{
|
||||||
|
reportQueryError();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop processing queries if needed.
|
||||||
|
if (have_error && !ignore_error)
|
||||||
|
{
|
||||||
|
if (is_interactive)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this_query_begin = this_query_end;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
bool IClient::processQueryText(const String & text)
|
bool IClient::processQueryText(const String & text)
|
||||||
{
|
{
|
||||||
if (exit_strings.end() != exit_strings.find(trim(text, [](char c) { return isWhitespaceASCII(c) || c == ';'; })))
|
if (exit_strings.end() != exit_strings.find(trim(text, [](char c) { return isWhitespaceASCII(c) || c == ';'; })))
|
||||||
@ -216,9 +735,7 @@ bool IClient::processQueryText(const String & text)
|
|||||||
// return true;
|
// return true;
|
||||||
// }
|
// }
|
||||||
|
|
||||||
// return processMultiQuery(text);
|
return processMultiQuery(text);
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
#include <Common/ProgressIndication.h>
|
#include <Common/ProgressIndication.h>
|
||||||
#include <Client/Suggest.h>
|
#include <Client/Suggest.h>
|
||||||
#include <Client/QueryFuzzer.h>
|
#include <Client/QueryFuzzer.h>
|
||||||
|
#include <Common/ShellCommand.h>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -55,6 +56,9 @@ protected:
|
|||||||
/// Settings specified via command line args
|
/// Settings specified via command line args
|
||||||
Settings cmd_settings;
|
Settings cmd_settings;
|
||||||
|
|
||||||
|
SharedContextHolder shared_context = Context::createShared();
|
||||||
|
ContextMutablePtr global_context = Context::createGlobal(shared_context.get());
|
||||||
|
|
||||||
QueryFuzzer fuzzer;
|
QueryFuzzer fuzzer;
|
||||||
int query_fuzzer_runs = 0;
|
int query_fuzzer_runs = 0;
|
||||||
|
|
||||||
@ -81,9 +85,37 @@ protected:
|
|||||||
// It may differ from the full query for INSERT queries, for which the data that follows
|
// It may differ from the full query for INSERT queries, for which the data that follows
|
||||||
// the query is stripped and sent separately.
|
// the query is stripped and sent separately.
|
||||||
String query_to_send;
|
String query_to_send;
|
||||||
|
|
||||||
/// If the last query resulted in exception. `server_exception` or
|
/// If the last query resulted in exception. `server_exception` or
|
||||||
/// `client_exception` must be set.
|
/// `client_exception` must be set.
|
||||||
bool have_error = false;
|
bool have_error = false;
|
||||||
|
/// The last exception that was received from the server. Is used for the
|
||||||
|
/// return code in batch mode.
|
||||||
|
std::unique_ptr<Exception> server_exception;
|
||||||
|
/// Likewise, the last exception that occurred on the client.
|
||||||
|
std::unique_ptr<Exception> client_exception;
|
||||||
|
|
||||||
|
/// Buffer that reads from stdin in batch mode.
|
||||||
|
ReadBufferFromFileDescriptor std_in{STDIN_FILENO};
|
||||||
|
/// Console output.
|
||||||
|
WriteBufferFromFileDescriptor std_out{STDOUT_FILENO};
|
||||||
|
std::unique_ptr<ShellCommand> pager_cmd;
|
||||||
|
/// The user can specify to redirect query output to a file.
|
||||||
|
std::optional<WriteBufferFromFile> out_file_buf;
|
||||||
|
BlockOutputStreamPtr block_out_stream;
|
||||||
|
/// The user could specify special file for server logs (stderr by default)
|
||||||
|
std::unique_ptr<WriteBuffer> out_logs_buf;
|
||||||
|
String server_logs_file;
|
||||||
|
BlockOutputStreamPtr logs_out_stream;
|
||||||
|
|
||||||
|
/// We will format query_id in interactive mode in various ways, the default is just to print Query id: ...
|
||||||
|
std::vector<std::pair<String, String>> query_id_formats;
|
||||||
|
QueryProcessingStage::Enum query_processing_stage;
|
||||||
|
|
||||||
|
/// How many rows have been read or written.
|
||||||
|
size_t processed_rows = 0;
|
||||||
|
|
||||||
|
String current_profile;
|
||||||
|
|
||||||
static bool isNewYearMode();
|
static bool isNewYearMode();
|
||||||
|
|
||||||
@ -101,14 +133,24 @@ protected:
|
|||||||
|
|
||||||
void runInteractive();
|
void runInteractive();
|
||||||
|
|
||||||
virtual void processTextAsSingleQuery(const String & input) = 0;
|
ASTPtr parseQuery(const char *& pos, const char * end, bool allow_multi_statements) const;
|
||||||
|
|
||||||
virtual void reportQueryError() const = 0;
|
void resetOutput();
|
||||||
|
|
||||||
|
virtual void processOrdinaryQuery() = 0;
|
||||||
|
|
||||||
|
void processParsedSingleQuery(std::optional<bool> echo_query = {});
|
||||||
|
|
||||||
|
virtual void processTextAsSingleQuery(const String & input) = 0;
|
||||||
|
|
||||||
virtual bool processQueryFromInteractive(const String & input) = 0;
|
virtual bool processQueryFromInteractive(const String & input) = 0;
|
||||||
|
|
||||||
|
virtual void reportQueryError() const = 0;
|
||||||
|
|
||||||
virtual void loadSuggestionDataIfPossible() {}
|
virtual void loadSuggestionDataIfPossible() {}
|
||||||
|
|
||||||
|
virtual void reconnectIfNeeded() {}
|
||||||
|
|
||||||
virtual bool isInteractive() = 0;
|
virtual bool isInteractive() = 0;
|
||||||
|
|
||||||
virtual void processMainImplException(const Exception & e) = 0;
|
virtual void processMainImplException(const Exception & e) = 0;
|
||||||
@ -135,17 +177,20 @@ protected:
|
|||||||
const CommandLineOptions & options,
|
const CommandLineOptions & options,
|
||||||
const std::vector<Arguments> & external_tables_arguments) = 0;
|
const std::vector<Arguments> & external_tables_arguments) = 0;
|
||||||
|
|
||||||
virtual bool supportPasswordOption() = 0;
|
virtual bool supportPasswordOption() const = 0;
|
||||||
|
|
||||||
|
virtual bool splitQueryIntoParts() const = 0;
|
||||||
|
|
||||||
|
virtual void setDatabase(const String &) {}
|
||||||
|
|
||||||
|
virtual void processInsertQuery() {}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
inline String prompt() const
|
inline String prompt() const
|
||||||
{
|
{
|
||||||
return boost::replace_all_copy(prompt_by_server_display_name, "{database}", config().getString("database", "default"));
|
return boost::replace_all_copy(prompt_by_server_display_name, "{database}", config().getString("database", "default"));
|
||||||
}
|
}
|
||||||
|
|
||||||
ASTPtr parseQuery(const char *& pos, const char * end, bool allow_multi_statements) const;
|
|
||||||
|
|
||||||
void processParsedSingleQuery(std::optional<bool> echo_query = {});
|
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user