mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 16:42:05 +00:00
allow to extract subcolumns from column
This commit is contained in:
parent
80cd2523ce
commit
cbe12a532e
@ -509,17 +509,25 @@ DataTypePtr DataTypeArray::getSubcolumnType(const String & subcolumn_name) const
|
||||
if (checkString("size", buf) && tryReadIntText(dim, buf) && dim < getNumberOfDimensions())
|
||||
return std::make_shared<DataTypeUInt64>();
|
||||
|
||||
return nullptr;
|
||||
return std::make_shared<DataTypeArray>(nested->getSubcolumnType(subcolumn_name));
|
||||
}
|
||||
|
||||
std::vector<String> DataTypeArray::getSubcolumnNames() const
|
||||
/// Extracts the subcolumn named `subcolumn_name` from `column`.
/// This is the public entry point: it starts the lookup in getSubcolumnImpl
/// at nesting level 0, i.e. at the outermost array.
MutableColumnPtr DataTypeArray::getSubcolumn(const String & subcolumn_name, IColumn & column) const
{
    return getSubcolumnImpl(subcolumn_name, column, 0);
}
|
||||
|
||||
return res;
|
||||
/// Extracts a subcolumn from an array column, descending through nested array types.
///
/// `column` must be a ColumnArray (checked by assert_cast); `level` is the nesting
/// depth of this array type, 0 for the outermost one.
///  - "size<level>" yields the offsets column of the array at this depth.
///  - If the nested type is itself an array, the lookup continues one level deeper.
///  - Otherwise the request is forwarded to the nested type and the result is wrapped
///    back into an array that reuses this array's offsets.
MutableColumnPtr DataTypeArray::getSubcolumnImpl(const String & subcolumn_name, IColumn & column, size_t level) const
{
    auto & column_array = assert_cast<ColumnArray &>(column);

    if (subcolumn_name == "size" + std::to_string(level))
        return column_array.getOffsetsPtr()->assumeMutable();

    /// Descend with the data of this array, not with the array itself:
    /// the nested type describes the elements, so it must see the elements' column.
    /// Passing `column` again would make every deeper level resolve to the outermost offsets.
    if (const auto * nested_array = typeid_cast<const DataTypeArray *>(nested.get()))
        return nested_array->getSubcolumnImpl(subcolumn_name, column_array.getData(), level + 1);

    auto subcolumn = nested->getSubcolumn(subcolumn_name, column_array.getData());
    return ColumnArray::create(std::move(subcolumn), column_array.getOffsetsPtr()->assumeMutable());
}
|
||||
|
||||
String DataTypeArray::getEscapedFileName(const NameAndTypePair & column) const
|
||||
|
@ -112,13 +112,17 @@ public:
|
||||
}
|
||||
|
||||
DataTypePtr getSubcolumnType(const String & subcolumn_name) const override;
|
||||
std::vector<String> getSubcolumnNames() const override;
|
||||
MutableColumnPtr getSubcolumn(const String & subcolumn_name, IColumn & column) const override;
|
||||
|
||||
String getEscapedFileName(const NameAndTypePair & column) const override;
|
||||
|
||||
const DataTypePtr & getNestedType() const { return nested; }
|
||||
|
||||
/// 1 for plain array, 2 for array of arrays and so on.
|
||||
size_t getNumberOfDimensions() const;
|
||||
|
||||
private:
|
||||
MutableColumnPtr getSubcolumnImpl(const String & subcolumn_name, IColumn & column, size_t level) const;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -23,6 +23,7 @@ public:
|
||||
TypeIndex getTypeId() const override { return TypeIndex::LowCardinality; }
|
||||
|
||||
void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const override;
|
||||
/// Returns this type itself for any requested subcolumn; the subcolumn name is ignored.
DataTypePtr getSubcolumnType(const String & /* subcolumn_name */) const override { return shared_from_this(); }
|
||||
|
||||
void serializeBinaryBulkStatePrefix(
|
||||
SerializeBinaryBulkSettings & settings,
|
||||
|
@ -529,15 +529,20 @@ bool DataTypeNullable::equals(const IDataType & rhs) const
|
||||
|
||||
/// Returns the type of the subcolumn `subcolumn_name`.
/// "null" is the null map and has type UInt8; any other name is resolved by the nested type,
/// mirroring how DataTypeNullable::getSubcolumn resolves the column itself.
DataTypePtr DataTypeNullable::getSubcolumnType(const String & subcolumn_name) const
{
    if (subcolumn_name == "null")
        return std::make_shared<DataTypeUInt8>();

    return nested_data_type->getSubcolumnType(subcolumn_name);
}
|
||||
|
||||
std::vector<String> DataTypeNullable::getSubcolumnNames() const
|
||||
/// Extracts the subcolumn `subcolumn_name` from a nullable column.
/// `column` must be a ColumnNullable (checked by assert_cast).
/// "null" yields the null map column; any other name is forwarded to the nested type
/// together with the nested column.
MutableColumnPtr DataTypeNullable::getSubcolumn(const String & subcolumn_name, IColumn & column) const
{
    auto & column_nullable = assert_cast<ColumnNullable &>(column);

    if (subcolumn_name == "null")
        return column_nullable.getNullMapColumnPtr()->assumeMutable();

    return nested_data_type->getSubcolumn(subcolumn_name, column_nullable.getNestedColumn());
}
|
||||
|
||||
String DataTypeNullable::getEscapedFileName(const NameAndTypePair & column) const
|
||||
|
@ -98,7 +98,7 @@ public:
|
||||
bool onlyNull() const override;
|
||||
bool canBeInsideLowCardinality() const override { return nested_data_type->canBeInsideLowCardinality(); }
|
||||
DataTypePtr getSubcolumnType(const String & subcolumn_name) const override;
|
||||
std::vector<String> getSubcolumnNames() const override;
|
||||
MutableColumnPtr getSubcolumn(const String & subcolumn_name, IColumn & column) const override;
|
||||
String getEscapedFileName(const NameAndTypePair & column) const override;
|
||||
|
||||
const DataTypePtr & getNestedType() const { return nested_data_type; }
|
||||
|
@ -10,6 +10,7 @@
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Common/assert_cast.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
@ -31,6 +32,7 @@ namespace ErrorCodes
|
||||
extern const int DUPLICATE_COLUMN;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
|
||||
extern const int ILLEGAL_COLUMN;
|
||||
}
|
||||
|
||||
|
||||
@ -419,6 +421,14 @@ void DataTypeTuple::deserializeBinaryBulkWithMultipleStreams(
|
||||
settings.path.pop_back();
|
||||
}
|
||||
|
||||
/// Builds the escaped file name for `column`.
/// An ordinary column uses its escaped name. A subcolumn uses the escaped storage name,
/// then "%2E", then the subcolumn name appended as is (it is not escaped here).
String DataTypeTuple::getEscapedFileName(const NameAndTypePair & column) const
{
    if (!column.isSubcolumn())
        return escapeForFileName(column.name);

    const auto escaped_storage_name = escapeForFileName(column.getStorageName());
    return escaped_storage_name + "%2E" + column.getSubcolumnName();
}
|
||||
|
||||
void DataTypeTuple::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
|
||||
{
|
||||
for (; value_index < elems.size(); ++value_index)
|
||||
@ -531,12 +541,40 @@ size_t DataTypeTuple::getSizeOfValueInMemory() const
|
||||
|
||||
/// Returns the type of the subcolumn addressed by `subcolumn_name`.
///
/// The name is either exactly a tuple element name, which yields that element's type,
/// or "<element>.<rest>", in which case <rest> is resolved by the element's type.
/// Throws ILLEGAL_COLUMN if no element matches.
DataTypePtr DataTypeTuple::getSubcolumnType(const String & subcolumn_name) const
{
    for (size_t i = 0; i < names.size(); ++i)
    {
        if (startsWith(subcolumn_name, names[i]))
        {
            size_t name_length = names[i].size();

            if (subcolumn_name.size() == name_length)
                return elems[i];

            /// The element name is a strict prefix here, so index name_length is in range.
            /// Only a '.' right after the prefix makes it a path into this element;
            /// otherwise another element with a longer name may still match.
            if (subcolumn_name[name_length] == '.')
                return elems[i]->getSubcolumnType(subcolumn_name.substr(name_length + 1));
        }
    }

    throw Exception(ErrorCodes::ILLEGAL_COLUMN, "There is no subcolumn {} in type {}", subcolumn_name, getName());
}
|
||||
|
||||
/// Extracts the subcolumn addressed by `subcolumn_name` from a tuple column.
///
/// The name is either exactly a tuple element name, which yields that element's column,
/// or "<element>.<rest>", in which case <rest> is resolved by the element's type
/// against the element's column. Throws ILLEGAL_COLUMN if no element matches.
MutableColumnPtr DataTypeTuple::getSubcolumn(const String & subcolumn_name, IColumn & column) const
{
    for (size_t i = 0; i < names.size(); ++i)
    {
        if (startsWith(subcolumn_name, names[i]))
        {
            size_t name_length = names[i].size();
            auto & subcolumn = extractElementColumn(column, i);

            if (subcolumn_name.size() == name_length)
                return subcolumn.assumeMutable();

            /// The element name is a strict prefix here, so index name_length is in range.
            /// Only a '.' right after the prefix makes it a path into this element.
            if (subcolumn_name[name_length] == '.')
                return elems[i]->getSubcolumn(subcolumn_name.substr(name_length + 1), subcolumn);
        }
    }

    throw Exception(ErrorCodes::ILLEGAL_COLUMN, "There is no subcolumn {} in type {}", subcolumn_name, getName());
}
|
||||
|
||||
|
||||
|
@ -94,8 +94,10 @@ public:
|
||||
bool haveMaximumSizeOfValue() const override;
|
||||
size_t getMaximumSizeOfValueInMemory() const override;
|
||||
size_t getSizeOfValueInMemory() const override;
|
||||
|
||||
DataTypePtr getSubcolumnType(const String & subcolumn_name) const override;
|
||||
std::vector<String> getSubcolumnNames() const override;
|
||||
MutableColumnPtr getSubcolumn(const String & subcolumn_name, IColumn & column) const override;
|
||||
String getEscapedFileName(const NameAndTypePair & column) const override;
|
||||
|
||||
const DataTypes & getElements() const { return elems; }
|
||||
const Strings & getElementNames() const { return names; }
|
||||
|
@ -18,6 +18,7 @@ namespace ErrorCodes
|
||||
extern const int MULTIPLE_STREAMS_REQUIRED;
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int DATA_TYPE_CANNOT_BE_PROMOTED;
|
||||
extern const int ILLEGAL_COLUMN;
|
||||
}
|
||||
|
||||
IDataType::IDataType() : custom_name(nullptr), custom_text_serialization(nullptr)
|
||||
@ -92,19 +93,83 @@ size_t IDataType::getSizeOfValueInMemory() const
|
||||
throw Exception("Value of type " + getName() + " in memory is not of fixed size.", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
/// Default implementation for types that do not override it:
/// always throws ILLEGAL_COLUMN naming the requested subcolumn and this type.
DataTypePtr IDataType::getSubcolumnType(const String & subcolumn_name) const
{
    throw Exception(
        ErrorCodes::ILLEGAL_COLUMN,
        "There is no subcolumn {} in type {}",
        subcolumn_name,
        getName());
}
|
||||
|
||||
/// Default implementation for types that do not override it:
/// the column argument is unused and ILLEGAL_COLUMN is always thrown,
/// naming the requested subcolumn and this type.
MutableColumnPtr IDataType::getSubcolumn(const String & subcolumn_name, IColumn &) const
{
    throw Exception(
        ErrorCodes::ILLEGAL_COLUMN,
        "There is no subcolumn {} in type {}",
        subcolumn_name,
        getName());
}
|
||||
|
||||
std::vector<String> IDataType::getSubcolumnNames() const
|
||||
{
|
||||
std::vector<String> res;
|
||||
enumerateStreams([&res](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
|
||||
{
|
||||
auto subcolumn_name = IDataType::getSubcolumnNameForStream("", substream_path);
|
||||
if (!subcolumn_name.empty())
|
||||
res.push_back(subcolumn_name.substr(1)); // It starts with a dot.
|
||||
});
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
/// Default file name for a column: the full column name passed through escapeForFileName.
String IDataType::getEscapedFileName(const NameAndTypePair & column) const
{
    const auto & full_name = column.name;
    return escapeForFileName(full_name);
}
|
||||
|
||||
/// Appends a suffix to `stream_name` for every element of the substream `path`:
///   NullMap        -> ".null"
///   ArraySizes     -> ".size<N>", where N is the number of ArrayElements seen so far
///   ArrayElements  -> nothing appended, only increases N
///   TupleElement   -> `tuple_element_delimeter` followed by the escaped element name
///   DictionaryKeys -> ".dict"
/// Any other element type leaves the name unchanged.
static String getNameForSubstreamPath(
    String stream_name,
    const IDataType::SubstreamPath & path,
    const String & tuple_element_delimeter = ".")
{
    size_t depth = 0;

    for (const auto & step : path)
    {
        const auto kind = step.type;

        if (kind == IDataType::Substream::ArrayElements)
        {
            ++depth;
            continue;
        }

        if (kind == IDataType::Substream::NullMap)
        {
            stream_name += ".null";
        }
        else if (kind == IDataType::Substream::ArraySizes)
        {
            stream_name += ".size" + toString(depth);
        }
        else if (kind == IDataType::Substream::TupleElement)
        {
            stream_name += tuple_element_delimeter + escapeForFileName(step.tuple_element_name);
        }
        else if (kind == IDataType::Substream::DictionaryKeys)
        {
            stream_name += ".dict";
        }
    }

    return stream_name;
}
|
||||
|
||||
|
||||
/// FIXME: rewrite it.
|
||||
/// Builds the file name of the stream `path` for `column`, which may be a subcolumn.
///
///  - An ordinary column is handled by the overload that takes a column name.
///  - The "size0" subcolumn maps to "<escaped nested table name>.size0".
///  - For a single ArraySizes substream of a column whose storage name has a nested-table
///    prefix, the name is built from the escaped nested table name.
///  - Otherwise the name starts from the storage type's getEscapedFileName().
/// In the last two cases the substream suffixes are appended with "%2E" as the
/// tuple element delimiter.
String IDataType::getFileNameForStream(const NameAndTypePair & column, const IDataType::SubstreamPath & path)
{
    if (!column.isSubcolumn())
        return getFileNameForStream(column.name, path);

    String storage_name = column.getStorageName();
    String nested_table_name = Nested::extractTableName(storage_name);

    if (column.getSubcolumnName() == "size0")
        return escapeForFileName(nested_table_name) + ".size0";

    bool is_sizes_of_nested_type =
        path.size() == 1 && path[0].type == IDataType::Substream::ArraySizes
        && nested_table_name != storage_name;

    String stream_name = is_sizes_of_nested_type
        ? escapeForFileName(nested_table_name)
        : column.getStorageType()->getEscapedFileName(column);

    return getNameForSubstreamPath(std::move(stream_name), path, "%2E");
}
|
||||
|
||||
String IDataType::getFileNameForStream(const String & column_name, const IDataType::SubstreamPath & path)
|
||||
@ -119,36 +184,19 @@ String IDataType::getFileNameForStream(const String & column_name, const IDataTy
|
||||
&& nested_table_name != column_name;
|
||||
|
||||
auto stream_name = escapeForFileName(is_sizes_of_nested_type ? nested_table_name : column_name);
|
||||
return getFileNameForStreamImpl(std::move(stream_name), path);
|
||||
}
|
||||
|
||||
String IDataType::getFileNameForStreamImpl(String stream_name, const IDataType::SubstreamPath & path)
|
||||
{
|
||||
size_t array_level = 0;
|
||||
for (const Substream & elem : path)
|
||||
{
|
||||
if (elem.type == Substream::NullMap)
|
||||
stream_name += ".null";
|
||||
else if (elem.type == Substream::ArraySizes)
|
||||
stream_name += ".size" + toString(array_level);
|
||||
else if (elem.type == Substream::ArrayElements)
|
||||
++array_level;
|
||||
else if (elem.type == Substream::TupleElement)
|
||||
{
|
||||
/// For compatibility reasons, we use %2E instead of dot.
|
||||
/// Because nested data may be represented not by Array of Tuple,
|
||||
/// but by separate Array columns with names in a form of a.b,
|
||||
/// and name is encoded as a whole.
|
||||
stream_name += "%2E" + escapeForFileName(elem.tuple_element_name);
|
||||
}
|
||||
else if (elem.type == Substream::DictionaryKeys)
|
||||
stream_name += ".dict";
|
||||
return getNameForSubstreamPath(std::move(stream_name), path, "%2E");
|
||||
}
|
||||
|
||||
return stream_name;
|
||||
/// Builds a subcolumn name by appending the suffixes for `path` to `stream_name`.
/// Uses getNameForSubstreamPath with its default tuple element delimiter.
String IDataType::getSubcolumnNameForStream(String stream_name, const SubstreamPath & path)
{
    auto subcolumn_name = getNameForSubstreamPath(std::move(stream_name), path);
    return subcolumn_name;
}
|
||||
|
||||
|
||||
bool IDataType::isSpecialCompressionAllowed(const SubstreamPath & path)
|
||||
{
|
||||
for (const Substream & elem : path)
|
||||
|
@ -37,7 +37,7 @@ struct NameAndTypePair;
|
||||
*
|
||||
* DataType is totally immutable object. You can always share them.
|
||||
*/
|
||||
class IDataType : private boost::noncopyable
|
||||
class IDataType : private boost::noncopyable, public std::enable_shared_from_this<IDataType>
|
||||
{
|
||||
public:
|
||||
IDataType();
|
||||
@ -115,6 +115,10 @@ public:
|
||||
void enumerateStreams(const StreamCallback & callback, SubstreamPath && path) const { enumerateStreams(callback, path); }
|
||||
void enumerateStreams(const StreamCallback & callback) const { enumerateStreams(callback, {}); }
|
||||
|
||||
virtual DataTypePtr getSubcolumnType(const String & subcolumn_name) const;
|
||||
virtual MutableColumnPtr getSubcolumn(const String & subcolumn_name, IColumn & column) const;
|
||||
std::vector<String> getSubcolumnNames() const;
|
||||
|
||||
using OutputStreamGetter = std::function<WriteBuffer*(const SubstreamPath &)>;
|
||||
using InputStreamGetter = std::function<ReadBuffer*(const SubstreamPath &)>;
|
||||
|
||||
@ -152,6 +156,8 @@ public:
|
||||
bool position_independent_encoding = true;
|
||||
/// If not zero, may be used to avoid reallocations while reading column of String type.
|
||||
double avg_value_size_hint = 0;
|
||||
|
||||
std::vector<MutableColumnPtr> temporary_column_holders;
|
||||
};
|
||||
|
||||
/// Call before serializeBinaryBulkWithMultipleStreams chain to write something before first mark.
|
||||
@ -230,9 +236,6 @@ public:
|
||||
virtual void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const = 0;
|
||||
virtual void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const = 0;
|
||||
|
||||
virtual DataTypePtr getSubcolumnType(const String & /* subcolumn_path */) const { return nullptr; }
|
||||
virtual std::vector<String> getSubcolumnNames() const { return {}; }
|
||||
|
||||
/** Text serialization with escaping but without quoting.
|
||||
*/
|
||||
void serializeAsTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const;
|
||||
@ -450,6 +453,7 @@ public:
|
||||
|
||||
static String getFileNameForStream(const NameAndTypePair & column, const SubstreamPath & path);
|
||||
static String getFileNameForStream(const String & column_name, const SubstreamPath & path);
|
||||
static String getSubcolumnNameForStream(String stream_name, const SubstreamPath & path);
|
||||
|
||||
/// Substream path supports special compression methods like codec Delta.
|
||||
/// For all other substreams (like ArraySizes, NullMasks, etc.) we use only
|
||||
@ -464,8 +468,6 @@ private:
|
||||
mutable DataTypeCustomNamePtr custom_name;
|
||||
mutable DataTypeCustomTextSerializationPtr custom_text_serialization;
|
||||
|
||||
static String getFileNameForStreamImpl(String stream_name, const SubstreamPath & path);
|
||||
|
||||
public:
|
||||
const IDataTypeCustomName * getCustomName() const { return custom_name.get(); }
|
||||
};
|
||||
|
@ -185,7 +185,7 @@ void ColumnsDescription::add(ColumnDescription column, const String & after_colu
|
||||
insert_it = range.second;
|
||||
}
|
||||
|
||||
addSubcolumns(NameAndTypePair(column.name, column.type));
|
||||
addSubcolumns(column.name, column.type);
|
||||
columns.get<0>().insert(insert_it, std::move(column));
|
||||
}
|
||||
|
||||
@ -519,12 +519,13 @@ ColumnsDescription ColumnsDescription::parse(const String & str)
|
||||
return result;
|
||||
}
|
||||
|
||||
void ColumnsDescription::addSubcolumns(NameAndTypePair storage_column)
|
||||
void ColumnsDescription::addSubcolumns(const String & storage_name, const DataTypePtr & storage_type)
|
||||
{
|
||||
for (const auto & subcolumn_name : storage_column.type->getSubcolumnNames())
|
||||
for (const auto & subcolumn_name : storage_type->getSubcolumnNames())
|
||||
{
|
||||
auto subcolumn = NameAndTypePair(storage_column.name, subcolumn_name,
|
||||
storage_column.type, storage_column.type->getSubcolumnType(subcolumn_name));
|
||||
std::cerr << "storage_name: " << storage_name << ", subcolumn_name: " << subcolumn_name << "\n";
|
||||
auto subcolumn = NameAndTypePair(storage_name, subcolumn_name,
|
||||
storage_type, storage_type->getSubcolumnType(subcolumn_name));
|
||||
|
||||
if (has(subcolumn.name))
|
||||
throw Exception(ErrorCodes::ILLEGAL_COLUMN,
|
||||
|
@ -147,7 +147,7 @@ private:
|
||||
SubcolumnsContainer subcolumns;
|
||||
|
||||
void modifyColumnOrder(const String & column_name, const String & after_column, bool first);
|
||||
void addSubcolumns(NameAndTypePair storage_column);
|
||||
void addSubcolumns(const String & storage_name, const DataTypePtr & storage_type);
|
||||
};
|
||||
|
||||
/// Validate default expressions and corresponding types compatibility, i.e.
|
||||
|
@ -191,6 +191,11 @@ std::optional<size_t> IMergeTreeDataPart::getColumnPosition(const String & colum
|
||||
return it->second;
|
||||
}
|
||||
|
||||
std::optional<size_t> IMergeTreeDataPart::getColumnPosition(const NameAndTypePair & column) const
|
||||
{
|
||||
return getColumnPosition(column.getStorageName());
|
||||
}
|
||||
|
||||
DayNum IMergeTreeDataPart::getMinDate() const
|
||||
{
|
||||
if (storage.minmax_idx_date_column_pos != -1 && minmax_idx.initialized)
|
||||
|
@ -142,6 +142,7 @@ public:
|
||||
/// take place, you must take original name of column for this part from
|
||||
/// storage and pass it to this method.
|
||||
std::optional<size_t> getColumnPosition(const String & column_name) const;
|
||||
std::optional<size_t> getColumnPosition(const NameAndTypePair & column) const;
|
||||
|
||||
/// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
|
||||
/// If no checksums are present returns the name of the first physically existing column.
|
||||
|
@ -265,7 +265,7 @@ IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const St
|
||||
{
|
||||
if (typeid_cast<const DataTypeArray *>(part_column.type.get()))
|
||||
{
|
||||
auto position = data_part->getColumnPosition(part_column.name);
|
||||
auto position = data_part->getColumnPosition(part_column);
|
||||
if (position && Nested::extractTableName(part_column.name) == table_name)
|
||||
return position;
|
||||
}
|
||||
|
@ -53,14 +53,14 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
|
||||
auto name_and_type = columns.begin();
|
||||
for (size_t i = 0; i < columns_num; ++i, ++name_and_type)
|
||||
{
|
||||
const auto & [name, type] = getColumnFromPart(*name_and_type);
|
||||
auto position = data_part->getColumnPosition(name);
|
||||
auto column_from_part = getColumnFromPart(*name_and_type);
|
||||
auto position = data_part->getColumnPosition(column_from_part);
|
||||
|
||||
if (!position && typeid_cast<const DataTypeArray *>(type.get()))
|
||||
if (!position && typeid_cast<const DataTypeArray *>(column_from_part.type.get()))
|
||||
{
|
||||
/// If array of Nested column is missing in part,
|
||||
/// we have to read its offsets if they exist.
|
||||
position = findColumnForOffsets(name);
|
||||
position = findColumnForOffsets(column_from_part.name);
|
||||
read_only_offsets[i] = (position != std::nullopt);
|
||||
}
|
||||
|
||||
@ -149,14 +149,14 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
|
||||
if (!res_columns[pos])
|
||||
continue;
|
||||
|
||||
auto [name, type] = getColumnFromPart(*name_and_type);
|
||||
auto column_from_part = getColumnFromPart(*name_and_type);
|
||||
auto & column = mutable_columns[pos];
|
||||
|
||||
try
|
||||
{
|
||||
size_t column_size_before_reading = column->size();
|
||||
|
||||
readData(name, *column, *type, from_mark, *column_positions[pos], rows_to_read, read_only_offsets[pos]);
|
||||
readData(column_from_part, *column, from_mark, *column_positions[pos], rows_to_read, read_only_offsets[pos]);
|
||||
|
||||
size_t read_rows_in_column = column->size() - column_size_before_reading;
|
||||
|
||||
@ -170,7 +170,7 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
|
||||
storage.reportBrokenPart(data_part->name);
|
||||
|
||||
/// Better diagnostics.
|
||||
e.addMessage("(while reading column " + name + ")");
|
||||
e.addMessage("(while reading column " + column_from_part.name + ")");
|
||||
throw;
|
||||
}
|
||||
catch (...)
|
||||
@ -199,9 +199,11 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
|
||||
}
|
||||
|
||||
void MergeTreeReaderCompact::readData(
|
||||
const String & name, IColumn & column, const IDataType & type,
|
||||
const NameAndTypePair & name_and_type, IColumn & column,
|
||||
size_t from_mark, size_t column_position, size_t rows_to_read, bool only_offsets)
|
||||
{
|
||||
const auto & [name, type] = name_and_type;
|
||||
|
||||
if (!isContinuousReading(from_mark, column_position))
|
||||
seekToMark(from_mark, column_position);
|
||||
|
||||
@ -213,14 +215,29 @@ void MergeTreeReaderCompact::readData(
|
||||
return data_buffer;
|
||||
};
|
||||
|
||||
IDataType::DeserializeBinaryBulkStatePtr state;
|
||||
IDataType::DeserializeBinaryBulkSettings deserialize_settings;
|
||||
deserialize_settings.getter = buffer_getter;
|
||||
deserialize_settings.avg_value_size_hint = avg_value_size_hints[name];
|
||||
deserialize_settings.position_independent_encoding = true;
|
||||
|
||||
IDataType::DeserializeBinaryBulkStatePtr state;
|
||||
type.deserializeBinaryBulkStatePrefix(deserialize_settings, state);
|
||||
type.deserializeBinaryBulkWithMultipleStreams(column, rows_to_read, deserialize_settings, state);
|
||||
if (name_and_type.isSubcolumn())
|
||||
{
|
||||
const auto & storage_type = name_and_type.getStorageType();
|
||||
auto temp_column = storage_type->createColumn();
|
||||
|
||||
storage_type->deserializeBinaryBulkStatePrefix(deserialize_settings, state);
|
||||
storage_type->deserializeBinaryBulkWithMultipleStreams(*temp_column, rows_to_read, deserialize_settings, state);
|
||||
|
||||
auto subcolumn = storage_type->getSubcolumn(name_and_type.getSubcolumnName(), *temp_column);
|
||||
column.insertRangeFrom(*subcolumn, 0, subcolumn->size());
|
||||
}
|
||||
else
|
||||
{
|
||||
deserialize_settings.position_independent_encoding = true;
|
||||
type->deserializeBinaryBulkStatePrefix(deserialize_settings, state);
|
||||
type->deserializeBinaryBulkWithMultipleStreams(column, rows_to_read, deserialize_settings, state);
|
||||
}
|
||||
|
||||
/// The buffer is left in an inconsistent state after reading only the offsets.
|
||||
if (only_offsets)
|
||||
|
@ -56,7 +56,7 @@ private:
|
||||
|
||||
void seekToMark(size_t row_index, size_t column_index);
|
||||
|
||||
void readData(const String & name, IColumn & column, const IDataType & type,
|
||||
void readData(const NameAndTypePair & name_and_type, IColumn & column,
|
||||
size_t from_mark, size_t column_position, size_t rows_to_read, bool only_offsets = false);
|
||||
|
||||
/// Returns maximal value of granule size in compressed file from @mark_ranges.
|
||||
|
48
tests/queries/0_stateless/001475_read_subcolumns_2.sql
Normal file
48
tests/queries/0_stateless/001475_read_subcolumns_2.sql
Normal file
@ -0,0 +1,48 @@
|
||||
-- Reads subcolumns (tuple elements, array sizes, null maps) from a MergeTree table.
-- The same scenario runs twice; only the min_bytes_for_wide_part setting differs.

-- Run 1: min_bytes_for_wide_part = '10M'.
DROP TABLE IF EXISTS subcolumns;

CREATE TABLE subcolumns
(
    t Tuple
    (
        a Array(Nullable(UInt32)),
        u UInt32,
        s Nullable(String)
    ),
    arr Array(Nullable(String)),
    arr2 Array(Array(LowCardinality(Nullable(String)))),
    lc LowCardinality(String),
    nested Nested(col1 String, col2 Nullable(UInt32))
)
ENGINE = MergeTree order by tuple() SETTINGS min_bytes_for_wide_part = '10M';

INSERT INTO subcolumns VALUES (([1, NULL], 2, 'a'), ['foo', NULL, 'bar'], [['123'], ['456', '789']], 'qqqq', ['zzz', 'xxx'], [42, 43]);
SELECT * FROM subcolumns;
SELECT t.a, t.u, t.s, nested.col1, nested.col2, lc FROM subcolumns;
SELECT t.a.size0, t.a.null, t.u, t.s, t.s.null FROM subcolumns;
-- SELECT arr2, arr2.size0, arr2.size1, arr2.null FROM subcolumns;
-- SELECT nested.col1, nested.col2, nested.col1.size0, nested.col2.size0, nested.col2.null FROM subcolumns;
SELECT sumArray(arr.null), sum(arr.size0) FROM subcolumns;

-- Run 2: min_bytes_for_wide_part = 0.
DROP TABLE IF EXISTS subcolumns;

CREATE TABLE subcolumns
(
    t Tuple
    (
        a Array(Nullable(UInt32)),
        u UInt32,
        s Nullable(String)
    ),
    arr Array(Nullable(String)),
    arr2 Array(Array(LowCardinality(Nullable(String)))),
    lc LowCardinality(String),
    nested Nested(col1 String, col2 Nullable(UInt32))
)
ENGINE = MergeTree order by tuple() SETTINGS min_bytes_for_wide_part = 0;

INSERT INTO subcolumns VALUES (([1, NULL], 2, 'a'), ['foo', NULL, 'bar'], [['123'], ['456', '789']], 'qqqq', ['zzz', 'xxx'], [42, 43]);
SELECT * FROM subcolumns;
SELECT t.a, t.u, t.s, nested.col1, nested.col2, lc FROM subcolumns;
SELECT t.a.size0, t.a.null, t.u, t.s, t.s.null FROM subcolumns;
-- SELECT arr2, arr2.size0, arr2.size1, arr2.null FROM subcolumns;
-- SELECT nested.col1, nested.col2, nested.col1.size0, nested.col2.size0, nested.col2.null FROM subcolumns;
SELECT sumArray(arr.null), sum(arr.size0) FROM subcolumns;

-- Clean up so the table does not outlive the test; this produces no output.
DROP TABLE IF EXISTS subcolumns;
|
23
tests/queries/0_stateless/001475_read_subcolumns_storages.sh
Executable file
23
tests/queries/0_stateless/001475_read_subcolumns_storages.sh
Executable file
@ -0,0 +1,23 @@
|
||||
#!/usr/bin/env bash

# Creates the same table with each engine listed below, inserts one row and
# reads both the full columns and their subcolumns (null maps, array sizes, tuple elements).

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh

# Stop at the first failing client call.
set -e

create_query="CREATE TABLE subcolumns(n Nullable(UInt32), a1 Array(UInt32),\
    a2 Array(Array(Array(UInt32))), a3 Array(Nullable(UInt32)), t Tuple(s String, v UInt32))"

declare -a ENGINES=("Log" "StripeLog" "TinyLog" "Memory" \
    "MergeTree ORDER BY tuple() SETTINGS min_bytes_for_compact_part='10M'"
    "MergeTree ORDER BY tuple() SETTINGS min_bytes_for_wide_part='10M'"
    "MergeTree ORDER BY tuple() SETTINGS min_bytes_for_wide_part=0")

for engine in "${ENGINES[@]}"; do
    # Quoted so the engine clause is printed verbatim, without word splitting or globbing.
    echo "$engine"
    $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS subcolumns"
    $CLICKHOUSE_CLIENT --query "$create_query ENGINE = $engine"
    $CLICKHOUSE_CLIENT --query "INSERT INTO subcolumns VALUES (100, [1, 2, 3], [[[1, 2], [], [4]], [[5, 6], [7, 8]], [[]]], [1, NULL, 2], ('foo', 200))"
    $CLICKHOUSE_CLIENT --query "SELECT * FROM subcolumns"
    $CLICKHOUSE_CLIENT --query "SELECT n, n.null, a1, a1.size0, a2, a2.size0, a2.size1, a2.size2, a3, a3.size0, a3.null, t, t.s, t.v FROM subcolumns"
done
|
Loading…
Reference in New Issue
Block a user