Merge pull request #6891 from yandex/low_cardinality_in_native_http

Low cardinality in native http
This commit is contained in:
alexey-milovidov 2019-09-13 09:01:08 +03:00 committed by GitHub
commit 12c26e6013
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 55 additions and 18 deletions

View File

@ -1000,8 +1000,7 @@ void TCPHandler::receiveUnexpectedData()
auto skip_block_in = std::make_shared<NativeBlockInputStream>(
*maybe_compressed_in,
last_block_in.header,
client_revision,
!connection_context.getSettingsRef().low_cardinality_allow_in_native_format);
client_revision);
Block skip_block = skip_block_in->read();
throw NetException("Unexpected packet Data received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
@ -1028,8 +1027,7 @@ void TCPHandler::initBlockInput()
state.block_in = std::make_shared<NativeBlockInputStream>(
*state.maybe_compressed_in,
header,
client_revision,
!connection_context.getSettingsRef().low_cardinality_allow_in_native_format);
client_revision);
}
}

View File

@ -29,8 +29,8 @@ NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server
{
}
NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_, bool convert_types_to_low_cardinality_)
: istr(istr_), header(header_), server_revision(server_revision_), convert_types_to_low_cardinality(convert_types_to_low_cardinality_)
NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_)
: istr(istr_), header(header_), server_revision(server_revision_)
{
}
@ -153,12 +153,15 @@ Block NativeBlockInputStream::readImpl()
column.column = std::move(read_column);
/// Support insert from old clients without low cardinality type.
bool revision_without_low_cardinality = server_revision && server_revision < DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE;
if (header && (convert_types_to_low_cardinality || revision_without_low_cardinality))
if (header)
{
column.column = recursiveLowCardinalityConversion(column.column, column.type, header.getByPosition(i).type);
column.type = header.getByPosition(i).type;
/// Support insert from old clients without low cardinality type.
auto & header_column = header.getByName(column.name);
if (!header_column.type->equals(*column.type))
{
column.column = recursiveLowCardinalityConversion(column.column, column.type, header.getByPosition(i).type);
column.type = header.getByPosition(i).type;
}
}
res.insert(std::move(column));
@ -177,6 +180,22 @@ Block NativeBlockInputStream::readImpl()
index_column_it = index_block_it->columns.begin();
}
if (rows && header)
{
/// Allow to skip columns. Fill them with default values.
Block tmp_res;
for (auto & col : header)
{
if (res.has(col.name))
tmp_res.insert(std::move(res.getByName(col.name)));
else
tmp_res.insert({col.type->createColumn()->cloneResized(rows), col.type, col.name});
}
res.swap(tmp_res);
}
return res;
}

View File

@ -65,7 +65,7 @@ public:
/// For cases when data structure (header) is known in advance.
/// NOTE We may use header for data validation and/or type conversions. It is not implemented.
NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_, bool convert_types_to_low_cardinality_ = false);
NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_);
/// For cases when we have an index. It allows to skip columns. Only columns specified in the index will be read.
NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_,
@ -91,8 +91,6 @@ private:
IndexForNativeFormat::Blocks::const_iterator index_block_end;
IndexOfBlockForNativeFormat::Columns::const_iterator index_column_it;
bool convert_types_to_low_cardinality = false;
/// If an index is specified, then `istr` must be CompressedReadBufferFromFile. Unused otherwise.
CompressedReadBufferFromFile * istr_concrete = nullptr;

View File

@ -65,10 +65,7 @@ Block InterpreterInsertQuery::getSampleBlock(const ASTInsertQuery & query, const
/// If the query does not include information about columns
if (!query.columns)
{
/// Format Native ignores header and write blocks as is.
if (query.format == "Native")
return {};
else if (query.no_destination)
if (query.no_destination)
return table->getSampleBlockWithVirtuals();
else
return table_sample_non_materialized;

View File

@ -0,0 +1,4 @@
abc
----
abc
abc

View File

@ -0,0 +1,21 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
$CLICKHOUSE_CLIENT --query="drop table if exists tab_str";
$CLICKHOUSE_CLIENT --query="drop table if exists tab_str_lc";
$CLICKHOUSE_CLIENT --query="create table tab_str (x String) engine = MergeTree order by tuple()";
$CLICKHOUSE_CLIENT --query="create table tab_str_lc (x LowCardinality(String)) engine = MergeTree order by tuple()";
$CLICKHOUSE_CLIENT --query="insert into tab_str values ('abc')";
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL_PARAMS}&query=select+x+from+tab_str+format+Native" | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL_PARAMS}&query=INSERT+INTO+tab_str_lc+FORMAT+Native" --data-binary @-
$CLICKHOUSE_CLIENT --query="select x from tab_str_lc";
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL_PARAMS}&query=select+x+from+tab_str_lc+format+Native" | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL_PARAMS}&query=INSERT+INTO+tab_str+FORMAT+Native" --data-binary @-
$CLICKHOUSE_CLIENT --query="select '----'";
$CLICKHOUSE_CLIENT --query="select x from tab_str";