Merge pull request #49536 from ClickHouse/propa

Make input_format_parquet_preserve_order imply !parallelize_output_from_storages
This commit is contained in:
Igor Nikonov 2023-05-06 14:53:36 +02:00 committed by GitHub
commit c23600fb56
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 67 additions and 8 deletions

View File

@ -809,6 +809,14 @@ bool FormatFactory::checkIfOutputFormatPrefersLargeBlocks(const String & name) c
    return target.prefers_large_blocks;
}
bool FormatFactory::checkParallelizeOutputAfterReading(const String & name, ContextPtr context) const
{
    /// Whether it is safe to re-parallelize the pipeline after reading this format.
    /// Parquet with input_format_parquet_preserve_order must keep row order, so
    /// re-parallelization is forbidden for it; every other case allows it.
    const bool parquet_must_keep_order
        = name == "Parquet" && context->getSettingsRef().input_format_parquet_preserve_order;
    return !parquet_must_keep_order;
}
void FormatFactory::checkFormatName(const String & name) const
{
    auto it = dict.find(name);

View File

@ -250,6 +250,8 @@ public:
bool checkIfFormatHasAnySchemaReader(const String & name) const; bool checkIfFormatHasAnySchemaReader(const String & name) const;
bool checkIfOutputFormatPrefersLargeBlocks(const String & name) const; bool checkIfOutputFormatPrefersLargeBlocks(const String & name) const;
bool checkParallelizeOutputAfterReading(const String & name, ContextPtr context) const;
void registerAdditionalInfoForSchemaCacheGetter(const String & name, AdditionalInfoForSchemaCacheGetter additional_info_for_schema_cache_getter); void registerAdditionalInfoForSchemaCacheGetter(const String & name, AdditionalInfoForSchemaCacheGetter additional_info_for_schema_cache_getter);
String getAdditionalInfoForSchemaCache(const String & name, ContextPtr context, const std::optional<FormatSettings> & format_settings_ = std::nullopt); String getAdditionalInfoForSchemaCache(const String & name, ContextPtr context, const std::optional<FormatSettings> & format_settings_ = std::nullopt);

View File

@ -137,7 +137,7 @@ void IStorage::read(
/// parallelize processing if not yet /// parallelize processing if not yet
const size_t output_ports = pipe.numOutputPorts(); const size_t output_ports = pipe.numOutputPorts();
const bool parallelize_output = context->getSettingsRef().parallelize_output_from_storages; const bool parallelize_output = context->getSettingsRef().parallelize_output_from_storages;
if (parallelize_output && parallelizeOutputAfterReading() && output_ports > 0 && output_ports < num_streams) if (parallelize_output && parallelizeOutputAfterReading(context) && output_ports > 0 && output_ports < num_streams)
pipe.resize(num_streams); pipe.resize(num_streams);
readFromPipe(query_plan, std::move(pipe), column_names, storage_snapshot, query_info, context, getName()); readFromPipe(query_plan, std::move(pipe), column_names, storage_snapshot, query_info, context, getName());

View File

@ -377,7 +377,7 @@ private:
/// This is enabled by default, but in some cases shouldn't be done. /// This is enabled by default, but in some cases shouldn't be done.
/// For example, when you read from system.numbers instead of system.numbers_mt, /// For example, when you read from system.numbers instead of system.numbers_mt,
/// you still expect the data to be processed sequentially. /// you still expect the data to be processed sequentially.
virtual bool parallelizeOutputAfterReading() const { return true; } virtual bool parallelizeOutputAfterReading(ContextPtr) const { return true; }
public: public:
/// Other version of read which adds reading step to query plan. /// Other version of read which adds reading step to query plan.

View File

@ -76,7 +76,7 @@ public:
/// FIXME: processing after reading from dictionaries are not parallelized due to some bug: /// FIXME: processing after reading from dictionaries are not parallelized due to some bug:
/// count() can return wrong result, see test_dictionaries_redis/test_long.py::test_redis_dict_long /// count() can return wrong result, see test_dictionaries_redis/test_long.py::test_redis_dict_long
bool parallelizeOutputAfterReading() const override { return false; } bool parallelizeOutputAfterReading(ContextPtr) const override { return false; }
std::shared_ptr<const IDictionary> getDictionary() const; std::shared_ptr<const IDictionary> getDictionary() const;

View File

@ -412,6 +412,11 @@ bool StorageFile::prefersLargeBlocks() const
return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(format_name); return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(format_name);
} }
bool StorageFile::parallelizeOutputAfterReading(ContextPtr context) const
{
    /// Defer to FormatFactory: some formats (e.g. Parquet when
    /// input_format_parquet_preserve_order is set) must not be re-parallelized.
    auto & factory = FormatFactory::instance();
    return factory.checkParallelizeOutputAfterReading(format_name, context);
}
StorageFile::StorageFile(int table_fd_, CommonArguments args)
    : StorageFile(args)
{

View File

@ -75,6 +75,8 @@ public:
bool prefersLargeBlocks() const override; bool prefersLargeBlocks() const override;
bool parallelizeOutputAfterReading(ContextPtr context) const override;
bool supportsPartitionBy() const override { return true; } bool supportsPartitionBy() const override { return true; }
ColumnsDescription getTableStructureFromFileDescriptor(ContextPtr context); ColumnsDescription getTableStructureFromFileDescriptor(ContextPtr context);

View File

@ -42,7 +42,7 @@ public:
std::make_shared<NullSource>(storage_snapshot->getSampleBlockForColumns(column_names))); std::make_shared<NullSource>(storage_snapshot->getSampleBlockForColumns(column_names)));
} }
bool parallelizeOutputAfterReading() const override { return false; } bool parallelizeOutputAfterReading(ContextPtr) const override { return false; }
bool supportsParallelInsert() const override { return true; } bool supportsParallelInsert() const override { return true; }

