Merge pull request #30336 from kssenii/fix-local-defaults

Send table columns in clickhouse-local
This commit is contained in:
Kseniia Sumarokova 2021-10-20 21:18:02 +03:00 committed by GitHub
commit cae31437e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 44 additions and 3 deletions

View File

@ -5,7 +5,7 @@
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/Executors/PushingAsyncPipelineExecutor.h>
#include <Storages/IStorage.h>
#include "Core/Protocol.h"
#include <Core/Protocol.h>
namespace DB
@ -105,6 +105,16 @@ void LocalConnection::sendQuery(
state->pushing_executor->start();
state->block = state->pushing_executor->getHeader();
}
const auto & table_id = query_context->getInsertionTable();
if (query_context->getSettingsRef().input_format_defaults_for_omitted_fields)
{
if (!table_id.empty())
{
auto storage_ptr = DatabaseCatalog::instance().getTable(table_id, query_context);
state->columns_description = storage_ptr->getInMemoryMetadataPtr()->getColumns();
}
}
}
else if (state->io.pipeline.pulling())
{
@ -117,7 +127,9 @@ void LocalConnection::sendQuery(
executor.execute();
}
if (state->block)
if (state->columns_description)
next_packet_type = Protocol::Server::TableColumns;
else if (state->block)
next_packet_type = Protocol::Server::Data;
}
catch (const Exception & e)
@ -338,21 +350,41 @@ Packet LocalConnection::receivePacket()
packet.block = std::move(state->block.value());
state->block.reset();
}
next_packet_type.reset();
break;
}
case Protocol::Server::TableColumns:
{
if (state->columns_description)
{
/// Send external table name (empty name is the main table)
/// (see TCPHandler::sendTableColumns)
packet.multistring_message = {"", state->columns_description->toString()};
}
if (state->block)
{
next_packet_type = Protocol::Server::Data;
}
break;
}
case Protocol::Server::Exception:
{
packet.exception = std::make_unique<Exception>(*state->exception);
next_packet_type.reset();
break;
}
case Protocol::Server::Progress:
{
packet.progress = std::move(state->progress);
state->progress.reset();
next_packet_type.reset();
break;
}
case Protocol::Server::EndOfStream:
{
next_packet_type.reset();
break;
}
default:
@ -360,7 +392,6 @@ Packet LocalConnection::receivePacket()
"Unknown packet {} for {}", toString(packet.type), getDescription());
}
next_packet_type.reset();
return packet;
}

View File

@ -5,6 +5,7 @@
#include <QueryPipeline/BlockIO.h>
#include <IO/TimeoutSetter.h>
#include <Interpreters/Session.h>
#include <Storages/ColumnsDescription.h>
namespace DB
@ -33,6 +34,7 @@ struct LocalQueryState
/// Current block to be sent next.
std::optional<Block> block;
std::optional<ColumnsDescription> columns_description;
/// Is request cancelled
bool is_cancelled = false;

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
${CLICKHOUSE_LOCAL} --query "create table t (n int, m int default 42) engine=Memory;insert into t values (1, NULL);select * from t"