Fix parsing error in WithNames formats while reading subset of columns with disabled input_format_with_names_use_header

This commit is contained in:
avogar 2023-09-11 14:55:37 +00:00
parent 21a2cf7707
commit 2d8f33bfa2
31 changed files with 80 additions and 54 deletions

View File

@ -6341,9 +6341,9 @@ void QueryAnalyzer::resolveTableFunction(QueryTreeNodePtr & table_function_node,
{
/// For input function we should check if input format supports reading subset of columns.
if (table_function_ptr->getName() == "input")
use_columns_from_insert_query = FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(scope.context->getInsertFormat());
use_columns_from_insert_query = FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(scope.context->getInsertFormat(), scope.context);
else
use_columns_from_insert_query = table_function_ptr->supportsReadingSubsetOfColumns();
use_columns_from_insert_query = table_function_ptr->supportsReadingSubsetOfColumns(scope.context);
}
if (use_columns_from_insert_query)

View File

@ -684,10 +684,18 @@ void FormatFactory::markOutputFormatSupportsParallelFormatting(const String & na
void FormatFactory::markFormatSupportsSubsetOfColumns(const String & name)
{
auto & target = dict[name].supports_subset_of_columns;
auto & target = dict[name].subset_of_columns_support_checker;
if (target)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Format {} is already marked as supporting subset of columns", name);
target = true;
target = [](const FormatSettings &){ return true; };
}
void FormatFactory::registerSubsetOfColumnsSupportChecker(const String & name, SubsetOfColumnsSupportChecker subset_of_columns_support_checker)
{
auto & target = dict[name].subset_of_columns_support_checker;
if (target)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Format {} is already marked as supporting subset of columns", name);
target = std::move(subset_of_columns_support_checker);
}
void FormatFactory::markOutputFormatPrefersLargeBlocks(const String & name)
@ -698,10 +706,11 @@ void FormatFactory::markOutputFormatPrefersLargeBlocks(const String & name)
target = true;
}
bool FormatFactory::checkIfFormatSupportsSubsetOfColumns(const String & name) const
bool FormatFactory::checkIfFormatSupportsSubsetOfColumns(const DB::String & name, const ContextPtr & context, const std::optional<FormatSettings> & format_settings_) const
{
const auto & target = getCreators(name);
return target.supports_subset_of_columns;
auto format_settings = format_settings_ ? *format_settings_ : getFormatSettings(context);
return target.subset_of_columns_support_checker && target.subset_of_columns_support_checker(format_settings);
}
void FormatFactory::registerAdditionalInfoForSchemaCacheGetter(

View File

@ -123,6 +123,10 @@ private:
/// and the name of the message.
using AdditionalInfoForSchemaCacheGetter = std::function<String(const FormatSettings & settings)>;
/// Some formats can support reading subset of columns depending on settings.
/// The checker should return true if format support append.
using SubsetOfColumnsSupportChecker = std::function<bool(const FormatSettings & settings)>;
struct Creators
{
InputCreator input_creator;
@ -133,11 +137,11 @@ private:
ExternalSchemaReaderCreator external_schema_reader_creator;
bool supports_parallel_formatting{false};
bool supports_subcolumns{false};
bool supports_subset_of_columns{false};
bool prefers_large_blocks{false};
NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker;
AppendSupportChecker append_support_checker;
AdditionalInfoForSchemaCacheGetter additional_info_for_schema_cache_getter;
SubsetOfColumnsSupportChecker subset_of_columns_support_checker;
};
using FormatsDictionary = std::unordered_map<String, Creators>;
@ -225,9 +229,10 @@ public:
void markOutputFormatSupportsParallelFormatting(const String & name);
void markOutputFormatPrefersLargeBlocks(const String & name);
void markFormatSupportsSubsetOfColumns(const String & name);
bool checkIfFormatSupportsSubsetOfColumns(const String & name) const;
void markFormatSupportsSubsetOfColumns(const String & name);
void registerSubsetOfColumnsSupportChecker(const String & name, SubsetOfColumnsSupportChecker subset_of_columns_support_checker);
bool checkIfFormatSupportsSubsetOfColumns(const String & name, const ContextPtr & context, const std::optional<FormatSettings> & format_settings_ = std::nullopt) const;
bool checkIfFormatHasSchemaReader(const String & name) const;
bool checkIfFormatHasExternalSchemaReader(const String & name) const;

View File

@ -12,8 +12,9 @@ void registerWithNamesAndTypes(const std::string & base_format_name, RegisterWit
void markFormatWithNamesAndTypesSupportsSamplingColumns(const std::string & base_format_name, FormatFactory & factory)
{
factory.markFormatSupportsSubsetOfColumns(base_format_name + "WithNames");
factory.markFormatSupportsSubsetOfColumns(base_format_name + "WithNamesAndTypes");
auto setting_checker = [](const FormatSettings & settings){ return settings.with_names_use_header; };
factory.registerSubsetOfColumnsSupportChecker(base_format_name + "WithNames", setting_checker);
factory.registerSubsetOfColumnsSupportChecker(base_format_name + "WithNamesAndTypes", setting_checker);
}
}

View File

@ -1684,9 +1684,9 @@ StoragePtr Context::executeTableFunction(const ASTPtr & table_expression, const
{
/// For input function we should check if input format supports reading subset of columns.
if (table_function_ptr->getName() == "input")
use_columns_from_insert_query = FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(getInsertFormat());
use_columns_from_insert_query = FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(getInsertFormat(), shared_from_this());
else
use_columns_from_insert_query = table_function_ptr->supportsReadingSubsetOfColumns();
use_columns_from_insert_query = table_function_ptr->supportsReadingSubsetOfColumns(shared_from_this());
}
if (use_columns_from_insert_query)

View File

@ -838,9 +838,9 @@ private:
};
bool StorageHDFS::supportsSubsetOfColumns() const
bool StorageHDFS::supportsSubsetOfColumns(const ContextPtr & context_) const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name, context_);
}
Pipe StorageHDFS::read(
@ -878,7 +878,7 @@ Pipe StorageHDFS::read(
});
}
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(context_), getVirtuals());
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& context_->getSettingsRef().optimize_count_from_files;

