fix json type with sparse columns

This commit is contained in:
Anton Popov 2022-07-21 14:47:19 +00:00
parent 39d61e9a37
commit e0d2c8fb37
26 changed files with 264 additions and 149 deletions

View File

@ -263,11 +263,6 @@ public:
} }
} }
SerializationInfoPtr getSerializationInfo() const override
{
return data->getSerializationInfo();
}
bool isNullable() const override { return isColumnNullable(*data); } bool isNullable() const override { return isColumnNullable(*data); }
bool onlyNull() const override { return data->isNullAt(0); } bool onlyNull() const override { return data->isNullAt(0); }
bool isNumeric() const override { return data->isNumeric(); } bool isNumeric() const override { return data->isNumeric(); }

View File

@ -561,15 +561,4 @@ void ColumnTuple::getIndicesOfNonDefaultRows(Offsets & indices, size_t from, siz
return getIndicesOfNonDefaultRowsImpl<ColumnTuple>(indices, from, limit); return getIndicesOfNonDefaultRowsImpl<ColumnTuple>(indices, from, limit);
} }
SerializationInfoPtr ColumnTuple::getSerializationInfo() const
{
MutableSerializationInfos infos;
infos.reserve(columns.size());
for (const auto & column : columns)
infos.push_back(const_pointer_cast<SerializationInfo>(column->getSerializationInfo()));
return std::make_shared<SerializationInfoTuple>(std::move(infos), SerializationInfo::Settings{});
}
} }

View File

@ -102,7 +102,6 @@ public:
ColumnPtr compress() const override; ColumnPtr compress() const override;
double getRatioOfDefaultRows(double sample_ratio) const override; double getRatioOfDefaultRows(double sample_ratio) const override;
void getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const override; void getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const override;
SerializationInfoPtr getSerializationInfo() const override;
size_t tupleSize() const { return columns.size(); } size_t tupleSize() const { return columns.size(); }

View File

