Delay reading from StorageFileLog by creating a query plan step

This commit is contained in:
János Benjamin Antal 2024-01-15 11:36:00 +00:00
parent 3e1d7bf685
commit b04e8b11ae
5 changed files with 158 additions and 80 deletions

View File

@ -0,0 +1,43 @@
#include <Processors/QueryPlan/ReadFromStreamLikeEngine.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
namespace DB
{
namespace ErrorCodes
{
extern const int QUERY_NOT_ALLOWED;
}
ReadFromStreamLikeEngine::ReadFromStreamLikeEngine(
const Names & column_names_,
const StorageSnapshotPtr & storage_snapshot_,
std::shared_ptr<const StorageLimitsList> storage_limits_,
ContextPtr context_)
: ISourceStep{DataStream{.header = storage_snapshot_->getSampleBlockForColumns(column_names_)}}
, storage_limits{std::move(storage_limits_)}
, context{context_}
{
}
void ReadFromStreamLikeEngine::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
if (!context->getSettingsRef().stream_like_engine_allow_direct_select)
throw Exception(
ErrorCodes::QUERY_NOT_ALLOWED, "Direct select is not allowed. To enable use setting `stream_like_engine_allow_direct_select`");
auto pipe = makePipe();
/// Add storage limits.
for (const auto & processor : pipe.getProcessors())
processor->setStorageLimits(storage_limits);
/// Add to processors to get processor info through explain pipeline statement.
for (const auto & processor : pipe.getProcessors())
processors.emplace_back(processor);
pipeline.init(std::move(pipe));
}
}

View File

@ -0,0 +1,26 @@
#pragma once
#include <Processors/QueryPlan/ISourceStep.h>
#include <Storages/IStorage.h>
#include <Storages/StorageSnapshot.h>
namespace DB
{
class ReadFromStreamLikeEngine : public ISourceStep
{
public:
ReadFromStreamLikeEngine(
const Names & column_names_,
const StorageSnapshotPtr & storage_snapshot_,
std::shared_ptr<const StorageLimitsList> storage_limits_,
ContextPtr context_);
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*settings*/) final;
protected:
virtual Pipe makePipe() = 0;
std::shared_ptr<const StorageLimitsList> storage_limits;
ContextPtr context;
};
}

View File

@ -1,7 +1,7 @@
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <Disks/StoragePolicy.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
@ -13,9 +13,12 @@
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTInsertQuery.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromStreamLikeEngine.h>
#include <QueryPipeline/Pipe.h>
#include <Storages/FileLog/FileLogSource.h>
#include <Storages/FileLog/StorageFileLog.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageMaterializedView.h>
#include <Storages/checkAndGetLiteralArgument.h>
@ -50,6 +53,76 @@ namespace
static constexpr auto TMP_SUFFIX = ".tmp";
class ReadFromStorageFileLogStep final : public ReadFromStreamLikeEngine
{
public:
ReadFromStorageFileLogStep(
const Names & column_names_,
StoragePtr storage_,
const StorageSnapshotPtr & storage_snapshot_,
SelectQueryInfo & query_info,
ContextPtr context_)
: ReadFromStreamLikeEngine{column_names_, storage_snapshot_, query_info.storage_limits, context_}
, column_names{column_names_}
, storage{storage_}
, storage_snapshot{storage_snapshot_}
{
}
String getName() const override { return "ReadFromStorageFileLog"; }
private:
Pipe makePipe() final
{
auto & file_log = storage->as<StorageFileLog &>();
if (file_log.mv_attached)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageFileLog with attached materialized views");
std::lock_guard lock(file_log.file_infos_mutex);
if (file_log.running_streams)
throw Exception(ErrorCodes::CANNOT_SELECT, "Another select query is running on this table, need to wait it finish.");
file_log.updateFileInfos();
/// No files to parse
if (file_log.file_infos.file_names.empty())
{
LOG_WARNING(file_log.log, "There is a idle table named {}, no files need to parse.", getName());
return Pipe{};
}
auto modified_context = Context::createCopy(context);
auto max_streams_number = std::min<UInt64>(file_log.filelog_settings->max_threads, file_log.file_infos.file_names.size());
/// Each stream responsible for closing it's files and store meta
file_log.openFilesAndSetPos();
Pipes pipes;
pipes.reserve(max_streams_number);
for (size_t stream_number = 0; stream_number < max_streams_number; ++stream_number)
{
pipes.emplace_back(std::make_shared<FileLogSource>(
file_log,
storage_snapshot,
modified_context,
column_names,
file_log.getMaxBlockSize(),
file_log.getPollTimeoutMillisecond(),
stream_number,
max_streams_number,
file_log.filelog_settings->handle_error_mode));
}
return Pipe::unitePipes(std::move(pipes));
}
const Names column_names;
StoragePtr storage;
StorageSnapshotPtr storage_snapshot;
};
StorageFileLog::StorageFileLog(
const StorageID & table_id_,
ContextPtr context_,
@ -296,62 +369,19 @@ UInt64 StorageFileLog::getInode(const String & file_name)
return file_stat.st_ino;
}
Pipe StorageFileLog::read(
void StorageFileLog::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /* query_info */,
ContextPtr local_context,
SelectQueryInfo & query_info,
ContextPtr query_context,
QueryProcessingStage::Enum /* processed_stage */,
size_t /* max_block_size */,
size_t /* num_streams */)
{
/// If there are MVs depended on this table, we just forbid reading
if (!local_context->getSettingsRef().stream_like_engine_allow_direct_select)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED,
"Direct select is not allowed. To enable use setting `stream_like_engine_allow_direct_select`");
if (mv_attached)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageFileLog with attached materialized views");
std::lock_guard lock(file_infos_mutex);
if (running_streams)
{
throw Exception(ErrorCodes::CANNOT_SELECT, "Another select query is running on this table, need to wait it finish.");
}
updateFileInfos();
/// No files to parse
if (file_infos.file_names.empty())
{
LOG_WARNING(log, "There is a idle table named {}, no files need to parse.", getName());
return Pipe{};
}
auto modified_context = Context::createCopy(local_context);
auto max_streams_number = std::min<UInt64>(filelog_settings->max_threads, file_infos.file_names.size());
/// Each stream responsible for closing it's files and store meta
openFilesAndSetPos();
Pipes pipes;
pipes.reserve(max_streams_number);
for (size_t stream_number = 0; stream_number < max_streams_number; ++stream_number)
{
pipes.emplace_back(std::make_shared<FileLogSource>(
*this,
storage_snapshot,
modified_context,
column_names,
getMaxBlockSize(),
getPollTimeoutMillisecond(),
stream_number,
max_streams_number,
filelog_settings->handle_error_mode));
}
return Pipe::unitePipes(std::move(pipes));
query_plan.addStep(std::make_unique<ReadFromStorageFileLogStep>(
column_names, shared_from_this(), storage_snapshot, query_info, std::move(query_context)));
}
void StorageFileLog::increaseStreams()

