From dc6e34075e690e27c718ce2b504b2abc10308f0e Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Mon, 13 Mar 2023 18:51:56 +0000 Subject: [PATCH] Read less unnecessary data from Parquet files --- src/Core/Settings.h | 2 +- src/IO/ReadBufferFromS3.cpp | 20 +++++++++++++++++++- src/IO/ReadBufferFromS3.h | 1 + src/Processors/Formats/ISchemaReader.h | 2 +- 4 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index c47432ae14a..5e4124462a3 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -638,7 +638,7 @@ class IColumn; M(Int64, read_priority, 0, "Priority to read data from local filesystem. Only supported for 'pread_threadpool' method.", 0) \ M(UInt64, merge_tree_min_rows_for_concurrent_read_for_remote_filesystem, (20 * 8192), "If at least as many lines are read from one file, the reading can be parallelized, when reading from remote filesystem.", 0) \ M(UInt64, merge_tree_min_bytes_for_concurrent_read_for_remote_filesystem, (24 * 10 * 1024 * 1024), "If at least as many bytes are read from one file, the reading can be parallelized, when reading from remote filesystem.", 0) \ - M(UInt64, remote_read_min_bytes_for_seek, 4 * DBMS_DEFAULT_BUFFER_SIZE, "Min bytes required for remote read (url, s3) to do seek, instead for read with ignore.", 0) \ + M(UInt64, remote_read_min_bytes_for_seek, 4 * DBMS_DEFAULT_BUFFER_SIZE, "Min bytes required for remote read (url, s3) to do seek, instead of read with ignore.", 0) \ \ M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \ M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background. Makes sense only for inserts via HTTP protocol. If wait_for_async_insert is false, INSERT query is processed almost instantly, otherwise client will wait until data will be flushed to table", 0) \ diff --git a/src/IO/ReadBufferFromS3.cpp b/src/IO/ReadBufferFromS3.cpp index 91905330b74..bf95bf59151 100644 --- a/src/IO/ReadBufferFromS3.cpp +++ b/src/IO/ReadBufferFromS3.cpp @@ -270,7 +270,25 @@ void ReadBufferFromS3::setReadUntilPosition(size_t position) if (position != static_cast(read_until_position)) { read_until_position = position; - impl.reset(); + if (impl) + { + // Not exactly a seek, but close enough. + ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection); + impl.reset(); + } + } +} + +void ReadBufferFromS3::setReadUntilEnd() +{ + if (read_until_position) + { + read_until_position = 0; + if (impl) + { + ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection); + impl.reset(); + } } } diff --git a/src/IO/ReadBufferFromS3.h b/src/IO/ReadBufferFromS3.h index 84e8d36865c..ad0430d35b1 100644 --- a/src/IO/ReadBufferFromS3.h +++ b/src/IO/ReadBufferFromS3.h @@ -69,6 +69,7 @@ public: size_t getFileSize() override; void setReadUntilPosition(size_t position) override; + void setReadUntilEnd() override; Range getRemainingReadRange() const override; diff --git a/src/Processors/Formats/ISchemaReader.h b/src/Processors/Formats/ISchemaReader.h index e6982ea743b..78b34a07840 100644 --- a/src/Processors/Formats/ISchemaReader.h +++ b/src/Processors/Formats/ISchemaReader.h @@ -16,7 +16,7 @@ namespace ErrorCodes } /// Base class for schema inference for the data in some specific format. -/// It reads some data from read buffer and try to determine the schema +/// It reads some data from read buffer and tries to determine the schema /// from read data. class ISchemaReader {