@ -64,11 +64,6 @@ ColumnPtr IColumn::createWithOffsets(const Offsets & offsets, const Field & defa
return res; return res;
} }
SerializationInfoPtr IColumn::getSerializationInfo() const
{
return std::make_shared<SerializationInfo>(ISerialization::getKind(*this), SerializationInfo::Settings{});
}
bool isColumnNullable(const IColumn & column) bool isColumnNullable(const IColumn & column)
{ {
return checkColumn<ColumnNullable>(column); return checkColumn<ColumnNullable>(column);

View File

@ -35,9 +35,6 @@ class ColumnGathererStream;
class Field; class Field;
class WeakHash32; class WeakHash32;
class SerializationInfo;
using SerializationInfoPtr = std::shared_ptr<const SerializationInfo>;
/* /*
* Represents a set of equal ranges in previous column to perform sorting in current column. * Represents a set of equal ranges in previous column to perform sorting in current column.
* Used in sorting by tuples. * Used in sorting by tuples.
@ -445,8 +442,6 @@ public:
/// Used to create full column from sparse. /// Used to create full column from sparse.
[[nodiscard]] virtual Ptr createWithOffsets(const Offsets & offsets, const Field & default_field, size_t total_rows, size_t shift) const; [[nodiscard]] virtual Ptr createWithOffsets(const Offsets & offsets, const Field & default_field, size_t total_rows, size_t shift) const;
[[nodiscard]] virtual SerializationInfoPtr getSerializationInfo() const;
/// Compress column in memory to some representation that allows to decompress it back. /// Compress column in memory to some representation that allows to decompress it back.
/// Return itself if compression is not applicable for this column type. /// Return itself if compression is not applicable for this column type.
[[nodiscard]] virtual Ptr compress() const [[nodiscard]] virtual Ptr compress() const

View File

@ -119,7 +119,6 @@ void DataTypeFactory::registerDataType(const String & family_name, Value creator
throw Exception("DataTypeFactory: the data type family name '" + family_name + "' is not unique", throw Exception("DataTypeFactory: the data type family name '" + family_name + "' is not unique",
ErrorCodes::LOGICAL_ERROR); ErrorCodes::LOGICAL_ERROR);
if (case_sensitiveness == CaseInsensitive if (case_sensitiveness == CaseInsensitive
&& !case_insensitive_data_types.emplace(family_name_lowercase, creator).second) && !case_insensitive_data_types.emplace(family_name_lowercase, creator).second)
throw Exception("DataTypeFactory: the case insensitive data type family name '" + family_name + "' is not unique", throw Exception("DataTypeFactory: the case insensitive data type family name '" + family_name + "' is not unique",

View File

@ -25,7 +25,7 @@ class DataTypeFactory final : private boost::noncopyable, public IFactoryWithAli
private: private:
using SimpleCreator = std::function<DataTypePtr()>; using SimpleCreator = std::function<DataTypePtr()>;
using DataTypesDictionary = std::unordered_map<String, Value>; using DataTypesDictionary = std::unordered_map<String, Value>;
using CreatorWithCustom = std::function<std::pair<DataTypePtr,DataTypeCustomDescPtr>(const ASTPtr & parameters)>; using CreatorWithCustom = std::function<std::pair<DataTypePtr, DataTypeCustomDescPtr>(const ASTPtr & parameters)>;
using SimpleCreatorWithCustom = std::function<std::pair<DataTypePtr,DataTypeCustomDescPtr>()>; using SimpleCreatorWithCustom = std::function<std::pair<DataTypePtr,DataTypeCustomDescPtr>()>;
public: public:

View File

@ -2,6 +2,7 @@
#include <base/range.h> #include <base/range.h>
#include <Common/StringUtils/StringUtils.h> #include <Common/StringUtils/StringUtils.h>
#include <Columns/ColumnTuple.h> #include <Columns/ColumnTuple.h>
#include <Columns/ColumnConst.h>
#include <Core/Field.h> #include <Core/Field.h>
#include <DataTypes/DataTypeTuple.h> #include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeArray.h> #include <DataTypes/DataTypeArray.h>
@ -257,6 +258,7 @@ size_t DataTypeTuple::getSizeOfValueInMemory() const
SerializationPtr DataTypeTuple::doGetDefaultSerialization() const SerializationPtr DataTypeTuple::doGetDefaultSerialization() const
{ {
SerializationTuple::ElementSerializations serializations(elems.size()); SerializationTuple::ElementSerializations serializations(elems.size());
for (size_t i = 0; i < elems.size(); ++i) for (size_t i = 0; i < elems.size(); ++i)
{ {
String elem_name = have_explicit_names ? names[i] : toString(i + 1); String elem_name = have_explicit_names ? names[i] : toString(i + 1);
@ -289,7 +291,27 @@ MutableSerializationInfoPtr DataTypeTuple::createSerializationInfo(const Seriali
for (const auto & elem : elems) for (const auto & elem : elems)
infos.push_back(elem->createSerializationInfo(settings)); infos.push_back(elem->createSerializationInfo(settings));
return std::make_shared<SerializationInfoTuple>(std::move(infos), settings); return std::make_shared<SerializationInfoTuple>(std::move(infos), names, settings);
}
SerializationInfoPtr DataTypeTuple::getSerializationInfo(const IColumn & column) const
{
if (const auto * column_const = checkAndGetColumn<ColumnConst>(&column))
return getSerializationInfo(column_const->getDataColumn());
MutableSerializationInfos infos;
infos.reserve(elems.size());
const auto & column_tuple = assert_cast<const ColumnTuple &>(column);
assert(elems.size() == column_tuple.getColumns().size());
for (size_t i = 0; i < elems.size(); ++i)
{
auto element_info = elems[i]->getSerializationInfo(column_tuple.getColumn(i));
infos.push_back(const_pointer_cast<SerializationInfo>(element_info));
}
return std::make_shared<SerializationInfoTuple>(std::move(infos), names, SerializationInfo::Settings{});
} }

View File

@ -22,6 +22,7 @@ private:
DataTypes elems; DataTypes elems;
Strings names; Strings names;
bool have_explicit_names; bool have_explicit_names;
public: public:
static constexpr bool is_parametric = true; static constexpr bool is_parametric = true;
@ -54,6 +55,7 @@ public:
SerializationPtr doGetDefaultSerialization() const override; SerializationPtr doGetDefaultSerialization() const override;
SerializationPtr getSerialization(const SerializationInfo & info) const override; SerializationPtr getSerialization(const SerializationInfo & info) const override;
MutableSerializationInfoPtr createSerializationInfo(const SerializationInfo::Settings & settings) const override; MutableSerializationInfoPtr createSerializationInfo(const SerializationInfo::Settings & settings) const override;
SerializationInfoPtr getSerializationInfo(const IColumn & column) const override;
const DataTypePtr & getElement(size_t i) const { return elems[i]; } const DataTypePtr & getElement(size_t i) const { return elems[i]; }
const DataTypes & getElements() const { return elems; } const DataTypes & getElements() const { return elems; }

View File

@ -179,12 +179,19 @@ void IDataType::setCustomization(DataTypeCustomDescPtr custom_desc_) const
custom_serialization = std::move(custom_desc_->serialization); custom_serialization = std::move(custom_desc_->serialization);
} }
MutableSerializationInfoPtr IDataType::createSerializationInfo( MutableSerializationInfoPtr IDataType::createSerializationInfo(const SerializationInfo::Settings & settings) const
const SerializationInfo::Settings & settings) const
{ {
return std::make_shared<SerializationInfo>(ISerialization::Kind::DEFAULT, settings); return std::make_shared<SerializationInfo>(ISerialization::Kind::DEFAULT, settings);
} }
SerializationInfoPtr IDataType::getSerializationInfo(const IColumn & column) const
{
if (const auto * column_const = checkAndGetColumn<ColumnConst>(&column))
return getSerializationInfo(column_const->getDataColumn());
return std::make_shared<SerializationInfo>(ISerialization::getKind(column), SerializationInfo::Settings{});
}
SerializationPtr IDataType::getDefaultSerialization() const SerializationPtr IDataType::getDefaultSerialization() const
{ {
if (custom_serialization) if (custom_serialization)

View File

@ -101,8 +101,8 @@ public:
Names getSubcolumnNames() const; Names getSubcolumnNames() const;
virtual MutableSerializationInfoPtr createSerializationInfo( virtual MutableSerializationInfoPtr createSerializationInfo(const SerializationInfo::Settings & settings) const;
const SerializationInfo::Settings & settings) const; virtual SerializationInfoPtr getSerializationInfo(const IColumn & column) const;
/// TODO: support more types. /// TODO: support more types.
virtual bool supportsSparseSerialization() const { return !haveSubtypes(); } virtual bool supportsSparseSerialization() const { return !haveSubtypes(); }

View File

@ -1,9 +1,9 @@
#include <DataTypes/Serializations/SerializationInfo.h> #include <DataTypes/Serializations/SerializationInfo.h>
#include <DataTypes/NestedUtils.h>
#include <Columns/ColumnSparse.h> #include <Columns/ColumnSparse.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <IO/VarInt.h> #include <IO/VarInt.h>
#include <Core/Block.h>
#include <base/EnumReflection.h> #include <base/EnumReflection.h>
#include <Poco/JSON/JSON.h> #include <Poco/JSON/JSON.h>
@ -47,12 +47,25 @@ void SerializationInfo::Data::add(const Data & other)
num_defaults += other.num_defaults; num_defaults += other.num_defaults;
} }
void SerializationInfo::Data::addDefaults(size_t length)
{
num_rows += length;
num_defaults += length;
}
SerializationInfo::SerializationInfo(ISerialization::Kind kind_, const Settings & settings_) SerializationInfo::SerializationInfo(ISerialization::Kind kind_, const Settings & settings_)
: settings(settings_) : settings(settings_)
, kind(kind_) , kind(kind_)
{ {
} }
SerializationInfo::SerializationInfo(ISerialization::Kind kind_, const Settings & settings_, const Data & data_)
: settings(settings_)
, kind(kind_)
, data(data_)
{
}
void SerializationInfo::add(const IColumn & column) void SerializationInfo::add(const IColumn & column)
{ {
data.add(column); data.add(column);
@ -67,6 +80,13 @@ void SerializationInfo::add(const SerializationInfo & other)
kind = chooseKind(data, settings); kind = chooseKind(data, settings);
} }
void SerializationInfo::addDefaults(size_t length)
{
data.addDefaults(length);
if (settings.choose_kind)
kind = chooseKind(data, settings);
}
void SerializationInfo::replaceData(const SerializationInfo & other) void SerializationInfo::replaceData(const SerializationInfo & other)
{ {
data = other.data; data = other.data;
@ -74,9 +94,7 @@ void SerializationInfo::replaceData(const SerializationInfo & other)
MutableSerializationInfoPtr SerializationInfo::clone() const MutableSerializationInfoPtr SerializationInfo::clone() const
{ {
auto res = std::make_shared<SerializationInfo>(kind, settings); return std::make_shared<SerializationInfo>(kind, settings, data);
res->data = data;
return res;
} }
void SerializationInfo::serialializeKindBinary(WriteBuffer & out) const void SerializationInfo::serialializeKindBinary(WriteBuffer & out) const

View File

@ -34,6 +34,7 @@ public:
void add(const IColumn & column); void add(const IColumn & column);
void add(const Data & other); void add(const Data & other);
void addDefaults(size_t length);
}; };
struct Settings struct Settings
@ -45,6 +46,7 @@ public:
}; };
SerializationInfo(ISerialization::Kind kind_, const Settings & settings_); SerializationInfo(ISerialization::Kind kind_, const Settings & settings_);
SerializationInfo(ISerialization::Kind kind_, const Settings & settings_, const Data & data_);
virtual ~SerializationInfo() = default; virtual ~SerializationInfo() = default;
@ -52,7 +54,9 @@ public:
virtual void add(const IColumn & column); virtual void add(const IColumn & column);
virtual void add(const SerializationInfo & other); virtual void add(const SerializationInfo & other);
virtual void addDefaults(size_t length);
virtual void replaceData(const SerializationInfo & other); virtual void replaceData(const SerializationInfo & other);
virtual std::shared_ptr<SerializationInfo> clone() const; virtual std::shared_ptr<SerializationInfo> clone() const;
virtual void serialializeKindBinary(WriteBuffer & out) const; virtual void serialializeKindBinary(WriteBuffer & out) const;
@ -61,6 +65,7 @@ public:
virtual Poco::JSON::Object toJSON() const; virtual Poco::JSON::Object toJSON() const;
virtual void fromJSON(const Poco::JSON::Object & object); virtual void fromJSON(const Poco::JSON::Object & object);
void setKind(ISerialization::Kind kind_) { kind = kind_; }
const Settings & getSettings() const { return settings; } const Settings & getSettings() const { return settings; }
const Data & getData() const { return data; } const Data & getData() const { return data; }
ISerialization::Kind getKind() const { return kind; } ISerialization::Kind getKind() const { return kind; }

View File

@ -10,13 +10,18 @@ namespace ErrorCodes
{ {
extern const int CORRUPTED_DATA; extern const int CORRUPTED_DATA;
extern const int THERE_IS_NO_COLUMN; extern const int THERE_IS_NO_COLUMN;
extern const int LOGICAL_ERROR;
} }
SerializationInfoTuple::SerializationInfoTuple( SerializationInfoTuple::SerializationInfoTuple(
MutableSerializationInfos elems_, const Settings & settings_) MutableSerializationInfos elems_, Names names_, const Settings & settings_)
: SerializationInfo(ISerialization::Kind::DEFAULT, settings_) : SerializationInfo(ISerialization::Kind::DEFAULT, settings_)
, elems(std::move(elems_)) , elems(std::move(elems_))
, names(std::move(names_))
{ {
assert(names.size() == elems.size());
for (size_t i = 0; i < names.size(); ++i)
name_to_elem[names[i]] = elems[i];
} }
bool SerializationInfoTuple::hasCustomSerialization() const bool SerializationInfoTuple::hasCustomSerialization() const
@ -40,22 +45,34 @@ void SerializationInfoTuple::add(const SerializationInfo & other)
{ {
SerializationInfo::add(other); SerializationInfo::add(other);
const auto & info_tuple = assert_cast<const SerializationInfoTuple &>(other); const auto & other_info = assert_cast<const SerializationInfoTuple &>(other);
assert(elems.size() == info_tuple.elems.size()); for (const auto & [name, elem] : name_to_elem)
{
auto it = other_info.name_to_elem.find(name);
if (it != other_info.name_to_elem.end())
elem->add(*it->second);
else
elem->addDefaults(other_info.getData().num_rows);
}
}
for (size_t i = 0; i < elems.size(); ++i) void SerializationInfoTuple::addDefaults(size_t length)
elems[i]->add(*info_tuple.elems[i]); {
for (const auto & elem : elems)
elem->addDefaults(length);
} }
void SerializationInfoTuple::replaceData(const SerializationInfo & other) void SerializationInfoTuple::replaceData(const SerializationInfo & other)
{ {
SerializationInfo::add(other); SerializationInfo::add(other);
const auto & info_tuple = assert_cast<const SerializationInfoTuple &>(other); const auto & other_info = assert_cast<const SerializationInfoTuple &>(other);
assert(elems.size() == info_tuple.elems.size()); for (const auto & [name, elem] : name_to_elem)
{
for (size_t i = 0; i < elems.size(); ++i) auto it = other_info.name_to_elem.find(name);
elems[i]->replaceData(*info_tuple.elems[i]); if (it != other_info.name_to_elem.end())
elem->replaceData(*it->second);
}
} }
MutableSerializationInfoPtr SerializationInfoTuple::clone() const MutableSerializationInfoPtr SerializationInfoTuple::clone() const
@ -65,7 +82,7 @@ MutableSerializationInfoPtr SerializationInfoTuple::clone() const
for (const auto & elem : elems) for (const auto & elem : elems)
elems_cloned.push_back(elem->clone()); elems_cloned.push_back(elem->clone());
return std::make_shared<SerializationInfoTuple>(std::move(elems_cloned), settings); return std::make_shared<SerializationInfoTuple>(std::move(elems_cloned), names, settings);
} }
void SerializationInfoTuple::serialializeKindBinary(WriteBuffer & out) const void SerializationInfoTuple::serialializeKindBinary(WriteBuffer & out) const
@ -99,7 +116,7 @@ void SerializationInfoTuple::fromJSON(const Poco::JSON::Object & object)
if (!object.has("subcolumns")) if (!object.has("subcolumns"))
throw Exception(ErrorCodes::CORRUPTED_DATA, throw Exception(ErrorCodes::CORRUPTED_DATA,
"Missed field '{}' in SerializationInfo of columns SerializationInfoTuple"); "Missed field 'subcolumns' in SerializationInfo of columns SerializationInfoTuple");
auto subcolumns = object.getArray("subcolumns"); auto subcolumns = object.getArray("subcolumns");
if (elems.size() != subcolumns->size()) if (elems.size() != subcolumns->size())

View File

@ -1,4 +1,5 @@
#pragma once #pragma once
#include <Core/Names.h>
#include <DataTypes/Serializations/SerializationInfo.h> #include <DataTypes/Serializations/SerializationInfo.h>
namespace DB namespace DB
@ -7,25 +8,32 @@ namespace DB
class SerializationInfoTuple : public SerializationInfo class SerializationInfoTuple : public SerializationInfo
{ {
public: public:
SerializationInfoTuple(MutableSerializationInfos elems_, const Settings & settings_); SerializationInfoTuple(MutableSerializationInfos elems_, Names names_, const Settings & settings_);
bool hasCustomSerialization() const override; bool hasCustomSerialization() const override;
void add(const IColumn & column) override; void add(const IColumn & column) override;
void add(const SerializationInfo & other) override; void add(const SerializationInfo & other) override;
void addDefaults(size_t length) override;
void replaceData(const SerializationInfo & other) override; void replaceData(const SerializationInfo & other) override;
MutableSerializationInfoPtr clone() const override; MutableSerializationInfoPtr clone() const override;
void serialializeKindBinary(WriteBuffer & out) const override; void serialializeKindBinary(WriteBuffer & out) const override;
void deserializeFromKindsBinary(ReadBuffer & in) override; void deserializeFromKindsBinary(ReadBuffer & in) override;
Poco::JSON::Object toJSON() const override; Poco::JSON::Object toJSON() const override;
void fromJSON(const Poco::JSON::Object & object) override; void fromJSON(const Poco::JSON::Object & object) override;
MutableSerializationInfoPtr getElementInfo(size_t i) const { return elems[i]; } const MutableSerializationInfoPtr & getElementInfo(size_t i) const { return elems[i]; }
ISerialization::Kind getElementKind(size_t i) const { return elems[i]->getKind(); } ISerialization::Kind getElementKind(size_t i) const { return elems[i]->getKind(); }
private: private:
MutableSerializationInfos elems; MutableSerializationInfos elems;
Names names;
using NameToElem = std::unordered_map<String, MutableSerializationInfoPtr>;
NameToElem name_to_elem;
}; };
} }

View File

@ -103,7 +103,7 @@ void NativeWriter::write(const Block & block)
mark.offset_in_decompressed_block = ostr_concrete->getRemainingBytes(); mark.offset_in_decompressed_block = ostr_concrete->getRemainingBytes();
} }
ColumnWithTypeAndName column = block.safeGetByPosition(i); auto column = block.safeGetByPosition(i);
/// Send data to old clients without low cardinality type. /// Send data to old clients without low cardinality type.
if (remove_low_cardinality || (client_revision && client_revision < DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE)) if (remove_low_cardinality || (client_revision && client_revision < DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE))
@ -145,7 +145,7 @@ void NativeWriter::write(const Block & block)
SerializationPtr serialization; SerializationPtr serialization;
if (client_revision >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION) if (client_revision >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION)
{ {
auto info = column.column->getSerializationInfo(); auto info = column.type->getSerializationInfo(*column.column);
serialization = column.type->getSerialization(*info); serialization = column.type->getSerialization(*info);
bool has_custom = info->hasCustomSerialization(); bool has_custom = info->hasCustomSerialization();

View File

@ -15,6 +15,8 @@
#include <Storages/StorageReplicatedMergeTree.h> #include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/PartMetadataManagerOrdinary.h> #include <Storages/MergeTree/PartMetadataManagerOrdinary.h>
#include <Storages/MergeTree/PartMetadataManagerWithCache.h> #include <Storages/MergeTree/PartMetadataManagerWithCache.h>
#include <Core/NamesAndTypes.h>
#include <Storages/ColumnsDescription.h>
#include <Common/StringUtils/StringUtils.h> #include <Common/StringUtils/StringUtils.h>
#include <Common/escapeForFileName.h> #include <Common/escapeForFileName.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
@ -445,6 +447,18 @@ void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns)
for (const auto & column : columns) for (const auto & column : columns)
column_name_to_position.emplace(column.name, pos++); column_name_to_position.emplace(column.name, pos++);
columns_description = ColumnsDescription(columns);
}
NameAndTypePair IMergeTreeDataPart::getColumn(const String & column_name) const
{
return columns_description.getColumnOrSubcolumn(GetColumnsOptions::AllPhysical, column_name);
}
std::optional<NameAndTypePair> IMergeTreeDataPart::tryGetColumn(const String & column_name) const
{
return columns_description.tryGetColumnOrSubcolumn(GetColumnsOptions::AllPhysical, column_name);
} }
void IMergeTreeDataPart::setSerializationInfos(const SerializationInfoByName & new_infos) void IMergeTreeDataPart::setSerializationInfos(const SerializationInfoByName & new_infos)
@ -454,10 +468,15 @@ void IMergeTreeDataPart::setSerializationInfos(const SerializationInfoByName & n
SerializationPtr IMergeTreeDataPart::getSerialization(const NameAndTypePair & column) const SerializationPtr IMergeTreeDataPart::getSerialization(const NameAndTypePair & column) const
{ {
auto it = serialization_infos.find(column.getNameInStorage()); auto column_in_part = tryGetColumn(column.name);
return it == serialization_infos.end() if (!column_in_part)
? IDataType::getSerialization(column) return IDataType::getSerialization(column);
: IDataType::getSerialization(column, *it->second);
auto it = serialization_infos.find(column_in_part->getNameInStorage());
if (it == serialization_infos.end())
return IDataType::getSerialization(*column_in_part);
return IDataType::getSerialization(*column_in_part, *it->second);
} }
void IMergeTreeDataPart::removeIfNeeded() void IMergeTreeDataPart::removeIfNeeded()
@ -564,37 +583,38 @@ size_t IMergeTreeDataPart::getFileSizeOrZero(const String & file_name) const
return checksum->second.file_size; return checksum->second.file_size;
} }
String IMergeTreeDataPart::getColumnNameWithMinimumCompressedSize( String IMergeTreeDataPart::getColumnNameWithMinimumCompressedSize(bool with_subcolumns) const
const StorageSnapshotPtr & storage_snapshot, bool with_subcolumns) const
{ {
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects(); auto find_column_with_minimum_size = [&](const auto & columns_list)
if (with_subcolumns) {
options.withSubcolumns(); std::optional<std::string> minimum_size_column;
UInt64 minimum_size = std::numeric_limits<UInt64>::max();
auto storage_columns = storage_snapshot->getColumns(options); for (const auto & column : columns_list)
MergeTreeData::AlterConversions alter_conversions; {
if (!parent_part) if (!hasColumnFiles(column))
alter_conversions = storage.getAlterConversionsForPart(shared_from_this()); continue;
const auto size = getColumnSize(column.name).data_compressed;
if (size < minimum_size)
{
minimum_size = size;
minimum_size_column = column.name;
}
}
return minimum_size_column;
};
std::optional<std::string> minimum_size_column; std::optional<std::string> minimum_size_column;
UInt64 minimum_size = std::numeric_limits<UInt64>::max(); if (with_subcolumns)
for (const auto & column : storage_columns)
{ {
auto column_name = column.name; auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withSubcolumns();
auto column_type = column.type; minimum_size_column = find_column_with_minimum_size(columns_description.get(options));
if (alter_conversions.isColumnRenamed(column.name)) }
column_name = alter_conversions.getColumnOldName(column.name); else
{
if (!hasColumnFiles(column)) minimum_size_column = find_column_with_minimum_size(columns);
continue;
const auto size = getColumnSize(column_name).data_compressed;
if (size < minimum_size)
{
minimum_size = size;
minimum_size_column = column_name;
}
} }
if (!minimum_size_column) if (!minimum_size_column)
@ -603,22 +623,6 @@ String IMergeTreeDataPart::getColumnNameWithMinimumCompressedSize(
return *minimum_size_column; return *minimum_size_column;
} }
// String IMergeTreeDataPart::getFullPath() const
// {
// if (relative_path.empty())
// throw Exception("Part relative_path cannot be empty. It's bug.", ErrorCodes::LOGICAL_ERROR);
// return fs::path(storage.getFullPathOnDisk(volume->getDisk())) / (parent_part ? parent_part->relative_path : "") / relative_path / "";
// }
// String IMergeTreeDataPart::getRelativePath() const
// {
// if (relative_path.empty())
// throw Exception("Part relative_path cannot be empty. It's bug.", ErrorCodes::LOGICAL_ERROR);
// return fs::path(storage.relative_data_path) / (parent_part ? parent_part->relative_path : "") / relative_path / "";
// }
void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency) void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency)
{ {
assertOnDisk(); assertOnDisk();

View File

@ -14,6 +14,7 @@
#include <Storages/MergeTree/MergeTreeDataPartTTLInfo.h> #include <Storages/MergeTree/MergeTreeDataPartTTLInfo.h>
#include <Storages/MergeTree/MergeTreeIOSettings.h> #include <Storages/MergeTree/MergeTreeIOSettings.h>
#include <Storages/MergeTree/KeyCondition.h> #include <Storages/MergeTree/KeyCondition.h>
#include <Storages/ColumnsDescription.h>
#include <Interpreters/TransactionVersionMetadata.h> #include <Interpreters/TransactionVersionMetadata.h>
#include <DataTypes/Serializations/SerializationInfo.h> #include <DataTypes/Serializations/SerializationInfo.h>
#include <Storages/MergeTree/IPartMetadataManager.h> #include <Storages/MergeTree/IPartMetadataManager.h>
@ -136,6 +137,9 @@ public:
const NamesAndTypesList & getColumns() const { return columns; } const NamesAndTypesList & getColumns() const { return columns; }
NameAndTypePair getColumn(const String & name) const;
std::optional<NameAndTypePair> tryGetColumn(const String & column_name) const;
void setSerializationInfos(const SerializationInfoByName & new_infos); void setSerializationInfos(const SerializationInfoByName & new_infos);
const SerializationInfoByName & getSerializationInfos() const { return serialization_infos; } const SerializationInfoByName & getSerializationInfos() const { return serialization_infos; }
@ -167,8 +171,7 @@ public:
/// Returns the name of a column with minimum compressed size (as returned by getColumnSize()). /// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
/// If no checksums are present returns the name of the first physically existing column. /// If no checksums are present returns the name of the first physically existing column.
String getColumnNameWithMinimumCompressedSize( String getColumnNameWithMinimumCompressedSize(bool with_subcolumns) const;
const StorageSnapshotPtr & storage_snapshot, bool with_subcolumns) const;
bool contains(const IMergeTreeDataPart & other) const { return info.contains(other.info); } bool contains(const IMergeTreeDataPart & other) const { return info.contains(other.info); }
@ -521,6 +524,10 @@ private:
/// Map from name of column to its serialization info. /// Map from name of column to its serialization info.
SerializationInfoByName serialization_infos; SerializationInfoByName serialization_infos;
/// Columns description for more convinient access
/// to columns by name and getting subcolumns.
ColumnsDescription columns_description;
/// Reads part unique identifier (if exists) from uuid.txt /// Reads part unique identifier (if exists) from uuid.txt
void loadUUID(); void loadUUID();

View File

@ -18,6 +18,7 @@ namespace
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int THERE_IS_NO_COLUMN;
} }
@ -33,7 +34,6 @@ IMergeTreeReader::IMergeTreeReader(
: data_part(data_part_) : data_part(data_part_)
, avg_value_size_hints(avg_value_size_hints_) , avg_value_size_hints(avg_value_size_hints_)
, columns(columns_) , columns(columns_)
, part_columns(data_part->getColumns())
, uncompressed_cache(uncompressed_cache_) , uncompressed_cache(uncompressed_cache_)
, mark_cache(mark_cache_) , mark_cache(mark_cache_)
, settings(settings_) , settings(settings_)
@ -47,11 +47,7 @@ IMergeTreeReader::IMergeTreeReader(
/// For wide parts convert plain arrays of Nested to subcolumns /// For wide parts convert plain arrays of Nested to subcolumns
/// to allow to use shared offset column from cache. /// to allow to use shared offset column from cache.
columns = Nested::convertToSubcolumns(columns); columns = Nested::convertToSubcolumns(columns);
part_columns = Nested::collect(part_columns);
} }
for (const auto & column_from_part : part_columns)
columns_from_part[column_from_part.name] = &column_from_part.type;
} }
IMergeTreeReader::~IMergeTreeReader() = default; IMergeTreeReader::~IMergeTreeReader() = default;
@ -124,37 +120,25 @@ void IMergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns
} }
} }
NameAndTypePair IMergeTreeReader::getColumnFromPart(const NameAndTypePair & required_column) const String IMergeTreeReader::getColumnNameInPart(const NameAndTypePair & required_column) const
{ {
auto name_in_storage = required_column.getNameInStorage(); auto name_in_storage = required_column.getNameInStorage();
ColumnsFromPart::ConstLookupResult it;
if (alter_conversions.isColumnRenamed(name_in_storage)) if (alter_conversions.isColumnRenamed(name_in_storage))
{ {
String old_name = alter_conversions.getColumnOldName(name_in_storage); name_in_storage = alter_conversions.getColumnOldName(name_in_storage);
it = columns_from_part.find(old_name); return Nested::concatenateName(name_in_storage, required_column.getSubcolumnName());
}
else
{
it = columns_from_part.find(name_in_storage);
} }
if (it == columns_from_part.end()) return required_column.name;
return required_column; }
const DataTypePtr & type = *it->getMapped(); NameAndTypePair IMergeTreeReader::getColumnInPart(const NameAndTypePair & required_column) const
if (required_column.isSubcolumn()) {
{ auto column_in_part = data_part->tryGetColumn(getColumnNameInPart(required_column));
auto subcolumn_name = required_column.getSubcolumnName(); if (column_in_part)
auto subcolumn_type = type->tryGetSubcolumnType(subcolumn_name); return *column_in_part;
if (!subcolumn_type) return required_column;
return required_column;
return {String(it->getKey()), subcolumn_name, type, subcolumn_type};
}
return {String(it->getKey()), type};
} }
void IMergeTreeReader::performRequiredConversions(Columns & res_columns) const void IMergeTreeReader::performRequiredConversions(Columns & res_columns) const
@ -183,7 +167,7 @@ void IMergeTreeReader::performRequiredConversions(Columns & res_columns) const
if (res_columns[pos] == nullptr) if (res_columns[pos] == nullptr)
continue; continue;
copy_block.insert({res_columns[pos], getColumnFromPart(*name_and_type).type, name_and_type->name}); copy_block.insert({res_columns[pos], getColumnInPart(*name_and_type).type, name_and_type->name});
} }
DB::performRequiredConversions(copy_block, columns, storage.getContext()); DB::performRequiredConversions(copy_block, columns, storage.getContext());

View File

@ -63,8 +63,10 @@ public:
MergeTreeData::DataPartPtr data_part; MergeTreeData::DataPartPtr data_part;
protected: protected:
/// Returns actual column type in part, which can differ from table metadata. /// Returns actual column name in part, which can differ from table metadata.
NameAndTypePair getColumnFromPart(const NameAndTypePair & required_column) const; String getColumnNameInPart(const NameAndTypePair & required_column) const;
/// Returns actual column name and type in part, which can differ from table metadata.
NameAndTypePair getColumnInPart(const NameAndTypePair & required_column) const;
void checkNumberOfColumns(size_t num_columns_to_read) const; void checkNumberOfColumns(size_t num_columns_to_read) const;
@ -75,7 +77,6 @@ protected:
/// Columns that are read. /// Columns that are read.
NamesAndTypesList columns; NamesAndTypesList columns;
NamesAndTypesList part_columns;
UncompressedCache * uncompressed_cache; UncompressedCache * uncompressed_cache;
MarkCache * mark_cache; MarkCache * mark_cache;
@ -92,11 +93,6 @@ protected:
private: private:
/// Alter conversions, which must be applied on fly if required /// Alter conversions, which must be applied on fly if required
MergeTreeData::AlterConversions alter_conversions; MergeTreeData::AlterConversions alter_conversions;
/// Actual data type of columns in part
using ColumnsFromPart = HashMapWithSavedHash<StringRef, const DataTypePtr *, StringRefHash>;
ColumnsFromPart columns_from_part;
}; };
} }

View File

@ -122,7 +122,7 @@ NameSet injectRequiredColumns(
*/ */
if (!have_at_least_one_physical_column) if (!have_at_least_one_physical_column)
{ {
const auto minimum_size_column_name = part->getColumnNameWithMinimumCompressedSize(storage_snapshot, with_subcolumns); const auto minimum_size_column_name = part->getColumnNameWithMinimumCompressedSize(with_subcolumns);
columns.push_back(minimum_size_column_name); columns.push_back(minimum_size_column_name);
/// correctly report added column /// correctly report added column
injected_columns.insert(columns.back()); injected_columns.insert(columns.back());

View File

@ -54,14 +54,14 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
{ {
if (name_and_type->isSubcolumn()) if (name_and_type->isSubcolumn())
{ {
auto storage_column_from_part = getColumnFromPart( auto storage_column_from_part = getColumnInPart(
{name_and_type->getNameInStorage(), name_and_type->getTypeInStorage()}); {name_and_type->getNameInStorage(), name_and_type->getTypeInStorage()});
if (!storage_column_from_part.type->tryGetSubcolumnType(name_and_type->getSubcolumnName())) if (!storage_column_from_part.type->tryGetSubcolumnType(name_and_type->getSubcolumnName()))
continue; continue;
} }
auto column_from_part = getColumnFromPart(*name_and_type); auto column_from_part = getColumnInPart(*name_and_type);
auto position = data_part->getColumnPosition(column_from_part.getNameInStorage()); auto position = data_part->getColumnPosition(column_from_part.getNameInStorage());
if (!position && typeid_cast<const DataTypeArray *>(column_from_part.type.get())) if (!position && typeid_cast<const DataTypeArray *>(column_from_part.type.get()))
@ -153,7 +153,7 @@ size_t MergeTreeReaderCompact::readRows(
if (!column_positions[i]) if (!column_positions[i])
continue; continue;
auto column_from_part = getColumnFromPart(*column_it); auto column_from_part = getColumnInPart(*column_it);
if (res_columns[i] == nullptr) if (res_columns[i] == nullptr)
{ {
auto serialization = data_part->getSerialization(column_from_part); auto serialization = data_part->getSerialization(column_from_part);
@ -168,10 +168,11 @@ size_t MergeTreeReaderCompact::readRows(
auto name_and_type = columns.begin(); auto name_and_type = columns.begin();
for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type) for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
{ {
auto column_from_part = getColumnFromPart(*name_and_type);
if (!res_columns[pos]) if (!res_columns[pos])
continue; continue;
auto column_from_part = getColumnInPart(*name_and_type);
try try
{ {
auto & column = res_columns[pos]; auto & column = res_columns[pos];

View File

@ -34,7 +34,7 @@ MergeTreeReaderInMemory::MergeTreeReaderInMemory(
{ {
for (const auto & name_and_type : columns) for (const auto & name_and_type : columns)
{ {
auto [name, type] = getColumnFromPart(name_and_type); auto [name, type] = getColumnInPart(name_and_type);
/// If array of Nested column is missing in part, /// If array of Nested column is missing in part,
/// we have to read its offsets if they exist. /// we have to read its offsets if they exist.
@ -67,7 +67,7 @@ size_t MergeTreeReaderInMemory::readRows(
auto column_it = columns.begin(); auto column_it = columns.begin();
for (size_t i = 0; i < num_columns; ++i, ++column_it) for (size_t i = 0; i < num_columns; ++i, ++column_it)
{ {
auto name_type = getColumnFromPart(*column_it); auto name_type = getColumnInPart(*column_it);
/// Copy offsets, if array of Nested column is missing in part. /// Copy offsets, if array of Nested column is missing in part.
auto offsets_it = positions_for_offsets.find(name_type.name); auto offsets_it = positions_for_offsets.find(name_type.name);

View File

@ -49,7 +49,7 @@ MergeTreeReaderWide::MergeTreeReaderWide(
{ {
for (const NameAndTypePair & column : columns) for (const NameAndTypePair & column : columns)
{ {
auto column_from_part = getColumnFromPart(column); auto column_from_part = getColumnInPart(column);
addStreams(column_from_part, profile_callback_, clock_type_); addStreams(column_from_part, profile_callback_, clock_type_);
} }
} }
@ -83,7 +83,7 @@ size_t MergeTreeReaderWide::readRows(
auto name_and_type = columns.begin(); auto name_and_type = columns.begin();
for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type) for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
{ {
auto column_from_part = getColumnFromPart(*name_and_type); auto column_from_part = getColumnInPart(*name_and_type);
try try
{ {
auto & cache = caches[column_from_part.getNameInStorage()]; auto & cache = caches[column_from_part.getNameInStorage()];
@ -102,7 +102,7 @@ size_t MergeTreeReaderWide::readRows(
for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type) for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
{ {
auto column_from_part = getColumnFromPart(*name_and_type); auto column_from_part = getColumnInPart(*name_and_type);
const auto & [name, type] = column_from_part; const auto & [name, type] = column_from_part;
/// The column is already present in the block so we will append the values to the end. /// The column is already present in the block so we will append the values to the end.

View File

@ -0,0 +1,13 @@
k1 Default 2
k2.k3 Default 1
=============
k1 Default 1
k2.k3 Sparse 1
=============
k1 Default 1
k2.k3 Sparse 1
=============
k1 Default 1
k2.k3 Sparse 1
1 1 4
2 400000 0

View File

@ -0,0 +1,59 @@
-- Tags: no-fasttest
DROP TABLE IF EXISTS t_json_sparse;
SET allow_experimental_object_type = 1;
CREATE TABLE t_json_sparse (data JSON)
ENGINE = MergeTree ORDER BY tuple()
SETTINGS ratio_of_defaults_for_sparse_serialization = 0.1,
min_bytes_for_wide_part = 0;
SYSTEM STOP MERGES t_json_sparse;
INSERT INTO t_json_sparse VALUES ('{"k1": 1, "k2": {"k3": 4}}');
INSERT INTO t_json_sparse SELECT '{"k1": 2}' FROM numbers(200000);
SELECT subcolumns.names, subcolumns.serializations, count() FROM system.parts_columns
ARRAY JOIN subcolumns
WHERE database = currentDatabase()
AND table = 't_json_sparse' AND column = 'data' AND active
GROUP BY subcolumns.names, subcolumns.serializations;
SELECT '=============';
SYSTEM START MERGES t_json_sparse;
OPTIMIZE TABLE t_json_sparse FINAL;
SELECT subcolumns.names, subcolumns.serializations, count() FROM system.parts_columns
ARRAY JOIN subcolumns
WHERE database = currentDatabase()
AND table = 't_json_sparse' AND column = 'data' AND active
GROUP BY subcolumns.names, subcolumns.serializations;
SELECT '=============';
DETACH TABLE t_json_sparse;
ATTACH TABLE t_json_sparse;
SELECT subcolumns.names, subcolumns.serializations, count() FROM system.parts_columns
ARRAY JOIN subcolumns
WHERE database = currentDatabase()
AND table = 't_json_sparse' AND column = 'data' AND active
GROUP BY subcolumns.names, subcolumns.serializations;
INSERT INTO t_json_sparse SELECT '{"k1": 2}' FROM numbers(200000);
SELECT '=============';
OPTIMIZE TABLE t_json_sparse FINAL;
SELECT subcolumns.names, subcolumns.serializations, count() FROM system.parts_columns
ARRAY JOIN subcolumns
WHERE database = currentDatabase()
AND table = 't_json_sparse' AND column = 'data' AND active
GROUP BY subcolumns.names, subcolumns.serializations;
SELECT data.k1, count(), sum(data.k2.k3) FROM t_json_sparse GROUP BY data.k1;
DROP TABLE t_json_sparse;