#pragma once

#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/RequestResponse.h>
#include <Processors/Chunk.h>

namespace DB
{

class IMergeTreeReader;
class UncompressedCache;
class MarkCache;
struct PrewhereExprInfo;

struct ChunkAndProgress
{
    Chunk chunk;
    size_t num_read_rows = 0;
    size_t num_read_bytes = 0;
};

struct ParallelReadingExtension
{
    MergeTreeReadTaskCallback callback;
    size_t count_participating_replicas{0};
    size_t number_of_current_replica{0};
    /// This is needed to estimate the number of bytes
    /// between a pair of marks to perform one request
    /// over the network for 1 GiB of data.
    Names columns_to_read;
};

/// Base class for MergeTreeThreadSelectAlgorithm and MergeTreeSelectAlgorithm
class IMergeTreeSelectAlgorithm
{
public:
    IMergeTreeSelectAlgorithm(
        Block header,
        const MergeTreeData & storage_,
        const StorageSnapshotPtr & storage_snapshot_,
        const PrewhereInfoPtr & prewhere_info_,
        ExpressionActionsSettings actions_settings,
        UInt64 max_block_size_rows_,
        UInt64 preferred_block_size_bytes_,
        UInt64 preferred_max_column_in_block_size_bytes_,
        const MergeTreeReaderSettings & reader_settings_,
        bool use_uncompressed_cache_,
        const Names & virt_column_names_ = {},
        std::optional<ParallelReadingExtension> extension_ = {});

    virtual ~IMergeTreeSelectAlgorithm();

    static Block transformHeader(
        Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns);

    static std::unique_ptr<MergeTreeBlockSizePredictor> getSizePredictor(
        const MergeTreeData::DataPartPtr & data_part,
        const MergeTreeReadTaskColumns & task_columns,
        const Block & sample_block);

    Block getHeader() const { return result_header; }

    ChunkAndProgress read();

    void cancel() { is_cancelled = true; }

    const MergeTreeReaderSettings & getSettings() const { return reader_settings; }

    virtual std::string getName() const = 0;

protected:
    /// This struct allows returning a block with no columns but with a non-zero
    /// number of rows, similar to Chunk.
    struct BlockAndProgress
    {
        Block block;
        size_t row_count = 0;
        size_t num_read_rows = 0;
        size_t num_read_bytes = 0;
    };

    /// Creates a new this->task and returns a flag indicating whether it was successful or not.
    virtual bool getNewTaskImpl() = 0;
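    /// A simplified sketch (for illustration only, not the exact control flow) of how the
    /// base class combines getNewTaskImpl() and finalizeNewTask() inside getNewTask():
    ///
    ///     if (!getNewTaskImpl())
    ///         return false;      /// no more tasks
    ///     /// under parallel reading, the coordinator may shrink or deny the task here
    ///     finalizeNewTask();     /// create readers for the accepted task
    ///     return true;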
    /// Creates new readers for a task if it is needed. These methods are separate, because
    /// in case of parallel reading from replicas the whole task could be denied by a coordinator
    /// or could be modified somehow.
    virtual void finalizeNewTask() = 0;

    size_t estimateMaxBatchSizeForHugeRanges();

    virtual bool canUseConsistentHashingForParallelReading() { return false; }

    /// Closes readers and unlocks part locks.
    virtual void finish() = 0;

    virtual BlockAndProgress readFromPart();

    BlockAndProgress readFromPartImpl();

    /// Used for filling the header with no rows as well as blocks with data.
    static void injectVirtualColumns(
        Block & block, size_t row_count, MergeTreeReadTask * task, const DataTypePtr & partition_value_type, const Names & virtual_columns);

    static std::unique_ptr<PrewhereExprInfo> getPrewhereActions(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings);

    static void initializeRangeReadersImpl(
        MergeTreeRangeReader & range_reader,
        std::deque<MergeTreeRangeReader> & pre_range_readers,
        PrewhereInfoPtr prewhere_info,
        const PrewhereExprInfo * prewhere_actions,
        IMergeTreeReader * reader,
        bool has_lightweight_delete,
        const MergeTreeReaderSettings & reader_settings,
        const std::vector<std::unique_ptr<IMergeTreeReader>> & pre_reader_for_step,
        const PrewhereExprStep & lightweight_delete_filter_step,
        const Names & non_const_virtual_column_names);

    /// Sets up data readers for each step of PREWHERE and WHERE.
    void initializeMergeTreeReadersForPart(
        MergeTreeData::DataPartPtr & data_part,
        const MergeTreeReadTaskColumns & task_columns,
        const StorageMetadataPtr & metadata_snapshot,
        const MarkRanges & mark_ranges,
        const IMergeTreeReader::ValueSizeMap & value_size_map,
        const ReadBufferFromFileBase::ProfileCallback & profile_callback);

    /// Sets up range readers corresponding to data readers.
    void initializeRangeReaders(MergeTreeReadTask & task);

    const MergeTreeData & storage;
    StorageSnapshotPtr storage_snapshot;

    /// This step is added when the part has a lightweight delete mask.
    const PrewhereExprStep lightweight_delete_filter_step { nullptr, LightweightDeleteDescription::FILTER_COLUMN.name, true, true };

    PrewhereInfoPtr prewhere_info;
    std::unique_ptr<PrewhereExprInfo> prewhere_actions;

    UInt64 max_block_size_rows;
    UInt64 preferred_block_size_bytes;
    UInt64 preferred_max_column_in_block_size_bytes;

    MergeTreeReaderSettings reader_settings;

    bool use_uncompressed_cache;

    Names virt_column_names;

    /// These columns will be filled by the merge tree range reader.
    Names non_const_virtual_column_names;

    DataTypePtr partition_value_type;

    /// This header is used for chunks from readFromPart().
    Block header_without_const_virtual_columns;
    /// The result of getHeader(): chunks returned from read() have this header.
    Block result_header;

    std::shared_ptr<UncompressedCache> owned_uncompressed_cache;
    std::shared_ptr<MarkCache> owned_mark_cache;

    using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
    MergeTreeReaderPtr reader;
    std::vector<MergeTreeReaderPtr> pre_reader_for_step;

    MergeTreeReadTaskPtr task;

    std::optional<ParallelReadingExtension> extension;
    bool no_more_tasks{false};
    std::deque<MergeTreeReadTaskPtr> delayed_tasks;
    std::deque<MarkRanges> buffered_ranges;

    /// This setting is used in the base algorithm only to additionally limit the number of granules to read.
    /// It is changed in the ctor of MergeTreeThreadSelectAlgorithm.
    ///
    /// The reason why we have it here is that MergeTreeReadPool takes the full task,
    /// ignoring the min_marks_to_read setting, in case of a remote disk (see MergeTreeReadPool::getTask).
    /// In this case, we won't limit the number of rows to read based on adaptive granularity settings.
    ///
    /// Big reading tasks are better for remote disks and prefetches.
    /// So, for now, it is easier to limit max_rows_to_read.
    /// Somebody needs to refactor this later.
    size_t min_marks_to_read = 0;
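    /// A simplified sketch of the chaining performed by initializeRangeReadersImpl() above
    /// (illustrative only; the real code also handles the lightweight delete filter step,
    /// non-const virtual columns, and the exact MergeTreeRangeReader constructor arguments
    /// are an assumption here): every PREWHERE step gets its own range reader that consumes
    /// the output of the previous one, and the main reader is attached last.
    ///
    ///     MergeTreeRangeReader * prev_reader = nullptr;
    ///     for (size_t i = 0; i < prewhere_actions->steps.size(); ++i)
    ///     {
    ///         pre_range_readers.emplace_back(
    ///             pre_reader_for_step[i].get(), prev_reader, &prewhere_actions->steps[i], /*last_reader=*/ false);
    ///         prev_reader = &pre_range_readers.back();
    ///     }
    ///     range_reader = MergeTreeRangeReader(reader, prev_reader, nullptr, /*last_reader=*/ true);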
private:
    Poco::Logger * log = &Poco::Logger::get("MergeTreeBaseSelectProcessor");

    std::atomic<bool> is_cancelled{false};

    enum class Status
    {
        Accepted,
        Cancelled,
        Denied
    };

    /// Calls getNewTaskImpl() to get a new task, then performs a request to the coordinator.
    /// The coordinator may modify the set of ranges to read from a part or may
    /// deny the whole request. In the latter case it creates a new task and retries.
    /// Then it calls finalizeNewTask() to create readers for the task if it is needed.
    bool getNewTask();
    bool getNewTaskParallelReading();

    /// After PK analysis the range of marks could be extremely big.
    /// We divide this range into a set of smaller consecutive ranges.
    /// Then, depending on the type of reading (concurrent, in order or in reverse order),
    /// we can calculate a consistent hash function with the number of buckets equal to
    /// the number of replicas involved. And after that we can throw away the ranges whose
    /// hash is not equal to the number of the current replica.
    bool getTaskFromBuffer();

    /// But we can't throw those ranges away completely, because if we have different sets of parts
    /// on replicas (a merged part on one, but not on another), the following situation is possible:
    /// - the coordinator allows reading from a big merged part, but this part is present only on one replica,
    ///   and that replica calculates the consistent hash and throws away some ranges;
    /// - the coordinator denies the other replicas reading from the other parts (the source parts of that big one).
    /// In the end, the result of the query is wrong, because we didn't read all the data.
    /// So, we have to remember the parts and mark ranges whose hash differs from the current replica number,
    /// and we have to ask the coordinator for permission to read from those "delayed" parts.
    /// It won't work with reading in order or reading in reverse order, because we can possibly seek back.
    bool getDelayedTasks();

    /// Forms a request to the coordinator and
    /// then reinitializes the mark ranges of the this->task object.
    Status performRequestToCoordinator(MarkRanges requested_ranges, bool delayed);

    void splitCurrentTaskRangesAndFillBuffer();

    static Block applyPrewhereActions(Block block, const PrewhereInfoPtr & prewhere_info);
};

using MergeTreeSelectAlgorithmPtr = std::unique_ptr<IMergeTreeSelectAlgorithm>;

}
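/// A minimal usage sketch of how an owning source might drive an IMergeTreeSelectAlgorithm
/// (hypothetical caller, for illustration only: `progress` and `push` are assumed helpers,
/// and treating an empty result with no progress as exhaustion is an assumption):
///
///     while (true)
///     {
///         ChunkAndProgress res = algorithm->read();
///         if (!res.chunk.hasRows() && res.num_read_rows == 0)
///             break;                                        /// assumed exhaustion signal
///         progress(res.num_read_rows, res.num_read_bytes);  /// hypothetical progress callback
///         if (res.chunk.hasRows())
///             push(std::move(res.chunk));                   /// hypothetical downstream push
///     }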