diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index f7f32cf9b6f..0591fe003fa 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -412,11 +412,26 @@ void FormatFactory::markOutputFormatSupportsParallelFormatting(const String & na { auto & target = dict[name].supports_parallel_formatting; if (target) - throw Exception("FormatFactory: Output format " + name + " is already marked as supporting parallel formatting.", ErrorCodes::LOGICAL_ERROR); + throw Exception("FormatFactory: Output format " + name + " is already marked as supporting parallel formatting", ErrorCodes::LOGICAL_ERROR); target = true; } +void FormatFactory::markFormatAsColumnOriented(const String & name) +{ + auto & target = dict[name].is_column_oriented; + if (target) + throw Exception("FormatFactory: Format " + name + " is already marked as column oriented", ErrorCodes::LOGICAL_ERROR); + target = true; +} + + +bool FormatFactory::checkIfFormatIsColumnOriented(const String & name) +{ + const auto & target = getCreators(name); + return target.is_column_oriented; +} + FormatFactory & FormatFactory::instance() { static FormatFactory ret; diff --git a/src/Formats/FormatFactory.h b/src/Formats/FormatFactory.h index 4fa7e9a0c01..3a21ef91d35 100644 --- a/src/Formats/FormatFactory.h +++ b/src/Formats/FormatFactory.h @@ -101,6 +101,7 @@ private: OutputProcessorCreator output_processor_creator; FileSegmentationEngine file_segmentation_engine; bool supports_parallel_formatting{false}; + bool is_column_oriented{false}; }; using FormatsDictionary = std::unordered_map; @@ -155,6 +156,9 @@ public: void registerOutputFormatProcessor(const String & name, OutputProcessorCreator output_creator); void markOutputFormatSupportsParallelFormatting(const String & name); + void markFormatAsColumnOriented(const String & name); + + bool checkIfFormatIsColumnOriented(const String & name); const FormatsDictionary & getAllFormats() const { diff --git 
a/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp b/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp index 4edef1f1365..6a5c9718278 100644 --- a/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp @@ -101,7 +101,7 @@ void ArrowBlockInputFormat::prepareReader() record_batch_current = 0; } -void registerInputFormatProcessorArrow(FormatFactory &factory) +void registerInputFormatProcessorArrow(FormatFactory & factory) { factory.registerInputFormatProcessor( "Arrow", @@ -112,7 +112,7 @@ void registerInputFormatProcessorArrow(FormatFactory &factory) { return std::make_shared(buf, sample, false); }); - + factory.markFormatAsColumnOriented("Arrow"); factory.registerInputFormatProcessor( "ArrowStream", [](ReadBuffer & buf, diff --git a/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp b/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp index 7776a904f1c..9af03e93c32 100644 --- a/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp @@ -64,6 +64,7 @@ void registerInputFormatProcessorORC(FormatFactory &factory) { return std::make_shared(buf, sample); }); + factory.markFormatAsColumnOriented("ORC"); } } diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index bb55c71b7ca..162185e75b8 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -94,6 +94,7 @@ void registerInputFormatProcessorParquet(FormatFactory &factory) { return std::make_shared(buf, sample); }); + factory.markFormatAsColumnOriented("Parquet"); } } diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index 5524569e1f0..56f08802bc4 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -22,6 +22,8 @@ #include #include #include +#include +#include #include #include @@ -149,6 +151,11 @@ 
Strings StorageFile::getPathsList(const String & table_path, const String & user return paths; } +bool StorageFile::isColumnOriented() const +{ + return format_name != "Distributed" && FormatFactory::instance().checkIfFormatIsColumnOriented(format_name); +} + StorageFile::StorageFile(int table_fd_, CommonArguments args) : StorageFile(args) { @@ -227,6 +234,8 @@ static std::chrono::seconds getLockTimeout(const Context & context) return std::chrono::seconds{lock_timeout}; } +using StorageFilePtr = std::shared_ptr; + class StorageFileSource : public SourceWithProgress { @@ -257,6 +266,18 @@ public: return header; } + static Block getBlockForSource( + const StorageFilePtr & storage, + const StorageMetadataPtr & metadata_snapshot, + const ColumnsDescription & columns_description, + const FilesInfoPtr & files_info) + { + if (storage->isColumnOriented()) + return metadata_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical(), storage->getVirtuals(), storage->getStorageID()); + else + return getHeader(metadata_snapshot, files_info->need_path_column, files_info->need_file_column); + } + StorageFileSource( std::shared_ptr storage_, const StorageMetadataPtr & metadata_snapshot_, @@ -264,7 +285,7 @@ public: UInt64 max_block_size_, FilesInfoPtr files_info_, ColumnsDescription columns_description_) - : SourceWithProgress(getHeader(metadata_snapshot_, files_info_->need_path_column, files_info_->need_file_column)) + : SourceWithProgress(getBlockForSource(storage_, metadata_snapshot_, columns_description_, files_info_)) , storage(std::move(storage_)) , metadata_snapshot(metadata_snapshot_) , files_info(std::move(files_info_)) @@ -344,8 +365,16 @@ public: } read_buf = wrapReadBufferWithCompressionMethod(std::move(nested_buffer), method); + + auto get_block_for_format = [&]() -> Block + { + if (storage->isColumnOriented()) + return metadata_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical()); + return metadata_snapshot->getSampleBlock(); 
+ }; + auto format = FormatFactory::instance().getInput( - storage->format_name, *read_buf, metadata_snapshot->getSampleBlock(), context, max_block_size, storage->format_settings); + storage->format_name, *read_buf, get_block_for_format(), context, max_block_size, storage->format_settings); reader = std::make_shared(format); @@ -412,7 +441,6 @@ private: std::unique_lock unique_lock; }; - Pipe StorageFile::read( const Names & column_names, const StorageMetadataPtr & metadata_snapshot, @@ -457,9 +485,16 @@ Pipe StorageFile::read( for (size_t i = 0; i < num_streams; ++i) { + const auto get_columns_for_format = [&]() -> ColumnsDescription + { + if (isColumnOriented()) + return ColumnsDescription{ + metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID()).getNamesAndTypesList()}; + else + return metadata_snapshot->getColumns(); + }; pipes.emplace_back(std::make_shared( - this_ptr, metadata_snapshot, context, max_block_size, files_info, - metadata_snapshot->getColumns())); + this_ptr, metadata_snapshot, context, max_block_size, files_info, get_columns_for_format())); } return Pipe::unitePipes(std::move(pipes)); diff --git a/src/Storages/StorageFile.h b/src/Storages/StorageFile.h index c316412f808..27f7419321c 100644 --- a/src/Storages/StorageFile.h +++ b/src/Storages/StorageFile.h @@ -64,6 +64,12 @@ public: static Strings getPathsList(const String & table_path, const String & user_files_path, const Context & context); + /// Check if the format is column-oriented. + /// It is useful because column-oriented formats can effectively skip unknown columns, + /// so we can create a header of only the required columns in the read method and ask the + /// format to read only them. Note: this hack cannot be done with ordinary formats like TSV. 
+ bool isColumnOriented() const; + protected: friend class StorageFileSource; friend class StorageFileBlockOutputStream; diff --git a/src/Storages/StorageInMemoryMetadata.cpp b/src/Storages/StorageInMemoryMetadata.cpp index 871ff38c07f..2f4a24a5c60 100644 --- a/src/Storages/StorageInMemoryMetadata.cpp +++ b/src/Storages/StorageInMemoryMetadata.cpp @@ -291,9 +291,10 @@ Block StorageInMemoryMetadata::getSampleBlockForColumns( { Block res; - std::unordered_map columns_map; - auto all_columns = getColumns().getAllWithSubcolumns(); + std::unordered_map columns_map; + columns_map.reserve(all_columns.size()); + for (const auto & elem : all_columns) columns_map.emplace(elem.name, elem.type); @@ -306,15 +307,11 @@ Block StorageInMemoryMetadata::getSampleBlockForColumns( { auto it = columns_map.find(name); if (it != columns_map.end()) - { res.insert({it->second->createColumn(), it->second, it->first}); - } else - { throw Exception( - "Column " + backQuote(name) + " not found in table " + storage_id.getNameForLogs(), + "Column " + backQuote(name) + " not found in table " + (storage_id.empty() ? "" : storage_id.getNameForLogs()), ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK); - } } return res; diff --git a/src/Storages/StorageInMemoryMetadata.h b/src/Storages/StorageInMemoryMetadata.h index 038416aff7d..00fb944c0b5 100644 --- a/src/Storages/StorageInMemoryMetadata.h +++ b/src/Storages/StorageInMemoryMetadata.h @@ -85,9 +85,10 @@ struct StorageInMemoryMetadata /// Returns combined set of columns const ColumnsDescription & getColumns() const; - /// Returns secondary indices + /// Returns secondary indices const IndicesDescription & getSecondaryIndices() const; + /// Has at least one non primary index bool hasSecondaryIndices() const; @@ -146,8 +147,7 @@ struct StorageInMemoryMetadata /// Storage metadata. StorageID required only for more clear exception /// message. 
Block getSampleBlockForColumns( - const Names & column_names, const NamesAndTypesList & virtuals, const StorageID & storage_id) const; - + const Names & column_names, const NamesAndTypesList & virtuals = {}, const StorageID & storage_id = StorageID::createEmpty()) const; /// Returns structure with partition key. const KeyDescription & getPartitionKey() const; /// Returns ASTExpressionList of partition key expression for storage or nullptr if there is none. diff --git a/tests/queries/1_stateful/00163_column_oriented_formats.reference b/tests/queries/1_stateful/00163_column_oriented_formats.reference new file mode 100644 index 00000000000..cb20aca4392 --- /dev/null +++ b/tests/queries/1_stateful/00163_column_oriented_formats.reference @@ -0,0 +1,12 @@ +Parquet +6b397d4643bc1f920f3eb8aa87ee180c - +7fe6d8c57ddc5fe37bbdcb7f73c5fa78 - +d8746733270cbeff7ab3550c9b944fb6 - +Arrow +6b397d4643bc1f920f3eb8aa87ee180c - +7fe6d8c57ddc5fe37bbdcb7f73c5fa78 - +d8746733270cbeff7ab3550c9b944fb6 - +ORC +6b397d4643bc1f920f3eb8aa87ee180c - +7fe6d8c57ddc5fe37bbdcb7f73c5fa78 - +d8746733270cbeff7ab3550c9b944fb6 - diff --git a/tests/queries/1_stateful/00163_column_oriented_formats.sh b/tests/queries/1_stateful/00163_column_oriented_formats.sh new file mode 100755 index 00000000000..1363ccf3c00 --- /dev/null +++ b/tests/queries/1_stateful/00163_column_oriented_formats.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. 
"$CURDIR"/../shell_config.sh + + +FORMATS=('Parquet' 'Arrow' 'ORC') + +for format in "${FORMATS[@]}" +do + echo $format + $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS 00163_column_oriented SYNC" + $CLICKHOUSE_CLIENT -q "CREATE TABLE 00163_column_oriented(ClientEventTime DateTime, MobilePhoneModel String, ClientIP6 FixedString(16)) ENGINE=File($format)" + $CLICKHOUSE_CLIENT -q "INSERT INTO 00163_column_oriented SELECT ClientEventTime, MobilePhoneModel, ClientIP6 FROM test.hits ORDER BY ClientEventTime, MobilePhoneModel, ClientIP6 LIMIT 100" + $CLICKHOUSE_CLIENT -q "SELECT ClientEventTime from 00163_column_oriented" | md5sum + $CLICKHOUSE_CLIENT -q "SELECT MobilePhoneModel from 00163_column_oriented" | md5sum + $CLICKHOUSE_CLIENT -q "SELECT ClientIP6 from 00163_column_oriented" | md5sum + $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS 00163_column_oriented SYNC" +done