From 3bd1489f18d9812053a9c1b67601bb23188a3cba Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Fri, 5 May 2023 04:18:46 +0000 Subject: [PATCH 1/2] Propagate input_format_parquet_preserve_order to parallelizeOutputAfterReading() --- src/Formats/FormatFactory.cpp | 8 ++++++++ src/Formats/FormatFactory.h | 2 ++ src/Storages/IStorage.cpp | 2 +- src/Storages/IStorage.h | 2 +- src/Storages/StorageDictionary.h | 2 +- src/Storages/StorageFile.cpp | 5 +++++ src/Storages/StorageFile.h | 2 ++ src/Storages/StorageNull.h | 2 +- src/Storages/StorageS3.cpp | 5 +++++ src/Storages/StorageS3.h | 4 +++- src/Storages/StorageURL.cpp | 5 +++++ src/Storages/StorageURL.h | 2 ++ src/Storages/System/StorageSystemNumbers.h | 2 +- src/Storages/System/StorageSystemOne.h | 2 +- src/Storages/System/StorageSystemZeros.h | 2 +- 15 files changed, 39 insertions(+), 8 deletions(-) diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index c8fc6c5e702..082ccb85970 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -809,6 +809,14 @@ bool FormatFactory::checkIfOutputFormatPrefersLargeBlocks(const String & name) c return target.prefers_large_blocks; } +bool FormatFactory::checkParallelizeOutputAfterReading(const String & name, ContextPtr context) const +{ + if (name == "Parquet" && context->getSettingsRef().input_format_parquet_preserve_order) + return false; + + return true; +} + void FormatFactory::checkFormatName(const String & name) const { auto it = dict.find(name); diff --git a/src/Formats/FormatFactory.h b/src/Formats/FormatFactory.h index edf20a92b00..677e34845d8 100644 --- a/src/Formats/FormatFactory.h +++ b/src/Formats/FormatFactory.h @@ -250,6 +250,8 @@ public: bool checkIfFormatHasAnySchemaReader(const String & name) const; bool checkIfOutputFormatPrefersLargeBlocks(const String & name) const; + bool checkParallelizeOutputAfterReading(const String & name, ContextPtr context) const; + void registerAdditionalInfoForSchemaCacheGetter(const String & name, AdditionalInfoForSchemaCacheGetter additional_info_for_schema_cache_getter); String getAdditionalInfoForSchemaCache(const String & name, ContextPtr context, const std::optional & format_settings_ = std::nullopt); diff --git a/src/Storages/IStorage.cpp b/src/Storages/IStorage.cpp index 98990b9e37b..8cf708acd8b 100644 --- a/src/Storages/IStorage.cpp +++ b/src/Storages/IStorage.cpp @@ -137,7 +137,7 @@ void IStorage::read( /// parallelize processing if not yet const size_t output_ports = pipe.numOutputPorts(); const bool parallelize_output = context->getSettingsRef().parallelize_output_from_storages; - if (parallelize_output && parallelizeOutputAfterReading() && output_ports > 0 && output_ports < num_streams) + if (parallelize_output && parallelizeOutputAfterReading(context) && output_ports > 0 && output_ports < num_streams) pipe.resize(num_streams); readFromPipe(query_plan, std::move(pipe), column_names, storage_snapshot, query_info, context, getName()); diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 5743d903630..0070791d5de 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -377,7 +377,7 @@ private: /// This is enabled by default, but in some cases shouldn't be done. /// For example, when you read from system.numbers instead of system.numbers_mt, /// you still expect the data to be processed sequentially. - virtual bool parallelizeOutputAfterReading() const { return true; } + virtual bool parallelizeOutputAfterReading(ContextPtr) const { return true; } public: /// Other version of read which adds reading step to query plan. diff --git a/src/Storages/StorageDictionary.h b/src/Storages/StorageDictionary.h index 02efa609337..48230dcfa9f 100644 --- a/src/Storages/StorageDictionary.h +++ b/src/Storages/StorageDictionary.h @@ -76,7 +76,7 @@ public: /// FIXME: processing after reading from dictionaries are not parallelized due to some bug: /// count() can return wrong result, see test_dictionaries_redis/test_long.py::test_redis_dict_long - bool parallelizeOutputAfterReading() const override { return false; } + bool parallelizeOutputAfterReading(ContextPtr) const override { return false; } std::shared_ptr getDictionary() const; diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index 16769d72e4c..26d4aee8cfe 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -412,6 +412,11 @@ bool StorageFile::prefersLargeBlocks() const return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(format_name); } +bool StorageFile::parallelizeOutputAfterReading(ContextPtr context) const +{ + return FormatFactory::instance().checkParallelizeOutputAfterReading(format_name, context); +} + StorageFile::StorageFile(int table_fd_, CommonArguments args) : StorageFile(args) { diff --git a/src/Storages/StorageFile.h b/src/Storages/StorageFile.h index 2c901ce1573..53ce7eeaaf6 100644 --- a/src/Storages/StorageFile.h +++ b/src/Storages/StorageFile.h @@ -75,6 +75,8 @@ public: bool prefersLargeBlocks() const override; + bool parallelizeOutputAfterReading(ContextPtr context) const override; + bool supportsPartitionBy() const override { return true; } ColumnsDescription getTableStructureFromFileDescriptor(ContextPtr context); diff --git a/src/Storages/StorageNull.h b/src/Storages/StorageNull.h index 24ddb311b65..d35c6a0b8b5 100644 --- a/src/Storages/StorageNull.h +++ b/src/Storages/StorageNull.h @@ -42,7 +42,7 @@ public: std::make_shared(storage_snapshot->getSampleBlockForColumns(column_names))); } - bool parallelizeOutputAfterReading() const override { return false; } + bool parallelizeOutputAfterReading(ContextPtr) const override { return false; } bool supportsParallelInsert() const override { return true; } diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index 8add005f7f5..d1b168d3f7d 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -1008,6 +1008,11 @@ bool StorageS3::prefersLargeBlocks() const return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(configuration.format); } +bool StorageS3::parallelizeOutputAfterReading(ContextPtr context) const +{ + return FormatFactory::instance().checkParallelizeOutputAfterReading(configuration.format, context); +} + Pipe StorageS3::read( const Names & column_names, const StorageSnapshotPtr & storage_snapshot, diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h index 3148206a743..c3b862f6bbd 100644 --- a/src/Storages/StorageS3.h +++ b/src/Storages/StorageS3.h @@ -238,7 +238,7 @@ private: * It sends HTTP GET to server when select is called and * HTTP PUT when insert is called. */ -class StorageS3 : public IStorage, WithContext +class StorageS3 : public IStorage { public: struct Configuration : public StatelessTableEngineConfiguration @@ -366,6 +366,8 @@ private: bool prefersLargeBlocks() const override; + bool parallelizeOutputAfterReading(ContextPtr context) const override; + static std::optional tryGetColumnsFromCache( const KeysWithInfo::const_iterator & begin, const KeysWithInfo::const_iterator & end, diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index 59543b3b4ef..b76d6e7e382 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -600,6 +600,11 @@ bool IStorageURLBase::prefersLargeBlocks() const return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(format_name); } +bool IStorageURLBase::parallelizeOutputAfterReading(ContextPtr context) const +{ + return FormatFactory::instance().checkParallelizeOutputAfterReading(format_name, context); +} + Pipe IStorageURLBase::read( const Names & column_names, const StorageSnapshotPtr & storage_snapshot, diff --git a/src/Storages/StorageURL.h b/src/Storages/StorageURL.h index 65bf86da594..a21a5c59298 100644 --- a/src/Storages/StorageURL.h +++ b/src/Storages/StorageURL.h @@ -102,6 +102,8 @@ protected: bool prefersLargeBlocks() const override; + bool parallelizeOutputAfterReading(ContextPtr context) const override; + private: virtual Block getHeaderBlock(const Names & column_names, const StorageSnapshotPtr & storage_snapshot) const = 0; diff --git a/src/Storages/System/StorageSystemNumbers.h b/src/Storages/System/StorageSystemNumbers.h index f7277f147f9..80590718d03 100644 --- a/src/Storages/System/StorageSystemNumbers.h +++ b/src/Storages/System/StorageSystemNumbers.h @@ -40,7 +40,7 @@ public: size_t max_block_size, size_t num_streams) override; - bool parallelizeOutputAfterReading() const override { return false; } + bool parallelizeOutputAfterReading(ContextPtr) const override { return false; } bool hasEvenlyDistributedRead() const override { return true; } bool isSystemStorage() const override { return true; } diff --git a/src/Storages/System/StorageSystemOne.h b/src/Storages/System/StorageSystemOne.h index eaf44b635cc..e3b6e1d5f78 100644 --- a/src/Storages/System/StorageSystemOne.h +++ b/src/Storages/System/StorageSystemOne.h @@ -30,7 +30,7 @@ public: size_t max_block_size, size_t num_streams) override; - bool parallelizeOutputAfterReading() const override { return false; } + bool parallelizeOutputAfterReading(ContextPtr) const override { return false; } bool isSystemStorage() const override { return true; } diff --git a/src/Storages/System/StorageSystemZeros.h b/src/Storages/System/StorageSystemZeros.h index 936fb772453..84b4b752aae 100644 --- a/src/Storages/System/StorageSystemZeros.h +++ b/src/Storages/System/StorageSystemZeros.h @@ -31,7 +31,7 @@ public: size_t max_block_size, size_t num_streams) override; - bool parallelizeOutputAfterReading() const override { return false; } + bool parallelizeOutputAfterReading(ContextPtr) const override { return false; } bool hasEvenlyDistributedRead() const override { return true; } bool isSystemStorage() const override { return true; } From ddb99279f786d52d47095818ce95181ba4622b63 Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Fri, 5 May 2023 04:29:05 +0000 Subject: [PATCH 2/2] Test for input_format_parquet_preserve_order --- .../02725_parquet_preserve_order.reference | 12 ++++++++++++ .../0_stateless/02725_parquet_preserve_order.sh | 16 ++++++++++++++++ .../0_stateless/data_parquet/02725_data.parquet | Bin 0 -> 576 bytes 3 files changed, 28 insertions(+) create mode 100644 tests/queries/0_stateless/02725_parquet_preserve_order.reference create mode 100755 tests/queries/0_stateless/02725_parquet_preserve_order.sh create mode 100644 tests/queries/0_stateless/data_parquet/02725_data.parquet diff --git a/tests/queries/0_stateless/02725_parquet_preserve_order.reference b/tests/queries/0_stateless/02725_parquet_preserve_order.reference new file mode 100644 index 00000000000..e9c8f99bb33 --- /dev/null +++ b/tests/queries/0_stateless/02725_parquet_preserve_order.reference @@ -0,0 +1,12 @@ +0 +1 +2 +(Expression) +ExpressionTransform + (ReadFromStorage) + File 0 → 1 +(Expression) +ExpressionTransform × 2 + (ReadFromStorage) + Resize 1 → 2 + File 0 → 1 diff --git a/tests/queries/0_stateless/02725_parquet_preserve_order.sh b/tests/queries/0_stateless/02725_parquet_preserve_order.sh new file mode 100755 index 00000000000..ea3e4219e35 --- /dev/null +++ b/tests/queries/0_stateless/02725_parquet_preserve_order.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env bash +# Tags: no-fasttest + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + + +# This file has a row group with 2 rows, then a row group with 1 row. +# It'll be read into two blocks. The first block will sleep 2x longer than the second. +# So reordering is very likely if the order-preservation doesn't work. + +$CLICKHOUSE_LOCAL -q "select number+sleepEachRow(3) from file('$CURDIR/data_parquet/02725_data.parquet') settings input_format_parquet_preserve_order=1" + +$CLICKHOUSE_LOCAL -q "explain pipeline select number+sleepEachRow(3) from file('$CURDIR/data_parquet/02725_data.parquet') settings input_format_parquet_preserve_order=1, max_threads=2" +$CLICKHOUSE_LOCAL -q "explain pipeline select number+sleepEachRow(3) from file('$CURDIR/data_parquet/02725_data.parquet') settings input_format_parquet_preserve_order=0, parallelize_output_from_storages=1, max_threads=2" diff --git a/tests/queries/0_stateless/data_parquet/02725_data.parquet b/tests/queries/0_stateless/data_parquet/02725_data.parquet new file mode 100644 index 0000000000000000000000000000000000000000..5b4842c9dbdf52bac11c9f9867508ea582191608 GIT binary patch literal 576 zcmWG=3^EjD5mgYC@&Qr|LJSPT42%p7j0_MU${@-n%Ao_45&$yTWXu>eIAC%T98g6d z%7BrXNo@wBj3|qwsQ?3;qy$@DX>L+#ktm-Si`W!Ku?dW7c4{2B^h<(lU=U?u01^T~ zih%)aqmYj%6WC1+AR!QdxrYhKJuo>54j2ol(14MNL2U;!Huo@zwJ?fxFsj{SQro}; zb`eY;k_p&cB&lP{Ai-9goRONF2y!cnDv%5DrmBq38es=u=rM?~N$N<-P%|h%!N;P; z0SrPB1_q$fRP_}oEI{D^3=k0}1`VZx#G=B|)Dqp~f&zuI)S}|d{5%CiJyShH-C#e* VfM5^*5J?#spf~w|sUQHDb^xl%LbCt> literal 0 HcmV?d00001