Merge pull request #69176 from ClickHouse/backport/fix-s3-filter-handling

Fix s3 filter handling
This commit is contained in:
robot-clickhouse-ci-1 2024-09-03 11:56:58 +02:00 committed by GitHub
commit ba0225e96c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 56 additions and 9 deletions

View File

@ -4,11 +4,11 @@
#include <optional>
#include <string_view>
#include <Poco/Logger.h>
#include "Common/logger_useful.h"
#include "IO/CompressionMethod.h"
#include "IO/ReadBuffer.h"
#include "Interpreters/Context_fwd.h"
#include "Storages/MergeTree/ReplicatedMergeTreePartHeader.h"
#include <Common/logger_useful.h>
#include <IO/CompressionMethod.h>
#include <IO/ReadBuffer.h>
#include <Interpreters/Context_fwd.h>
#include <Storages/MergeTree/ReplicatedMergeTreePartHeader.h>
#if USE_AWS_S3
@ -202,7 +202,7 @@ public:
Impl(
const S3::Client & client_,
const S3::URI & globbed_uri_,
const ActionsDAG::Node * predicate_,
const ActionsDAG::Node * predicate,
const NamesAndTypesList & virtual_columns_,
ContextPtr context_,
KeysWithInfo * read_keys_,
@ -211,7 +211,6 @@ public:
: WithContext(context_)
, client(client_.clone())
, globbed_uri(globbed_uri_)
, predicate(predicate_)
, virtual_columns(virtual_columns_)
, read_keys(read_keys_)
, request_settings(request_settings_)
@ -226,6 +225,7 @@ public:
expanded_keys = expandSelectionGlob(globbed_uri.key);
expanded_keys_iter = expanded_keys.begin();
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
fillBufferForKey(*expanded_keys_iter);
expanded_keys_iter++;
}
@ -288,7 +288,6 @@ private:
recursive = globbed_uri.key == "/**";
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
fillInternalBufferAssumeLocked();
}
@ -459,7 +458,6 @@ private:
std::unique_ptr<S3::Client> client;
S3::URI globbed_uri;
const ActionsDAG::Node * predicate;
ASTPtr query;
NamesAndTypesList virtual_columns;
ActionsDAGPtr filter_dag;

View File

@ -0,0 +1 @@
43

View File

@ -0,0 +1,48 @@
-- Tags: no-fasttest
-- Tag no-fasttest: Depends on AWS
-- Let the INSERTs below overwrite objects that already exist (e.g. from a previous run of this test).
SET s3_truncate_on_insert=1;
-- Step 1: build the input fixture.
-- Writes Parquet through the `s3_conn` connection; PARTITION BY a together with
-- the {_partition_id} placeholder in the file name splits the rows by `a`.
-- generateRandom is given a fixed seed (3453451233), so the 100 generated rows
-- are reproducible. The CASE prepends 'a' or 'b' to `b` for part of the rows,
-- which gives the startsWith() filter used later in this test something to match.
-- NOTE(review): `a % 5` sits next to the alias `a % 8 AS a`; whether it sees the
-- generated column or the alias depends on alias resolution -- confirm before editing.
INSERT INTO FUNCTION s3(
s3_conn,
filename='input/test_03229_s3_filter_handling_input_{_partition_id}',
format=Parquet,
structure='a UInt8, b String'
)
PARTITION BY a
SELECT
a % 8 AS a,
CASE
WHEN a % 5 = 1 THEN concat('a', b)
WHEN a % 5 = 2 THEN concat('b', b)
ELSE b
END AS b
FROM generateRandom('a UInt8, b String', 3453451233, 10, 2) LIMIT 100;
-- Step 2: read the input objects back through a glob and write the filtered rows
-- to a single output object.
-- The source file name uses a {1,2,3,4,5,6}* pattern, and the WHERE clause filters
-- on the data column `b` only (rows starting with 'a' or 'b').
-- Matching rows are written with `b2` = `b` prefixed by '1' (for 'a...') or '2' (for 'b...').
-- NOTE(review): presumably this combination -- a globbed key plus a predicate on a
-- non-path column -- is the scenario the accompanying fix targets; confirm against the PR.
INSERT INTO FUNCTION s3(
s3_conn,
filename='output/test_03229_s3_filter_handling_output',
format=Parquet,
structure='a UInt8, b2 String'
)
SELECT
a,
CASE
WHEN startsWith(b, 'a') THEN concat('1', b)
WHEN startsWith(b, 'b') THEN concat('2', b)
ELSE b
END AS b2
FROM s3(
s3_conn,
filename='input/test_03229_s3_filter_handling_input_{1,2,3,4,5,6}*',
format=Parquet,
structure='a UInt8, b String'
)
WHERE (startsWith(b, 'a') OR startsWith(b, 'b'));
-- Step 3: count the rows that ended up in the output object.
-- This is the only statement in the test that returns a result set.
-- NOTE(review): presumably the number is compared with the test's .reference file -- confirm.
SELECT count() FROM s3(
s3_conn,
filename='output/test_03229_s3_filter_handling_output',
format=Parquet,
structure='a UInt8, b2 String'
);