View File

@ -1008,6 +1008,11 @@ bool StorageS3::prefersLargeBlocks() const
return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(configuration.format); return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(configuration.format);
} }
bool StorageS3::parallelizeOutputAfterReading(ContextPtr context) const
{
    /// The decision is format-dependent (e.g. order-preserving Parquet reads
    /// must not be re-parallelized), so ask FormatFactory.
    auto & factory = FormatFactory::instance();
    return factory.checkParallelizeOutputAfterReading(configuration.format, context);
}
Pipe StorageS3::read(
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,

View File

@ -238,7 +238,7 @@ private:
* It sends HTTP GET to server when select is called and * It sends HTTP GET to server when select is called and
* HTTP PUT when insert is called. * HTTP PUT when insert is called.
*/ */
class StorageS3 : public IStorage, WithContext class StorageS3 : public IStorage
{ {
public: public:
struct Configuration : public StatelessTableEngineConfiguration struct Configuration : public StatelessTableEngineConfiguration
@ -366,6 +366,8 @@ private:
bool prefersLargeBlocks() const override; bool prefersLargeBlocks() const override;
bool parallelizeOutputAfterReading(ContextPtr context) const override;
static std::optional<ColumnsDescription> tryGetColumnsFromCache( static std::optional<ColumnsDescription> tryGetColumnsFromCache(
const KeysWithInfo::const_iterator & begin, const KeysWithInfo::const_iterator & begin,
const KeysWithInfo::const_iterator & end, const KeysWithInfo::const_iterator & end,

View File

@ -600,6 +600,11 @@ bool IStorageURLBase::prefersLargeBlocks() const
return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(format_name); return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(format_name);
} }
bool IStorageURLBase::parallelizeOutputAfterReading(ContextPtr context) const
{
    /// Format-dependent decision (order-preserving Parquet forbids it);
    /// delegate to FormatFactory.
    auto & factory = FormatFactory::instance();
    return factory.checkParallelizeOutputAfterReading(format_name, context);
}
Pipe IStorageURLBase::read(
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,

View File

@ -102,6 +102,8 @@ protected:
bool prefersLargeBlocks() const override; bool prefersLargeBlocks() const override;
bool parallelizeOutputAfterReading(ContextPtr context) const override;
private: private:
virtual Block getHeaderBlock(const Names & column_names, const StorageSnapshotPtr & storage_snapshot) const = 0; virtual Block getHeaderBlock(const Names & column_names, const StorageSnapshotPtr & storage_snapshot) const = 0;

View File

@ -40,7 +40,7 @@ public:
size_t max_block_size, size_t max_block_size,
size_t num_streams) override; size_t num_streams) override;
bool parallelizeOutputAfterReading() const override { return false; } bool parallelizeOutputAfterReading(ContextPtr) const override { return false; }
bool hasEvenlyDistributedRead() const override { return true; } bool hasEvenlyDistributedRead() const override { return true; }
bool isSystemStorage() const override { return true; } bool isSystemStorage() const override { return true; }

View File

@ -30,7 +30,7 @@ public:
size_t max_block_size, size_t max_block_size,
size_t num_streams) override; size_t num_streams) override;
bool parallelizeOutputAfterReading() const override { return false; } bool parallelizeOutputAfterReading(ContextPtr) const override { return false; }
bool isSystemStorage() const override { return true; } bool isSystemStorage() const override { return true; }

View File

@ -31,7 +31,7 @@ public:
size_t max_block_size, size_t max_block_size,
size_t num_streams) override; size_t num_streams) override;
bool parallelizeOutputAfterReading() const override { return false; } bool parallelizeOutputAfterReading(ContextPtr) const override { return false; }
bool hasEvenlyDistributedRead() const override { return true; } bool hasEvenlyDistributedRead() const override { return true; }
bool isSystemStorage() const override { return true; } bool isSystemStorage() const override { return true; }

View File

@ -0,0 +1,12 @@
0
1
2
(Expression)
ExpressionTransform
(ReadFromStorage)
File 0 → 1
(Expression)
ExpressionTransform × 2
(ReadFromStorage)
Resize 1 → 2
File 0 → 1

View File

@ -0,0 +1,16 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# This file has a row group with 2 rows, then a row group with 1 row.
# It'll be read into two blocks. The first block will sleep 2x longer than the second.
# So reordering is very likely if the order-preservation doesn't work.
# With input_format_parquet_preserve_order=1 the rows must come out in file
# order (0, 1, 2) even though the second block finishes sleeping first.
$CLICKHOUSE_LOCAL -q "select number+sleepEachRow(3) from file('$CURDIR/data_parquet/02725_data.parquet') settings input_format_parquet_preserve_order=1"
# preserve_order=1 should also suppress re-parallelization after reading:
# the pipeline is expected to show a single File source and no Resize step,
# even with max_threads=2.
$CLICKHOUSE_LOCAL -q "explain pipeline select number+sleepEachRow(3) from file('$CURDIR/data_parquet/02725_data.parquet') settings input_format_parquet_preserve_order=1, max_threads=2"
# Sanity check: with preserve_order=0 and parallelize_output_from_storages=1
# the pipeline should contain a "Resize 1 -> 2" step fanning out to 2 streams.
$CLICKHOUSE_LOCAL -q "explain pipeline select number+sleepEachRow(3) from file('$CURDIR/data_parquet/02725_data.parquet') settings input_format_parquet_preserve_order=0, parallelize_output_from_storages=1, max_threads=2"