Simplification

This commit is contained in:
Alexey Milovidov 2024-06-19 22:50:52 +02:00
parent 11d54f4809
commit 16c3e36b5a
4 changed files with 74 additions and 80 deletions

View File

@ -439,7 +439,7 @@ void ClientBase::sendExternalTables(ASTPtr parsed_query)
std::vector<ExternalTableDataPtr> data;
for (auto & table : external_tables)
data.emplace_back(table.getData(global_context));
data.emplace_back(table.getData(query_context));
connection->sendExternalTablesData(data);
}
@ -652,10 +652,10 @@ try
/// intermixed with data with parallel formatting.
/// It may increase code complexity significantly.
if (!extras_into_stdout || select_only_into_file)
output_format = global_context->getOutputFormatParallelIfPossible(
output_format = query_context->getOutputFormatParallelIfPossible(
current_format, out_file_buf ? *out_file_buf : *out_buf, block);
else
output_format = global_context->getOutputFormat(
output_format = query_context->getOutputFormat(
current_format, out_file_buf ? *out_file_buf : *out_buf, block);
output_format->setAutoFlush();
@ -949,7 +949,7 @@ void ClientBase::processTextAsSingleQuery(const String & full_query)
/// But for asynchronous inserts we don't extract data, because it's needed
/// to be done on server side in that case (for coalescing the data from multiple inserts on server side).
const auto * insert = parsed_query->as<ASTInsertQuery>();
if (insert && isSyncInsertWithData(*insert, global_context))
if (insert && isSyncInsertWithData(*insert, query_context))
query_to_execute = full_query.substr(0, insert->data - full_query.data());
else
query_to_execute = full_query;
@ -1067,7 +1067,7 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
}
}
const auto & settings = global_context->getSettingsRef();
const auto & settings = query_context->getSettingsRef();
const Int32 signals_before_stop = settings.partial_result_on_first_cancel ? 2 : 1;
int retries_left = 10;
@ -1082,10 +1082,10 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
connection_parameters.timeouts,
query,
query_parameters,
global_context->getCurrentQueryId(),
query_context->getCurrentQueryId(),
query_processing_stage,
&global_context->getSettingsRef(),
&global_context->getClientInfo(),
&query_context->getSettingsRef(),
&query_context->getClientInfo(),
true,
[&](const Progress & progress) { onProgress(progress); });
@ -1275,7 +1275,7 @@ void ClientBase::onProgress(const Progress & value)
void ClientBase::onTimezoneUpdate(const String & tz)
{
global_context->setSetting("session_timezone", tz);
query_context->setSetting("session_timezone", tz);
}
@ -1471,13 +1471,13 @@ bool ClientBase::receiveSampleBlock(Block & out, ColumnsDescription & columns_de
void ClientBase::setInsertionTable(const ASTInsertQuery & insert_query)
{
if (!global_context->hasInsertionTable() && insert_query.table)
if (!query_context->hasInsertionTable() && insert_query.table)
{
String table = insert_query.table->as<ASTIdentifier &>().shortName();
if (!table.empty())
{
String database = insert_query.database ? insert_query.database->as<ASTIdentifier &>().shortName() : "";
global_context->setInsertionTable(StorageID(database, table));
query_context->setInsertionTable(StorageID(database, table));
}
}
}
@ -1528,7 +1528,7 @@ void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr pars
const auto & parsed_insert_query = parsed_query->as<ASTInsertQuery &>();
if ((!parsed_insert_query.data && !parsed_insert_query.infile) && (is_interactive || (!stdin_is_a_tty && !isStdinNotEmptyAndValid(std_in))))
{
const auto & settings = global_context->getSettingsRef();
const auto & settings = query_context->getSettingsRef();
if (settings.throw_if_no_data_to_insert)
throw Exception(ErrorCodes::NO_DATA_TO_INSERT, "No data to insert");
else
@ -1542,10 +1542,10 @@ void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr pars
connection_parameters.timeouts,
query,
query_parameters,
global_context->getCurrentQueryId(),
query_context->getCurrentQueryId(),
query_processing_stage,
&global_context->getSettingsRef(),
&global_context->getClientInfo(),
&query_context->getSettingsRef(),
&query_context->getClientInfo(),
true,
[&](const Progress & progress) { onProgress(progress); });
@ -1593,7 +1593,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
/// Set callback to be called on file progress.
if (tty_buf)
progress_indication.setFileProgressCallback(global_context, *tty_buf);
progress_indication.setFileProgressCallback(query_context, *tty_buf);
}
/// If data fetched from file (maybe compressed file)
@ -1627,10 +1627,10 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
}
StorageFile::CommonArguments args{
WithContext(global_context),
WithContext(query_context),
parsed_insert_query->table_id,
current_format,
getFormatSettings(global_context),
getFormatSettings(query_context),
compression_method,
columns_for_storage_file,
ConstraintsDescription{},
@ -1638,7 +1638,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
{},
String{},
};
StoragePtr storage = std::make_shared<StorageFile>(in_file, global_context->getUserFilesPath(), args);
StoragePtr storage = std::make_shared<StorageFile>(in_file, query_context->getUserFilesPath(), args);
storage->startup();
SelectQueryInfo query_info;
@ -1647,18 +1647,18 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
auto metadata = storage->getInMemoryMetadataPtr();
QueryPlan plan;
storage->read(
plan,
sample.getNames(),
storage->getStorageSnapshot(metadata, global_context),
query_info,
global_context,
{},
global_context->getSettingsRef().max_block_size,
getNumberOfPhysicalCPUCores());
plan,
sample.getNames(),
storage->getStorageSnapshot(metadata, query_context),
query_info,
query_context,
{},
query_context->getSettingsRef().max_block_size,
getNumberOfPhysicalCPUCores());
auto builder = plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(global_context),
BuildQueryPipelineSettings::fromContext(global_context));
QueryPlanOptimizationSettings::fromContext(query_context),
BuildQueryPipelineSettings::fromContext(query_context));
QueryPlanResourceHolder resources;
auto pipe = QueryPipelineBuilder::getPipe(std::move(*builder), resources);
@ -1719,14 +1719,14 @@ void ClientBase::sendDataFrom(ReadBuffer & buf, Block & sample, const ColumnsDes
current_format = insert->format;
}
auto source = global_context->getInputFormat(current_format, buf, sample, insert_format_max_block_size);
auto source = query_context->getInputFormat(current_format, buf, sample, insert_format_max_block_size);
Pipe pipe(source);
if (columns_description.hasDefaults())
{
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AddingDefaultsTransform>(header, columns_description, *source, global_context);
return std::make_shared<AddingDefaultsTransform>(header, columns_description, *source, query_context);
});
}
@ -1872,6 +1872,9 @@ void ClientBase::cancelQuery()
void ClientBase::processParsedSingleQuery(const String & full_query, const String & query_to_execute,
ASTPtr parsed_query, std::optional<bool> echo_query_, bool report_error)
{
query_context = Context::createCopy(global_context);
query_context->makeQueryContext();
resetOutput();
have_error = false;
cancelled = false;
@ -1888,12 +1891,12 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
if (is_interactive)
{
global_context->setCurrentQueryId("");
query_context->setCurrentQueryId("");
// Generate a new query_id
for (const auto & query_id_format : query_id_formats)
{
writeString(query_id_format.first, std_out);
writeString(fmt::format(fmt::runtime(query_id_format.second), fmt::arg("query_id", global_context->getCurrentQueryId())), std_out);
writeString(fmt::format(fmt::runtime(query_id_format.second), fmt::arg("query_id", query_context->getCurrentQueryId())), std_out);
writeChar('\n', std_out);
std_out.next();
}
@ -1920,7 +1923,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
auto password = auth_data->getPassword();
if (password)
global_context->getAccessControl().checkPasswordComplexityRules(*password);
query_context->getAccessControl().checkPasswordComplexityRules(*password);
}
}
}
@ -1930,47 +1933,40 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
progress_indication.resetProgress();
profile_events.watch.restart();
/// Apply query settings to context, as they can affect the behavior on client-side.
InterpreterSetQuery::applySettingsFromQuery(parsed_query, query_context);
if (!connection->checkConnected(connection_parameters.timeouts))
connect();
ASTPtr input_function;
const auto * insert = parsed_query->as<ASTInsertQuery>();
if (insert && insert->select)
insert->tryFindInputFunction(input_function);
bool is_async_insert_with_inlined_data = query_context->getSettingsRef().async_insert && insert && insert->hasInlinedData();
if (is_async_insert_with_inlined_data)
{
/// Temporarily apply query settings to context.
Settings old_settings = global_context->getSettings();
SCOPE_EXIT_SAFE({
global_context->setSettings(old_settings);
});
bool have_data_in_stdin = !is_interactive && !stdin_is_a_tty && isStdinNotEmptyAndValid(std_in);
bool have_external_data = have_data_in_stdin || insert->infile;
InterpreterSetQuery::applySettingsFromQuery(parsed_query, global_context);
if (!connection->checkConnected(connection_parameters.timeouts))
connect();
ASTPtr input_function;
const auto * insert = parsed_query->as<ASTInsertQuery>();
if (insert && insert->select)
insert->tryFindInputFunction(input_function);
bool is_async_insert_with_inlined_data = global_context->getSettingsRef().async_insert && insert && insert->hasInlinedData();
if (is_async_insert_with_inlined_data)
{
bool have_data_in_stdin = !is_interactive && !stdin_is_a_tty && isStdinNotEmptyAndValid(std_in);
bool have_external_data = have_data_in_stdin || insert->infile;
if (have_external_data)
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Processing async inserts with both inlined and external data (from stdin or infile) is not supported");
}
/// INSERT query for which data transfer is needed (not an INSERT SELECT or input()) is processed separately.
if (insert && (!insert->select || input_function) && !is_async_insert_with_inlined_data)
{
if (input_function && insert->format.empty())
throw Exception(ErrorCodes::INVALID_USAGE_OF_INPUT, "FORMAT must be specified for function input()");
processInsertQuery(query_to_execute, parsed_query);
}
else
processOrdinaryQuery(query_to_execute, parsed_query);
if (have_external_data)
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Processing async inserts with both inlined and external data (from stdin or infile) is not supported");
}
/// INSERT query for which data transfer is needed (not an INSERT SELECT or input()) is processed separately.
if (insert && (!insert->select || input_function) && !is_async_insert_with_inlined_data)
{
if (input_function && insert->format.empty())
throw Exception(ErrorCodes::INVALID_USAGE_OF_INPUT, "FORMAT must be specified for function input()");
processInsertQuery(query_to_execute, parsed_query);
}
else
processOrdinaryQuery(query_to_execute, parsed_query);
/// Do not change context (current DB, settings) in case of an exception.
if (!have_error)
{
@ -2651,10 +2647,8 @@ bool ClientBase::processMultiQueryFromFile(const String & file_name)
if (!has_log_comment)
{
Settings settings = global_context->getSettings();
/// NOTE: cannot use even weakly_canonical() since it fails for /dev/stdin due to resolving of "pipe:[X]"
settings.log_comment = fs::absolute(fs::path(file_name));
global_context->setSettings(settings);
global_context->setSetting("log_comment", String(fs::absolute(fs::path(file_name))));
}
return executeMultiQuery(queries_from_file);

View File

@ -198,6 +198,7 @@ protected:
/// since other members can use them.
SharedContextHolder shared_context;
ContextMutablePtr global_context;
ContextMutablePtr query_context;
bool is_interactive = false; /// Use either interactive line editing interface or batch mode.
bool is_multiquery = false;

View File

@ -532,7 +532,7 @@ ContextMutablePtr Session::makeSessionContext()
session_context->checkSettingsConstraints(settings_from_auth_server, SettingSource::QUERY);
session_context->applySettingsChanges(settings_from_auth_server);
recordLoginSucess(session_context);
recordLoginSuccess(session_context);
return session_context;
}
@ -596,7 +596,7 @@ ContextMutablePtr Session::makeSessionContext(const String & session_name_, std:
{ session_name_ },
max_sessions_for_user);
recordLoginSucess(session_context);
recordLoginSuccess(session_context);
return session_context;
}
@ -672,13 +672,13 @@ ContextMutablePtr Session::makeQueryContextImpl(const ClientInfo * client_info_t
user = query_context->getUser();
/// Interserver does not create session context
recordLoginSucess(query_context);
recordLoginSuccess(query_context);
return query_context;
}
void Session::recordLoginSucess(ContextPtr login_context) const
void Session::recordLoginSuccess(ContextPtr login_context) const
{
if (notified_session_log_about_login)
return;

View File

@ -102,8 +102,7 @@ public:
private:
std::shared_ptr<SessionLog> getSessionLog() const;
ContextMutablePtr makeQueryContextImpl(const ClientInfo * client_info_to_copy, ClientInfo * client_info_to_move) const;
void recordLoginSucess(ContextPtr login_context) const;
void recordLoginSuccess(ContextPtr login_context) const;
mutable bool notified_session_log_about_login = false;
const UUID auth_id;