Merge remote-tracking branch 'upstream/master' into cache-better-locks

This commit is contained in:
kssenii 2023-04-17 19:47:43 +02:00
commit cd25d61795
58 changed files with 1594 additions and 754 deletions

View File

@ -140,6 +140,8 @@
M(RestartReplicaThreadsActive, "Number of threads in the RESTART REPLICA thread pool running a task.") \
M(QueryPipelineExecutorThreads, "Number of threads in the PipelineExecutor thread pool.") \
M(QueryPipelineExecutorThreadsActive, "Number of threads in the PipelineExecutor thread pool running a task.") \
M(ParquetDecoderThreads, "Number of threads in the ParquetBlockInputFormat thread pool running a task.") \
M(ParquetDecoderThreadsActive, "Number of threads in the ParquetBlockInputFormat thread pool.") \
M(DistributedFilesToInsert, "Number of pending files to process for asynchronous insertion into Distributed tables. Number of files for every shard is summed.") \
M(BrokenDistributedFilesToInsert, "Number of files for asynchronous insertion into Distributed tables that has been marked as broken. This metric will starts from 0 on start. Number of files for every shard is summed.") \
M(TablesToDropQueueSize, "Number of dropped tables, that are waiting for background data removal.") \

View File

@ -63,7 +63,7 @@
M(DiskReadElapsedMicroseconds, "Total time spent waiting for read syscall. This include reads from page cache.") \
M(DiskWriteElapsedMicroseconds, "Total time spent waiting for write syscall. This include writes to page cache.") \
M(NetworkReceiveElapsedMicroseconds, "Total time spent waiting for data to receive or receiving data from network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
M(NetworkSendElapsedMicroseconds, "Total time spent waiting for data to send to network or sending data to network. Only ClickHouse-related network interaction is included, not by 3rd party libraries..") \
M(NetworkSendElapsedMicroseconds, "Total time spent waiting for data to send to network or sending data to network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
M(NetworkReceiveBytes, "Total number of bytes received from network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
M(NetworkSendBytes, "Total number of bytes send to network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
\
@ -249,7 +249,7 @@ The server successfully detected this situation and will download merged part fr
M(RWLockWritersWaitMilliseconds, "Total time spent waiting for a write lock to be acquired (in a heavy RWLock).") \
M(DNSError, "Total count of errors in DNS resolution") \
\
M(RealTimeMicroseconds, "Total (wall clock) time spent in processing (queries and other tasks) threads (not that this is a sum).") \
M(RealTimeMicroseconds, "Total (wall clock) time spent in processing (queries and other tasks) threads (note that this is a sum).") \
M(UserTimeMicroseconds, "Total time spent in processing (queries and other tasks) threads executing CPU instructions in user space. This include time CPU pipeline was stalled due to cache misses, branch mispredictions, hyper-threading, etc.") \
M(SystemTimeMicroseconds, "Total time spent in processing (queries and other tasks) threads executing CPU instructions in OS kernel space. This include time CPU pipeline was stalled due to cache misses, branch mispredictions, hyper-threading, etc.") \
M(MemoryOvercommitWaitTimeMicroseconds, "Total time spent in waiting for memory to be freed in OvercommitTracker.") \

View File

@ -1,46 +0,0 @@
#pragma once
#include <optional>
#include <cmath>
namespace DB
{
class RangeGenerator
{
public:
explicit RangeGenerator(size_t total_size_, size_t range_step_, size_t range_start = 0)
: from(range_start), range_step(range_step_), total_size(total_size_)
{
}
size_t totalRanges() const { return static_cast<size_t>(ceil(static_cast<float>(total_size - from) / range_step)); }
using Range = std::pair<size_t, size_t>;
// return upper exclusive range of values, i.e. [from_range, to_range>
std::optional<Range> nextRange()
{
if (from >= total_size)
{
return std::nullopt;
}
auto to = from + range_step;
if (to >= total_size)
{
to = total_size;
}
Range range{from, to};
from = to;
return range;
}
private:
size_t from;
size_t range_step;
size_t total_size;
};
}

View File

@ -323,7 +323,7 @@ using ThreadFromGlobalPool = ThreadFromGlobalPoolImpl<true>;
/// one is at GlobalThreadPool level, the other is at ThreadPool level, so tracing context will be initialized on the same thread twice.
///
/// Once the worker on ThreadPool gains the control of execution, it won't return until it's shutdown,
/// which means the tracing context initialized at underlying worker level won't be delete for a very long time.
/// which means the tracing context initialized at underlying worker level won't be deleted for a very long time.
/// This would cause wrong context for further jobs scheduled in ThreadPool.
///
/// To make sure the tracing context is correctly propagated, we explicitly disable context propagation(including initialization and de-initialization) at underlying worker level.

View File

@ -1,22 +0,0 @@
#include <Common/RangeGenerator.h>
#include <gtest/gtest.h>
using namespace DB;
TEST(RangeGenerator, Common)
{
RangeGenerator g{25, 10};
EXPECT_EQ(g.totalRanges(), 3);
std::vector<RangeGenerator::Range> ranges{{0, 10}, {10, 20}, {20, 25}};
for (size_t i = 0; i < 3; ++i)
{
auto r = g.nextRange();
EXPECT_TRUE(r);
EXPECT_EQ(r, ranges[i]);
}
auto r = g.nextRange();
EXPECT_TRUE(!r);
}

View File

@ -11,6 +11,9 @@
namespace DB
{
namespace ErrorCodes { extern const int CANNOT_SCHEDULE_TASK; }
BackgroundSchedulePoolTaskInfo::BackgroundSchedulePoolTaskInfo(
BackgroundSchedulePool & pool_, const std::string & log_name_, const BackgroundSchedulePool::TaskFunc & function_)
: pool(pool_), log_name(log_name_), function(function_)
@ -158,10 +161,26 @@ BackgroundSchedulePool::BackgroundSchedulePool(size_t size_, CurrentMetrics::Met
LOG_INFO(&Poco::Logger::get("BackgroundSchedulePool/" + thread_name), "Create BackgroundSchedulePool with {} threads", size_);
threads.resize(size_);
for (auto & thread : threads)
thread = ThreadFromGlobalPoolNoTracingContextPropagation([this] { threadFunction(); });
delayed_thread = std::make_unique<ThreadFromGlobalPoolNoTracingContextPropagation>([this] { delayExecutionThreadFunction(); });
try
{
for (auto & thread : threads)
thread = ThreadFromGlobalPoolNoTracingContextPropagation([this] { threadFunction(); });
delayed_thread = std::make_unique<ThreadFromGlobalPoolNoTracingContextPropagation>([this] { delayExecutionThreadFunction(); });
}
catch (...)
{
LOG_FATAL(
&Poco::Logger::get("BackgroundSchedulePool/" + thread_name),
"Couldn't get {} threads from global thread pool: {}",
size_,
getCurrentExceptionCode() == DB::ErrorCodes::CANNOT_SCHEDULE_TASK
? "Not enough threads. Please make sure max_thread_pool_size is considerably "
"bigger than background_schedule_pool_size."
: getCurrentExceptionMessage(/* with_stacktrace */ true));
abort();
}
}

View File

@ -638,7 +638,7 @@ class IColumn;
M(Int64, read_priority, 0, "Priority to read data from local filesystem. Only supported for 'pread_threadpool' method.", 0) \
M(UInt64, merge_tree_min_rows_for_concurrent_read_for_remote_filesystem, (20 * 8192), "If at least as many lines are read from one file, the reading can be parallelized, when reading from remote filesystem.", 0) \
M(UInt64, merge_tree_min_bytes_for_concurrent_read_for_remote_filesystem, (24 * 10 * 1024 * 1024), "If at least as many bytes are read from one file, the reading can be parallelized, when reading from remote filesystem.", 0) \
M(UInt64, remote_read_min_bytes_for_seek, 4 * DBMS_DEFAULT_BUFFER_SIZE, "Min bytes required for remote read (url, s3) to do seek, instead for read with ignore.", 0) \
M(UInt64, remote_read_min_bytes_for_seek, 4 * DBMS_DEFAULT_BUFFER_SIZE, "Min bytes required for remote read (url, s3) to do seek, instead of read with ignore.", 0) \
\
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background. Makes sense only for inserts via HTTP protocol. If wait_for_async_insert is false, INSERT query is processed almost instantly, otherwise client will wait until data will be flushed to table", 0) \
@ -812,6 +812,8 @@ class IColumn;
M(Bool, input_format_orc_case_insensitive_column_matching, false, "Ignore case when matching ORC columns with CH columns.", 0) \
M(Bool, input_format_parquet_import_nested, false, "Allow to insert array of structs into Nested table in Parquet input format.", 0) \
M(Bool, input_format_parquet_case_insensitive_column_matching, false, "Ignore case when matching Parquet columns with CH columns.", 0) \
/* TODO: Consider unifying this with https://github.com/ClickHouse/ClickHouse/issues/38755 */ \
M(Bool, input_format_parquet_preserve_order, true, "Avoid reordering rows when reading from Parquet files. Usually makes it much slower.", 0) \
M(Bool, input_format_allow_seeks, true, "Allow seeks while reading in ORC/Parquet/Arrow input formats", 0) \
M(Bool, input_format_orc_allow_missing_columns, false, "Allow missing columns while reading ORC input formats", 0) \
M(Bool, input_format_parquet_allow_missing_columns, false, "Allow missing columns while reading Parquet input formats", 0) \

View File

@ -99,7 +99,6 @@ QueryPipeline HTTPDictionarySource::loadAll()
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
configuration.header_entries,
ReadWriteBufferFromHTTP::Range{},
nullptr, false);
return createWrappedBuffer(std::move(in_ptr));
@ -120,7 +119,6 @@ QueryPipeline HTTPDictionarySource::loadUpdatedAll()
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
configuration.header_entries,
ReadWriteBufferFromHTTP::Range{},
nullptr, false);
return createWrappedBuffer(std::move(in_ptr));
@ -150,7 +148,6 @@ QueryPipeline HTTPDictionarySource::loadIds(const std::vector<UInt64> & ids)
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
configuration.header_entries,
ReadWriteBufferFromHTTP::Range{},
nullptr, false);
return createWrappedBuffer(std::move(in_ptr));
@ -180,7 +177,6 @@ QueryPipeline HTTPDictionarySource::loadKeys(const Columns & key_columns, const
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
configuration.header_entries,
ReadWriteBufferFromHTTP::Range{},
nullptr, false);
return createWrappedBuffer(std::move(in_ptr));

View File

@ -146,12 +146,7 @@ void AsynchronousReadIndirectBufferFromRemoteFS::prefetch(int64_t priority)
void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t position)
{
/// Do not reinitialize internal state in case the new end of range is already included.
/// Actually it is likely that we will anyway reinitialize it as seek method is called after
/// changing end position, but seek avoiding feature might help to avoid reinitialization,
/// so this check is useful to save the prefetch for the time when we try to avoid seek by
/// reading and ignoring some data.
if (!read_until_position || position > *read_until_position)
if (!read_until_position || position != *read_until_position)
{
read_until_position = position;

View File

@ -150,6 +150,11 @@ CachedOnDiskReadBufferFromFile::getCacheReadBuffer(const FileSegment & file_segm
/// Do not allow to use asynchronous version of LocalFSReadMethod.
local_read_settings.local_fs_method = LocalFSReadMethod::pread;
// The buffer will unnecessarily allocate a Memory of size local_fs_buffer_size, which will then
// most likely be unused because we're swap()ping our own internal_buffer into
// implementation_buffer before each read. But we can't just set local_fs_buffer_size = 0 here
// because some buffer implementations actually use that memory (e.g. for prefetching).
auto buf = createReadBufferFromFileBase(path, local_read_settings);
if (getFileSizeFromReadBuffer(*buf) == 0)
@ -175,7 +180,7 @@ CachedOnDiskReadBufferFromFile::getRemoteReadBuffer(FileSegment & file_segment,
* [___________] -- read_range_1 for query1
* [_______________] -- read_range_2 for query2
* ^___________^______^
* | segment1 | segment2
* | segment1 | segment2
*
* So query2 can reuse implementation buffer, which downloaded segment1.
* Implementation buffer from segment1 is passed to segment2 once segment1 is loaded.
@ -194,6 +199,10 @@ CachedOnDiskReadBufferFromFile::getRemoteReadBuffer(FileSegment & file_segment,
file_segment.setRemoteFileReader(remote_fs_segment_reader);
}
else
{
chassert(remote_fs_segment_reader->getFileOffsetOfBufferEnd() == file_segment.getCurrentWriteOffset());
}
return remote_fs_segment_reader;
}
@ -448,8 +457,7 @@ CachedOnDiskReadBufferFromFile::getImplementationBuffer(FileSegment & file_segme
{
read_buffer_for_file_segment->seek(file_offset_of_buffer_end, SEEK_SET);
assert(static_cast<size_t>(read_buffer_for_file_segment->getPosition()) == file_offset_of_buffer_end);
assert(static_cast<size_t>(read_buffer_for_file_segment->getFileOffsetOfBufferEnd()) == file_offset_of_buffer_end);
assert(read_buffer_for_file_segment->getFileOffsetOfBufferEnd() == file_offset_of_buffer_end);
}
const auto current_write_offset = file_segment.getCurrentWriteOffset(false);
@ -457,16 +465,21 @@ CachedOnDiskReadBufferFromFile::getImplementationBuffer(FileSegment & file_segme
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Buffer's offsets mismatch. Cached buffer offset: {}, current_write_offset: {} implementation buffer offset: {}, "
"implementation buffer remaining range: {}, file segment info: {}",
file_offset_of_buffer_end, current_write_offset, read_buffer_for_file_segment->getPosition(),
read_buffer_for_file_segment->getRemainingReadRange().toString(), file_segment.getInfoForLog());
"Buffer's offsets mismatch. Cached buffer offset: {}, current_write_offset: {}, implementation buffer position: {}, "
"implementation buffer end position: {}, file segment info: {}",
file_offset_of_buffer_end,
current_write_offset,
read_buffer_for_file_segment->getPosition(),
read_buffer_for_file_segment->getFileOffsetOfBufferEnd(),
file_segment->getInfoForLog());
}
break;
}
}
chassert(!read_buffer_for_file_segment->hasPendingData());
return read_buffer_for_file_segment;
}
@ -530,7 +543,8 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegment & file_segment)
/// download from offset a'' < a', but return buffer from offset a'.
LOG_TEST(log, "Bytes to predownload: {}, caller_id: {}", bytes_to_predownload, FileSegment::getCallerId());
chassert(implementation_buffer->getFileOffsetOfBufferEnd() == file_segment.getCurrentWriteOffset(false));
/// chassert(implementation_buffer->getFileOffsetOfBufferEnd() == file_segment.getCurrentWriteOffset(false));
chassert(static_cast<size_t>(implementation_buffer->getPosition()) == file_segment->getCurrentWriteOffset(false));
size_t current_offset = file_segment.getCurrentWriteOffset(false);
const auto & current_range = file_segment.range();
@ -781,6 +795,7 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
if (file_segments->empty())
return false;
bool implementation_buffer_can_be_reused = false;
SCOPE_EXIT({
try
{
@ -797,7 +812,12 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
{
bool need_complete_file_segment = file_segment.isDownloader();
if (need_complete_file_segment)
file_segment.completePartAndResetDownloader();
{
if (!implementation_buffer_can_be_reused)
file_segment->resetRemoteFileReader();
file_segment->completePartAndResetDownloader();
}
}
chassert(!file_segment.isDownloader());
@ -826,6 +846,10 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
chassert(!internal_buffer.empty());
// Pass a valid external buffer for implementation_buffer to read into.
// We then take it back with another swap() after reading is done.
// (If we get an exception in between, we'll be left with an invalid internal_buffer. That's ok, as long as
// the caller doesn't try to use this CachedOnDiskReadBufferFromFile after it threw an exception.)
swap(*implementation_buffer);
auto & file_segment = file_segments->front();
@ -833,10 +857,10 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
LOG_TEST(
log,
"Current read type: {}, read offset: {}, impl read range: {}, file segment: {}",
"Current read type: {}, read offset: {}, impl offset: {}, file segment: {}",
toString(read_type),
file_offset_of_buffer_end,
implementation_buffer->getRemainingReadRange().toString(),
implementation_buffer->getFileOffsetOfBufferEnd(),
file_segment.getInfoForLog());
chassert(current_read_range.left <= file_offset_of_buffer_end);
@ -893,13 +917,16 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
auto elapsed = watch.elapsedMicroseconds();
current_file_segment_counters.increment(ProfileEvents::FileSegmentReadMicroseconds, elapsed);
// We don't support implementation_buffer implementations that use nextimpl_working_buffer_offset.
chassert(implementation_buffer->position() == implementation_buffer->buffer().begin());
size = implementation_buffer->buffer().size();
LOG_TEST(
log,
"Read {} bytes, read type {}, position: {}, offset: {}, remaining read range: {}",
"Read {} bytes, read type {}, position: {}, offset: {}, segment end: {}",
size, toString(read_type), implementation_buffer->getPosition(),
implementation_buffer->getFileOffsetOfBufferEnd(), implementation_buffer->getRemainingReadRange().toString());
implementation_buffer->getFileOffsetOfBufferEnd(), file_segment->range().right);
if (read_type == ReadType::CACHED)
{
@ -933,6 +960,12 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
|| file_segment.getCurrentWriteOffset(false) == implementation_buffer->getFileOffsetOfBufferEnd());
LOG_TEST(log, "Successfully written {} bytes", size);
// The implementation_buffer is valid and positioned correctly (at file_segment->getCurrentWriteOffset()).
// Later reads for this file segment can reuse it.
// (It's reusable even if we don't reach the swap(*implementation_buffer) below,
// because the reuser must assign implementation_buffer's buffer anyway.)
implementation_buffer_can_be_reused = true;
}
else
{
@ -962,12 +995,15 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
}
file_offset_of_buffer_end += size;
chassert(file_offset_of_buffer_end <= read_until_position);
}
swap(*implementation_buffer);
current_file_segment_counters.increment(ProfileEvents::FileSegmentUsedBytes, available());
// No necessary because of the SCOPE_EXIT above, but useful for logging below.
if (download_current_segment)
file_segment.completePartAndResetDownloader();
@ -1101,6 +1137,8 @@ off_t CachedOnDiskReadBufferFromFile::seek(off_t offset, int whence)
implementation_buffer.reset();
initialized = false;
LOG_TEST(log, "Reset state for seek to position {}", new_pos);
return new_pos;
}
@ -1125,9 +1163,18 @@ void CachedOnDiskReadBufferFromFile::setReadUntilPosition(size_t position)
if (!allow_seeks_after_first_read)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Method `setReadUntilPosition()` not allowed");
read_until_position = position;
initialized = false;
if (read_until_position == position)
return;
file_offset_of_buffer_end = getPosition();
resetWorkingBuffer();
file_segments_holder.reset();
implementation_buffer.reset();
initialized = false;
read_until_position = position;
LOG_TEST(log, "Set read_until_position to {}", read_until_position);
}
void CachedOnDiskReadBufferFromFile::setReadUntilEnd()
@ -1149,12 +1196,6 @@ void CachedOnDiskReadBufferFromFile::assertCorrectness() const
String CachedOnDiskReadBufferFromFile::getInfoForLog()
{
String implementation_buffer_read_range_str;
if (implementation_buffer)
implementation_buffer_read_range_str = implementation_buffer->getRemainingReadRange().toString();
else
implementation_buffer_read_range_str = "None";
String current_file_segment_info;
if (file_segments->empty())
current_file_segment_info = "None";
@ -1162,13 +1203,13 @@ String CachedOnDiskReadBufferFromFile::getInfoForLog()
current_file_segment_info = file_segments->front().getInfoForLog();
return fmt::format(
"Buffer path: {}, hash key: {}, file_offset_of_buffer_end: {}, "
"internal buffer remaining read range: {}, "
"read_type: {}, last caller: {}, file segment info: {}",
"Buffer path: {}, hash key: {}, file_offset_of_buffer_end: {}, read_until_position: {}, "
"internal buffer end: {}, read_type: {}, last caller: {}, file segment info: {}",
source_file_path,
cache_key.toString(),
file_offset_of_buffer_end,
implementation_buffer_read_range_str,
read_until_position,
implementation_buffer ? std::to_string(implementation_buffer->getFileOffsetOfBufferEnd()) : "None",
toString(read_type),
last_caller_id,
current_file_segment_info);

View File

@ -54,14 +54,6 @@ ReadBufferFromAzureBlobStorage::ReadBufferFromAzureBlobStorage(
}
}
SeekableReadBuffer::Range ReadBufferFromAzureBlobStorage::getRemainingReadRange() const
{
return Range{
.left = static_cast<size_t>(offset),
.right = read_until_position ? std::optional{read_until_position - 1} : std::nullopt
};
}
void ReadBufferFromAzureBlobStorage::setReadUntilPosition(size_t position)
{
read_until_position = position;

View File

@ -38,8 +38,6 @@ public:
void setReadUntilPosition(size_t position) override;
Range getRemainingReadRange() const override;
bool supportsRightBoundedReads() const override { return true; }
private:

View File

@ -41,18 +41,15 @@ ReadBufferFromWebServer::ReadBufferFromWebServer(
std::unique_ptr<ReadBuffer> ReadBufferFromWebServer::initialize()
{
Poco::URI uri(url);
ReadWriteBufferFromHTTP::Range range;
if (read_until_position)
{
if (read_until_position < offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, read_until_position - 1);
range = { .begin = static_cast<size_t>(offset), .end = read_until_position - 1 };
LOG_DEBUG(log, "Reading with range: {}-{}", offset, read_until_position);
}
else
{
range = { .begin = static_cast<size_t>(offset), .end = std::nullopt };
LOG_DEBUG(log, "Reading from offset: {}", offset);
}
@ -60,7 +57,7 @@ std::unique_ptr<ReadBuffer> ReadBufferFromWebServer::initialize()
const auto & config = context->getConfigRef();
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 20), 0};
return std::make_unique<ReadWriteBufferFromHTTP>(
auto res = std::make_unique<ReadWriteBufferFromHTTP>(
uri,
Poco::Net::HTTPRequest::HTTP_GET,
ReadWriteBufferFromHTTP::OutStreamCallback(),
@ -74,10 +71,16 @@ std::unique_ptr<ReadBuffer> ReadBufferFromWebServer::initialize()
buf_size,
read_settings,
HTTPHeaderEntries{},
range,
&context->getRemoteHostFilter(),
/* delay_initialization */true,
use_external_buffer);
if (read_until_position)
res->setReadUntilPosition(read_until_position);
if (offset)
res->seek(offset, SEEK_SET);
return res;
}
@ -88,15 +91,6 @@ void ReadBufferFromWebServer::setReadUntilPosition(size_t position)
}
SeekableReadBuffer::Range ReadBufferFromWebServer::getRemainingReadRange() const
{
return Range{
.left = static_cast<size_t>(offset),
.right = read_until_position ? std::optional{read_until_position - 1} : std::nullopt
};
}
bool ReadBufferFromWebServer::nextImpl()
{
if (read_until_position)

View File

@ -37,8 +37,6 @@ public:
size_t getFileOffsetOfBufferEnd() const override { return offset; }
Range getRemainingReadRange() const override;
bool supportsRightBoundedReads() const override { return true; }
private:

View File

@ -5,6 +5,7 @@
#include <Formats/FormatSettings.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
#include <IO/IOThreadPool.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <Processors/Formats/Impl/MySQLOutputFormat.h>
@ -113,6 +114,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.parquet.output_version = settings.output_format_parquet_version;
format_settings.parquet.import_nested = settings.input_format_parquet_import_nested;
format_settings.parquet.case_insensitive_column_matching = settings.input_format_parquet_case_insensitive_column_matching;
format_settings.parquet.preserve_order = settings.input_format_parquet_preserve_order;
format_settings.parquet.allow_missing_columns = settings.input_format_parquet_allow_missing_columns;
format_settings.parquet.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference;
format_settings.parquet.output_string_as_string = settings.output_format_parquet_string_as_string;
@ -221,45 +223,114 @@ InputFormatPtr FormatFactory::getInput(
const Block & sample,
ContextPtr context,
UInt64 max_block_size,
const std::optional<FormatSettings> & _format_settings) const
const std::optional<FormatSettings> & format_settings,
std::optional<size_t> max_parsing_threads) const
{
auto format_settings = _format_settings
? *_format_settings : getFormatSettings(context);
return getInputImpl(
name,
nullptr,
&buf,
sample,
context,
max_block_size,
/* is_remote_fs */ false,
CompressionMethod::None,
format_settings,
/* max_download_threads */ 1,
max_parsing_threads);
}
if (!getCreators(name).input_creator)
InputFormatPtr FormatFactory::getInputRandomAccess(
const String & name,
SeekableReadBufferFactoryPtr buf_factory,
const Block & sample,
ContextPtr context,
UInt64 max_block_size,
bool is_remote_fs,
CompressionMethod compression,
const std::optional<FormatSettings> & format_settings,
std::optional<size_t> max_download_threads,
std::optional<size_t> max_parsing_threads) const
{
return getInputImpl(
name,
std::move(buf_factory),
nullptr,
sample,
context,
max_block_size,
is_remote_fs,
compression,
format_settings,
max_download_threads,
max_parsing_threads);
}
InputFormatPtr FormatFactory::getInputImpl(
const String & name,
// exactly one of the following two is nullptr
SeekableReadBufferFactoryPtr buf_factory,
ReadBuffer * _buf,
const Block & sample,
ContextPtr context,
UInt64 max_block_size,
bool is_remote_fs,
CompressionMethod compression,
const std::optional<FormatSettings> & _format_settings,
std::optional<size_t> _max_download_threads,
std::optional<size_t> _max_parsing_threads) const
{
chassert((!_buf) != (!buf_factory));
const auto& creators = getCreators(name);
if (!creators.input_creator && !creators.random_access_input_creator)
throw Exception(ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT, "Format {} is not suitable for input", name);
auto format_settings = _format_settings ? *_format_settings : getFormatSettings(context);
const Settings & settings = context->getSettingsRef();
const auto & file_segmentation_engine = getCreators(name).file_segmentation_engine;
size_t max_parsing_threads = _max_parsing_threads.value_or(settings.max_threads);
size_t max_download_threads = _max_download_threads.value_or(settings.max_download_threads);
// Doesn't make sense to use parallel parsing with less than four threads
// (segmentator + two parsers + reader).
bool parallel_parsing = settings.input_format_parallel_parsing && file_segmentation_engine && settings.max_threads >= 4;
RowInputFormatParams row_input_format_params;
row_input_format_params.max_block_size = max_block_size;
row_input_format_params.allow_errors_num = format_settings.input_allow_errors_num;
row_input_format_params.allow_errors_ratio = format_settings.input_allow_errors_ratio;
row_input_format_params.max_execution_time = settings.max_execution_time;
row_input_format_params.timeout_overflow_mode = settings.timeout_overflow_mode;
if (settings.max_memory_usage && settings.min_chunk_bytes_for_parallel_parsing * settings.max_threads * 2 > settings.max_memory_usage)
if (context->hasQueryContext() && settings.log_queries)
context->getQueryContext()->addQueryFactoriesInfo(Context::QueryLogFactories::Format, name);
// Prepare a read buffer.
std::unique_ptr<ReadBuffer> owned_buf;
if (buf_factory)
owned_buf = prepareReadBuffer(buf_factory, compression, creators, format_settings, settings, max_download_threads);
auto * buf = owned_buf ? owned_buf.get() : _buf;
// Decide whether to use parallel ParallelParsingInputFormat.
bool parallel_parsing = max_parsing_threads > 1 && settings.input_format_parallel_parsing && creators.file_segmentation_engine && !creators.random_access_input_creator;
if (settings.max_memory_usage && settings.min_chunk_bytes_for_parallel_parsing * max_parsing_threads * 2 > settings.max_memory_usage)
parallel_parsing = false;
if (settings.max_memory_usage_for_user && settings.min_chunk_bytes_for_parallel_parsing * settings.max_threads * 2 > settings.max_memory_usage_for_user)
if (settings.max_memory_usage_for_user && settings.min_chunk_bytes_for_parallel_parsing * max_parsing_threads * 2 > settings.max_memory_usage_for_user)
parallel_parsing = false;
if (parallel_parsing)
{
const auto & non_trivial_prefix_and_suffix_checker = getCreators(name).non_trivial_prefix_and_suffix_checker;
const auto & non_trivial_prefix_and_suffix_checker = creators.non_trivial_prefix_and_suffix_checker;
/// Disable parallel parsing for input formats with non-trivial readPrefix() and readSuffix().
if (non_trivial_prefix_and_suffix_checker && non_trivial_prefix_and_suffix_checker(buf))
if (non_trivial_prefix_and_suffix_checker && non_trivial_prefix_and_suffix_checker(*buf))
parallel_parsing = false;
}
// Create the InputFormat in one of 3 ways.
InputFormatPtr format;
if (parallel_parsing)
{
const auto & input_getter = getCreators(name).input_creator;
RowInputFormatParams row_input_format_params;
row_input_format_params.max_block_size = max_block_size;
row_input_format_params.allow_errors_num = format_settings.input_allow_errors_num;
row_input_format_params.allow_errors_ratio = format_settings.input_allow_errors_ratio;
row_input_format_params.max_execution_time = settings.max_execution_time;
row_input_format_params.timeout_overflow_mode = settings.timeout_overflow_mode;
const auto & input_getter = creators.input_creator;
/// Const reference is copied to lambda.
auto parser_creator = [input_getter, sample, row_input_format_params, format_settings]
@ -267,57 +338,101 @@ InputFormatPtr FormatFactory::getInput(
{ return input_getter(input, sample, row_input_format_params, format_settings); };
ParallelParsingInputFormat::Params params{
buf, sample, parser_creator, file_segmentation_engine, name, settings.max_threads,
*buf, sample, parser_creator, creators.file_segmentation_engine, name, max_parsing_threads,
settings.min_chunk_bytes_for_parallel_parsing, max_block_size, context->getApplicationType() == Context::ApplicationType::SERVER};
auto format = std::make_shared<ParallelParsingInputFormat>(params);
if (!settings.input_format_record_errors_file_path.toString().empty())
format->setErrorsLogger(std::make_shared<ParallelInputFormatErrorsLogger>(context));
return format;
format = std::make_shared<ParallelParsingInputFormat>(params);
}
else if (creators.random_access_input_creator)
{
format = creators.random_access_input_creator(
buf,
std::move(buf_factory),
sample,
format_settings,
context->getReadSettings(),
is_remote_fs,
max_download_threads,
max_parsing_threads);
}
else
{
auto format = getInputFormat(name, buf, sample, context, max_block_size, format_settings);
if (!settings.input_format_record_errors_file_path.toString().empty())
format->setErrorsLogger(std::make_shared<InputFormatErrorsLogger>(context));
return format;
format = creators.input_creator(*buf, sample, row_input_format_params, format_settings);
}
}
InputFormatPtr FormatFactory::getInputFormat(
const String & name,
ReadBuffer & buf,
const Block & sample,
ContextPtr context,
UInt64 max_block_size,
const std::optional<FormatSettings> & _format_settings) const
{
const auto & input_getter = getCreators(name).input_creator;
if (!input_getter)
throw Exception(ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT, "Format {} is not suitable for input", name);
const Settings & settings = context->getSettingsRef();
if (context->hasQueryContext() && settings.log_queries)
context->getQueryContext()->addQueryFactoriesInfo(Context::QueryLogFactories::Format, name);
auto format_settings = _format_settings ? *_format_settings : getFormatSettings(context);
RowInputFormatParams params;
params.max_block_size = max_block_size;
params.allow_errors_num = format_settings.input_allow_errors_num;
params.allow_errors_ratio = format_settings.input_allow_errors_ratio;
params.max_execution_time = settings.max_execution_time;
params.timeout_overflow_mode = settings.timeout_overflow_mode;
auto format = input_getter(buf, sample, params, format_settings);
if (owned_buf)
format->addBuffer(std::move(owned_buf));
if (!settings.input_format_record_errors_file_path.toString().empty())
format->setErrorsLogger(std::make_shared<ParallelInputFormatErrorsLogger>(context));
/// It's a kludge. Because I cannot remove context from values format.
/// (Not needed in the parallel_parsing case above because VALUES format doesn't support it.)
if (auto * values = typeid_cast<ValuesBlockInputFormat *>(format.get()))
values->setContext(context);
return format;
}
std::unique_ptr<ReadBuffer> FormatFactory::prepareReadBuffer(
SeekableReadBufferFactoryPtr & buf_factory,
CompressionMethod compression,
const Creators & creators,
const FormatSettings & format_settings,
const Settings & settings,
size_t max_download_threads) const
{
std::unique_ptr<ReadBuffer> res;
bool parallel_read = max_download_threads > 1 && buf_factory && format_settings.seekable_read;
if (creators.random_access_input_creator)
parallel_read &= compression != CompressionMethod::None;
if (parallel_read)
{
try
{
parallel_read = buf_factory->checkIfActuallySeekable()
&& buf_factory->getFileSize() >= 2 * settings.max_download_buffer_size;
}
catch (const Poco::Exception & e)
{
parallel_read = false;
LOG_TRACE(
&Poco::Logger::get("FormatFactory"),
"Failed to setup ParallelReadBuffer because of an exception:\n{}.\n"
"Falling back to the single-threaded buffer",
e.displayText());
}
}
if (parallel_read)
{
LOG_TRACE(
&Poco::Logger::get("FormatFactory"),
"Using ParallelReadBuffer with {} workers with chunks of {} bytes",
max_download_threads,
settings.max_download_buffer_size);
res = std::make_unique<ParallelReadBuffer>(
std::move(buf_factory),
threadPoolCallbackRunner<void>(IOThreadPool::get(), "ParallelRead"),
max_download_threads,
settings.max_download_buffer_size);
}
if (compression != CompressionMethod::None)
{
if (!res)
res = buf_factory->getReader(); // NOLINT
res = wrapReadBufferWithCompressionMethod(std::move(res), compression, static_cast<int>(settings.zstd_window_log_max));
}
if (!creators.random_access_input_creator && !res)
res = buf_factory->getReader();
return res;
}
static void addExistingProgressToOutputFormat(OutputFormatPtr format, ContextPtr context)
{
auto element_id = context->getProcessListElement();
@ -454,10 +569,22 @@ ExternalSchemaReaderPtr FormatFactory::getExternalSchemaReader(
void FormatFactory::registerInputFormat(const String & name, InputCreator input_creator)
{
auto & target = dict[name].input_creator;
if (target)
chassert(input_creator);
auto & creators = dict[name];
if (creators.input_creator || creators.random_access_input_creator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Input format {} is already registered", name);
target = std::move(input_creator);
creators.input_creator = std::move(input_creator);
registerFileExtension(name, name);
KnownFormatNames::instance().add(name);
}
void FormatFactory::registerRandomAccessInputFormat(const String & name, RandomAccessInputCreator input_creator)
{
chassert(input_creator);
auto & creators = dict[name];
if (creators.input_creator || creators.random_access_input_creator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Input format {} is already registered", name);
creators.random_access_input_creator = std::move(input_creator);
registerFileExtension(name, name);
KnownFormatNames::instance().add(name);
}
@ -641,7 +768,7 @@ String FormatFactory::getAdditionalInfoForSchemaCache(const String & name, Conte
bool FormatFactory::isInputFormat(const String & name) const
{
auto it = dict.find(name);
return it != dict.end() && it->second.input_creator;
return it != dict.end() && (it->second.input_creator || it->second.random_access_input_creator);
}
bool FormatFactory::isOutputFormat(const String & name) const

View File

@ -6,6 +6,7 @@
#include <Interpreters/Context_fwd.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/CompressionMethod.h>
#include <IO/ParallelReadBuffer.h>
#include <base/types.h>
#include <Core/NamesAndTypes.h>
@ -71,12 +72,40 @@ public:
size_t max_rows)>;
private:
// On the input side, there are two kinds of formats:
// * InputCreator - formats parsed sequentially, e.g. CSV. Almost all formats are like this.
// FormatFactory uses ParallelReadBuffer to read in parallel, and ParallelParsingInputFormat
// to parse in parallel; the formats mostly don't need to worry about it.
// * RandomAccessInputCreator - column-oriented formats that require seeking back and forth in
// the file when reading. E.g. Parquet has metadata at the end of the file (needs to be read
// before we can parse any data), can skip columns by seeking in the file, and often reads
// many short byte ranges from the file. ParallelReadBuffer and ParallelParsingInputFormat
// are a poor fit. Instead, the format implementation is in charge of parallelizing both
// reading and parsing.
using InputCreator = std::function<InputFormatPtr(
ReadBuffer & buf,
const Block & header,
const RowInputFormatParams & params,
const FormatSettings & settings)>;
// Incompatible with FileSegmentationEngine.
// When created using SeekableReadBufferFactoryPtr, the IInputFormat doesn't support
// resetParser() and setReadBuffer().
//
// In future we may also want to pass some information about WHERE conditions (SelectQueryInfo?)
// and get some information about projections (min/max/count per column per row group).
using RandomAccessInputCreator = std::function<InputFormatPtr(
// exactly one of these two is nullptr
ReadBuffer * buf,
SeekableReadBufferFactoryPtr buf_factory,
const Block & header,
const FormatSettings & settings,
const ReadSettings& read_settings,
bool is_remote_fs,
size_t max_download_threads,
size_t max_parsing_threads)>;
using OutputCreator = std::function<OutputFormatPtr(
WriteBuffer & buf,
const Block & sample,
@ -104,6 +133,7 @@ private:
struct Creators
{
InputCreator input_creator;
RandomAccessInputCreator random_access_input_creator;
OutputCreator output_creator;
FileSegmentationEngine file_segmentation_engine;
SchemaReaderCreator schema_reader_creator;
@ -122,21 +152,32 @@ private:
public:
static FormatFactory & instance();
// Format parser from a single ReadBuffer.
// Parallelizes parsing (when possible) but not reading.
InputFormatPtr getInput(
const String & name,
ReadBuffer & buf,
const Block & sample,
ContextPtr context,
UInt64 max_block_size,
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
const std::optional<FormatSettings> & format_settings = std::nullopt,
std::optional<size_t> max_parsing_threads = std::nullopt) const;
InputFormatPtr getInputFormat(
// Format parser from a random-access source (factory of seekable read buffers).
// Parallelizes both parsing and reading when possible.
// Prefer this over getInput() when reading from random-access source like file or HTTP.
InputFormatPtr getInputRandomAccess(
const String & name,
ReadBuffer & buf,
SeekableReadBufferFactoryPtr buf_factory,
const Block & sample,
ContextPtr context,
UInt64 max_block_size,
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
bool is_remote_fs,
CompressionMethod compression,
// if nullopt, getFormatSettings(context) is used
const std::optional<FormatSettings> & format_settings = std::nullopt,
std::optional<size_t> max_download_threads = std::nullopt,
std::optional<size_t> max_parsing_threads = std::nullopt) const;
/// Checks all preconditions. Returns ordinary format if parallel formatting cannot be done.
OutputFormatPtr getOutputFormatParallelIfPossible(
@ -183,6 +224,7 @@ public:
/// Register format by its name.
void registerInputFormat(const String & name, InputCreator input_creator);
void registerRandomAccessInputFormat(const String & name, RandomAccessInputCreator input_creator);
void registerOutputFormat(const String & name, OutputCreator output_creator);
/// Register file extension for format
@ -225,6 +267,29 @@ private:
const Creators & getCreators(const String & name) const;
InputFormatPtr getInputImpl(
const String & name,
// exactly one of the following two is nullptr
SeekableReadBufferFactoryPtr buf_factory,
ReadBuffer * buf,
const Block & sample,
ContextPtr context,
UInt64 max_block_size,
bool is_remote_fs,
CompressionMethod compression,
const std::optional<FormatSettings> & format_settings,
std::optional<size_t> max_download_threads,
std::optional<size_t> max_parsing_threads) const;
// Creates a ReadBuffer to give to an input format.
// Returns nullptr if we should give it the whole factory.
std::unique_ptr<ReadBuffer> prepareReadBuffer(
SeekableReadBufferFactoryPtr & buf_factory,
CompressionMethod compression,
const Creators & creators,
const FormatSettings & format_settings,
const Settings & settings,
size_t max_download_threads) const;
};
}

View File

@ -213,6 +213,9 @@ struct FormatSettings
std::unordered_set<int> skip_row_groups = {};
bool output_string_as_string = false;
bool output_fixed_string_as_fixed_byte_array = true;
// TODO: This should probably be shared among all formats and with
// https://github.com/ClickHouse/ClickHouse/issues/38755
bool preserve_order = false;
UInt64 max_block_size = 8192;
ParquetVersion output_version;
ParquetCompression output_compression_method = ParquetCompression::SNAPPY;

View File

@ -19,14 +19,6 @@ void BoundedReadBuffer::setReadUntilEnd()
read_until_position.reset();
}
SeekableReadBuffer::Range BoundedReadBuffer::getRemainingReadRange() const
{
std::optional<size_t> right_bound_included;
if (read_until_position)
right_bound_included = *read_until_position - 1;
return Range{file_offset_of_buffer_end, right_bound_included};
}
off_t BoundedReadBuffer::getPosition()
{
return file_offset_of_buffer_end - (working_buffer.end() - pos);

View File

@ -22,8 +22,6 @@ public:
off_t seek(off_t off, int whence) override;
Range getRemainingReadRange() const override;
size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; }
/// file_offset_of_buffer_end can differ from impl's file_offset_of_buffer_end

View File

@ -13,12 +13,13 @@ namespace ErrorCodes
}
// A subrange of the input, read by one SeekableReadBuffer.
struct ParallelReadBuffer::ReadWorker
{
explicit ReadWorker(SeekableReadBufferPtr reader_) : reader(std::move(reader_)), range(reader->getRemainingReadRange())
ReadWorker(std::unique_ptr<SeekableReadBuffer> reader_, size_t offset_, size_t size)
: reader(std::move(reader_)), offset(offset_), bytes_left(size), range_end(offset + bytes_left)
{
assert(range.right);
bytes_left = *range.right - range.left + 1;
assert(bytes_left);
}
auto hasSegment() const { return current_segment_index < segments.size(); }
@ -28,26 +29,34 @@ struct ParallelReadBuffer::ReadWorker
assert(hasSegment());
auto next_segment = std::move(segments[current_segment_index]);
++current_segment_index;
range.left += next_segment.size();
offset += next_segment.size();
return next_segment;
}
SeekableReadBufferPtr reader;
std::vector<Memory<>> segments;
size_t current_segment_index = 0;
bool finished{false};
SeekableReadBuffer::Range range;
size_t bytes_left{0};
std::unique_ptr<SeekableReadBuffer> reader;
// Reader thread produces segments, nextImpl() consumes them.
std::vector<Memory<>> segments; // segments that were produced
size_t current_segment_index = 0; // first segment that's not consumed
bool finished{false}; // no more segments will be produced
size_t offset; // start of segments[current_segment_idx]
size_t bytes_left; // bytes left to produce above segments end
size_t range_end; // segments end + bytes_left, i.e. how far this worker will read
// segments[current_segment_idx..end] range_end
// |-------------|--------------------------------------|------------|
// offset bytes_left
std::atomic_bool cancel{false};
std::mutex worker_mutex;
};
ParallelReadBuffer::ParallelReadBuffer(
std::unique_ptr<ReadBufferFactory> reader_factory_, ThreadPoolCallbackRunner<void> schedule_, size_t max_working_readers_)
std::unique_ptr<SeekableReadBufferFactory> reader_factory_, ThreadPoolCallbackRunner<void> schedule_, size_t max_working_readers_, size_t range_step_)
: SeekableReadBuffer(nullptr, 0)
, max_working_readers(max_working_readers_)
, schedule(std::move(schedule_))
, reader_factory(std::move(reader_factory_))
, range_step(std::max(1ul, range_step_))
{
try
{
@ -62,13 +71,20 @@ ParallelReadBuffer::ParallelReadBuffer(
bool ParallelReadBuffer::addReaderToPool()
{
size_t file_size = reader_factory->getFileSize();
if (next_range_start >= file_size)
return false;
size_t range_start = next_range_start;
size_t size = std::min(range_step, file_size - range_start);
next_range_start += size;
auto reader = reader_factory->getReader();
if (!reader)
{
return false;
}
auto worker = read_workers.emplace_back(std::make_shared<ReadWorker>(std::move(reader)));
auto worker = read_workers.emplace_back(std::make_shared<ReadWorker>(std::move(reader), range_start, size));
++active_working_reader;
schedule([this, worker = std::move(worker)]() mutable { readerThreadFunction(std::move(worker)); }, 0);
@ -100,9 +116,9 @@ off_t ParallelReadBuffer::seek(off_t offset, int whence)
}
const auto offset_is_in_range
= [&](const auto & range) { return static_cast<size_t>(offset) >= range.left && static_cast<size_t>(offset) <= *range.right; };
= [&](const auto & worker) { return static_cast<size_t>(offset) >= worker->offset && static_cast<size_t>(offset) < worker->range_end; };
while (!read_workers.empty() && (offset < current_position || !offset_is_in_range(read_workers.front()->range)))
while (!read_workers.empty() && (offset < current_position || !offset_is_in_range(read_workers.front())))
{
read_workers.front()->cancel = true;
read_workers.pop_front();
@ -111,7 +127,7 @@ off_t ParallelReadBuffer::seek(off_t offset, int whence)
if (!read_workers.empty())
{
auto & front_worker = read_workers.front();
current_position = front_worker->range.left;
current_position = front_worker->offset;
while (true)
{
std::unique_lock lock{front_worker->worker_mutex};
@ -121,26 +137,24 @@ off_t ParallelReadBuffer::seek(off_t offset, int whence)
handleEmergencyStop();
auto next_segment = front_worker->nextSegment();
if (static_cast<size_t>(offset) < current_position + next_segment.size())
current_position += next_segment.size();
if (offset < current_position)
{
current_segment = std::move(next_segment);
working_buffer = internal_buffer = Buffer(current_segment.data(), current_segment.data() + current_segment.size());
current_position += current_segment.size();
pos = working_buffer.end() - (current_position - offset);
addReaders();
return offset;
}
current_position += next_segment.size();
}
}
finishAndWait();
reader_factory->seek(offset, whence);
all_completed = false;
read_workers.clear();
next_range_start = offset;
current_position = offset;
resetWorkingBuffer();
@ -249,6 +263,9 @@ void ParallelReadBuffer::readerThreadFunction(ReadWorkerPtr read_worker)
try
{
read_worker->reader->setReadUntilPosition(read_worker->range_end);
read_worker->reader->seek(read_worker->offset, SEEK_SET);
while (!emergency_stop && !read_worker->cancel)
{
if (!read_worker->reader->next())

View File

@ -11,7 +11,7 @@ namespace DB
/**
* Reads from multiple ReadBuffers in parallel.
* Preserves order of readers obtained from ReadBufferFactory.
* Preserves order of readers obtained from SeekableReadBufferFactory.
*
* It consumes multiple readers and yields data from them in order as it passed.
* Each working reader save segments of data to internal queue.
@ -29,16 +29,7 @@ private:
bool nextImpl() override;
public:
class ReadBufferFactory : public WithFileSize
{
public:
~ReadBufferFactory() override = default;
virtual SeekableReadBufferPtr getReader() = 0;
virtual off_t seek(off_t off, int whence) = 0;
};
ParallelReadBuffer(std::unique_ptr<ReadBufferFactory> reader_factory_, ThreadPoolCallbackRunner<void> schedule_, size_t max_working_readers);
ParallelReadBuffer(SeekableReadBufferFactoryPtr reader_factory_, ThreadPoolCallbackRunner<void> schedule_, size_t max_working_readers, size_t range_step_);
~ParallelReadBuffer() override { finishAndWait(); }
@ -46,8 +37,8 @@ public:
size_t getFileSize();
off_t getPosition() override;
const ReadBufferFactory & getReadBufferFactory() const { return *reader_factory; }
ReadBufferFactory & getReadBufferFactory() { return *reader_factory; }
const SeekableReadBufferFactory & getReadBufferFactory() const { return *reader_factory; }
SeekableReadBufferFactory & getReadBufferFactory() { return *reader_factory; }
private:
/// Reader in progress with a list of read segments
@ -77,7 +68,9 @@ private:
ThreadPoolCallbackRunner<void> schedule;
std::unique_ptr<ReadBufferFactory> reader_factory;
std::unique_ptr<SeekableReadBufferFactory> reader_factory;
size_t range_step;
size_t next_range_start{0};
/**
* FIFO queue of readers.
@ -94,7 +87,7 @@ private:
std::exception_ptr background_exception = nullptr;
std::atomic_bool emergency_stop{false};
off_t current_position{0};
off_t current_position{0}; // end of working_buffer
bool all_completed{false};
};

View File

@ -54,8 +54,12 @@ public:
// FIXME: behavior differs greately from `BufferBase::set()` and it's very confusing.
void set(Position ptr, size_t size) { BufferBase::set(ptr, size, 0); working_buffer.resize(0); }
/** read next data and fill a buffer with it; set position to the beginning;
* return `false` in case of end, `true` otherwise; throw an exception, if something is wrong
/** read next data and fill a buffer with it; set position to the beginning of the new data
* (but not necessarily to the beginning of working_buffer!);
* return `false` in case of end, `true` otherwise; throw an exception, if something is wrong;
*
* if an exception was thrown, is the ReadBuffer left in a usable state? this varies across implementations;
* can the caller retry next() after an exception, or call other methods? not recommended
*/
bool next()
{
@ -211,7 +215,21 @@ public:
/**
* Set upper bound for read range [..., position).
* Required for reading from remote filesystem, when it matters how much we read.
* Useful for reading from remote filesystem, when it matters how much we read.
* Doesn't affect getFileSize().
* See also: SeekableReadBuffer::supportsRightBoundedReads().
*
* Behavior in weird cases is currently implementation-defined:
* - setReadUntilPosition() below current position,
* - setReadUntilPosition() above the end of the file,
* - seek() to a position above the until position (even if you setReadUntilPosition() to a
* higher value right after the seek!),
*
* Typical implementations discard any current buffers and connections, even if the position is
* adjusted only a little.
*
* Typical usage is to call it right after creating the ReadBuffer, before it started doing any
* work.
*/
virtual void setReadUntilPosition(size_t /* position */) {}

View File

@ -55,8 +55,6 @@ public:
return file_offset_of_buffer_end - (working_buffer.end() - pos);
}
Range getRemainingReadRange() const override { return Range{ .left = file_offset_of_buffer_end, .right = std::nullopt }; }
size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; }
/// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen.

View File

@ -53,8 +53,9 @@ ReadBufferFromS3::ReadBufferFromS3(
bool use_external_buffer_,
size_t offset_,
size_t read_until_position_,
bool restricted_seek_)
: ReadBufferFromFileBase(use_external_buffer_ ? 0 : settings_.remote_fs_buffer_size, nullptr, 0)
bool restricted_seek_,
std::optional<size_t> file_size_)
: ReadBufferFromFileBase(use_external_buffer_ ? 0 : settings_.remote_fs_buffer_size, nullptr, 0, file_size_)
, client_ptr(std::move(client_ptr_))
, bucket(bucket_)
, key(key_)
@ -177,6 +178,7 @@ bool ReadBufferFromS3::nextImpl()
sleep_time_with_backoff_milliseconds *= 2;
/// Try to reinitialize `impl`.
resetWorkingBuffer();
impl.reset();
}
}
@ -197,16 +199,16 @@ bool ReadBufferFromS3::nextImpl()
off_t ReadBufferFromS3::seek(off_t offset_, int whence)
{
if (offset_ == offset && whence == SEEK_SET)
return offset;
if (offset_ == getPosition() && whence == SEEK_SET)
return offset_;
if (impl && restricted_seek)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
ErrorCodes::CANNOT_SEEK_THROUGH_FILE,
"Seek is allowed only before first read attempt from the buffer (current offset: "
"{}, new offset: {}, reading until position: {}, available: {})",
offset, offset_, read_until_position, available());
getPosition(), offset_, read_until_position, available());
}
if (whence != SEEK_SET)
@ -223,13 +225,13 @@ off_t ReadBufferFromS3::seek(off_t offset_, int whence)
{
pos = working_buffer.end() - (offset - offset_);
assert(pos >= working_buffer.begin());
assert(pos <= working_buffer.end());
assert(pos < working_buffer.end());
return getPosition();
}
auto position = getPosition();
if (offset_ > position)
off_t position = getPosition();
if (impl && offset_ > position)
{
size_t diff = offset_ - position;
if (diff < read_settings.remote_read_min_bytes_for_seek)
@ -242,7 +244,8 @@ off_t ReadBufferFromS3::seek(off_t offset_, int whence)
resetWorkingBuffer();
if (impl)
{
ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);
if (!atEndOfRequestedRangeGuess())
ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);
impl.reset();
}
}
@ -271,14 +274,43 @@ void ReadBufferFromS3::setReadUntilPosition(size_t position)
{
if (position != static_cast<size_t>(read_until_position))
{
if (impl)
{
if (!atEndOfRequestedRangeGuess())
ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);
offset = getPosition();
resetWorkingBuffer();
impl.reset();
}
read_until_position = position;
impl.reset();
}
}
SeekableReadBuffer::Range ReadBufferFromS3::getRemainingReadRange() const
void ReadBufferFromS3::setReadUntilEnd()
{
return Range{ .left = static_cast<size_t>(offset), .right = read_until_position ? std::optional{read_until_position - 1} : std::nullopt };
if (read_until_position)
{
read_until_position = 0;
if (impl)
{
if (!atEndOfRequestedRangeGuess())
ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);
offset = getPosition();
resetWorkingBuffer();
impl.reset();
}
}
}
bool ReadBufferFromS3::atEndOfRequestedRangeGuess()
{
if (!impl)
return true;
if (read_until_position)
return getPosition() >= read_until_position;
if (file_size)
return getPosition() >= static_cast<off_t>(*file_size);
return false;
}
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
@ -351,30 +383,15 @@ std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
}
}
SeekableReadBufferPtr ReadBufferS3Factory::getReader()
std::unique_ptr<SeekableReadBuffer> ReadBufferS3Factory::getReader()
{
const auto next_range = range_generator.nextRange();
if (!next_range)
return nullptr;
auto reader = std::make_shared<ReadBufferFromS3>(
return std::make_unique<ReadBufferFromS3>(
client_ptr,
bucket,
key,
version_id,
request_settings,
read_settings.adjustBufferSize(object_size),
false /*use_external_buffer*/,
next_range->first,
next_range->second);
return reader;
}
off_t ReadBufferS3Factory::seek(off_t off, [[maybe_unused]] int whence)
{
range_generator = RangeGenerator{object_size, range_step, static_cast<size_t>(off)};
return off;
read_settings.adjustBufferSize(object_size));
}
size_t ReadBufferS3Factory::getFileSize()

