Fix local

This commit is contained in:
kssenii 2021-08-19 14:07:47 +03:00
parent 8e36872685
commit b5f6a7cb97
11 changed files with 232 additions and 269 deletions

View File

@ -424,7 +424,7 @@ void Client::connect()
} }
void Client::reportQueryError(const String & query) const void Client::processError(const String & query) const
{ {
if (server_exception) if (server_exception)
{ {
@ -472,6 +472,80 @@ void Client::printChangedSettings() const
} }
void Client::executeSingleQuery(const String & query_to_execute, ASTPtr parsed_query)
{
client_exception.reset();
server_exception.reset();
{
/// Temporarily apply query settings to context.
std::optional<Settings> old_settings;
SCOPE_EXIT_SAFE({
if (old_settings)
global_context->setSettings(*old_settings);
});
auto apply_query_settings = [&](const IAST & settings_ast)
{
if (!old_settings)
old_settings.emplace(global_context->getSettingsRef());
global_context->applySettingsChanges(settings_ast.as<ASTSetQuery>()->changes);
};
const auto * insert = parsed_query->as<ASTInsertQuery>();
if (insert && insert->settings_ast)
apply_query_settings(*insert->settings_ast);
/// FIXME: try to prettify this cast using `as<>()`
const auto * with_output = dynamic_cast<const ASTQueryWithOutput *>(parsed_query.get());
if (with_output && with_output->settings_ast)
apply_query_settings(*with_output->settings_ast);
if (!connection->checkConnected())
connect();
ASTPtr input_function;
if (insert && insert->select)
insert->tryFindInputFunction(input_function);
/// INSERT query for which data transfer is needed (not an INSERT SELECT or input()) is processed separately.
if (insert && (!insert->select || input_function) && !insert->watch)
{
if (input_function && insert->format.empty())
throw Exception("FORMAT must be specified for function input()", ErrorCodes::INVALID_USAGE_OF_INPUT);
processInsertQuery(query_to_execute, parsed_query);
}
else
processOrdinaryQuery(query_to_execute, parsed_query);
}
/// Do not change context (current DB, settings) in case of an exception.
if (!have_error)
{
if (const auto * set_query = parsed_query->as<ASTSetQuery>())
{
/// Save all changes in settings to avoid losing them if the connection is lost.
for (const auto & change : set_query->changes)
{
if (change.name == "profile")
current_profile = change.value.safeGet<String>();
else
global_context->applySettingChange(change);
}
}
if (const auto * use_query = parsed_query->as<ASTUseQuery>())
{
const String & new_database = use_query->database;
/// If the client initiates the reconnection, it takes the settings from the config.
config().setString("database", new_database);
/// If the connection initiates the reconnection, it uses its variable.
connection->setDefaultDatabase(new_database);
}
}
}
/// Returns false when server is not available. /// Returns false when server is not available.
bool Client::processWithFuzzing(const String & full_query) bool Client::processWithFuzzing(const String & full_query)
{ {
@ -574,7 +648,7 @@ bool Client::processWithFuzzing(const String & full_query)
parsed_query = ast_to_process; parsed_query = ast_to_process;
query_to_execute = parsed_query->formatForErrorMessage(); query_to_execute = parsed_query->formatForErrorMessage();
processSingleQueryImpl(full_query, query_to_execute, parsed_query); processParsedSingleQuery(full_query, query_to_execute, parsed_query);
} }
catch (...) catch (...)
{ {

View File

@ -10,15 +10,13 @@ class Client : public ClientBase
{ {
public: public:
Client() = default; Client() = default;
void initialize(Poco::Util::Application & self) override; void initialize(Poco::Util::Application & self) override;
protected: protected:
void executeSingleQuery(const String & query_to_execute, ASTPtr parsed_query) override;
bool processWithFuzzing(const String & full_query) override; bool processWithFuzzing(const String & full_query) override;
void processError(const String & query) const override;
void reportQueryError(const String & query) const override;
void loadSuggestionData(Suggest & suggest) override; void loadSuggestionData(Suggest & suggest) override;
@ -38,9 +36,6 @@ protected:
void processConfig() override; void processConfig() override;
private: private:
UInt64 server_revision = 0;
String server_version;
void connect() override; void connect() override;
void printChangedSettings() const; void printChangedSettings() const;

View File

@ -57,6 +57,7 @@ namespace ErrorCodes
extern const int CANNOT_LOAD_CONFIG; extern const int CANNOT_LOAD_CONFIG;
extern const int FILE_ALREADY_EXISTS; extern const int FILE_ALREADY_EXISTS;
extern const int QUERY_WAS_CANCELLED; extern const int QUERY_WAS_CANCELLED;
extern const int INVALID_USAGE_OF_INPUT;
} }
@ -310,45 +311,55 @@ void LocalServer::setupUsers()
// } // }
// void LocalServer::executeSingleQuery(const String & query_to_execute, ASTPtr parsed_query) void LocalServer::executeSingleQuery(const String & query_to_execute, ASTPtr parsed_query)
// { {
// // ReadBufferFromString read_buf(query_to_execute); cancelled = false;
// // WriteBufferFromFileDescriptor write_buf(STDOUT_FILENO);
// /// To support previous behaviour of clickhouse-local do not reset first exception in case --ignore-error,
// cancelled = false; /// it needs to be thrown after multiquery is finished (test 00385). But I do not think it is ok to output only
// /// first exception or whether we need to even rethrow it because there is --ignore-error.
// /// To support previous behaviour of clickhouse-local do not reset first exception in case --ignore-error, if (!ignore_error)
// /// it needs to be thrown after multiquery is finished (test 00385). But I do not think it is ok to output only server_exception.reset();
// /// first exception or whether we need to even rethrow it because there is --ignore-error.
// if (!ignore_error) auto process_error = [&]()
// local_server_exception.reset(); {
// if (!ignore_error)
// auto process_error = [&]() throw;
// {
// if (!ignore_error) server_exception = std::make_unique<Exception>(getCurrentExceptionMessage(true), getCurrentExceptionCode());
// throw; have_error = true;
// };
// local_server_exception = std::make_unique<Exception>(getCurrentExceptionMessage(true), getCurrentExceptionCode());
// have_error = true; try
// }; {
// const auto * insert = parsed_query->as<ASTInsertQuery>();
// try ASTPtr input_function;
// { if (insert && insert->select)
// processOrdinaryQuery(query_to_execute, parsed_query); insert->tryFindInputFunction(input_function);
// }
// catch (const Exception & e) /// INSERT query for which data transfer is needed (not an INSERT SELECT or input()) is processed separately.
// { if (insert && (!insert->select || input_function) && !insert->watch)
// if (is_interactive && e.code() == ErrorCodes::QUERY_WAS_CANCELLED) {
// std::cout << "Query was cancelled." << std::endl; if (input_function && insert->format.empty())
// else throw Exception("FORMAT must be specified for function input()", ErrorCodes::INVALID_USAGE_OF_INPUT);
// process_error();
// } processInsertQuery(query_to_execute, parsed_query);
// catch (...) }
// { else
// process_error(); processOrdinaryQuery(query_to_execute, parsed_query);
// } }
// onEndOfStream(); catch (const Exception & e)
// } {
if (is_interactive && e.code() == ErrorCodes::QUERY_WAS_CANCELLED)
std::cout << "Query was cancelled." << std::endl;
else
process_error();
}
catch (...)
{
process_error();
}
}
String LocalServer::getQueryTextPrefix() String LocalServer::getQueryTextPrefix()
@ -357,7 +368,7 @@ String LocalServer::getQueryTextPrefix()
} }
void LocalServer::reportQueryError(const String & query) const void LocalServer::processError(const String & query) const
{ {
/// For non-interactive mode process exception only when all queries were executed. /// For non-interactive mode process exception only when all queries were executed.
if (server_exception && is_interactive) if (server_exception && is_interactive)
@ -388,10 +399,11 @@ try
processConfig(); processConfig();
query_context = Context::createCopy(global_context); applyCmdSettings(global_context);
applyCmdSettings(query_context); // query_context->makeSessionContext();
// query_context->authenticate("default", "", Poco::Net::SocketAddress{});
/// Use the same query_id (and thread group) for all queries /// Use the same query_id (and thread group) for all queries
CurrentThread::QueryScope query_scope_holder(query_context);
connect(); connect();

View File

@ -35,30 +35,25 @@ public:
} }
protected: protected:
void executeSingleQuery(const String & query_to_execute, ASTPtr parsed_query) override;
void connect() override void connect() override
{ {
connection_parameters = ConnectionParameters(config()); connection_parameters = ConnectionParameters(config());
/// Using query context withcmd settings. /// Using query context withcmd settings.
connection = std::make_unique<LocalConnection>(query_context); connection = std::make_unique<LocalConnection>(global_context);
} }
void reportQueryError(const String & query) const override; void processError(const String & query) const override;
String getQueryTextPrefix() override;
void loadSuggestionData(Suggest &) override; void loadSuggestionData(Suggest &) override;
String getQueryTextPrefix() override;
void readArguments(int argc, char ** argv, Arguments & common_arguments, std::vector<Arguments> &) override;
void printHelpMessage(const OptionsDescription & options_description) override; void printHelpMessage(const OptionsDescription & options_description) override;
void readArguments(int argc, char ** argv, Arguments & common_arguments, std::vector<Arguments> &) override;
void addAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments) override; void addAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments) override;
void processOptions(const OptionsDescription & options_description, void processOptions(const OptionsDescription & options_description,
const CommandLineOptions & options, const CommandLineOptions & options,
const std::vector<Arguments> &) override; const std::vector<Arguments> &) override;
void processConfig() override; void processConfig() override;
int mainImpl() override; int mainImpl() override;
@ -71,20 +66,11 @@ private:
std::string getInitialCreateTableQuery(); std::string getInitialCreateTableQuery();
void tryInitPath(); void tryInitPath();
void applyCmdOptions(ContextMutablePtr context);
void applyCmdSettings(ContextMutablePtr context);
void processQueries();
void setupUsers(); void setupUsers();
void cleanup(); void cleanup();
void checkInterruptListener(); void applyCmdOptions(ContextMutablePtr context);
void applyCmdSettings(ContextMutablePtr context);
ContextMutablePtr query_context;
std::optional<StatusFile> status; std::optional<StatusFile> status;