View File

@ -76,7 +76,7 @@ public:
/// Is is useful because column oriented formats could effectively skip unknown columns
/// So we can create a header of only required columns in read method and ask
/// format to read only them. Note: this hack cannot be done with ordinary formats like TSV.
bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns(const ContextPtr & context_) const;
bool supportsSubcolumns() const override { return true; }

View File

@ -65,7 +65,7 @@ public:
NamesAndTypesList getVirtuals() const override;
bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns() const;
std::optional<UInt64> totalRows(const Settings & settings) const override;
std::optional<UInt64> totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, ContextPtr context_) const override;

View File

@ -620,8 +620,6 @@ public:
/// NOTE: write-once also does not support INSERTs/merges/... for MergeTree
virtual bool isStaticStorage() const;
virtual bool supportsSubsetOfColumns() const { return false; }
/// If it is possible to quickly determine exact number of rows in the table at this moment of time, then return it.
/// Used for:
/// - Simple count() optimization

View File

@ -162,9 +162,9 @@ bool StorageS3Queue::supportsSubcolumns() const
return true;
}
bool StorageS3Queue::supportsSubsetOfColumns() const
bool StorageS3Queue::supportsSubsetOfColumns(const ContextPtr & context_) const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format, context_, format_settings);
}
Pipe StorageS3Queue::read(
@ -187,7 +187,7 @@ Pipe StorageS3Queue::read(
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(local_context, query_info.query);
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
const size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
@ -363,7 +363,7 @@ void StorageS3Queue::streamToViews()
// Create a stream for each consumer and join them in a union stream
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(s3queue_context, nullptr);
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(getContext()), getVirtuals());
const size_t max_download_threads = s3queue_context->getSettingsRef().max_download_threads;
auto pipe = Pipe(std::make_shared<StorageS3QueueSource>(

View File

@ -125,7 +125,7 @@ private:
};
std::shared_ptr<TaskContext> task;
bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns(const ContextPtr & context_) const;
const UInt32 zk_create_table_retries = 1000;
bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot);

View File

@ -687,7 +687,7 @@ Pipe StorageAzureBlob::read(
query_info.query, virtual_columns, local_context, nullptr, local_context->getFileProgressCallback());
}
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& local_context->getSettingsRef().optimize_count_from_files;
@ -792,9 +792,9 @@ bool StorageAzureBlob::supportsPartitionBy() const
return true;
}
bool StorageAzureBlob::supportsSubsetOfColumns() const
bool StorageAzureBlob::supportsSubsetOfColumns(const ContextPtr & context) const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format, context, format_settings);
}
bool StorageAzureBlob::prefersLargeBlocks() const