View File

@ -1,6 +1,5 @@
#pragma once
#include <Common/RangeGenerator.h>
#include <Storages/StorageS3Settings.h>
#include "config.h"
@ -58,7 +57,8 @@ public:
bool use_external_buffer = false,
size_t offset_ = 0,
size_t read_until_position_ = 0,
bool restricted_seek_ = false);
bool restricted_seek_ = false,
std::optional<size_t> file_size = std::nullopt);
bool nextImpl() override;
@ -69,8 +69,7 @@ public:
size_t getFileSize() override;
void setReadUntilPosition(size_t position) override;
Range getRemainingReadRange() const override;
void setReadUntilEnd() override;
size_t getFileOffsetOfBufferEnd() const override { return offset; }
@ -81,6 +80,9 @@ public:
private:
std::unique_ptr<ReadBuffer> initialize();
// If true, if we destroy impl now, no work was wasted. Just for metrics.
bool atEndOfRequestedRangeGuess();
ReadSettings read_settings;
bool use_external_buffer;
@ -91,7 +93,7 @@ private:
};
/// Creates separate ReadBufferFromS3 for sequence of ranges of particular object
class ReadBufferS3Factory : public ParallelReadBuffer::ReadBufferFactory, public WithFileName
class ReadBufferS3Factory : public SeekableReadBufferFactory, public WithFileName
{
public:
explicit ReadBufferS3Factory(
@ -99,7 +101,6 @@ public:
const String & bucket_,
const String & key_,
const String & version_id_,
size_t range_step_,
size_t object_size_,
const S3Settings::RequestSettings & request_settings_,
const ReadSettings & read_settings_)
@ -108,18 +109,11 @@ public:
, key(key_)
, version_id(version_id_)
, read_settings(read_settings_)
, range_generator(object_size_, range_step_)
, range_step(range_step_)
, object_size(object_size_)
, request_settings(request_settings_)
{
assert(range_step > 0);
assert(range_step < object_size);
}
{}
SeekableReadBufferPtr getReader() override;
off_t seek(off_t off, [[maybe_unused]] int whence) override;
std::unique_ptr<SeekableReadBuffer> getReader() override;
size_t getFileSize() override;
@ -131,11 +125,7 @@ private:
const String key;
const String version_id;
ReadSettings read_settings;
RangeGenerator range_generator;
size_t range_step;
size_t object_size;
const S3Settings::RequestSettings request_settings;
};

