2021-08-17 19:59:51 +00:00
|
|
|
#include "LocalConnection.h"
|
|
|
|
#include <Interpreters/executeQuery.h>
|
2021-09-29 19:03:30 +00:00
|
|
|
#include <Processors/Executors/CompletedPipelineExecutor.h>
|
|
|
|
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
|
|
|
|
#include <Processors/Executors/PushingPipelineExecutor.h>
|
|
|
|
#include <Processors/Executors/PushingAsyncPipelineExecutor.h>
|
2021-08-18 14:39:04 +00:00
|
|
|
#include <Storages/IStorage.h>
|
2021-10-18 14:53:42 +00:00
|
|
|
#include <Core/Protocol.h>
|
2022-02-15 08:25:07 +00:00
|
|
|
#include <DataTypes/DataTypesNumber.h>
|
|
|
|
#include <DataTypes/DataTypeString.h>
|
|
|
|
#include <Interpreters/ProfileEventsExt.h>
|
2021-08-17 19:59:51 +00:00
|
|
|
|
2021-09-26 21:22:04 +00:00
|
|
|
|
2021-08-17 19:59:51 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Error codes used in this file. They are only declared here (extern).
namespace ErrorCodes
{
    extern const int NOT_IMPLEMENTED;
    extern const int UNKNOWN_EXCEPTION;
    extern const int UNKNOWN_PACKET_FROM_SERVER;
}
|
|
|
|
|
2022-02-15 08:25:07 +00:00
|
|
|
/// Creates a connection whose session uses the LOCAL client interface.
/// `send_progress_` and `send_profile_events_` are stored; poll() consults them to decide
/// whether Progress and ProfileEvents packets are produced.
LocalConnection::LocalConnection(ContextPtr context_, bool send_progress_, bool send_profile_events_)
    : WithContext(context_)
    , session(getContext(), ClientInfo::Interface::LOCAL)
    , send_progress(send_progress_)
    , send_profile_events(send_profile_events_)
{
    /// Authenticate and create a context to execute queries.
    session.authenticate("default", "", Poco::Net::SocketAddress{});
    session.makeSessionContext();

    /// Create our own thread status only when the current thread does not have one yet.
    const bool thread_already_initialized = CurrentThread::isInitialized();
    if (!thread_already_initialized)
        thread_status.emplace();
}
|
|
|
|
|
2021-08-21 10:55:54 +00:00
|
|
|
/// Drops the per-query state. Anything thrown while doing so is logged rather than propagated.
LocalConnection::~LocalConnection()
{
    try
    {
        state.reset();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
|
|
|
|
|
|
|
|
bool LocalConnection::hasReadPendingData() const
|
|
|
|
{
|
|
|
|
return !state->is_finished;
|
|
|
|
}
|
|
|
|
|
|
|
|
std::optional<UInt64> LocalConnection::checkPacket(size_t)
|
|
|
|
{
|
|
|
|
return next_packet_type;
|
|
|
|
}
|
|
|
|
|
|
|
|
void LocalConnection::updateProgress(const Progress & value)
|
|
|
|
{
|
|
|
|
state->progress.incrementPiecewiseAtomically(value);
|
|
|
|
}
|
|
|
|
|
2022-02-15 12:11:13 +00:00
|
|
|
/// Fills `block` with the profile events gathered since the previous call.
///
/// `block` is rebuilt from scratch with the columns
/// (host_name, current_time, thread_id, type, name, value). Rows come from:
///  - the per-thread counters of the current thread group (the calling thread is skipped);
///  - the counters and memory usage of the thread group as a whole, recorded with thread_id 0;
///  - blocks already waiting in state->profile_queue.
/// Counters are emitted as increments relative to `last_sent_snapshots`, which is then replaced
/// by the snapshots taken here.
void LocalConnection::getProfileEvents(Block & block)
{
    using namespace ProfileEvents;

    static const NamesAndTypesList column_names_and_types = {
        {"host_name", std::make_shared<DataTypeString>()},
        {"current_time", std::make_shared<DataTypeDateTime>()},
        {"thread_id", std::make_shared<DataTypeUInt64>()},
        {"type", TypeEnum},
        {"name", std::make_shared<DataTypeString>()},
        {"value", std::make_shared<DataTypeInt64>()},
    };

    ColumnsWithTypeAndName temp_columns;
    for (auto const & name_and_type : column_names_and_types)
        temp_columns.emplace_back(name_and_type.type, name_and_type.name);

    block = Block(std::move(temp_columns));
    MutableColumns columns = block.mutateColumns();
    auto thread_group = CurrentThread::getGroup();
    auto const current_thread_id = CurrentThread::get().thread_id;
    std::vector<ProfileEventsSnapshot> snapshots;
    ThreadIdToCountersSnapshot new_snapshots;
    ProfileEventsSnapshot group_snapshot;
    {
        auto stats = thread_group->getProfileEventsCountersAndMemoryForThreads();
        snapshots.reserve(stats.size());

        for (auto & stat : stats)
        {
            auto const thread_id = stat.thread_id;
            if (thread_id == current_thread_id)
                continue;
            auto current_time = time(nullptr);
            auto previous_snapshot = last_sent_snapshots.find(thread_id);
            /// Send only the difference to what was reported for this thread last time, if anything.
            auto increment =
                previous_snapshot != last_sent_snapshots.end()
                ? CountersIncrement(stat.counters, previous_snapshot->second)
                : CountersIncrement(stat.counters);
            snapshots.push_back(ProfileEventsSnapshot{
                thread_id,
                std::move(increment),
                stat.memory_usage,
                current_time
            });
            new_snapshots[thread_id] = std::move(stat.counters);
        }

        /// The thread group as a whole is stored under thread id 0.
        group_snapshot.thread_id = 0;
        group_snapshot.current_time = time(nullptr);
        group_snapshot.memory_usage = thread_group->memory_tracker.get();
        auto group_counters = thread_group->performance_counters.getPartiallyAtomicSnapshot();
        auto prev_group_snapshot = last_sent_snapshots.find(0);
        group_snapshot.counters =
            prev_group_snapshot != last_sent_snapshots.end()
            ? CountersIncrement(group_counters, prev_group_snapshot->second)
            : CountersIncrement(group_counters);
        new_snapshots[0] = std::move(group_counters);
    }
    last_sent_snapshots = std::move(new_snapshots);

    const String server_display_name = "localhost";
    for (auto & snapshot : snapshots)
    {
        dumpProfileEvents(snapshot, columns, server_display_name);
        dumpMemoryTracker(snapshot, columns, server_display_name);
    }
    dumpProfileEvents(group_snapshot, columns, server_display_name);
    dumpMemoryTracker(group_snapshot, columns, server_display_name);

    /// Append everything that is already waiting in the profile events queue.
    Block curr_block;
    while (state->profile_queue->tryPop(curr_block))
    {
        auto curr_columns = curr_block.getColumns();
        for (size_t j = 0; j < curr_columns.size(); ++j)
            columns[j]->insertRangeFrom(*curr_columns[j], 0, curr_columns[j]->size());
    }

    /// The columns were taken out of the block by mutateColumns(); put the filled columns back,
    /// otherwise the caller gets the block without any of the rows gathered above.
    block.setColumns(std::move(columns));
}
|
|
|
|
|
2021-08-17 19:59:51 +00:00
|
|
|
void LocalConnection::sendQuery(
|
|
|
|
const ConnectionTimeouts &,
|
2021-10-16 08:41:50 +00:00
|
|
|
const String & query,
|
|
|
|
const String & query_id,
|
|
|
|
UInt64 stage,
|
2021-08-17 19:59:51 +00:00
|
|
|
const Settings *,
|
|
|
|
const ClientInfo *,
|
|
|
|
bool)
|
|
|
|
{
|
2021-09-24 08:29:01 +00:00
|
|
|
query_context = session.makeQueryContext();
|
2021-10-16 08:41:50 +00:00
|
|
|
query_context->setCurrentQueryId(query_id);
|
2021-09-24 08:29:01 +00:00
|
|
|
if (send_progress)
|
2021-10-31 21:33:03 +00:00
|
|
|
{
|
2021-09-24 08:29:01 +00:00
|
|
|
query_context->setProgressCallback([this] (const Progress & value) { return this->updateProgress(value); });
|
2021-10-31 21:33:03 +00:00
|
|
|
query_context->setFileProgressCallback([this](const FileProgress & value) { this->updateProgress(Progress(value)); });
|
|
|
|
}
|
2022-02-06 15:11:18 +00:00
|
|
|
if (!current_database.empty())
|
|
|
|
query_context->setCurrentDatabase(current_database);
|
2021-09-24 08:29:01 +00:00
|
|
|
|
2022-02-15 12:11:13 +00:00
|
|
|
query_scope_holder.reset();
|
|
|
|
query_scope_holder = std::make_unique<CurrentThread::QueryScope>(query_context);
|
2021-09-24 08:29:01 +00:00
|
|
|
|
2021-08-19 20:27:40 +00:00
|
|
|
state.reset();
|
|
|
|
state.emplace();
|
|
|
|
|
2021-10-16 08:41:50 +00:00
|
|
|
state->query_id = query_id;
|
|
|
|
state->query = query;
|
|
|
|
state->stage = QueryProcessingStage::Enum(stage);
|
2022-02-15 08:25:07 +00:00
|
|
|
state->profile_queue = std::make_shared<InternalProfileEventsQueue>(std::numeric_limits<int>::max());
|
|
|
|
CurrentThread::attachInternalProfileEventsQueue(state->profile_queue);
|
2021-08-19 20:27:40 +00:00
|
|
|
|
2021-09-11 17:16:37 +00:00
|
|
|
if (send_progress)
|
|
|
|
state->after_send_progress.restart();
|
2021-08-20 08:38:50 +00:00
|
|
|
|
2022-02-15 08:25:07 +00:00
|
|
|
if (send_profile_events)
|
|
|
|
state->after_send_profile_events.restart();
|
|
|
|
|
2021-09-11 17:16:37 +00:00
|
|
|
next_packet_type.reset();
|
2021-09-11 12:03:28 +00:00
|
|
|
|
2021-08-19 20:27:40 +00:00
|
|
|
try
|
|
|
|
{
|
2021-09-16 16:14:17 +00:00
|
|
|
state->io = executeQuery(state->query, query_context, false, state->stage);
|
2021-08-23 07:13:27 +00:00
|
|
|
|
2021-09-29 19:03:30 +00:00
|
|
|
if (state->io.pipeline.pushing())
|
2021-08-19 20:27:40 +00:00
|
|
|
{
|
2021-09-29 19:03:30 +00:00
|
|
|
size_t num_threads = state->io.pipeline.getNumThreads();
|
|
|
|
if (num_threads > 1)
|
|
|
|
{
|
|
|
|
state->pushing_async_executor = std::make_unique<PushingAsyncPipelineExecutor>(state->io.pipeline);
|
|
|
|
state->pushing_async_executor->start();
|
|
|
|
state->block = state->pushing_async_executor->getHeader();
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
state->pushing_executor = std::make_unique<PushingPipelineExecutor>(state->io.pipeline);
|
|
|
|
state->pushing_executor->start();
|
|
|
|
state->block = state->pushing_executor->getHeader();
|
|
|
|
}
|
2021-10-18 14:53:42 +00:00
|
|
|
|
|
|
|
const auto & table_id = query_context->getInsertionTable();
|
|
|
|
if (query_context->getSettingsRef().input_format_defaults_for_omitted_fields)
|
|
|
|
{
|
|
|
|
if (!table_id.empty())
|
|
|
|
{
|
|
|
|
auto storage_ptr = DatabaseCatalog::instance().getTable(table_id, query_context);
|
|
|
|
state->columns_description = storage_ptr->getInMemoryMetadataPtr()->getColumns();
|
|
|
|
}
|
|
|
|
}
|
2021-08-19 20:27:40 +00:00
|
|
|
}
|
2021-09-29 19:03:30 +00:00
|
|
|
else if (state->io.pipeline.pulling())
|
2021-08-19 20:27:40 +00:00
|
|
|
{
|
2021-08-23 07:13:27 +00:00
|
|
|
state->block = state->io.pipeline.getHeader();
|
2021-09-19 18:24:06 +00:00
|
|
|
state->executor = std::make_unique<PullingAsyncPipelineExecutor>(state->io.pipeline);
|
2021-08-19 20:27:40 +00:00
|
|
|
}
|
2021-09-29 19:03:30 +00:00
|
|
|
else if (state->io.pipeline.completed())
|
2021-08-19 20:27:40 +00:00
|
|
|
{
|
2021-09-29 19:03:30 +00:00
|
|
|
CompletedPipelineExecutor executor(state->io.pipeline);
|
|
|
|
executor.execute();
|
2021-08-19 20:27:40 +00:00
|
|
|
}
|
2021-08-23 08:50:12 +00:00
|
|
|
|
2021-10-18 14:53:42 +00:00
|
|
|
if (state->columns_description)
|
|
|
|
next_packet_type = Protocol::Server::TableColumns;
|
|
|
|
else if (state->block)
|
2021-08-23 08:50:12 +00:00
|
|
|
next_packet_type = Protocol::Server::Data;
|
2021-08-19 20:27:40 +00:00
|
|
|
}
|
|
|
|
catch (const Exception & e)
|
2021-08-17 19:59:51 +00:00
|
|
|
{
|
2021-08-19 20:27:40 +00:00
|
|
|
state->io.onException();
|
|
|
|
state->exception.emplace(e);
|
2021-08-17 19:59:51 +00:00
|
|
|
}
|
2021-08-19 20:27:40 +00:00
|
|
|
catch (const std::exception & e)
|
2021-08-17 19:59:51 +00:00
|
|
|
{
|
2021-08-19 20:27:40 +00:00
|
|
|
state->io.onException();
|
|
|
|
state->exception.emplace(Exception::CreateFromSTDTag{}, e);
|
2021-08-17 19:59:51 +00:00
|
|
|
}
|
2021-08-19 20:27:40 +00:00
|
|
|
catch (...)
|
2021-08-17 19:59:51 +00:00
|
|
|
{
|
2021-08-19 20:27:40 +00:00
|
|
|
state->io.onException();
|
|
|
|
state->exception.emplace("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
|
2021-08-17 19:59:51 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-08-18 14:39:04 +00:00
|
|
|
void LocalConnection::sendData(const Block & block, const String &, bool)
|
|
|
|
{
|
2021-09-29 19:17:26 +00:00
|
|
|
if (!block)
|
|
|
|
return;
|
|
|
|
|
2021-09-29 19:03:30 +00:00
|
|
|
if (state->pushing_async_executor)
|
2021-08-18 14:39:04 +00:00
|
|
|
{
|
2021-09-29 19:03:30 +00:00
|
|
|
state->pushing_async_executor->push(std::move(block));
|
2021-08-21 10:55:54 +00:00
|
|
|
}
|
2021-09-29 19:03:30 +00:00
|
|
|
else if (state->pushing_executor)
|
2021-08-21 10:55:54 +00:00
|
|
|
{
|
2021-09-29 19:03:30 +00:00
|
|
|
state->pushing_executor->push(std::move(block));
|
2021-08-18 14:39:04 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-08-17 19:59:51 +00:00
|
|
|
void LocalConnection::sendCancel()
|
|
|
|
{
|
2021-09-29 19:03:30 +00:00
|
|
|
if (state->executor)
|
2021-08-19 20:27:40 +00:00
|
|
|
state->executor->cancel();
|
2021-08-17 19:59:51 +00:00
|
|
|
}
|
|
|
|
|
2021-08-19 20:27:40 +00:00
|
|
|
/// Pulls the next block from the pulling executor into `block`.
/// Returns false when the query has no pulling executor; otherwise returns whatever
/// the executor's pull() returns.
bool LocalConnection::pullBlock(Block & block)
{
    if (!state->executor)
        return false;

    /// interactive_delay / 1000 is passed as the timeout of pull()
    /// (presumably microseconds converted to milliseconds - confirm against the executor API).
    return state->executor->pull(block, query_context->getSettingsRef().interactive_delay / 1000);
}
|
|
|
|
|
|
|
|
void LocalConnection::finishQuery()
|
|
|
|
{
|
2021-08-21 10:55:54 +00:00
|
|
|
next_packet_type = Protocol::Server::EndOfStream;
|
|
|
|
|
|
|
|
if (!state)
|
|
|
|
return;
|
|
|
|
|
2021-09-29 19:03:30 +00:00
|
|
|
if (state->executor)
|
2021-08-17 19:59:51 +00:00
|
|
|
{
|
2021-09-29 19:03:30 +00:00
|
|
|
state->executor.reset();
|
2021-08-17 19:59:51 +00:00
|
|
|
}
|
2021-09-29 19:03:30 +00:00
|
|
|
else if (state->pushing_async_executor)
|
2021-08-17 19:59:51 +00:00
|
|
|
{
|
2021-09-29 19:03:30 +00:00
|
|
|
state->pushing_async_executor->finish();
|
|
|
|
}
|
|
|
|
else if (state->pushing_executor)
|
|
|
|
{
|
|
|
|
state->pushing_executor->finish();
|
2021-08-17 19:59:51 +00:00
|
|
|
}
|
|
|
|
|
2021-08-19 20:27:40 +00:00
|
|
|
state->io.onFinish();
|
2021-08-20 08:38:50 +00:00
|
|
|
state.reset();
|
2021-08-17 19:59:51 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
bool LocalConnection::poll(size_t)
|
|
|
|
{
|
2021-08-21 10:55:54 +00:00
|
|
|
if (!state)
|
|
|
|
return false;
|
|
|
|
|
2021-09-11 17:16:37 +00:00
|
|
|
/// Wait for next poll to collect current packet.
|
|
|
|
if (next_packet_type)
|
|
|
|
return true;
|
|
|
|
|
2022-01-11 21:27:42 +00:00
|
|
|
if (state->exception)
|
|
|
|
{
|
|
|
|
next_packet_type = Protocol::Server::Exception;
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2021-08-21 15:29:28 +00:00
|
|
|
if (!state->is_finished)
|
2021-08-19 20:27:40 +00:00
|
|
|
{
|
2021-12-23 09:24:44 +00:00
|
|
|
if (send_progress && (state->after_send_progress.elapsedMicroseconds() >= query_context->getSettingsRef().interactive_delay))
|
|
|
|
{
|
|
|
|
state->after_send_progress.restart();
|
|
|
|
next_packet_type = Protocol::Server::Progress;
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2022-02-15 08:25:07 +00:00
|
|
|
if (send_profile_events && (state->after_send_profile_events.elapsedMicroseconds() >= query_context->getSettingsRef().interactive_delay))
|
|
|
|
{
|
2022-02-15 12:11:13 +00:00
|
|
|
Block block;
|
2022-02-15 08:25:07 +00:00
|
|
|
state->after_send_profile_events.restart();
|
|
|
|
next_packet_type = Protocol::Server::ProfileEvents;
|
2022-02-15 12:11:13 +00:00
|
|
|
getProfileEvents(block);
|
|
|
|
state->block.emplace(std::move(block));
|
2022-02-15 08:25:07 +00:00
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2021-08-21 15:29:28 +00:00
|
|
|
try
|
|
|
|
{
|
|
|
|
pollImpl();
|
|
|
|
}
|
|
|
|
catch (const Exception & e)
|
|
|
|
{
|
|
|
|
state->io.onException();
|
|
|
|
state->exception.emplace(e);
|
|
|
|
}
|
|
|
|
catch (const std::exception & e)
|
|
|
|
{
|
|
|
|
state->io.onException();
|
|
|
|
state->exception.emplace(Exception::CreateFromSTDTag{}, e);
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
state->io.onException();
|
|
|
|
state->exception.emplace("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
|
|
|
|
}
|
2021-08-19 20:27:40 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if (state->exception)
|
|
|
|
{
|
|
|
|
next_packet_type = Protocol::Server::Exception;
|
2021-08-17 19:59:51 +00:00
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2021-08-19 20:27:40 +00:00
|
|
|
if (state->is_finished && !state->sent_totals)
|
|
|
|
{
|
|
|
|
state->sent_totals = true;
|
|
|
|
Block totals;
|
|
|
|
|
2021-09-29 19:03:30 +00:00
|
|
|
if (state->executor)
|
2021-08-19 20:27:40 +00:00
|
|
|
totals = state->executor->getTotalsBlock();
|
|
|
|
|
|
|
|
if (totals)
|
|
|
|
{
|
|
|
|
next_packet_type = Protocol::Server::Totals;
|
|
|
|
state->block.emplace(totals);
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (state->is_finished && !state->sent_extremes)
|
|
|
|
{
|
|
|
|
state->sent_extremes = true;
|
|
|
|
Block extremes;
|
|
|
|
|
2021-09-29 19:03:30 +00:00
|
|
|
if (state->executor)
|
2021-08-19 20:27:40 +00:00
|
|
|
extremes = state->executor->getExtremesBlock();
|
|
|
|
|
|
|
|
if (extremes)
|
|
|
|
{
|
|
|
|
next_packet_type = Protocol::Server::Extremes;
|
|
|
|
state->block.emplace(extremes);
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-12-23 09:24:44 +00:00
|
|
|
if (state->is_finished && !state->sent_profile_info)
|
|
|
|
{
|
|
|
|
state->sent_profile_info = true;
|
|
|
|
|
|
|
|
if (state->executor)
|
|
|
|
{
|
|
|
|
next_packet_type = Protocol::Server::ProfileInfo;
|
|
|
|
state->profile_info = state->executor->getProfileInfo();
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-16 10:17:00 +00:00
|
|
|
if (state->is_finished)
|
2021-08-23 07:13:27 +00:00
|
|
|
{
|
2021-10-16 10:17:00 +00:00
|
|
|
finishQuery();
|
2021-08-23 07:13:27 +00:00
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2021-09-19 18:24:06 +00:00
|
|
|
if (state->block && state->block.value())
|
2021-08-17 19:59:51 +00:00
|
|
|
{
|
|
|
|
next_packet_type = Protocol::Server::Data;
|
2021-08-19 20:27:40 +00:00
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
return false;
|
|
|
|
}
|
2021-08-17 19:59:51 +00:00
|
|
|
|
2021-08-19 20:27:40 +00:00
|
|
|
bool LocalConnection::pollImpl()
|
|
|
|
{
|
|
|
|
Block block;
|
|
|
|
auto next_read = pullBlock(block);
|
2021-10-16 10:17:00 +00:00
|
|
|
|
|
|
|
if (block && !state->io.null_format)
|
2021-08-19 20:27:40 +00:00
|
|
|
{
|
2021-09-04 18:19:01 +00:00
|
|
|
state->block.emplace(block);
|
2021-08-17 19:59:51 +00:00
|
|
|
}
|
2021-08-19 20:27:40 +00:00
|
|
|
else if (!next_read)
|
2021-08-17 19:59:51 +00:00
|
|
|
{
|
2021-08-19 20:27:40 +00:00
|
|
|
state->is_finished = true;
|
2021-08-17 19:59:51 +00:00
|
|
|
}
|
2021-08-19 20:27:40 +00:00
|
|
|
|
2021-08-17 19:59:51 +00:00
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Builds the packet whose type was chosen by poll() and hands over the matching payload.
///
/// Returns an EndOfStream packet when there is no query state, or when poll() could not choose
/// a packet type. After a packet is produced the pending packet type is cleared, except after
/// TableColumns when a header block is waiting: then Data is scheduled as the next packet.
///
/// Throws UNKNOWN_PACKET_FROM_SERVER for a packet type that is not handled here.
Packet LocalConnection::receivePacket()
{
    Packet packet;
    if (!state)
    {
        packet.type = Protocol::Server::EndOfStream;
        return packet;
    }

    if (!next_packet_type)
        poll(0);

    if (!next_packet_type)
    {
        packet.type = Protocol::Server::EndOfStream;
        return packet;
    }

    packet.type = next_packet_type.value();
    switch (next_packet_type.value())
    {
        /// All block-carrying packets take their payload from state->block.
        case Protocol::Server::Totals: [[fallthrough]];
        case Protocol::Server::Extremes: [[fallthrough]];
        case Protocol::Server::Log: [[fallthrough]];
        case Protocol::Server::Data: [[fallthrough]];
        case Protocol::Server::ProfileEvents:
        {
            if (state->block && state->block.value())
            {
                packet.block = std::move(state->block.value());
                state->block.reset();
            }
            next_packet_type.reset();
            break;
        }
        case Protocol::Server::ProfileInfo:
        {
            if (state->profile_info)
            {
                packet.profile_info = std::move(*state->profile_info);
                state->profile_info.reset();
            }
            next_packet_type.reset();
            break;
        }
        case Protocol::Server::TableColumns:
        {
            if (state->columns_description)
            {
                /// Send external table name (empty name is the main table)
                /// (see TCPHandler::sendTableColumns)
                packet.multistring_message = {"", state->columns_description->toString()};
            }

            /// The header block follows the column descriptions. Without one, clear the pending
            /// type; otherwise this very packet would be returned again on every call.
            if (state->block)
                next_packet_type = Protocol::Server::Data;
            else
                next_packet_type.reset();

            break;
        }
        case Protocol::Server::Exception:
        {
            packet.exception = std::make_unique<Exception>(*state->exception);
            next_packet_type.reset();
            break;
        }
        case Protocol::Server::Progress:
        {
            /// Hand over the accumulated progress and start accumulating from scratch.
            packet.progress = std::move(state->progress);
            state->progress.reset();
            next_packet_type.reset();
            break;
        }
        case Protocol::Server::EndOfStream:
        {
            next_packet_type.reset();
            break;
        }
        default:
            throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_SERVER,
                            "Unknown packet {} for {}", toString(packet.type), getDescription());
    }

    return packet;
}
|
|
|
|
|
2021-08-23 08:50:12 +00:00
|
|
|
void LocalConnection::getServerVersion(
|
2021-08-27 08:46:31 +00:00
|
|
|
const ConnectionTimeouts & /* timeouts */, String & /* name */,
|
|
|
|
UInt64 & /* version_major */, UInt64 & /* version_minor */,
|
2021-09-04 18:19:01 +00:00
|
|
|
UInt64 & /* version_patch */, UInt64 & /* revision */)
|
2021-08-23 08:50:12 +00:00
|
|
|
{
|
2021-09-04 18:19:01 +00:00
|
|
|
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
|
2021-08-23 08:50:12 +00:00
|
|
|
}
|
|
|
|
|
2022-02-06 15:11:18 +00:00
|
|
|
void LocalConnection::setDefaultDatabase(const String & database)
|
2021-08-23 08:50:12 +00:00
|
|
|
{
|
2022-02-06 15:11:18 +00:00
|
|
|
current_database = database;
|
2021-08-23 08:50:12 +00:00
|
|
|
}
|
|
|
|
|
2021-09-04 18:19:01 +00:00
|
|
|
/// Not supported by LocalConnection: always throws NOT_IMPLEMENTED.
UInt64 LocalConnection::getServerRevision(const ConnectionTimeouts & /* timeouts */)
{
    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
}
|
|
|
|
|
|
|
|
/// Not supported by LocalConnection: always throws NOT_IMPLEMENTED.
const String & LocalConnection::getServerTimezone(const ConnectionTimeouts & /* timeouts */)
{
    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
}
|
|
|
|
|
|
|
|
/// Not supported by LocalConnection: always throws NOT_IMPLEMENTED.
const String & LocalConnection::getServerDisplayName(const ConnectionTimeouts & /* timeouts */)
{
    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
}
|
|
|
|
|
|
|
|
/// Not supported by LocalConnection: always throws NOT_IMPLEMENTED.
void LocalConnection::sendExternalTablesData(ExternalTablesData & /* data */)
{
    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
}
|
|
|
|
|
2021-12-09 10:39:28 +00:00
|
|
|
void LocalConnection::sendMergeTreeReadTaskResponse(const PartitionReadResponse &)
|
|
|
|
{
|
|
|
|
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
|
|
|
|
}
|
|
|
|
|
2022-02-15 12:11:13 +00:00
|
|
|
/// Factory: creates a LocalConnection on top of `current_context`.
/// The connection parameters are ignored; the two flags are forwarded to the constructor.
ServerConnectionPtr LocalConnection::createConnection(
    const ConnectionParameters &,
    ContextPtr current_context,
    bool send_progress,
    bool send_profile_events)
{
    return std::make_unique<LocalConnection>(current_context, send_progress, send_profile_events);
}
|
|
|
|
|
|
|
|
|
2021-08-17 19:59:51 +00:00
|
|
|
}
|