ClickHouse/src/Client/LocalConnection.cpp

385 lines
9.0 KiB
C++
Raw Normal View History

2021-08-17 19:59:51 +00:00
#include "LocalConnection.h"
#include <Interpreters/executeQuery.h>
2021-08-18 14:39:04 +00:00
#include <Storages/IStorage.h>
2021-08-17 19:59:51 +00:00
namespace DB
{
/// Error codes used in this file. They are only declared here (`extern`); the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int UNKNOWN_EXCEPTION;
    extern const int UNKNOWN_PACKET_FROM_SERVER;
}
/// Creates a connection bound to the given context: opens a session with the TCP interface type
/// and authenticates it right away, so that query contexts can be created from the session later.
LocalConnection::LocalConnection(ContextPtr context_)
    : WithContext(context_)
    , session(getContext(), ClientInfo::Interface::TCP)
{
    /// Authenticate and create a context to execute queries.
    /// The user is "default" with an empty password; the address is default-constructed.
    const Poco::Net::SocketAddress default_address{};
    session.authenticate("default", "", default_address);
}
2021-08-21 10:55:54 +00:00
/// Releases the per-query state. Anything thrown while doing so is caught and logged
/// through tryLogCurrentException() instead of propagating out of the destructor.
LocalConnection::~LocalConnection()
{
try
{
state.reset();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
2021-08-17 19:59:51 +00:00
/// Remembers `database` in `default_database`. Nothing else is done here.
void LocalConnection::setDefaultDatabase(const String & database)
{
default_database = database;
}
/// Copies the stored server name, version components and revision into the output parameters.
/// The timeouts argument is not used.
void LocalConnection::getServerVersion(
const ConnectionTimeouts & /* timeouts */, String & name,
UInt64 & version_major, UInt64 & version_minor,
UInt64 & version_patch, UInt64 & revision)
{
name = server_name;
version_major = server_version_major;
version_minor = server_version_minor;
version_patch = server_version_patch;
revision = server_revision;
}
/// Returns the stored `server_revision`. The timeouts argument is not used.
UInt64 LocalConnection::getServerRevision(const ConnectionTimeouts &)
{
return server_revision;
}
/// Returns a reference to the stored `description` string (used, for example, in error messages of receivePacket()).
const String & LocalConnection::getDescription() const
{
return description;
}
/// Returns a reference to the stored `server_timezone` string. The timeouts argument is not used.
const String & LocalConnection::getServerTimezone(const ConnectionTimeouts &)
{
return server_timezone;
}
/// Returns a reference to the stored `server_display_name` string. The timeouts argument is not used.
const String & LocalConnection::getServerDisplayName(const ConnectionTimeouts &)
{
return server_display_name;
}
2021-08-21 10:55:54 +00:00
/// Returns true while a query state exists and has not been marked as finished.
/// When there is no query state (nothing was sent yet, or finishQuery() has already reset it)
/// there is nothing left to read, so false is returned instead of dereferencing an empty `state`.
bool LocalConnection::hasReadPendingData() const
{
    return state && !state->is_finished;
}
/// Returns the type of the packet that is currently prepared for receivePacket(),
/// or an empty optional when no packet is pending. The argument is not used.
std::optional<UInt64> LocalConnection::checkPacket(size_t)
{
return next_packet_type;
}
/// Adds `value` to the progress accumulated in the current query state.
/// NOTE(review): `state` is dereferenced without checking that it is set - presumably this is only
/// called while a query is in flight; confirm against the callers.
void LocalConnection::updateProgress(const Progress & value)
{
state->progress.incrementPiecewiseAtomically(value);
}
2021-08-17 19:59:51 +00:00
/*
* SendQuery: execute query and suspend the result, which will be received back via poll.
**/
void LocalConnection::sendQuery(
const ConnectionTimeouts &,
const String & query_,
const String & query_id_,
UInt64,
const Settings *,
const ClientInfo *,
bool)
{
2021-08-19 20:27:40 +00:00
state.reset();
state.emplace();
state->query_id = query_id_;
state->query = query_;
2021-08-20 08:38:50 +00:00
query_context = session.makeQueryContext();
query_context->makeSessionContext(); /// initial_create_query requires a session context to be set.
query_context->setCurrentQueryId("");
CurrentThread::QueryScope query_scope_holder(query_context);
2021-08-21 10:55:54 +00:00
state->after_send_progress.restart();
state->query_execution_time.restart();
2021-08-20 08:38:50 +00:00
2021-08-19 20:27:40 +00:00
try
{
2021-08-21 10:55:54 +00:00
std::cerr << "query: " << state->query << std::endl;
2021-08-19 20:27:40 +00:00
state->io = executeQuery(state->query, query_context, false, state->stage, true);
if (state->io.out)
{
2021-08-21 10:55:54 +00:00
std::cerr << "state io out\n";
2021-08-19 20:27:40 +00:00
state->need_receive_data_for_insert = true;
2021-08-21 10:55:54 +00:00
state->io.out->writePrefix();
next_packet_type = Protocol::Server::Data;
state->block = state->io.out->getHeader();
2021-08-19 20:27:40 +00:00
}
else if (state->io.pipeline.initialized())
{
2021-08-21 10:55:54 +00:00
std::cerr << "pipeline processing\n";
2021-08-19 20:27:40 +00:00
state->executor = std::make_unique<PullingAsyncPipelineExecutor>(state->io.pipeline);
}
else if (state->io.in)
{
2021-08-21 10:55:54 +00:00
std::cerr << "sream processing\n";
2021-08-19 20:27:40 +00:00
state->async_in = std::make_unique<AsynchronousBlockInputStream>(state->io.in);
state->async_in->readPrefix();
}
2021-08-21 10:55:54 +00:00
else
std::cerr << "\n\nanother FFFFFFFFFFFFFFFFF\n\n";
2021-08-19 20:27:40 +00:00
}
catch (const Exception & e)
2021-08-17 19:59:51 +00:00
{
2021-08-19 20:27:40 +00:00
state->io.onException();
state->exception.emplace(e);
2021-08-17 19:59:51 +00:00
}
2021-08-19 20:27:40 +00:00
catch (const std::exception & e)
2021-08-17 19:59:51 +00:00
{
2021-08-19 20:27:40 +00:00
state->io.onException();
state->exception.emplace(Exception::CreateFromSTDTag{}, e);
2021-08-17 19:59:51 +00:00
}
2021-08-19 20:27:40 +00:00
catch (...)
2021-08-17 19:59:51 +00:00
{
2021-08-19 20:27:40 +00:00
state->io.onException();
state->exception.emplace("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
2021-08-17 19:59:51 +00:00
}
}
2021-08-18 14:39:04 +00:00
/// Passes a block of data to the output stream of the current query.
/// An empty block marks the end of the data: the suffix is written and the query is finished.
/// The unnamed parameters are not used.
void LocalConnection::sendData(const Block & block, const String &, bool)
{
    if (!block)
    {
        state->io.out->writeSuffix();
        finishQuery();
        return;
    }

    /// INSERT query.
    state->io.out->write(block);
}
2021-08-17 19:59:51 +00:00
void LocalConnection::sendCancel()
{
2021-08-19 20:27:40 +00:00
if (state->async_in)
2021-08-17 19:59:51 +00:00
{
2021-08-19 20:27:40 +00:00
state->async_in->cancel(false);
2021-08-17 19:59:51 +00:00
}
2021-08-19 20:27:40 +00:00
else if (state->executor)
2021-08-17 19:59:51 +00:00
{
2021-08-19 20:27:40 +00:00
state->executor->cancel();
2021-08-17 19:59:51 +00:00
}
}
2021-08-19 20:27:40 +00:00
/** Tries to get the next block of the result, waiting for at most interactive_delay / 1000.
  *
  * Returns false when the source is exhausted (or when there is no source at all) and true otherwise.
  * `block` may stay empty while true is returned: that means nothing was ready within the wait,
  * and the caller (pollImpl()) only marks the query as finished when false is returned.
  */
bool LocalConnection::pullBlock(Block & block)
{
    if (state->async_in)
    {
        /// A timed out poll() only means that no data is ready yet. It must not be reported as the end
        /// of the stream, otherwise a query that is slower than the wait would be cut short.
        if (!state->async_in->poll(query_context->getSettingsRef().interactive_delay / 1000))
            return true;

        /// The stream has something to return: an empty block here is treated as the end of the data.
        block = state->async_in->read();
        if (block)
            return true;
        return false;
    }

    if (state->executor)
        return state->executor->pull(block, query_context->getSettingsRef().interactive_delay / 1000);

    return false;
}
void LocalConnection::finishQuery()
{
2021-08-21 10:55:54 +00:00
next_packet_type = Protocol::Server::EndOfStream;
if (!state)
return;
2021-08-19 20:27:40 +00:00
if (state->async_in)
2021-08-17 19:59:51 +00:00
{
2021-08-19 20:27:40 +00:00
state->async_in->readSuffix();
state->async_in.reset();
2021-08-17 19:59:51 +00:00
}
2021-08-19 20:27:40 +00:00
else if (state->executor)
2021-08-17 19:59:51 +00:00
{
2021-08-19 20:27:40 +00:00
state->executor.reset();
2021-08-17 19:59:51 +00:00
}
2021-08-19 20:27:40 +00:00
state->io.onFinish();
2021-08-20 08:38:50 +00:00
state.reset();
2021-08-17 19:59:51 +00:00
query_context.reset();
}
bool LocalConnection::poll(size_t)
{
2021-08-21 10:55:54 +00:00
if (!state)
return false;
if (state->after_send_progress.elapsed() / 1000 >= query_context->getSettingsRef().interactive_delay)
2021-08-17 19:59:51 +00:00
{
2021-08-21 10:55:54 +00:00
state->after_send_progress.restart();
2021-08-17 19:59:51 +00:00
next_packet_type = Protocol::Server::Progress;
2021-08-19 20:27:40 +00:00
return true;
}
2021-08-17 19:59:51 +00:00
2021-08-21 10:55:54 +00:00
if (state->is_finished)
{
finishQuery();
return true;
}
2021-08-19 20:27:40 +00:00
try
{
pollImpl();
}
catch (const Exception & e)
{
state->io.onException();
state->exception.emplace(e);
}
catch (const std::exception & e)
{
state->io.onException();
state->exception.emplace(Exception::CreateFromSTDTag{}, e);
}
catch (...)
{
state->io.onException();
state->exception.emplace("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
}
if (state->exception)
{
2021-08-21 10:55:54 +00:00
finishQuery();
2021-08-19 20:27:40 +00:00
next_packet_type = Protocol::Server::Exception;
2021-08-17 19:59:51 +00:00
return true;
}
2021-08-19 20:27:40 +00:00
if (state->is_finished && !state->sent_totals)
{
state->sent_totals = true;
Block totals;
if (state->io.in)
totals = state->io.in->getTotals();
else if (state->executor)
totals = state->executor->getTotalsBlock();
if (totals)
{
next_packet_type = Protocol::Server::Totals;
state->block.emplace(totals);
return true;
}
}
if (state->is_finished && !state->sent_extremes)
{
state->sent_extremes = true;
Block extremes;
if (state->io.in)
extremes = state->io.in->getExtremes();
else if (state->executor)
extremes = state->executor->getExtremesBlock();
if (extremes)
{
next_packet_type = Protocol::Server::Extremes;
state->block.emplace(extremes);
return true;
}
}
if (state->is_finished)
{
finishQuery();
return true;
}
if (state->block)
2021-08-17 19:59:51 +00:00
{
next_packet_type = Protocol::Server::Data;
2021-08-19 20:27:40 +00:00
return true;
}
return false;
}
2021-08-17 19:59:51 +00:00
2021-08-19 20:27:40 +00:00
bool LocalConnection::pollImpl()
{
Block block;
auto next_read = pullBlock(block);
if (block)
{
if (state->io.null_format)
state->block.emplace();
2021-08-17 19:59:51 +00:00
else
2021-08-19 20:27:40 +00:00
state->block.emplace(block);
2021-08-17 19:59:51 +00:00
}
2021-08-19 20:27:40 +00:00
else if (!next_read)
2021-08-17 19:59:51 +00:00
{
2021-08-19 20:27:40 +00:00
state->is_finished = true;
2021-08-17 19:59:51 +00:00
}
2021-08-19 20:27:40 +00:00
2021-08-17 19:59:51 +00:00
return true;
}
/** Returns the packet prepared earlier (its type is stored in `next_packet_type`) and clears the pending type.
  *
  * Throws LOGICAL_ERROR when no packet is pending, or when an Exception packet is pending
  * but there is no stored exception to put into it.
  * Throws UNKNOWN_PACKET_FROM_SERVER for a packet type that is not handled here.
  */
Packet LocalConnection::receivePacket()
{
    if (!next_packet_type)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "No packet");

    Packet packet;
    packet.type = next_packet_type.value();

    switch (next_packet_type.value())
    {
        case Protocol::Server::Totals: [[fallthrough]];
        case Protocol::Server::Extremes: [[fallthrough]];
        case Protocol::Server::Data:
        {
            /// The state may already be gone (finishQuery() resets it); the packet then carries no block.
            if (state && state->block)
            {
                packet.block = std::move(*state->block);
                state->block.reset();
            }
            break;
        }
        case Protocol::Server::Exception:
        {
            /// Fail explicitly instead of dereferencing an empty state or an empty exception.
            if (!state || !state->exception)
                throw Exception(ErrorCodes::LOGICAL_ERROR, "No exception to receive");

            packet.exception = std::make_unique<Exception>(*state->exception);
            break;
        }
        case Protocol::Server::Progress:
        {
            if (state)
            {
                packet.progress = std::move(state->progress);
                state->progress.reset();
            }
            break;
        }
        case Protocol::Server::EndOfStream:
        {
            break;
        }
        default:
            throw Exception("Unknown packet " + toString(packet.type)
                + " from server " + getDescription(), ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
    }

    if (state)
    {
        /// max_execution_time equal to zero means "no limit": without this check a query would be marked
        /// as finished right after its first packet. Both sides of the comparison are in milliseconds.
        /// NOTE(review): assumes query_execution_time is a Stopwatch (elapsedMilliseconds() is available) - confirm.
        const auto limit_ms = query_context->getSettingsRef().max_execution_time.totalMilliseconds();
        if (limit_ms > 0 && state->query_execution_time.elapsedMilliseconds() > static_cast<UInt64>(limit_ms))
            state->is_finished = true;
    }

    next_packet_type.reset();
    return packet;
}
}