View File

@ -348,7 +348,7 @@ void ClientBase::initLogsOutputStream()
} }
void ClientBase::processSingleQuery(const String & full_query) void ClientBase::processTextAsSingleQuery(const String & full_query)
{ {
/// Some parts of a query (result output and formatting) are executed /// Some parts of a query (result output and formatting) are executed
/// client-side. Thus we need to parse the query. /// client-side. Thus we need to parse the query.
@ -368,10 +368,10 @@ void ClientBase::processSingleQuery(const String & full_query)
else else
query_to_execute = full_query; query_to_execute = full_query;
processSingleQueryImpl(full_query, query_to_execute, parsed_query); processParsedSingleQuery(full_query, query_to_execute, parsed_query);
if (have_error) if (have_error)
reportQueryError(full_query); processError(full_query);
} }
@ -692,6 +692,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
auto * parsed_insert_query = parsed_query->as<ASTInsertQuery>(); auto * parsed_insert_query = parsed_query->as<ASTInsertQuery>();
if (!parsed_insert_query) if (!parsed_insert_query)
return; return;
if (parsed_insert_query->infile) if (parsed_insert_query->infile)
{ {
const auto & in_file_node = parsed_insert_query->infile->as<ASTLiteral &>(); const auto & in_file_node = parsed_insert_query->infile->as<ASTLiteral &>();
@ -860,81 +861,7 @@ bool ClientBase::receiveEndOfQuery()
void ClientBase::executeSingleQuery(const String & query_to_execute, ASTPtr parsed_query) void ClientBase::processParsedSingleQuery(const String & full_query, const String & query_to_execute,
{
client_exception.reset();
server_exception.reset();
{
/// Temporarily apply query settings to context.
std::optional<Settings> old_settings;
SCOPE_EXIT_SAFE({
if (old_settings)
global_context->setSettings(*old_settings);
});
auto apply_query_settings = [&](const IAST & settings_ast)
{
if (!old_settings)
old_settings.emplace(global_context->getSettingsRef());
global_context->applySettingsChanges(settings_ast.as<ASTSetQuery>()->changes);
};
const auto * insert = parsed_query->as<ASTInsertQuery>();
if (insert && insert->settings_ast)
apply_query_settings(*insert->settings_ast);
/// FIXME: try to prettify this cast using `as<>()`
const auto * with_output = dynamic_cast<const ASTQueryWithOutput *>(parsed_query.get());
if (with_output && with_output->settings_ast)
apply_query_settings(*with_output->settings_ast);
if (!connection->checkConnected())
connect();
ASTPtr input_function;
if (insert && insert->select)
insert->tryFindInputFunction(input_function);
/// INSERT query for which data transfer is needed (not an INSERT SELECT or input()) is processed separately.
if (insert && (!insert->select || input_function) && !insert->watch)
{
if (input_function && insert->format.empty())
throw Exception("FORMAT must be specified for function input()", ErrorCodes::INVALID_USAGE_OF_INPUT);
processInsertQuery(query_to_execute, parsed_query);
}
else
processOrdinaryQuery(query_to_execute, parsed_query);
}
/// Do not change context (current DB, settings) in case of an exception.
if (!have_error)
{
if (const auto * set_query = parsed_query->as<ASTSetQuery>())
{
/// Save all changes in settings to avoid losing them if the connection is lost.
for (const auto & change : set_query->changes)
{
if (change.name == "profile")
current_profile = change.value.safeGet<String>();
else
global_context->applySettingChange(change);
}
}
if (const auto * use_query = parsed_query->as<ASTUseQuery>())
{
const String & new_database = use_query->database;
/// If the client initiates the reconnection, it takes the settings from the config.
config().setString("database", new_database);
/// If the connection initiates the reconnection, it uses its variable.
connection->setDefaultDatabase(new_database);
}
}
}
void ClientBase::processSingleQueryImpl(const String & full_query, const String & query_to_execute,
ASTPtr parsed_query, std::optional<bool> echo_query_, bool report_error) ASTPtr parsed_query, std::optional<bool> echo_query_, bool report_error)
{ {
resetOutput(); resetOutput();
@ -947,18 +874,18 @@ void ClientBase::processSingleQueryImpl(const String & full_query, const String
std_out.next(); std_out.next();
} }
global_context->setCurrentQueryId(""); // if (is_interactive)
if (is_interactive) // {
{ // global_context->setCurrentQueryId("");
// Generate a new query_id // // Generate a new query_id
for (const auto & query_id_format : query_id_formats) // for (const auto & query_id_format : query_id_formats)
{ // {
writeString(query_id_format.first, std_out); // writeString(query_id_format.first, std_out);
writeString(fmt::format(query_id_format.second, fmt::arg("query_id", global_context->getCurrentQueryId())), std_out); // writeString(fmt::format(query_id_format.second, fmt::arg("query_id", global_context->getCurrentQueryId())), std_out);
writeChar('\n', std_out); // writeChar('\n', std_out);
std_out.next(); // std_out.next();
} // }
} // }
processed_rows = 0; processed_rows = 0;
written_first_block = false; written_first_block = false;
@ -978,7 +905,7 @@ void ClientBase::processSingleQueryImpl(const String & full_query, const String
} }
if (have_error && report_error) if (have_error && report_error)
reportQueryError(full_query); processError(full_query);
} }
@ -1110,7 +1037,7 @@ bool ClientBase::processMultiQueryImpl(const String & all_queries_text,
// Report error. // Report error.
if (have_error) if (have_error)
reportQueryError(full_query); processError(full_query);
// Stop processing queries if needed. // Stop processing queries if needed.
if (have_error && !ignore_error) if (have_error && !ignore_error)
@ -1135,7 +1062,7 @@ bool ClientBase::processQueryText(const String & text)
if (!is_multiquery) if (!is_multiquery)
{ {
assert(!query_fuzzer_runs); assert(!query_fuzzer_runs);
processSingleQuery(text); processTextAsSingleQuery(text);
return true; return true;
} }
@ -1160,7 +1087,7 @@ bool ClientBase::processMultiQuery(const String & all_queries_text)
/// disable logs if expects errors /// disable logs if expects errors
TestHint test_hint(test_mode, all_queries_text); TestHint test_hint(test_mode, all_queries_text);
if (test_hint.clientError() || test_hint.serverError()) if (test_hint.clientError() || test_hint.serverError())
processSingleQuery("SET send_logs_level = 'fatal'"); processTextAsSingleQuery("SET send_logs_level = 'fatal'");
} }
bool echo_query = echo_queries; bool echo_query = echo_queries;
@ -1177,7 +1104,7 @@ bool ClientBase::processMultiQuery(const String & all_queries_text)
echo_query = test_hint.echoQueries().value_or(echo_query); echo_query = test_hint.echoQueries().value_or(echo_query);
try try
{ {
processSingleQueryImpl(full_query, query_to_execute, parsed_query, echo_query, false); processParsedSingleQuery(full_query, query_to_execute, parsed_query, echo_query, false);
} }
catch (...) catch (...)
{ {

View File

@ -31,90 +31,47 @@ public:
using Arguments = std::vector<String>; using Arguments = std::vector<String>;
void init(int argc, char ** argv); void init(int argc, char ** argv);
int main(const std::vector<String> & /*args*/) override; int main(const std::vector<String> & /*args*/) override;
protected: protected:
/*
* Run interactive or non-interactive mode. Depends on:
* - processSingleQuery
* - processMultiQuery
* - processWithFuzzing
*/
void runNonInteractive();
void runInteractive(); void runInteractive();
void runNonInteractive();
/// Initialize `connection` object with `Connection` or `LocalConnection`.
virtual void connect() = 0;
/// Process single query. Can use processOrdinaryQuery(), processInsertQuery().
virtual void executeSingleQuery(const String & query_to_execute, ASTPtr parsed_query) = 0;
void processOrdinaryQuery(const String & query_to_execute, ASTPtr parsed_query);
void processInsertQuery(const String & query_to_execute, ASTPtr parsed_query);
virtual bool processWithFuzzing(const String &) virtual bool processWithFuzzing(const String &)
{ {
throw Exception("Query processing with fuzzing is not implemented", ErrorCodes::NOT_IMPLEMENTED); throw Exception("Query processing with fuzzing is not implemented", ErrorCodes::NOT_IMPLEMENTED);
} }
virtual void processError(const String & query) const = 0;
virtual void loadSuggestionData(Suggest &) = 0;
void processOrdinaryQuery(const String & query_to_execute, ASTPtr parsed_query);
void receiveResult(ASTPtr parsed_query);
bool receiveAndProcessPacket(ASTPtr parsed_query, bool cancelled);
void initBlockOutputStream(const Block & block, ASTPtr parsed_query);
void initLogsOutputStream();
void sendExternalTables(ASTPtr parsed_query);
virtual void connect() = 0;
void executeSingleQuery(const String & query_to_execute, ASTPtr parsed_query);
void processInsertQuery(const String & query_to_execute, ASTPtr parsed_query);
bool receiveSampleBlock(Block & out, ColumnsDescription & columns_description, ASTPtr parsed_query);
void sendData(Block & sample, const ColumnsDescription & columns_description, ASTPtr parsed_query);
void sendDataFrom(ReadBuffer & buf, Block & sample,
const ColumnsDescription & columns_description, ASTPtr parsed_query);
bool receiveEndOfQuery();
void receiveLogs(ASTPtr parsed_query);
void writeFinalProgress();
void processSingleQuery(const String & full_query);
bool processMultiQuery(const String & all_queries_text);
/*
* Process multiquery - several queries separated by ';'. Depends on executeSingleQuery().
* Also in case of clickhouse-server:
* - INSERT data is ended by the end of line, not ';'.
* - An exception is VALUES format where we also support semicolon in addition to end of line.
**/
bool processMultiQueryImpl(const String & all_queries_text, bool processMultiQueryImpl(const String & all_queries_text,
std::function<void(const String & full_query, const String & query_to_execute, ASTPtr parsed_query)> execute_single_query, std::function<void(const String & full_query, const String & query_to_execute, ASTPtr parsed_query)> execute_single_query,
std::function<void(const String &, Exception &)> process_parse_query_error = {}); std::function<void(const String &, Exception &)> process_parse_query_error = {});
/* void processParsedSingleQuery(const String & query, const String & query_to_execute, ASTPtr parsed_query,
* Process parsed single query. Depends on executeSingleQuery().
**/
void processSingleQueryImpl(const String & query, const String & query_to_execute, ASTPtr parsed_query,
std::optional<bool> echo_query_ = {}, bool report_error = false); std::optional<bool> echo_query_ = {}, bool report_error = false);
virtual void reportQueryError(const String & query) const = 0;
/// For non-interactive multi-query mode get queries text prefix. /// For non-interactive multi-query mode get queries text prefix.
virtual String getQueryTextPrefix() { return ""; } virtual String getQueryTextPrefix() { return ""; }
virtual void loadSuggestionData(Suggest &) = 0;
void onData(Block & block, ASTPtr parsed_query);
void onLogData(Block & block);
void onTotals(Block & block, ASTPtr parsed_query);
void onExtremes(Block & block, ASTPtr parsed_query);
void onReceiveExceptionFromServer(std::unique_ptr<Exception> && e);
void onProfileInfo(const BlockStreamProfileInfo & profile_info);
void resetOutput(); void resetOutput();
ASTPtr parseQuery(const char *& pos, const char * end, bool allow_multi_statements) const; ASTPtr parseQuery(const char *& pos, const char * end, bool allow_multi_statements) const;
void onProgress(const Progress & value);
void onEndOfStream();
/// Prepare for and call either runInteractive() or runNonInteractive(). /// Prepare for and call either runInteractive() or runNonInteractive().
virtual int mainImpl() = 0; virtual int mainImpl() = 0;
virtual void readArguments(int argc, char ** argv,
Arguments & common_arguments, std::vector<Arguments> &) = 0;
using ProgramOptionsDescription = boost::program_options::options_description; using ProgramOptionsDescription = boost::program_options::options_description;
using CommandLineOptions = boost::program_options::variables_map; using CommandLineOptions = boost::program_options::variables_map;
@ -125,16 +82,44 @@ protected:
}; };
virtual void printHelpMessage(const OptionsDescription & options_description) = 0; virtual void printHelpMessage(const OptionsDescription & options_description) = 0;
virtual void readArguments(int argc, char ** argv,
Arguments & common_arguments, std::vector<Arguments> &) = 0;
virtual void addAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments) = 0; virtual void addAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments) = 0;
virtual void processOptions(const OptionsDescription & options_description, virtual void processOptions(const OptionsDescription & options_description,
const CommandLineOptions & options, const CommandLineOptions & options,
const std::vector<Arguments> & external_tables_arguments) = 0; const std::vector<Arguments> & external_tables_arguments) = 0;
virtual void processConfig() = 0; virtual void processConfig() = 0;
private: private:
bool processQueryText(const String & text);
void processTextAsSingleQuery(const String & full_query);
bool processMultiQuery(const String & all_queries_text);
void receiveResult(ASTPtr parsed_query);
bool receiveAndProcessPacket(ASTPtr parsed_query, bool cancelled);
void receiveLogs(ASTPtr parsed_query);
bool receiveSampleBlock(Block & out, ColumnsDescription & columns_description, ASTPtr parsed_query);
bool receiveEndOfQuery();
void onProgress(const Progress & value);
void onData(Block & block, ASTPtr parsed_query);
void onLogData(Block & block);
void onTotals(Block & block, ASTPtr parsed_query);
void onExtremes(Block & block, ASTPtr parsed_query);
void onReceiveExceptionFromServer(std::unique_ptr<Exception> && e);
void onProfileInfo(const BlockStreamProfileInfo & profile_info);
void onEndOfStream();
void sendData(Block & sample, const ColumnsDescription & columns_description, ASTPtr parsed_query);
void sendDataFrom(ReadBuffer & buf, Block & sample,
const ColumnsDescription & columns_description, ASTPtr parsed_query);
void initBlockOutputStream(const Block & block, ASTPtr parsed_query);
void initLogsOutputStream();
void sendExternalTables(ASTPtr parsed_query);
inline String prompt() const inline String prompt() const
{ {
return boost::replace_all_copy(prompt_by_server_display_name, "{database}", config().getString("database", "default")); return boost::replace_all_copy(prompt_by_server_display_name, "{database}", config().getString("database", "default"));
@ -142,10 +127,6 @@ private:
void outputQueryInfo(bool echo_query_); void outputQueryInfo(bool echo_query_);
/// Process query text (multiquery or single query) according to options.
bool processQueryText(const String & text);
protected: protected:
bool is_interactive = false; /// Use either interactive line editing interface or batch mode. bool is_interactive = false; /// Use either interactive line editing interface or batch mode.
bool is_multiquery = false; bool is_multiquery = false;
@ -229,6 +210,9 @@ protected:
size_t insert_format_max_block_size = 0; /// Max block size when reading INSERT data. size_t insert_format_max_block_size = 0; /// Max block size when reading INSERT data.
size_t max_client_network_bandwidth = 0; /// The maximum speed of data exchange over the network for the client in bytes per second. size_t max_client_network_bandwidth = 0; /// The maximum speed of data exchange over the network for the client in bytes per second.
UInt64 server_revision = 0;
String server_version;
}; };
} }

