Processors support for StorageDistributed reading.

Nikolai Kochetov 2020-01-31 11:54:57 +03:00
parent 240f1e3e96
commit 0157de021a
10 changed files with 66 additions and 22 deletions

View File

@@ -12,6 +12,9 @@ class Context;
 class Cluster;
 class Throttler;
+class Pipe;
+using Pipes = std::vector<Pipe>;
+
 namespace ClusterProxy
 {
@@ -26,7 +29,7 @@ public:
         const Cluster::ShardInfo & shard_info,
         const String & query, const ASTPtr & query_ast,
         const Context & context, const ThrottlerPtr & throttler,
-        BlockInputStreams & res) = 0;
+        Pipes & res) = 0;
 };
 }

View File

@@ -12,7 +12,10 @@
 #include <common/logger_useful.h>
 #include <DataStreams/ConvertingBlockInputStream.h>
+#include <Processors/Pipe.h>
+#include <Processors/Transforms/ConvertingTransform.h>
+#include <Processors/Sources/SourceFromInputStream.h>
+#include <Processors/Executors/TreeExecutorBlockInputStream.h>
 
 namespace ProfileEvents
 {
@@ -67,12 +70,12 @@ SelectStreamFactory::SelectStreamFactory(
 namespace
 {
 
-BlockInputStreamPtr createLocalStream(const ASTPtr & query_ast, const Block & header, const Context & context, QueryProcessingStage::Enum processed_stage)
+Pipe createLocalStream(const ASTPtr & query_ast, const Block & header, const Context & context, QueryProcessingStage::Enum processed_stage)
 {
     checkStackSize();
 
     InterpreterSelectQuery interpreter{query_ast, context, SelectQueryOptions(processed_stage)};
-    BlockInputStreamPtr stream = interpreter.execute().in;
+    Pipe pipe = interpreter.executeWithProcessors().getPipe();
 
     /** Materialization is needed, since from remote servers the constants come materialized.
       * If you do not do this, different types (Const and non-Const) columns will be produced in different threads,
@@ -84,7 +87,12 @@ BlockInputStreamPtr createLocalStream(const ASTPtr & query_ast, const Block & he
       */
     /// return std::make_shared<MaterializingBlockInputStream>(stream);
-    return std::make_shared<ConvertingBlockInputStream>(context, stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name);
+    auto converting = std::make_shared<ConvertingTransform>(
+        pipe.getHeader(), header, ConvertingTransform::MatchColumnsMode::Name, context);
+
+    pipe.addSimpleTransform(std::move(converting));
+
+    return pipe;
 }
 
 static String formattedAST(const ASTPtr & ast)
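
Note: the materialization comment above is the reason the new code appends a ConvertingTransform instead of returning the interpreter's Pipe directly. A constant expression is a Const column in the locally built header but arrives fully materialized from remote shards, so per-shard headers can disagree in constness. A condensed sketch of that guard, using only names that appear in the hunk above (error handling omitted; this restates the change, it is not additional code from the commit):

    /// Build the local part of the distributed query as a processors Pipe ...
    Pipe pipe = interpreter.executeWithProcessors().getPipe();

    /// ... then force its output header into the shape the caller expects:
    /// columns are matched by name, and header differences such as constness are reconciled.
    auto converting = std::make_shared<ConvertingTransform>(
        pipe.getHeader(), header, ConvertingTransform::MatchColumnsMode::Name, context);
    pipe.addSimpleTransform(std::move(converting));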
@@ -102,7 +110,7 @@ void SelectStreamFactory::createForShard(
     const Cluster::ShardInfo & shard_info,
     const String &, const ASTPtr & query_ast,
     const Context & context, const ThrottlerPtr & throttler,
-    BlockInputStreams & res)
+    Pipes & res)
 {
     auto modified_query_ast = query_ast->clone();
     if (has_virtual_shard_num_column)
@@ -122,7 +130,8 @@ void SelectStreamFactory::createForShard(
         stream->setPoolMode(PoolMode::GET_MANY);
         if (!table_func_ptr)
             stream->setMainTable(main_table);
-        res.emplace_back(std::move(stream));
+
+        res.emplace_back(std::make_shared<SourceFromInputStream>(std::move(stream)));
     };
 
     const auto & settings = context.getSettingsRef();
@@ -250,7 +259,7 @@ void SelectStreamFactory::createForShard(
             }
 
             if (try_results.empty() || local_delay < max_remote_delay)
-                return createLocalStream(modified_query_ast, header, context, stage);
+                return std::make_shared<TreeExecutorBlockInputStream>(createLocalStream(modified_query_ast, header, context, stage));
            else
            {
                std::vector<IConnectionPool::Entry> connections;
@@ -263,7 +272,7 @@ void SelectStreamFactory::createForShard(
            }
        };
 
-        res.emplace_back(std::make_shared<LazyBlockInputStream>("LazyShardWithLocalReplica", header, lazily_create_stream));
+        res.emplace_back(std::make_shared<SourceFromInputStream>("LazyShardWithLocalReplica", header, lazily_create_stream));
     }
     else
         emplace_remote_stream();
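
Note: for the prefer-local-replica path, the laziness previously provided by LazyBlockInputStream now lives in the new SourceFromInputStream constructor, and because the deferred callback still has to produce a BlockInputStreamPtr, a locally built Pipe is adapted back into a stream with TreeExecutorBlockInputStream. A condensed sketch of that shape; captures and connection handling are omitted, names other than createRemoteStream come from the hunks above, and createRemoteStream is a hypothetical helper standing in for the real RemoteBlockInputStream setup:

    auto lazily_create_stream = [=]() -> BlockInputStreamPtr
    {
        /// Decided only when the query actually pulls from this shard.
        if (try_results.empty() || local_delay < max_remote_delay)
            return std::make_shared<TreeExecutorBlockInputStream>(
                createLocalStream(modified_query_ast, header, context, stage));

        /// Otherwise build the remote stream from the gathered connection entries.
        return createRemoteStream(try_results);   /// hypothetical helper, not from this commit
    };

    /// The callback runs on the first generate() call, mirroring the old LazyBlockInputStream.
    res.emplace_back(std::make_shared<SourceFromInputStream>(
        "LazyShardWithLocalReplica", header, lazily_create_stream));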

View File

@@ -35,7 +35,7 @@ public:
         const Cluster::ShardInfo & shard_info,
         const String & query, const ASTPtr & query_ast,
         const Context & context, const ThrottlerPtr & throttler,
-        BlockInputStreams & res) override;
+        Pipes & res) override;
 
 private:
     const Block header;

View File

@@ -6,6 +6,7 @@
 #include <Interpreters/IInterpreter.h>
 #include <Parsers/queryToString.h>
 #include <Interpreters/ProcessList.h>
+#include <Processors/Pipe.h>
 
 namespace DB
@@ -36,11 +37,11 @@ Context removeUserRestrictionsFromSettings(const Context & context, const Settin
     return new_context;
 }
 
-BlockInputStreams executeQuery(
+Pipes executeQuery(
     IStreamFactory & stream_factory, const ClusterPtr & cluster,
     const ASTPtr & query_ast, const Context & context, const Settings & settings)
 {
-    BlockInputStreams res;
+    Pipes res;
 
     const std::string query = queryToString(query_ast);

View File

@@ -10,6 +10,9 @@ struct Settings;
 class Context;
 class Cluster;
+class Pipe;
+using Pipes = std::vector<Pipe>;
+
 namespace ClusterProxy
 {
@@ -22,7 +25,7 @@ Context removeUserRestrictionsFromSettings(const Context & context, const Settin
 /// Execute a distributed query, creating a vector of BlockInputStreams, from which the result can be read.
 /// `stream_factory` object encapsulates the logic of creating streams for a different type of query
 /// (currently SELECT, DESCRIBE).
-BlockInputStreams executeQuery(
+Pipes executeQuery(
     IStreamFactory & stream_factory, const ClusterPtr & cluster,
     const ASTPtr & query_ast, const Context & context, const Settings & settings);
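
Note: with these declarations, ClusterProxy::executeQuery hands back one group of processors per shard instead of one input stream per shard. A rough sketch of the caller side, assuming a SelectStreamFactory and the usual query objects (cluster, modified_query_ast, context, settings) are already in scope; this caller code is illustrative, not taken from the commit:

    /// Collect per-shard Pipes for a distributed SELECT instead of BlockInputStreams.
    Pipes pipes = ClusterProxy::executeQuery(
        select_stream_factory,   /// IStreamFactory that builds a source per shard (assumed constructed)
        cluster,                 /// target cluster of the Distributed table
        modified_query_ast,      /// per-shard query AST
        context, settings);

    /// Each Pipe already carries its per-shard transforms (e.g. header conversion),
    /// so the caller only has to plug them into the processors pipeline.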

View File

@@ -597,11 +597,14 @@ void QueryPipeline::calcRowsBeforeLimit()
         if (auto * source = typeid_cast<SourceFromInputStream *>(processor))
         {
-            auto & info = source->getStream().getProfileInfo();
-            if (info.hasAppliedLimit())
-            {
-                has_limit = visited_limit = true;
-                rows_before_limit_at_least += info.getRowsBeforeLimit();
-            }
+            if (auto & stream = source->getStream())
+            {
+                auto & info = stream->getProfileInfo();
+                if (info.hasAppliedLimit())
+                {
+                    has_limit = visited_limit = true;
+                    rows_before_limit_at_least += info.getRowsBeforeLimit();
+                }
+            }
         }
     }
 }

View File

@@ -10,6 +10,19 @@ SourceFromInputStream::SourceFromInputStream(BlockInputStreamPtr stream_, bool f
     : ISourceWithProgress(stream_->getHeader())
     , force_add_aggregating_info(force_add_aggregating_info_)
     , stream(std::move(stream_))
+{
+    init();
+}
+
+SourceFromInputStream::SourceFromInputStream(String name, Block header, std::function<BlockInputStreamPtr()> stream_builder_)
+    : ISourceWithProgress(std::move(header))
+    , stream_builder(std::move(stream_builder_))
+    , source_name(std::move(name))
+{
+    init();
+}
+
+void SourceFromInputStream::init()
 {
     auto & sample = getPort().getHeader();
     for (auto & type : sample.getDataTypes())
@@ -104,6 +117,9 @@ Chunk SourceFromInputStream::generate()
     if (!is_stream_started)
     {
+        if (!stream)
+            stream = stream_builder();
+
         stream->readPrefix();
         is_stream_started = true;
     }

View File

@@ -12,14 +12,16 @@ class SourceFromInputStream : public ISourceWithProgress
 {
 public:
     explicit SourceFromInputStream(BlockInputStreamPtr stream_, bool force_add_aggregating_info_ = false);
-    String getName() const override { return "SourceFromInputStream"; }
+
+    /// Constructor which works like LazyBlockInputStream. First 'generate' method creates stream using callback.
+    SourceFromInputStream(String name, Block header, std::function<BlockInputStreamPtr()> stream_builder_);
+
+    String getName() const override { return source_name.empty() ? "SourceFromInputStream" : source_name; }
 
     Status prepare() override;
     void work() override;
 
     Chunk generate() override;
 
-    IBlockInputStream & getStream() { return *stream; }
+    BlockInputStreamPtr & getStream() { return stream; }
 
     void addTotalsPort();
@@ -35,9 +37,12 @@ protected:
 private:
     bool has_aggregate_functions = false;
-    bool force_add_aggregating_info;
+    bool force_add_aggregating_info = false;
     BlockInputStreamPtr stream;
+    std::function<BlockInputStreamPtr()> stream_builder;
+    String source_name;
 
     Chunk totals;
     bool has_totals_port = false;
     bool has_totals = false;
@@ -45,6 +50,8 @@ private:
     bool is_generating_finished = false;
     bool is_stream_finished = false;
     bool is_stream_started = false;
+
+    void init();
 };
 
 }
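
Note: the new constructor declared above defers creation of the underlying stream until the first generate() call (see the .cpp hunk earlier), mirroring LazyBlockInputStream. A minimal usage sketch; make_expensive_stream is a hypothetical callback, the other names follow the declaration above:

    /// A source that only materializes its BlockInputStream when the pipeline first pulls from it.
    auto lazy_source = std::make_shared<SourceFromInputStream>(
        "LazyShardWithLocalReplica",          /// reported by getName()
        header,                               /// header the source promises to produce
        [=]() -> BlockInputStreamPtr
        {
            /// Runs once, inside the first generate(): connection setup or other heavy
            /// work is postponed until the query actually needs this source's data.
            return make_expensive_stream();   /// hypothetical helper
        });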

View File

@@ -340,7 +340,7 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Con
         : QueryProcessingStage::WithMergeableState;
 }
 
-BlockInputStreams StorageDistributed::read(
+Pipes StorageDistributed::readWithProcessors(
     const Names & column_names,
     const SelectQueryInfo & query_info,
     const Context & context,

View File

@@ -68,7 +68,7 @@ public:
     QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override;
     QueryProcessingStage::Enum getQueryProcessingStage(const Context & context, const ClusterPtr & cluster) const;
 
-    BlockInputStreams read(
+    Pipes readWithProcessors(
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
@@ -76,6 +76,8 @@ public:
         size_t max_block_size,
         unsigned num_streams) override;
 
+    bool supportProcessorsPipeline() const override { return true; }
+
     BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
 
     void drop(TableStructureWriteLockHolder &) override {}
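
Note: StorageDistributed now advertises the processors read path. A sketch of how calling code can branch on that flag, assuming storage is a pointer to an IStorage and the argument list matches the declaration above; the branching code itself is illustrative, not part of this commit:

    /// Prefer the processors path when a storage supports it, fall back to streams otherwise.
    if (storage->supportProcessorsPipeline())
    {
        Pipes pipes = storage->readWithProcessors(
            column_names, query_info, context, processed_stage, max_block_size, num_streams);
        /// ... feed the pipes into a QueryPipeline ...
    }
    else
    {
        BlockInputStreams streams = storage->read(
            column_names, query_info, context, processed_stage, max_block_size, num_streams);
        /// ... legacy stream-based execution ...
    }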