View File

@ -125,8 +125,8 @@ struct ReadSettings
ReadSettings adjustBufferSize(size_t file_size) const
{
ReadSettings res = *this;
res.local_fs_buffer_size = std::min(file_size, local_fs_buffer_size);
res.remote_fs_buffer_size = std::min(file_size, remote_fs_buffer_size);
res.local_fs_buffer_size = std::min(std::max(1ul, file_size), local_fs_buffer_size);
res.remote_fs_buffer_size = std::min(std::max(1ul, file_size), remote_fs_buffer_size);
return res;
}
};

View File

@ -1,7 +1,6 @@
#pragma once
#include <functional>
#include <Common/RangeGenerator.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/HTTPCommon.h>
#include <IO/ParallelReadBuffer.h>
@ -92,6 +91,16 @@ namespace detail
class ReadWriteBufferFromHTTPBase : public SeekableReadBuffer, public WithFileName, public WithFileSize
{
public:
/// Information from HTTP response header.
struct FileInfo
{
// nullopt if the server doesn't report it.
std::optional<size_t> file_size;
std::optional<time_t> last_modified;
bool seekable = false;
};
protected:
/// HTTP range, including right bound [begin, end].
struct Range
{
@ -99,7 +108,6 @@ namespace detail
std::optional<size_t> end;
};
protected:
Poco::URI uri;
std::string method;
std::string content_encoding;
@ -118,9 +126,8 @@ namespace detail
bool use_external_buffer;
size_t offset_from_begin_pos = 0;
const Range initial_read_range;
Range read_range;
std::optional<size_t> file_size;
std::optional<FileInfo> file_info;
/// Delayed exception in case retries with partial content are not satisfiable.
std::exception_ptr exception;
@ -136,7 +143,7 @@ namespace detail
bool withPartialContent(const Range & range) const
{
/**
* Add range header if we have some passed range (for disk web)
* Add range header if we have some passed range
* or if we want to retry GET request on purpose.
*/
return range.begin || range.end || retry_with_range_header;
@ -165,12 +172,7 @@ namespace detail
request.set(header, value);
std::optional<Range> range;
if constexpr (for_object_info)
{
if (withPartialContent(initial_read_range))
range = initial_read_range;
}
else
if constexpr (!for_object_info)
{
if (withPartialContent(read_range))
range = Range{getOffset(), read_range.end};
@ -221,24 +223,22 @@ namespace detail
size_t getFileSize() override
{
if (file_size)
return *file_size;
if (!file_info)
file_info = getFileInfo();
Poco::Net::HTTPResponse response;
getHeadResponse(response);
if (response.hasContentLength())
{
if (!read_range.end)
read_range.end = getRangeBegin() + response.getContentLength();
file_size = response.getContentLength();
return *file_size;
}
if (file_info->file_size)
return *file_info->file_size;
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for: {}", uri.toString());
}
bool checkIfActuallySeekable() override
{
if (!file_info)
file_info = getFileInfo();
return file_info->seekable;
}
String getFileName() const override { return uri.toString(); }
enum class InitializeError
@ -300,11 +300,11 @@ namespace detail
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
const ReadSettings & settings_ = {},
HTTPHeaderEntries http_header_entries_ = {},
Range read_range_ = {},
const RemoteHostFilter * remote_host_filter_ = nullptr,
bool delay_initialization = false,
bool use_external_buffer_ = false,
bool http_skip_not_found_url_ = false)
bool http_skip_not_found_url_ = false,
std::optional<FileInfo> file_info_ = std::nullopt)
: SeekableReadBuffer(nullptr, 0)
, uri {uri_}
, method {!method_.empty() ? method_ : out_stream_callback_ ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET}
@ -315,8 +315,7 @@ namespace detail
, remote_host_filter {remote_host_filter_}
, buffer_size {buffer_size_}
, use_external_buffer {use_external_buffer_}
, initial_read_range(read_range_)
, read_range(read_range_)
, file_info(file_info_)
, http_skip_not_found_url(http_skip_not_found_url_)
, settings {settings_}
, log(&Poco::Logger::get("ReadWriteBufferFromHTTP"))
@ -486,7 +485,7 @@ namespace detail
if (withPartialContent(read_range) && response.getStatus() != Poco::Net::HTTPResponse::HTTPStatus::HTTP_PARTIAL_CONTENT)
{
/// Having `200 OK` instead of `206 Partial Content` is acceptable in case we retried with range.begin == 0.
if (read_range.begin && *read_range.begin != 0)
if (getOffset() != 0)
{
if (!exception)
{
@ -514,8 +513,9 @@ namespace detail
}
}
if (!offset_from_begin_pos && !read_range.end && response.hasContentLength())
read_range.end = getRangeBegin() + response.getContentLength();
// Remember file size. It'll be used to report eof in next nextImpl() call.
if (!read_range.end && response.hasContentLength())
file_info = parseFileInfo(response, withPartialContent(read_range) ? getOffset() : 0);
try
{
@ -545,11 +545,9 @@ namespace detail
if (next_callback)
next_callback(count());
if (read_range.end && getOffset() > read_range.end.value())
{
assert(getOffset() == read_range.end.value() + 1);
if ((read_range.end && getOffset() > read_range.end.value()) ||
(file_info && file_info->file_size && getOffset() >= file_info->file_size.value()))
return false;
}
if (impl)
{
@ -570,6 +568,7 @@ namespace detail
bool result = false;
size_t milliseconds_to_wait = settings.http_retry_initial_backoff_ms;
bool last_attempt = false;
auto on_retriable_error = [&]()
{
@ -577,11 +576,19 @@ namespace detail
impl.reset();
auto http_session = session->getSession();
http_session->reset();
sleepForMilliseconds(milliseconds_to_wait);
if (!last_attempt)
{
sleepForMilliseconds(milliseconds_to_wait);
milliseconds_to_wait = std::min(milliseconds_to_wait * 2, settings.http_retry_max_backoff_ms);
}
};
for (size_t i = 0; i < settings.http_max_tries; ++i)
for (size_t i = 0;; ++i)
{
if (last_attempt)
break;
last_attempt = i + 1 >= settings.http_max_tries;
exception = nullptr;
initialization_error = InitializeError::NONE;
@ -656,8 +663,6 @@ namespace detail
on_retriable_error();
exception = std::current_exception();
}
milliseconds_to_wait = std::min(milliseconds_to_wait * 2, settings.http_retry_max_backoff_ms);
}
if (exception)
@ -688,37 +693,80 @@ namespace detail
{
pos = working_buffer.end() - (current_offset - offset_);
assert(pos >= working_buffer.begin());
assert(pos <= working_buffer.end());
assert(pos < working_buffer.end());
return getPosition();
}
auto position = getPosition();
if (offset_ > position)
{
size_t diff = offset_ - position;
if (diff < settings.remote_read_min_bytes_for_seek)
{
ignore(diff);
return offset_;
}
}
if (impl)
{
ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);
auto position = getPosition();
if (offset_ > position)
{
size_t diff = offset_ - position;
if (diff < settings.remote_read_min_bytes_for_seek)
{
ignore(diff);
return offset_;
}
}
if (!atEndOfRequestedRangeGuess())
ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);
impl.reset();
}
resetWorkingBuffer();
read_range.begin = offset_;
read_range.end = std::nullopt;
offset_from_begin_pos = 0;
return offset_;
}
SeekableReadBuffer::Range getRemainingReadRange() const override { return {getOffset(), read_range.end}; }
void setReadUntilPosition(size_t until) override
{
until = std::max(until, 1ul);
if (read_range.end && *read_range.end + 1 == until)
return;
read_range.end = until - 1;
read_range.begin = getPosition();
resetWorkingBuffer();
if (impl)
{
if (!atEndOfRequestedRangeGuess())
ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);
impl.reset();
}
}
void setReadUntilEnd() override
{
if (!read_range.end)
return;
read_range.end.reset();
read_range.begin = getPosition();
resetWorkingBuffer();
if (impl)
{
if (!atEndOfRequestedRangeGuess())
ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);
impl.reset();
}
}
bool supportsRightBoundedReads() const override { return true; }
// If true, if we destroy impl now, no work was wasted. Just for metrics.
bool atEndOfRequestedRangeGuess()
{
if (!impl)
return true;
if (read_range.end)
return getPosition() > static_cast<off_t>(*read_range.end);
if (file_info && file_info->file_size)
return getPosition() >= static_cast<off_t>(*file_info->file_size);
return false;
}
std::string getResponseCookie(const std::string & name, const std::string & def) const
{
@ -742,19 +790,63 @@ namespace detail
const std::string & getCompressionMethod() const { return content_encoding; }
std::optional<time_t> getLastModificationTime()
{
return getFileInfo().last_modified;
}
FileInfo getFileInfo()
{
Poco::Net::HTTPResponse response;
getHeadResponse(response);
if (!response.has("Last-Modified"))
return std::nullopt;
try
{
getHeadResponse(response);
}
catch (HTTPException & e)
{
/// Maybe the web server doesn't support HEAD requests.
/// E.g. webhdfs reports status 400.
/// We should proceed in hopes that the actual GET request will succeed.
/// (Unless the error in transient. Don't want to nondeterministically sometimes
/// fall back to slow whole-file reads when HEAD is actually supported; that sounds
/// like a nightmare to debug.)
if (e.getHTTPStatus() >= 400 && e.getHTTPStatus() <= 499 &&
e.getHTTPStatus() != Poco::Net::HTTPResponse::HTTP_TOO_MANY_REQUESTS)
return FileInfo{};
String date_str = response.get("Last-Modified");
struct tm info;
char * res = strptime(date_str.data(), "%a, %d %b %Y %H:%M:%S %Z", &info);
if (!res || res != date_str.data() + date_str.size())
return std::nullopt;
throw;
}
return parseFileInfo(response, 0);
}
return timegm(&info);
FileInfo parseFileInfo(const Poco::Net::HTTPResponse & response, size_t requested_range_begin)
{
FileInfo res;
if (response.hasContentLength())
{
res.file_size = response.getContentLength();
if (response.getStatus() == Poco::Net::HTTPResponse::HTTPStatus::HTTP_PARTIAL_CONTENT)
{
*res.file_size += requested_range_begin;
res.seekable = true;
}
else
{
res.seekable = response.has("Accept-Ranges") && response.get("Accept-Ranges") == "bytes";
}
}
if (response.has("Last-Modified"))
{
String date_str = response.get("Last-Modified");
struct tm info;
char * end = strptime(date_str.data(), "%a, %d %b %Y %H:%M:%S %Z", &info);
if (end == date_str.data() + date_str.size())
res.last_modified = timegm(&info);
}
return res;
}
};
}
@ -789,11 +881,11 @@ public:
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
const ReadSettings & settings_ = {},
const HTTPHeaderEntries & http_header_entries_ = {},
Range read_range_ = {},
const RemoteHostFilter * remote_host_filter_ = nullptr,
bool delay_initialization_ = true,
bool use_external_buffer_ = false,
bool skip_not_found_url_ = false)
bool skip_not_found_url_ = false,
std::optional<FileInfo> file_info_ = std::nullopt)
: Parent(
std::make_shared<SessionType>(uri_, max_redirects, std::make_shared<SessionFactory>(timeouts)),
uri_,
@ -803,23 +895,21 @@ public:
buffer_size_,
settings_,
http_header_entries_,
read_range_,
remote_host_filter_,
delay_initialization_,
use_external_buffer_,
skip_not_found_url_)
skip_not_found_url_,
file_info_)
{
}
};
class RangedReadWriteBufferFromHTTPFactory : public ParallelReadBuffer::ReadBufferFactory, public WithFileName
class RangedReadWriteBufferFromHTTPFactory : public SeekableReadBufferFactory, public WithFileName
{
using OutStreamCallback = ReadWriteBufferFromHTTP::OutStreamCallback;
public:
RangedReadWriteBufferFromHTTPFactory(
size_t total_object_size_,
size_t range_step_,
Poco::URI uri_,
std::string method_,
OutStreamCallback out_stream_callback_,
@ -833,10 +923,7 @@ public:
bool delay_initialization_ = true,
bool use_external_buffer_ = false,
bool skip_not_found_url_ = false)
: range_generator(total_object_size_, range_step_)
, total_object_size(total_object_size_)
, range_step(range_step_)
, uri(uri_)
: uri(uri_)
, method(std::move(method_))
, out_stream_callback(out_stream_callback_)
, timeouts(std::move(timeouts_))
@ -852,15 +939,9 @@ public:
{
}
SeekableReadBufferPtr getReader() override
std::unique_ptr<SeekableReadBuffer> getReader() override
{
const auto next_range = range_generator.nextRange();
if (!next_range)
{
return nullptr;
}
return std::make_shared<ReadWriteBufferFromHTTP>(
return std::make_unique<ReadWriteBufferFromHTTP>(
uri,
method,
out_stream_callback,
@ -870,28 +951,36 @@ public:
buffer_size,
settings,
http_header_entries,
// HTTP Range has inclusive bounds, i.e. [from, to]
ReadWriteBufferFromHTTP::Range{next_range->first, next_range->second - 1},
remote_host_filter,
delay_initialization,
use_external_buffer,
skip_not_found_url);
skip_not_found_url,
file_info);
}
off_t seek(off_t off, [[maybe_unused]] int whence) override
size_t getFileSize() override
{
range_generator = RangeGenerator{total_object_size, range_step, static_cast<size_t>(off)};
return off;
auto s = getFileInfo().file_size;
if (!s)
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for: {}", uri.toString());
return *s;
}
size_t getFileSize() override { return total_object_size; }
bool checkIfActuallySeekable() override
{
return getFileInfo().seekable;
}
ReadWriteBufferFromHTTP::FileInfo getFileInfo()
{
if (!file_info)
file_info = static_cast<ReadWriteBufferFromHTTP*>(getReader().get())->getFileInfo();
return *file_info;
}
String getFileName() const override { return uri.toString(); }
private:
RangeGenerator range_generator;
size_t total_object_size;
size_t range_step;
Poco::URI uri;
std::string method;
OutStreamCallback out_stream_callback;
@ -902,6 +991,7 @@ private:
ReadSettings settings;
HTTPHeaderEntries http_header_entries;
const RemoteHostFilter * remote_host_filter;
std::optional<ReadWriteBufferFromHTTP::FileInfo> file_info;
bool delay_initialization;
bool use_external_buffer;
bool skip_not_found_url;

View File

@ -26,6 +26,8 @@ public:
* @param off Offset.
* @param whence Seek mode (@see SEEK_SET, @see SEEK_CUR).
* @return New position from the beginning of underlying buffer / file.
*
* What happens if you seek above the end of the file? Implementation-defined.
*/
virtual off_t seek(off_t off, int whence) = 0;
@ -40,35 +42,55 @@ public:
*/
virtual off_t getPosition() = 0;
struct Range
{
size_t left;
std::optional<size_t> right;
String toString() const { return fmt::format("[{}:{}]", left, right ? std::to_string(*right) : "None"); }
};
/**
* Returns a struct, where `left` is current read position in file and `right` is the
* last included offset for reading according to setReadUntilPosition() or setReadUntilEnd().
* E.g. next nextImpl() call will read within range [left, right].
*/
virtual Range getRemainingReadRange() const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getRemainingReadRange() not implemented");
}
virtual String getInfoForLog() { return ""; }
virtual size_t getFileOffsetOfBufferEnd() const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getFileOffsetOfBufferEnd() not implemented"); }
/// If true, setReadUntilPosition() guarantees that eof will be reported at the given position.
virtual bool supportsRightBoundedReads() const { return false; }
virtual bool isIntegratedWithFilesystemCache() const { return false; }
/// Returns true if seek() actually works, false if seek() will always throw (or make subsequent
/// nextImpl() calls throw).
///
/// This is needed because:
/// * Sometimes there's no cheap way to know in advance whether the buffer is really seekable.
/// Specifically, HTTP read buffer needs to send a request to check whether the server
/// supports byte ranges.
/// * Sometimes when we create such buffer we don't know in advance whether we'll need it to be
/// seekable or not. So we don't want to pay the price for this check in advance.
virtual bool checkIfActuallySeekable() { return true; }
};
/// Useful for reading in parallel.
/// The created read buffers may outlive the factory.
///
/// There are 2 ways to use this:
/// (1) Never call seek() or getFileSize(), read the file sequentially.
/// For HTTP, this usually translates to just one HTTP request.
/// (2) Call checkIfActuallySeekable(), then:
/// a. If it returned false, go to (1). seek() and getFileSize() are not available (throw if called).
/// b. If it returned true, seek() and getFileSize() are available, knock yourself out.
/// For HTTP, checkIfActuallySeekable() sends a HEAD request and returns false if the web server
/// doesn't support ranges (or doesn't support HEAD requests).
class SeekableReadBufferFactory : public WithFileSize
{
public:
~SeekableReadBufferFactory() override = default;
// We usually call setReadUntilPosition() and seek() on the returned buffer before reading.
// So it's recommended that the returned implementation be lazy, i.e. don't start reading
// before the first call to nextImpl().
virtual std::unique_ptr<SeekableReadBuffer> getReader() = 0;
virtual bool checkIfActuallySeekable() { return true; }
};
using SeekableReadBufferPtr = std::shared_ptr<SeekableReadBuffer>;
using SeekableReadBufferFactoryPtr = std::unique_ptr<SeekableReadBufferFactory>;
/// Wraps a reference to a SeekableReadBuffer into an unique pointer to SeekableReadBuffer.
/// This function is like wrapReadBufferReference() but for SeekableReadBuffer.
std::unique_ptr<SeekableReadBuffer> wrapSeekableReadBufferReference(SeekableReadBuffer & ref);