View File

@ -67,35 +67,34 @@ void LocalConnection::sendQuery(
bool) bool)
{ {
query_context = Context::createCopy(getContext()); query_context = Context::createCopy(getContext());
query_context->makeSessionContext();
query_context->makeQueryContext(); query_context->makeQueryContext();
query_context->setUser("default", "", Poco::Net::SocketAddress{});
query_context->setCurrentQueryId("");
query_context->setProgressCallback([this] (const Progress & value) { return this->updateProgress(value); }); query_context->setProgressCallback([this] (const Progress & value) { return this->updateProgress(value); });
query_context->setCurrentQueryId("");
CurrentThread::QueryScope query_scope_holder(query_context);
/// query_context->setCurrentDatabase(default_database); /// query_context->setCurrentDatabase(default_database);
/// Send structure of columns to client for function input() /// Send structure of columns to client for function input()
query_context->setInputInitializer([this] (ContextPtr context, const StoragePtr & input_storage) // query_context->setInputInitializer([this] (ContextPtr context, const StoragePtr & input_storage)
{
if (context != query_context)
throw Exception("Unexpected context in Input initializer", ErrorCodes::LOGICAL_ERROR);
auto metadata_snapshot = input_storage->getInMemoryMetadataPtr();
state.need_receive_data_for_input = true;
/// Send ColumnsDescription for input storage.
// if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA
// && query_context->getSettingsRef().input_format_defaults_for_omitted_fields)
// { // {
// sendTableColumns(metadata_snapshot->getColumns()); // if (context != query_context)
// } // throw Exception("Unexpected context in Input initializer", ErrorCodes::LOGICAL_ERROR);
/// Send block to the client - input storage structure. // auto metadata_snapshot = input_storage->getInMemoryMetadataPtr();
state.input_header = metadata_snapshot->getSampleBlock(); // state.need_receive_data_for_input = true;
next_packet_type = Protocol::Server::Data;
state.block = state.input_header; // /// Send ColumnsDescription for input storage.
}); // // if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA
// // && query_context->getSettingsRef().input_format_defaults_for_omitted_fields)
// // {
// // sendTableColumns(metadata_snapshot->getColumns());
// // }
// /// Send block to the client - input storage structure.
// state.input_header = metadata_snapshot->getSampleBlock();
// next_packet_type = Protocol::Server::Data;
// state.block = state.input_header;
// });
state.query_id = query_id_; state.query_id = query_id_;
state.query = query_; state.query = query_;
@ -185,7 +184,6 @@ void LocalConnection::finishQuery()
// sendProgress(); // sendProgress();
state.io.onFinish(); state.io.onFinish();
query_context.reset(); query_context.reset();
// state.reset();
} }
bool LocalConnection::poll(size_t) bool LocalConnection::poll(size_t)

