ClickHouse/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h


#pragma once

#include <Common/EventFD.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/RequestResponse.h>
#include <Processors/ISource.h>

namespace DB
{

class IMergeTreeReader;
class UncompressedCache;
class MarkCache;

struct PrewhereExprInfo;

struct ParallelReadingExtension
{
    MergeTreeReadTaskCallback callback;
    size_t count_participating_replicas{0};
    size_t number_of_current_replica{0};
    /// This is needed to estimate the number of bytes
    /// between a pair of marks, so that a single request
    /// over the network covers about 1 GB of data.
    Names columns_to_read;
};
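
/// A minimal sketch of how this extension might be filled when reading is
/// coordinated across replicas (the callback body and all values here are
/// hypothetical, for illustration only):
///
///     ParallelReadingExtension extension
///     {
///         .callback = callback_from_coordinator,
///         .count_participating_replicas = 3,
///         .number_of_current_replica = 0,
///         .columns_to_read = {"id", "value"},
///     };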

/// Base class for MergeTreeThreadSelectProcessor and MergeTreeSelectProcessor
class MergeTreeBaseSelectProcessor : public ISource
{
public:
    MergeTreeBaseSelectProcessor(
        Block header,
        const MergeTreeData & storage_,
        const StorageSnapshotPtr & storage_snapshot_,
        const PrewhereInfoPtr & prewhere_info_,
        ExpressionActionsSettings actions_settings,
        UInt64 max_block_size_rows_,
        UInt64 preferred_block_size_bytes_,
        UInt64 preferred_max_column_in_block_size_bytes_,
        const MergeTreeReaderSettings & reader_settings_,
        bool use_uncompressed_cache_,
        const Names & virt_column_names_ = {},
        std::optional<ParallelReadingExtension> extension = {});

    ~MergeTreeBaseSelectProcessor() override;

    static Block transformHeader(
        Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns);

    static std::unique_ptr<MergeTreeBlockSizePredictor> getSizePredictor(
        const MergeTreeData::DataPartPtr & data_part,
        const MergeTreeReadTaskColumns & task_columns,
        const Block & sample_block);

    Status prepare() override;

protected:
    /// This struct allows returning a block with no columns but with a non-zero number of rows, similar to Chunk
    struct BlockAndRowCount
    {
        Block block;
        size_t row_count = 0;
        size_t num_read_rows = 0;
        size_t num_read_bytes = 0;
    };
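
    /// For example, a "SELECT count()"-style read can produce rows without
    /// materializing any columns (a sketch with hypothetical numbers):
    ///
    ///     BlockAndRowCount res;
    ///     res.block = Block{};        /// no columns
    ///     res.row_count = 8192;       /// one granule worth of rows was still produced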

    struct ChunkWithProgress
    {
        Chunk chunk;
        size_t num_read_rows = 0;
        size_t num_read_bytes = 0;
    };

    std::optional<Chunk> reportProgress(ChunkWithProgress chunk);

    std::optional<Chunk> tryGenerate() final;

    ChunkWithProgress read();

    int schedule() override;

    /// Creates a new this->task and returns a flag indicating whether it was successful or not
    virtual bool getNewTaskImpl() = 0;
    /// Creates new readers for a task if it is needed. These methods are separate because,
    /// in case of parallel reading from replicas, the whole task could be denied by the coordinator
    /// or modified in some way.
    virtual void finalizeNewTask() = 0;

    size_t estimateMaxBatchSizeForHugeRanges();

    virtual bool canUseConsistentHashingForParallelReading() { return false; }

    /// Closes readers and unlocks part locks
    virtual void finish() = 0;

    virtual BlockAndRowCount readFromPart();

    BlockAndRowCount readFromPartImpl();

    /// Used both for filling the header with no rows and for blocks with data
    static void injectVirtualColumns(
        Block & block, size_t row_count, MergeTreeReadTask * task, const DataTypePtr & partition_value_type, const Names & virtual_columns);
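
    /// For instance, filling an empty header might look like this (a sketch; the
    /// task pointer may be null when only the header is needed):
    ///
    ///     injectVirtualColumns(header, /* row_count */ 0, /* task */ nullptr, partition_value_type, virtual_columns);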

    static std::unique_ptr<PrewhereExprInfo> getPrewhereActions(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings);

    static void initializeRangeReadersImpl(
        MergeTreeRangeReader & range_reader,
        std::deque<MergeTreeRangeReader> & pre_range_readers,
        PrewhereInfoPtr prewhere_info,
        const PrewhereExprInfo * prewhere_actions,
        IMergeTreeReader * reader,
        bool has_lightweight_delete,
        const MergeTreeReaderSettings & reader_settings,
        const std::vector<std::unique_ptr<IMergeTreeReader>> & pre_reader_for_step,
        const PrewhereExprStep & lightweight_delete_filter_step,
        const Names & non_const_virtual_column_names);

    /// Sets up data readers for each step of prewhere and where
    void initializeMergeTreeReadersForPart(
        MergeTreeData::DataPartPtr & data_part,
        const MergeTreeReadTaskColumns & task_columns, const StorageMetadataPtr & metadata_snapshot,
        const MarkRanges & mark_ranges, const IMergeTreeReader::ValueSizeMap & value_size_map,
        const ReadBufferFromFileBase::ProfileCallback & profile_callback);

    /// Sets up range readers corresponding to data readers
    void initializeRangeReaders(MergeTreeReadTask & task);
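
    /// The range readers form a chain: one reader per PREWHERE step feeds the next,
    /// and the reader for the remaining columns comes last. A rough sketch of the
    /// idea (simplified, with constructor arguments elided; not the actual code):
    ///
    ///     MergeTreeRangeReader * prev = nullptr;
    ///     for (size_t i = 0; i < prewhere_actions->steps.size(); ++i)
    ///     {
    ///         pre_range_readers.emplace_back(pre_reader_for_step[i].get(), prev, &prewhere_actions->steps[i], ...);
    ///         prev = &pre_range_readers.back();
    ///     }
    ///     range_reader = MergeTreeRangeReader(reader.get(), prev, nullptr, ...);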

    const MergeTreeData & storage;
    StorageSnapshotPtr storage_snapshot;

    /// This step is added when the part has a lightweight delete mask
    const PrewhereExprStep lightweight_delete_filter_step { nullptr, LightweightDeleteDescription::FILTER_COLUMN.name, true, true };

    PrewhereInfoPtr prewhere_info;
    std::unique_ptr<PrewhereExprInfo> prewhere_actions;

    UInt64 max_block_size_rows;
    UInt64 preferred_block_size_bytes;
    UInt64 preferred_max_column_in_block_size_bytes;

    MergeTreeReaderSettings reader_settings;

    bool use_uncompressed_cache;

    Names virt_column_names;

    /// These columns will be filled by the merge tree range reader
    Names non_const_virtual_column_names;

    DataTypePtr partition_value_type;

    /// This header is used for chunks from readFromPart().
    Block header_without_virtual_columns;

    std::shared_ptr<UncompressedCache> owned_uncompressed_cache;
    std::shared_ptr<MarkCache> owned_mark_cache;

    using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
    MergeTreeReaderPtr reader;
    std::vector<MergeTreeReaderPtr> pre_reader_for_step;

    MergeTreeReadTaskPtr task;

    std::optional<ParallelReadingExtension> extension;
    bool no_more_tasks{false};
    std::deque<MergeTreeReadTaskPtr> delayed_tasks;
    std::deque<MarkRanges> buffered_ranges;
private:
    Poco::Logger * log = &Poco::Logger::get("MergeTreeBaseSelectProcessor");

    struct AsyncReadingState;
    std::unique_ptr<AsyncReadingState> async_reading_state;

    enum class Status
    {
        Accepted,
        Cancelled,
        Denied
    };

    /// Calls getNewTaskImpl() to get a new task, then performs a request to the coordinator.
    /// The coordinator may modify the set of ranges to read from a part or
    /// deny the whole request. In the latter case a new task is created and the request is retried.
    /// Then finalizeNewTask() is called to create readers for the task if needed.
    bool getNewTask();
    bool getNewTaskParallelReading();
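
    /// A rough sketch of the retry loop described above (hypothetical control
    /// flow, not the actual implementation):
    ///
    ///     while (getNewTaskImpl())
    ///     {
    ///         auto status = performRequestToCoordinator(task->mark_ranges, /* delayed */ false);
    ///         if (status == Status::Accepted)
    ///         {
    ///             finalizeNewTask();
    ///             return true;
    ///         }
    ///         if (status == Status::Cancelled)
    ///             break;
    ///         /// Status::Denied: drop this task and try the next one
    ///     }
    ///     return false;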

    /// After PK analysis the range of marks could be extremely big.
    /// We divide this range into a set of smaller consecutive ranges.
    /// Then, depending on the type of reading (concurrent, in order or in reverse order),
    /// we can calculate a consistent hash function with the number of buckets equal to
    /// the number of replicas involved. After that we can throw away the ranges whose
    /// hash is not equal to the number of the current replica.
    bool getTaskFromBuffer();
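
    /// A minimal sketch of the filtering idea (hashRange(), makeDelayedTask() and
    /// the exact bucket function are assumptions, not the actual implementation):
    ///
    ///     for (const auto & range : split_ranges)
    ///     {
    ///         size_t bucket = ConsistentHashing(hashRange(range), extension->count_participating_replicas);
    ///         if (bucket == extension->number_of_current_replica)
    ///             buffered_ranges.push_back({range});                /// read locally
    ///         else
    ///             delayed_tasks.push_back(makeDelayedTask(range));   /// maybe later, see getDelayedTasks()
    ///     }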

    /// But we can't throw those ranges away completely, because replicas may have
    /// different sets of parts (a part merged on one replica but not on another),
    /// so the following situation is possible:
    /// - The coordinator allows reading from a big merged part, but this part is present
    ///   only on one replica, and that replica calculates the consistent hash and throws away some ranges.
    /// - The coordinator denies the other replicas permission to read from other parts
    ///   (the source parts of that big merged one).
    /// In the end, the result of the query is wrong, because we didn't read all the data.
    /// So we have to remember the parts and mark ranges whose hash differs from the current replica number,
    /// and we have to ask the coordinator for permission to read from those "delayed" parts.
    /// This won't work with reading in order or in reverse order, because we could possibly seek back.
    bool getDelayedTasks();
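
    /// A rough sketch of how delayed tasks might be retried (hypothetical flow):
    ///
    ///     while (!delayed_tasks.empty())
    ///     {
    ///         task = std::move(delayed_tasks.front());
    ///         delayed_tasks.pop_front();
    ///         /// Ask again, this time marking the request as delayed
    ///         if (performRequestToCoordinator(task->mark_ranges, /* delayed */ true) == Status::Accepted)
    ///             return true;
    ///     }
    ///     finish();
    ///     return false;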

    /// Forms a request to the coordinator and then
    /// reinitializes the mark ranges of the this->task object
    Status performRequestToCoordinator(MarkRanges requested_ranges, bool delayed);

    void splitCurrentTaskRangesAndFillBuffer();
};

}