View File

@ -64,7 +64,7 @@ public:
* Segments in returned list are ordered in ascending order and represent a full contiguous
* interval (no holes). Each segment in returned list has state: DOWNLOADED, DOWNLOADING or EMPTY.
*
* As long as pointers to returned file segments are hold
* As long as pointers to returned file segments are held
* it is guaranteed that these file segments are not removed from cache.
*/
FileSegmentsHolderPtr getOrSet(const Key & key, size_t offset, size_t size, const CreateFileSegmentSettings & settings);

View File

@ -226,8 +226,8 @@ void FileSegment::resetDownloaderUnlocked(const FileSegmentGuard::Lock &)
void FileSegment::assertIsDownloaderUnlocked(const std::string & operation, const FileSegmentGuard::Lock & lock) const
{
auto caller = getCallerId();
auto current_downloader = getDownloaderUnlocked(lock);
LOG_TEST(log, "Downloader id: {}, caller id: {}", current_downloader, caller);
auto current_downloader = getDownloaderUnlocked(segment_lock);
LOG_TEST(log, "Downloader id: {}, caller id: {}, operation: {}", current_downloader, caller, operation);
if (caller != current_downloader)
{
@ -503,8 +503,9 @@ void FileSegment::setDownloadFailedUnlocked(const FileSegmentGuard::Lock & lock)
{
cache_writer->finalize();
cache_writer.reset();
remote_file_reader.reset();
}
remote_file_reader.reset();
}
void FileSegment::completePartAndResetDownloader()
@ -567,10 +568,13 @@ void FileSegment::complete()
resetDownloaderUnlocked(segment_lock);
}
if (cache_writer && (is_downloader || is_last_holder))
if (is_downloader || is_last_holder)
{
cache_writer->finalize();
cache_writer.reset();
if (cache_writer)
{
cache_writer->finalize();
cache_writer.reset();
}
remote_file_reader.reset();
}

