Use processors for sending external tables.

This commit is contained in:
Nikolai Kochetov 2020-02-19 17:00:49 +03:00
parent 75af5414d9
commit 3bfbd26901
3 changed files with 89 additions and 17 deletions

View File

@ -22,6 +22,9 @@
#include <Common/config_version.h>
#include <Interpreters/ClientInfo.h>
#include <Compression/CompressionFactory.h>
#include <Processors/Pipe.h>
#include <Processors/ISink.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Common/config.h>
#if USE_POCO_NETSSL
@ -535,6 +538,36 @@ void Connection::sendScalarsData(Scalars & data)
}
class ExternalTableDataSink : public ISink
{
public:
using OnCancell = std::function<void()>;
ExternalTableDataSink(Block header, Connection & connection_, ExternalTableData & table_data_, OnCancell callback)
: ISink(std::move(header))
, connection(connection_), table_data(table_data_), on_cancell(std::move(callback)) {}
String getName() const override { return "ExternalTableSink"; }
protected:
void consume(Chunk chunk) override
{
if (table_data.is_cancelled)
{
on_cancell();
return;
}
auto block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());
connection.sendData(block, table_data.table_name);
}
private:
Connection & connection;
ExternalTableData & table_data;
OnCancell on_cancell;
};
void Connection::sendExternalTablesData(ExternalTablesData & data)
{
if (data.empty())
@ -553,13 +586,17 @@ void Connection::sendExternalTablesData(ExternalTablesData & data)
for (auto & elem : data)
{
elem.first->readPrefix();
while (Block block = elem.first->read())
{
rows += block.rows();
sendData(block, elem.second);
}
elem.first->readSuffix();
PipelineExecutorPtr executor;
auto on_cancel = [& executor]() { executor->cancel(); };
auto sink = std::make_shared<ExternalTableDataSink>(elem->pipe->getHeader(), *this, *elem, std::move(on_cancel));
DB::connect(elem->pipe->getPort(), sink->getPort());
auto processors = std::move(*elem->pipe).detachProcessors();
processors.push_back(std::move(sink));
executor = std::make_shared<PipelineExecutor>(processors);
executor->execute(/*num_threads = */ 1);
}
/// Send empty block, which means end of data transfer.

View File

@ -30,11 +30,19 @@ namespace DB
{
class ClientInfo;
class Pipe;
/// The stream of blocks reading from the table and its name
using ExternalTableData = std::pair<BlockInputStreamPtr, std::string>;
/// Vector of pairs describing tables
using ExternalTablesData = std::vector<ExternalTableData>;
struct ExternalTableData
{
/// Pipe of data form table;
std::unique_ptr<Pipe> pipe;
std::string table_name;
/// Flag if need to stop reading.
std::atomic_bool is_cancelled = false;
};
using ExternalTableDataPtr = std::unique_ptr<ExternalTableData>;
using ExternalTablesData = std::vector<ExternalTableDataPtr>;
class Connection;

View File

@ -7,8 +7,12 @@
#include <Interpreters/castColumn.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Storages/IStorage.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/Pipe.h>
#include <IO/ConnectionTimeouts.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
namespace DB
@ -112,7 +116,7 @@ void RemoteBlockInputStream::cancel(bool kill)
/// Stop sending external data.
for (auto & vec : external_tables_data)
for (auto & elem : vec)
elem.first->cancel(kill);
elem->is_cancelled = true;
}
if (!isQueryPending() || hasThrownException())
@ -142,12 +146,35 @@ void RemoteBlockInputStream::sendExternalTables()
{
StoragePtr cur = table.second;
QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage(context);
BlockInputStreams input = cur->read(cur->getColumns().getNamesOfPhysical(), {}, context,
Pipes pipes;
if (cur->supportProcessorsPipeline())
pipes = cur->readWithProcessors(cur->getColumns().getNamesOfPhysical(), {}, context,
read_from_table_stage, DEFAULT_BLOCK_SIZE, 1);
if (input.size() == 0)
res.push_back(std::make_pair(std::make_shared<OneBlockInputStream>(cur->getSampleBlock()), table.first));
else
res.push_back(std::make_pair(input[0], table.first));
{
auto streams = cur->read(cur->getColumns().getNamesOfPhysical(), {}, context,
read_from_table_stage, DEFAULT_BLOCK_SIZE, 1);
for (auto & stream : streams)
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::move(stream)));
}
auto data = std::make_unique<ExternalTableData>();
data->table_name = table.first;
if (pipes.empty())
data->pipe = std::make_unique<Pipe>(std::make_shared<SourceFromSingleChunk>(cur->getSampleBlock(), Chunk()));
else if (pipes.size() == 1)
data->pipe = std::make_unique<Pipe>(std::move(pipes.front()));
else
{
auto concat = std::make_shared<ConcatProcessor>(pipes.front().getHeader(), pipes.size());
data->pipe = std::make_unique<Pipe>(std::move(pipes), std::move(concat));
}
res.emplace_back(std::move(data));
}
external_tables_data.push_back(std::move(res));
}