mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
dynamic subcolumns: wip
This commit is contained in:
parent
6240169bbb
commit
644df6be7d
@ -58,6 +58,7 @@ enum class TypeIndex
|
||||
AggregateFunction,
|
||||
LowCardinality,
|
||||
Map,
|
||||
Object,
|
||||
};
|
||||
#if !defined(__clang__)
|
||||
#pragma GCC diagnostic pop
|
||||
@ -273,6 +274,7 @@ inline constexpr const char * getTypeName(TypeIndex idx)
|
||||
case TypeIndex::AggregateFunction: return "AggregateFunction";
|
||||
case TypeIndex::LowCardinality: return "LowCardinality";
|
||||
case TypeIndex::Map: return "Map";
|
||||
case TypeIndex::Object: return "Object";
|
||||
}
|
||||
|
||||
__builtin_unreachable();
|
||||
|
@ -24,7 +24,7 @@ public:
|
||||
|
||||
const char * getFamilyName() const override { return "Object"; }
|
||||
String doGetName() const override;
|
||||
TypeIndex getTypeId() const override { return TypeIndex::Nothing; }
|
||||
TypeIndex getTypeId() const override { return TypeIndex::Object; }
|
||||
|
||||
MutableColumnPtr createColumn() const override { return ColumnObject::create(); }
|
||||
|
||||
|
@ -281,7 +281,9 @@ DataTypePtr DataTypeTuple::tryGetSubcolumnType(const String & subcolumn_name) co
|
||||
auto on_success = [&](size_t pos) { return elems[pos]; };
|
||||
auto on_continue = [&](size_t pos, const String & next_subcolumn) { return elems[pos]->tryGetSubcolumnType(next_subcolumn); };
|
||||
|
||||
return getSubcolumnEntity(subcolumn_name, on_success, on_continue);
|
||||
auto kek = getSubcolumnEntity(subcolumn_name, on_success, on_continue);
|
||||
std::cerr << "requested subcolumn: " << subcolumn_name << ", have: " << !!kek << "\n";
|
||||
return kek;
|
||||
}
|
||||
|
||||
ColumnPtr DataTypeTuple::getSubcolumn(const String & subcolumn_name, const IColumn & column) const
|
||||
|
@ -337,6 +337,7 @@ struct WhichDataType
|
||||
constexpr bool isMap() const {return idx == TypeIndex::Map; }
|
||||
constexpr bool isSet() const { return idx == TypeIndex::Set; }
|
||||
constexpr bool isInterval() const { return idx == TypeIndex::Interval; }
|
||||
constexpr bool isObject() const { return idx == TypeIndex::Object; }
|
||||
|
||||
constexpr bool isNothing() const { return idx == TypeIndex::Nothing; }
|
||||
constexpr bool isNullable() const { return idx == TypeIndex::Nullable; }
|
||||
@ -362,6 +363,7 @@ inline bool isDecimal(const DataTypePtr & data_type) { return WhichDataType(data
|
||||
inline bool isTuple(const DataTypePtr & data_type) { return WhichDataType(data_type).isTuple(); }
|
||||
inline bool isArray(const DataTypePtr & data_type) { return WhichDataType(data_type).isArray(); }
|
||||
inline bool isMap(const DataTypePtr & data_type) {return WhichDataType(data_type).isMap(); }
|
||||
inline bool isObject(const DataTypePtr & data_type) {return WhichDataType(data_type).isObject(); }
|
||||
|
||||
template <typename T>
|
||||
inline bool isUInt8(const T & data_type)
|
||||
|
@ -62,7 +62,7 @@ std::pair<std::string, std::string> splitName(const std::string & name)
|
||||
|
||||
++pos;
|
||||
|
||||
while (pos < end && isWordCharASCII(*pos))
|
||||
while (pos < end && (isWordCharASCII(*pos) || *pos == '.'))
|
||||
++pos;
|
||||
|
||||
if (pos != end)
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include <DataTypes/DataTypeTuple.h>
|
||||
#include <DataTypes/DataTypeNothing.h>
|
||||
#include <DataTypes/FieldToDataType.h>
|
||||
#include <DataTypes/getLeastSupertype.h>
|
||||
#include <Columns/ColumnObject.h>
|
||||
#include <Columns/ColumnTuple.h>
|
||||
#include <Common/FieldVisitors.h>
|
||||
@ -67,4 +68,34 @@ void convertObjectsToTuples(NamesAndTypesList & columns_list, Block & block)
|
||||
}
|
||||
}
|
||||
|
||||
DataTypePtr getLeastCommonTypeForObject(const DataTypes & types)
|
||||
{
|
||||
std::unordered_map<String, DataTypes> subcolumns_types;
|
||||
for (const auto & type : types)
|
||||
{
|
||||
const auto * type_tuple = typeid_cast<const DataTypeTuple *>(type.get());
|
||||
if (!type_tuple)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Least common type for object can be deduced only from tuples, but {} given", type->getName());
|
||||
|
||||
const auto & tuple_names = type_tuple->getElementNames();
|
||||
const auto & tuple_types = type_tuple->getElements();
|
||||
assert(tuple_names.size() == tuple_type.size());
|
||||
|
||||
for (size_t i = 0; i < tuple_names.size(); ++i)
|
||||
subcolumns_types[tuple_names[i]].push_back(tuple_types[i]);
|
||||
}
|
||||
|
||||
Names tuple_names;
|
||||
DataTypes tuple_types;
|
||||
|
||||
for (const auto & [name, subtypes] : subcolumns_types)
|
||||
{
|
||||
tuple_names.push_back(name);
|
||||
tuple_types.push_back(getLeastSupertype(subtypes));
|
||||
}
|
||||
|
||||
return std::make_shared<DataTypeTuple>(tuple_types, tuple_names);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -8,5 +8,6 @@ namespace DB
|
||||
|
||||
DataTypePtr getDataTypeByColumn(const IColumn & column);
|
||||
void convertObjectsToTuples(NamesAndTypesList & columns_list, Block & block);
|
||||
DataTypePtr getLeastCommonTypeForObject(const DataTypes & types);
|
||||
|
||||
}
|
||||
|
@ -144,8 +144,10 @@ static String getNameForSubstreamPath(
|
||||
/// Because nested data may be represented not by Array of Tuple,
|
||||
/// but by separate Array columns with names in a form of a.b,
|
||||
/// and name is encoded as a whole.
|
||||
stream_name += (escape_tuple_delimiter && elem.escape_tuple_delimiter ?
|
||||
escapeForFileName(".") : ".") + escapeForFileName(elem.tuple_element_name);
|
||||
if (escape_tuple_delimiter && elem.escape_tuple_delimiter)
|
||||
stream_name += escapeForFileName(".") + escapeForFileName(elem.tuple_element_name);
|
||||
else
|
||||
stream_name += "." + elem.tuple_element_name;
|
||||
}
|
||||
else if (elem.type == Substream::ObjectElement)
|
||||
{
|
||||
|
@ -46,7 +46,7 @@ BlockIO InterpreterOptimizeQuery::execute()
|
||||
column_names.emplace_back(col->getColumnName());
|
||||
}
|
||||
|
||||
metadata_snapshot->check(column_names, NamesAndTypesList{}, table_id);
|
||||
metadata_snapshot->check(column_names, NamesAndTypesList{}, table_id, {});
|
||||
Names required_columns;
|
||||
{
|
||||
required_columns = metadata_snapshot->getColumnsRequiredForSortingKey();
|
||||
|
@ -475,7 +475,8 @@ InterpreterSelectQuery::InterpreterSelectQuery(
|
||||
}
|
||||
}
|
||||
|
||||
source_header = metadata_snapshot->getSampleBlockForColumns(required_columns, storage->getVirtuals(), storage->getStorageID());
|
||||
source_header = metadata_snapshot->getSampleBlockForColumns(
|
||||
required_columns, storage->getVirtuals(), storage->getStorageID(), storage->getExpandedObjects());
|
||||
}
|
||||
|
||||
/// Calculate structure of the result.
|
||||
@ -1736,7 +1737,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
|
||||
if (!query_plan.isInitialized())
|
||||
{
|
||||
auto header = metadata_snapshot->getSampleBlockForColumns(
|
||||
required_columns, storage->getVirtuals(), storage->getStorageID());
|
||||
required_columns, storage->getVirtuals(), storage->getStorageID(), storage->getExpandedObjects());
|
||||
addEmptySourceToQueryPlan(query_plan, header, query_info);
|
||||
}
|
||||
|
||||
|
@ -641,6 +641,12 @@ void TreeRewriterResult::collectSourceColumns(bool add_special)
|
||||
else
|
||||
columns_from_storage = add_special ? columns.getAll() : columns.getAllPhysical();
|
||||
|
||||
columns_from_storage = storage->expandObjectColumns(columns_from_storage, storage->supportsSubcolumns());
|
||||
|
||||
std::cerr << "columns_from_storage: ";
|
||||
for (const auto & col : columns_from_storage)
|
||||
std::cerr << col.dump() << "\n";
|
||||
|
||||
if (source_columns.empty())
|
||||
source_columns.swap(columns_from_storage);
|
||||
else
|
||||
|
@ -47,7 +47,9 @@ Block getHeaderForProcessingStage(
|
||||
{
|
||||
case QueryProcessingStage::FetchColumns:
|
||||
{
|
||||
Block header = metadata_snapshot->getSampleBlockForColumns(column_names, storage.getVirtuals(), storage.getStorageID());
|
||||
Block header = metadata_snapshot->getSampleBlockForColumns(
|
||||
column_names, storage.getVirtuals(), storage.getStorageID(), storage.getExpandedObjects());
|
||||
|
||||
if (query_info.prewhere_info)
|
||||
{
|
||||
auto & prewhere_info = *query_info.prewhere_info;
|
||||
@ -75,7 +77,8 @@ Block getHeaderForProcessingStage(
|
||||
removeJoin(*query->as<ASTSelectQuery>());
|
||||
|
||||
auto stream = std::make_shared<OneBlockInputStream>(
|
||||
metadata_snapshot->getSampleBlockForColumns(column_names, storage.getVirtuals(), storage.getStorageID()));
|
||||
metadata_snapshot->getSampleBlockForColumns(
|
||||
column_names, storage.getVirtuals(), storage.getStorageID(), storage.getExpandedObjects()));
|
||||
return InterpreterSelectQuery(query, context, stream, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
|
||||
}
|
||||
}
|
||||
|
@ -23,7 +23,7 @@ ReadFromMergeTree::ReadFromMergeTree(
|
||||
size_t num_streams_,
|
||||
ReadType read_type_)
|
||||
: ISourceStep(DataStream{.header = MergeTreeBaseSelectProcessor::transformHeader(
|
||||
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()),
|
||||
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID(), storage_.getExpandedObjects()),
|
||||
prewhere_info_,
|
||||
virt_column_names_)})
|
||||
, storage(storage_)
|
||||
|
@ -31,6 +31,7 @@ struct ColumnDescription
|
||||
{
|
||||
String name;
|
||||
DataTypePtr type;
|
||||
DataTypePtr expaneded_type;
|
||||
ColumnDefault default_desc;
|
||||
String comment;
|
||||
ASTPtr codec;
|
||||
|
@ -105,7 +105,7 @@ void IStorage::read(
|
||||
auto pipe = read(column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
|
||||
if (pipe.empty())
|
||||
{
|
||||
auto header = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
|
||||
auto header = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID(), getExpandedObjects());
|
||||
InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info);
|
||||
}
|
||||
else
|
||||
|
@ -524,6 +524,9 @@ public:
|
||||
/// Does not takes underlying Storage (if any) into account.
|
||||
virtual std::optional<UInt64> lifetimeBytes() const { return {}; }
|
||||
|
||||
virtual NamesAndTypesList expandObjectColumns(const NamesAndTypesList & columns_list, bool /*with_subcolumns*/) const { return columns_list; }
|
||||
virtual NamesAndTypesList getExpandedObjects() const { return {}; }
|
||||
|
||||
private:
|
||||
/// Lock required for alter queries (lockForAlter). Always taken for write
|
||||
/// (actually can be replaced with std::mutex, but for consistency we use
|
||||
|
@ -1,5 +1,6 @@
|
||||
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
|
||||
#include <Storages/MergeTree/MergeTreeData.h>
|
||||
#include <DataTypes/NestedUtils.h>
|
||||
#include <Core/NamesAndTypes.h>
|
||||
#include <Common/checkStackSize.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
@ -90,6 +91,13 @@ NameSet injectRequiredColumns(const MergeTreeData & storage, const StorageMetada
|
||||
auto alter_conversions = storage.getAlterConversionsForPart(part);
|
||||
for (size_t i = 0; i < columns.size(); ++i)
|
||||
{
|
||||
auto name_in_storage = Nested::extractTableName(columns[i]);
|
||||
if (isObject(storage_columns.get(name_in_storage).type))
|
||||
{
|
||||
have_at_least_one_physical_column = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
/// We are going to fetch only physical columns
|
||||
if (!storage_columns.hasPhysicalOrSubcolumn(columns[i]))
|
||||
throw Exception("There is no physical column or subcolumn " + columns[i] + " in table.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
|
||||
@ -308,7 +316,9 @@ MergeTreeReadTaskColumns getReadTaskColumns(
|
||||
|
||||
if (check_columns)
|
||||
{
|
||||
const NamesAndTypesList & physical_columns = metadata_snapshot->getColumns().getAllWithSubcolumns();
|
||||
auto physical_columns = metadata_snapshot->getColumns().getAllWithSubcolumns();
|
||||
physical_columns = storage.expandObjectColumns(physical_columns, true);
|
||||
|
||||
result.pre_columns = physical_columns.addTypes(pre_column_names);
|
||||
result.columns = physical_columns.addTypes(column_names);
|
||||
}
|
||||
|
@ -8,6 +8,8 @@
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
#include <DataTypes/DataTypeUUID.h>
|
||||
#include <DataTypes/NestedUtils.h>
|
||||
#include <DataTypes/DataTypeObject.h>
|
||||
#include <DataTypes/ObjectUtils.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <Functions/FunctionFactory.h>
|
||||
#include <Functions/IFunction.h>
|
||||
@ -4479,6 +4481,97 @@ ReservationPtr MergeTreeData::balancedReservation(
|
||||
return reserved_space;
|
||||
}
|
||||
|
||||
static NameSet getNamesOfObjectColumns(const NamesAndTypesList & columns_list)
|
||||
{
|
||||
NameSet res;
|
||||
for (const auto & [name, type] : columns_list)
|
||||
if (isObject(type))
|
||||
res.insert(name);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
static NamesAndTypesList expandObjectColumnsImpl(
|
||||
const MergeTreeData::DataPartsVector & parts,
|
||||
const NamesAndTypesList & columns_list,
|
||||
const NameSet & requested_to_expand,
|
||||
bool with_subcolumns)
|
||||
{
|
||||
std::unordered_map<String, DataTypes> types_in_parts;
|
||||
|
||||
for (const auto & part : parts)
|
||||
{
|
||||
const auto & part_columns = part->getColumns();
|
||||
for (const auto & [name, type] : part_columns)
|
||||
{
|
||||
if (requested_to_expand.count(name))
|
||||
types_in_parts[name].push_back(type);
|
||||
}
|
||||
}
|
||||
|
||||
NamesAndTypesList result_columns;
|
||||
for (const auto & column : columns_list)
|
||||
{
|
||||
auto it = types_in_parts.find(column.name);
|
||||
if (it != types_in_parts.end())
|
||||
{
|
||||
auto expanded_type = getLeastCommonTypeForObject(it->second);
|
||||
result_columns.emplace_back(column.name, expanded_type);
|
||||
|
||||
std::cerr << "expanded_type: " << expanded_type->getName() << "\n";
|
||||
|
||||
if (with_subcolumns)
|
||||
{
|
||||
for (const auto & subcolumn : expanded_type->getSubcolumnNames())
|
||||
{
|
||||
result_columns.emplace_back(column.name, subcolumn,
|
||||
expanded_type, expanded_type->getSubcolumnType(subcolumn));
|
||||
std::cerr << "adding subcolumn: " << subcolumn << "\n";
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
result_columns.push_back(column);
|
||||
}
|
||||
}
|
||||
|
||||
return result_columns;
|
||||
}
|
||||
|
||||
NamesAndTypesList MergeTreeData::expandObjectColumns(const DataPartsVector & parts, const NamesAndTypesList & columns_list, bool with_subcolumns) const
|
||||
{
|
||||
/// Firstly fast check if there are any Object columns.
|
||||
NameSet requested_to_expand = getNamesOfObjectColumns(columns_list);
|
||||
if (requested_to_expand.empty())
|
||||
return columns_list;
|
||||
|
||||
return expandObjectColumnsImpl(parts, columns_list, requested_to_expand, with_subcolumns);
|
||||
}
|
||||
|
||||
NamesAndTypesList MergeTreeData::expandObjectColumns(const NamesAndTypesList & columns_list, bool with_subcolumns) const
|
||||
{
|
||||
/// Firstly fast check if there are any Object columns.
|
||||
NameSet requested_to_expand = getNamesOfObjectColumns(columns_list);
|
||||
if (requested_to_expand.empty())
|
||||
return columns_list;
|
||||
|
||||
return expandObjectColumnsImpl(getDataPartsVector(), columns_list, requested_to_expand, with_subcolumns);
|
||||
}
|
||||
|
||||
NamesAndTypesList MergeTreeData::getExpandedObjects() const
|
||||
{
|
||||
auto metadata_snapshot = getInMemoryMetadataPtr();
|
||||
auto columns = metadata_snapshot->getColumns().getAllPhysical();
|
||||
|
||||
NamesAndTypesList result_columns;
|
||||
for (const auto & column : columns)
|
||||
if (isObject(column.type))
|
||||
result_columns.push_back(column);
|
||||
|
||||
return expandObjectColumns(result_columns, false);
|
||||
}
|
||||
|
||||
CurrentlySubmergingEmergingTagger::~CurrentlySubmergingEmergingTagger()
|
||||
{
|
||||
std::lock_guard lock(storage.currently_submerging_emerging_mutex);
|
||||
|
@ -617,6 +617,10 @@ public:
|
||||
return column_sizes;
|
||||
}
|
||||
|
||||
NamesAndTypesList expandObjectColumns(const NamesAndTypesList & columns_list, bool with_subcolumns) const override;
|
||||
NamesAndTypesList expandObjectColumns(const DataPartsVector & parts, const NamesAndTypesList & columns_list, bool with_subcolumns) const;
|
||||
NamesAndTypesList getExpandedObjects() const override;
|
||||
|
||||
/// For ATTACH/DETACH/DROP PARTITION.
|
||||
String getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr context) const;
|
||||
|
||||
|
@ -236,7 +236,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
|
||||
}
|
||||
// At this point, empty `part_values` means all parts.
|
||||
|
||||
metadata_snapshot->check(real_column_names, data.getVirtuals(), data.getStorageID());
|
||||
metadata_snapshot->check(real_column_names, data.getVirtuals(), data.getStorageID(), data.getExpandedObjects());
|
||||
|
||||
const Settings & settings = context->getSettingsRef();
|
||||
const auto & primary_key = metadata_snapshot->getPrimaryKey();
|
||||
|
@ -168,7 +168,7 @@ MarkRanges MergeTreeReadPool::getRestMarks(const IMergeTreeDataPart & part, cons
|
||||
|
||||
Block MergeTreeReadPool::getHeader() const
|
||||
{
|
||||
return metadata_snapshot->getSampleBlockForColumns(column_names, data.getVirtuals(), data.getStorageID());
|
||||
return metadata_snapshot->getSampleBlockForColumns(column_names, data.getVirtuals(), data.getStorageID(), data.getExpandedObjects());
|
||||
}
|
||||
|
||||
void MergeTreeReadPool::profileFeedback(const ReadBufferFromFileBase::ProfileInfo info)
|
||||
|
@ -30,7 +30,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
|
||||
bool quiet)
|
||||
:
|
||||
MergeTreeBaseSelectProcessor{
|
||||
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()),
|
||||
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID(), storage_.getExpandedObjects()),
|
||||
storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_,
|
||||
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
|
||||
reader_settings_, use_uncompressed_cache_, virt_column_names_},
|
||||
|
@ -30,7 +30,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
|
||||
bool quiet)
|
||||
:
|
||||
MergeTreeBaseSelectProcessor{
|
||||
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()),
|
||||
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID(), storage_.getExpandedObjects()),
|
||||
storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_,
|
||||
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
|
||||
reader_settings_, use_uncompressed_cache_, virt_column_names_},
|
||||
|
@ -17,7 +17,7 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
|
||||
bool read_with_direct_io_,
|
||||
bool take_column_types_from_storage,
|
||||
bool quiet)
|
||||
: SourceWithProgress(metadata_snapshot_->getSampleBlockForColumns(columns_to_read_, storage_.getVirtuals(), storage_.getStorageID()))
|
||||
: SourceWithProgress(metadata_snapshot_->getSampleBlockForColumns(columns_to_read_, storage_.getVirtuals(), storage_.getStorageID(), storage_.getExpandedObjects()))
|
||||
, storage(storage_)
|
||||
, metadata_snapshot(metadata_snapshot_)
|
||||
, data_part(std::move(data_part_))
|
||||
|
@ -289,7 +289,7 @@ Pipe StorageEmbeddedRocksDB::read(
|
||||
size_t max_block_size,
|
||||
unsigned num_streams)
|
||||
{
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID(), getExpandedObjects() );
|
||||
|
||||
FieldVectorPtr keys;
|
||||
bool all_scan = false;
|
||||
|
@ -447,7 +447,7 @@ Pipe StorageGenerateRandom::read(
|
||||
size_t max_block_size,
|
||||
unsigned num_streams)
|
||||
{
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID(), getExpandedObjects() );
|
||||
|
||||
Pipes pipes;
|
||||
pipes.reserve(num_streams);
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include <Common/quoteString.h>
|
||||
#include <Common/StringUtils/StringUtils.h>
|
||||
#include <Core/ColumnWithTypeAndName.h>
|
||||
#include <DataTypes/NestedUtils.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/Operators.h>
|
||||
@ -287,7 +288,7 @@ Block StorageInMemoryMetadata::getSampleBlock() const
|
||||
}
|
||||
|
||||
Block StorageInMemoryMetadata::getSampleBlockForColumns(
|
||||
const Names & column_names, const NamesAndTypesList & virtuals, const StorageID & storage_id) const
|
||||
const Names & column_names, const NamesAndTypesList & virtuals, const StorageID & storage_id, const NamesAndTypesList & expanded_objects) const
|
||||
{
|
||||
Block res;
|
||||
|
||||
@ -303,6 +304,17 @@ Block StorageInMemoryMetadata::getSampleBlockForColumns(
|
||||
for (const auto & column : virtuals)
|
||||
columns_map.emplace(column.name, column.type);
|
||||
|
||||
for (const auto & column : expanded_objects)
|
||||
{
|
||||
columns_map.emplace(column.name, column.type);
|
||||
for (const auto & subcolumn : column.type->getSubcolumnNames())
|
||||
columns_map.emplace(Nested::concatenateName(column.name, subcolumn), column.type->getSubcolumnType(subcolumn));
|
||||
}
|
||||
|
||||
std::cerr << "expanded objects: ";
|
||||
for (const auto & col : expanded_objects)
|
||||
std::cerr << col.dump() << "\n";
|
||||
|
||||
for (const auto & name : column_names)
|
||||
{
|
||||
auto it = columns_map.find(name);
|
||||
@ -477,11 +489,19 @@ namespace
|
||||
}
|
||||
}
|
||||
|
||||
void StorageInMemoryMetadata::check(const Names & column_names, const NamesAndTypesList & virtuals, const StorageID & storage_id) const
|
||||
void StorageInMemoryMetadata::check(
|
||||
const Names & column_names, const NamesAndTypesList & virtuals, const StorageID & storage_id, const NamesAndTypesList & expanded_objects) const
|
||||
{
|
||||
NamesAndTypesList available_columns = getColumns().getAllPhysicalWithSubcolumns();
|
||||
available_columns.insert(available_columns.end(), virtuals.begin(), virtuals.end());
|
||||
|
||||
for (const auto & column : expanded_objects)
|
||||
{
|
||||
available_columns.push_back(column);
|
||||
for (const auto & subcolumn : column.type->getSubcolumnNames())
|
||||
available_columns.emplace_back(column.name, subcolumn, column.type, column.type->getSubcolumnType(subcolumn));
|
||||
}
|
||||
|
||||
const String list_of_columns = listOfColumns(available_columns);
|
||||
|
||||
if (column_names.empty())
|
||||
|
@ -147,7 +147,9 @@ struct StorageInMemoryMetadata
|
||||
/// Storage metadata. StorageID required only for more clear exception
|
||||
/// message.
|
||||
Block getSampleBlockForColumns(
|
||||
const Names & column_names, const NamesAndTypesList & virtuals = {}, const StorageID & storage_id = StorageID::createEmpty()) const;
|
||||
const Names & column_names, const NamesAndTypesList & virtuals = {},
|
||||
const StorageID & storage_id = StorageID::createEmpty(), const NamesAndTypesList & expanded_objects = {}) const;
|
||||
|
||||
/// Returns structure with partition key.
|
||||
const KeyDescription & getPartitionKey() const;
|
||||
/// Returns ASTExpressionList of partition key expression for storage or nullptr if there is none.
|
||||
@ -212,7 +214,8 @@ struct StorageInMemoryMetadata
|
||||
|
||||
/// Verify that all the requested names are in the table and are set correctly:
|
||||
/// list of names is not empty and the names do not repeat.
|
||||
void check(const Names & column_names, const NamesAndTypesList & virtuals, const StorageID & storage_id) const;
|
||||
void check(const Names & column_names, const NamesAndTypesList & virtuals,
|
||||
const StorageID & storage_id, const NamesAndTypesList & expanded_objects) const;
|
||||
|
||||
/// Check that all the requested names are in the table and have the correct types.
|
||||
void check(const NamesAndTypesList & columns) const;
|
||||
|
@ -497,7 +497,7 @@ Pipe StorageJoin::read(
|
||||
size_t max_block_size,
|
||||
unsigned /*num_streams*/)
|
||||
{
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID(), getExpandedObjects() );
|
||||
|
||||
Block source_sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
|
||||
return Pipe(std::make_shared<JoinSource>(join, rwlock, max_block_size, source_sample_block));
|
||||
|
@ -652,7 +652,7 @@ Pipe StorageLog::read(
|
||||
size_t max_block_size,
|
||||
unsigned num_streams)
|
||||
{
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID(), getExpandedObjects() );
|
||||
|
||||
auto lock_timeout = getLockTimeout(context);
|
||||
loadMarks(lock_timeout);
|
||||
|
@ -183,7 +183,7 @@ Pipe StorageMemory::read(
|
||||
size_t /*max_block_size*/,
|
||||
unsigned num_streams)
|
||||
{
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID(), getExpandedObjects() );
|
||||
|
||||
if (delay_read_for_global_subqueries)
|
||||
{
|
||||
|
@ -81,7 +81,7 @@ Pipe StorageMongoDB::read(
|
||||
{
|
||||
connectIfNotConnected();
|
||||
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID(), getExpandedObjects() );
|
||||
|
||||
Block sample_block;
|
||||
for (const String & column_name : column_names)
|
||||
|
@ -74,7 +74,7 @@ Pipe StorageMySQL::read(
|
||||
size_t /*max_block_size*/,
|
||||
unsigned)
|
||||
{
|
||||
metadata_snapshot->check(column_names_, getVirtuals(), getStorageID());
|
||||
metadata_snapshot->check(column_names_, getVirtuals(), getStorageID(), getExpandedObjects());
|
||||
String query = transformQueryForExternalDatabase(
|
||||
query_info_,
|
||||
metadata_snapshot->getColumns().getOrdinary(),
|
||||
|
@ -70,7 +70,7 @@ Pipe StoragePostgreSQL::read(
|
||||
size_t max_block_size_,
|
||||
unsigned)
|
||||
{
|
||||
metadata_snapshot->check(column_names_, getVirtuals(), getStorageID());
|
||||
metadata_snapshot->check(column_names_, getVirtuals(), getStorageID(), getExpandedObjects());
|
||||
|
||||
/// Connection is already made to the needed database, so it should not be present in the query;
|
||||
/// remote_table_schema is empty if it is not specified, will access only table_name.
|
||||
|
@ -135,7 +135,7 @@ Pipe StorageS3Cluster::read(
|
||||
}
|
||||
}
|
||||
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID(), getExpandedObjects() );
|
||||
return Pipe::unitePipes(std::move(pipes));
|
||||
}
|
||||
|
||||
|
@ -330,7 +330,7 @@ Pipe StorageStripeLog::read(
|
||||
if (!lock)
|
||||
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID(), getExpandedObjects() );
|
||||
|
||||
NameSet column_names_set(column_names.begin(), column_names.end());
|
||||
|
||||
|
@ -484,7 +484,7 @@ Pipe StorageTinyLog::read(
|
||||
const size_t max_block_size,
|
||||
const unsigned /*num_streams*/)
|
||||
{
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID(), getExpandedObjects() );
|
||||
|
||||
auto all_columns = metadata_snapshot->getColumns().getAllWithSubcolumns().addTypes(column_names);
|
||||
|
||||
|
@ -29,7 +29,7 @@ Pipe StorageValues::read(
|
||||
size_t /*max_block_size*/,
|
||||
unsigned /*num_streams*/)
|
||||
{
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID(), getExpandedObjects() );
|
||||
|
||||
/// Get only required columns.
|
||||
Block block;
|
||||
|
@ -106,7 +106,7 @@ Pipe StorageXDBC::read(
|
||||
size_t max_block_size,
|
||||
unsigned num_streams)
|
||||
{
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID(), getExpandedObjects() );
|
||||
|
||||
bridge_helper->startBridgeSync();
|
||||
return IStorageURLBase::read(column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
|
||||
|
@ -41,7 +41,7 @@ public:
|
||||
size_t /*max_block_size*/,
|
||||
unsigned /*num_streams*/) override
|
||||
{
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID(), getExpandedObjects() );
|
||||
|
||||
Block sample_block = metadata_snapshot->getSampleBlock();
|
||||
MutableColumns res_columns = sample_block.cloneEmptyColumns();
|
||||
|
@ -248,7 +248,7 @@ Pipe StorageSystemColumns::read(
|
||||
const size_t max_block_size,
|
||||
const unsigned /*num_streams*/)
|
||||
{
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID(), getExpandedObjects() );
|
||||
|
||||
/// Create a mask of what columns are needed in the result.
|
||||
|
||||
|
@ -35,7 +35,7 @@ Pipe StorageSystemDisks::read(
|
||||
const size_t /*max_block_size*/,
|
||||
const unsigned /*num_streams*/)
|
||||
{
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID(), getExpandedObjects() );
|
||||
|
||||
MutableColumnPtr col_name = ColumnString::create();
|
||||
MutableColumnPtr col_path = ColumnString::create();
|
||||
|
@ -131,7 +131,7 @@ Pipe StorageSystemNumbers::read(
|
||||
size_t max_block_size,
|
||||
unsigned num_streams)
|
||||
{
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID(), getExpandedObjects() );
|
||||
|
||||
if (limit && *limit < max_block_size)
|
||||
{
|
||||
|
@ -29,7 +29,7 @@ Pipe StorageSystemOne::read(
|
||||
const size_t /*max_block_size*/,
|
||||
const unsigned /*num_streams*/)
|
||||
{
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID(), getExpandedObjects() );
|
||||
|
||||
Block header{ColumnWithTypeAndName(
|
||||
DataTypeUInt8().createColumn(),
|
||||
|
@ -41,7 +41,7 @@ bool StorageSystemPartsBase::hasStateColumn(const Names & column_names, const St
|
||||
|
||||
/// Do not check if only _state column is requested
|
||||
if (!(has_state_column && real_column_names.empty()))
|
||||
metadata_snapshot->check(real_column_names, {}, getStorageID());
|
||||
metadata_snapshot->check(real_column_names, {}, getStorageID(), {});
|
||||
|
||||
return has_state_column;
|
||||
}
|
||||
|
@ -65,7 +65,7 @@ Pipe StorageSystemReplicas::read(
|
||||
const size_t /*max_block_size*/,
|
||||
const unsigned /*num_streams*/)
|
||||
{
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID(), getExpandedObjects() );
|
||||
|
||||
const auto access = context->getAccess();
|
||||
const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES);
|
||||
|
@ -44,7 +44,7 @@ Pipe StorageSystemStoragePolicies::read(
|
||||
const size_t /*max_block_size*/,
|
||||
const unsigned /*num_streams*/)
|
||||
{
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID(), getExpandedObjects() );
|
||||
|
||||
MutableColumnPtr col_policy_name = ColumnString::create();
|
||||
MutableColumnPtr col_volume_name = ColumnString::create();
|
||||
|
@ -506,7 +506,7 @@ Pipe StorageSystemTables::read(
|
||||
const size_t max_block_size,
|
||||
const unsigned /*num_streams*/)
|
||||
{
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID(), getExpandedObjects() );
|
||||
|
||||
/// Create a mask of what columns are needed in the result.
|
||||
|
||||
|
@ -99,7 +99,7 @@ Pipe StorageSystemZeros::read(
|
||||
size_t max_block_size,
|
||||
unsigned num_streams)
|
||||
{
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID(), getExpandedObjects() );
|
||||
|
||||
bool use_multiple_streams = multithreaded;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user