View File

@ -970,8 +970,7 @@ void executeQuery(
bool allow_into_outfile, bool allow_into_outfile,
ContextMutablePtr context, ContextMutablePtr context,
SetResultDetailsFunc set_result_details, SetResultDetailsFunc set_result_details,
const std::optional<FormatSettings> & output_format_settings, const std::optional<FormatSettings> & output_format_settings)
FlushBufferFunc flush_buffer_func)
{ {
PODArray<char> parse_buf; PODArray<char> parse_buf;
const char * begin; const char * begin;
@ -1130,9 +1129,6 @@ void executeQuery(
out->onProgress(progress); out->onProgress(progress);
}); });
if (flush_buffer_func)
out->setFlushBufferCallback(flush_buffer_func);
if (set_result_details) if (set_result_details)
set_result_details( set_result_details(
context->getClientInfo().current_query_id, out->getContentType(), format_name, DateLUT::instance().getTimeZone()); context->getClientInfo().current_query_id, out->getContentType(), format_name, DateLUT::instance().getTimeZone());

View File

@ -11,7 +11,6 @@ class ReadBuffer;
class WriteBuffer; class WriteBuffer;
using SetResultDetailsFunc = std::function<void(const String &, const String &, const String &, const String &)>; using SetResultDetailsFunc = std::function<void(const String &, const String &, const String &, const String &)>;
using FlushBufferFunc = std::function<void(WriteBuffer & out, size_t num_rows)>;
/// Parse and execute a query. /// Parse and execute a query.
void executeQuery( void executeQuery(
@ -20,8 +19,7 @@ void executeQuery(
bool allow_into_outfile, /// If true and the query contains INTO OUTFILE section, redirect output to that file. bool allow_into_outfile, /// If true and the query contains INTO OUTFILE section, redirect output to that file.
ContextMutablePtr context, /// DB, tables, data types, storage engines, functions, aggregate functions... ContextMutablePtr context, /// DB, tables, data types, storage engines, functions, aggregate functions...
SetResultDetailsFunc set_result_details, /// If a non-empty callback is passed, it will be called with the query id, the content-type, the format, and the timezone. SetResultDetailsFunc set_result_details, /// If a non-empty callback is passed, it will be called with the query id, the content-type, the format, and the timezone.
const std::optional<FormatSettings> & output_format_settings = std::nullopt, /// Format settings for output format, will be calculated from the context if not set. const std::optional<FormatSettings> & output_format_settings = std::nullopt /// Format settings for output format, will be calculated from the context if not set.
FlushBufferFunc flush_buffer_func = {}
); );

View File

@ -105,9 +105,6 @@ void IOutputFormat::work()
void IOutputFormat::flush() void IOutputFormat::flush()
{ {
if (flush_callback)
flush_callback(out, result_rows);
else
out.next(); out.next();
} }

View File

@ -67,8 +67,6 @@ public:
/// Passed value are delta, that must be summarized. /// Passed value are delta, that must be summarized.
virtual void onProgress(const Progress & /*progress*/) {} virtual void onProgress(const Progress & /*progress*/) {}
void setFlushBufferCallback(std::function<void(WriteBuffer & out, size_t result_rows)> callback) { flush_callback = callback; }
/// Content-Type to set when sending HTTP response. /// Content-Type to set when sending HTTP response.
virtual std::string getContentType() const { return "text/plain; charset=UTF-8"; } virtual std::string getContentType() const { return "text/plain; charset=UTF-8"; }
@ -93,7 +91,5 @@ private:
size_t result_bytes = 0; size_t result_bytes = 0;
bool prefix_written = false; bool prefix_written = false;
std::function<void(WriteBuffer & out, size_t result_rows)> flush_callback;
}; };
} }