mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-24 02:30:51 +00:00
Check methods in metadata
This commit is contained in:
parent
31abbe5dbd
commit
33c27de54d
@ -37,176 +37,6 @@ const ColumnsDescription & IStorage::getColumns() const
|
|||||||
return metadata->columns;
|
return metadata->columns;
|
||||||
}
|
}
|
||||||
|
|
||||||
namespace
|
|
||||||
{
|
|
||||||
#if !defined(ARCADIA_BUILD)
|
|
||||||
using NamesAndTypesMap = google::dense_hash_map<StringRef, const IDataType *, StringRefHash>;
|
|
||||||
using UniqueStrings = google::dense_hash_set<StringRef, StringRefHash>;
|
|
||||||
#else
|
|
||||||
using NamesAndTypesMap = google::sparsehash::dense_hash_map<StringRef, const IDataType *, StringRefHash>;
|
|
||||||
using UniqueStrings = google::sparsehash::dense_hash_set<StringRef, StringRefHash>;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
String listOfColumns(const NamesAndTypesList & available_columns)
|
|
||||||
{
|
|
||||||
std::stringstream ss;
|
|
||||||
for (auto it = available_columns.begin(); it != available_columns.end(); ++it)
|
|
||||||
{
|
|
||||||
if (it != available_columns.begin())
|
|
||||||
ss << ", ";
|
|
||||||
ss << it->name;
|
|
||||||
}
|
|
||||||
return ss.str();
|
|
||||||
}
|
|
||||||
|
|
||||||
NamesAndTypesMap getColumnsMap(const NamesAndTypesList & columns)
|
|
||||||
{
|
|
||||||
NamesAndTypesMap res;
|
|
||||||
res.set_empty_key(StringRef());
|
|
||||||
|
|
||||||
for (const auto & column : columns)
|
|
||||||
res.insert({column.name, column.type.get()});
|
|
||||||
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
|
|
||||||
UniqueStrings initUniqueStrings()
|
|
||||||
{
|
|
||||||
UniqueStrings strings;
|
|
||||||
strings.set_empty_key(StringRef());
|
|
||||||
return strings;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void IStorage::check(const Names & column_names, bool include_virtuals) const
|
|
||||||
{
|
|
||||||
NamesAndTypesList available_columns = getColumns().getAllPhysical();
|
|
||||||
if (include_virtuals)
|
|
||||||
{
|
|
||||||
auto virtuals = getVirtuals();
|
|
||||||
available_columns.insert(available_columns.end(), virtuals.begin(), virtuals.end());
|
|
||||||
}
|
|
||||||
|
|
||||||
const String list_of_columns = listOfColumns(available_columns);
|
|
||||||
|
|
||||||
if (column_names.empty())
|
|
||||||
throw Exception("Empty list of columns queried. There are columns: " + list_of_columns, ErrorCodes::EMPTY_LIST_OF_COLUMNS_QUERIED);
|
|
||||||
|
|
||||||
const auto columns_map = getColumnsMap(available_columns);
|
|
||||||
|
|
||||||
auto unique_names = initUniqueStrings();
|
|
||||||
for (const auto & name : column_names)
|
|
||||||
{
|
|
||||||
if (columns_map.end() == columns_map.find(name))
|
|
||||||
throw Exception(
|
|
||||||
"There is no column with name " + backQuote(name) + " in table " + getStorageID().getNameForLogs() + ". There are columns: " + list_of_columns,
|
|
||||||
ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
|
|
||||||
|
|
||||||
if (unique_names.end() != unique_names.find(name))
|
|
||||||
throw Exception("Column " + name + " queried more than once", ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE);
|
|
||||||
unique_names.insert(name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void IStorage::check(const NamesAndTypesList & provided_columns) const
|
|
||||||
{
|
|
||||||
const NamesAndTypesList & available_columns = getColumns().getAllPhysical();
|
|
||||||
const auto columns_map = getColumnsMap(available_columns);
|
|
||||||
|
|
||||||
auto unique_names = initUniqueStrings();
|
|
||||||
for (const NameAndTypePair & column : provided_columns)
|
|
||||||
{
|
|
||||||
auto it = columns_map.find(column.name);
|
|
||||||
if (columns_map.end() == it)
|
|
||||||
throw Exception(
|
|
||||||
"There is no column with name " + column.name + ". There are columns: " + listOfColumns(available_columns),
|
|
||||||
ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
|
|
||||||
|
|
||||||
if (!column.type->equals(*it->second))
|
|
||||||
throw Exception(
|
|
||||||
"Type mismatch for column " + column.name + ". Column has type " + it->second->getName() + ", got type "
|
|
||||||
+ column.type->getName(),
|
|
||||||
ErrorCodes::TYPE_MISMATCH);
|
|
||||||
|
|
||||||
if (unique_names.end() != unique_names.find(column.name))
|
|
||||||
throw Exception("Column " + column.name + " queried more than once", ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE);
|
|
||||||
unique_names.insert(column.name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void IStorage::check(const NamesAndTypesList & provided_columns, const Names & column_names) const
|
|
||||||
{
|
|
||||||
const NamesAndTypesList & available_columns = getColumns().getAllPhysical();
|
|
||||||
const auto available_columns_map = getColumnsMap(available_columns);
|
|
||||||
const auto & provided_columns_map = getColumnsMap(provided_columns);
|
|
||||||
|
|
||||||
if (column_names.empty())
|
|
||||||
throw Exception(
|
|
||||||
"Empty list of columns queried. There are columns: " + listOfColumns(available_columns),
|
|
||||||
ErrorCodes::EMPTY_LIST_OF_COLUMNS_QUERIED);
|
|
||||||
|
|
||||||
auto unique_names = initUniqueStrings();
|
|
||||||
for (const String & name : column_names)
|
|
||||||
{
|
|
||||||
auto it = provided_columns_map.find(name);
|
|
||||||
if (provided_columns_map.end() == it)
|
|
||||||
continue;
|
|
||||||
|
|
||||||
auto jt = available_columns_map.find(name);
|
|
||||||
if (available_columns_map.end() == jt)
|
|
||||||
throw Exception(
|
|
||||||
"There is no column with name " + name + ". There are columns: " + listOfColumns(available_columns),
|
|
||||||
ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
|
|
||||||
|
|
||||||
if (!it->second->equals(*jt->second))
|
|
||||||
throw Exception(
|
|
||||||
"Type mismatch for column " + name + ". Column has type " + jt->second->getName() + ", got type " + it->second->getName(),
|
|
||||||
ErrorCodes::TYPE_MISMATCH);
|
|
||||||
|
|
||||||
if (unique_names.end() != unique_names.find(name))
|
|
||||||
throw Exception("Column " + name + " queried more than once", ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE);
|
|
||||||
unique_names.insert(name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void IStorage::check(const Block & block, bool need_all) const
|
|
||||||
{
|
|
||||||
const NamesAndTypesList & available_columns = getColumns().getAllPhysical();
|
|
||||||
const auto columns_map = getColumnsMap(available_columns);
|
|
||||||
|
|
||||||
NameSet names_in_block;
|
|
||||||
|
|
||||||
block.checkNumberOfRows();
|
|
||||||
|
|
||||||
for (const auto & column : block)
|
|
||||||
{
|
|
||||||
if (names_in_block.count(column.name))
|
|
||||||
throw Exception("Duplicate column " + column.name + " in block", ErrorCodes::DUPLICATE_COLUMN);
|
|
||||||
|
|
||||||
names_in_block.insert(column.name);
|
|
||||||
|
|
||||||
auto it = columns_map.find(column.name);
|
|
||||||
if (columns_map.end() == it)
|
|
||||||
throw Exception(
|
|
||||||
"There is no column with name " + column.name + ". There are columns: " + listOfColumns(available_columns),
|
|
||||||
ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
|
|
||||||
|
|
||||||
if (!column.type->equals(*it->second))
|
|
||||||
throw Exception(
|
|
||||||
"Type mismatch for column " + column.name + ". Column has type " + it->second->getName() + ", got type "
|
|
||||||
+ column.type->getName(),
|
|
||||||
ErrorCodes::TYPE_MISMATCH);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (need_all && names_in_block.size() < columns_map.size())
|
|
||||||
{
|
|
||||||
for (const auto & available_column : available_columns)
|
|
||||||
{
|
|
||||||
if (!names_in_block.count(available_column.name))
|
|
||||||
throw Exception("Expected column " + available_column.name, ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bool IStorage::isVirtualColumn(const String & column_name) const
|
bool IStorage::isVirtualColumn(const String & column_name) const
|
||||||
{
|
{
|
||||||
|
@ -138,25 +138,10 @@ public:
|
|||||||
public: /// thread-unsafe part. lockStructure must be acquired
|
public: /// thread-unsafe part. lockStructure must be acquired
|
||||||
|
|
||||||
const ColumnsDescription & getColumns() const; /// returns combined set of columns
|
const ColumnsDescription & getColumns() const; /// returns combined set of columns
|
||||||
|
|
||||||
StorageInMemoryMetadata getInMemoryMetadata() const { return *metadata; }
|
StorageInMemoryMetadata getInMemoryMetadata() const { return *metadata; }
|
||||||
StorageMetadataPtr getInMemoryMetadataPtr() const { return metadata; }
|
StorageMetadataPtr getInMemoryMetadataPtr() const { return metadata; }
|
||||||
void setInMemoryMetadata(const StorageInMemoryMetadata & metadata_) { metadata = std::make_shared<StorageInMemoryMetadata>(metadata_); }
|
void setInMemoryMetadata(const StorageInMemoryMetadata & metadata_) { metadata = std::make_shared<StorageInMemoryMetadata>(metadata_); }
|
||||||
|
|
||||||
/// Verify that all the requested names are in the table and are set correctly:
|
|
||||||
/// list of names is not empty and the names do not repeat.
|
|
||||||
void check(const Names & column_names, bool include_virtuals = false) const;
|
|
||||||
|
|
||||||
/// Check that all the requested names are in the table and have the correct types.
|
|
||||||
void check(const NamesAndTypesList & columns) const;
|
|
||||||
|
|
||||||
/// Check that all names from the intersection of `names` and `columns` are in the table and have the same types.
|
|
||||||
void check(const NamesAndTypesList & columns, const Names & column_names) const;
|
|
||||||
|
|
||||||
/// Check that the data block contains all the columns of the table with the correct types,
|
|
||||||
/// contains only the columns of the table, and all the columns are different.
|
|
||||||
/// If |need_all| is set, then checks that all the columns of the table are in the block.
|
|
||||||
void check(const Block & block, bool need_all = false) const;
|
|
||||||
|
|
||||||
/// Return list of virtual columns (like _part, _table, etc). In the vast
|
/// Return list of virtual columns (like _part, _table, etc). In the vast
|
||||||
/// majority of cases virtual columns are static constant part of Storage
|
/// majority of cases virtual columns are static constant part of Storage
|
||||||
|
@ -224,7 +224,7 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
|
|||||||
|
|
||||||
std::multiset<String> part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
|
std::multiset<String> part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
|
||||||
|
|
||||||
data.check(real_column_names);
|
metadata_snapshot->check(real_column_names, data.getVirtuals());
|
||||||
|
|
||||||
const Settings & settings = context.getSettingsRef();
|
const Settings & settings = context.getSettingsRef();
|
||||||
const auto & primary_key = metadata_snapshot->getPrimaryKey();
|
const auto & primary_key = metadata_snapshot->getPrimaryKey();
|
||||||
|
@ -138,7 +138,7 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block
|
|||||||
if (!block || !block.rows())
|
if (!block || !block.rows())
|
||||||
return result;
|
return result;
|
||||||
|
|
||||||
data.check(block, true);
|
metadata_snapshot->check(block, true);
|
||||||
|
|
||||||
if (!metadata_snapshot->hasPartitionKey()) /// Table is not partitioned.
|
if (!metadata_snapshot->hasPartitionKey()) /// Table is not partitioned.
|
||||||
{
|
{
|
||||||
|
@ -211,7 +211,7 @@ void ReplicatedMergeTreeBlockOutputStream::writeExistingPart(MergeTreeData::Muta
|
|||||||
void ReplicatedMergeTreeBlockOutputStream::commitPart(
|
void ReplicatedMergeTreeBlockOutputStream::commitPart(
|
||||||
zkutil::ZooKeeperPtr & zookeeper, MergeTreeData::MutableDataPartPtr & part, const String & block_id)
|
zkutil::ZooKeeperPtr & zookeeper, MergeTreeData::MutableDataPartPtr & part, const String & block_id)
|
||||||
{
|
{
|
||||||
storage.check(part->getColumns());
|
metadata_snapshot->check(part->getColumns());
|
||||||
assertSessionIsNotExpired(zookeeper);
|
assertSessionIsNotExpired(zookeeper);
|
||||||
|
|
||||||
/// Obtain incremental block number and lock it. The lock holds our intention to add the block to the filesystem.
|
/// Obtain incremental block number and lock it. The lock holds our intention to add the block to the filesystem.
|
||||||
|
@ -342,7 +342,7 @@ public:
|
|||||||
return;
|
return;
|
||||||
|
|
||||||
// Check table structure.
|
// Check table structure.
|
||||||
storage.check(block, true);
|
metadata_snapshot->check(block, true);
|
||||||
|
|
||||||
size_t rows = block.rows();
|
size_t rows = block.rows();
|
||||||
if (!rows)
|
if (!rows)
|
||||||
|
@ -429,14 +429,14 @@ void registerStorageGenerateRandom(StorageFactory & factory)
|
|||||||
|
|
||||||
Pipes StorageGenerateRandom::read(
|
Pipes StorageGenerateRandom::read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
const StorageMetadataPtr & /*metadata_snapshot*/,
|
const StorageMetadataPtr & metadata_snapshot,
|
||||||
const SelectQueryInfo & /*query_info*/,
|
const SelectQueryInfo & /*query_info*/,
|
||||||
const Context & context,
|
const Context & context,
|
||||||
QueryProcessingStage::Enum /*processed_stage*/,
|
QueryProcessingStage::Enum /*processed_stage*/,
|
||||||
size_t max_block_size,
|
size_t max_block_size,
|
||||||
unsigned num_streams)
|
unsigned num_streams)
|
||||||
{
|
{
|
||||||
check(column_names, true);
|
metadata_snapshot->check(column_names, getVirtuals());
|
||||||
|
|
||||||
Pipes pipes;
|
Pipes pipes;
|
||||||
pipes.reserve(num_streams);
|
pipes.reserve(num_streams);
|
||||||
|
@ -1,6 +1,10 @@
|
|||||||
#include <Storages/StorageInMemoryMetadata.h>
|
#include <Storages/StorageInMemoryMetadata.h>
|
||||||
|
|
||||||
|
#include <sparsehash/dense_hash_map>
|
||||||
|
#include <sparsehash/dense_hash_set>
|
||||||
#include <Common/quoteString.h>
|
#include <Common/quoteString.h>
|
||||||
|
#include <Core/ColumnWithTypeAndName.h>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
@ -410,4 +414,172 @@ bool StorageInMemoryMetadata::hasSelectQuery() const
|
|||||||
return select.select_query != nullptr;
|
return select.select_query != nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
#if !defined(ARCADIA_BUILD)
|
||||||
|
using NamesAndTypesMap = google::dense_hash_map<StringRef, const IDataType *, StringRefHash>;
|
||||||
|
using UniqueStrings = google::dense_hash_set<StringRef, StringRefHash>;
|
||||||
|
#else
|
||||||
|
using NamesAndTypesMap = google::sparsehash::dense_hash_map<StringRef, const IDataType *, StringRefHash>;
|
||||||
|
using UniqueStrings = google::sparsehash::dense_hash_set<StringRef, StringRefHash>;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
String listOfColumns(const NamesAndTypesList & available_columns)
|
||||||
|
{
|
||||||
|
std::stringstream ss;
|
||||||
|
for (auto it = available_columns.begin(); it != available_columns.end(); ++it)
|
||||||
|
{
|
||||||
|
if (it != available_columns.begin())
|
||||||
|
ss << ", ";
|
||||||
|
ss << it->name;
|
||||||
|
}
|
||||||
|
return ss.str();
|
||||||
|
}
|
||||||
|
|
||||||
|
NamesAndTypesMap getColumnsMap(const NamesAndTypesList & columns)
|
||||||
|
{
|
||||||
|
NamesAndTypesMap res;
|
||||||
|
res.set_empty_key(StringRef());
|
||||||
|
|
||||||
|
for (const auto & column : columns)
|
||||||
|
res.insert({column.name, column.type.get()});
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
UniqueStrings initUniqueStrings()
|
||||||
|
{
|
||||||
|
UniqueStrings strings;
|
||||||
|
strings.set_empty_key(StringRef());
|
||||||
|
return strings;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void StorageInMemoryMetadata::check(const Names & column_names, const NamesAndTypesList & virtuals) const
|
||||||
|
{
|
||||||
|
NamesAndTypesList available_columns = getColumns().getAllPhysical();
|
||||||
|
available_columns.insert(available_columns.end(), virtuals.begin(), virtuals.end());
|
||||||
|
|
||||||
|
const String list_of_columns = listOfColumns(available_columns);
|
||||||
|
|
||||||
|
if (column_names.empty())
|
||||||
|
throw Exception("Empty list of columns queried. There are columns: " + list_of_columns, ErrorCodes::EMPTY_LIST_OF_COLUMNS_QUERIED);
|
||||||
|
|
||||||
|
const auto columns_map = getColumnsMap(available_columns);
|
||||||
|
|
||||||
|
auto unique_names = initUniqueStrings();
|
||||||
|
for (const auto & name : column_names)
|
||||||
|
{
|
||||||
|
if (columns_map.end() == columns_map.find(name))
|
||||||
|
throw Exception(
|
||||||
|
"There is no column with name " + backQuote(name) + " in table " + /* TODO alesap getStorageID().getNameForLogs() +*/ ". There are columns: " + list_of_columns,
|
||||||
|
ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
|
||||||
|
|
||||||
|
if (unique_names.end() != unique_names.find(name))
|
||||||
|
throw Exception("Column " + name + " queried more than once", ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE);
|
||||||
|
unique_names.insert(name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void StorageInMemoryMetadata::check(const NamesAndTypesList & provided_columns) const
|
||||||
|
{
|
||||||
|
const NamesAndTypesList & available_columns = getColumns().getAllPhysical();
|
||||||
|
const auto columns_map = getColumnsMap(available_columns);
|
||||||
|
|
||||||
|
auto unique_names = initUniqueStrings();
|
||||||
|
for (const NameAndTypePair & column : provided_columns)
|
||||||
|
{
|
||||||
|
auto it = columns_map.find(column.name);
|
||||||
|
if (columns_map.end() == it)
|
||||||
|
throw Exception(
|
||||||
|
"There is no column with name " + column.name + ". There are columns: " + listOfColumns(available_columns),
|
||||||
|
ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
|
||||||
|
|
||||||
|
if (!column.type->equals(*it->second))
|
||||||
|
throw Exception(
|
||||||
|
"Type mismatch for column " + column.name + ". Column has type " + it->second->getName() + ", got type "
|
||||||
|
+ column.type->getName(),
|
||||||
|
ErrorCodes::TYPE_MISMATCH);
|
||||||
|
|
||||||
|
if (unique_names.end() != unique_names.find(column.name))
|
||||||
|
throw Exception("Column " + column.name + " queried more than once", ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE);
|
||||||
|
unique_names.insert(column.name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void StorageInMemoryMetadata::check(const NamesAndTypesList & provided_columns, const Names & column_names) const
|
||||||
|
{
|
||||||
|
const NamesAndTypesList & available_columns = getColumns().getAllPhysical();
|
||||||
|
const auto available_columns_map = getColumnsMap(available_columns);
|
||||||
|
const auto & provided_columns_map = getColumnsMap(provided_columns);
|
||||||
|
|
||||||
|
if (column_names.empty())
|
||||||
|
throw Exception(
|
||||||
|
"Empty list of columns queried. There are columns: " + listOfColumns(available_columns),
|
||||||
|
ErrorCodes::EMPTY_LIST_OF_COLUMNS_QUERIED);
|
||||||
|
|
||||||
|
auto unique_names = initUniqueStrings();
|
||||||
|
for (const String & name : column_names)
|
||||||
|
{
|
||||||
|
auto it = provided_columns_map.find(name);
|
||||||
|
if (provided_columns_map.end() == it)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
auto jt = available_columns_map.find(name);
|
||||||
|
if (available_columns_map.end() == jt)
|
||||||
|
throw Exception(
|
||||||
|
"There is no column with name " + name + ". There are columns: " + listOfColumns(available_columns),
|
||||||
|
ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
|
||||||
|
|
||||||
|
if (!it->second->equals(*jt->second))
|
||||||
|
throw Exception(
|
||||||
|
"Type mismatch for column " + name + ". Column has type " + jt->second->getName() + ", got type " + it->second->getName(),
|
||||||
|
ErrorCodes::TYPE_MISMATCH);
|
||||||
|
|
||||||
|
if (unique_names.end() != unique_names.find(name))
|
||||||
|
throw Exception("Column " + name + " queried more than once", ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE);
|
||||||
|
unique_names.insert(name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void StorageInMemoryMetadata::check(const Block & block, bool need_all) const
|
||||||
|
{
|
||||||
|
const NamesAndTypesList & available_columns = getColumns().getAllPhysical();
|
||||||
|
const auto columns_map = getColumnsMap(available_columns);
|
||||||
|
|
||||||
|
NameSet names_in_block;
|
||||||
|
|
||||||
|
block.checkNumberOfRows();
|
||||||
|
|
||||||
|
for (const auto & column : block)
|
||||||
|
{
|
||||||
|
if (names_in_block.count(column.name))
|
||||||
|
throw Exception("Duplicate column " + column.name + " in block", ErrorCodes::DUPLICATE_COLUMN);
|
||||||
|
|
||||||
|
names_in_block.insert(column.name);
|
||||||
|
|
||||||
|
auto it = columns_map.find(column.name);
|
||||||
|
if (columns_map.end() == it)
|
||||||
|
throw Exception(
|
||||||
|
"There is no column with name " + column.name + ". There are columns: " + listOfColumns(available_columns),
|
||||||
|
ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
|
||||||
|
|
||||||
|
if (!column.type->equals(*it->second))
|
||||||
|
throw Exception(
|
||||||
|
"Type mismatch for column " + column.name + ". Column has type " + it->second->getName() + ", got type "
|
||||||
|
+ column.type->getName(),
|
||||||
|
ErrorCodes::TYPE_MISMATCH);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (need_all && names_in_block.size() < columns_map.size())
|
||||||
|
{
|
||||||
|
for (const auto & available_column : available_columns)
|
||||||
|
{
|
||||||
|
if (!names_in_block.count(available_column.name))
|
||||||
|
throw Exception("Expected column " + available_column.name, ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -177,6 +177,21 @@ struct StorageInMemoryMetadata
|
|||||||
/// Select query for *View storages.
|
/// Select query for *View storages.
|
||||||
const SelectQueryDescription & getSelectQuery() const;
|
const SelectQueryDescription & getSelectQuery() const;
|
||||||
bool hasSelectQuery() const;
|
bool hasSelectQuery() const;
|
||||||
|
|
||||||
|
/// Verify that all the requested names are in the table and are set correctly:
|
||||||
|
/// list of names is not empty and the names do not repeat.
|
||||||
|
void check(const Names & column_names, const NamesAndTypesList & virtuals) const;
|
||||||
|
|
||||||
|
/// Check that all the requested names are in the table and have the correct types.
|
||||||
|
void check(const NamesAndTypesList & columns) const;
|
||||||
|
|
||||||
|
/// Check that all names from the intersection of `names` and `columns` are in the table and have the same types.
|
||||||
|
void check(const NamesAndTypesList & columns, const Names & column_names) const;
|
||||||
|
|
||||||
|
/// Check that the data block contains all the columns of the table with the correct types,
|
||||||
|
/// contains only the columns of the table, and all the columns are different.
|
||||||
|
/// If |need_all| is set, then checks that all the columns of the table are in the block.
|
||||||
|
void check(const Block & block, bool need_all = false) const;
|
||||||
};
|
};
|
||||||
|
|
||||||
using StorageMetadataPtr = std::shared_ptr<StorageInMemoryMetadata>;
|
using StorageMetadataPtr = std::shared_ptr<StorageInMemoryMetadata>;
|
||||||
|
@ -446,7 +446,7 @@ Pipes StorageJoin::read(
|
|||||||
size_t max_block_size,
|
size_t max_block_size,
|
||||||
unsigned /*num_streams*/)
|
unsigned /*num_streams*/)
|
||||||
{
|
{
|
||||||
check(column_names);
|
metadata_snapshot->check(column_names, getVirtuals());
|
||||||
|
|
||||||
Pipes pipes;
|
Pipes pipes;
|
||||||
pipes.emplace_back(std::make_shared<JoinSource>(*join, max_block_size, metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals())));
|
pipes.emplace_back(std::make_shared<JoinSource>(*join, max_block_size, metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals())));
|
||||||
|
@ -276,7 +276,7 @@ void LogSource::readData(const String & name, const IDataType & type, IColumn &
|
|||||||
|
|
||||||
void LogBlockOutputStream::write(const Block & block)
|
void LogBlockOutputStream::write(const Block & block)
|
||||||
{
|
{
|
||||||
storage.check(block, true);
|
metadata_snapshot->check(block, true);
|
||||||
|
|
||||||
/// The set of written offset columns so that you do not write shared offsets of columns for nested structures multiple times
|
/// The set of written offset columns so that you do not write shared offsets of columns for nested structures multiple times
|
||||||
WrittenStreams written_streams;
|
WrittenStreams written_streams;
|
||||||
@ -580,14 +580,14 @@ const StorageLog::Marks & StorageLog::getMarksWithRealRowCount() const
|
|||||||
|
|
||||||
Pipes StorageLog::read(
|
Pipes StorageLog::read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
const StorageMetadataPtr & /*metadata_snapshot*/,
|
const StorageMetadataPtr & metadata_snapshot,
|
||||||
const SelectQueryInfo & /*query_info*/,
|
const SelectQueryInfo & /*query_info*/,
|
||||||
const Context & context,
|
const Context & context,
|
||||||
QueryProcessingStage::Enum /*processed_stage*/,
|
QueryProcessingStage::Enum /*processed_stage*/,
|
||||||
size_t max_block_size,
|
size_t max_block_size,
|
||||||
unsigned num_streams)
|
unsigned num_streams)
|
||||||
{
|
{
|
||||||
check(column_names);
|
metadata_snapshot->check(column_names, getVirtuals());
|
||||||
loadMarks();
|
loadMarks();
|
||||||
|
|
||||||
NamesAndTypesList all_columns = Nested::collect(getColumns().getAllPhysical().addTypes(column_names));
|
NamesAndTypesList all_columns = Nested::collect(getColumns().getAllPhysical().addTypes(column_names));
|
||||||
|
@ -81,7 +81,7 @@ public:
|
|||||||
|
|
||||||
void write(const Block & block) override
|
void write(const Block & block) override
|
||||||
{
|
{
|
||||||
storage.check(block, true);
|
metadata_snapshot->check(block, true);
|
||||||
std::lock_guard lock(storage.mutex);
|
std::lock_guard lock(storage.mutex);
|
||||||
storage.data.push_back(block);
|
storage.data.push_back(block);
|
||||||
}
|
}
|
||||||
@ -110,7 +110,7 @@ Pipes StorageMemory::read(
|
|||||||
size_t /*max_block_size*/,
|
size_t /*max_block_size*/,
|
||||||
unsigned num_streams)
|
unsigned num_streams)
|
||||||
{
|
{
|
||||||
check(column_names);
|
metadata_snapshot->check(column_names, getVirtuals());
|
||||||
|
|
||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
|
|
||||||
|
@ -72,7 +72,7 @@ Pipes StorageMySQL::read(
|
|||||||
size_t max_block_size_,
|
size_t max_block_size_,
|
||||||
unsigned)
|
unsigned)
|
||||||
{
|
{
|
||||||
check(column_names_);
|
metadata_snapshot->check(column_names_, getVirtuals());
|
||||||
String query = transformQueryForExternalDatabase(
|
String query = transformQueryForExternalDatabase(
|
||||||
query_info_,
|
query_info_,
|
||||||
metadata_snapshot->getColumns().getOrdinary(),
|
metadata_snapshot->getColumns().getOrdinary(),
|
||||||
|
@ -276,7 +276,7 @@ Pipes StorageStripeLog::read(
|
|||||||
{
|
{
|
||||||
std::shared_lock<std::shared_mutex> lock(rwlock);
|
std::shared_lock<std::shared_mutex> lock(rwlock);
|
||||||
|
|
||||||
check(column_names);
|
metadata_snapshot->check(column_names, getVirtuals());
|
||||||
|
|
||||||
NameSet column_names_set(column_names.begin(), column_names.end());
|
NameSet column_names_set(column_names.begin(), column_names.end());
|
||||||
|
|
||||||
|
@ -309,7 +309,7 @@ void TinyLogBlockOutputStream::writeSuffix()
|
|||||||
|
|
||||||
void TinyLogBlockOutputStream::write(const Block & block)
|
void TinyLogBlockOutputStream::write(const Block & block)
|
||||||
{
|
{
|
||||||
storage.check(block, true);
|
metadata_snapshot->check(block, true);
|
||||||
|
|
||||||
/// The set of written offset columns so that you do not write shared columns for nested structures multiple times
|
/// The set of written offset columns so that you do not write shared columns for nested structures multiple times
|
||||||
WrittenStreams written_streams;
|
WrittenStreams written_streams;
|
||||||
@ -402,7 +402,7 @@ Pipes StorageTinyLog::read(
|
|||||||
const size_t max_block_size,
|
const size_t max_block_size,
|
||||||
const unsigned /*num_streams*/)
|
const unsigned /*num_streams*/)
|
||||||
{
|
{
|
||||||
check(column_names);
|
metadata_snapshot->check(column_names, getVirtuals());
|
||||||
|
|
||||||
Pipes pipes;
|
Pipes pipes;
|
||||||
|
|
||||||
|
@ -23,14 +23,14 @@ StorageValues::StorageValues(
|
|||||||
|
|
||||||
Pipes StorageValues::read(
|
Pipes StorageValues::read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
const StorageMetadataPtr & /*metadata_snapshot*/,
|
const StorageMetadataPtr & metadata_snapshot,
|
||||||
const SelectQueryInfo & /*query_info*/,
|
const SelectQueryInfo & /*query_info*/,
|
||||||
const Context & /*context*/,
|
const Context & /*context*/,
|
||||||
QueryProcessingStage::Enum /*processed_stage*/,
|
QueryProcessingStage::Enum /*processed_stage*/,
|
||||||
size_t /*max_block_size*/,
|
size_t /*max_block_size*/,
|
||||||
unsigned /*num_streams*/)
|
unsigned /*num_streams*/)
|
||||||
{
|
{
|
||||||
check(column_names, true);
|
metadata_snapshot->check(column_names, getVirtuals());
|
||||||
|
|
||||||
Pipes pipes;
|
Pipes pipes;
|
||||||
|
|
||||||
|
@ -90,7 +90,7 @@ Pipes StorageXDBC::read(
|
|||||||
size_t max_block_size,
|
size_t max_block_size,
|
||||||
unsigned num_streams)
|
unsigned num_streams)
|
||||||
{
|
{
|
||||||
check(column_names);
|
metadata_snapshot->check(column_names, getVirtuals());
|
||||||
|
|
||||||
bridge_helper->startBridgeSync();
|
bridge_helper->startBridgeSync();
|
||||||
return IStorageURLBase::read(column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
|
return IStorageURLBase::read(column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
|
||||||
|
@ -37,7 +37,7 @@ public:
|
|||||||
size_t /*max_block_size*/,
|
size_t /*max_block_size*/,
|
||||||
unsigned /*num_streams*/) override
|
unsigned /*num_streams*/) override
|
||||||
{
|
{
|
||||||
check(column_names);
|
metadata_snapshot->check(column_names, getVirtuals());
|
||||||
|
|
||||||
Block sample_block = metadata_snapshot->getSampleBlock();
|
Block sample_block = metadata_snapshot->getSampleBlock();
|
||||||
MutableColumns res_columns = sample_block.cloneEmptyColumns();
|
MutableColumns res_columns = sample_block.cloneEmptyColumns();
|
||||||
|
@ -249,7 +249,7 @@ Pipes StorageSystemColumns::read(
|
|||||||
const size_t max_block_size,
|
const size_t max_block_size,
|
||||||
const unsigned /*num_streams*/)
|
const unsigned /*num_streams*/)
|
||||||
{
|
{
|
||||||
check(column_names);
|
metadata_snapshot->check(column_names, getVirtuals());
|
||||||
|
|
||||||
/// Create a mask of what columns are needed in the result.
|
/// Create a mask of what columns are needed in the result.
|
||||||
|
|
||||||
|
@ -35,7 +35,7 @@ Pipes StorageSystemDisks::read(
|
|||||||
const size_t /*max_block_size*/,
|
const size_t /*max_block_size*/,
|
||||||
const unsigned /*num_streams*/)
|
const unsigned /*num_streams*/)
|
||||||
{
|
{
|
||||||
check(column_names);
|
metadata_snapshot->check(column_names, getVirtuals());
|
||||||
|
|
||||||
MutableColumnPtr col_name = ColumnString::create();
|
MutableColumnPtr col_name = ColumnString::create();
|
||||||
MutableColumnPtr col_path = ColumnString::create();
|
MutableColumnPtr col_path = ColumnString::create();
|
||||||
|
@ -125,14 +125,14 @@ StorageSystemNumbers::StorageSystemNumbers(const StorageID & table_id, bool mult
|
|||||||
|
|
||||||
Pipes StorageSystemNumbers::read(
|
Pipes StorageSystemNumbers::read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
const StorageMetadataPtr & /*metadata_snapshot*/,
|
const StorageMetadataPtr & metadata_snapshot,
|
||||||
const SelectQueryInfo &,
|
const SelectQueryInfo &,
|
||||||
const Context & /*context*/,
|
const Context & /*context*/,
|
||||||
QueryProcessingStage::Enum /*processed_stage*/,
|
QueryProcessingStage::Enum /*processed_stage*/,
|
||||||
size_t max_block_size,
|
size_t max_block_size,
|
||||||
unsigned num_streams)
|
unsigned num_streams)
|
||||||
{
|
{
|
||||||
check(column_names);
|
metadata_snapshot->check(column_names, getVirtuals());
|
||||||
|
|
||||||
if (limit && *limit < max_block_size)
|
if (limit && *limit < max_block_size)
|
||||||
{
|
{
|
||||||
|
@ -22,14 +22,14 @@ StorageSystemOne::StorageSystemOne(const std::string & name_)
|
|||||||
|
|
||||||
Pipes StorageSystemOne::read(
|
Pipes StorageSystemOne::read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
const StorageMetadataPtr & /*metadata_snapshot*/,
|
const StorageMetadataPtr & metadata_snapshot,
|
||||||
const SelectQueryInfo &,
|
const SelectQueryInfo &,
|
||||||
const Context & /*context*/,
|
const Context & /*context*/,
|
||||||
QueryProcessingStage::Enum /*processed_stage*/,
|
QueryProcessingStage::Enum /*processed_stage*/,
|
||||||
const size_t /*max_block_size*/,
|
const size_t /*max_block_size*/,
|
||||||
const unsigned /*num_streams*/)
|
const unsigned /*num_streams*/)
|
||||||
{
|
{
|
||||||
check(column_names);
|
metadata_snapshot->check(column_names, getVirtuals());
|
||||||
|
|
||||||
Block header{ColumnWithTypeAndName(
|
Block header{ColumnWithTypeAndName(
|
||||||
DataTypeUInt8().createColumn(),
|
DataTypeUInt8().createColumn(),
|
||||||
|
@ -26,7 +26,7 @@ namespace ErrorCodes
|
|||||||
extern const int TABLE_IS_DROPPED;
|
extern const int TABLE_IS_DROPPED;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool StorageSystemPartsBase::hasStateColumn(const Names & column_names) const
|
bool StorageSystemPartsBase::hasStateColumn(const Names & column_names, const StorageMetadataPtr & metadata_snapshot) const
|
||||||
{
|
{
|
||||||
bool has_state_column = false;
|
bool has_state_column = false;
|
||||||
Names real_column_names;
|
Names real_column_names;
|
||||||
@ -41,7 +41,7 @@ bool StorageSystemPartsBase::hasStateColumn(const Names & column_names) const
|
|||||||
|
|
||||||
/// Do not check if only _state column is requested
|
/// Do not check if only _state column is requested
|
||||||
if (!(has_state_column && real_column_names.empty()))
|
if (!(has_state_column && real_column_names.empty()))
|
||||||
check(real_column_names);
|
metadata_snapshot->check(real_column_names, {});
|
||||||
|
|
||||||
return has_state_column;
|
return has_state_column;
|
||||||
}
|
}
|
||||||
@ -232,7 +232,7 @@ Pipes StorageSystemPartsBase::read(
|
|||||||
const size_t /*max_block_size*/,
|
const size_t /*max_block_size*/,
|
||||||
const unsigned /*num_streams*/)
|
const unsigned /*num_streams*/)
|
||||||
{
|
{
|
||||||
bool has_state_column = hasStateColumn(column_names);
|
bool has_state_column = hasStateColumn(column_names, metadata_snapshot);
|
||||||
|
|
||||||
StoragesInfoStream stream(query_info, context);
|
StoragesInfoStream stream(query_info, context);
|
||||||
|
|
||||||
|
@ -57,7 +57,7 @@ class StorageSystemPartsBase : public IStorage
|
|||||||
public:
|
public:
|
||||||
Pipes read(
|
Pipes read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
const StorageMetadataPtr & metadata_,
|
const StorageMetadataPtr & metadata_snapshot,
|
||||||
const SelectQueryInfo & query_info,
|
const SelectQueryInfo & query_info,
|
||||||
const Context & context,
|
const Context & context,
|
||||||
QueryProcessingStage::Enum processed_stage,
|
QueryProcessingStage::Enum processed_stage,
|
||||||
@ -67,7 +67,7 @@ public:
|
|||||||
NamesAndTypesList getVirtuals() const override;
|
NamesAndTypesList getVirtuals() const override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool hasStateColumn(const Names & column_names) const;
|
bool hasStateColumn(const Names & column_names, const StorageMetadataPtr & metadata_snapshot) const;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
const FormatSettings format_settings;
|
const FormatSettings format_settings;
|
||||||
|
@ -66,7 +66,7 @@ Pipes StorageSystemReplicas::read(
|
|||||||
const size_t /*max_block_size*/,
|
const size_t /*max_block_size*/,
|
||||||
const unsigned /*num_streams*/)
|
const unsigned /*num_streams*/)
|
||||||
{
|
{
|
||||||
check(column_names);
|
metadata_snapshot->check(column_names, getVirtuals());
|
||||||
|
|
||||||
const auto access = context.getAccess();
|
const auto access = context.getAccess();
|
||||||
const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES);
|
const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES);
|
||||||
|
@ -39,7 +39,7 @@ Pipes StorageSystemStoragePolicies::read(
|
|||||||
const size_t /*max_block_size*/,
|
const size_t /*max_block_size*/,
|
||||||
const unsigned /*num_streams*/)
|
const unsigned /*num_streams*/)
|
||||||
{
|
{
|
||||||
check(column_names);
|
metadata_snapshot->check(column_names, getVirtuals());
|
||||||
|
|
||||||
MutableColumnPtr col_policy_name = ColumnString::create();
|
MutableColumnPtr col_policy_name = ColumnString::create();
|
||||||
MutableColumnPtr col_volume_name = ColumnString::create();
|
MutableColumnPtr col_volume_name = ColumnString::create();
|
||||||
|
@ -456,7 +456,7 @@ Pipes StorageSystemTables::read(
|
|||||||
const size_t max_block_size,
|
const size_t max_block_size,
|
||||||
const unsigned /*num_streams*/)
|
const unsigned /*num_streams*/)
|
||||||
{
|
{
|
||||||
check(column_names);
|
metadata_snapshot->check(column_names, getVirtuals());
|
||||||
|
|
||||||
/// Create a mask of what columns are needed in the result.
|
/// Create a mask of what columns are needed in the result.
|
||||||
|
|
||||||
|
@ -92,14 +92,14 @@ StorageSystemZeros::StorageSystemZeros(const StorageID & table_id_, bool multith
|
|||||||
|
|
||||||
Pipes StorageSystemZeros::read(
|
Pipes StorageSystemZeros::read(
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
const StorageMetadataPtr & /*metadata_snapshot*/,
|
const StorageMetadataPtr & metadata_snapshot,
|
||||||
const SelectQueryInfo &,
|
const SelectQueryInfo &,
|
||||||
const Context & /*context*/,
|
const Context & /*context*/,
|
||||||
QueryProcessingStage::Enum /*processed_stage*/,
|
QueryProcessingStage::Enum /*processed_stage*/,
|
||||||
size_t max_block_size,
|
size_t max_block_size,
|
||||||
unsigned num_streams)
|
unsigned num_streams)
|
||||||
{
|
{
|
||||||
check(column_names);
|
metadata_snapshot->check(column_names, getVirtuals());
|
||||||
|
|
||||||
bool use_multiple_streams = multithreaded;
|
bool use_multiple_streams = multithreaded;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user