View File

@ -99,7 +99,7 @@ public:
bool supportsSubcolumns() const override { return true; }
bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns(const ContextPtr & context) const;
bool supportsTrivialCountOptimization() const override { return true; }

View File

@ -803,9 +803,9 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
return columns;
}
bool StorageFile::supportsSubsetOfColumns() const
bool StorageFile::supportsSubsetOfColumns(const ContextPtr & context) const
{
return format_name != "Distributed" && FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
return format_name != "Distributed" && FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name, context, format_settings);
}
bool StorageFile::prefersLargeBlocks() const
@ -1433,7 +1433,7 @@ Pipe StorageFile::read(
if (progress_callback && !archive_info)
progress_callback(FileProgress(0, total_bytes_to_read));
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(context), getVirtuals());
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& context->getSettingsRef().optimize_count_from_files;

View File

@ -74,7 +74,7 @@ public:
/// Is is useful because such formats could effectively skip unknown columns
/// So we can create a header of only required columns in read method and ask
/// format to read only them. Note: this hack cannot be done with ordinary formats like TSV.
bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns(const ContextPtr & context) const;
bool supportsSubcolumns() const override { return true; }

View File

@ -983,9 +983,9 @@ std::shared_ptr<StorageS3Source::IIterator> StorageS3::createFileIterator(
}
}
bool StorageS3::supportsSubsetOfColumns() const
bool StorageS3::supportsSubsetOfColumns(const ContextPtr & context) const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format, context, format_settings);
}
bool StorageS3::prefersLargeBlocks() const
@ -1017,7 +1017,7 @@ Pipe StorageS3::read(
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(
query_configuration, distributed_processing, local_context, query_info.query, virtual_columns, nullptr, local_context->getFileProgressCallback());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& local_context->getSettingsRef().optimize_count_from_files;

View File

@ -388,7 +388,7 @@ private:
bool supportsSubcolumns() const override { return true; }
bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns(const ContextPtr & context) const;
bool prefersLargeBlocks() const override;

View File

@ -817,9 +817,9 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
return columns;
}
bool IStorageURLBase::supportsSubsetOfColumns() const
bool IStorageURLBase::supportsSubsetOfColumns(const ContextPtr & context) const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name, context, format_settings);
}
bool IStorageURLBase::prefersLargeBlocks() const
@ -846,7 +846,7 @@ Pipe IStorageURLBase::read(
std::shared_ptr<StorageURLSource::IteratorWrapper> iterator_wrapper{nullptr};
bool is_url_with_globs = urlWithGlobs(uri);
size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements;
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
if (distributed_processing)
{
@ -951,7 +951,7 @@ Pipe StorageURLWithFailover::read(
return uri_options;
});
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
const size_t max_threads = local_context->getSettingsRef().max_threads;
const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / num_streams);