View File

@ -49,7 +49,8 @@ public:
void startup() override;
void shutdown(bool is_drop) override;
Pipe read(
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
@ -133,6 +134,8 @@ public:
const auto & getFileLogSettings() const { return filelog_settings; }
private:
friend class ReadFromStorageFileLogStep;
std::unique_ptr<FileLogSettings> filelog_settings;
const String path;

View File

@ -43,6 +43,7 @@
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Processors/QueryPlan/ReadFromStreamLikeEngine.h>
#include <Common/logger_useful.h>
#include <Common/quoteString.h>
#include <Common/setThreadName.h>
@ -178,7 +179,7 @@ struct StorageKafkaInterceptors
}
};
class ReadFromStorageKafkaStep final : public ISourceStep
class ReadFromStorageKafkaStep final : public ReadFromStreamLikeEngine
{
public:
ReadFromStorageKafkaStep(
@ -187,44 +188,22 @@ public:
const StorageSnapshotPtr & storage_snapshot_,
SelectQueryInfo & query_info,
ContextPtr context_)
: ISourceStep{DataStream{.header = storage_snapshot_->getSampleBlockForColumns(column_names_)}}
: ReadFromStreamLikeEngine{column_names_, storage_snapshot_, query_info.storage_limits, context_}
, column_names{column_names_}
, storage{storage_}
, storage_snapshot{storage_snapshot_}
, storage_limits{query_info.storage_limits}
, context{context_}
{
}
String getName() const override { return "ReadFromStorageKafka"; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override
{
auto pipe = makePipe();
/// Add storage limits.
for (const auto & processor : pipe.getProcessors())
processor->setStorageLimits(storage_limits);
/// Add to processors to get processor info through explain pipeline statement.
for (const auto & processor : pipe.getProcessors())
processors.emplace_back(processor);
pipeline.init(std::move(pipe));
}
private:
Pipe makePipe()
Pipe makePipe() final
{
auto & kafka_storage = storage->as<StorageKafka &>();
if (kafka_storage.shutdown_called)
throw Exception(ErrorCodes::ABORTED, "Table is detached");
if (!context->getSettingsRef().stream_like_engine_allow_direct_select)
throw Exception(
ErrorCodes::QUERY_NOT_ALLOWED,
"Direct select is not allowed. To enable use setting `stream_like_engine_allow_direct_select`");
if (kafka_storage.mv_attached)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageKafka with attached materialized views");
@ -255,13 +234,10 @@ private:
LOG_DEBUG(kafka_storage.log, "Starting reading {} streams", pipes.size());
return Pipe::unitePipes(std::move(pipes));
}
ActionsDAGPtr buildFilterDAG();
const Names column_names;
StoragePtr storage;
StorageSnapshotPtr storage_snapshot;
std::shared_ptr<const StorageLimitsList> storage_limits;
ContextPtr context;
};
namespace