better serialization of serialization kinds in native protocol

This commit is contained in:
Anton Popov 2021-10-04 18:21:38 +03:00
parent 914781052e
commit 07e1224a56
10 changed files with 66 additions and 121 deletions

View File

@ -135,18 +135,6 @@ Block NativeBlockInputStream::readImpl()
rows = index_block_it->num_rows;
}
/// Serialization
SerializationInfoPtr serialization_info;
if (server_revision >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION)
{
auto serialization_kinds = SerializationInfo::readKindsBinary(istr);
serialization_info = std::make_shared<SerializationInfo>(rows, serialization_kinds);
}
else
{
serialization_info = std::make_shared<SerializationInfo>();
}
for (size_t i = 0; i < columns; ++i)
{
if (use_index)
@ -165,6 +153,25 @@ Block NativeBlockInputStream::readImpl()
readBinary(type_name, istr);
column.type = data_type_factory.get(type_name);
SerializationPtr serialization;
if (server_revision >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION)
{
serialization = column.type->getSerialization(column.name, [&](const String & /*name*/)
{
UInt8 kind_num;
readBinary(kind_num, istr);
auto kind = magic_enum::enum_cast<ISerialization::Kind>(kind_num);
if (!kind)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown serialization kind " + std::to_string(kind_num));
return *kind;
});
}
else
{
serialization = column.type->getDefaultSerialization();
}
if (use_index)
{
/// Index allows to do more checks.
@ -175,7 +182,6 @@ Block NativeBlockInputStream::readImpl()
}
/// Data
auto serialization = column.type->getSerialization(column.name, *serialization_info);
ColumnPtr read_column = column.type->createColumn(*serialization);
double avg_value_size_hint = avg_value_size_hints.empty() ? 0 : avg_value_size_hints[i];

View File

@ -10,7 +10,7 @@
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/Serializations/SerializationInfo.h>
#include <DataTypes/NestedUtils.h>
#include <Columns/ColumnSparse.h>
namespace DB
@ -86,14 +86,6 @@ void NativeBlockOutputStream::write(const Block & block)
writeVarUInt(rows, *index_ostr);
}
/// Serialization
if (client_revision >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION)
{
auto serialization_kinds = SerializationInfo::getKinds(block);
SerializationInfo::writeKindsBinary(serialization_kinds, ostr);
}
for (size_t i = 0; i < columns; ++i)
{
/// For the index.
@ -129,15 +121,27 @@ void NativeBlockOutputStream::write(const Block & block)
writeStringBinary(type_name, ostr);
/// Serialization. Dynamic, if client supports it.
SerializationPtr serialization;
if (client_revision < DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION)
if (client_revision >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION)
{
serialization = column.type->getDefaultSerialization();
column.column = recursiveRemoveSparse(column.column);
serialization = column.type->getSerialization(column.name, [&](const String & name)
{
auto split = Nested::splitName(name);
ISerialization::Kind kind;
if (!split.second.empty() && column.type->tryGetSubcolumnType(split.second))
kind = ISerialization::getKind(*column.type->getSubcolumn(split.second, *column.column));
else
kind = ISerialization::getKind(*column.column);
writeBinary(static_cast<UInt8>(kind), ostr);
return kind;
});
}
else
{
serialization = column.type->getSerialization(*column.column);
serialization = column.type->getDefaultSerialization();
column.column = recursiveRemoveSparse(column.column);
}
/// Data

View File

@ -330,7 +330,7 @@ SerializationPtr DataTypeTuple::doGetDefaultSerialization() const
return std::make_shared<SerializationTuple>(std::move(serializations), use_explicit_names);
}
SerializationPtr DataTypeTuple::getSerialization(const String & column_name, const SerializationInfo & info) const
SerializationPtr DataTypeTuple::getSerialization(const String & column_name, const SerializationCallback & callback) const
{
SerializationTuple::ElementSerializations serializations(elems.size());
bool use_explicit_names = have_explicit_names && serialize_names;
@ -338,7 +338,7 @@ SerializationPtr DataTypeTuple::getSerialization(const String & column_name, con
{
String elem_name = use_explicit_names ? names[i] : toString(i + 1);
auto subcolumn_name = Nested::concatenateName(column_name, elem_name);
auto serializaion = elems[i]->getSerialization(subcolumn_name, info);
auto serializaion = elems[i]->getSerialization(subcolumn_name, callback);
serializations[i] = std::make_shared<SerializationTupleElement>(serializaion, elem_name);
}

View File

@ -57,7 +57,7 @@ public:
DataTypePtr tryGetSubcolumnType(const String & subcolumn_name) const override;
ColumnPtr getSubcolumn(const String & subcolumn_name, const IColumn & column) const override;
SerializationPtr getSerialization(const String & column_name, const SerializationInfo & info) const override;
SerializationPtr getSerialization(const String & column_name, const SerializationCallback & callback) const override;
SerializationPtr getSubcolumnSerialization(
const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter) const override;

View File

@ -166,9 +166,9 @@ SerializationPtr IDataType::getSubcolumnSerialization(const String & subcolumn_n
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "There is no subcolumn {} in type {}", subcolumn_name, getName());
}
SerializationPtr IDataType::getSerialization(const String & column_name, const SerializationInfo & info) const
SerializationPtr IDataType::getSerialization(ISerialization::Kind kind) const
{
if (supportsSparseSerialization() && info.getKind(column_name) == ISerialization::Kind::SPARSE)
if (supportsSparseSerialization() && kind == ISerialization::Kind::SPARSE)
return getSparseSerialization();
return getDefaultSerialization();
@ -176,10 +176,17 @@ SerializationPtr IDataType::getSerialization(const String & column_name, const S
SerializationPtr IDataType::getSerialization(const IColumn & column) const
{
if (column.isSparse())
return getSparseSerialization();
return getSerialization(ISerialization::getKind(column));
}
return getDefaultSerialization();
SerializationPtr IDataType::getSerialization(const String & column_name, const SerializationInfo & info) const
{
return getSerialization(column_name, [&info](const auto & name) { return info.getKind(name); });
}
SerializationPtr IDataType::getSerialization(const String & column_name, const SerializationCallback & callback) const
{
return getSerialization(callback(column_name));
}
SerializationPtr IDataType::getSerialization(const ISerialization::Settings & settings) const

View File

@ -97,15 +97,22 @@ public:
virtual SerializationPtr getSubcolumnSerialization(
const String & subcolumn_name, const BaseSerializationGetter & base_serialization_getter) const;
/// Chooses serialziation according to serialization kind.
SerializationPtr getSerialization(ISerialization::Kind kind) const;
/// Chooses serialziation according to column content.
virtual SerializationPtr getSerialization(const IColumn & column) const;
SerializationPtr getSerialization(const IColumn & column) const;
/// Chooses serialization according to collected information about content of columns.
virtual SerializationPtr getSerialization(const String & column_name, const SerializationInfo & info) const;
SerializationPtr getSerialization(const String & column_name, const SerializationInfo & info) const;
/// Chooses serialization according to settings.
SerializationPtr getSerialization(const ISerialization::Settings & settings) const;
using SerializationCallback = std::function<ISerialization::Kind(const String &)>;
virtual SerializationPtr getSerialization(const String & column_name, const SerializationCallback & callback) const;
/// Chooses between subcolumn serialization and regular serialization according to @column.
/// This method typically should be used to get serialization for reading column or subcolumn.
static SerializationPtr getSerialization(const NameAndTypePair & column, const SerializationInfo & info);

View File

@ -5,6 +5,7 @@
#include <IO/Operators.h>
#include <Common/escapeForFileName.h>
#include <DataTypes/NestedUtils.h>
#include <common/EnumReflection.h>
namespace DB
@ -49,30 +50,11 @@ ISerialization::Kind ISerialization::stringToKind(const String & str)
String ISerialization::Substream::toString() const
{
switch (type)
{
case ArrayElements:
return "ArrayElements";
case ArraySizes:
return "ArraySizes";
case NullableElements:
return "NullableElements";
case NullMap:
return "NullMap";
case TupleElement:
return "TupleElement(" + tuple_element_name + ", "
+ std::to_string(escape_tuple_delimiter) + ")";
case DictionaryKeys:
return "DictionaryKeys";
case DictionaryIndexes:
return "DictionaryIndexes";
case SparseElements:
return "SparseElements";
case SparseOffsets:
return "SparseOffsets";
}
if (type == TupleElement)
return fmt::format("TupleElement({}, escape_tuple_delimiter = {})",
tuple_element_name, escape_tuple_delimiter ? "true" : "false");
__builtin_unreachable();
return String(magic_enum::enum_name(type));
}
String ISerialization::SubstreamPath::toString() const

View File

@ -98,6 +98,7 @@ public:
SparseElements,
SparseOffsets,
};
Type type;
/// Index of tuple element, starting at 1 or name.

View File

@ -102,13 +102,6 @@ SerializationInfoPtr SerializationInfoBuilder::buildFrom(const SerializationInfo
return std::move(info);
}
SerializationInfo::SerializationInfo(size_t number_of_rows_, const NameToKind & kinds)
: number_of_rows(number_of_rows_)
{
for (const auto & [name, kind] : kinds)
columns[name].kind = kind;
}
ISerialization::Kind SerializationInfo::getKind(const String & column_name) const
{
auto it = columns.find(column_name);
@ -203,52 +196,4 @@ void SerializationInfo::writeText(WriteBuffer & out) const
writeString(toJSON(), out);
}
SerializationInfo::NameToKind SerializationInfo::getKinds(const Block & block)
{
NameToKind kinds;
for (const auto & elem : block)
{
kinds[elem.name] = ISerialization::getKind(*elem.column);
for (const auto & subcolumn_name : elem.type->getSubcolumnNames())
{
auto full_name = Nested::concatenateName(elem.name, subcolumn_name);
auto subcolumn = elem.type->getSubcolumn(subcolumn_name, *elem.column);
kinds[full_name] = ISerialization::getKind(*subcolumn);
}
}
return kinds;
}
SerializationInfo::NameToKind SerializationInfo::readKindsBinary(ReadBuffer & in)
{
size_t size = 0;
readVarUInt(size, in);
NameToKind kinds;
kinds.reserve(size);
for (size_t i = 0; i < size; ++i)
{
String name;
UInt8 kind;
readBinary(name, in);
readBinary(kind, in);
if (!kinds.emplace(name, static_cast<ISerialization::Kind>(kind)).second)
throw Exception(ErrorCodes::INCORRECT_DATA, "Duplicated name '{}' found in serialization kinds", name);
}
return kinds;
}
void SerializationInfo::writeKindsBinary(const NameToKind & kinds, WriteBuffer & out)
{
writeVarUInt(kinds.size(), out);
for (const auto & [name, kind] : kinds)
{
writeBinary(name, out);
writeBinary(static_cast<UInt8>(kind), out);
}
}
}

View File

@ -19,10 +19,7 @@ namespace DB
class SerializationInfo
{
public:
using NameToKind = std::unordered_map<String, ISerialization::Kind>;
SerializationInfo() = default;
SerializationInfo(size_t number_of_rows_, const NameToKind & kinds);
static constexpr auto version = 1;
size_t getNumberOfDefaultRows(const String & column_name) const;
@ -34,10 +31,6 @@ public:
void readText(ReadBuffer & in);
void writeText(WriteBuffer & out) const;
static NameToKind getKinds(const Block & block);
static NameToKind readKindsBinary(ReadBuffer & in);
static void writeKindsBinary(const NameToKind & kinds, WriteBuffer & out);
private:
void fromJSON(const String & json_str);
String toJSON() const;