View File

@ -114,7 +114,7 @@ protected:
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const;
bool supportsSubsetOfColumns() const override;
virtual bool supportsSubsetOfColumns(const ContextPtr & context) const;
bool prefersLargeBlocks() const override;

View File

@ -146,7 +146,7 @@ SinkToStoragePtr StorageXDBC::write(const ASTPtr & /* query */, const StorageMet
compression_method);
}
bool StorageXDBC::supportsSubsetOfColumns() const
bool StorageXDBC::supportsSubsetOfColumns(const ContextPtr &) const
{
return true;
}

View File

@ -68,7 +68,7 @@ private:
Block getHeaderBlock(const Names & column_names, const StorageSnapshotPtr & storage_snapshot) const override;
bool supportsSubsetOfColumns() const override;
bool supportsSubsetOfColumns(const ContextPtr &) const override;
};
}

View File

@ -76,7 +76,7 @@ public:
/// because we cannot determine which column from table correspond to this virtual column.
virtual std::unordered_set<String> getVirtualsToCheckBeforeUsingStructureHint() const { return {}; }
virtual bool supportsReadingSubsetOfColumns() { return true; }
virtual bool supportsReadingSubsetOfColumns(const ContextPtr &) { return true; }
/// Create storage according to the query.
StoragePtr

View File

@ -32,9 +32,9 @@ String ITableFunctionFileLike::getFormatFromFirstArgument()
return FormatFactory::instance().getFormatFromFileName(filename, true);
}
bool ITableFunctionFileLike::supportsReadingSubsetOfColumns()
bool ITableFunctionFileLike::supportsReadingSubsetOfColumns(const ContextPtr & context)
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format, context);
}
void ITableFunctionFileLike::parseArguments(const ASTPtr & ast_function, ContextPtr context)

View File

@ -27,7 +27,7 @@ public:
void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; }
bool supportsReadingSubsetOfColumns() override;
bool supportsReadingSubsetOfColumns(const ContextPtr & context) override;
static size_t getMaxNumberOfArguments() { return 4; }

View File

@ -269,9 +269,9 @@ ColumnsDescription TableFunctionAzureBlobStorage::getActualTableStructure(Contex
return parseColumnsListFromString(configuration.structure, context);
}
bool TableFunctionAzureBlobStorage::supportsReadingSubsetOfColumns()
bool TableFunctionAzureBlobStorage::supportsReadingSubsetOfColumns(const ContextPtr & context)
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format, context);
}
StoragePtr TableFunctionAzureBlobStorage::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const

View File

@ -49,7 +49,7 @@ public:
void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; }
bool supportsReadingSubsetOfColumns() override;
bool supportsReadingSubsetOfColumns(const ContextPtr & context) override;
std::unordered_set<String> getVirtualsToCheckBeforeUsingStructureHint() const override
{

View File

@ -325,9 +325,9 @@ ColumnsDescription TableFunctionS3::getActualTableStructure(ContextPtr context,
return parseColumnsListFromString(configuration.structure, context);
}
bool TableFunctionS3::supportsReadingSubsetOfColumns()
bool TableFunctionS3::supportsReadingSubsetOfColumns(const ContextPtr & context)
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format, context);
}
StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const

View File

@ -47,7 +47,7 @@ public:
void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; }
bool supportsReadingSubsetOfColumns() override;
bool supportsReadingSubsetOfColumns(const ContextPtr & context) override;
std::unordered_set<String> getVirtualsToCheckBeforeUsingStructureHint() const override
{

View File

@ -0,0 +1,10 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
echo -e "a,b,c\n1,2,3" > $CLICKHOUSE_TEST_UNIQUE_NAME.csvwithnames
$CLICKHOUSE_LOCAL -q "select b from file('$CLICKHOUSE_TEST_UNIQUE_NAME.csvwithnames') settings input_format_with_names_use_header=0"