View File

@ -246,7 +246,7 @@ public:
* ========== Methods for _only_ file segment's `downloader` ==================
*/
/// Try to reserve exactly `size` bytes.
/// Try to reserve exactly `size` bytes (in addition to the getDownloadedSize() bytes already downloaded).
/// Returns true if reservation was successful, false otherwise.
bool reserve(size_t size_to_reserve);
@ -258,6 +258,12 @@ public:
/// Write data into reserved space.
void write(const char * from, size_t size, size_t offset);
// Invariant: if state() != DOWNLOADING and remote file reader is present, the reader's
// available() == 0, and getFileOffsetOfBufferEnd() == our getCurrentWriteOffset().
//
// The reader typically requires its internal_buffer to be assigned from the outside before
// calling next().
RemoteFileReaderPtr getRemoteFileReader();
RemoteFileReaderPtr extractRemoteFileReader();

View File

@ -5,14 +5,15 @@
namespace DB
{
IInputFormat::IInputFormat(Block header, ReadBuffer & in_)
: ISource(std::move(header)), in(&in_)
IInputFormat::IInputFormat(Block header, ReadBuffer * in_)
: ISource(std::move(header)), in(in_)
{
column_mapping = std::make_shared<ColumnMapping>();
}
void IInputFormat::resetParser()
{
chassert(in);
in->ignoreAll();
// those are protected attributes from ISource (I didn't want to propagate resetParser up there)
finished = false;
@ -23,6 +24,7 @@ void IInputFormat::resetParser()
void IInputFormat::setReadBuffer(ReadBuffer & in_)
{
chassert(in); // not supported by random-access formats
in = &in_;
}

View File

@ -18,10 +18,11 @@ class IInputFormat : public ISource
{
protected:
ReadBuffer * in [[maybe_unused]];
ReadBuffer * in [[maybe_unused]] = nullptr;
public:
IInputFormat(Block header, ReadBuffer & in_);
// ReadBuffer can be nullptr for random-access formats.
IInputFormat(Block header, ReadBuffer * in_);
/** In some usecase (hello Kafka) we need to read a lot of tiny streams in exactly the same format.
* The recreating of parser for each small stream takes too long, so we introduce a method
@ -32,7 +33,7 @@ public:
virtual void resetParser();
virtual void setReadBuffer(ReadBuffer & in_);
ReadBuffer & getReadBuffer() const { return *in; }
ReadBuffer & getReadBuffer() const { chassert(in); return *in; }
virtual const BlockMissingValues & getMissingValues() const
{

View File

@ -54,7 +54,7 @@ bool isParseError(int code)
}
IRowInputFormat::IRowInputFormat(Block header, ReadBuffer & in_, Params params_)
: IInputFormat(std::move(header), in_), serializations(getPort().getHeader().getSerializations()), params(params_)
: IInputFormat(std::move(header), &in_), serializations(getPort().getHeader().getSerializations()), params(params_)
{
}

View File

@ -16,7 +16,7 @@ namespace ErrorCodes
}
/// Base class for schema inference for the data in some specific format.
/// It reads some data from read buffer and try to determine the schema
/// It reads some data from read buffer and tries to determine the schema
/// from read data.
class ISchemaReader
{

View File

@ -24,7 +24,7 @@ namespace ErrorCodes
}
ArrowBlockInputFormat::ArrowBlockInputFormat(ReadBuffer & in_, const Block & header_, bool stream_, const FormatSettings & format_settings_)
: IInputFormat(header_, in_), stream{stream_}, format_settings(format_settings_)
: IInputFormat(header_, &in_), stream{stream_}, format_settings(format_settings_)
{
}

View File

@ -47,13 +47,8 @@ arrow::Status ArrowBufferedOutputStream::Write(const void * data, int64_t length
return arrow::Status::OK();
}
RandomAccessFileFromSeekableReadBuffer::RandomAccessFileFromSeekableReadBuffer(ReadBuffer & in_, off_t file_size_)
: in{in_}, seekable_in{dynamic_cast<SeekableReadBuffer &>(in_)}, file_size{file_size_}, is_open{true}
{
}
RandomAccessFileFromSeekableReadBuffer::RandomAccessFileFromSeekableReadBuffer(ReadBuffer & in_)
: in{in_}, seekable_in{dynamic_cast<SeekableReadBuffer &>(in_)}, is_open{true}
RandomAccessFileFromSeekableReadBuffer::RandomAccessFileFromSeekableReadBuffer(ReadBuffer & in_, std::optional<off_t> file_size_, bool avoid_buffering_)
: in{in_}, seekable_in{dynamic_cast<SeekableReadBuffer &>(in_)}, file_size{file_size_}, is_open{true}, avoid_buffering(avoid_buffering_)
{
}
@ -80,6 +75,8 @@ arrow::Result<int64_t> RandomAccessFileFromSeekableReadBuffer::Tell() const
arrow::Result<int64_t> RandomAccessFileFromSeekableReadBuffer::Read(int64_t nbytes, void * out)
{
if (avoid_buffering)
in.setReadUntilPosition(seekable_in.getPosition() + nbytes);
return in.readBig(reinterpret_cast<char *>(out), nbytes);
}
@ -102,6 +99,12 @@ arrow::Future<std::shared_ptr<arrow::Buffer>> RandomAccessFileFromSeekableReadBu
arrow::Status RandomAccessFileFromSeekableReadBuffer::Seek(int64_t position)
{
if (avoid_buffering)
{
// Seeking to a position above a previous setReadUntilPosition() confuses some of the
// ReadBuffer implementations.
in.setReadUntilEnd();
}
seekable_in.seek(position, SEEK_SET);
return arrow::Status::OK();
}
@ -143,28 +146,105 @@ arrow::Status ArrowInputStreamFromReadBuffer::Close()
return arrow::Status();
}
RandomAccessFileFromManyReadBuffers::RandomAccessFileFromManyReadBuffers(SeekableReadBufferFactory & factory) : buf_factory(factory) {}
arrow::Result<int64_t> RandomAccessFileFromManyReadBuffers::GetSize()
{
return buf_factory.getFileSize();
}
arrow::Result<int64_t> RandomAccessFileFromManyReadBuffers::ReadAt(int64_t position, int64_t nbytes, void* out)
{
std::unique_lock lock(mutex);
if (free_bufs.empty())
free_bufs.push_back(buf_factory.getReader());
auto buf = std::move(free_bufs.back());
free_bufs.pop_back();
lock.unlock();
// To work well with this, ReadBuffer implementations need to respect setReadUntilPosition() and
// not read above it. We often do very small reads here.
// Also nice if they:
// * Make readBig() read directly into the provided memory, instead of copying from internal
// buffer.
// * Allocate the internal buffer (if any) lazily in first nextImpl() call. If all reads are
// tiny readBig() calls (as is typical here), it won't allocate an unnecessary 1 MB buffer.
buf->seek(position, SEEK_SET);
buf->setReadUntilPosition(position + nbytes);
size_t bytes_read = buf->readBig(reinterpret_cast<char *>(out), nbytes);
// Seeking to a position above a previous setReadUntilPosition() confuses some of the
// ReadBuffer implementations. So we reset it before next seek.
buf->setReadUntilEnd();
lock.lock();
free_bufs.push_back(std::move(buf));
return static_cast<int64_t>(bytes_read);
}
arrow::Result<std::shared_ptr<arrow::Buffer>> RandomAccessFileFromManyReadBuffers::ReadAt(int64_t position, int64_t nbytes)
{
ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::AllocateResizableBuffer(nbytes))
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(position, nbytes, buffer->mutable_data()))
if (bytes_read < nbytes)
RETURN_NOT_OK(buffer->Resize(bytes_read));
return buffer;
}
arrow::Future<std::shared_ptr<arrow::Buffer>> RandomAccessFileFromManyReadBuffers::ReadAsync(const arrow::io::IOContext&, int64_t position, int64_t nbytes)
{
return arrow::Future<std::shared_ptr<arrow::Buffer>>::MakeFinished(ReadAt(position, nbytes));
}
arrow::Status RandomAccessFileFromManyReadBuffers::Close()
{
chassert(is_open);
is_open = false;
return arrow::Status::OK();
}
arrow::Status RandomAccessFileFromManyReadBuffers::Seek(int64_t) { return arrow::Status::NotImplemented(""); }
arrow::Result<int64_t> RandomAccessFileFromManyReadBuffers::Tell() const { return arrow::Status::NotImplemented(""); }
arrow::Result<int64_t> RandomAccessFileFromManyReadBuffers::Read(int64_t, void*) { return arrow::Status::NotImplemented(""); }
arrow::Result<std::shared_ptr<arrow::Buffer>> RandomAccessFileFromManyReadBuffers::Read(int64_t) { return arrow::Status::NotImplemented(""); }
std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(
ReadBuffer & in,
const FormatSettings & settings,
std::atomic<int> & is_cancelled,
const std::string & format_name,
const std::string & magic_bytes)
const std::string & magic_bytes,
bool avoid_buffering)
{
if (auto * fd_in = dynamic_cast<ReadBufferFromFileDescriptor *>(&in))
{
struct stat stat;
auto res = ::fstat(fd_in->getFD(), &stat);
// if fd is a regular file i.e. not stdin
if (res == 0 && S_ISREG(stat.st_mode))
return std::make_shared<RandomAccessFileFromSeekableReadBuffer>(*fd_in, stat.st_size);
struct stat stat;
auto res = ::fstat(fd_in->getFD(), &stat);
// if fd is a regular file i.e. not stdin
if (res == 0 && S_ISREG(stat.st_mode))
return std::make_shared<RandomAccessFileFromSeekableReadBuffer>(*fd_in, stat.st_size, avoid_buffering);
}
else if (dynamic_cast<SeekableReadBuffer *>(&in) && isBufferWithFileSize(in))
else if (auto * seekable_in = dynamic_cast<SeekableReadBuffer *>(&in);
seekable_in && settings.seekable_read && isBufferWithFileSize(in) &&
seekable_in->checkIfActuallySeekable())
{
if (settings.seekable_read)
return std::make_shared<RandomAccessFileFromSeekableReadBuffer>(in);
return std::make_shared<RandomAccessFileFromSeekableReadBuffer>(in, std::nullopt, avoid_buffering);
}
// fallback to loading the entire file in memory
return asArrowFileLoadIntoMemory(in, is_cancelled, format_name, magic_bytes);
}
std::shared_ptr<arrow::io::RandomAccessFile> asArrowFileLoadIntoMemory(
ReadBuffer & in,
std::atomic<int> & is_cancelled,
const std::string & format_name,
const std::string & magic_bytes)
{
std::string file_data;
{
PeekableReadBuffer buf(in);

View File

@ -18,6 +18,7 @@ class ReadBuffer;
class WriteBuffer;
class SeekableReadBuffer;
class SeekableReadBufferFactory;
struct FormatSettings;
class ArrowBufferedOutputStream : public arrow::io::OutputStream
@ -46,9 +47,7 @@ private:
class RandomAccessFileFromSeekableReadBuffer : public arrow::io::RandomAccessFile
{
public:
RandomAccessFileFromSeekableReadBuffer(ReadBuffer & in_, off_t file_size_);
explicit RandomAccessFileFromSeekableReadBuffer(ReadBuffer & in_);
RandomAccessFileFromSeekableReadBuffer(ReadBuffer & in_, std::optional<off_t> file_size_, bool avoid_buffering_);
arrow::Result<int64_t> GetSize() override;
@ -74,10 +73,44 @@ private:
SeekableReadBuffer & seekable_in;
std::optional<off_t> file_size;
bool is_open = false;
bool avoid_buffering = false;
ARROW_DISALLOW_COPY_AND_ASSIGN(RandomAccessFileFromSeekableReadBuffer);
};
// Thread-safe.
// Maintains a pool of SeekableReadBuffer-s. For each ReadAt(), takes a buffer, seeks it, and reads.
class RandomAccessFileFromManyReadBuffers : public arrow::io::RandomAccessFile
{
public:
explicit RandomAccessFileFromManyReadBuffers(SeekableReadBufferFactory & factory);
// These are thread safe.
arrow::Result<int64_t> GetSize() override;
arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override;
arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) override;
arrow::Future<std::shared_ptr<arrow::Buffer>> ReadAsync(const arrow::io::IOContext&, int64_t position,
int64_t nbytes) override;
// These are not thread safe, and arrow shouldn't call them. Return NotImplemented error.
arrow::Status Seek(int64_t) override;
arrow::Result<int64_t> Tell() const override;
arrow::Result<int64_t> Read(int64_t, void*) override;
arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t) override;
arrow::Status Close() override;
bool closed() const override { return !is_open; }
private:
SeekableReadBufferFactory & buf_factory;
bool is_open = true;
std::mutex mutex;
std::vector<std::unique_ptr<SeekableReadBuffer>> free_bufs;
ARROW_DISALLOW_COPY_AND_ASSIGN(RandomAccessFileFromManyReadBuffers);
};
class ArrowInputStreamFromReadBuffer : public arrow::io::InputStream
{
public:
@ -101,6 +134,19 @@ std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(
const FormatSettings & settings,
std::atomic<int> & is_cancelled,
const std::string & format_name,
const std::string & magic_bytes,
// If true, we'll use ReadBuffer::setReadUntilPosition() to avoid buffering and readahead as
// much as possible. For HTTP or S3 ReadBuffer, this means that each RandomAccessFile
// read call will do a new HTTP request. Used in parquet pre-buffered reading mode, which makes
// arrow do its own buffering and coalescing of reads.
// (ReadBuffer is not a good abstraction in this case, but it works.)
bool avoid_buffering = false);
// Reads the whole file into a memory buffer, owned by the returned RandomAccessFile.
std::shared_ptr<arrow::io::RandomAccessFile> asArrowFileLoadIntoMemory(
ReadBuffer & in,
std::atomic<int> & is_cancelled,
const std::string & format_name,
const std::string & magic_bytes);
}

View File

@ -72,7 +72,7 @@ void JSONColumnsReaderBase::skipColumn()
JSONColumnsBlockInputFormatBase::JSONColumnsBlockInputFormatBase(
ReadBuffer & in_, const Block & header_, const FormatSettings & format_settings_, std::unique_ptr<JSONColumnsReaderBase> reader_)
: IInputFormat(header_, in_)
: IInputFormat(header_, &in_)
, format_settings(format_settings_)
, fields(header_.getNamesAndTypes())
, serializations(header_.getSerializations())

View File

@ -16,7 +16,7 @@ class NativeInputFormat final : public IInputFormat
{
public:
NativeInputFormat(ReadBuffer & buf, const Block & header_, const FormatSettings & settings)
: IInputFormat(header_, buf)
: IInputFormat(header_, &buf)
, reader(std::make_unique<NativeReader>(
buf,
header_,

View File

@ -22,7 +22,7 @@ namespace ErrorCodes
}
ORCBlockInputFormat::ORCBlockInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_)
: IInputFormat(std::move(header_), in_), format_settings(format_settings_), skip_stripes(format_settings.orc.skip_stripes)
: IInputFormat(std::move(header_), &in_), format_settings(format_settings_), skip_stripes(format_settings.orc.skip_stripes)
{
}

View File

@ -94,7 +94,7 @@ public:
};
explicit ParallelParsingInputFormat(Params params)
: IInputFormat(std::move(params.header), params.in)
: IInputFormat(std::move(params.header), &params.in)
, internal_parser_creator(params.internal_parser_creator)
, file_segmentation_engine(params.file_segmentation_engine)
, format_name(params.format_name)

View File

@ -3,6 +3,7 @@
#if USE_PARQUET
#include <Common/ThreadPool.h>
#include <Formats/FormatFactory.h>
#include <Formats/SchemaInferenceUtils.h>
#include <IO/ReadBufferFromMemory.h>
@ -11,12 +12,19 @@
#include <arrow/io/api.h>
#include <arrow/status.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/schema.h>
#include <parquet/file_reader.h>
#include "ArrowBufferedStreams.h"
#include "ArrowColumnToCHColumn.h"
#include "ArrowFieldIndexUtil.h"
#include <base/scope_guard.h>
#include <DataTypes/NestedUtils.h>
namespace CurrentMetrics
{
extern const Metric ParquetDecoderThreads;
extern const Metric ParquetDecoderThreadsActive;
}
namespace DB
{
@ -34,108 +42,342 @@ namespace ErrorCodes
throw Exception::createDeprecated(_s.ToString(), ErrorCodes::BAD_ARGUMENTS); \
} while (false)
ParquetBlockInputFormat::ParquetBlockInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_)
: IInputFormat(std::move(header_), in_), format_settings(format_settings_), skip_row_groups(format_settings.parquet.skip_row_groups)
ParquetBlockInputFormat::ParquetBlockInputFormat(
ReadBuffer * buf,
SeekableReadBufferFactoryPtr buf_factory_,
const Block & header_,
const FormatSettings & format_settings_,
size_t max_decoding_threads_,
size_t min_bytes_for_seek_)
: IInputFormat(header_, buf)
, buf_factory(std::move(buf_factory_))
, format_settings(format_settings_)
, skip_row_groups(format_settings.parquet.skip_row_groups)
, max_decoding_threads(max_decoding_threads_)
, min_bytes_for_seek(min_bytes_for_seek_)
, pending_chunks(PendingChunk::Compare { .row_group_first = format_settings_.parquet.preserve_order })
{
if (max_decoding_threads > 1)
pool = std::make_unique<ThreadPool>(CurrentMetrics::ParquetDecoderThreads, CurrentMetrics::ParquetDecoderThreadsActive, max_decoding_threads);
}
Chunk ParquetBlockInputFormat::generate()
ParquetBlockInputFormat::~ParquetBlockInputFormat() = default;
void ParquetBlockInputFormat::initializeIfNeeded()
{
Chunk res;
block_missing_values.clear();
if (std::exchange(is_initialized, true))
return;
if (!file_reader)
// Create arrow file adapter.
// TODO: Make the adapter do prefetching on IO threads, based on the full set of ranges that
// we'll need to read (which we know in advance). Use max_download_threads for that.
if (buf_factory)
{
prepareReader();
file_reader->set_batch_size(format_settings.parquet.max_block_size);
std::vector<int> row_group_indices;
for (int i = 0; i < row_group_total; ++i)
{
if (!skip_row_groups.contains(i))
row_group_indices.emplace_back(i);
}
auto read_status = file_reader->GetRecordBatchReader(row_group_indices, column_indices, &current_record_batch_reader);
if (!read_status.ok())
throw DB::ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", read_status.ToString());
}
if (is_stopped)
return {};
auto batch = current_record_batch_reader->Next();
if (!batch.ok())
{
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}",
batch.status().ToString());
}
if (*batch)
{
auto tmp_table = arrow::Table::FromRecordBatches({*batch});
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &block_missing_values : nullptr;
arrow_column_to_ch_column->arrowTableToCHChunk(res, *tmp_table, (*tmp_table)->num_rows(), block_missing_values_ptr);
if (format_settings.seekable_read && buf_factory->checkIfActuallySeekable())
arrow_file = std::make_shared<RandomAccessFileFromManyReadBuffers>(*buf_factory);
else
arrow_file = asArrowFileLoadIntoMemory(*buf_factory->getReader(), is_stopped, "Parquet", PARQUET_MAGIC_BYTES);
}
else
{
return {};
arrow_file = asArrowFile(*in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
}
return res;
}
void ParquetBlockInputFormat::resetParser()
{
IInputFormat::resetParser();
file_reader.reset();
current_record_batch_reader.reset();
column_indices.clear();
row_group_current = 0;
block_missing_values.clear();
}
const BlockMissingValues & ParquetBlockInputFormat::getMissingValues() const
{
return block_missing_values;
}
static void getFileReaderAndSchema(
ReadBuffer & in,
std::unique_ptr<parquet::arrow::FileReader> & file_reader,
std::shared_ptr<arrow::Schema> & schema,
const FormatSettings & format_settings,
std::atomic<int> & is_stopped)
{
auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES);
if (is_stopped)
return;
THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(std::move(arrow_file), arrow::default_memory_pool(), &file_reader));
THROW_ARROW_NOT_OK(file_reader->GetSchema(&schema));
}
void ParquetBlockInputFormat::prepareReader()
{
metadata = parquet::ReadMetaData(arrow_file);
std::shared_ptr<arrow::Schema> schema;
getFileReaderAndSchema(*in, file_reader, schema, format_settings, is_stopped);
if (is_stopped)
return;
THROW_ARROW_NOT_OK(parquet::arrow::FromParquetSchema(metadata->schema(), &schema));
row_group_total = file_reader->num_row_groups();
row_group_current = 0;
row_groups.resize(metadata->num_row_groups());
arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(
ArrowFieldIndexUtil field_util(
format_settings.parquet.case_insensitive_column_matching,
format_settings.parquet.allow_missing_columns);
column_indices = field_util.findRequiredIndices(getPort().getHeader(), *schema);
}
void ParquetBlockInputFormat::initializeRowGroupReader(size_t row_group_idx)
{
auto & row_group = row_groups[row_group_idx];
parquet::ArrowReaderProperties properties;
properties.set_use_threads(false);
properties.set_batch_size(format_settings.parquet.max_block_size);
// When reading a row group, arrow will:
// 1. Look at `metadata` to get all byte ranges it'll need to read from the file (typically one
// per requested column in the row group).
// 2. Coalesce ranges that are close together, trading off seeks vs read amplification.
// This is controlled by CacheOptions.
// 3. Process the columns one by one, issuing the corresponding (coalesced) range reads as
// needed. Each range gets its own memory buffer allocated. These buffers stay in memory
// (in arrow::io::internal::ReadRangeCache) until the whole row group reading is done.
// So the memory usage of a "SELECT *" will be at least the compressed size of a row group
// (typically hundreds of MB).
//
// With this coalescing, we don't need any readahead on our side, hence avoid_buffering in
// asArrowFile().
//
// This adds one unnecessary copy. We should probably do coalescing and prefetch scheduling on
// our side instead.
properties.set_pre_buffer(true);
auto cache_options = arrow::io::CacheOptions::LazyDefaults();
cache_options.hole_size_limit = min_bytes_for_seek;
cache_options.range_size_limit = 1l << 40; // reading the whole row group at once is fine
properties.set_cache_options(cache_options);
// Workaround for a workaround in the parquet library.
//
// From ComputeColumnChunkRange() in contrib/arrow/cpp/src/parquet/file_reader.cc:
// > The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
// > dictionary page header size in total_compressed_size and total_uncompressed_size
// > (see IMPALA-694). We add padding to compensate.
//
// That padding breaks the pre-buffered mode because the padded read ranges may overlap each
// other, failing an assert. So we disable pre-buffering in this case.
// That version is >10 years old, so this is not very important.
if (metadata->writer_version().VersionLt(parquet::ApplicationVersion::PARQUET_816_FIXED_VERSION()))
properties.set_pre_buffer(false);
parquet::arrow::FileReaderBuilder builder;
THROW_ARROW_NOT_OK(
builder.Open(arrow_file, /* not to be confused with ArrowReaderProperties */ parquet::default_reader_properties(), metadata));
builder.properties(properties);
// TODO: Pass custom memory_pool() to enable memory accounting with non-jemalloc allocators.
THROW_ARROW_NOT_OK(builder.Build(&row_group.file_reader));
THROW_ARROW_NOT_OK(
row_group.file_reader->GetRecordBatchReader({static_cast<int>(row_group_idx)}, column_indices, &row_group.record_batch_reader));
row_group.arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(
getPort().getHeader(),
"Parquet",
format_settings.parquet.import_nested,
format_settings.parquet.allow_missing_columns,
format_settings.null_as_default,
format_settings.parquet.case_insensitive_column_matching);
}
ArrowFieldIndexUtil field_util(
format_settings.parquet.case_insensitive_column_matching,
format_settings.parquet.allow_missing_columns);
column_indices = field_util.findRequiredIndices(getPort().getHeader(), *schema);
void ParquetBlockInputFormat::scheduleRowGroup(size_t row_group_idx)
{
chassert(!mutex.try_lock());
auto & status = row_groups[row_group_idx].status;
chassert(status == RowGroupState::Status::NotStarted || status == RowGroupState::Status::Paused);
status = RowGroupState::Status::Running;
pool->scheduleOrThrowOnError(
[this, row_group_idx, thread_group = CurrentThread::getGroup()]()
{
if (thread_group)
CurrentThread::attachToGroupIfDetached(thread_group);
SCOPE_EXIT_SAFE(if (thread_group) CurrentThread::detachFromGroupIfNotDetached(););
try
{
setThreadName("ParquetDecoder");
threadFunction(row_group_idx);
}
catch (...)
{
std::lock_guard lock(mutex);
background_exception = std::current_exception();
condvar.notify_all();
}
});
}
void ParquetBlockInputFormat::threadFunction(size_t row_group_idx)
{
std::unique_lock lock(mutex);
auto & row_group = row_groups[row_group_idx];
chassert(row_group.status == RowGroupState::Status::Running);
while (true)
{
if (is_stopped || row_group.num_pending_chunks >= max_pending_chunks_per_row_group)
{
row_group.status = RowGroupState::Status::Paused;
return;
}
decodeOneChunk(row_group_idx, lock);
if (row_group.status == RowGroupState::Status::Done)
return;
}
}
void ParquetBlockInputFormat::decodeOneChunk(size_t row_group_idx, std::unique_lock<std::mutex> & lock)
{
auto & row_group = row_groups[row_group_idx];
chassert(row_group.status != RowGroupState::Status::Done);
chassert(lock.owns_lock());
SCOPE_EXIT({ chassert(lock.owns_lock() || std::uncaught_exceptions()); });
lock.unlock();
auto end_of_row_group = [&] {
row_group.arrow_column_to_ch_column.reset();
row_group.record_batch_reader.reset();
row_group.file_reader.reset();
lock.lock();
row_group.status = RowGroupState::Status::Done;
// We may be able to schedule more work now, but can't call scheduleMoreWorkIfNeeded() right
// here because we're running on the same thread pool, so it'll deadlock if thread limit is
// reached. Wake up generate() instead.
condvar.notify_all();
};
if (!row_group.record_batch_reader)
{
if (skip_row_groups.contains(static_cast<int>(row_group_idx)))
{
// Pretend that the row group is empty.
// (We could avoid scheduling the row group on a thread in the first place. But the
// skip_row_groups feature is mostly unused, so it's better to be a little inefficient
// than to add a bunch of extra mostly-dead code for this.)
end_of_row_group();
return;
}
initializeRowGroupReader(row_group_idx);
}
auto batch = row_group.record_batch_reader->Next();
if (!batch.ok())
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", batch.status().ToString());
if (!*batch)
{
end_of_row_group();
return;
}
auto tmp_table = arrow::Table::FromRecordBatches({*batch});
PendingChunk res = {.chunk_idx = row_group.next_chunk_idx, .row_group_idx = row_group_idx};
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &res.block_missing_values : nullptr;
row_group.arrow_column_to_ch_column->arrowTableToCHChunk(res.chunk, *tmp_table, (*tmp_table)->num_rows(), block_missing_values_ptr);
lock.lock();
++row_group.next_chunk_idx;
++row_group.num_pending_chunks;
pending_chunks.push(std::move(res));
condvar.notify_all();
}
void ParquetBlockInputFormat::scheduleMoreWorkIfNeeded(std::optional<size_t> row_group_touched)
{
while (row_groups_completed < row_groups.size())
{
auto & row_group = row_groups[row_groups_completed];
if (row_group.status != RowGroupState::Status::Done || row_group.num_pending_chunks != 0)
break;
++row_groups_completed;
}
if (pool)
{
while (row_groups_started - row_groups_completed < max_decoding_threads &&
row_groups_started < row_groups.size())
scheduleRowGroup(row_groups_started++);
if (row_group_touched)
{
auto & row_group = row_groups[*row_group_touched];
if (row_group.status == RowGroupState::Status::Paused &&
row_group.num_pending_chunks < max_pending_chunks_per_row_group)
scheduleRowGroup(*row_group_touched);
}
}
}
Chunk ParquetBlockInputFormat::generate()
{
initializeIfNeeded();
std::unique_lock lock(mutex);
while (true)
{
if (background_exception)
{
is_stopped = true;
std::rethrow_exception(background_exception);
}
if (is_stopped)
return {};
scheduleMoreWorkIfNeeded();
if (!pending_chunks.empty() &&
(!format_settings.parquet.preserve_order ||
pending_chunks.top().row_group_idx == row_groups_completed))
{
PendingChunk chunk = std::move(const_cast<PendingChunk&>(pending_chunks.top()));
pending_chunks.pop();
auto & row_group = row_groups[chunk.row_group_idx];
chassert(row_group.num_pending_chunks != 0);
chassert(chunk.chunk_idx == row_group.next_chunk_idx - row_group.num_pending_chunks);
--row_group.num_pending_chunks;
scheduleMoreWorkIfNeeded(chunk.row_group_idx);
previous_block_missing_values = std::move(chunk.block_missing_values);
return std::move(chunk.chunk);
}
if (row_groups_completed == row_groups.size())
return {};
if (pool)
condvar.wait(lock);
else
decodeOneChunk(row_groups_completed, lock);
}
}
void ParquetBlockInputFormat::resetParser()
{
is_stopped = true;
if (pool)
pool->wait();
arrow_file.reset();
metadata.reset();
column_indices.clear();
row_groups.clear();
while (!pending_chunks.empty())
pending_chunks.pop();
row_groups_completed = 0;
previous_block_missing_values.clear();
row_groups_started = 0;
background_exception = nullptr;
is_stopped = false;
is_initialized = false;
IInputFormat::resetParser();
}
const BlockMissingValues & ParquetBlockInputFormat::getMissingValues() const
{
return previous_block_missing_values;
}
ParquetSchemaReader::ParquetSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
@ -145,10 +387,14 @@ ParquetSchemaReader::ParquetSchemaReader(ReadBuffer & in_, const FormatSettings
NamesAndTypesList ParquetSchemaReader::readSchema()
{
std::unique_ptr<parquet::arrow::FileReader> file_reader;
std::atomic<int> is_stopped{0};
auto file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES);
auto metadata = parquet::ReadMetaData(file);
std::shared_ptr<arrow::Schema> schema;
std::atomic<int> is_stopped = 0;
getFileReaderAndSchema(in, file_reader, schema, format_settings, is_stopped);
THROW_ARROW_NOT_OK(parquet::arrow::FromParquetSchema(metadata->schema(), &schema));
auto header = ArrowColumnToCHColumn::arrowSchemaToCHHeader(
*schema, "Parquet", format_settings.parquet.skip_columns_with_unsupported_types_in_schema_inference);
if (format_settings.schema_inference_make_columns_nullable)
@ -158,14 +404,25 @@ NamesAndTypesList ParquetSchemaReader::readSchema()
void registerInputFormatParquet(FormatFactory & factory)
{
factory.registerInputFormat(
factory.registerRandomAccessInputFormat(
"Parquet",
[](ReadBuffer &buf,
const Block &sample,
const RowInputFormatParams &,
const FormatSettings & settings)
[](ReadBuffer * buf,
SeekableReadBufferFactoryPtr buf_factory,
const Block & sample,
const FormatSettings & settings,
const ReadSettings& read_settings,
bool is_remote_fs,
size_t /* max_download_threads */,
size_t max_parsing_threads)
{
return std::make_shared<ParquetBlockInputFormat>(buf, sample, settings);
size_t min_bytes_for_seek = is_remote_fs ? read_settings.remote_read_min_bytes_for_seek : 8 * 1024;
return std::make_shared<ParquetBlockInputFormat>(
buf,
std::move(buf_factory),
sample,
settings,
max_parsing_threads,
min_bytes_for_seek);
});
factory.markFormatSupportsSubcolumns("Parquet");
factory.markFormatSupportsSubsetOfColumns("Parquet");

View File

@ -6,19 +6,56 @@
#include <Processors/Formats/ISchemaReader.h>
#include <Formats/FormatSettings.h>
namespace parquet { class FileMetaData; }
namespace parquet::arrow { class FileReader; }
namespace arrow { class Buffer; class RecordBatchReader;}
namespace arrow::io { class RandomAccessFile; }
namespace DB
{
class ArrowColumnToCHColumn;
class SeekableReadBufferFactory;
// Parquet files contain a metadata block with the following information:
// * list of columns,
// * list of "row groups",
// * for each column in each row group:
// - byte range for the data,
// - min, max, count,
// - (note that we *don't* have a reliable estimate of the decompressed+decoded size; the
// metadata has decompressed size, but decoded size is sometimes much bigger because of
// dictionary encoding)
//
// This information could be used for:
// (1) Precise reads - only reading the byte ranges we need, instead of filling the whole
// arbitrarily-sized buffer inside ReadBuffer. We know in advance exactly what ranges we'll
// need to read.
// (2) Skipping row groups based on WHERE conditions.
// (3) Skipping decoding of individual pages based on PREWHERE.
// (4) Projections. I.e. for queries that only request min/max/count, we can report the
// min/max/count from metadata. This can be done per row group. I.e. for row groups that
// fully pass the WHERE conditions we'll use min/max/count from metadata, for row groups that
// only partially overlap with the WHERE conditions we'll read data.
// (4a) Before projections are implemented, we should at least be able to do `SELECT count(*)`
// without reading data.
//
// For (1), we need the IInputFormat to be in control of reading, with its own implementation of
// parallel reading+decoding, instead of using ParallelReadBuffer and ParallelParsingInputFormat.
// That's what RandomAccessInputCreator in FormatFactory is about.
class ParquetBlockInputFormat : public IInputFormat
{
public:
ParquetBlockInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_);
ParquetBlockInputFormat(
// exactly one of these two is nullptr
ReadBuffer * buf,
std::unique_ptr<SeekableReadBufferFactory> buf_factory,
const Block & header,
const FormatSettings & format_settings,
size_t max_decoding_threads,
size_t min_bytes_for_seek);
~ParquetBlockInputFormat() override;
void resetParser() override;
@ -29,25 +66,215 @@ public:
private:
Chunk generate() override;
void prepareReader();
void onCancel() override
{
is_stopped = 1;
}
std::unique_ptr<parquet::arrow::FileReader> file_reader;
int row_group_total = 0;
int row_group_current = 0;
// indices of columns to read from Parquet file
std::vector<int> column_indices;
std::unique_ptr<ArrowColumnToCHColumn> arrow_column_to_ch_column;
BlockMissingValues block_missing_values;
void initializeIfNeeded();
void initializeRowGroupReader(size_t row_group_idx);
void decodeOneChunk(size_t row_group_idx, std::unique_lock<std::mutex> & lock);
void scheduleMoreWorkIfNeeded(std::optional<size_t> row_group_touched = std::nullopt);
void scheduleRowGroup(size_t row_group_idx);
void threadFunction(size_t row_group_idx);
// Data layout in the file:
//
// row group 0
// column 0
// page 0, page 1, ...
// column 1
// page 0, page 1, ...
// ...
// row group 1
// column 0
// ...
// ...
// ...
//
// All columns in one row group have the same number of rows.
// (Not necessarily the same number of *values* if there are arrays or nulls.)
// Pages have arbitrary sizes and numbers of rows, independent from each other, even if in the
// same column or row group.
//
// We can think of this as having lots of data streams, one for each column x row group.
// The main job of this class is to schedule read operations for these streams across threads.
// Also: reassembling the results into chunks, creating/destroying these streams, prefetching.
//
// Some considerations:
// * Row group size is typically hundreds of MB (compressed). Apache recommends 0.5 - 1 GB.
// * Compression ratio can be pretty extreme, especially with dictionary compression.
// We can afford to keep a compressed row group in memory, but not uncompressed.
// * For each pair <row group idx, column idx>, the data lives in one contiguous range in the
// file. We know all these ranges in advance, from metadata.
// * The byte range for a column in a row group is often very short, like a few KB.
// So we need to:
// - Avoid unnecessary readahead, e.g. don't read 1 MB when we only need 1 KB.
// - Coalesce nearby ranges into longer reads when needed. E.g. if we need to read 5 ranges,
// 1 KB each, with 1 KB gaps between them, it's better to do 1 x 9 KB read instead of
// 5 x 1 KB reads.
// - Have lots of parallelism for reading (not necessarily for parsing). E.g. if we're
// reading one small column, it may translate to hundreds of tiny reads with long gaps
// between them. If the data comes from an HTTP server, that's hundreds of tiny HTTP GET
// requests. To get good performance, we have to do tens or hundreds of them in parallel.
// So we should probably have separate parallelism control for IO vs parsing (since we
// don't want hundreds of worker threads oversubscribing the CPU cores).
//
// (Some motivating example access patterns:
// - 'SELECT small_column'. Bottlenecked on number of seeks. Need to do lots of file/network
// reads in parallel, for lots of row groups.
// - 'SELECT *' when row group size is big and there are many columns. Read the whole file.
// Need some moderate parallelism for IO and for parsing. Ideally read+parse columns of
// one row group in parallel to avoid having multiple row groups in memory at once.
// - 'SELECT big_column'. Have to read+parse multiple row groups in parallel.
// - 'SELECT big_column, many small columns'. This is a mix of the previous two scenarios.
// We have many columns, but still need to read+parse multiple row groups in parallel.)
// With all that in mind, here's what we do.
//
// We treat each row group as a sequential single-threaded stream of blocks.
//
// We have a sliding window of active row groups. When a row group becomes active, we start
// reading its data (using RAM). Row group becomes inactive when we finish reading and
// delivering all its blocks and free the RAM. Size of the window is max_decoding_threads.
//
// Decoded blocks are placed in `pending_chunks` queue, then picked up by generate().
// If row group decoding runs too far ahead of delivery (by `max_pending_chunks_per_row_group`
// chunks), we pause the stream for the row group, to avoid using too much memory when decoded
// chunks are much bigger than the compressed data.
//
// Also:
// * If preserve_order = true, we deliver chunks strictly in order of increasing row group.
// Decoding may still proceed in later row groups.
// * If max_decoding_threads <= 1, we run all tasks inline in generate(), without thread pool.
// Potential improvements:
// * Plan all read ranges ahead of time, for the whole file, and do prefetching for them
// in background. Using max_download_threads, which can be made much greater than
// max_decoding_threads by default.
// * Can parse different columns within the same row group in parallel. This would let us have
// fewer row groups in memory at once, reducing memory usage when selecting many columns.
// Should probably do more than one column per task because columns are often very small.
// Maybe split each row group into, say, max_decoding_threads * 2 equal-sized column bunches?
// * Sliding window could take into account the (predicted) memory usage of row groups.
// If row groups are big and many columns are selected, we may use lots of memory when
// reading max_decoding_threads row groups at once. Can adjust the sliding window size based
// on row groups' data sizes from metadata.
// * The max_pending_chunks_per_row_group limit could be based on actual memory usage too.
// Useful for preserve_order.
struct RowGroupState
{
// Transitions:
//
// NotStarted -> Running -> Complete
// Ʌ
// V
// Paused
//
// If max_decoding_threads <= 1: NotStarted -> Complete.
enum class Status
{
NotStarted,
Running,
// Paused decoding because too many chunks are pending.
Paused,
// Decoded everything.
Done,
};
Status status = Status::NotStarted;
// Window of chunks that were decoded but not returned from generate():
//
// (delivered) next_chunk_idx
// v v v
// +---+---+---+---+---+---+---+---+---+---+
// | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | <-- all chunks
// +---+---+---+---+---+---+---+---+---+---+
// ^ ^ ^ ^ ^
// num_pending_chunks
// (in pending_chunks)
// (at most max_pending_chunks_per_row_group)
size_t next_chunk_idx = 0;
size_t num_pending_chunks = 0;
// These are only used by the decoding thread, so don't require locking the mutex.
std::unique_ptr<parquet::arrow::FileReader> file_reader;
std::shared_ptr<arrow::RecordBatchReader> record_batch_reader;
std::unique_ptr<ArrowColumnToCHColumn> arrow_column_to_ch_column;
};
// Chunk ready to be delivered by generate().
struct PendingChunk
{
Chunk chunk;
BlockMissingValues block_missing_values;
size_t chunk_idx; // within row group
size_t row_group_idx;
// For priority_queue.
// In ordered mode we deliver strictly in order of increasing row group idx,
// in unordered mode we prefer to interleave chunks from different row groups.
struct Compare
{
bool row_group_first = false;
bool operator()(const PendingChunk & a, const PendingChunk & b) const
{
auto tuplificate = [this](const PendingChunk & c)
{ return row_group_first ? std::tie(c.row_group_idx, c.chunk_idx)
: std::tie(c.chunk_idx, c.row_group_idx); };
return tuplificate(a) > tuplificate(b);
}
};
};
std::unique_ptr<SeekableReadBufferFactory> buf_factory;
const FormatSettings format_settings;
const std::unordered_set<int> & skip_row_groups;
std::shared_ptr<arrow::RecordBatchReader> current_record_batch_reader;
size_t max_decoding_threads;
size_t min_bytes_for_seek;
const size_t max_pending_chunks_per_row_group = 2;
// RandomAccessFile is thread safe, so we share it among threads.
// FileReader is not, so each thread creates its own.
std::shared_ptr<arrow::io::RandomAccessFile> arrow_file;
std::shared_ptr<parquet::FileMetaData> metadata;
// indices of columns to read from Parquet file
std::vector<int> column_indices;
// Window of active row groups:
//
// row_groups_completed row_groups_started
// v v
// +---+---+---+---+---+---+---+---+---+---+
// | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | <-- all row groups
// +---+---+---+---+---+---+---+---+---+---+
// ^ ^ ^ ^ ^
// Done NotStarted
std::mutex mutex;
// Wakes up the generate() call, if any.
std::condition_variable condvar;
std::vector<RowGroupState> row_groups;
std::priority_queue<PendingChunk, std::vector<PendingChunk>, PendingChunk::Compare> pending_chunks;
size_t row_groups_completed = 0;
// These are only used when max_decoding_threads > 1.
size_t row_groups_started = 0;
std::unique_ptr<ThreadPool> pool;
BlockMissingValues previous_block_missing_values;
std::exception_ptr background_exception = nullptr;
std::atomic<int> is_stopped{0};
bool is_initialized = false;
};
class ParquetSchemaReader : public ISchemaReader

View File

@ -45,7 +45,7 @@ ValuesBlockInputFormat::ValuesBlockInputFormat(
const Block & header_,
const RowInputFormatParams & params_,
const FormatSettings & format_settings_)
: IInputFormat(header_, *buf_), buf(std::move(buf_)),
: IInputFormat(header_, buf_.get()), buf(std::move(buf_)),
params(params_), format_settings(format_settings_), num_columns(header_.columns()),
parser_type_for_column(num_columns, ParserType::Streaming),
attempts_to_deduce_template(num_columns), attempts_to_deduce_template_cached(num_columns),

View File

@ -77,8 +77,8 @@ Chunk FileLogSource::generate()
MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
auto input_format
= FormatFactory::instance().getInputFormat(storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size);
auto input_format = FormatFactory::instance().getInput(
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, std::nullopt, 1);
StreamingFormatExecutor executor(non_virtual_header, input_format);

View File

@ -275,8 +275,8 @@ public:
else
read_buf = std::move(remote_read_buf);
auto input_format = FormatFactory::instance().getInputFormat(
format, *read_buf, to_read_block, getContext(), max_block_size, updateFormatSettings(current_file));
auto input_format = FormatFactory::instance().getInput(
format, *read_buf, to_read_block, getContext(), max_block_size, updateFormatSettings(current_file), /* max_parsing_threads */ 1);
Pipe pipe(input_format);
if (columns_description.hasDefaults())
@ -607,8 +607,14 @@ HiveFiles StorageHive::collectHiveFilesFromPartition(
writeString("\n", wb);
ReadBufferFromString buffer(wb.str());
auto format = FormatFactory::instance().getInputFormat(
"CSV", buffer, partition_key_expr->getSampleBlock(), getContext(), getContext()->getSettingsRef().max_block_size);
auto format = FormatFactory::instance().getInput(
"CSV",
buffer,
partition_key_expr->getSampleBlock(),
getContext(),
getContext()->getSettingsRef().max_block_size,
std::nullopt,
/* max_parsing_threads */ 1);
auto pipeline = QueryPipeline(std::move(format));
auto reader = std::make_unique<PullingPipelineExecutor>(pipeline);
Block block;

View File

@ -101,8 +101,8 @@ Chunk KafkaSource::generateImpl()
auto put_error_to_stream = handle_error_mode == HandleKafkaErrorMode::STREAM;
EmptyReadBuffer empty_buf;
auto input_format = FormatFactory::instance().getInputFormat(
storage.getFormatName(), empty_buf, non_virtual_header, context, max_block_size);
auto input_format = FormatFactory::instance().getInput(
storage.getFormatName(), empty_buf, non_virtual_header, context, max_block_size, std::nullopt, 1);
std::optional<std::string> exception_message;
size_t total_rows = 0;

View File

@ -64,7 +64,7 @@ struct MergeTreeReadTask
/// Used to save current range processing status
MergeTreeRangeReader range_reader;
/// Range readers for multiple filtering steps: row level security, PREWHERE etc.
/// NOTE: we take references to elements and push_back new elements, that's why it is a deque but noit a vector
/// NOTE: we take references to elements and push_back new elements, that's why it is a deque but not a vector
std::deque<MergeTreeRangeReader> pre_range_readers;
using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;

View File

@ -365,7 +365,6 @@ MergeTreePrefetchedReadPool::ThreadsTasks MergeTreePrefetchedReadPool::createThr
min_prefetch_step_marks = static_cast<size_t>(std::round(static_cast<double>(sum_marks) / settings.filesystem_prefetches_limit));
}
size_t total_prefetches_approx = 0;
for (const auto & part : parts_infos)
{
if (settings.filesystem_prefetch_step_marks)
@ -425,7 +424,6 @@ MergeTreePrefetchedReadPool::ThreadsTasks MergeTreePrefetchedReadPool::createThr
sum_marks, threads, min_marks_per_thread, settings.filesystem_prefetch_step_bytes, settings.filesystem_prefetches_limit, total_size_approx);
size_t current_prefetches_count = 0;
prefetch_queue.reserve(total_prefetches_approx);
ThreadsTasks result_threads_tasks;
size_t memory_usage_approx = 0;

View File

@ -95,8 +95,8 @@ Chunk NATSSource::generate()
MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
EmptyReadBuffer empty_buf;
auto input_format
= FormatFactory::instance().getInputFormat(storage.getFormatName(), empty_buf, non_virtual_header, context, max_block_size);
auto input_format = FormatFactory::instance().getInput(
storage.getFormatName(), empty_buf, non_virtual_header, context, max_block_size, std::nullopt, 1);
StreamingFormatExecutor executor(non_virtual_header, input_format);

View File

@ -131,8 +131,8 @@ Chunk RabbitMQSource::generateImpl()
MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
EmptyReadBuffer empty_buf;
auto input_format = FormatFactory::instance().getInputFormat(
storage.getFormatName(), empty_buf, non_virtual_header, context, max_block_size);
auto input_format = FormatFactory::instance().getInput(
storage.getFormatName(), empty_buf, non_virtual_header, context, max_block_size, std::nullopt, 1);
StreamingFormatExecutor executor(non_virtual_header, input_format);
size_t total_rows = 0;

View File

@ -574,14 +574,34 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader()
return {};
size_t object_size = info ? info->size : S3::getObjectSize(*client, bucket, current_key, version_id, request_settings);
auto compression_method = chooseCompressionMethod(current_key, compression_hint);
int zstd_window_log_max = static_cast<int>(getContext()->getSettingsRef().zstd_window_log_max);
auto read_buf = wrapReadBufferWithCompressionMethod(
createS3ReadBuffer(current_key, object_size),
chooseCompressionMethod(current_key, compression_hint),
zstd_window_log_max);
InputFormatPtr input_format;
std::unique_ptr<ReadBuffer> owned_read_buf;
auto read_buf_or_factory = createS3ReadBuffer(current_key, object_size);
if (read_buf_or_factory.buf_factory)
{
input_format = FormatFactory::instance().getInputRandomAccess(
format,
std::move(read_buf_or_factory.buf_factory),
sample_block,
getContext(),
max_block_size,
/* is_remote_fs */ true,
compression_method,
format_settings);
}
else
{
owned_read_buf = wrapReadBufferWithCompressionMethod(
std::move(read_buf_or_factory.buf),
compression_method,
static_cast<int>(getContext()->getSettingsRef().zstd_window_log_max));
input_format = FormatFactory::instance().getInput(
format, *owned_read_buf, sample_block, getContext(), max_block_size, format_settings);
}
auto input_format = getContext()->getInputFormat(format, *read_buf, sample_block, max_block_size, format_settings);
QueryPipelineBuilder builder;
builder.init(Pipe(input_format));
@ -595,7 +615,7 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader()
auto pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
auto current_reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
return ReaderHolder{fs::path(bucket) / current_key, std::move(read_buf), std::move(pipeline), std::move(current_reader)};
return ReaderHolder{fs::path(bucket) / current_key, std::move(owned_read_buf), std::move(pipeline), std::move(current_reader)};
}
std::future<StorageS3Source::ReaderHolder> StorageS3Source::createReaderAsync()
@ -603,46 +623,32 @@ std::future<StorageS3Source::ReaderHolder> StorageS3Source::createReaderAsync()
return create_reader_scheduler([this] { return createReader(); }, 0);
}
std::unique_ptr<ReadBuffer> StorageS3Source::createS3ReadBuffer(const String & key, size_t object_size)
StorageS3Source::ReadBufferOrFactory StorageS3Source::createS3ReadBuffer(const String & key, size_t object_size)
{
auto read_settings = getContext()->getReadSettings().adjustBufferSize(object_size);
read_settings.enable_filesystem_cache = false;
auto download_buffer_size = getContext()->getSettings().max_download_buffer_size;
const bool use_parallel_download = download_buffer_size > 0 && download_thread_num > 1;
const bool object_too_small = object_size < download_thread_num * download_buffer_size;
const bool object_too_small = object_size <= 2 * download_buffer_size;
if (!use_parallel_download || object_too_small)
// Create a read buffer that will prefetch the first ~1 MB of the file.
// When reading lots of tiny files, this prefetching almost doubles the throughput.
// For bigger files, parallel reading is more useful.
if (object_too_small && read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
LOG_TRACE(log, "Downloading object of size {} from S3 in single thread", object_size);
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
return createAsyncS3ReadBuffer(key, read_settings, object_size);
return std::make_unique<ReadBufferFromS3>(client, bucket, key, version_id, request_settings, read_settings);
}
assert(object_size > 0);
if (download_buffer_size < DBMS_DEFAULT_BUFFER_SIZE)
{
LOG_WARNING(log, "Downloading buffer {} bytes too small, set at least {} bytes", download_buffer_size, DBMS_DEFAULT_BUFFER_SIZE);
download_buffer_size = DBMS_DEFAULT_BUFFER_SIZE;
LOG_TRACE(log, "Downloading object of size {} from S3 with initial prefetch", object_size);
return {.buf = createAsyncS3ReadBuffer(key, read_settings, object_size)};
}
auto factory = std::make_unique<ReadBufferS3Factory>(
client, bucket, key, version_id, download_buffer_size, object_size, request_settings, read_settings);
LOG_TRACE(log,
"Downloading from S3 in {} threads. Object size: {}, Range size: {}.",
download_thread_num, object_size, download_buffer_size);
return std::make_unique<ParallelReadBuffer>(std::move(factory), threadPoolCallbackRunner<void>(IOThreadPool::get(), "S3ParallelRead"), download_thread_num);
client, bucket, key, version_id, object_size, request_settings, read_settings);
return {.buf_factory = std::move(factory)};
}
std::unique_ptr<ReadBuffer> StorageS3Source::createAsyncS3ReadBuffer(
const String & key, const ReadSettings & read_settings, size_t object_size)
{
auto read_buffer_creator =
[this, read_settings]
[this, read_settings, object_size]
(const std::string & path, size_t read_until_position) -> std::shared_ptr<ReadBufferFromFileBase>
{
return std::make_shared<ReadBufferFromS3>(
@ -655,7 +661,8 @@ std::unique_ptr<ReadBuffer> StorageS3Source::createAsyncS3ReadBuffer(
/* use_external_buffer */true,
/* offset */0,
read_until_position,
/* restricted_seek */true);
/* restricted_seek */true,
object_size);
};
auto s3_impl = std::make_unique<ReadBufferFromRemoteFSGather>(

View File

@ -203,6 +203,12 @@ private:
std::unique_ptr<PullingPipelineExecutor> reader;
};
struct ReadBufferOrFactory
{
std::unique_ptr<ReadBuffer> buf;
SeekableReadBufferFactoryPtr buf_factory;
};
ReaderHolder reader;
std::vector<NameAndTypePair> requested_virtual_columns;
@ -223,7 +229,7 @@ private:
ReaderHolder createReader();
std::future<ReaderHolder> createReaderAsync();
std::unique_ptr<ReadBuffer> createS3ReadBuffer(const String & key, size_t object_size);
ReadBufferOrFactory createS3ReadBuffer(const String & key, size_t object_size);
std::unique_ptr<ReadBuffer> createAsyncS3ReadBuffer(const String & key, const ReadSettings & read_settings, size_t object_size);
};

