#include <IO/Operators.h>
#include <Interpreters/Context.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Storages/MergeTree/AlterConversions.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <Storages/MergeTree/MarkRange.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/MergeTree/MergeTreePrefetchedReadPool.h>
#include <Storages/MergeTree/MergeTreeRangeReader.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <base/getThreadId.h>
#include <base/scope_guard.h>
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <Common/logger_useful.h>
#include <Common/FailPoint.h>

namespace ProfileEvents
{
    extern const Event MergeTreePrefetchedReadPoolInit;
    extern const Event WaitPrefetchTaskMicroseconds;
}

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int BAD_ARGUMENTS;
}

namespace FailPoints
{
    extern const char prefetched_reader_pool_failpoint[];
}

bool MergeTreePrefetchedReadPool::TaskHolder::operator<(const TaskHolder & other) const
{
    chassert(task->priority >= 0);
    chassert(other.task->priority >= 0);
    /// With default std::priority_queue, top() returns largest element.
    /// So closest to 0 will be on top with this comparator.
    return task->priority > other.task->priority; /// Less is better.
}
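
/// The prefetch callbacks hold raw pointers to the readers owned by this object,
/// so wait for every still-valid future before the readers are destroyed.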
MergeTreePrefetchedReadPool::PrefetchedReaders::~PrefetchedReaders()
{
    for (auto & prefetch_future : prefetch_futures)
        if (prefetch_future.valid())
            prefetch_future.wait();
}
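
/// Schedules one prefetch per reader: one for the main reader and one for each PREWHERE step reader.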
MergeTreePrefetchedReadPool::PrefetchedReaders::PrefetchedReaders(
    MergeTreeReadTask::Readers readers_,
    Priority priority_,
    MergeTreePrefetchedReadPool & pool_)
    : is_valid(true)
    , readers(std::move(readers_))
{
    try
    {
        prefetch_futures.reserve(1 + readers.prewhere.size());

        prefetch_futures.push_back(pool_.createPrefetchedFuture(readers.main.get(), priority_));

        for (const auto & reader : readers.prewhere)
            prefetch_futures.push_back(pool_.createPrefetchedFuture(reader.get(), priority_));

        fiu_do_on(FailPoints::prefetched_reader_pool_failpoint,
        {
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Failpoint for prefetched reader enabled");
        });
    }
    catch (...) /// In case of exceptions (e.g. a memory limit exception) we still have to wait for the prefetches already scheduled.
    {
        for (auto & prefetch_future : prefetch_futures)
            if (prefetch_future.valid())
                prefetch_future.wait();

        throw;
    }
}
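
/// Wait for all scheduled prefetches to finish without consuming the readers.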
void MergeTreePrefetchedReadPool::PrefetchedReaders::wait()
{
    ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::WaitPrefetchTaskMicroseconds);
    for (auto & prefetch_future : prefetch_futures)
        prefetch_future.wait();
}
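
/// Wait for all prefetches, rethrow the first stored exception (if any) and hand the readers over to the caller.
/// The object is marked as no longer valid afterwards.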
MergeTreeReadTask::Readers MergeTreePrefetchedReadPool::PrefetchedReaders::get()
{
    SCOPE_EXIT({ is_valid = false; });
    ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::WaitPrefetchTaskMicroseconds);

    /// First wait for completion of all futures.
    for (auto & prefetch_future : prefetch_futures)
        prefetch_future.wait();

    /// Then rethrow first exception if any.
    for (auto & prefetch_future : prefetch_futures)
        prefetch_future.get();

    return std::move(readers);
}
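
/// The pool splits all mark ranges between threads up front (fillPerThreadTasks);
/// asynchronous prefetches are started lazily on the first getTask() call.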
MergeTreePrefetchedReadPool::MergeTreePrefetchedReadPool(
    RangesInDataParts && parts_,
    const StorageSnapshotPtr & storage_snapshot_,
    const PrewhereInfoPtr & prewhere_info_,
    const ExpressionActionsSettings & actions_settings_,
    const MergeTreeReaderSettings & reader_settings_,
    const Names & column_names_,
    const Names & virtual_column_names_,
    const PoolSettings & settings_,
    const ContextPtr & context_)
    : MergeTreeReadPoolBase(
        std::move(parts_),
        storage_snapshot_,
        prewhere_info_,
        actions_settings_,
        reader_settings_,
        column_names_,
        virtual_column_names_,
        settings_,
        context_)
    , WithContext(context_)
    , prefetch_threadpool(getContext()->getPrefetchThreadpool())
    , log(&Poco::Logger::get("MergeTreePrefetchedReadPool(" + (parts_ranges.empty() ? "" : parts_ranges.front().data_part->storage.getStorageID().getNameForLogs()) + ")"))
{
    /// Task creation might also create a lot of readers - check that they do not
    /// do any time consuming operations in the constructor.
    ProfileEventTimeIncrement<Milliseconds> watch(ProfileEvents::MergeTreePrefetchedReadPoolInit);

    fillPerPartStatistics();
    fillPerThreadTasks(pool_settings.threads, pool_settings.sum_marks);
}
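
/// Schedule reader->prefetchBeginOfRange() on the prefetch thread pool and return the future for it.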
std::future<void> MergeTreePrefetchedReadPool::createPrefetchedFuture(IMergeTreeReader * reader, Priority priority)
{
    /// In order to make a prefetch we need to wait for marks to be loaded. But we just created
    /// a reader (which starts loading marks in its constructor), so if we did the prefetch right
    /// after creating the reader, it would be very inefficient. We can do the prefetch for all parts
    /// only inside this MergeTreePrefetchedReadPool, where read tasks are created and distributed,
    /// and we cannot block here either, therefore we schedule the prefetch inside the pool and put
    /// the future into the thread task. When a thread calls getTask(), it will wait for the future
    /// if it is not ready yet.
    auto task = [=, context = getContext()]() mutable
    {
        /// For async read metrics in system.query_log.
        PrefetchIncrement watch(context->getAsyncReadCounters());
        reader->prefetchBeginOfRange(priority);
    };

    return scheduleFromThreadPool<void>(std::move(task), prefetch_threadpool, "ReadPrepare", priority);
}
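
/// Create readers for the task and schedule prefetches for them; the futures are stored in task.readers_future.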
void MergeTreePrefetchedReadPool::createPrefetchedReadersForTask(ThreadTask & task)
{
    if (task.isValidReadersFuture())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Task already has a reader");

    auto extras = getExtras();
    auto readers = MergeTreeReadTask::createReaders(task.read_info, extras, task.ranges);
    task.readers_future = std::make_unique<PrefetchedReaders>(std::move(readers), task.priority, *this);
}
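
/// Drain the prefetch queue in priority order and create prefetched readers for every queued task.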
void MergeTreePrefetchedReadPool::startPrefetches()
{
    if (prefetch_queue.empty())
        return;

    [[maybe_unused]] TaskHolder prev;
    [[maybe_unused]] const Priority highest_priority{reader_settings.read_settings.priority.value + 1};
    assert(prefetch_queue.top().task->priority == highest_priority);

    while (!prefetch_queue.empty())
    {
        const auto & top = prefetch_queue.top();
        createPrefetchedReadersForTask(*top.task);
#ifndef NDEBUG
        if (prev.task)
        {
            assert(top.task->priority >= highest_priority);
            if (prev.thread_id == top.thread_id)
                assert(prev.task->priority < top.task->priority);
        }
        prev = top;
#endif
        prefetch_queue.pop();
    }
}
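
/// Return the next task for the given thread slot. Prefetches are started lazily on the first call;
/// if this thread has no tasks left, try to steal one from another thread.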
MergeTreeReadTaskPtr MergeTreePrefetchedReadPool::getTask(size_t task_idx, MergeTreeReadTask * previous_task)
{
    std::lock_guard lock(mutex);

    if (per_thread_tasks.empty())
        return nullptr;

    if (!started_prefetches)
    {
        started_prefetches = true;
        startPrefetches();
    }

    auto it = per_thread_tasks.find(task_idx);
    if (it == per_thread_tasks.end())
        return stealTask(task_idx, previous_task);

    auto & thread_tasks = it->second;
    assert(!thread_tasks.empty());

    auto thread_task = std::move(thread_tasks.front());
    thread_tasks.pop_front();

    if (thread_tasks.empty())
        per_thread_tasks.erase(it);

    return createTask(*thread_task, previous_task);
}
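
/// Steal work for a thread whose own queue is exhausted: prefer a task whose reader is already prefetched,
/// otherwise take the second half of the largest non-prefetched queue.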
MergeTreeReadTaskPtr MergeTreePrefetchedReadPool::stealTask(size_t thread, MergeTreeReadTask * previous_task)
{
    auto non_prefetched_tasks_to_steal = per_thread_tasks.end();
    auto prefetched_tasks_to_steal = per_thread_tasks.end();
    int64_t best_prefetched_task_priority = -1;

    /// There is no point stealing in order (like in MergeTreeReadPool, where tasks can be stolen
    /// only from the next thread). Even if we steal a task from the next thread, which reads from
    /// the same part as we just read, it might seem that we could reuse our own reader, do some
    /// seek avoidance and get a good result because we avoided a seek (new request). But it is
    /// not so, because that next task will most likely already have its own reader with a prefetch
    /// on the fly. (Not to mention that in fact we cannot reuse our own reader if initially we did
    /// not account for this range in the range request to object storage.)
    for (auto thread_tasks_it = per_thread_tasks.begin(); thread_tasks_it != per_thread_tasks.end(); ++thread_tasks_it)
    {
        /// Prefer to steal tasks which have an initialized reader (with prefetched data). Thus we avoid
        /// losing a prefetch by creating our own reader (or reusing our own reader if the part
        /// is the same as the one last read by this thread).
        auto & thread_tasks = thread_tasks_it->second;

        auto task_it = std::find_if(
            thread_tasks.begin(), thread_tasks.end(),
            [](const auto & task) { return task->isValidReadersFuture(); });

        if (task_it == thread_tasks.end())
        {
            /// The fallback to a non-prefetched task should be taken from the thread which
            /// has more tasks than the others.
            if (non_prefetched_tasks_to_steal == per_thread_tasks.end()
                || non_prefetched_tasks_to_steal->second.size() < thread_tasks.size())
                non_prefetched_tasks_to_steal = thread_tasks_it;
        }
        /// Try to steal the task with the best (lowest) priority (because it will be executed faster).
        else if (prefetched_tasks_to_steal == per_thread_tasks.end()
            || (*task_it)->priority < best_prefetched_task_priority)
        {
            best_prefetched_task_priority = (*task_it)->priority;
            chassert(best_prefetched_task_priority >= 0);
            prefetched_tasks_to_steal = thread_tasks_it;
        }
    }

    if (prefetched_tasks_to_steal != per_thread_tasks.end())
    {
        auto & thread_tasks = prefetched_tasks_to_steal->second;
        assert(!thread_tasks.empty());

        auto task_it = std::find_if(
            thread_tasks.begin(), thread_tasks.end(),
            [](const auto & task) { return task->isValidReadersFuture(); });
        assert(task_it != thread_tasks.end());

        auto thread_task = std::move(*task_it);
        thread_tasks.erase(task_it);

        if (thread_tasks.empty())
            per_thread_tasks.erase(prefetched_tasks_to_steal);

        return createTask(*thread_task, previous_task);
    }

    /// TODO: it also makes sense to first try to steal from the next thread if it has ranges
    /// from the same part as the current thread last read - to reuse the reader.
    if (non_prefetched_tasks_to_steal != per_thread_tasks.end())
    {
        auto & thread_tasks = non_prefetched_tasks_to_steal->second;
        assert(!thread_tasks.empty());

        /// Get the second half of the tasks.
        const size_t total_tasks = thread_tasks.size();
        const size_t half = total_tasks / 2;
        auto half_it = thread_tasks.begin() + half;
        assert(half_it != thread_tasks.end());

        /// Give them to the current thread, as the current thread's task list is empty.
        auto & current_thread_tasks = per_thread_tasks[thread];
        current_thread_tasks.insert(
            current_thread_tasks.end(), make_move_iterator(half_it), make_move_iterator(thread_tasks.end()));

        /// Erase them from the thread from which we steal.
        thread_tasks.resize(half);
        if (thread_tasks.empty())
            per_thread_tasks.erase(non_prefetched_tasks_to_steal);

        auto thread_task = std::move(current_thread_tasks.front());
        current_thread_tasks.erase(current_thread_tasks.begin());
        if (current_thread_tasks.empty())
            per_thread_tasks.erase(thread);

        return createTask(*thread_task, previous_task);
    }

    return nullptr;
}
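
/// Turn a ThreadTask into a MergeTreeReadTask, consuming the prefetched readers if the task has them;
/// otherwise fall back to synchronous reader creation in the base class.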
MergeTreeReadTaskPtr MergeTreePrefetchedReadPool::createTask(ThreadTask & task, MergeTreeReadTask * previous_task)
{
    if (task.isValidReadersFuture())
    {
        auto size_predictor = task.read_info->shared_size_predictor
            ? std::make_unique<MergeTreeBlockSizePredictor>(*task.read_info->shared_size_predictor)
            : nullptr;

        return std::make_unique<MergeTreeReadTask>(task.read_info, task.readers_future->get(), task.ranges, std::move(size_predictor));
    }

    return MergeTreeReadPoolBase::createTask(task.read_info, task.ranges, previous_task);
}
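
/// Approximate average number of compressed bytes per mark for the given columns of a part.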
size_t getApproximateSizeOfGranule(const IMergeTreeDataPart & part, const Names & columns_to_read)
{
    ColumnSize columns_size{};
    for (const auto & col_name : columns_to_read)
        columns_size.add(part.getColumnSize(col_name));
    return columns_size.data_compressed / part.getMarksCount();
}
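
/// Precompute per-part statistics used for task sizing: the total number of marks to read,
/// the approximate size of a mark and an estimate of the memory used by a single prefetch.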
void MergeTreePrefetchedReadPool::fillPerPartStatistics()
{
    per_part_statistics.clear();
    per_part_statistics.reserve(parts_ranges.size());
    const auto & settings = getContext()->getSettingsRef();

    for (size_t i = 0; i < parts_ranges.size(); ++i)
    {
        auto & part_stat = per_part_statistics.emplace_back();
        const auto & read_info = *per_part_infos[i];

        /// Sum up the total size of all mark ranges in a data part.
        for (const auto & range : parts_ranges[i].ranges)
            part_stat.sum_marks += range.end - range.begin;

        const auto & columns = settings.merge_tree_determine_task_size_by_prewhere_columns && prewhere_info
            ? prewhere_info->prewhere_actions->getRequiredColumnsNames()
            : column_names;

        part_stat.approx_size_of_mark = getApproximateSizeOfGranule(*read_info.data_part, columns);

        auto update_stat_for_column = [&](const auto & column_name)
        {
            size_t column_size = read_info.data_part->getColumnSize(column_name).data_compressed;
            part_stat.estimated_memory_usage_for_single_prefetch += std::min<size_t>(column_size, settings.prefetch_buffer_size);
            ++part_stat.required_readers_num;
        };

        /// adjustBufferSize(), which is done in MergeTreeReaderStream and MergeTreeReaderCompact,
        /// lowers the buffer size if the file size (or the required read range) is smaller. So we know that
        /// settings.prefetch_buffer_size will be lowered there, therefore we account for it here as well.
        /// But here we do a more approximate lowering (because we do not have the marks loaded yet),
        /// while in adjustBufferSize() it will be precise.
        for (const auto & column : read_info.task_columns.columns)
            update_stat_for_column(column.name);

        if (reader_settings.apply_deleted_mask && read_info.data_part->hasLightweightDelete())
            update_stat_for_column(LightweightDeleteDescription::FILTER_COLUMN.name);

        for (const auto & pre_columns : read_info.task_columns.pre_columns)
            for (const auto & column : pre_columns)
                update_stat_for_column(column.name);
    }
}
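
/// Distribute mark ranges between threads, choose a prefetch step for each part and decide,
/// within the memory and prefetch-count limits, which tasks get an asynchronous prefetch.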
void MergeTreePrefetchedReadPool::fillPerThreadTasks(size_t threads, size_t sum_marks)
{
    if (per_part_infos.empty())
        return;

    const auto & context = getContext();
    const auto & settings = context->getSettingsRef();

    size_t total_size_approx = 0;
    for (const auto & part : per_part_statistics)
        total_size_approx += part.sum_marks * part.approx_size_of_mark;

    size_t min_prefetch_step_marks = 0;
    for (size_t i = 0; i < per_part_infos.size(); ++i)
    {
        auto & part_stat = per_part_statistics[i];

        if (settings.filesystem_prefetch_step_marks)
        {
            part_stat.prefetch_step_marks = settings.filesystem_prefetch_step_marks;
        }
        else if (settings.filesystem_prefetch_step_bytes && part_stat.approx_size_of_mark)
        {
            part_stat.prefetch_step_marks = std::max<size_t>(
                1, static_cast<size_t>(std::round(static_cast<double>(settings.filesystem_prefetch_step_bytes) / part_stat.approx_size_of_mark)));
        }

        /// This limit is important to avoid spikes of slow aws getObject requests when parallelizing within one file.
        /// (The default is taken from here https://docs.aws.amazon.com/whitepapers/latest/s3-optimizing-performance-best-practices/use-byte-range-fetches.html).
        if (part_stat.approx_size_of_mark
            && settings.filesystem_prefetch_min_bytes_for_single_read_task
            && part_stat.approx_size_of_mark < settings.filesystem_prefetch_min_bytes_for_single_read_task)
        {
            const size_t min_prefetch_step_marks_by_total_cols = static_cast<size_t>(
                std::ceil(static_cast<double>(settings.filesystem_prefetch_min_bytes_for_single_read_task) / part_stat.approx_size_of_mark));

            /// At least one task to start working on it right now and another one to prefetch in the meantime.
            const size_t new_min_prefetch_step_marks = std::min<size_t>(min_prefetch_step_marks_by_total_cols, sum_marks / threads / 2);

            if (min_prefetch_step_marks < new_min_prefetch_step_marks)
            {
                LOG_DEBUG(log, "Increasing min prefetch step from {} to {}", min_prefetch_step_marks, new_min_prefetch_step_marks);
                min_prefetch_step_marks = new_min_prefetch_step_marks;
            }
        }

        if (part_stat.prefetch_step_marks < min_prefetch_step_marks)
        {
            LOG_DEBUG(log, "Increasing prefetch step from {} to {}", part_stat.prefetch_step_marks, min_prefetch_step_marks);
            part_stat.prefetch_step_marks = min_prefetch_step_marks;
        }

        LOG_DEBUG(
            log,
            "Part: {}, sum_marks: {}, approx mark size: {}, prefetch_step_bytes: {}, prefetch_step_marks: {}, (ranges: {})",
            parts_ranges[i].data_part->name,
            part_stat.sum_marks,
            part_stat.approx_size_of_mark,
            settings.filesystem_prefetch_step_bytes,
            part_stat.prefetch_step_marks,
            toString(parts_ranges[i].ranges));
    }

    const size_t min_marks_per_thread = (sum_marks - 1) / threads + 1;

    LOG_DEBUG(
        log,
        "Sum marks: {}, threads: {}, min_marks_per_thread: {}, min prefetch step marks: {}, prefetches limit: {}, total_size_approx: {}",
        sum_marks,
        threads,
        min_marks_per_thread,
        min_prefetch_step_marks,
        settings.filesystem_prefetches_limit,
        total_size_approx);

    size_t allowed_memory_usage = settings.filesystem_prefetch_max_memory_usage;
    if (!allowed_memory_usage)
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `filesystem_prefetch_max_memory_usage` must be non-zero");

    std::optional<size_t> allowed_prefetches_num = settings.filesystem_prefetches_limit
        ? std::optional<size_t>(settings.filesystem_prefetches_limit)
        : std::nullopt;

    per_thread_tasks.clear();
    size_t total_tasks = 0;

    /// Make a copy to be able to modify the ranges.
    std::vector<MarkRanges> per_part_ranges;
    per_part_ranges.reserve(parts_ranges.size());
    for (const auto & part_with_ranges : parts_ranges)
    {
        auto & part_ranges = per_part_ranges.emplace_back(part_with_ranges.ranges);
        std::sort(part_ranges.begin(), part_ranges.end());
    }

    for (size_t i = 0, part_idx = 0; i < threads && part_idx < per_part_infos.size(); ++i)
    {
        int64_t need_marks = min_marks_per_thread;

        /// Priority is given according to the prefetch number for each thread,
        /// e.g. the first task of each thread has the same priority and is greater
        /// than the priority of the second task of each thread, and so on.
        /// Add 1 to the query read priority because higher priority should be given to
        /// reads from the pool which come from the reader.
        Priority priority{reader_settings.read_settings.priority.value + 1};

        while (need_marks > 0 && part_idx < per_part_infos.size())
        {
            auto & part_stat = per_part_statistics[part_idx];
            auto & part_ranges = per_part_ranges[part_idx];

            if (part_stat.sum_marks == 0)
            {
                ++part_idx;
                continue;
            }

            MarkRanges ranges_to_get_from_part;
            size_t marks_to_get_from_part = std::min<size_t>(need_marks, part_stat.sum_marks);

            /// Split by prefetch step even if !allow_prefetch below, because it allows
            /// a better distribution (through task stealing) of tasks which did not fit
            /// into the memory limit or the prefetches limit.
            if (part_stat.prefetch_step_marks)
            {
                marks_to_get_from_part = std::min<size_t>(marks_to_get_from_part, part_stat.prefetch_step_marks);
            }

            if (part_stat.sum_marks == marks_to_get_from_part)
            {
                ranges_to_get_from_part = part_ranges;
            }
            else
            {
                if (part_stat.sum_marks < marks_to_get_from_part)
                {
                    throw Exception(
                        ErrorCodes::LOGICAL_ERROR,
                        "Requested {} marks from part {}, but part has only {} marks",
                        marks_to_get_from_part, per_part_infos[part_idx]->data_part->name, part_stat.sum_marks);
                }

                size_t num_marks_to_get = marks_to_get_from_part;
                while (num_marks_to_get > 0)
                {
                    MarkRange & range = part_ranges.front();
                    const size_t marks_in_range = range.end - range.begin;
                    const size_t marks_to_get_from_range = std::min(marks_in_range, num_marks_to_get);
                    num_marks_to_get -= marks_to_get_from_range;

                    ranges_to_get_from_part.emplace_back(range.begin, range.begin + marks_to_get_from_range);
                    range.begin += marks_to_get_from_range;

                    if (range.begin == range.end)
                    {
                        part_ranges.pop_front();
                    }
                    else if (!num_marks_to_get && part_stat.prefetch_step_marks && range.end - range.begin < part_stat.prefetch_step_marks)
                    {
                        /// We already have the requested number of marks, but the remainder of the current
                        /// mark range has less than `prefetch_step_marks` marks, so add it too.
                        ranges_to_get_from_part.emplace_back(range.begin, range.end);
                        marks_to_get_from_part += range.end - range.begin;
                        part_ranges.pop_front();
                    }
                }
            }

            need_marks -= marks_to_get_from_part;
            sum_marks -= marks_to_get_from_part;
            part_stat.sum_marks -= marks_to_get_from_part;

            bool allow_prefetch = false;
            if (allowed_memory_usage
                && (!allowed_prefetches_num.has_value() || allowed_prefetches_num.value() > 0))
            {
                allow_prefetch = part_stat.estimated_memory_usage_for_single_prefetch <= allowed_memory_usage
                    && (!allowed_prefetches_num.has_value() || part_stat.required_readers_num <= allowed_prefetches_num.value());

                if (allow_prefetch)
                {
                    allowed_memory_usage -= part_stat.estimated_memory_usage_for_single_prefetch;
                    if (allowed_prefetches_num.has_value())
                        *allowed_prefetches_num -= part_stat.required_readers_num;
                }
            }

            auto thread_task = std::make_unique<ThreadTask>(per_part_infos[part_idx], ranges_to_get_from_part, priority);
            if (allow_prefetch)
                prefetch_queue.emplace(TaskHolder{thread_task.get(), i});

            per_thread_tasks[i].push_back(std::move(thread_task));

            ++priority.value;
            ++total_tasks;
        }
    }

    LOG_TEST(log, "Result tasks {} for {} threads: {}", total_tasks, threads, dumpTasks(per_thread_tasks));
}
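
/// Render the per-thread task assignment for trace logging.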
std::string MergeTreePrefetchedReadPool::dumpTasks(const TasksPerThread & tasks)
{
    WriteBufferFromOwnString result;
    for (const auto & [thread_id, thread_tasks] : tasks)
    {
        result << "\tthread id: " << toString(thread_id) << ", tasks: " << toString(thread_tasks.size());

        if (!thread_tasks.empty())
        {
            size_t no = 0;
            for (const auto & task : thread_tasks)
            {
                result << '\t';
                result << ++no << ": ";
                result << "reader future: " << task->isValidReadersFuture() << ", ";
                result << "part: " << task->read_info->data_part->name << ", ";
                result << "ranges: " << toString(task->ranges);
            }
        }
    }
    return result.str();
}

bool MergeTreePrefetchedReadPool::checkReadMethodAllowed(LocalFSReadMethod method)
{
    return method == LocalFSReadMethod::pread_threadpool || method == LocalFSReadMethod::pread_fake_async;
}

bool MergeTreePrefetchedReadPool::checkReadMethodAllowed(RemoteFSReadMethod method)
{
    return method == RemoteFSReadMethod::threadpool;
}

}