diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index b08d6c2629f..9e2db95c65c 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -424,7 +424,7 @@ void Client::connect() } -void Client::reportQueryError(const String & query) const +void Client::processError(const String & query) const { if (server_exception) { @@ -472,6 +472,80 @@ void Client::printChangedSettings() const } +void Client::executeSingleQuery(const String & query_to_execute, ASTPtr parsed_query) +{ + client_exception.reset(); + server_exception.reset(); + + { + /// Temporarily apply query settings to context. + std::optional old_settings; + SCOPE_EXIT_SAFE({ + if (old_settings) + global_context->setSettings(*old_settings); + }); + + auto apply_query_settings = [&](const IAST & settings_ast) + { + if (!old_settings) + old_settings.emplace(global_context->getSettingsRef()); + global_context->applySettingsChanges(settings_ast.as()->changes); + }; + + const auto * insert = parsed_query->as(); + if (insert && insert->settings_ast) + apply_query_settings(*insert->settings_ast); + + /// FIXME: try to prettify this cast using `as<>()` + const auto * with_output = dynamic_cast(parsed_query.get()); + if (with_output && with_output->settings_ast) + apply_query_settings(*with_output->settings_ast); + + if (!connection->checkConnected()) + connect(); + + ASTPtr input_function; + if (insert && insert->select) + insert->tryFindInputFunction(input_function); + + /// INSERT query for which data transfer is needed (not an INSERT SELECT or input()) is processed separately. + if (insert && (!insert->select || input_function) && !insert->watch) + { + if (input_function && insert->format.empty()) + throw Exception("FORMAT must be specified for function input()", ErrorCodes::INVALID_USAGE_OF_INPUT); + + processInsertQuery(query_to_execute, parsed_query); + } + else + processOrdinaryQuery(query_to_execute, parsed_query); + } + + /// Do not change context (current DB, settings) in case of an exception. + if (!have_error) + { + if (const auto * set_query = parsed_query->as()) + { + /// Save all changes in settings to avoid losing them if the connection is lost. + for (const auto & change : set_query->changes) + { + if (change.name == "profile") + current_profile = change.value.safeGet(); + else + global_context->applySettingChange(change); + } + } + if (const auto * use_query = parsed_query->as()) + { + const String & new_database = use_query->database; + /// If the client initiates the reconnection, it takes the settings from the config. + config().setString("database", new_database); + /// If the connection initiates the reconnection, it uses its variable. + connection->setDefaultDatabase(new_database); + } + } +} + + /// Returns false when server is not available. bool Client::processWithFuzzing(const String & full_query) { @@ -574,7 +648,7 @@ bool Client::processWithFuzzing(const String & full_query) parsed_query = ast_to_process; query_to_execute = parsed_query->formatForErrorMessage(); - processSingleQueryImpl(full_query, query_to_execute, parsed_query); + processParsedSingleQuery(full_query, query_to_execute, parsed_query); } catch (...) { diff --git a/programs/client/Client.h b/programs/client/Client.h index 7eede1b0ecc..e39f9b81847 100644 --- a/programs/client/Client.h +++ b/programs/client/Client.h @@ -10,15 +10,13 @@ class Client : public ClientBase { public: Client() = default; - void initialize(Poco::Util::Application & self) override; protected: + void executeSingleQuery(const String & query_to_execute, ASTPtr parsed_query) override; bool processWithFuzzing(const String & full_query) override; - - void reportQueryError(const String & query) const override; - + void processError(const String & query) const override; void loadSuggestionData(Suggest & suggest) override; @@ -38,9 +36,6 @@ protected: void processConfig() override; private: - UInt64 server_revision = 0; - String server_version; - void connect() override; void printChangedSettings() const; diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index 91255306e93..1c95d395b95 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -57,6 +57,7 @@ namespace ErrorCodes extern const int CANNOT_LOAD_CONFIG; extern const int FILE_ALREADY_EXISTS; extern const int QUERY_WAS_CANCELLED; + extern const int INVALID_USAGE_OF_INPUT; } @@ -310,45 +311,55 @@ void LocalServer::setupUsers() // } -// void LocalServer::executeSingleQuery(const String & query_to_execute, ASTPtr parsed_query) -// { -// // ReadBufferFromString read_buf(query_to_execute); -// // WriteBufferFromFileDescriptor write_buf(STDOUT_FILENO); -// -// cancelled = false; -// -// /// To support previous behaviour of clickhouse-local do not reset first exception in case --ignore-error, -// /// it needs to be thrown after multiquery is finished (test 00385). But I do not think it is ok to output only -// /// first exception or whether we need to even rethrow it because there is --ignore-error. -// if (!ignore_error) -// local_server_exception.reset(); -// -// auto process_error = [&]() -// { -// if (!ignore_error) -// throw; -// -// local_server_exception = std::make_unique(getCurrentExceptionMessage(true), getCurrentExceptionCode()); -// have_error = true; -// }; -// -// try -// { -// processOrdinaryQuery(query_to_execute, parsed_query); -// } -// catch (const Exception & e) -// { -// if (is_interactive && e.code() == ErrorCodes::QUERY_WAS_CANCELLED) -// std::cout << "Query was cancelled." << std::endl; -// else -// process_error(); -// } -// catch (...) -// { -// process_error(); -// } -// onEndOfStream(); -// } +void LocalServer::executeSingleQuery(const String & query_to_execute, ASTPtr parsed_query) +{ + cancelled = false; + + /// To support previous behaviour of clickhouse-local do not reset first exception in case --ignore-error, + /// it needs to be thrown after multiquery is finished (test 00385). But I do not think it is ok to output only + /// first exception or whether we need to even rethrow it because there is --ignore-error. + if (!ignore_error) + server_exception.reset(); + + auto process_error = [&]() + { + if (!ignore_error) + throw; + + server_exception = std::make_unique(getCurrentExceptionMessage(true), getCurrentExceptionCode()); + have_error = true; + }; + + try + { + const auto * insert = parsed_query->as(); + ASTPtr input_function; + if (insert && insert->select) + insert->tryFindInputFunction(input_function); + + /// INSERT query for which data transfer is needed (not an INSERT SELECT or input()) is processed separately. + if (insert && (!insert->select || input_function) && !insert->watch) + { + if (input_function && insert->format.empty()) + throw Exception("FORMAT must be specified for function input()", ErrorCodes::INVALID_USAGE_OF_INPUT); + + processInsertQuery(query_to_execute, parsed_query); + } + else + processOrdinaryQuery(query_to_execute, parsed_query); + } + catch (const Exception & e) + { + if (is_interactive && e.code() == ErrorCodes::QUERY_WAS_CANCELLED) + std::cout << "Query was cancelled." << std::endl; + else + process_error(); + } + catch (...) + { + process_error(); + } +} String LocalServer::getQueryTextPrefix() @@ -357,7 +368,7 @@ String LocalServer::getQueryTextPrefix() } -void LocalServer::reportQueryError(const String & query) const +void LocalServer::processError(const String & query) const { /// For non-interactive mode process exception only when all queries were executed. if (server_exception && is_interactive) @@ -388,10 +399,11 @@ try processConfig(); - query_context = Context::createCopy(global_context); - applyCmdSettings(query_context); + applyCmdSettings(global_context); + // query_context->makeSessionContext(); + // query_context->authenticate("default", "", Poco::Net::SocketAddress{}); + /// Use the same query_id (and thread group) for all queries - CurrentThread::QueryScope query_scope_holder(query_context); connect(); diff --git a/programs/local/LocalServer.h b/programs/local/LocalServer.h index 3756a39542b..88fccd9f732 100644 --- a/programs/local/LocalServer.h +++ b/programs/local/LocalServer.h @@ -35,30 +35,25 @@ public: } protected: + void executeSingleQuery(const String & query_to_execute, ASTPtr parsed_query) override; + void connect() override { connection_parameters = ConnectionParameters(config()); /// Using query context withcmd settings. - connection = std::make_unique(query_context); + connection = std::make_unique(global_context); } - void reportQueryError(const String & query) const override; - - String getQueryTextPrefix() override; - + void processError(const String & query) const override; void loadSuggestionData(Suggest &) override; - - - void readArguments(int argc, char ** argv, Arguments & common_arguments, std::vector &) override; - + String getQueryTextPrefix() override; void printHelpMessage(const OptionsDescription & options_description) override; + void readArguments(int argc, char ** argv, Arguments & common_arguments, std::vector &) override; void addAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments) override; - void processOptions(const OptionsDescription & options_description, const CommandLineOptions & options, const std::vector &) override; - void processConfig() override; int mainImpl() override; @@ -71,20 +66,11 @@ private: std::string getInitialCreateTableQuery(); void tryInitPath(); - - void applyCmdOptions(ContextMutablePtr context); - - void applyCmdSettings(ContextMutablePtr context); - - void processQueries(); - void setupUsers(); - void cleanup(); - void checkInterruptListener(); - - ContextMutablePtr query_context; + void applyCmdOptions(ContextMutablePtr context); + void applyCmdSettings(ContextMutablePtr context); std::optional status; diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index f47a3c9fdf9..1240353fc2c 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -348,7 +348,7 @@ void ClientBase::initLogsOutputStream() } -void ClientBase::processSingleQuery(const String & full_query) +void ClientBase::processTextAsSingleQuery(const String & full_query) { /// Some parts of a query (result output and formatting) are executed /// client-side. Thus we need to parse the query. @@ -368,10 +368,10 @@ void ClientBase::processSingleQuery(const String & full_query) else query_to_execute = full_query; - processSingleQueryImpl(full_query, query_to_execute, parsed_query); + processParsedSingleQuery(full_query, query_to_execute, parsed_query); if (have_error) - reportQueryError(full_query); + processError(full_query); } @@ -692,6 +692,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des auto * parsed_insert_query = parsed_query->as(); if (!parsed_insert_query) return; + if (parsed_insert_query->infile) { const auto & in_file_node = parsed_insert_query->infile->as(); @@ -860,81 +861,7 @@ bool ClientBase::receiveEndOfQuery() -void ClientBase::executeSingleQuery(const String & query_to_execute, ASTPtr parsed_query) -{ - client_exception.reset(); - server_exception.reset(); - - { - /// Temporarily apply query settings to context. - std::optional old_settings; - SCOPE_EXIT_SAFE({ - if (old_settings) - global_context->setSettings(*old_settings); - }); - - auto apply_query_settings = [&](const IAST & settings_ast) - { - if (!old_settings) - old_settings.emplace(global_context->getSettingsRef()); - global_context->applySettingsChanges(settings_ast.as()->changes); - }; - - const auto * insert = parsed_query->as(); - if (insert && insert->settings_ast) - apply_query_settings(*insert->settings_ast); - - /// FIXME: try to prettify this cast using `as<>()` - const auto * with_output = dynamic_cast(parsed_query.get()); - if (with_output && with_output->settings_ast) - apply_query_settings(*with_output->settings_ast); - - if (!connection->checkConnected()) - connect(); - - ASTPtr input_function; - if (insert && insert->select) - insert->tryFindInputFunction(input_function); - - /// INSERT query for which data transfer is needed (not an INSERT SELECT or input()) is processed separately. - if (insert && (!insert->select || input_function) && !insert->watch) - { - if (input_function && insert->format.empty()) - throw Exception("FORMAT must be specified for function input()", ErrorCodes::INVALID_USAGE_OF_INPUT); - - processInsertQuery(query_to_execute, parsed_query); - } - else - processOrdinaryQuery(query_to_execute, parsed_query); - } - - /// Do not change context (current DB, settings) in case of an exception. - if (!have_error) - { - if (const auto * set_query = parsed_query->as()) - { - /// Save all changes in settings to avoid losing them if the connection is lost. - for (const auto & change : set_query->changes) - { - if (change.name == "profile") - current_profile = change.value.safeGet(); - else - global_context->applySettingChange(change); - } - } - if (const auto * use_query = parsed_query->as()) - { - const String & new_database = use_query->database; - /// If the client initiates the reconnection, it takes the settings from the config. - config().setString("database", new_database); - /// If the connection initiates the reconnection, it uses its variable. - connection->setDefaultDatabase(new_database); - } - } -} - - -void ClientBase::processSingleQueryImpl(const String & full_query, const String & query_to_execute, +void ClientBase::processParsedSingleQuery(const String & full_query, const String & query_to_execute, ASTPtr parsed_query, std::optional echo_query_, bool report_error) { resetOutput(); @@ -947,18 +874,18 @@ void ClientBase::processSingleQueryImpl(const String & full_query, const String std_out.next(); } - global_context->setCurrentQueryId(""); - if (is_interactive) - { - // Generate a new query_id - for (const auto & query_id_format : query_id_formats) - { - writeString(query_id_format.first, std_out); - writeString(fmt::format(query_id_format.second, fmt::arg("query_id", global_context->getCurrentQueryId())), std_out); - writeChar('\n', std_out); - std_out.next(); - } - } + // if (is_interactive) + // { + // global_context->setCurrentQueryId(""); + // // Generate a new query_id + // for (const auto & query_id_format : query_id_formats) + // { + // writeString(query_id_format.first, std_out); + // writeString(fmt::format(query_id_format.second, fmt::arg("query_id", global_context->getCurrentQueryId())), std_out); + // writeChar('\n', std_out); + // std_out.next(); + // } + // } processed_rows = 0; written_first_block = false; @@ -978,7 +905,7 @@ void ClientBase::processSingleQueryImpl(const String & full_query, const String } if (have_error && report_error) - reportQueryError(full_query); + processError(full_query); } @@ -1110,7 +1037,7 @@ bool ClientBase::processMultiQueryImpl(const String & all_queries_text, // Report error. if (have_error) - reportQueryError(full_query); + processError(full_query); // Stop processing queries if needed. if (have_error && !ignore_error) @@ -1135,7 +1062,7 @@ bool ClientBase::processQueryText(const String & text) if (!is_multiquery) { assert(!query_fuzzer_runs); - processSingleQuery(text); + processTextAsSingleQuery(text); return true; } @@ -1160,7 +1087,7 @@ bool ClientBase::processMultiQuery(const String & all_queries_text) /// disable logs if expects errors TestHint test_hint(test_mode, all_queries_text); if (test_hint.clientError() || test_hint.serverError()) - processSingleQuery("SET send_logs_level = 'fatal'"); + processTextAsSingleQuery("SET send_logs_level = 'fatal'"); } bool echo_query = echo_queries; @@ -1177,7 +1104,7 @@ bool ClientBase::processMultiQuery(const String & all_queries_text) echo_query = test_hint.echoQueries().value_or(echo_query); try { - processSingleQueryImpl(full_query, query_to_execute, parsed_query, echo_query, false); + processParsedSingleQuery(full_query, query_to_execute, parsed_query, echo_query, false); } catch (...) { diff --git a/src/Client/ClientBase.h b/src/Client/ClientBase.h index d742339c522..3a20fdbcec9 100644 --- a/src/Client/ClientBase.h +++ b/src/Client/ClientBase.h @@ -31,90 +31,47 @@ public: using Arguments = std::vector; void init(int argc, char ** argv); - int main(const std::vector & /*args*/) override; protected: - /* - * Run interactive or non-interactive mode. Depends on: - * - processSingleQuery - * - processMultiQuery - * - processWithFuzzing - */ - void runNonInteractive(); void runInteractive(); + void runNonInteractive(); + + /// Initialize `connection` object with `Connection` or `LocalConnection`. + virtual void connect() = 0; + /// Process single query. Can use processOrdinaryQuery(), processInsertQuery(). + virtual void executeSingleQuery(const String & query_to_execute, ASTPtr parsed_query) = 0; + + + void processOrdinaryQuery(const String & query_to_execute, ASTPtr parsed_query); + void processInsertQuery(const String & query_to_execute, ASTPtr parsed_query); virtual bool processWithFuzzing(const String &) { throw Exception("Query processing with fuzzing is not implemented", ErrorCodes::NOT_IMPLEMENTED); } + virtual void processError(const String & query) const = 0; + virtual void loadSuggestionData(Suggest &) = 0; - void processOrdinaryQuery(const String & query_to_execute, ASTPtr parsed_query); - void receiveResult(ASTPtr parsed_query); - bool receiveAndProcessPacket(ASTPtr parsed_query, bool cancelled); - void initBlockOutputStream(const Block & block, ASTPtr parsed_query); - void initLogsOutputStream(); - void sendExternalTables(ASTPtr parsed_query); - virtual void connect() = 0; - void executeSingleQuery(const String & query_to_execute, ASTPtr parsed_query); - void processInsertQuery(const String & query_to_execute, ASTPtr parsed_query); - bool receiveSampleBlock(Block & out, ColumnsDescription & columns_description, ASTPtr parsed_query); - void sendData(Block & sample, const ColumnsDescription & columns_description, ASTPtr parsed_query); - void sendDataFrom(ReadBuffer & buf, Block & sample, - const ColumnsDescription & columns_description, ASTPtr parsed_query); - bool receiveEndOfQuery(); - void receiveLogs(ASTPtr parsed_query); - void writeFinalProgress(); - void processSingleQuery(const String & full_query); - bool processMultiQuery(const String & all_queries_text); - - - /* - * Process multiquery - several queries separated by ';'. Depends on executeSingleQuery(). - * Also in case of clickhouse-server: - * - INSERT data is ended by the end of line, not ';'. - * - An exception is VALUES format where we also support semicolon in addition to end of line. - **/ bool processMultiQueryImpl(const String & all_queries_text, std::function execute_single_query, std::function process_parse_query_error = {}); - /* - * Process parsed single query. Depends on executeSingleQuery(). - **/ - void processSingleQueryImpl(const String & query, const String & query_to_execute, ASTPtr parsed_query, + void processParsedSingleQuery(const String & query, const String & query_to_execute, ASTPtr parsed_query, std::optional echo_query_ = {}, bool report_error = false); - virtual void reportQueryError(const String & query) const = 0; - /// For non-interactive multi-query mode get queries text prefix. virtual String getQueryTextPrefix() { return ""; } - virtual void loadSuggestionData(Suggest &) = 0; - - void onData(Block & block, ASTPtr parsed_query); - void onLogData(Block & block); - void onTotals(Block & block, ASTPtr parsed_query); - void onExtremes(Block & block, ASTPtr parsed_query); - void onReceiveExceptionFromServer(std::unique_ptr && e); - void onProfileInfo(const BlockStreamProfileInfo & profile_info); void resetOutput(); ASTPtr parseQuery(const char *& pos, const char * end, bool allow_multi_statements) const; - void onProgress(const Progress & value); - - void onEndOfStream(); - - /// Prepare for and call either runInteractive() or runNonInteractive(). virtual int mainImpl() = 0; - virtual void readArguments(int argc, char ** argv, - Arguments & common_arguments, std::vector &) = 0; - using ProgramOptionsDescription = boost::program_options::options_description; using CommandLineOptions = boost::program_options::variables_map; @@ -125,16 +82,44 @@ protected: }; virtual void printHelpMessage(const OptionsDescription & options_description) = 0; - + virtual void readArguments(int argc, char ** argv, + Arguments & common_arguments, std::vector &) = 0; virtual void addAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments) = 0; - virtual void processOptions(const OptionsDescription & options_description, const CommandLineOptions & options, const std::vector & external_tables_arguments) = 0; - virtual void processConfig() = 0; private: + bool processQueryText(const String & text); + void processTextAsSingleQuery(const String & full_query); + bool processMultiQuery(const String & all_queries_text); + + void receiveResult(ASTPtr parsed_query); + bool receiveAndProcessPacket(ASTPtr parsed_query, bool cancelled); + void receiveLogs(ASTPtr parsed_query); + bool receiveSampleBlock(Block & out, ColumnsDescription & columns_description, ASTPtr parsed_query); + bool receiveEndOfQuery(); + + void onProgress(const Progress & value); + void onData(Block & block, ASTPtr parsed_query); + void onLogData(Block & block); + void onTotals(Block & block, ASTPtr parsed_query); + void onExtremes(Block & block, ASTPtr parsed_query); + void onReceiveExceptionFromServer(std::unique_ptr && e); + void onProfileInfo(const BlockStreamProfileInfo & profile_info); + void onEndOfStream(); + + void sendData(Block & sample, const ColumnsDescription & columns_description, ASTPtr parsed_query); + + void sendDataFrom(ReadBuffer & buf, Block & sample, + const ColumnsDescription & columns_description, ASTPtr parsed_query); + + void initBlockOutputStream(const Block & block, ASTPtr parsed_query); + void initLogsOutputStream(); + + void sendExternalTables(ASTPtr parsed_query); + inline String prompt() const { return boost::replace_all_copy(prompt_by_server_display_name, "{database}", config().getString("database", "default")); @@ -142,10 +127,6 @@ private: void outputQueryInfo(bool echo_query_); - /// Process query text (multiquery or single query) according to options. - bool processQueryText(const String & text); - - protected: bool is_interactive = false; /// Use either interactive line editing interface or batch mode. bool is_multiquery = false; @@ -229,6 +210,9 @@ protected: size_t insert_format_max_block_size = 0; /// Max block size when reading INSERT data. size_t max_client_network_bandwidth = 0; /// The maximum speed of data exchange over the network for the client in bytes per second. + UInt64 server_revision = 0; + String server_version; + }; } diff --git a/src/Client/LocalConnection.cpp b/src/Client/LocalConnection.cpp index 50e874b46ad..e25660b2d23 100644 --- a/src/Client/LocalConnection.cpp +++ b/src/Client/LocalConnection.cpp @@ -67,35 +67,34 @@ void LocalConnection::sendQuery( bool) { query_context = Context::createCopy(getContext()); - query_context->makeSessionContext(); query_context->makeQueryContext(); - query_context->setUser("default", "", Poco::Net::SocketAddress{}); - query_context->setCurrentQueryId(""); query_context->setProgressCallback([this] (const Progress & value) { return this->updateProgress(value); }); + query_context->setCurrentQueryId(""); + CurrentThread::QueryScope query_scope_holder(query_context); /// query_context->setCurrentDatabase(default_database); /// Send structure of columns to client for function input() - query_context->setInputInitializer([this] (ContextPtr context, const StoragePtr & input_storage) - { - if (context != query_context) - throw Exception("Unexpected context in Input initializer", ErrorCodes::LOGICAL_ERROR); + // query_context->setInputInitializer([this] (ContextPtr context, const StoragePtr & input_storage) + // { + // if (context != query_context) + // throw Exception("Unexpected context in Input initializer", ErrorCodes::LOGICAL_ERROR); - auto metadata_snapshot = input_storage->getInMemoryMetadataPtr(); - state.need_receive_data_for_input = true; + // auto metadata_snapshot = input_storage->getInMemoryMetadataPtr(); + // state.need_receive_data_for_input = true; - /// Send ColumnsDescription for input storage. - // if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA - // && query_context->getSettingsRef().input_format_defaults_for_omitted_fields) - // { - // sendTableColumns(metadata_snapshot->getColumns()); - // } + // /// Send ColumnsDescription for input storage. + // // if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA + // // && query_context->getSettingsRef().input_format_defaults_for_omitted_fields) + // // { + // // sendTableColumns(metadata_snapshot->getColumns()); + // // } - /// Send block to the client - input storage structure. - state.input_header = metadata_snapshot->getSampleBlock(); - next_packet_type = Protocol::Server::Data; - state.block = state.input_header; - }); + // /// Send block to the client - input storage structure. + // state.input_header = metadata_snapshot->getSampleBlock(); + // next_packet_type = Protocol::Server::Data; + // state.block = state.input_header; + // }); state.query_id = query_id_; state.query = query_; @@ -185,7 +184,6 @@ void LocalConnection::finishQuery() // sendProgress(); state.io.onFinish(); query_context.reset(); - // state.reset(); } bool LocalConnection::poll(size_t) diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index 2e9742992fc..ccfc6a23543 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -970,8 +970,7 @@ void executeQuery( bool allow_into_outfile, ContextMutablePtr context, SetResultDetailsFunc set_result_details, - const std::optional & output_format_settings, - FlushBufferFunc flush_buffer_func) + const std::optional & output_format_settings) { PODArray parse_buf; const char * begin; @@ -1130,9 +1129,6 @@ void executeQuery( out->onProgress(progress); }); - if (flush_buffer_func) - out->setFlushBufferCallback(flush_buffer_func); - if (set_result_details) set_result_details( context->getClientInfo().current_query_id, out->getContentType(), format_name, DateLUT::instance().getTimeZone()); diff --git a/src/Interpreters/executeQuery.h b/src/Interpreters/executeQuery.h index 87fb7c203ce..0505c9c8016 100644 --- a/src/Interpreters/executeQuery.h +++ b/src/Interpreters/executeQuery.h @@ -11,7 +11,6 @@ class ReadBuffer; class WriteBuffer; using SetResultDetailsFunc = std::function; -using FlushBufferFunc = std::function; /// Parse and execute a query. void executeQuery( @@ -20,8 +19,7 @@ void executeQuery( bool allow_into_outfile, /// If true and the query contains INTO OUTFILE section, redirect output to that file. ContextMutablePtr context, /// DB, tables, data types, storage engines, functions, aggregate functions... SetResultDetailsFunc set_result_details, /// If a non-empty callback is passed, it will be called with the query id, the content-type, the format, and the timezone. - const std::optional & output_format_settings = std::nullopt, /// Format settings for output format, will be calculated from the context if not set. - FlushBufferFunc flush_buffer_func = {} + const std::optional & output_format_settings = std::nullopt /// Format settings for output format, will be calculated from the context if not set. ); diff --git a/src/Processors/Formats/IOutputFormat.cpp b/src/Processors/Formats/IOutputFormat.cpp index e7c8d7c741f..0f94622b7c6 100644 --- a/src/Processors/Formats/IOutputFormat.cpp +++ b/src/Processors/Formats/IOutputFormat.cpp @@ -105,10 +105,7 @@ void IOutputFormat::work() void IOutputFormat::flush() { - if (flush_callback) - flush_callback(out, result_rows); - else - out.next(); + out.next(); } void IOutputFormat::write(const Block & block) diff --git a/src/Processors/Formats/IOutputFormat.h b/src/Processors/Formats/IOutputFormat.h index cba901ec6f6..d4497140140 100644 --- a/src/Processors/Formats/IOutputFormat.h +++ b/src/Processors/Formats/IOutputFormat.h @@ -67,8 +67,6 @@ public: /// Passed value are delta, that must be summarized. virtual void onProgress(const Progress & /*progress*/) {} - void setFlushBufferCallback(std::function callback) { flush_callback = callback; } - /// Content-Type to set when sending HTTP response. virtual std::string getContentType() const { return "text/plain; charset=UTF-8"; } @@ -93,7 +91,5 @@ private: size_t result_bytes = 0; bool prefix_written = false; - - std::function flush_callback; }; }