View File

@ -14,7 +14,6 @@
#include <Parsers/ASTIdentifier.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/IOThreadPool.h>
#include <IO/ParallelReadBuffer.h>
#include <IO/WriteBufferFromHTTP.h>
#include <IO/WriteHelpers.h>
@ -205,7 +204,7 @@ namespace
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got empty url list");
auto first_option = uri_options.begin();
read_buf = getFirstAvailableURLReadBuffer(
auto buf_factory = getFirstAvailableURLReadBuffer(
first_option,
uri_options.end(),
context,
@ -213,24 +212,32 @@ namespace
http_method,
callback,
timeouts,
compression_method,
credentials,
headers,
glob_url,
uri_options.size() == 1,
download_threads);
uri_options.size() == 1);
try
{
total_size += getFileSizeFromReadBuffer(*read_buf);
total_size += buf_factory->getFileSize();
}
catch (...)
{
// we simply continue without total_size
}
auto input_format
= FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size, format_settings);
// TODO: Pass max_parsing_threads and max_download_threads adjusted for num_streams.
auto input_format = FormatFactory::instance().getInputRandomAccess(
format,
std::move(buf_factory),
sample_block,
context,
max_block_size,
/* is_remote_fs */ true,
compression_method,
format_settings,
download_threads);
QueryPipelineBuilder builder;
builder.init(Pipe(input_format));
@ -284,7 +291,7 @@ namespace
return {};
}
static std::unique_ptr<ReadBuffer> getFirstAvailableURLReadBuffer(
static SeekableReadBufferFactoryPtr getFirstAvailableURLReadBuffer(
std::vector<String>::const_iterator & option,
const std::vector<String>::const_iterator & end,
ContextPtr context,
@ -292,12 +299,10 @@ namespace
const String & http_method,
std::function<void(std::ostream &)> callback,
const ConnectionTimeouts & timeouts,
CompressionMethod compression_method,
Poco::Net::HTTPBasicCredentials & credentials,
const HTTPHeaderEntries & headers,
bool glob_url,
bool delay_initialization,
size_t download_threads)
bool delay_initialization)
{
String first_exception_message;
ReadSettings read_settings = context->getReadSettings();
@ -314,141 +319,40 @@ namespace
setCredentials(credentials, request_uri);
const auto settings = context->getSettings();
int zstd_window_log_max = static_cast<int>(settings.zstd_window_log_max);
try
auto res = std::make_unique<RangedReadWriteBufferFromHTTPFactory>(
request_uri,
http_method,
callback,
timeouts,
credentials,
settings.max_http_get_redirects,
settings.max_read_buffer_size,
read_settings,
headers,
&context->getRemoteHostFilter(),
delay_initialization,
/* use_external_buffer */ false,
/* skip_url_not_found_error */ skip_url_not_found_error);
if (options > 1)
{
if (download_threads > 1)
// Send a HEAD request to check availability.
try
{
try
{
ReadWriteBufferFromHTTP buffer(
request_uri,
Poco::Net::HTTPRequest::HTTP_HEAD,
callback,
timeouts,
credentials,
settings.max_http_get_redirects,
settings.max_read_buffer_size,
read_settings,
headers,
ReadWriteBufferFromHTTP::Range{0, std::nullopt},
&context->getRemoteHostFilter(),
true,
/* use_external_buffer */ false,
/* skip_url_not_found_error */ skip_url_not_found_error);
Poco::Net::HTTPResponse res;
for (size_t i = 0; i < settings.http_max_tries; ++i)
{
try
{
buffer.callWithRedirects(res, Poco::Net::HTTPRequest::HTTP_HEAD, true);
break;
}
catch (const Poco::Exception & e)
{
LOG_TRACE(
&Poco::Logger::get("StorageURLSource"),
"HTTP HEAD request to `{}` failed at try {}/{}. "
"Error: {}.",
request_uri.toString(),
i + 1,
settings.http_max_tries,
e.displayText());
if (!ReadWriteBufferFromHTTP::isRetriableError(res.getStatus()))
{
throw;
}
}
}
// to check if Range header is supported, we need to send a request with it set
const bool supports_ranges = (res.has("Accept-Ranges") && res.get("Accept-Ranges") == "bytes")
|| (res.has("Content-Range") && res.get("Content-Range").starts_with("bytes"));
if (supports_ranges)
LOG_TRACE(&Poco::Logger::get("StorageURLSource"), "HTTP Range is supported");
else
LOG_TRACE(&Poco::Logger::get("StorageURLSource"), "HTTP Range is not supported");
if (supports_ranges && res.getStatus() == Poco::Net::HTTPResponse::HTTP_PARTIAL_CONTENT
&& res.hasContentLength())
{
LOG_TRACE(
&Poco::Logger::get("StorageURLSource"),
"Using ParallelReadBuffer with {} workers with chunks of {} bytes",
download_threads,
settings.max_download_buffer_size);
auto read_buffer_factory = std::make_unique<RangedReadWriteBufferFromHTTPFactory>(
res.getContentLength(),
settings.max_download_buffer_size,
request_uri,
http_method,
callback,
timeouts,
credentials,
settings.max_http_get_redirects,
settings.max_read_buffer_size,
read_settings,
headers,
&context->getRemoteHostFilter(),
delay_initialization,
/* use_external_buffer */ false,
/* skip_url_not_found_error */ skip_url_not_found_error);
return wrapReadBufferWithCompressionMethod(
std::make_unique<ParallelReadBuffer>(
std::move(read_buffer_factory),
threadPoolCallbackRunner<void>(IOThreadPool::get(), "URLParallelRead"),
download_threads),
compression_method,
zstd_window_log_max);
}
}
catch (const Poco::Exception & e)
{
LOG_TRACE(
&Poco::Logger::get("StorageURLSource"),
"Failed to setup ParallelReadBuffer because of an exception:\n{}.\nFalling back to the single-threaded "
"buffer",
e.displayText());
}
res->getFileInfo();
}
catch (...)
{
if (first_exception_message.empty())
first_exception_message = getCurrentExceptionMessage(false);
LOG_TRACE(&Poco::Logger::get("StorageURLSource"), "Using single-threaded read buffer");
tryLogCurrentException(__PRETTY_FUNCTION__);
return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadWriteBufferFromHTTP>(
request_uri,
http_method,
callback,
timeouts,
credentials,
settings.max_http_get_redirects,
settings.max_read_buffer_size,
read_settings,
headers,
ReadWriteBufferFromHTTP::Range{},
&context->getRemoteHostFilter(),
delay_initialization,
/* use_external_buffer */ false,
/* skip_url_not_found_error */ skip_url_not_found_error),
compression_method,
zstd_window_log_max);
continue;
}
}
catch (...)
{
if (first_exception_message.empty())
first_exception_message = getCurrentExceptionMessage(false);
if (options == 1)
throw;
tryLogCurrentException(__PRETTY_FUNCTION__);
}
return res;
}
throw Exception(ErrorCodes::NETWORK_ERROR, "All uri ({}) options are unreachable: {}", options, first_exception_message);
@ -461,7 +365,6 @@ namespace
String name;
URIInfoPtr uri_info;
std::unique_ptr<ReadBuffer> read_buf;
std::unique_ptr<QueryPipeline> pipeline;
std::unique_ptr<PullingPipelineExecutor> reader;
@ -656,7 +559,7 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
if (it == urls_to_check.cend())
return nullptr;
auto buf = StorageURLSource::getFirstAvailableURLReadBuffer(
auto buf_factory = StorageURLSource::getFirstAvailableURLReadBuffer(
it,
urls_to_check.cend(),
context,
@ -664,14 +567,15 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
Poco::Net::HTTPRequest::HTTP_GET,
{},
getHTTPTimeouts(context),
compression_method,
credentials,
headers,
false,
false,
context->getSettingsRef().max_download_threads);\
false);
++it;
return buf;
return wrapReadBufferWithCompressionMethod(
buf_factory->getReader(),
compression_method,
static_cast<int>(context->getSettingsRef().zstd_window_log_max));
};
ColumnsDescription columns;
@ -936,7 +840,6 @@ std::optional<time_t> IStorageURLBase::getLastModificationTime(
settings.max_read_buffer_size,
context->getReadSettings(),
headers,
ReadWriteBufferFromHTTP::Range{},
&context->getRemoteHostFilter(),
true,
false,

View File

@ -23,9 +23,9 @@ void StorageSystemFormats::fillData(MutableColumns & res_columns, ContextPtr, co
for (const auto & pair : formats)
{
const auto & [format_name, creators] = pair;
UInt64 has_input_format(creators.input_creator != nullptr);
UInt64 has_input_format(creators.input_creator != nullptr || creators.random_access_input_creator != nullptr);
UInt64 has_output_format(creators.output_creator != nullptr);
UInt64 supports_parallel_parsing(creators.file_segmentation_engine != nullptr);
UInt64 supports_parallel_parsing(creators.file_segmentation_engine != nullptr || creators.random_access_input_creator != nullptr);
UInt64 supports_parallel_formatting(creators.supports_parallel_formatting);
res_columns[0]->insert(format_name);

View File

@ -12,12 +12,10 @@
{"query":"select * from url('http:\/\/127.0.0.2:8123\/?query=select%201%20format%20Null', CSV, 'a int')","status":"QueryFinish","tracestate":"another custom state","sorted_by_start_time":1}
{"query":"select 1 format Null\n","status":"QueryFinish","tracestate":"another custom state","sorted_by_start_time":1}
{"query":"select 1 format Null\n","status":"QueryFinish","tracestate":"another custom state","sorted_by_start_time":1}
{"query":"select 1 format Null\n","status":"QueryFinish","tracestate":"another custom state","sorted_by_start_time":1}
{"query":"select 1 format Null\n","query_status":"QueryFinish","tracestate":"another custom state","sorted_by_finish_time":1}
{"query":"select 1 format Null\n","query_status":"QueryFinish","tracestate":"another custom state","sorted_by_finish_time":1}
{"query":"select 1 format Null\n","query_status":"QueryFinish","tracestate":"another custom state","sorted_by_finish_time":1}
{"query":"select * from url('http:\/\/127.0.0.2:8123\/?query=select%201%20format%20Null', CSV, 'a int')","query_status":"QueryFinish","tracestate":"another custom state","sorted_by_finish_time":1}
{"total spans":"4","unique spans":"4","unique non-zero parent spans":"4"}
{"total spans":"3","unique spans":"3","unique non-zero parent spans":"3"}
{"initial query spans with proper parent":"1"}
{"unique non-empty tracestate values":"1"}
===sampled===

View File

@ -143,7 +143,7 @@ def test_select():
def main():
# HEAD + GET
t = start_server(3)
t = start_server(2)
t.start()
test_select()
t.join()