Allow reading subcolumns of complex types

Anton Popov 2020-09-14 14:22:17 +03:00
parent 48f29ae11f
commit cb4801e3be
31 changed files with 384 additions and 112 deletions

View File

@ -17,6 +17,29 @@ namespace ErrorCodes
extern const int THERE_IS_NO_COLUMN;
}
NameAndTypePair::NameAndTypePair(
const String & name_, const String & subcolumn_name_,
const DataTypePtr & storage_type_, const DataTypePtr & type_)
: name(name_ + "." + subcolumn_name_)
, type(type_)
, storage_type(storage_type_)
, subcolumn_delimiter_position(name_.size()) {}
String NameAndTypePair::getStorageName() const
{
if (subcolumn_delimiter_position == -1)
return name;
return name.substr(0, subcolumn_delimiter_position);
}
String NameAndTypePair::getSubcolumnName() const
{
if (subcolumn_delimiter_position == -1)
return "";
return name.substr(subcolumn_delimiter_position + 1);
}
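A minimal usage sketch of the new constructor and accessors (illustration only, not part of the diff; the concrete types are assumptions):
NameAndTypePair pair(
    "arr", "size0",
    std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt32>()),
    std::make_shared<DataTypeUInt64>());
assert(pair.name == "arr.size0");           /// compound name, delimiter at name_.size()
assert(pair.getStorageName() == "arr");     /// prefix before the delimiter
assert(pair.getSubcolumnName() == "size0"); /// suffix after the delimiter
assert(pair.isSubcolumn());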
void NamesAndTypesList::readText(ReadBuffer & buf)
{
@ -137,25 +160,20 @@ NamesAndTypesList NamesAndTypesList::filter(const Names & names) const
NamesAndTypesList NamesAndTypesList::addTypes(const Names & names) const
{
/// NOTE: It's better to make a map in `IStorage` than to create it here every time.
#if !defined(ARCADIA_BUILD)
google::dense_hash_map<StringRef, const DataTypePtr *, StringRefHash> types;
#else
google::sparsehash::dense_hash_map<StringRef, const DataTypePtr *, StringRefHash> types;
#endif
types.set_empty_key(StringRef());
std::unordered_map<String, const NameAndTypePair *> self_columns;
for (const NameAndTypePair & column : *this)
types[column.name] = &column.type;
for (const auto & column : *this)
self_columns[column.name] = &column;
NamesAndTypesList res;
for (const String & name : names)
{
auto it = types.find(name);
if (it == types.end())
auto it = self_columns.find(name);
if (it == self_columns.end())
throw Exception("No column " + name, ErrorCodes::THERE_IS_NO_COLUMN);
res.emplace_back(name, *it->second);
res.emplace_back(*it->second);
}
return res;
}
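The switch from a name-to-type map to a name-to-pair map is what lets subcolumn entries survive addTypes. A hedged sketch of the difference, assuming `list` was produced by the getAllWithSubcolumns() introduced further down in this commit:
NamesAndTypesList res = list.addTypes({"a.size0"});
/// res.front() is a copy of the stored pair, so subcolumn metadata survives:
///   res.front().isSubcolumn()    == true
///   res.front().getStorageName() == "a"
/// The old emplace_back(name, *it->second) rebuilt a plain (name, type) pair
/// and would have dropped the storage type and delimiter position.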

View File

@ -15,11 +15,17 @@ namespace DB
struct NameAndTypePair
{
String name;
DataTypePtr type;
NameAndTypePair() {}
public:
NameAndTypePair() = default;
NameAndTypePair(const String & name_, const DataTypePtr & type_) : name(name_), type(type_) {}
NameAndTypePair(const String & name_, const String & subcolumn_name_,
const DataTypePtr & storage_type_, const DataTypePtr & type_);
String getStorageName() const;
String getSubcolumnName() const;
bool isSubcolumn() const { return subcolumn_delimiter_position != -1; }
DataTypePtr getStorageType() const { return storage_type; }
bool operator<(const NameAndTypePair & rhs) const
{
@ -30,8 +36,24 @@ struct NameAndTypePair
{
return name == rhs.name && type->equals(*rhs.type);
}
String name;
DataTypePtr type;
private:
DataTypePtr storage_type;
ssize_t subcolumn_delimiter_position = -1;
};
template<int I>
auto get(const NameAndTypePair & name_and_type)
{
if constexpr (I == 0)
return name_and_type.name;
else if constexpr (I == 1)
return name_and_type.type;
}
using NamesAndTypes = std::vector<NameAndTypePair>;
class NamesAndTypesList : public std::list<NameAndTypePair>
@ -81,3 +103,10 @@ public:
};
}
namespace std
{
template <> struct tuple_size<DB::NameAndTypePair> : std::integral_constant<size_t, 2> {};
template <> struct tuple_element<0, DB::NameAndTypePair> { using type = DB::String; };
template <> struct tuple_element<1, DB::NameAndTypePair> { using type = DB::DataTypePtr; };
}
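The get<I> overload together with the tuple_size/tuple_element specializations keeps structured bindings at existing call sites compiling; a minimal sketch:
NameAndTypePair column("n", std::make_shared<DataTypeUInt32>());
auto [name, type] = column; /// resolved through get<0>/get<1> above
/// Note: get<I> returns by value, so `name` and `type` are copies of the members.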

View File

@ -15,7 +15,10 @@
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/escapeForFileName.h>
#include <IO/ReadHelpers.h>
#include <Core/NamesAndTypes.h>
namespace DB
{
@ -499,6 +502,33 @@ bool DataTypeArray::equals(const IDataType & rhs) const
return typeid(rhs) == typeid(*this) && nested->equals(*static_cast<const DataTypeArray &>(rhs).nested);
}
DataTypePtr DataTypeArray::getSubcolumnType(const String & subcolumn_name) const
{
ReadBufferFromString buf(subcolumn_name);
size_t dim;
if (checkString("size", buf) && tryReadIntText(dim, buf) && dim < getNumberOfDimensions())
return std::make_shared<DataTypeUInt64>();
return nullptr;
}
std::vector<String> DataTypeArray::getSubcolumnNames() const
{
size_t num_of_dimensions = getNumberOfDimensions();
std::vector<String> res(num_of_dimensions);
for (size_t i = 0; i < num_of_dimensions; ++i)
res[i] = "size" + std::to_string(i);
return res;
}
String DataTypeArray::getEscapedFileName(const NameAndTypePair & column) const
{
if (column.isSubcolumn())
return escapeForFileName(column.getStorageName()) + "." + column.getSubcolumnName();
return escapeForFileName(column.name);
}
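Illustrative expectations for a two-dimensional array, a sketch under the definitions above (not a test from the commit):
auto type = std::make_shared<DataTypeArray>(
    std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt32>()));
assert(type->getNumberOfDimensions() == 2);
/// one "sizeN" subcolumn per dimension, each exposed as UInt64
assert(type->getSubcolumnNames() == std::vector<String>({"size0", "size1"}));
assert(type->getSubcolumnType("size1")->equals(DataTypeUInt64()));
assert(type->getSubcolumnType("size2") == nullptr); /// dimension out of range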
size_t DataTypeArray::getNumberOfDimensions() const
{

View File

@ -111,6 +111,10 @@ public:
return nested->isValueUnambiguouslyRepresentedInFixedSizeContiguousMemoryRegion();
}
DataTypePtr getSubcolumnType(const String & subcolumn_name) const override;
std::vector<String> getSubcolumnNames() const override;
String getEscapedFileName(const NameAndTypePair & column) const override;
const DataTypePtr & getNestedType() const { return nested; }
/// 1 for plain array, 2 for array of arrays and so on.

View File

@ -4,6 +4,7 @@
#include <DataTypes/DataTypeFactory.h>
#include <Columns/ColumnNullable.h>
#include <Core/Field.h>
#include <Core/NamesAndTypes.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/ReadHelpers.h>
@ -13,6 +14,7 @@
#include <Parsers/IAST.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/escapeForFileName.h>
namespace DB
@ -511,6 +513,27 @@ bool DataTypeNullable::equals(const IDataType & rhs) const
return rhs.isNullable() && nested_data_type->equals(*static_cast<const DataTypeNullable &>(rhs).nested_data_type);
}
DataTypePtr DataTypeNullable::getSubcolumnType(const String & subcolumn_name) const
{
if (subcolumn_name == "null")
return std::make_shared<DataTypeUInt8>();
return nullptr;
}
std::vector<String> DataTypeNullable::getSubcolumnNames() const
{
return {"null"};
}
String DataTypeNullable::getEscapedFileName(const NameAndTypePair & column) const
{
if (column.isSubcolumn())
return escapeForFileName(column.getStorageName()) + "." + column.getSubcolumnName();
return escapeForFileName(column.name);
}
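A sketch of what Nullable exposes (the 1-for-NULL convention matches the test at the end of this commit):
auto type = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt32>());
assert(type->getSubcolumnNames() == std::vector<String>{"null"});
/// the null map is read as UInt8: 1 for NULL, 0 for a concrete value
assert(type->getSubcolumnType("null")->equals(DataTypeUInt8()));
assert(type->getSubcolumnType("nope") == nullptr);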
static DataTypePtr create(const ASTPtr & arguments)
{

View File

@ -97,6 +97,9 @@ public:
size_t getSizeOfValueInMemory() const override;
bool onlyNull() const override;
bool canBeInsideLowCardinality() const override { return nested_data_type->canBeInsideLowCardinality(); }
DataTypePtr getSubcolumnType(const String & subcolumn_name) const override;
std::vector<String> getSubcolumnNames() const override;
String getEscapedFileName(const NameAndTypePair & column) const override;
const DataTypePtr & getNestedType() const { return nested_data_type; }

View File

@ -529,6 +529,16 @@ size_t DataTypeTuple::getSizeOfValueInMemory() const
return res;
}
DataTypePtr DataTypeTuple::getSubcolumnType(const String & subcolumn_name) const
{
return elems[getPositionByName(subcolumn_name)];
}
std::vector<String> DataTypeTuple::getSubcolumnNames() const
{
return names;
}
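For tuples every named element becomes a subcolumn. Note the asymmetry with Array and Nullable: an unknown name is not mapped to nullptr here but goes through getPositionByName, which throws for missing elements (an assumption callers should keep in mind). Sketch:
auto type = DataTypeFactory::instance().get("Tuple(s String, u UInt32)");
assert(type->getSubcolumnNames() == std::vector<String>({"s", "u"}));
assert(type->getSubcolumnType("u")->equals(DataTypeUInt32()));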
static DataTypePtr create(const ASTPtr & arguments)
{

View File

@ -94,6 +94,8 @@ public:
bool haveMaximumSizeOfValue() const override;
size_t getMaximumSizeOfValueInMemory() const override;
size_t getSizeOfValueInMemory() const override;
DataTypePtr getSubcolumnType(const String & subcolumn_name) const override;
std::vector<String> getSubcolumnNames() const override;
const DataTypes & getElements() const { return elems; }
const Strings & getElementNames() const { return names; }

View File

@ -10,7 +10,6 @@
#include <DataTypes/DataTypeCustom.h>
#include <DataTypes/NestedUtils.h>
namespace DB
{
@ -93,6 +92,20 @@ size_t IDataType::getSizeOfValueInMemory() const
throw Exception("Value of type " + getName() + " in memory is not of fixed size.", ErrorCodes::LOGICAL_ERROR);
}
String IDataType::getEscapedFileName(const NameAndTypePair & column) const
{
return escapeForFileName(column.name);
}
String IDataType::getFileNameForStream(const NameAndTypePair & column, const IDataType::SubstreamPath & path)
{
if (!column.isSubcolumn())
return getFileNameForStream(column.name, path);
auto stream_name = column.getStorageType()->getEscapedFileName(column);
return getFileNameForStreamImpl(std::move(stream_name), path);
}
String IDataType::getFileNameForStream(const String & column_name, const IDataType::SubstreamPath & path)
{
@ -105,8 +118,13 @@ String IDataType::getFileNameForStream(const String & column_name, const IDataTy
&& path[0].type == IDataType::Substream::ArraySizes
&& nested_table_name != column_name;
auto stream_name = escapeForFileName(is_sizes_of_nested_type ? nested_table_name : column_name);
return getFileNameForStreamImpl(std::move(stream_name), path);
}
String IDataType::getFileNameForStreamImpl(String stream_name, const IDataType::SubstreamPath & path)
{
size_t array_level = 0;
String stream_name = escapeForFileName(is_sizes_of_nested_type ? nested_table_name : column_name);
for (const Substream & elem : path)
{
if (elem.type == Substream::NullMap)
@ -126,6 +144,7 @@ String IDataType::getFileNameForStream(const String & column_name, const IDataTy
else if (elem.type == Substream::DictionaryKeys)
stream_name += ".dict";
}
return stream_name;
}
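The net effect on stream naming, sketched for a hypothetical column a Array(UInt32) (naming only; this commit does not change the data layout):
/// Full column "a": enumerateStreams yields two substreams:
///   ArraySizes -> stream name "a.size0" (sizes)
///   elements   -> stream name "a"       (data)
/// Subcolumn pair ("a", "size0"): getEscapedFileName returns "a.size0", and the
/// subcolumn type (UInt64) adds no further substream suffix, so the subcolumn
/// read opens exactly the stream the full column already wrote.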

View File

@ -27,6 +27,8 @@ using DataTypes = std::vector<DataTypePtr>;
class ProtobufReader;
class ProtobufWriter;
struct NameAndTypePair;
/** Properties of data type.
* Contains methods for serialization/deserialization.
@ -227,6 +229,9 @@ public:
virtual void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const = 0;
virtual void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const = 0;
virtual DataTypePtr getSubcolumnType(const String & /* subcolumn_path */) const { return nullptr; }
virtual std::vector<String> getSubcolumnNames() const { return {}; }
/** Text serialization with escaping but without quoting.
*/
void serializeAsTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const;
@ -437,9 +442,12 @@ public:
/// Strings, Numbers, Date, DateTime, Nullable
virtual bool canBeInsideLowCardinality() const { return false; }
virtual String getEscapedFileName(const NameAndTypePair & column) const;
/// Updates avg_value_size_hint for newly read column. Uses to optimize deserialization. Zero expected for first column.
static void updateAvgValueSizeHint(const IColumn & column, double & avg_value_size_hint);
static String getFileNameForStream(const NameAndTypePair & column, const SubstreamPath & path);
static String getFileNameForStream(const String & column_name, const SubstreamPath & path);
private:
@ -451,6 +459,8 @@ private:
mutable DataTypeCustomNamePtr custom_name;
mutable DataTypeCustomTextSerializationPtr custom_text_serialization;
static String getFileNameForStreamImpl(String stream_name, const SubstreamPath & path);
public:
const IDataTypeCustomName * getCustomName() const { return custom_name.get(); }
};

View File

@ -373,7 +373,9 @@ void TreeRewriterResult::collectSourceColumns(bool add_special)
{
const ColumnsDescription & columns = metadata_snapshot->getColumns();
auto columns_from_storage = add_special ? columns.getAll() : columns.getAllPhysical();
UNUSED(add_special);
// auto columns_from_storage = add_special ? columns.getAll() : columns.getAllPhysical();
auto columns_from_storage = columns.getAllWithSubcolumns();
if (source_columns.empty())
source_columns.swap(columns_from_storage);
else

View File

@ -23,6 +23,7 @@
#include <Interpreters/Context.h>
#include <Storages/IStorage.h>
#include <Common/typeid_cast.h>
#include <Common/StringUtils/StringUtils.h>
#include <Core/Defines.h>
#include <Compression/CompressionFactory.h>
#include <Interpreters/ExpressionAnalyzer.h>
@ -322,6 +323,19 @@ NamesAndTypesList ColumnsDescription::getAll() const
return ret;
}
NamesAndTypesList ColumnsDescription::getAllWithSubcolumns() const
{
NamesAndTypesList ret;
for (const auto & col : columns)
{
ret.emplace_back(col.name, col.type);
for (const auto & subcolumn : col.type->getSubcolumnNames())
ret.emplace_back(col.name, subcolumn, col.type, col.type->getSubcolumnType(subcolumn));
}
return ret;
}
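A sketch of the expansion for a hypothetical table with columns (a Array(UInt32), n Nullable(String)):
/// getAllWithSubcolumns() would return, in order:
///   a        Array(UInt32)
///   a.size0  UInt64            (subcolumn of a)
///   n        Nullable(String)
///   n.null   UInt8             (subcolumn of n)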
bool ColumnsDescription::has(const String & column_name) const
{
@ -371,13 +385,44 @@ NameAndTypePair ColumnsDescription::getPhysical(const String & column_name) cons
return NameAndTypePair(it->name, it->type);
}
NameAndTypePair ColumnsDescription::getPhysicalOrSubcolumn(const String & column_name) const
{
auto it = columns.get<1>().find(column_name);
if (it != columns.get<1>().end() && it->default_desc.kind != ColumnDefaultKind::Alias)
return NameAndTypePair(it->name, it->type);
std::optional<NameAndTypePair> res;
for (const auto & storage_column : columns)
{
if (startsWith(column_name, storage_column.name))
{
ReadBufferFromString buf(column_name);
if (checkString(storage_column.name, buf) && checkChar('.', buf))
{
String subcolumn_name;
readString(subcolumn_name, buf);
auto subcolumn_type = storage_column.type->getSubcolumnType(subcolumn_name);
if (subcolumn_type)
{
res.emplace(storage_column.name, subcolumn_name, storage_column.type, subcolumn_type);
break;
}
}
}
}
if (!res)
throw Exception("There is no physical column or subcolumn " + column_name + " in table.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
return *res;
}
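Resolution order for the same hypothetical table: an exact physical match wins; otherwise every storage column whose name prefixes the request is probed for a subcolumn:
/// getPhysicalOrSubcolumn("n")      -> {name: "n", type: Nullable(String)}
/// getPhysicalOrSubcolumn("n.null") -> subcolumn pair: storage name "n",
///                                     subcolumn "null", type UInt8
/// getPhysicalOrSubcolumn("n.nope") -> throws NO_SUCH_COLUMN_IN_TABLE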
bool ColumnsDescription::hasPhysical(const String & column_name) const
{
auto it = columns.get<1>().find(column_name);
return it != columns.get<1>().end() && it->default_desc.kind != ColumnDefaultKind::Alias;
}
ColumnDefaults ColumnsDescription::getDefaults() const
{
ColumnDefaults ret;

View File

@ -77,6 +77,7 @@ public:
NamesAndTypesList getAliases() const;
NamesAndTypesList getAllPhysical() const; /// ordinary + materialized.
NamesAndTypesList getAll() const; /// ordinary + materialized + aliases
NamesAndTypesList getAllWithSubcolumns() const;
using ColumnTTLs = std::unordered_map<String, ASTPtr>;
ColumnTTLs getColumnTTLs() const;
@ -106,6 +107,7 @@ public:
Names getNamesOfPhysical() const;
bool hasPhysical(const String & column_name) const;
NameAndTypePair getPhysical(const String & column_name) const;
NameAndTypePair getPhysicalOrSubcolumn(const String & column_name) const;
ColumnDefaults getDefaults() const; /// TODO: remove
bool hasDefault(const String & column_name) const;

View File

@ -366,7 +366,7 @@ String IMergeTreeDataPart::getColumnNameWithMinumumCompressedSize(const StorageM
if (alter_conversions.isColumnRenamed(column.name))
column_name = alter_conversions.getColumnOldName(column.name);
if (!hasColumnFiles(column_name, *column_type))
if (!hasColumnFiles(column))
continue;
const auto size = getColumnSize(column_name, *column_type).data_compressed;

View File

@ -324,7 +324,7 @@ public:
/// NOTE: Doesn't take column renames into account, if some column renames
/// take place, you must take original name of column for this part from
/// storage and pass it to this method.
virtual bool hasColumnFiles(const String & /* column */, const IDataType & /* type */) const{ return false; }
virtual bool hasColumnFiles(const NameAndTypePair & /* column */) const { return false; }
/// Calculate the total size of the entire directory with all the files
static UInt64 calculateTotalSizeOnDisk(const DiskPtr & disk_, const String & from);

View File

@ -53,7 +53,7 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
column.type->enumerateStreams(
[&](const IDataType::SubstreamPath & substream_path)
{
++stream_counts[IDataType::getFileNameForStream(column.name, substream_path)];
++stream_counts[IDataType::getFileNameForStream(column, substream_path)];
},
{});
}

View File

@ -17,7 +17,6 @@ NameSet injectRequiredColumns(const MergeTreeData & storage, const StorageMetada
{
NameSet required_columns{std::begin(columns), std::end(columns)};
NameSet injected_columns;
auto all_column_files_missing = true;
const auto & storage_columns = metadata_snapshot->getColumns();
@ -30,8 +29,10 @@ NameSet injectRequiredColumns(const MergeTreeData & storage, const StorageMetada
if (alter_conversions.isColumnRenamed(column_name_in_part))
column_name_in_part = alter_conversions.getColumnOldName(column_name_in_part);
auto column_in_storage = storage_columns.getPhysicalOrSubcolumn(column_name_in_part);
/// column has files and hence does not require evaluation
if (part->hasColumnFiles(column_name_in_part, *storage_columns.getPhysical(columns[i]).type))
if (part->hasColumnFiles(column_in_storage))
{
all_column_files_missing = false;
continue;
@ -256,7 +257,7 @@ MergeTreeReadTaskColumns getReadTaskColumns(
if (check_columns)
{
const NamesAndTypesList & physical_columns = metadata_snapshot->getColumns().getAllPhysical();
const NamesAndTypesList & physical_columns = metadata_snapshot->getColumns().getAllWithSubcolumns();
result.pre_columns = physical_columns.addTypes(pre_column_names);
result.columns = physical_columns.addTypes(column_names);
}

View File

@ -121,9 +121,9 @@ void MergeTreeDataPartCompact::loadIndexGranularity()
index_granularity.setInitialized();
}
bool MergeTreeDataPartCompact::hasColumnFiles(const String & column_name, const IDataType &) const
bool MergeTreeDataPartCompact::hasColumnFiles(const NameAndTypePair & column) const
{
if (!getColumnPosition(column_name))
if (!getColumnPosition(column.name))
return false;
auto bin_checksum = checksums.files.find(DATA_FILE_NAME_WITH_EXTENSION);

View File

@ -55,7 +55,7 @@ public:
bool isStoredOnDisk() const override { return true; }
bool hasColumnFiles(const String & column_name, const IDataType & type) const override;
bool hasColumnFiles(const NameAndTypePair & column) const override;
String getFileNameForColumn(const NameAndTypePair & /* column */) const override { return DATA_FILE_NAME; }

View File

@ -41,7 +41,7 @@ public:
const MergeTreeIndexGranularity & computed_index_granularity) const override;
bool isStoredOnDisk() const override { return false; }
bool hasColumnFiles(const String & column_name, const IDataType & /* type */) const override { return !!getColumnPosition(column_name); }
bool hasColumnFiles(const NameAndTypePair & column) const override { return !!getColumnPosition(column.name); }
String getFileNameForColumn(const NameAndTypePair & /* column */) const override { return ""; }
void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) const override;
void makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const override;

View File

@ -3,6 +3,7 @@
#include <Storages/MergeTree/MergeTreeReaderWide.h>
#include <Storages/MergeTree/MergeTreeDataPartWriterWide.h>
#include <Storages/MergeTree/IMergeTreeDataPartWriter.h>
#include <Core/NamesAndTypes.h>
namespace DB
@ -201,13 +202,13 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
}
}
bool MergeTreeDataPartWide::hasColumnFiles(const String & column_name, const IDataType & type) const
bool MergeTreeDataPartWide::hasColumnFiles(const NameAndTypePair & column) const
{
bool res = true;
type.enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
column.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
{
String file_name = IDataType::getFileNameForStream(column_name, substream_path);
String file_name = IDataType::getFileNameForStream(column, substream_path);
auto bin_checksum = checksums.files.find(file_name + ".bin");
auto mrk_checksum = checksums.files.find(file_name + index_granularity_info.marks_file_extension);

View File

@ -54,7 +54,7 @@ public:
~MergeTreeDataPartWide() override;
bool hasColumnFiles(const String & column, const IDataType & type) const override;
bool hasColumnFiles(const NameAndTypePair & column) const override;
private:
void checkConsistency(bool require_part_metadata) const override;

View File

@ -9,7 +9,6 @@
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
namespace DB
{
@ -50,7 +49,7 @@ MergeTreeReaderWide::MergeTreeReaderWide(
for (const NameAndTypePair & column : columns)
{
auto column_from_part = getColumnFromPart(column);
addStreams(column_from_part.name, *column_from_part.type, profile_callback_, clock_type_);
addStreams(column_from_part, profile_callback_, clock_type_);
}
}
catch (...)
@ -77,7 +76,8 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
auto name_and_type = columns.begin();
for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
{
auto [name, type] = getColumnFromPart(*name_and_type);
auto column_from_part = getColumnFromPart(*name_and_type);
const auto & [name, type] = column_from_part;
/// The column is already present in the block so we will append the values to the end.
bool append = res_columns[pos] != nullptr;
@ -114,7 +114,7 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
{
size_t column_size_before_reading = column->size();
readData(name, *type, *column, from_mark, continue_reading, max_rows_to_read, read_offsets);
readData(column_from_part, *column, from_mark, continue_reading, max_rows_to_read, read_offsets);
/// For elements of Nested, column_size_before_reading may be greater than column size
/// if offsets are not empty and were already read, but elements are empty.
@ -159,12 +159,12 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
return read_rows;
}
void MergeTreeReaderWide::addStreams(const String & name, const IDataType & type,
void MergeTreeReaderWide::addStreams(const NameAndTypePair & name_and_type,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
{
IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path)
{
String stream_name = IDataType::getFileNameForStream(name, substream_path);
String stream_name = IDataType::getFileNameForStream(name_and_type, substream_path);
if (streams.count(stream_name))
return;
@ -186,12 +186,12 @@ void MergeTreeReaderWide::addStreams(const String & name, const IDataType & type
};
IDataType::SubstreamPath substream_path;
type.enumerateStreams(callback, substream_path);
name_and_type.type->enumerateStreams(callback, substream_path);
}
void MergeTreeReaderWide::readData(
const String & name, const IDataType & type, IColumn & column,
const NameAndTypePair & name_and_type, IColumn & column,
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
bool with_offsets)
{
@ -203,7 +203,7 @@ void MergeTreeReaderWide::readData(
if (!with_offsets && substream_path.size() == 1 && substream_path[0].type == IDataType::Substream::ArraySizes)
return nullptr;
String stream_name = IDataType::getFileNameForStream(name, substream_path);
String stream_name = IDataType::getFileNameForStream(name_and_type, substream_path);
auto it = streams.find(stream_name);
if (it == streams.end())
@ -223,20 +223,20 @@ void MergeTreeReaderWide::readData(
};
};
double & avg_value_size_hint = avg_value_size_hints[name];
double & avg_value_size_hint = avg_value_size_hints[name_and_type.name];
IDataType::DeserializeBinaryBulkSettings deserialize_settings;
deserialize_settings.avg_value_size_hint = avg_value_size_hint;
if (deserialize_binary_bulk_state_map.count(name) == 0)
if (deserialize_binary_bulk_state_map.count(name_and_type.name) == 0)
{
deserialize_settings.getter = get_stream_getter(true);
type.deserializeBinaryBulkStatePrefix(deserialize_settings, deserialize_binary_bulk_state_map[name]);
name_and_type.type->deserializeBinaryBulkStatePrefix(deserialize_settings, deserialize_binary_bulk_state_map[name_and_type.name]);
}
deserialize_settings.getter = get_stream_getter(false);
deserialize_settings.continuous_reading = continue_reading;
auto & deserialize_state = deserialize_binary_bulk_state_map[name];
type.deserializeBinaryBulkWithMultipleStreams(column, max_rows_to_read, deserialize_settings, deserialize_state);
auto & deserialize_state = deserialize_binary_bulk_state_map[name_and_type.name];
name_and_type.type->deserializeBinaryBulkWithMultipleStreams(column, max_rows_to_read, deserialize_settings, deserialize_state);
IDataType::updateAvgValueSizeHint(column, avg_value_size_hint);
}

View File

@ -37,11 +37,11 @@ private:
FileStreams streams;
void addStreams(const String & name, const IDataType & type,
void addStreams(const NameAndTypePair & name_and_type,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type);
void readData(
const String & name, const IDataType & type, IColumn & column,
const NameAndTypePair & name_and_type, IColumn & column,
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
bool with_offsets = true);
};

View File

@ -3,7 +3,10 @@
#include <sparsehash/dense_hash_map>
#include <sparsehash/dense_hash_set>
#include <Common/quoteString.h>
#include <Common/StringUtils/StringUtils.h>
#include <Core/ColumnWithTypeAndName.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
namespace DB
@ -256,7 +259,7 @@ Block StorageInMemoryMetadata::getSampleBlockForColumns(
std::unordered_map<String, DataTypePtr> columns_map;
NamesAndTypesList all_columns = getColumns().getAll();
NamesAndTypesList all_columns = getColumns().getAllWithSubcolumns();
for (const auto & elem : all_columns)
columns_map.emplace(elem.name, elem.type);
@ -445,7 +448,7 @@ namespace
void StorageInMemoryMetadata::check(const Names & column_names, const NamesAndTypesList & virtuals, const StorageID & storage_id) const
{
NamesAndTypesList available_columns = getColumns().getAllPhysical();
NamesAndTypesList available_columns = getColumns().getAllWithSubcolumns();
available_columns.insert(available_columns.end(), virtuals.begin(), virtuals.end());
const String list_of_columns = listOfColumns(available_columns);

View File

@ -107,7 +107,7 @@ private:
using DeserializeStates = std::map<String, DeserializeState>;
DeserializeStates deserialize_states;
void readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read);
void readData(const NameAndTypePair & name_and_type, IColumn & column, size_t max_rows_to_read);
};
@ -185,9 +185,9 @@ private:
using SerializeStates = std::map<String, SerializeState>;
SerializeStates serialize_states;
IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenStreams & written_streams);
IDataType::OutputStreamGetter createStreamGetter(const NameAndTypePair & name_and_type, WrittenStreams & written_streams);
void writeData(const String & name, const IDataType & type, const IColumn & column,
void writeData(const NameAndTypePair & name_and_type, const IColumn & column,
MarksForColumns & out_marks,
WrittenStreams & written_streams);
@ -214,7 +214,7 @@ Chunk LogSource::generate()
try
{
readData(name_type.name, *name_type.type, *column, max_rows_to_read);
readData(name_type, *column, max_rows_to_read);
}
catch (Exception & e)
{
@ -244,15 +244,16 @@ Chunk LogSource::generate()
}
void LogSource::readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read)
void LogSource::readData(const NameAndTypePair & name_and_type, IColumn & column, size_t max_rows_to_read)
{
IDataType::DeserializeBinaryBulkSettings settings; /// TODO Use avg_value_size_hint.
const auto & [name, type] = name_and_type;
auto create_string_getter = [&](bool stream_for_prefix)
{
return [&, stream_for_prefix] (const IDataType::SubstreamPath & path) -> ReadBuffer *
{
String stream_name = IDataType::getFileNameForStream(name, path);
String stream_name = IDataType::getFileNameForStream(name_and_type, path);
const auto & file_it = storage.files.find(stream_name);
if (storage.files.end() == file_it)
@ -271,11 +272,11 @@ void LogSource::readData(const String & name, const IDataType & type, IColumn &
if (deserialize_states.count(name) == 0)
{
settings.getter = create_string_getter(true);
type.deserializeBinaryBulkStatePrefix(settings, deserialize_states[name]);
type->deserializeBinaryBulkStatePrefix(settings, deserialize_states[name]);
}
settings.getter = create_string_getter(false);
type.deserializeBinaryBulkWithMultipleStreams(column, max_rows_to_read, settings, deserialize_states[name]);
type->deserializeBinaryBulkWithMultipleStreams(column, max_rows_to_read, settings, deserialize_states[name]);
}
@ -292,7 +293,7 @@ void LogBlockOutputStream::write(const Block & block)
for (size_t i = 0; i < block.columns(); ++i)
{
const ColumnWithTypeAndName & column = block.safeGetByPosition(i);
writeData(column.name, *column.type, *column.column, marks, written_streams);
writeData(NameAndTypePair(column.name, column.type), *column.column, marks, written_streams);
}
writeMarks(std::move(marks));
@ -311,7 +312,7 @@ void LogBlockOutputStream::writeSuffix()
auto it = serialize_states.find(column.name);
if (it != serialize_states.end())
{
settings.getter = createStreamGetter(column.name, written_streams);
settings.getter = createStreamGetter(NameAndTypePair(column.name, column.type), written_streams);
column.type->serializeBinaryBulkStateSuffix(settings, it->second);
}
}
@ -337,12 +338,12 @@ void LogBlockOutputStream::writeSuffix()
}
IDataType::OutputStreamGetter LogBlockOutputStream::createStreamGetter(const String & name,
IDataType::OutputStreamGetter LogBlockOutputStream::createStreamGetter(const NameAndTypePair & name_and_type,
WrittenStreams & written_streams)
{
return [&] (const IDataType::SubstreamPath & path) -> WriteBuffer *
{
String stream_name = IDataType::getFileNameForStream(name, path);
String stream_name = IDataType::getFileNameForStream(name_and_type, path);
if (written_streams.count(stream_name))
return nullptr;
@ -355,14 +356,15 @@ IDataType::OutputStreamGetter LogBlockOutputStream::createStreamGetter(const Str
}
void LogBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column,
void LogBlockOutputStream::writeData(const NameAndTypePair & name_and_type, const IColumn & column,
MarksForColumns & out_marks, WrittenStreams & written_streams)
{
IDataType::SerializeBinaryBulkSettings settings;
const auto & [name, type] = name_and_type;
type.enumerateStreams([&] (const IDataType::SubstreamPath & path)
type->enumerateStreams([&] (const IDataType::SubstreamPath & path)
{
String stream_name = IDataType::getFileNameForStream(name, path);
String stream_name = IDataType::getFileNameForStream(name_and_type, path);
if (written_streams.count(stream_name))
return;
@ -371,18 +373,18 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
stream_name,
storage.disk,
storage.files[stream_name].data_file_path,
columns.getCodecOrDefault(name),
columns.getCodecOrDefault(name_and_type.name),
storage.max_compress_block_size);
}, settings.path);
settings.getter = createStreamGetter(name, written_streams);
settings.getter = createStreamGetter(name_and_type, written_streams);
if (serialize_states.count(name) == 0)
type.serializeBinaryBulkStatePrefix(settings, serialize_states[name]);
type->serializeBinaryBulkStatePrefix(settings, serialize_states[name]);
type.enumerateStreams([&] (const IDataType::SubstreamPath & path)
type->enumerateStreams([&] (const IDataType::SubstreamPath & path)
{
String stream_name = IDataType::getFileNameForStream(name, path);
String stream_name = IDataType::getFileNameForStream(name_and_type, path);
if (written_streams.count(stream_name))
return;
@ -396,11 +398,11 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
out_marks.emplace_back(file.column_index, mark);
}, settings.path);
type.serializeBinaryBulkWithMultipleStreams(column, 0, 0, settings, serialize_states[name]);
type->serializeBinaryBulkWithMultipleStreams(column, 0, 0, settings, serialize_states[name]);
type.enumerateStreams([&] (const IDataType::SubstreamPath & path)
type->enumerateStreams([&] (const IDataType::SubstreamPath & path)
{
String stream_name = IDataType::getFileNameForStream(name, path);
String stream_name = IDataType::getFileNameForStream(name_and_type, path);
if (!written_streams.emplace(stream_name).second)
return;
@ -469,7 +471,7 @@ StorageLog::StorageLog(
}
for (const auto & column : storage_metadata.getColumns().getAllPhysical())
addFiles(column.name, *column.type);
addFiles(column);
marks_file_path = table_path + DBMS_STORAGE_LOG_MARKS_FILE_NAME;
@ -479,15 +481,15 @@ StorageLog::StorageLog(
}
void StorageLog::addFiles(const String & column_name, const IDataType & type)
void StorageLog::addFiles(const NameAndTypePair & column)
{
if (files.end() != files.find(column_name))
throw Exception("Duplicate column with name " + column_name + " in constructor of StorageLog.",
if (files.end() != files.find(column.name))
throw Exception("Duplicate column with name " + column.name + " in constructor of StorageLog.",
ErrorCodes::DUPLICATE_COLUMN);
IDataType::StreamCallback stream_callback = [&] (const IDataType::SubstreamPath & substream_path)
{
String stream_name = IDataType::getFileNameForStream(column_name, substream_path);
String stream_name = IDataType::getFileNameForStream(column, substream_path);
if (!files.count(stream_name))
{
@ -501,7 +503,7 @@ void StorageLog::addFiles(const String & column_name, const IDataType & type)
};
IDataType::SubstreamPath substream_path;
type.enumerateStreams(stream_callback, substream_path);
column.type->enumerateStreams(stream_callback, substream_path);
}
@ -573,7 +575,7 @@ void StorageLog::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_sn
disk->clearDirectory(table_path);
for (const auto & column : metadata_snapshot->getColumns().getAllPhysical())
addFiles(column.name, *column.type);
addFiles(column);
file_checker = FileChecker{disk, table_path + "sizes.json"};
marks_file_path = table_path + DBMS_STORAGE_LOG_MARKS_FILE_NAME;
@ -583,8 +585,7 @@ void StorageLog::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_sn
const StorageLog::Marks & StorageLog::getMarksWithRealRowCount(const StorageMetadataPtr & metadata_snapshot) const
{
/// There should be at least one physical column
const String column_name = metadata_snapshot->getColumns().getAllPhysical().begin()->name;
const auto column_type = metadata_snapshot->getColumns().getAllPhysical().begin()->type;
auto column = *metadata_snapshot->getColumns().getAllPhysical().begin();
String filename;
/** We take marks from first column.
@ -592,10 +593,10 @@ const StorageLog::Marks & StorageLog::getMarksWithRealRowCount(const StorageMeta
* (Example: for Array data type, first stream is array sizes; and number of array sizes is the number of arrays).
*/
IDataType::SubstreamPath substream_root_path;
column_type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
column.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
{
if (filename.empty())
filename = IDataType::getFileNameForStream(column_name, substream_path);
filename = IDataType::getFileNameForStream(column, substream_path);
}, substream_root_path);
Files::const_iterator it = files.find(filename);

View File

@ -8,6 +8,7 @@
#include <Storages/IStorage.h>
#include <Common/FileChecker.h>
#include <Common/escapeForFileName.h>
#include <Core/NamesAndTypes.h>
namespace DB
@ -92,7 +93,7 @@ private:
String marks_file_path;
/// The order of adding files should not change: it corresponds to the order of the columns in the marks file.
void addFiles(const String & column_name, const IDataType & type);
void addFiles(const NameAndTypePair & column);
bool loaded_marks = false;

View File

@ -102,7 +102,7 @@ private:
using DeserializeStates = std::map<String, DeserializeState>;
DeserializeStates deserialize_states;
void readData(const String & name, const IDataType & type, IColumn & column, UInt64 limit);
void readData(const NameAndTypePair & name_and_type, IColumn & column, UInt64 limit);
};
@ -169,8 +169,8 @@ private:
using WrittenStreams = std::set<String>;
IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenStreams & written_streams);
void writeData(const String & name, const IDataType & type, const IColumn & column, WrittenStreams & written_streams);
IDataType::OutputStreamGetter createStreamGetter(const NameAndTypePair & column, WrittenStreams & written_streams);
void writeData(const NameAndTypePair & name_and_type, const IColumn & column, WrittenStreams & written_streams);
};
@ -199,7 +199,7 @@ Chunk TinyLogSource::generate()
try
{
readData(name_type.name, *name_type.type, *column, block_size);
readData(name_type, *column, block_size);
}
catch (Exception & e)
{
@ -222,12 +222,13 @@ Chunk TinyLogSource::generate()
}
void TinyLogSource::readData(const String & name, const IDataType & type, IColumn & column, UInt64 limit)
void TinyLogSource::readData(const NameAndTypePair & name_and_type, IColumn & column, UInt64 limit)
{
IDataType::DeserializeBinaryBulkSettings settings; /// TODO Use avg_value_size_hint.
const auto & [name, type] = name_and_type;
settings.getter = [&] (const IDataType::SubstreamPath & path) -> ReadBuffer *
{
String stream_name = IDataType::getFileNameForStream(name, path);
String stream_name = IDataType::getFileNameForStream(name_and_type, path);
if (!streams.count(stream_name))
streams[stream_name] = std::make_unique<Stream>(storage.disk, storage.files[stream_name].data_file_path, max_read_buffer_size);
@ -236,19 +237,19 @@ void TinyLogSource::readData(const String & name, const IDataType & type, IColum
};
if (deserialize_states.count(name) == 0)
type.deserializeBinaryBulkStatePrefix(settings, deserialize_states[name]);
type->deserializeBinaryBulkStatePrefix(settings, deserialize_states[name]);
type.deserializeBinaryBulkWithMultipleStreams(column, limit, settings, deserialize_states[name]);
type->deserializeBinaryBulkWithMultipleStreams(column, limit, settings, deserialize_states[name]);
}
IDataType::OutputStreamGetter TinyLogBlockOutputStream::createStreamGetter(
const String & name,
const NameAndTypePair & column,
WrittenStreams & written_streams)
{
return [&] (const IDataType::SubstreamPath & path) -> WriteBuffer *
{
String stream_name = IDataType::getFileNameForStream(name, path);
String stream_name = IDataType::getFileNameForStream(column, path);
if (!written_streams.insert(stream_name).second)
return nullptr;
@ -258,7 +259,7 @@ IDataType::OutputStreamGetter TinyLogBlockOutputStream::createStreamGetter(
streams[stream_name] = std::make_unique<Stream>(
storage.disk,
storage.files[stream_name].data_file_path,
columns.getCodecOrDefault(name),
columns.getCodecOrDefault(column.name),
storage.max_compress_block_size);
return &streams[stream_name]->compressed;
@ -266,15 +267,16 @@ IDataType::OutputStreamGetter TinyLogBlockOutputStream::createStreamGetter(
}
void TinyLogBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column, WrittenStreams & written_streams)
void TinyLogBlockOutputStream::writeData(const NameAndTypePair & name_and_type, const IColumn & column, WrittenStreams & written_streams)
{
IDataType::SerializeBinaryBulkSettings settings;
settings.getter = createStreamGetter(name, written_streams);
const auto & [name, type] = name_and_type;
settings.getter = createStreamGetter(name_and_type, written_streams);
if (serialize_states.count(name) == 0)
type.serializeBinaryBulkStatePrefix(settings, serialize_states[name]);
type->serializeBinaryBulkStatePrefix(settings, serialize_states[name]);
type.serializeBinaryBulkWithMultipleStreams(column, 0, 0, settings, serialize_states[name]);
type->serializeBinaryBulkWithMultipleStreams(column, 0, 0, settings, serialize_states[name]);
}
@ -297,7 +299,7 @@ void TinyLogBlockOutputStream::writeSuffix()
auto it = serialize_states.find(column.name);
if (it != serialize_states.end())
{
settings.getter = createStreamGetter(column.name, written_streams);
settings.getter = createStreamGetter(NameAndTypePair(column.name, column.type), written_streams);
column.type->serializeBinaryBulkStateSuffix(settings, it->second);
}
}
@ -329,7 +331,7 @@ void TinyLogBlockOutputStream::write(const Block & block)
for (size_t i = 0; i < block.columns(); ++i)
{
const ColumnWithTypeAndName & column = block.safeGetByPosition(i);
writeData(column.name, *column.type, *column.column, written_streams);
writeData(NameAndTypePair(column.name, column.type), *column.column, written_streams);
}
}
@ -375,7 +377,7 @@ StorageTinyLog::StorageTinyLog(
}
for (const auto & col : storage_metadata.getColumns().getAllPhysical())
addFiles(col.name, *col.type);
addFiles(col);
if (!attach)
for (const auto & file : files)
@ -383,15 +385,16 @@ StorageTinyLog::StorageTinyLog(
}
void StorageTinyLog::addFiles(const String & column_name, const IDataType & type)
void StorageTinyLog::addFiles(const NameAndTypePair & column)
{
if (files.end() != files.find(column_name))
throw Exception("Duplicate column with name " + column_name + " in constructor of StorageTinyLog.",
const auto & [name, type] = column;
if (files.end() != files.find(name))
throw Exception("Duplicate column with name " + name + " in constructor of StorageTinyLog.",
ErrorCodes::DUPLICATE_COLUMN);
IDataType::StreamCallback stream_callback = [&] (const IDataType::SubstreamPath & substream_path)
{
String stream_name = IDataType::getFileNameForStream(column_name, substream_path);
String stream_name = IDataType::getFileNameForStream(column, substream_path);
if (!files.count(stream_name))
{
ColumnData column_data;
@ -401,7 +404,7 @@ void StorageTinyLog::addFiles(const String & column_name, const IDataType & type
};
IDataType::SubstreamPath substream_path;
type.enumerateStreams(stream_callback, substream_path);
type->enumerateStreams(stream_callback, substream_path);
}
@ -461,7 +464,7 @@ void StorageTinyLog::truncate(
file_checker = FileChecker{disk, table_path + "sizes.json"};
for (const auto & column : metadata_snapshot->getColumns().getAllPhysical())
addFiles(column.name, *column.type);
addFiles(column);
}
void StorageTinyLog::drop()

View File

@ -74,7 +74,7 @@ private:
Poco::Logger * log;
void addFiles(const String & column_name, const IDataType & type);
void addFiles(const NameAndTypePair & column);
};
}

View File

@ -0,0 +1,21 @@
====array====
1
0
3
2
2
====tuple====
foo
bar
baz
1
2
42
2
2
====nullable====
0
1
0
1
2

View File

@ -0,0 +1,44 @@
SELECT '====array====';
DROP TABLE IF EXISTS t_arr;
CREATE TABLE t_arr (a Array(UInt32)) ENGINE = MergeTree ORDER BY tuple();
INSERT INTO t_arr VALUES ([1]) ([]) ([1, 2, 3]) ([1, 2]);
SYSTEM DROP MARK CACHE;
SELECT a.size0 FROM t_arr;
SYSTEM FLUSH LOGS;
SELECT ProfileEvents.Values[indexOf(ProfileEvents.Names, 'FileOpen')]
FROM system.query_log
WHERE (type = 'QueryFinish') AND (lower(query) LIKE lower('SELECT a.size0 FROM %t_arr%'))
AND event_time > now() - INTERVAL 10 SECOND AND current_database = currentDatabase();
SELECT '====tuple====';
DROP TABLE IF EXISTS t_tup;
CREATE TABLE t_tup (t Tuple(s String, u UInt32)) ENGINE = MergeTree ORDER BY tuple();
INSERT INTO t_tup VALUES (('foo', 1)) (('bar', 2)) (('baz', 42));
SYSTEM DROP MARK CACHE;
SELECT t.s FROM t_tup;
SYSTEM DROP MARK CACHE;
SELECT t.u FROM t_tup;
SYSTEM FLUSH LOGS;
SELECT ProfileEvents.Values[indexOf(ProfileEvents.Names, 'FileOpen')]
FROM system.query_log
WHERE (type = 'QueryFinish') AND (lower(query) LIKE lower('SELECT t._ FROM %t_tup%'))
AND event_time > now() - INTERVAL 10 SECOND AND current_database = currentDatabase();
SELECT '====nullable====';
DROP TABLE IF EXISTS t_nul;
CREATE TABLE t_nul (n Nullable(UInt32)) ENGINE = MergeTree ORDER BY tuple();
INSERT INTO t_nul VALUES (1) (NULL) (2) (NULL);
SYSTEM DROP MARK CACHE;
SELECT n.null FROM t_nul;
SYSTEM FLUSH LOGS;
SELECT ProfileEvents.Values[indexOf(ProfileEvents.Names, 'FileOpen')]
FROM system.query_log
WHERE (type = 'QueryFinish') AND (lower(query) LIKE lower('SELECT n.null FROM %t_nul%'))
AND event_time > now() - INTERVAL 10 SECOND AND current_database = currentDatabase();