ClickHouse/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h

221 lines
8.5 KiB
C++
Raw Normal View History

#pragma once
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/SelectQueryInfo.h>
2022-07-17 18:41:17 +00:00
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/RequestResponse.h>
2022-11-18 20:09:20 +00:00
#include <Processors/Chunk.h>
2021-07-17 18:06:46 +00:00
namespace DB
{
2019-10-10 16:30:30 +00:00
/// Forward declarations. The caches are only held through std::shared_ptr in this header;
/// IMergeTreeReader is also fully defined by the IMergeTreeReader.h include above.
class MarkCache;
class UncompressedCache;
class IMergeTreeReader;
2021-06-29 11:53:34 +00:00
/// Forward declaration: this header only uses PrewhereExprInfo through
/// std::unique_ptr<PrewhereExprInfo> and const PrewhereExprInfo *.
struct PrewhereExprInfo;
2022-11-18 20:09:20 +00:00
/// Result of IMergeTreeSelectAlgorithm::read(): a chunk plus read-progress counters.
struct ChunkAndProgress
{
    /// The produced data.
    Chunk chunk;
    /// Number of rows read (progress counter).
    size_t num_read_rows{0};
    /// Number of bytes read (progress counter).
    size_t num_read_bytes{0};
};
/// Extra state needed when a query reads from several replicas in parallel.
struct ParallelReadingExtension
{
    /// Callback used to obtain read tasks (presumably from the replica coordinator — confirm at call sites).
    MergeTreeReadTaskCallback callback;
    /// How many replicas take part in reading.
    size_t count_participating_replicas = 0;
    /// Number of the replica this reader runs on.
    size_t number_of_current_replica = 0;
    /// Columns to read. They are needed to estimate the number of bytes between
    /// a pair of marks, so that one request over the network covers about 1Gb of data.
    /// NOTE: the misspelled field name is kept as-is for compatibility with existing users.
    Names colums_to_read;
};
2022-11-18 20:09:20 +00:00
/// Base class for MergeTreeThreadSelectAlgorithm and MergeTreeSelectAlgorithm
class IMergeTreeSelectAlgorithm
{
public:
2022-11-18 20:09:20 +00:00
IMergeTreeSelectAlgorithm(
Block header,
2019-08-03 11:02:40 +00:00
const MergeTreeData & storage_,
const StorageSnapshotPtr & storage_snapshot_,
2019-08-03 11:02:40 +00:00
const PrewhereInfoPtr & prewhere_info_,
2021-06-25 14:49:28 +00:00
ExpressionActionsSettings actions_settings,
2019-08-03 11:02:40 +00:00
UInt64 max_block_size_rows_,
UInt64 preferred_block_size_bytes_,
UInt64 preferred_max_column_in_block_size_bytes_,
const MergeTreeReaderSettings & reader_settings_,
2019-08-03 11:02:40 +00:00
bool use_uncompressed_cache_,
const Names & virt_column_names_ = {},
2022-11-14 05:32:18 +00:00
std::optional<ParallelReadingExtension> extension_ = {});
2022-11-18 20:09:20 +00:00
virtual ~IMergeTreeSelectAlgorithm();
2021-04-27 08:15:59 +00:00
static Block transformHeader(
Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns);
2021-03-30 10:25:26 +00:00
static std::unique_ptr<MergeTreeBlockSizePredictor> getSizePredictor(
const MergeTreeData::DataPartPtr & data_part,
const MergeTreeReadTaskColumns & task_columns,
const Block & sample_block);
2022-11-18 20:09:20 +00:00
Block getHeader() const { return result_header; }
ChunkAndProgress read();
void cancel() { is_cancelled = true; }
const MergeTreeReaderSettings & getSettings() const { return reader_settings; }
virtual std::string getName() const = 0;
protected:
2022-07-17 18:41:17 +00:00
/// This struct allow to return block with no columns but with non-zero number of rows similar to Chunk
2022-11-18 20:09:20 +00:00
struct BlockAndProgress
2022-07-17 18:41:17 +00:00
{
Block block;
size_t row_count = 0;
2022-11-15 21:23:18 +00:00
size_t num_read_rows = 0;
size_t num_read_bytes = 0;
};
/// Creates new this->task and return a flag whether it was successful or not
virtual bool getNewTaskImpl() = 0;
/// Creates new readers for a task it is needed. These methods are separate, because
/// in case of parallel reading from replicas the whole task could be denied by a coodinator
/// or it could modified somehow.
virtual void finalizeNewTask() = 0;
size_t estimateMaxBatchSizeForHugeRanges();
virtual bool canUseConsistentHashingForParallelReading() { return false; }
/// Closes readers and unlock part locks
virtual void finish() = 0;
2022-11-18 20:09:20 +00:00
virtual BlockAndProgress readFromPart();
2022-11-18 20:09:20 +00:00
BlockAndProgress readFromPartImpl();
2022-07-17 18:41:17 +00:00
/// Used for filling header with no rows as well as block with data
2021-04-27 08:15:59 +00:00
static void
2022-07-17 18:41:17 +00:00
injectVirtualColumns(Block & block, size_t row_count, MergeTreeReadTask * task, const DataTypePtr & partition_value_type, const Names & virtual_columns);
2022-09-05 16:55:00 +00:00
static std::unique_ptr<PrewhereExprInfo> getPrewhereActions(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings);
static void initializeRangeReadersImpl(
MergeTreeRangeReader & range_reader,
std::deque<MergeTreeRangeReader> & pre_range_readers,
PrewhereInfoPtr prewhere_info,
const PrewhereExprInfo * prewhere_actions,
IMergeTreeReader * reader,
bool has_lightweight_delete,
const MergeTreeReaderSettings & reader_settings,
const std::vector<std::unique_ptr<IMergeTreeReader>> & pre_reader_for_step,
const PrewhereExprStep & lightweight_delete_filter_step,
const Names & non_const_virtual_column_names);
2022-07-17 18:41:17 +00:00
/// Sets up data readers for each step of prewhere and where
void initializeMergeTreeReadersForPart(
MergeTreeData::DataPartPtr & data_part,
const MergeTreeReadTaskColumns & task_columns, const StorageMetadataPtr & metadata_snapshot,
const MarkRanges & mark_ranges, const IMergeTreeReader::ValueSizeMap & value_size_map,
const ReadBufferFromFileBase::ProfileCallback & profile_callback);
2019-07-19 14:56:00 +00:00
2022-07-17 18:41:17 +00:00
/// Sets up range readers corresponding to data readers
void initializeRangeReaders(MergeTreeReadTask & task);
2018-10-17 03:13:00 +00:00
const MergeTreeData & storage;
StorageSnapshotPtr storage_snapshot;
2022-07-17 18:41:17 +00:00
/// This step is added when the part has lightweight delete mask
2022-07-25 14:15:15 +00:00
const PrewhereExprStep lightweight_delete_filter_step { nullptr, LightweightDeleteDescription::FILTER_COLUMN.name, true, true };
PrewhereInfoPtr prewhere_info;
2021-06-29 11:53:34 +00:00
std::unique_ptr<PrewhereExprInfo> prewhere_actions;
UInt64 max_block_size_rows;
UInt64 preferred_block_size_bytes;
UInt64 preferred_max_column_in_block_size_bytes;
MergeTreeReaderSettings reader_settings;
bool use_uncompressed_cache;
Names virt_column_names;
2021-04-27 08:15:59 +00:00
/// These columns will be filled by the merge tree range reader
Names non_const_virtual_column_names;
2021-04-27 08:15:59 +00:00
DataTypePtr partition_value_type;
2019-10-10 14:16:15 +00:00
/// This header is used for chunks from readFromPart().
2022-11-18 20:09:20 +00:00
Block header_without_const_virtual_columns;
/// A result of getHeader(). A chunk which this header is returned from read().
Block result_header;
std::shared_ptr<UncompressedCache> owned_uncompressed_cache;
std::shared_ptr<MarkCache> owned_mark_cache;
2019-10-10 16:30:30 +00:00
using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
MergeTreeReaderPtr reader;
std::vector<MergeTreeReaderPtr> pre_reader_for_step;
MergeTreeReadTaskPtr task;
std::optional<ParallelReadingExtension> extension;
bool no_more_tasks{false};
std::deque<MergeTreeReadTaskPtr> delayed_tasks;
std::deque<MarkRanges> buffered_ranges;
private:
Poco::Logger * log = &Poco::Logger::get("MergeTreeBaseSelectProcessor");
2022-11-18 20:09:20 +00:00
std::atomic<bool> is_cancelled{false};
enum class Status
{
Accepted,
Cancelled,
Denied
};
/// Calls getNewTaskImpl() to get new task, then performs a request to a coordinator
/// The coordinator may modify the set of ranges to read from a part or could
/// deny the whole request. In the latter case it creates new task and retries.
/// Then it calls finalizeNewTask() to create readers for a task if it is needed.
bool getNewTask();
bool getNewTaskParallelReading();
/// After PK analysis the range of marks could be extremely big
/// We divide this range to a set smaller consecutive ranges
/// Then, depending on the type of reading (concurrent, in order or in reverse order)
/// we can calculate a consistent hash function with the number of buckets equal to
/// the number of replicas involved. And after that we can throw away some ranges with
/// hash not equals to the number of the current replica.
bool getTaskFromBuffer();
/// But we can't throw that ranges completely, because if we have different sets of parts
/// on replicas (have merged part on one, but not on another), then such a situation is possible
/// - Coordinator allows to read from a big merged part, but this part is present only on one replica.
/// And that replica calculates consistent hash and throws away some ranges
/// - Coordinator denies other replicas to read from another parts (source parts for that big one)
/// At the end, the result of the query is wrong, because we didn't read all the data.
/// So, we have to remember parts and mark ranges with hash different then current replica number.
/// An we have to ask the coordinator about its permission to read from that "delayed" parts.
/// It won't work with reading in order or reading in reverse order, because we can possibly seek back.
bool getDelayedTasks();
2022-11-14 05:37:19 +00:00
/// It will form a request to coordinator and
/// then reinitialize the mark ranges of this->task object
Status performRequestToCoordinator(MarkRanges requested_ranges, bool delayed);
void splitCurrentTaskRangesAndFillBuffer();
2022-11-18 20:09:20 +00:00
static Block applyPrewhereActions(Block block, const PrewhereInfoPtr & prewhere_info);
};
2022-11-18 20:09:20 +00:00
/// Owning (unique) pointer to a select algorithm.
using MergeTreeSelectAlgorithmPtr = std::unique_ptr<IMergeTreeSelectAlgorithm>;
}