ClickHouse/src/Client/LocalConnection.cpp

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

521 lines
14 KiB
C++
Raw Normal View History

2021-08-17 19:59:51 +00:00
#include "LocalConnection.h"
#include <Interpreters/executeQuery.h>
2021-09-29 19:03:30 +00:00
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/Executors/PushingAsyncPipelineExecutor.h>
2021-08-18 14:39:04 +00:00
#include <Storages/IStorage.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Core/Protocol.h>
2021-08-17 19:59:51 +00:00
2021-09-26 21:22:04 +00:00
2021-08-17 19:59:51 +00:00
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_PACKET_FROM_SERVER;
2021-08-19 20:27:40 +00:00
extern const int UNKNOWN_EXCEPTION;
2021-09-04 18:19:01 +00:00
extern const int NOT_IMPLEMENTED;
Send profile events for INSERT queries (previously only SELECT was supported) Reproducer: echo "1" | clickhouse-client --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 However, clickhouse-local is differnt, it does sent the periodically, but only if query was long enough, i.e.: # yes | head -n100000 | clickhouse-local --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 [s1.ch] 2022.05.20 15:20:27 [ 0 ] ContextLock: 10 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] DiskReadElapsedMicroseconds: 29 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocBytes: 200000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocs: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertQuery: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedBytes: 1000000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedRows: 100000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] MemoryTrackerUsage: 1521975 (gauge) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSCPUVirtualTimeMicroseconds: 102148 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSReadChars: 135700 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSWriteChars: 8 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] Query: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RWLockAcquiredReadLocks: 2 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorRead: 5 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorReadBytes: 134464 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RealTimeMicroseconds: 293747 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] SoftPageFaults: 382 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] TableFunctionExecute: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] UserTimeMicroseconds: 102148 (increment) v2: Proper support ProfileEvents in INSERTs (with protocol change) v3: Receive profile events on INSERT queries Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-05-20 15:19:40 +00:00
extern const int LOGICAL_ERROR;
2021-08-17 19:59:51 +00:00
}
2022-03-05 06:22:56 +00:00
LocalConnection::LocalConnection(ContextPtr context_, bool send_progress_, bool send_profile_events_, const String & server_display_name_)
2021-08-17 19:59:51 +00:00
: WithContext(context_)
2021-09-04 18:19:01 +00:00
, session(getContext(), ClientInfo::Interface::LOCAL)
2021-09-11 17:16:37 +00:00
, send_progress(send_progress_)
2022-02-15 08:25:07 +00:00
, send_profile_events(send_profile_events_)
2022-03-05 06:22:56 +00:00
, server_display_name(server_display_name_)
2021-08-17 19:59:51 +00:00
{
2021-08-20 08:38:50 +00:00
/// Authenticate and create a context to execute queries.
session.authenticate("default", "", Poco::Net::SocketAddress{});
2021-09-24 08:29:01 +00:00
session.makeSessionContext();
2021-08-17 19:59:51 +00:00
}
2021-08-21 10:55:54 +00:00
LocalConnection::~LocalConnection()
{
state.reset();
2021-08-21 10:55:54 +00:00
}
bool LocalConnection::hasReadPendingData() const
{
return !state->is_finished;
}
std::optional<UInt64> LocalConnection::checkPacket(size_t)
{
return next_packet_type;
}
void LocalConnection::updateProgress(const Progress & value)
{
state->progress.incrementPiecewiseAtomically(value);
}
Send profile events for INSERT queries (previously only SELECT was supported) Reproducer: echo "1" | clickhouse-client --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 However, clickhouse-local is differnt, it does sent the periodically, but only if query was long enough, i.e.: # yes | head -n100000 | clickhouse-local --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 [s1.ch] 2022.05.20 15:20:27 [ 0 ] ContextLock: 10 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] DiskReadElapsedMicroseconds: 29 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocBytes: 200000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocs: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertQuery: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedBytes: 1000000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedRows: 100000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] MemoryTrackerUsage: 1521975 (gauge) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSCPUVirtualTimeMicroseconds: 102148 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSReadChars: 135700 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSWriteChars: 8 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] Query: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RWLockAcquiredReadLocks: 2 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorRead: 5 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorReadBytes: 134464 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RealTimeMicroseconds: 293747 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] SoftPageFaults: 382 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] TableFunctionExecute: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] UserTimeMicroseconds: 102148 (increment) v2: Proper support ProfileEvents in INSERTs (with protocol change) v3: Receive profile events on INSERT queries Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-05-20 15:19:40 +00:00
void LocalConnection::sendProfileEvents()
2022-02-15 08:25:07 +00:00
{
Send profile events for INSERT queries (previously only SELECT was supported) Reproducer: echo "1" | clickhouse-client --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 However, clickhouse-local is differnt, it does sent the periodically, but only if query was long enough, i.e.: # yes | head -n100000 | clickhouse-local --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 [s1.ch] 2022.05.20 15:20:27 [ 0 ] ContextLock: 10 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] DiskReadElapsedMicroseconds: 29 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocBytes: 200000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocs: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertQuery: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedBytes: 1000000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedRows: 100000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] MemoryTrackerUsage: 1521975 (gauge) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSCPUVirtualTimeMicroseconds: 102148 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSReadChars: 135700 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSWriteChars: 8 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] Query: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RWLockAcquiredReadLocks: 2 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorRead: 5 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorReadBytes: 134464 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RealTimeMicroseconds: 293747 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] SoftPageFaults: 382 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] TableFunctionExecute: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] UserTimeMicroseconds: 102148 (increment) v2: Proper support ProfileEvents in INSERTs (with protocol change) v3: Receive profile events on INSERT queries Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-05-20 15:19:40 +00:00
Block profile_block;
state->after_send_profile_events.restart();
next_packet_type = Protocol::Server::ProfileEvents;
ProfileEvents::getProfileEvents(server_display_name, state->profile_queue, profile_block, last_sent_snapshots);
state->block.emplace(std::move(profile_block));
2022-02-15 08:25:07 +00:00
}
2021-08-17 19:59:51 +00:00
void LocalConnection::sendQuery(
const ConnectionTimeouts &,
2021-10-16 08:41:50 +00:00
const String & query,
const NameToNameMap & query_parameters,
2021-10-16 08:41:50 +00:00
const String & query_id,
UInt64 stage,
2021-08-17 19:59:51 +00:00
const Settings *,
const ClientInfo * client_info,
2022-05-06 15:04:03 +00:00
bool,
std::function<void(const Progress &)> process_progress_callback)
2021-08-17 19:59:51 +00:00
{
if (!query_parameters.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "clickhouse local does not support query parameters");
/// Suggestion comes without client_info.
if (client_info)
query_context = session.makeQueryContext(*client_info);
else
query_context = session.makeQueryContext();
2021-10-16 08:41:50 +00:00
query_context->setCurrentQueryId(query_id);
2021-09-24 08:29:01 +00:00
if (send_progress)
2021-10-31 21:33:03 +00:00
{
2022-05-06 15:04:03 +00:00
query_context->setProgressCallback([this] (const Progress & value) { this->updateProgress(value); });
2021-10-31 21:33:03 +00:00
query_context->setFileProgressCallback([this](const FileProgress & value) { this->updateProgress(Progress(value)); });
}
2022-02-06 15:11:18 +00:00
if (!current_database.empty())
query_context->setCurrentDatabase(current_database);
2021-09-24 08:29:01 +00:00
2021-08-19 20:27:40 +00:00
state.reset();
state.emplace();
2021-10-16 08:41:50 +00:00
state->query_id = query_id;
state->query = query;
2022-03-05 06:22:56 +00:00
state->query_scope_holder = std::make_unique<CurrentThread::QueryScope>(query_context);
2021-10-16 08:41:50 +00:00
state->stage = QueryProcessingStage::Enum(stage);
2022-02-15 08:25:07 +00:00
state->profile_queue = std::make_shared<InternalProfileEventsQueue>(std::numeric_limits<int>::max());
CurrentThread::attachInternalProfileEventsQueue(state->profile_queue);
2021-08-19 20:27:40 +00:00
2021-09-11 17:16:37 +00:00
if (send_progress)
state->after_send_progress.restart();
2021-08-20 08:38:50 +00:00
2022-02-15 08:25:07 +00:00
if (send_profile_events)
state->after_send_profile_events.restart();
2021-09-11 17:16:37 +00:00
next_packet_type.reset();
2021-09-11 12:03:28 +00:00
2021-08-19 20:27:40 +00:00
try
{
2021-09-16 16:14:17 +00:00
state->io = executeQuery(state->query, query_context, false, state->stage);
2021-08-23 07:13:27 +00:00
2021-09-29 19:03:30 +00:00
if (state->io.pipeline.pushing())
2021-08-19 20:27:40 +00:00
{
2021-09-29 19:03:30 +00:00
size_t num_threads = state->io.pipeline.getNumThreads();
if (num_threads > 1)
{
state->pushing_async_executor = std::make_unique<PushingAsyncPipelineExecutor>(state->io.pipeline);
state->pushing_async_executor->start();
state->block = state->pushing_async_executor->getHeader();
}
else
{
state->pushing_executor = std::make_unique<PushingPipelineExecutor>(state->io.pipeline);
state->pushing_executor->start();
state->block = state->pushing_executor->getHeader();
}
const auto & table_id = query_context->getInsertionTable();
if (query_context->getSettingsRef().input_format_defaults_for_omitted_fields)
{
if (!table_id.empty())
{
auto storage_ptr = DatabaseCatalog::instance().getTable(table_id, query_context);
state->columns_description = storage_ptr->getInMemoryMetadataPtr()->getColumns();
}
}
2021-08-19 20:27:40 +00:00
}
2021-09-29 19:03:30 +00:00
else if (state->io.pipeline.pulling())
2021-08-19 20:27:40 +00:00
{
2021-08-23 07:13:27 +00:00
state->block = state->io.pipeline.getHeader();
2021-09-19 18:24:06 +00:00
state->executor = std::make_unique<PullingAsyncPipelineExecutor>(state->io.pipeline);
2021-08-19 20:27:40 +00:00
}
2021-09-29 19:03:30 +00:00
else if (state->io.pipeline.completed())
2021-08-19 20:27:40 +00:00
{
2021-09-29 19:03:30 +00:00
CompletedPipelineExecutor executor(state->io.pipeline);
2022-05-06 15:04:03 +00:00
if (process_progress_callback)
{
auto callback = [this, &process_progress_callback]()
{
if (state->is_cancelled)
return true;
process_progress_callback(state->progress.fetchAndResetPiecewiseAtomically());
return false;
};
executor.setCancelCallback(callback, query_context->getSettingsRef().interactive_delay / 1000);
}
2021-09-29 19:03:30 +00:00
executor.execute();
2021-08-19 20:27:40 +00:00
}
2021-08-23 08:50:12 +00:00
if (state->columns_description)
next_packet_type = Protocol::Server::TableColumns;
else if (state->block)
2021-08-23 08:50:12 +00:00
next_packet_type = Protocol::Server::Data;
2021-08-19 20:27:40 +00:00
}
catch (const Exception & e)
2021-08-17 19:59:51 +00:00
{
2021-08-19 20:27:40 +00:00
state->io.onException();
2022-04-15 23:56:45 +00:00
state->exception.reset(e.clone());
2021-08-17 19:59:51 +00:00
}
2021-08-19 20:27:40 +00:00
catch (const std::exception & e)
2021-08-17 19:59:51 +00:00
{
2021-08-19 20:27:40 +00:00
state->io.onException();
2022-04-15 23:56:45 +00:00
state->exception = std::make_unique<Exception>(Exception::CreateFromSTDTag{}, e);
2021-08-17 19:59:51 +00:00
}
2021-08-19 20:27:40 +00:00
catch (...)
2021-08-17 19:59:51 +00:00
{
2021-08-19 20:27:40 +00:00
state->io.onException();
2023-01-23 13:16:14 +00:00
state->exception = std::make_unique<Exception>(ErrorCodes::UNKNOWN_EXCEPTION, "Unknown exception");
2021-08-17 19:59:51 +00:00
}
}
2021-08-18 14:39:04 +00:00
void LocalConnection::sendData(const Block & block, const String &, bool)
{
2021-09-29 19:17:26 +00:00
if (!block)
return;
2021-09-29 19:03:30 +00:00
if (state->pushing_async_executor)
state->pushing_async_executor->push(block);
2021-09-29 19:03:30 +00:00
else if (state->pushing_executor)
state->pushing_executor->push(block);
Send profile events for INSERT queries (previously only SELECT was supported) Reproducer: echo "1" | clickhouse-client --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 However, clickhouse-local is differnt, it does sent the periodically, but only if query was long enough, i.e.: # yes | head -n100000 | clickhouse-local --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 [s1.ch] 2022.05.20 15:20:27 [ 0 ] ContextLock: 10 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] DiskReadElapsedMicroseconds: 29 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocBytes: 200000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocs: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertQuery: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedBytes: 1000000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedRows: 100000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] MemoryTrackerUsage: 1521975 (gauge) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSCPUVirtualTimeMicroseconds: 102148 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSReadChars: 135700 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSWriteChars: 8 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] Query: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RWLockAcquiredReadLocks: 2 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorRead: 5 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorReadBytes: 134464 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RealTimeMicroseconds: 293747 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] SoftPageFaults: 382 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] TableFunctionExecute: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] UserTimeMicroseconds: 102148 (increment) v2: Proper support ProfileEvents in INSERTs (with protocol change) v3: Receive profile events on INSERT queries Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-05-20 15:19:40 +00:00
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown executor");
Send profile events for INSERT queries (previously only SELECT was supported) Reproducer: echo "1" | clickhouse-client --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 However, clickhouse-local is differnt, it does sent the periodically, but only if query was long enough, i.e.: # yes | head -n100000 | clickhouse-local --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 [s1.ch] 2022.05.20 15:20:27 [ 0 ] ContextLock: 10 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] DiskReadElapsedMicroseconds: 29 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocBytes: 200000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocs: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertQuery: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedBytes: 1000000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedRows: 100000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] MemoryTrackerUsage: 1521975 (gauge) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSCPUVirtualTimeMicroseconds: 102148 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSReadChars: 135700 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSWriteChars: 8 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] Query: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RWLockAcquiredReadLocks: 2 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorRead: 5 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorReadBytes: 134464 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RealTimeMicroseconds: 293747 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] SoftPageFaults: 382 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] TableFunctionExecute: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] UserTimeMicroseconds: 102148 (increment) v2: Proper support ProfileEvents in INSERTs (with protocol change) v3: Receive profile events on INSERT queries Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-05-20 15:19:40 +00:00
if (send_profile_events)
sendProfileEvents();
2021-08-18 14:39:04 +00:00
}
2021-08-17 19:59:51 +00:00
void LocalConnection::sendCancel()
{
2022-05-06 15:04:03 +00:00
state->is_cancelled = true;
2021-09-29 19:03:30 +00:00
if (state->executor)
2021-08-19 20:27:40 +00:00
state->executor->cancel();
2021-08-17 19:59:51 +00:00
}
2021-08-19 20:27:40 +00:00
bool LocalConnection::pullBlock(Block & block)
2021-08-17 19:59:51 +00:00
{
2021-09-29 19:03:30 +00:00
if (state->executor)
2021-08-21 10:55:54 +00:00
return state->executor->pull(block, query_context->getSettingsRef().interactive_delay / 1000);
2021-08-19 20:27:40 +00:00
return false;
2021-08-17 19:59:51 +00:00
}
void LocalConnection::finishQuery()
{
2021-08-21 10:55:54 +00:00
next_packet_type = Protocol::Server::EndOfStream;
2021-09-29 19:03:30 +00:00
if (state->executor)
2021-08-17 19:59:51 +00:00
{
2021-09-29 19:03:30 +00:00
state->executor.reset();
2021-08-17 19:59:51 +00:00
}
2021-09-29 19:03:30 +00:00
else if (state->pushing_async_executor)
2021-08-17 19:59:51 +00:00
{
2021-09-29 19:03:30 +00:00
state->pushing_async_executor->finish();
}
else if (state->pushing_executor)
{
state->pushing_executor->finish();
2021-08-17 19:59:51 +00:00
}
2021-08-19 20:27:40 +00:00
state->io.onFinish();
2021-08-20 08:38:50 +00:00
state.reset();
last_sent_snapshots.clear();
2021-08-17 19:59:51 +00:00
}
bool LocalConnection::poll(size_t)
{
2021-08-21 10:55:54 +00:00
if (!state)
return false;
2021-09-11 17:16:37 +00:00
/// Wait for next poll to collect current packet.
if (next_packet_type)
return true;
2022-01-11 21:27:42 +00:00
if (state->exception)
{
next_packet_type = Protocol::Server::Exception;
return true;
}
2021-08-21 15:29:28 +00:00
if (!state->is_finished)
2021-08-19 20:27:40 +00:00
{
2021-12-23 09:24:44 +00:00
if (send_progress && (state->after_send_progress.elapsedMicroseconds() >= query_context->getSettingsRef().interactive_delay))
{
state->after_send_progress.restart();
next_packet_type = Protocol::Server::Progress;
return true;
}
2022-02-15 08:25:07 +00:00
if (send_profile_events && (state->after_send_profile_events.elapsedMicroseconds() >= query_context->getSettingsRef().interactive_delay))
{
Send profile events for INSERT queries (previously only SELECT was supported) Reproducer: echo "1" | clickhouse-client --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 However, clickhouse-local is differnt, it does sent the periodically, but only if query was long enough, i.e.: # yes | head -n100000 | clickhouse-local --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 [s1.ch] 2022.05.20 15:20:27 [ 0 ] ContextLock: 10 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] DiskReadElapsedMicroseconds: 29 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocBytes: 200000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocs: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertQuery: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedBytes: 1000000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedRows: 100000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] MemoryTrackerUsage: 1521975 (gauge) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSCPUVirtualTimeMicroseconds: 102148 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSReadChars: 135700 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSWriteChars: 8 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] Query: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RWLockAcquiredReadLocks: 2 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorRead: 5 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorReadBytes: 134464 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RealTimeMicroseconds: 293747 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] SoftPageFaults: 382 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] TableFunctionExecute: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] UserTimeMicroseconds: 102148 (increment) v2: Proper support ProfileEvents in INSERTs (with protocol change) v3: Receive profile events on INSERT queries Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-05-20 15:19:40 +00:00
sendProfileEvents();
2022-02-15 08:25:07 +00:00
return true;
}
2021-08-21 15:29:28 +00:00
try
{
pollImpl();
}
catch (const Exception & e)
{
state->io.onException();
2022-04-15 23:56:45 +00:00
state->exception.reset(e.clone());
2021-08-21 15:29:28 +00:00
}
catch (const std::exception & e)
{
state->io.onException();
2022-04-15 23:56:45 +00:00
state->exception = std::make_unique<Exception>(Exception::CreateFromSTDTag{}, e);
2021-08-21 15:29:28 +00:00
}
catch (...)
{
state->io.onException();
2023-01-23 13:16:14 +00:00
state->exception = std::make_unique<Exception>(ErrorCodes::UNKNOWN_EXCEPTION, "Unknown exception");
2021-08-21 15:29:28 +00:00
}
2021-08-19 20:27:40 +00:00
}
if (state->exception)
{
next_packet_type = Protocol::Server::Exception;
2021-08-17 19:59:51 +00:00
return true;
}
2021-08-19 20:27:40 +00:00
if (state->is_finished && !state->sent_totals)
{
state->sent_totals = true;
Block totals;
2021-09-29 19:03:30 +00:00
if (state->executor)
2021-08-19 20:27:40 +00:00
totals = state->executor->getTotalsBlock();
if (totals)
{
next_packet_type = Protocol::Server::Totals;
state->block.emplace(totals);
return true;
}
}
if (state->is_finished && !state->sent_extremes)
{
state->sent_extremes = true;
Block extremes;
2021-09-29 19:03:30 +00:00
if (state->executor)
2021-08-19 20:27:40 +00:00
extremes = state->executor->getExtremesBlock();
if (extremes)
{
next_packet_type = Protocol::Server::Extremes;
state->block.emplace(extremes);
return true;
}
}
2021-12-23 09:24:44 +00:00
if (state->is_finished && !state->sent_profile_info)
{
state->sent_profile_info = true;
if (state->executor)
{
next_packet_type = Protocol::Server::ProfileInfo;
state->profile_info = state->executor->getProfileInfo();
return true;
}
}
if (state->is_finished && !state->sent_profile_events)
{
state->sent_profile_events = true;
if (send_profile_events && state->executor)
{
Send profile events for INSERT queries (previously only SELECT was supported) Reproducer: echo "1" | clickhouse-client --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 However, clickhouse-local is differnt, it does sent the periodically, but only if query was long enough, i.e.: # yes | head -n100000 | clickhouse-local --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 [s1.ch] 2022.05.20 15:20:27 [ 0 ] ContextLock: 10 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] DiskReadElapsedMicroseconds: 29 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocBytes: 200000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocs: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertQuery: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedBytes: 1000000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedRows: 100000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] MemoryTrackerUsage: 1521975 (gauge) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSCPUVirtualTimeMicroseconds: 102148 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSReadChars: 135700 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSWriteChars: 8 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] Query: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RWLockAcquiredReadLocks: 2 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorRead: 5 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorReadBytes: 134464 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RealTimeMicroseconds: 293747 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] SoftPageFaults: 382 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] TableFunctionExecute: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] UserTimeMicroseconds: 102148 (increment) v2: Proper support ProfileEvents in INSERTs (with protocol change) v3: Receive profile events on INSERT queries Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-05-20 15:19:40 +00:00
sendProfileEvents();
return true;
}
}
if (state->is_finished)
2021-08-23 07:13:27 +00:00
{
finishQuery();
2021-08-23 07:13:27 +00:00
return true;
}
2021-09-19 18:24:06 +00:00
if (state->block && state->block.value())
2021-08-17 19:59:51 +00:00
{
next_packet_type = Protocol::Server::Data;
2021-08-19 20:27:40 +00:00
return true;
}
return false;
}
2021-08-17 19:59:51 +00:00
2021-08-19 20:27:40 +00:00
bool LocalConnection::pollImpl()
{
Block block;
auto next_read = pullBlock(block);
if (block && !state->io.null_format)
2021-08-19 20:27:40 +00:00
{
2021-09-04 18:19:01 +00:00
state->block.emplace(block);
2021-08-17 19:59:51 +00:00
}
2021-08-19 20:27:40 +00:00
else if (!next_read)
2021-08-17 19:59:51 +00:00
{
2021-08-19 20:27:40 +00:00
state->is_finished = true;
2021-08-17 19:59:51 +00:00
}
2021-08-19 20:27:40 +00:00
2021-08-17 19:59:51 +00:00
return true;
}
Packet LocalConnection::receivePacket()
{
Packet packet;
2021-09-11 11:34:22 +00:00
if (!state)
{
packet.type = Protocol::Server::EndOfStream;
return packet;
}
if (!next_packet_type)
poll(0);
2021-09-11 11:34:22 +00:00
if (!next_packet_type)
2021-08-21 15:29:28 +00:00
{
packet.type = Protocol::Server::EndOfStream;
return packet;
}
2021-08-21 10:55:54 +00:00
2021-08-17 19:59:51 +00:00
packet.type = next_packet_type.value();
switch (next_packet_type.value())
{
case Protocol::Server::Totals:
case Protocol::Server::Extremes:
case Protocol::Server::Log:
2021-08-17 19:59:51 +00:00
case Protocol::Server::Data:
2021-08-30 11:04:59 +00:00
case Protocol::Server::ProfileEvents:
2021-08-17 19:59:51 +00:00
{
2021-09-19 18:24:06 +00:00
if (state->block && state->block.value())
2021-08-17 19:59:51 +00:00
{
2021-09-19 18:24:06 +00:00
packet.block = std::move(state->block.value());
2021-08-19 20:27:40 +00:00
state->block.reset();
2021-08-17 19:59:51 +00:00
}
next_packet_type.reset();
break;
}
2021-12-23 09:24:44 +00:00
case Protocol::Server::ProfileInfo:
{
if (state->profile_info)
{
packet.profile_info = *state->profile_info;
2021-12-23 09:24:44 +00:00
state->profile_info.reset();
}
next_packet_type.reset();
break;
}
case Protocol::Server::TableColumns:
{
if (state->columns_description)
{
/// Send external table name (empty name is the main table)
/// (see TCPHandler::sendTableColumns)
packet.multistring_message = {"", state->columns_description->toString()};
}
if (state->block)
{
next_packet_type = Protocol::Server::Data;
}
2021-08-19 20:27:40 +00:00
break;
}
case Protocol::Server::Exception:
{
2022-04-15 23:56:45 +00:00
packet.exception.reset(state->exception->clone());
next_packet_type.reset();
2021-08-17 19:59:51 +00:00
break;
}
case Protocol::Server::Progress:
{
2022-05-06 15:04:03 +00:00
packet.progress = state->progress.fetchAndResetPiecewiseAtomically();
2021-08-19 20:27:40 +00:00
state->progress.reset();
next_packet_type.reset();
2021-08-17 19:59:51 +00:00
break;
}
case Protocol::Server::EndOfStream:
{
next_packet_type.reset();
2021-08-17 19:59:51 +00:00
break;
}
default:
2021-09-26 21:22:04 +00:00
throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_SERVER,
"Unknown packet {} for {}", toString(packet.type), getDescription());
2021-08-17 19:59:51 +00:00
}
2021-08-21 10:55:54 +00:00
return packet;
2021-08-17 19:59:51 +00:00
}
2021-08-23 08:50:12 +00:00
void LocalConnection::getServerVersion(
2021-08-27 08:46:31 +00:00
const ConnectionTimeouts & /* timeouts */, String & /* name */,
UInt64 & /* version_major */, UInt64 & /* version_minor */,
2021-09-04 18:19:01 +00:00
UInt64 & /* version_patch */, UInt64 & /* revision */)
2021-08-23 08:50:12 +00:00
{
2021-09-04 18:19:01 +00:00
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
2021-08-23 08:50:12 +00:00
}
2022-02-06 15:11:18 +00:00
void LocalConnection::setDefaultDatabase(const String & database)
2021-08-23 08:50:12 +00:00
{
2022-02-06 15:11:18 +00:00
current_database = database;
2021-08-23 08:50:12 +00:00
}
2021-09-04 18:19:01 +00:00
UInt64 LocalConnection::getServerRevision(const ConnectionTimeouts &)
2021-08-23 08:50:12 +00:00
{
2021-09-04 18:19:01 +00:00
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
2021-08-23 08:50:12 +00:00
}
const String & LocalConnection::getServerTimezone(const ConnectionTimeouts &)
{
2021-09-04 18:19:01 +00:00
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
2021-08-23 08:50:12 +00:00
}
const String & LocalConnection::getServerDisplayName(const ConnectionTimeouts &)
{
2021-09-04 18:19:01 +00:00
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
}
void LocalConnection::sendExternalTablesData(ExternalTablesData &)
{
2021-09-24 08:29:01 +00:00
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
2021-08-23 08:50:12 +00:00
}
2023-02-03 13:34:18 +00:00
void LocalConnection::sendMergeTreeReadTaskResponse(const ParallelReadResponse &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
}
2022-03-05 06:22:56 +00:00
ServerConnectionPtr LocalConnection::createConnection(
const ConnectionParameters &,
ContextPtr current_context,
bool send_progress,
bool send_profile_events,
const String & server_display_name)
2021-09-11 11:34:22 +00:00
{
2022-03-05 06:22:56 +00:00
return std::make_unique<LocalConnection>(current_context, send_progress, send_profile_events, server_display_name);
2021-09-11 11:34:22 +00:00
}
2021-08-17 19:59:51 +00:00
}