Streams -> Processors for dicts, part 2.

This commit is contained in:
Nikolai Kochetov 2021-08-05 21:08:52 +03:00
parent 8d14f2ef8f
commit 8546df13c2
32 changed files with 336 additions and 306 deletions

View File

@ -1,11 +1,10 @@
#include "LibraryBridgeHelper.h" #include "LibraryBridgeHelper.h"
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <DataStreams/formatBlock.h> #include <DataStreams/formatBlock.h>
#include <Dictionaries/DictionarySourceHelpers.h> #include <Dictionaries/DictionarySourceHelpers.h>
#include <Processors/Formats/InputStreamFromInputFormat.h> #include <Processors/Pipe.h>
#include <Processors/Formats/IInputFormat.h>
#include <IO/WriteBufferFromOStream.h> #include <IO/WriteBufferFromOStream.h>
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
@ -117,7 +116,7 @@ bool LibraryBridgeHelper::supportsSelectiveLoad()
} }
BlockInputStreamPtr LibraryBridgeHelper::loadAll() Pipe LibraryBridgeHelper::loadAll()
{ {
startBridgeSync(); startBridgeSync();
auto uri = createRequestURI(LOAD_ALL_METHOD); auto uri = createRequestURI(LOAD_ALL_METHOD);
@ -125,7 +124,7 @@ BlockInputStreamPtr LibraryBridgeHelper::loadAll()
} }
BlockInputStreamPtr LibraryBridgeHelper::loadIds(const std::string ids_string) Pipe LibraryBridgeHelper::loadIds(const std::string ids_string)
{ {
startBridgeSync(); startBridgeSync();
auto uri = createRequestURI(LOAD_IDS_METHOD); auto uri = createRequestURI(LOAD_IDS_METHOD);
@ -133,7 +132,7 @@ BlockInputStreamPtr LibraryBridgeHelper::loadIds(const std::string ids_string)
} }
BlockInputStreamPtr LibraryBridgeHelper::loadKeys(const Block & requested_block) Pipe LibraryBridgeHelper::loadKeys(const Block & requested_block)
{ {
startBridgeSync(); startBridgeSync();
auto uri = createRequestURI(LOAD_KEYS_METHOD); auto uri = createRequestURI(LOAD_KEYS_METHOD);
@ -163,7 +162,7 @@ bool LibraryBridgeHelper::executeRequest(const Poco::URI & uri, ReadWriteBufferF
} }
BlockInputStreamPtr LibraryBridgeHelper::loadBase(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback) Pipe LibraryBridgeHelper::loadBase(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback)
{ {
auto read_buf_ptr = std::make_unique<ReadWriteBufferFromHTTP>( auto read_buf_ptr = std::make_unique<ReadWriteBufferFromHTTP>(
uri, uri,
@ -176,7 +175,9 @@ BlockInputStreamPtr LibraryBridgeHelper::loadBase(const Poco::URI & uri, ReadWri
ReadWriteBufferFromHTTP::HTTPHeaderEntries{}); ReadWriteBufferFromHTTP::HTTPHeaderEntries{});
auto input_stream = getContext()->getInputFormat(LibraryBridgeHelper::DEFAULT_FORMAT, *read_buf_ptr, sample_block, DEFAULT_BLOCK_SIZE); auto input_stream = getContext()->getInputFormat(LibraryBridgeHelper::DEFAULT_FORMAT, *read_buf_ptr, sample_block, DEFAULT_BLOCK_SIZE);
return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(read_buf_ptr)); auto source = FormatFactory::instance().getInput(LibraryBridgeHelper::DEFAULT_FORMAT, *read_buf_ptr, sample_block, getContext(), DEFAULT_BLOCK_SIZE);
source->addBuffer(std::move(read_buf_ptr));
return Pipe(std::move(source));
} }
} }

View File

@ -11,6 +11,8 @@
namespace DB namespace DB
{ {
class Pipe;
class LibraryBridgeHelper : public IBridgeHelper class LibraryBridgeHelper : public IBridgeHelper
{ {
@ -29,13 +31,13 @@ public:
bool supportsSelectiveLoad(); bool supportsSelectiveLoad();
BlockInputStreamPtr loadAll(); Pipe loadAll();
BlockInputStreamPtr loadIds(std::string ids_string); Pipe loadIds(std::string ids_string);
BlockInputStreamPtr loadKeys(const Block & requested_block); Pipe loadKeys(const Block & requested_block);
BlockInputStreamPtr loadBase(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {}); Pipe loadBase(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {});
bool executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {}); bool executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {});

View File

@ -150,13 +150,14 @@ std::unique_ptr<Poco::MongoDB::Cursor> createCursor(const std::string & database
return cursor; return cursor;
} }
MongoDBBlockInputStream::MongoDBBlockInputStream( MongoDBSource::MongoDBSource(
std::shared_ptr<Poco::MongoDB::Connection> & connection_, std::shared_ptr<Poco::MongoDB::Connection> & connection_,
std::unique_ptr<Poco::MongoDB::Cursor> cursor_, std::unique_ptr<Poco::MongoDB::Cursor> cursor_,
const Block & sample_block, const Block & sample_block,
UInt64 max_block_size_, UInt64 max_block_size_,
bool strict_check_names_) bool strict_check_names_)
: connection(connection_) : SourceWithProgress(sample_block.cloneEmpty())
, connection(connection_)
, cursor{std::move(cursor_)} , cursor{std::move(cursor_)}
, max_block_size{max_block_size_} , max_block_size{max_block_size_}
, strict_check_names{strict_check_names_} , strict_check_names{strict_check_names_}
@ -165,7 +166,7 @@ MongoDBBlockInputStream::MongoDBBlockInputStream(
} }
MongoDBBlockInputStream::~MongoDBBlockInputStream() = default; MongoDBSource::~MongoDBSource() = default;
namespace namespace
@ -307,7 +308,7 @@ namespace
} }
Block MongoDBBlockInputStream::readImpl() Chunk MongoDBSource::generate()
{ {
if (all_read) if (all_read)
return {}; return {};
@ -362,7 +363,7 @@ Block MongoDBBlockInputStream::readImpl()
if (num_rows == 0) if (num_rows == 0)
return {}; return {};
return description.sample_block.cloneWithColumns(std::move(columns)); return Chunk(std::move(columns), num_rows);
} }
} }

View File

@ -1,7 +1,7 @@
#pragma once #pragma once
#include <Core/Block.h> #include <Core/Block.h>
#include <DataStreams/IBlockInputStream.h> #include <Processors/Sources/SourceWithProgress.h>
#include <Core/ExternalResultDescription.h> #include <Core/ExternalResultDescription.h>
@ -22,24 +22,22 @@ void authenticate(Poco::MongoDB::Connection & connection, const std::string & da
std::unique_ptr<Poco::MongoDB::Cursor> createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select); std::unique_ptr<Poco::MongoDB::Cursor> createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select);
/// Converts MongoDB Cursor to a stream of Blocks /// Converts MongoDB Cursor to a stream of Blocks
class MongoDBBlockInputStream final : public IBlockInputStream class MongoDBSource final : public SourceWithProgress
{ {
public: public:
MongoDBBlockInputStream( MongoDBSource(
std::shared_ptr<Poco::MongoDB::Connection> & connection_, std::shared_ptr<Poco::MongoDB::Connection> & connection_,
std::unique_ptr<Poco::MongoDB::Cursor> cursor_, std::unique_ptr<Poco::MongoDB::Cursor> cursor_,
const Block & sample_block, const Block & sample_block,
UInt64 max_block_size_, UInt64 max_block_size_,
bool strict_check_names_ = false); bool strict_check_names_ = false);
~MongoDBBlockInputStream() override; ~MongoDBSource() override;
String getName() const override { return "MongoDB"; } String getName() const override { return "MongoDB"; }
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
private: private:
Block readImpl() override; Chunk generate() override;
std::shared_ptr<Poco::MongoDB::Connection> connection; std::shared_ptr<Poco::MongoDB::Connection> connection;
std::unique_ptr<Poco::MongoDB::Cursor> cursor; std::unique_ptr<Poco::MongoDB::Cursor> cursor;

View File

@ -24,12 +24,13 @@ namespace DB
template<typename T> template<typename T>
PostgreSQLBlockInputStream<T>::PostgreSQLBlockInputStream( PostgreSQLSource<T>::PostgreSQLSource(
postgres::ConnectionHolderPtr connection_holder_, postgres::ConnectionHolderPtr connection_holder_,
const std::string & query_str_, const std::string & query_str_,
const Block & sample_block, const Block & sample_block,
const UInt64 max_block_size_) const UInt64 max_block_size_)
: query_str(query_str_) : SourceWithProgress(sample_block.cloneEmpty())
, query_str(query_str_)
, max_block_size(max_block_size_) , max_block_size(max_block_size_)
, connection_holder(std::move(connection_holder_)) , connection_holder(std::move(connection_holder_))
{ {
@ -38,13 +39,14 @@ PostgreSQLBlockInputStream<T>::PostgreSQLBlockInputStream(
template<typename T> template<typename T>
PostgreSQLBlockInputStream<T>::PostgreSQLBlockInputStream( PostgreSQLSource<T>::PostgreSQLSource(
std::shared_ptr<T> tx_, std::shared_ptr<T> tx_,
const std::string & query_str_, const std::string & query_str_,
const Block & sample_block, const Block & sample_block,
const UInt64 max_block_size_, const UInt64 max_block_size_,
bool auto_commit_) bool auto_commit_)
: query_str(query_str_) : SourceWithProgress(sample_block.cloneEmpty())
, query_str(query_str_)
, tx(std::move(tx_)) , tx(std::move(tx_))
, max_block_size(max_block_size_) , max_block_size(max_block_size_)
, auto_commit(auto_commit_) , auto_commit(auto_commit_)
@ -54,7 +56,7 @@ PostgreSQLBlockInputStream<T>::PostgreSQLBlockInputStream(
template<typename T> template<typename T>
void PostgreSQLBlockInputStream<T>::init(const Block & sample_block) void PostgreSQLSource<T>::init(const Block & sample_block)
{ {
description.init(sample_block); description.init(sample_block);
@ -69,19 +71,34 @@ void PostgreSQLBlockInputStream<T>::init(const Block & sample_block)
template<typename T> template<typename T>
void PostgreSQLBlockInputStream<T>::readPrefix() void PostgreSQLSource<T>::onStart()
{ {
tx = std::make_shared<T>(connection_holder->get()); tx = std::make_shared<T>(connection_holder->get());
stream = std::make_unique<pqxx::stream_from>(*tx, pqxx::from_query, std::string_view(query_str)); stream = std::make_unique<pqxx::stream_from>(*tx, pqxx::from_query, std::string_view(query_str));
} }
template<typename T>
IProcessor::Status PostgreSQLSource<T>::prepare()
{
    /// On the very first call run the start hook. The flag is raised only
    /// after onStart() has returned, so a throwing onStart() leaves it unset.
    const bool first_call = !started;
    if (first_call)
    {
        onStart();
        started = true;
    }

    /// The regular source state machine is delegated to the base class.
    const auto base_status = SourceWithProgress::prepare();
    if (base_status != Status::Finished)
        return base_status;

    /// The base class reported completion: run the finish hook before handing
    /// the status back to the caller.
    /// NOTE(review): onFinish() runs on every call that yields Finished;
    /// presumably prepare() is not invoked again after that - confirm.
    onFinish();
    return base_status;
}
template<typename T> template<typename T>
Block PostgreSQLBlockInputStream<T>::readImpl() Chunk PostgreSQLSource<T>::generate()
{ {
/// Check if pqxx::stream_from is finished /// Check if pqxx::stream_from is finished
if (!stream || !(*stream)) if (!stream || !(*stream))
return Block(); return {};
MutableColumns columns = description.sample_block.cloneEmptyColumns(); MutableColumns columns = description.sample_block.cloneEmptyColumns();
size_t num_rows = 0; size_t num_rows = 0;
@ -129,12 +146,12 @@ Block PostgreSQLBlockInputStream<T>::readImpl()
break; break;
} }
return description.sample_block.cloneWithColumns(std::move(columns)); return Chunk(std::move(columns), num_rows);
} }
template<typename T> template<typename T>
void PostgreSQLBlockInputStream<T>::readSuffix() void PostgreSQLSource<T>::onFinish()
{ {
if (stream) if (stream)
{ {
@ -146,10 +163,10 @@ void PostgreSQLBlockInputStream<T>::readSuffix()
} }
template template
class PostgreSQLBlockInputStream<pqxx::ReplicationTransaction>; class PostgreSQLSource<pqxx::ReplicationTransaction>;
template template
class PostgreSQLBlockInputStream<pqxx::ReadTransaction>; class PostgreSQLSource<pqxx::ReadTransaction>;
} }

View File

@ -6,7 +6,7 @@
#if USE_LIBPQXX #if USE_LIBPQXX
#include <Core/Block.h> #include <Core/Block.h>
#include <DataStreams/IBlockInputStream.h> #include <Processors/Sources/SourceWithProgress.h>
#include <Core/ExternalResultDescription.h> #include <Core/ExternalResultDescription.h>
#include <Core/Field.h> #include <Core/Field.h>
#include <Core/PostgreSQL/insertPostgreSQLValue.h> #include <Core/PostgreSQL/insertPostgreSQLValue.h>
@ -18,23 +18,20 @@ namespace DB
{ {
template <typename T = pqxx::ReadTransaction> template <typename T = pqxx::ReadTransaction>
class PostgreSQLBlockInputStream : public IBlockInputStream class PostgreSQLSource : public SourceWithProgress
{ {
public: public:
PostgreSQLBlockInputStream( PostgreSQLSource(
postgres::ConnectionHolderPtr connection_holder_, postgres::ConnectionHolderPtr connection_holder_,
const String & query_str_, const String & query_str_,
const Block & sample_block, const Block & sample_block,
const UInt64 max_block_size_); const UInt64 max_block_size_);
String getName() const override { return "PostgreSQL"; } String getName() const override { return "PostgreSQL"; }
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
void readPrefix() override;
protected: protected:
PostgreSQLBlockInputStream( PostgreSQLSource(
std::shared_ptr<T> tx_, std::shared_ptr<T> tx_,
const std::string & query_str_, const std::string & query_str_,
const Block & sample_block, const Block & sample_block,
@ -45,9 +42,12 @@ protected:
std::shared_ptr<T> tx; std::shared_ptr<T> tx;
std::unique_ptr<pqxx::stream_from> stream; std::unique_ptr<pqxx::stream_from> stream;
Status prepare() override;
private: private:
Block readImpl() override; void onStart();
void readSuffix() override; Chunk generate() override;
void onFinish();
void init(const Block & sample_block); void init(const Block & sample_block);
@ -55,6 +55,8 @@ private:
bool auto_commit = true; bool auto_commit = true;
ExternalResultDescription description; ExternalResultDescription description;
bool started = false;
postgres::ConnectionHolderPtr connection_holder; postgres::ConnectionHolderPtr connection_holder;
std::unordered_map<size_t, PostgreSQLArrayInfo> array_info; std::unordered_map<size_t, PostgreSQLArrayInfo> array_info;

View File

@ -8,6 +8,8 @@
#include <IO/WriteBufferFromOStream.h> #include <IO/WriteBufferFromOStream.h>
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/IInputFormat.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Poco/Net/HTTPRequest.h> #include <Poco/Net/HTTPRequest.h>
#include <common/logger_useful.h> #include <common/logger_useful.h>
@ -62,14 +64,15 @@ HTTPDictionarySource::HTTPDictionarySource(const HTTPDictionarySource & other)
credentials.setPassword(other.credentials.getPassword()); credentials.setPassword(other.credentials.getPassword());
} }
BlockInputStreamPtr HTTPDictionarySource::createWrappedBuffer(std::unique_ptr<ReadWriteBufferFromHTTP> http_buffer_ptr) Pipe HTTPDictionarySource::createWrappedBuffer(std::unique_ptr<ReadWriteBufferFromHTTP> http_buffer_ptr)
{ {
Poco::URI uri(configuration.url); Poco::URI uri(configuration.url);
String http_request_compression_method_str = http_buffer_ptr->getCompressionMethod(); String http_request_compression_method_str = http_buffer_ptr->getCompressionMethod();
auto in_ptr_wrapped auto in_ptr_wrapped
= wrapReadBufferWithCompressionMethod(std::move(http_buffer_ptr), chooseCompressionMethod(uri.getPath(), http_request_compression_method_str)); = wrapReadBufferWithCompressionMethod(std::move(http_buffer_ptr), chooseCompressionMethod(uri.getPath(), http_request_compression_method_str));
auto input_stream = context->getInputFormat(configuration.format, *in_ptr_wrapped, sample_block, max_block_size); auto source = FormatFactory::instance().getInput(configuration.format, *in_ptr_wrapped, sample_block, context, max_block_size);
return std::make_shared<OwningBlockInputStream<ReadBuffer>>(input_stream, std::move(in_ptr_wrapped)); source->addBuffer(std::move(in_ptr_wrapped));
return Pipe(std::move(source));
} }
void HTTPDictionarySource::getUpdateFieldAndDate(Poco::URI & uri) void HTTPDictionarySource::getUpdateFieldAndDate(Poco::URI & uri)
@ -89,7 +92,7 @@ void HTTPDictionarySource::getUpdateFieldAndDate(Poco::URI & uri)
} }
} }
BlockInputStreamPtr HTTPDictionarySource::loadAll() Pipe HTTPDictionarySource::loadAll()
{ {
LOG_TRACE(log, "loadAll {}", toString()); LOG_TRACE(log, "loadAll {}", toString());
Poco::URI uri(configuration.url); Poco::URI uri(configuration.url);
@ -106,7 +109,7 @@ BlockInputStreamPtr HTTPDictionarySource::loadAll()
return createWrappedBuffer(std::move(in_ptr)); return createWrappedBuffer(std::move(in_ptr));
} }
BlockInputStreamPtr HTTPDictionarySource::loadUpdatedAll() Pipe HTTPDictionarySource::loadUpdatedAll()
{ {
Poco::URI uri(configuration.url); Poco::URI uri(configuration.url);
getUpdateFieldAndDate(uri); getUpdateFieldAndDate(uri);
@ -124,7 +127,7 @@ BlockInputStreamPtr HTTPDictionarySource::loadUpdatedAll()
return createWrappedBuffer(std::move(in_ptr)); return createWrappedBuffer(std::move(in_ptr));
} }
BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector<UInt64> & ids) Pipe HTTPDictionarySource::loadIds(const std::vector<UInt64> & ids)
{ {
LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size()); LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size());
@ -151,7 +154,7 @@ BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector<UInt64> & id
return createWrappedBuffer(std::move(in_ptr)); return createWrappedBuffer(std::move(in_ptr));
} }
BlockInputStreamPtr HTTPDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) Pipe HTTPDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{ {
LOG_TRACE(log, "loadKeys {} size = {}", toString(), requested_rows.size()); LOG_TRACE(log, "loadKeys {} size = {}", toString(), requested_rows.size());

View File

@ -43,13 +43,13 @@ public:
HTTPDictionarySource(const HTTPDictionarySource & other); HTTPDictionarySource(const HTTPDictionarySource & other);
HTTPDictionarySource & operator=(const HTTPDictionarySource &) = delete; HTTPDictionarySource & operator=(const HTTPDictionarySource &) = delete;
BlockInputStreamPtr loadAll() override; Pipe loadAll() override;
BlockInputStreamPtr loadUpdatedAll() override; Pipe loadUpdatedAll() override;
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override; Pipe loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override; Pipe loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
bool isModified() const override; bool isModified() const override;
@ -65,7 +65,7 @@ private:
void getUpdateFieldAndDate(Poco::URI & uri); void getUpdateFieldAndDate(Poco::URI & uri);
// wrap buffer using encoding from made request // wrap buffer using encoding from made request
BlockInputStreamPtr createWrappedBuffer(std::unique_ptr<ReadWriteBufferFromHTTP> http_buffer); Pipe createWrappedBuffer(std::unique_ptr<ReadWriteBufferFromHTTP> http_buffer);
Poco::Logger * log; Poco::Logger * log;
@ -81,4 +81,3 @@ private:
}; };
} }

View File

@ -367,10 +367,12 @@ void HashedDictionary<dictionary_key_type, sparse>::updateData()
if (!update_field_loaded_block || update_field_loaded_block->rows() == 0) if (!update_field_loaded_block || update_field_loaded_block->rows() == 0)
{ {
auto stream = source_ptr->loadUpdatedAll(); QueryPipeline pipeline;
stream->readPrefix(); pipeline.init(source_ptr->loadUpdatedAll());
while (const auto block = stream->read()) PullingPipelineExecutor executor(pipeline);
Block block;
while (executor.pull(block))
{ {
/// We are using this to keep saved data if input stream consists of multiple blocks /// We are using this to keep saved data if input stream consists of multiple blocks
if (!update_field_loaded_block) if (!update_field_loaded_block)
@ -383,15 +385,14 @@ void HashedDictionary<dictionary_key_type, sparse>::updateData()
saved_column->insertRangeFrom(update_column, 0, update_column.size()); saved_column->insertRangeFrom(update_column, 0, update_column.size());
} }
} }
stream->readSuffix();
} }
else else
{ {
auto stream = source_ptr->loadUpdatedAll(); auto pipe = source_ptr->loadUpdatedAll();
mergeBlockWithStream<dictionary_key_type>( mergeBlockWithPipe<dictionary_key_type>(
dict_struct.getKeysSize(), dict_struct.getKeysSize(),
*update_field_loaded_block, *update_field_loaded_block,
stream); std::move(pipe));
} }
if (update_field_loaded_block) if (update_field_loaded_block)
@ -560,15 +561,15 @@ void HashedDictionary<dictionary_key_type, sparse>::loadData()
{ {
std::atomic<size_t> new_size = 0; std::atomic<size_t> new_size = 0;
BlockInputStreamPtr stream; QueryPipeline pipeline;
if (configuration.preallocate) if (configuration.preallocate)
stream = source_ptr->loadAllWithSizeHint(&new_size); pipeline.init(source_ptr->loadAllWithSizeHint(&new_size));
else else
stream = source_ptr->loadAll(); pipeline.init(source_ptr->loadAll());
stream->readPrefix(); PullingPipelineExecutor executor(pipeline);
Block block;
while (const auto block = stream->read()) while (executor.pull(block))
{ {
if (configuration.preallocate && new_size) if (configuration.preallocate && new_size)
{ {
@ -584,8 +585,6 @@ void HashedDictionary<dictionary_key_type, sparse>::loadData()
blockToAttributes(block); blockToAttributes(block);
} }
stream->readSuffix();
} }
else else
updateData(); updateData();

View File

@ -352,14 +352,16 @@ void IPAddressDictionary::createAttributes()
void IPAddressDictionary::loadData() void IPAddressDictionary::loadData()
{ {
auto stream = source_ptr->loadAll(); QueryPipeline pipeline;
stream->readPrefix(); pipeline.init(source_ptr->loadAll());
std::vector<IPRecord> ip_records; std::vector<IPRecord> ip_records;
bool has_ipv6 = false; bool has_ipv6 = false;
while (const auto block = stream->read()) PullingPipelineExecutor executor(pipeline);
Block block;
while (executor.pull(block))
{ {
const auto rows = block.rows(); const auto rows = block.rows();
element_count += rows; element_count += rows;
@ -387,8 +389,6 @@ void IPAddressDictionary::loadData()
} }
} }
stream->readSuffix();
if (access_to_key_from_attributes) if (access_to_key_from_attributes)
{ {
/// We format key attribute values here instead of filling with data from key_column /// We format key attribute values here instead of filling with data from key_column
@ -835,7 +835,7 @@ static auto keyViewGetter()
}; };
} }
BlockInputStreamPtr IPAddressDictionary::getBlockInputStream(const Names & column_names, size_t max_block_size) const Pipe IPAddressDictionary::read(const Names & column_names, size_t max_block_size) const
{ {
const bool is_ipv4 = std::get_if<IPv4Container>(&ip_column) != nullptr; const bool is_ipv4 = std::get_if<IPv4Container>(&ip_column) != nullptr;
@ -857,13 +857,15 @@ BlockInputStreamPtr IPAddressDictionary::getBlockInputStream(const Names & colum
if (is_ipv4) if (is_ipv4)
{ {
auto get_view = keyViewGetter<ColumnVector<UInt32>, true>(); auto get_view = keyViewGetter<ColumnVector<UInt32>, true>();
return std::make_shared<DictionaryBlockInputStream>( return Pipe(std::make_shared<DictionarySource>(
shared_from_this(), max_block_size, getKeyColumns(), column_names, std::move(get_keys), std::move(get_view)); DictionarySourceData(shared_from_this(), getKeyColumns(), column_names, std::move(get_keys), std::move(get_view)),
max_block_size));
} }
auto get_view = keyViewGetter<ColumnFixedString, false>(); auto get_view = keyViewGetter<ColumnFixedString, false>();
return std::make_shared<DictionaryBlockInputStream>( return Pipe(std::make_shared<DictionarySource>(
shared_from_this(), max_block_size, getKeyColumns(), column_names, std::move(get_keys), std::move(get_view)); DictionarySourceData(shared_from_this(), getKeyColumns(), column_names, std::move(get_keys), std::move(get_view)),
max_block_size));
} }
IPAddressDictionary::RowIdxConstIter IPAddressDictionary::ipNotFound() const IPAddressDictionary::RowIdxConstIter IPAddressDictionary::ipNotFound() const

View File

@ -78,7 +78,7 @@ public:
ColumnUInt8::Ptr hasKeys(const Columns & key_columns, const DataTypes & key_types) const override; ColumnUInt8::Ptr hasKeys(const Columns & key_columns, const DataTypes & key_types) const override;
BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override; Pipe read(const Names & column_names, size_t max_block_size) const override;
private: private:

View File

@ -89,21 +89,21 @@ bool LibraryDictionarySource::supportsSelectiveLoad() const
} }
BlockInputStreamPtr LibraryDictionarySource::loadAll() Pipe LibraryDictionarySource::loadAll()
{ {
LOG_TRACE(log, "loadAll {}", toString()); LOG_TRACE(log, "loadAll {}", toString());
return bridge_helper->loadAll(); return bridge_helper->loadAll();
} }
BlockInputStreamPtr LibraryDictionarySource::loadIds(const std::vector<UInt64> & ids) Pipe LibraryDictionarySource::loadIds(const std::vector<UInt64> & ids)
{ {
LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size()); LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size());
return bridge_helper->loadIds(getDictIdsString(ids)); return bridge_helper->loadIds(getDictIdsString(ids));
} }
BlockInputStreamPtr LibraryDictionarySource::loadKeys(const Columns & key_columns, const std::vector<std::size_t> & requested_rows) Pipe LibraryDictionarySource::loadKeys(const Columns & key_columns, const std::vector<std::size_t> & requested_rows)
{ {
LOG_TRACE(log, "loadKeys {} size = {}", toString(), requested_rows.size()); LOG_TRACE(log, "loadKeys {} size = {}", toString(), requested_rows.size());
auto block = blockForKeys(dict_struct, key_columns, requested_rows); auto block = blockForKeys(dict_struct, key_columns, requested_rows);

View File

@ -47,16 +47,16 @@ public:
~LibraryDictionarySource() override; ~LibraryDictionarySource() override;
BlockInputStreamPtr loadAll() override; Pipe loadAll() override;
BlockInputStreamPtr loadUpdatedAll() override Pipe loadUpdatedAll() override
{ {
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for LibraryDictionarySource"); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for LibraryDictionarySource");
} }
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override; Pipe loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<std::size_t> & requested_rows) override; Pipe loadKeys(const Columns & key_columns, const std::vector<std::size_t> & requested_rows) override;
bool isModified() const override; bool isModified() const override;

View File

@ -142,12 +142,12 @@ MongoDBDictionarySource::MongoDBDictionarySource(const MongoDBDictionarySource &
MongoDBDictionarySource::~MongoDBDictionarySource() = default; MongoDBDictionarySource::~MongoDBDictionarySource() = default;
BlockInputStreamPtr MongoDBDictionarySource::loadAll() Pipe MongoDBDictionarySource::loadAll()
{ {
return std::make_shared<MongoDBBlockInputStream>(connection, createCursor(db, collection, sample_block), sample_block, max_block_size); return Pipe(std::make_shared<MongoDBSource>(connection, createCursor(db, collection, sample_block), sample_block, max_block_size));
} }
BlockInputStreamPtr MongoDBDictionarySource::loadIds(const std::vector<UInt64> & ids) Pipe MongoDBDictionarySource::loadIds(const std::vector<UInt64> & ids)
{ {
if (!dict_struct.id) if (!dict_struct.id)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "'id' is required for selective loading"); throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "'id' is required for selective loading");
@ -164,11 +164,11 @@ BlockInputStreamPtr MongoDBDictionarySource::loadIds(const std::vector<UInt64> &
cursor->query().selector().addNewDocument(dict_struct.id->name).add("$in", ids_array); cursor->query().selector().addNewDocument(dict_struct.id->name).add("$in", ids_array);
return std::make_shared<MongoDBBlockInputStream>(connection, std::move(cursor), sample_block, max_block_size); return Pipe(std::make_shared<MongoDBSource>(connection, std::move(cursor), sample_block, max_block_size));
} }
BlockInputStreamPtr MongoDBDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) Pipe MongoDBDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{ {
if (!dict_struct.key) if (!dict_struct.key)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "'key' is required for selective loading"); throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "'key' is required for selective loading");
@ -230,7 +230,7 @@ BlockInputStreamPtr MongoDBDictionarySource::loadKeys(const Columns & key_column
/// If more than one key we should use $or /// If more than one key we should use $or
cursor->query().selector().add("$or", keys_array); cursor->query().selector().add("$or", keys_array);
return std::make_shared<MongoDBBlockInputStream>(connection, std::move(cursor), sample_block, max_block_size); return Pipe(std::make_shared<MongoDBSource>(connection, std::move(cursor), sample_block, max_block_size));
} }
std::string MongoDBDictionarySource::toString() const std::string MongoDBDictionarySource::toString() const

View File

@ -46,18 +46,18 @@ public:
~MongoDBDictionarySource() override; ~MongoDBDictionarySource() override;
BlockInputStreamPtr loadAll() override; Pipe loadAll() override;
BlockInputStreamPtr loadUpdatedAll() override Pipe loadUpdatedAll() override
{ {
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for MongoDBDictionarySource"); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for MongoDBDictionarySource");
} }
bool supportsSelectiveLoad() const override { return true; } bool supportsSelectiveLoad() const override { return true; }
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override; Pipe loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override; Pipe loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
/// @todo: for MongoDB, modification date can somehow be determined from the `_id` object field /// @todo: for MongoDB, modification date can somehow be determined from the `_id` object field
bool isModified() const override { return true; } bool isModified() const override { return true; }

View File

@ -11,6 +11,7 @@
#include "registerDictionaries.h" #include "registerDictionaries.h"
#include <Core/Settings.h> #include <Core/Settings.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Processors/Pipe.h>
namespace DB namespace DB
{ {
@ -131,13 +132,13 @@ std::string MySQLDictionarySource::getUpdateFieldAndDate()
} }
} }
BlockInputStreamPtr MySQLDictionarySource::loadFromQuery(const String & query) Pipe MySQLDictionarySource::loadFromQuery(const String & query)
{ {
return std::make_shared<MySQLWithFailoverBlockInputStream>( return Pipe(std::make_shared<MySQLWithFailoverSource>(
pool, query, sample_block, settings); pool, query, sample_block, settings));
} }
BlockInputStreamPtr MySQLDictionarySource::loadAll() Pipe MySQLDictionarySource::loadAll()
{ {
auto connection = pool->get(); auto connection = pool->get();
last_modification = getLastModification(connection, false); last_modification = getLastModification(connection, false);
@ -146,7 +147,7 @@ BlockInputStreamPtr MySQLDictionarySource::loadAll()
return loadFromQuery(load_all_query); return loadFromQuery(load_all_query);
} }
BlockInputStreamPtr MySQLDictionarySource::loadUpdatedAll() Pipe MySQLDictionarySource::loadUpdatedAll()
{ {
auto connection = pool->get(); auto connection = pool->get();
last_modification = getLastModification(connection, false); last_modification = getLastModification(connection, false);
@ -156,14 +157,14 @@ BlockInputStreamPtr MySQLDictionarySource::loadUpdatedAll()
return loadFromQuery(load_update_query); return loadFromQuery(load_update_query);
} }
BlockInputStreamPtr MySQLDictionarySource::loadIds(const std::vector<UInt64> & ids) Pipe MySQLDictionarySource::loadIds(const std::vector<UInt64> & ids)
{ {
/// We do not log in here and do not update the modification time, as the request can be large, and often called. /// We do not log in here and do not update the modification time, as the request can be large, and often called.
const auto query = query_builder.composeLoadIdsQuery(ids); const auto query = query_builder.composeLoadIdsQuery(ids);
return loadFromQuery(query); return loadFromQuery(query);
} }
BlockInputStreamPtr MySQLDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) Pipe MySQLDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{ {
/// We do not log in here and do not update the modification time, as the request can be large, and often called. /// We do not log in here and do not update the modification time, as the request can be large, and often called.
const auto query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::AND_OR_CHAIN); const auto query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::AND_OR_CHAIN);

View File

@ -6,7 +6,7 @@
# include "config_core.h" # include "config_core.h"
#endif #endif
#if USE_MYSQL //#if USE_MYSQL
# include <common/LocalDateTime.h> # include <common/LocalDateTime.h>
# include <mysqlxx/PoolWithFailover.h> # include <mysqlxx/PoolWithFailover.h>
# include "DictionaryStructure.h" # include "DictionaryStructure.h"
@ -53,13 +53,13 @@ public:
MySQLDictionarySource(const MySQLDictionarySource & other); MySQLDictionarySource(const MySQLDictionarySource & other);
MySQLDictionarySource & operator=(const MySQLDictionarySource &) = delete; MySQLDictionarySource & operator=(const MySQLDictionarySource &) = delete;
BlockInputStreamPtr loadAll() override; Pipe loadAll() override;
BlockInputStreamPtr loadUpdatedAll() override; Pipe loadUpdatedAll() override;
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override; Pipe loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override; Pipe loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
bool isModified() const override; bool isModified() const override;
@ -72,7 +72,7 @@ public:
std::string toString() const override; std::string toString() const override;
private: private:
BlockInputStreamPtr loadFromQuery(const String & query); Pipe loadFromQuery(const String & query);
std::string getUpdateFieldAndDate(); std::string getUpdateFieldAndDate();

View File

@ -119,7 +119,7 @@ ColumnPtr IPolygonDictionary::getColumn(
return result; return result;
} }
BlockInputStreamPtr IPolygonDictionary::getBlockInputStream(const Names &, size_t) const Pipe IPolygonDictionary::read(const Names &, size_t) const
{ {
// TODO: In order for this to work one would first have to support retrieving arrays from dictionaries. // TODO: In order for this to work one would first have to support retrieving arrays from dictionaries.
// I believe this is a separate task done by some other people. // I believe this is a separate task done by some other people.
@ -165,12 +165,13 @@ void IPolygonDictionary::blockToAttributes(const DB::Block & block)
void IPolygonDictionary::loadData() void IPolygonDictionary::loadData()
{ {
auto stream = source_ptr->loadAll(); QueryPipeline pipeline;
stream->readPrefix(); pipeline.init(source_ptr->loadAll());
while (const auto block = stream->read())
blockToAttributes(block);
stream->readSuffix();
PullingPipelineExecutor executor(pipeline);
Block block;
while (executor.pull(block))
blockToAttributes(block);
/// Correct and sort polygons by area and update polygon_index_to_attribute_value_index after sort /// Correct and sort polygons by area and update polygon_index_to_attribute_value_index after sort
PaddedPODArray<double> areas; PaddedPODArray<double> areas;
@ -516,4 +517,3 @@ void IPolygonDictionary::extractPolygons(const ColumnPtr & column)
} }
} }

View File

@ -97,7 +97,7 @@ public:
ColumnUInt8::Ptr hasKeys(const Columns & key_columns, const DataTypes & key_types) const override; ColumnUInt8::Ptr hasKeys(const Columns & key_columns, const DataTypes & key_types) const override;
BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override; Pipe read(const Names & column_names, size_t max_block_size) const override;
/** Single coordinate type. */ /** Single coordinate type. */
using Coord = Float32; using Coord = Float32;
@ -167,4 +167,3 @@ private:
}; };
} }

View File

@ -77,35 +77,35 @@ PostgreSQLDictionarySource::PostgreSQLDictionarySource(const PostgreSQLDictionar
} }
BlockInputStreamPtr PostgreSQLDictionarySource::loadAll() Pipe PostgreSQLDictionarySource::loadAll()
{ {
LOG_TRACE(log, load_all_query); LOG_TRACE(log, load_all_query);
return loadBase(load_all_query); return loadBase(load_all_query);
} }
BlockInputStreamPtr PostgreSQLDictionarySource::loadUpdatedAll() Pipe PostgreSQLDictionarySource::loadUpdatedAll()
{ {
auto load_update_query = getUpdateFieldAndDate(); auto load_update_query = getUpdateFieldAndDate();
LOG_TRACE(log, load_update_query); LOG_TRACE(log, load_update_query);
return loadBase(load_update_query); return loadBase(load_update_query);
} }
BlockInputStreamPtr PostgreSQLDictionarySource::loadIds(const std::vector<UInt64> & ids) Pipe PostgreSQLDictionarySource::loadIds(const std::vector<UInt64> & ids)
{ {
const auto query = query_builder.composeLoadIdsQuery(ids); const auto query = query_builder.composeLoadIdsQuery(ids);
return loadBase(query); return loadBase(query);
} }
BlockInputStreamPtr PostgreSQLDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) Pipe PostgreSQLDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{ {
const auto query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::AND_OR_CHAIN); const auto query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::AND_OR_CHAIN);
return loadBase(query); return loadBase(query);
} }
BlockInputStreamPtr PostgreSQLDictionarySource::loadBase(const String & query) Pipe PostgreSQLDictionarySource::loadBase(const String & query)
{ {
return std::make_shared<PostgreSQLBlockInputStream<>>(pool->get(), query, sample_block, max_block_size); return std::make_shared<PostgreSQLBlockInputStream<>>(pool->get(), query, sample_block, max_block_size);
} }

View File

@ -42,10 +42,10 @@ public:
PostgreSQLDictionarySource(const PostgreSQLDictionarySource & other); PostgreSQLDictionarySource(const PostgreSQLDictionarySource & other);
PostgreSQLDictionarySource & operator=(const PostgreSQLDictionarySource &) = delete; PostgreSQLDictionarySource & operator=(const PostgreSQLDictionarySource &) = delete;
BlockInputStreamPtr loadAll() override; Pipe loadAll() override;
BlockInputStreamPtr loadUpdatedAll() override; Pipe loadUpdatedAll() override;
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override; Pipe loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override; Pipe loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
bool isModified() const override; bool isModified() const override;
bool supportsSelectiveLoad() const override; bool supportsSelectiveLoad() const override;
@ -57,7 +57,7 @@ public:
private: private:
String getUpdateFieldAndDate(); String getUpdateFieldAndDate();
String doInvalidateQuery(const std::string & request) const; String doInvalidateQuery(const std::string & request) const;
BlockInputStreamPtr loadBase(const String & query); Pipe loadBase(const String & query);
const DictionaryStructure dict_struct; const DictionaryStructure dict_struct;
const Configuration configuration; const Configuration configuration;

View File

@ -13,28 +13,22 @@
namespace DB namespace DB
{ {
/*
* BlockInputStream implementation for external dictionaries
* read() returns single block consisting of the in-memory contents of the dictionaries
*/
template <typename RangeType> template <typename RangeType>
class RangeDictionaryBlockInputStream : public DictionaryBlockInputStreamBase class RangeDictionarySourceData
{ {
public: public:
using Key = UInt64; using Key = UInt64;
RangeDictionaryBlockInputStream( RangeDictionarySourceData(
std::shared_ptr<const IDictionary> dictionary, std::shared_ptr<const IDictionary> dictionary,
size_t max_block_size,
const Names & column_names, const Names & column_names,
PaddedPODArray<Key> && ids_to_fill, PaddedPODArray<Key> && ids_to_fill,
PaddedPODArray<RangeType> && start_dates, PaddedPODArray<RangeType> && start_dates,
PaddedPODArray<RangeType> && end_dates); PaddedPODArray<RangeType> && end_dates);
String getName() const override { return "RangeDictionary"; } Block getBlock(size_t start, size_t length) const;
size_t getNumRows() const { return ids.size(); }
protected:
Block getBlock(size_t start, size_t length) const override;
private: private:
template <typename T> template <typename T>
@ -58,15 +52,13 @@ private:
template <typename RangeType> template <typename RangeType>
RangeDictionaryBlockInputStream<RangeType>::RangeDictionaryBlockInputStream( RangeDictionarySourceData<RangeType>::RangeDictionarySourceData(
std::shared_ptr<const IDictionary> dictionary_, std::shared_ptr<const IDictionary> dictionary_,
size_t max_block_size_,
const Names & column_names_, const Names & column_names_,
PaddedPODArray<Key> && ids_, PaddedPODArray<Key> && ids_,
PaddedPODArray<RangeType> && block_start_dates, PaddedPODArray<RangeType> && block_start_dates,
PaddedPODArray<RangeType> && block_end_dates) PaddedPODArray<RangeType> && block_end_dates)
: DictionaryBlockInputStreamBase(ids_.size(), max_block_size_) : dictionary(dictionary_)
, dictionary(dictionary_)
, column_names(column_names_.begin(), column_names_.end()) , column_names(column_names_.begin(), column_names_.end())
, ids(std::move(ids_)) , ids(std::move(ids_))
, start_dates(std::move(block_start_dates)) , start_dates(std::move(block_start_dates))
@ -75,7 +67,7 @@ RangeDictionaryBlockInputStream<RangeType>::RangeDictionaryBlockInputStream(
} }
template <typename RangeType> template <typename RangeType>
Block RangeDictionaryBlockInputStream<RangeType>::getBlock(size_t start, size_t length) const Block RangeDictionarySourceData<RangeType>::getBlock(size_t start, size_t length) const
{ {
PaddedPODArray<Key> block_ids; PaddedPODArray<Key> block_ids;
PaddedPODArray<RangeType> block_start_dates; PaddedPODArray<RangeType> block_start_dates;
@ -96,7 +88,7 @@ Block RangeDictionaryBlockInputStream<RangeType>::getBlock(size_t start, size_t
template <typename RangeType> template <typename RangeType>
template <typename T> template <typename T>
ColumnPtr RangeDictionaryBlockInputStream<RangeType>::getColumnFromPODArray(const PaddedPODArray<T> & array) const ColumnPtr RangeDictionarySourceData<RangeType>::getColumnFromPODArray(const PaddedPODArray<T> & array) const
{ {
auto column_vector = ColumnVector<T>::create(); auto column_vector = ColumnVector<T>::create();
column_vector->getData().reserve(array.size()); column_vector->getData().reserve(array.size());
@ -106,7 +98,7 @@ ColumnPtr RangeDictionaryBlockInputStream<RangeType>::getColumnFromPODArray(cons
} }
template <typename RangeType> template <typename RangeType>
PaddedPODArray<Int64> RangeDictionaryBlockInputStream<RangeType>::makeDateKey( PaddedPODArray<Int64> RangeDictionarySourceData<RangeType>::makeDateKey(
const PaddedPODArray<RangeType> & block_start_dates, const PaddedPODArray<RangeType> & block_end_dates) const const PaddedPODArray<RangeType> & block_start_dates, const PaddedPODArray<RangeType> & block_end_dates) const
{ {
PaddedPODArray<Int64> key(block_start_dates.size()); PaddedPODArray<Int64> key(block_start_dates.size());
@ -123,7 +115,7 @@ PaddedPODArray<Int64> RangeDictionaryBlockInputStream<RangeType>::makeDateKey(
template <typename RangeType> template <typename RangeType>
Block RangeDictionaryBlockInputStream<RangeType>::fillBlock( Block RangeDictionarySourceData<RangeType>::fillBlock(
const PaddedPODArray<Key> & ids_to_fill, const PaddedPODArray<Key> & ids_to_fill,
const PaddedPODArray<RangeType> & block_start_dates, const PaddedPODArray<RangeType> & block_start_dates,
const PaddedPODArray<RangeType> & block_end_dates) const const PaddedPODArray<RangeType> & block_end_dates) const
@ -170,4 +162,37 @@ Block RangeDictionaryBlockInputStream<RangeType>::fillBlock(
return Block(columns); return Block(columns);
} }
/*
* Source (processor) that exposes the in-memory contents of a range dictionary.
* It adapts RangeDictionarySourceData to the DictionarySourceBase interface:
* every getBlock(start, length) request is forwarded to the stored data object.
* NOTE(review): output is presumably split into chunks of at most max_block_size rows
* by DictionarySourceBase - confirm against the base class.
*/
template <typename RangeType>
class RangeDictionarySource : public DictionarySourceBase
{
public:
using Key = UInt64;
/// Takes the prepared dictionary data by value; the data is moved into the `data` member.
RangeDictionarySource(RangeDictionarySourceData<RangeType> data_, size_t max_block_size);
/// Name under which this source reports itself.
String getName() const override { return "RangeDictionarySource"; }
protected:
/// Returns the rows [start, start + length) as produced by the stored data object.
Block getBlock(size_t start, size_t length) const override;
/// Holds ids and range bounds and knows how to build blocks from them.
RangeDictionarySourceData<RangeType> data;
};
/// Builds the source from already collected dictionary data.
/// The order of the initializers matters: the base class is initialized before the `data` member,
/// so `data_` has not been moved from yet when it is queried for the base-class arguments.
/// `getBlock(0, 0)` requests zero rows - presumably it serves as the header block
/// for DictionarySourceBase; TODO confirm against the base-class constructor.
template <typename RangeType>
RangeDictionarySource<RangeType>::RangeDictionarySource(RangeDictionarySourceData<RangeType> data_, size_t max_block_size)
: DictionarySourceBase(data_.getBlock(0, 0), data_.getNumRows(), max_block_size)
, data(std::move(data_))
{
}
/// Produces the block for rows [start, start + length).
/// No logic lives here: the request is forwarded unchanged to the stored RangeDictionarySourceData.
template <typename RangeType>
Block RangeDictionarySource<RangeType>::getBlock(size_t start, size_t length) const
{
    Block requested_rows = data.getBlock(start, length);
    return requested_rows;
}
} }

View File

@ -298,10 +298,12 @@ void RangeHashedDictionary::createAttributes()
void RangeHashedDictionary::loadData() void RangeHashedDictionary::loadData()
{ {
auto stream = source_ptr->loadAll(); QueryPipeline pipeline;
stream->readPrefix(); pipeline.init(source_ptr->loadAll());
while (const auto block = stream->read()) PullingPipelineExecutor executor(pipeline);
Block block;
while (executor.pull(block))
{ {
const auto & id_column = *block.safeGetByPosition(0).column; const auto & id_column = *block.safeGetByPosition(0).column;
@ -339,8 +341,6 @@ void RangeHashedDictionary::loadData()
} }
} }
stream->readSuffix();
if (require_nonempty && 0 == element_count) if (require_nonempty && 0 == element_count)
throw Exception(ErrorCodes::DICTIONARY_IS_EMPTY, throw Exception(ErrorCodes::DICTIONARY_IS_EMPTY,
"{}: dictionary source is empty and 'require_nonempty' property is set."); "{}: dictionary source is empty and 'require_nonempty' property is set.");
@ -594,29 +594,30 @@ void RangeHashedDictionary::getIdsAndDates(
template <typename RangeType> template <typename RangeType>
BlockInputStreamPtr RangeHashedDictionary::getBlockInputStreamImpl(const Names & column_names, size_t max_block_size) const Pipe RangeHashedDictionary::readImpl(const Names & column_names, size_t max_block_size) const
{ {
PaddedPODArray<UInt64> ids; PaddedPODArray<UInt64> ids;
PaddedPODArray<RangeType> start_dates; PaddedPODArray<RangeType> start_dates;
PaddedPODArray<RangeType> end_dates; PaddedPODArray<RangeType> end_dates;
getIdsAndDates(ids, start_dates, end_dates); getIdsAndDates(ids, start_dates, end_dates);
using BlockInputStreamType = RangeDictionaryBlockInputStream<RangeType>; using RangeDictionarySourceType = RangeDictionarySource<RangeType>;
auto stream = std::make_shared<BlockInputStreamType>( auto source = std::make_shared<RangeDictionarySourceType>(
shared_from_this(), RangeDictionarySourceData<RangeType>(
max_block_size, shared_from_this(),
column_names, column_names,
std::move(ids), std::move(ids),
std::move(start_dates), std::move(start_dates),
std::move(end_dates)); std::move(end_dates)),
max_block_size);
return stream; return Pipe(source);
} }
struct RangeHashedDictionaryCallGetBlockInputStreamImpl struct RangeHashedDictionaryCallGetSourceImpl
{ {
BlockInputStreamPtr stream; Pipe pipe;
const RangeHashedDictionary * dict; const RangeHashedDictionary * dict;
const Names * column_names; const Names * column_names;
size_t max_block_size; size_t max_block_size;
@ -625,28 +626,28 @@ struct RangeHashedDictionaryCallGetBlockInputStreamImpl
void operator()() void operator()()
{ {
const auto & type = dict->dict_struct.range_min->type; const auto & type = dict->dict_struct.range_min->type;
if (!stream && dynamic_cast<const DataTypeNumberBase<RangeType> *>(type.get())) if (pipe.empty() && dynamic_cast<const DataTypeNumberBase<RangeType> *>(type.get()))
stream = dict->getBlockInputStreamImpl<RangeType>(*column_names, max_block_size); pipe = dict->readImpl<RangeType>(*column_names, max_block_size);
} }
}; };
BlockInputStreamPtr RangeHashedDictionary::getBlockInputStream(const Names & column_names, size_t max_block_size) const Pipe RangeHashedDictionary::read(const Names & column_names, size_t max_block_size) const
{ {
using ListType = TypeList<UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64, Int128, Float32, Float64>; using ListType = TypeList<UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64, Int128, Float32, Float64>;
RangeHashedDictionaryCallGetBlockInputStreamImpl callable; RangeHashedDictionaryCallGetSourceImpl callable;
callable.dict = this; callable.dict = this;
callable.column_names = &column_names; callable.column_names = &column_names;
callable.max_block_size = max_block_size; callable.max_block_size = max_block_size;
ListType::forEach(callable); ListType::forEach(callable);
if (!callable.stream) if (callable.pipe.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, throw Exception(ErrorCodes::LOGICAL_ERROR,
"Unexpected range type for RangeHashed dictionary: {}", "Unexpected range type for RangeHashed dictionary: {}",
dict_struct.range_min->type->getName()); dict_struct.range_min->type->getName());
return callable.stream; return std::move(callable.pipe);
} }

View File

@ -75,7 +75,7 @@ public:
using RangeStorageType = Int64; using RangeStorageType = Int64;
BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override; Pipe read(const Names & column_names, size_t max_block_size) const override;
struct Range struct Range
{ {
@ -178,9 +178,9 @@ private:
PaddedPODArray<RangeType> & end_dates) const; PaddedPODArray<RangeType> & end_dates) const;
template <typename RangeType> template <typename RangeType>
BlockInputStreamPtr getBlockInputStreamImpl(const Names & column_names, size_t max_block_size) const; Pipe readImpl(const Names & column_names, size_t max_block_size) const;
friend struct RangeHashedDictionaryCallGetBlockInputStreamImpl; friend struct RangeHashedDictionaryCallGetSourceImpl;
const DictionaryStructure dict_struct; const DictionaryStructure dict_struct;
const DictionarySourcePtr source_ptr; const DictionarySourcePtr source_ptr;

View File

@ -29,18 +29,19 @@ namespace DB
} }
RedisBlockInputStream::RedisBlockInputStream( RedisSource::RedisSource(
const std::shared_ptr<Poco::Redis::Client> & client_, const std::shared_ptr<Poco::Redis::Client> & client_,
const RedisArray & keys_, const RedisArray & keys_,
const RedisStorageType & storage_type_, const RedisStorageType & storage_type_,
const DB::Block & sample_block, const DB::Block & sample_block,
const size_t max_block_size_) const size_t max_block_size_)
: client(client_), keys(keys_), storage_type(storage_type_), max_block_size{max_block_size_} : SourceWithProgress(sample_block)
, client(client_), keys(keys_), storage_type(storage_type_), max_block_size{max_block_size_}
{ {
description.init(sample_block); description.init(sample_block);
} }
RedisBlockInputStream::~RedisBlockInputStream() = default; RedisSource::~RedisSource() = default;
namespace namespace
@ -121,7 +122,7 @@ namespace DB
} }
Block RedisBlockInputStream::readImpl() Chunk RedisSource::generate()
{ {
if (keys.isNull() || description.sample_block.rows() == 0 || cursor >= keys.size()) if (keys.isNull() || description.sample_block.rows() == 0 || cursor >= keys.size())
all_read = true; all_read = true;
@ -218,6 +219,7 @@ namespace DB
cursor += need_values; cursor += need_values;
} }
return description.sample_block.cloneWithColumns(std::move(columns)); size_t num_rows = columns.at(0)->size();
return Chunk(std::move(columns), num_rows);
} }
} }

View File

@ -3,7 +3,7 @@
#include <Core/Block.h> #include <Core/Block.h>
#include <Core/ExternalResultDescription.h> #include <Core/ExternalResultDescription.h>
#include <DataStreams/IBlockInputStream.h> #include <Processors/Sources/SourceWithProgress.h>
#include <Poco/Redis/Array.h> #include <Poco/Redis/Array.h>
#include <Poco/Redis/Type.h> #include <Poco/Redis/Type.h>
#include "RedisDictionarySource.h" #include "RedisDictionarySource.h"
@ -19,27 +19,25 @@ namespace Poco
namespace DB namespace DB
{ {
class RedisBlockInputStream final : public IBlockInputStream class RedisSource final : public SourceWithProgress
{ {
public: public:
using RedisArray = Poco::Redis::Array; using RedisArray = Poco::Redis::Array;
using RedisBulkString = Poco::Redis::BulkString; using RedisBulkString = Poco::Redis::BulkString;
RedisBlockInputStream( RedisSource(
const std::shared_ptr<Poco::Redis::Client> & client_, const std::shared_ptr<Poco::Redis::Client> & client_,
const Poco::Redis::Array & keys_, const Poco::Redis::Array & keys_,
const RedisStorageType & storage_type_, const RedisStorageType & storage_type_,
const Block & sample_block, const Block & sample_block,
const size_t max_block_size); const size_t max_block_size);
~RedisBlockInputStream() override; ~RedisSource() override;
String getName() const override { return "Redis"; } String getName() const override { return "Redis"; }
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
private: private:
Block readImpl() override; Chunk generate() override;
std::shared_ptr<Poco::Redis::Client> client; std::shared_ptr<Poco::Redis::Client> client;
Poco::Redis::Array keys; Poco::Redis::Array keys;

View File

@ -159,7 +159,7 @@ namespace DB
__builtin_unreachable(); __builtin_unreachable();
} }
BlockInputStreamPtr RedisDictionarySource::loadAll() Pipe RedisDictionarySource::loadAll()
{ {
if (!client->isConnected()) if (!client->isConnected())
client->connect(host, port); client->connect(host, port);
@ -170,7 +170,7 @@ namespace DB
/// Get only keys for specified storage type. /// Get only keys for specified storage type.
auto all_keys = client->execute<RedisArray>(command_for_keys); auto all_keys = client->execute<RedisArray>(command_for_keys);
if (all_keys.isNull()) if (all_keys.isNull())
return std::make_shared<RedisBlockInputStream>(client, RedisArray{}, storage_type, sample_block, max_block_size); return Pipe(std::make_shared<RedisSource>(client, RedisArray{}, storage_type, sample_block, max_block_size));
RedisArray keys; RedisArray keys;
auto key_type = storageTypeToKeyType(storage_type); auto key_type = storageTypeToKeyType(storage_type);
@ -209,11 +209,11 @@ namespace DB
keys = std::move(hkeys); keys = std::move(hkeys);
} }
return std::make_shared<RedisBlockInputStream>(client, std::move(keys), storage_type, sample_block, max_block_size); return Pipe(std::make_shared<RedisSource>(client, std::move(keys), storage_type, sample_block, max_block_size));
} }
BlockInputStreamPtr RedisDictionarySource::loadIds(const std::vector<UInt64> & ids) Pipe RedisDictionarySource::loadIds(const std::vector<UInt64> & ids)
{ {
if (!client->isConnected()) if (!client->isConnected())
client->connect(host, port); client->connect(host, port);
@ -229,10 +229,10 @@ namespace DB
for (UInt64 id : ids) for (UInt64 id : ids)
keys << DB::toString(id); keys << DB::toString(id);
return std::make_shared<RedisBlockInputStream>(client, std::move(keys), storage_type, sample_block, max_block_size); return Pipe(std::make_shared<RedisSource>(client, std::move(keys), storage_type, sample_block, max_block_size));
} }
BlockInputStreamPtr RedisDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) Pipe RedisDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{ {
if (!client->isConnected()) if (!client->isConnected())
client->connect(host, port); client->connect(host, port);
@ -258,7 +258,7 @@ namespace DB
keys.add(key); keys.add(key);
} }
return std::make_shared<RedisBlockInputStream>(client, std::move(keys), storage_type, sample_block, max_block_size); return Pipe(std::make_shared<RedisSource>(client, std::move(keys), storage_type, sample_block, max_block_size));
} }

View File

@ -59,18 +59,18 @@ namespace ErrorCodes
~RedisDictionarySource() override; ~RedisDictionarySource() override;
BlockInputStreamPtr loadAll() override; Pipe loadAll() override;
BlockInputStreamPtr loadUpdatedAll() override Pipe loadUpdatedAll() override
{ {
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for RedisDictionarySource"); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for RedisDictionarySource");
} }
bool supportsSelectiveLoad() const override { return true; } bool supportsSelectiveLoad() const override { return true; }
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override; Pipe loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override; Pipe loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
bool isModified() const override { return true; } bool isModified() const override { return true; }

View File

@ -1,7 +1,7 @@
#include "XDBCDictionarySource.h" #include "XDBCDictionarySource.h"
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>
#include <DataStreams/IBlockInputStream.h> #include <Processors/Sources/SourceWithProgress.h>
#include <DataTypes/DataTypeString.h> #include <DataTypes/DataTypeString.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <Processors/Formats/InputStreamFromInputFormat.h> #include <Processors/Formats/InputStreamFromInputFormat.h>
@ -30,37 +30,6 @@ namespace ErrorCodes
namespace namespace
{ {
class XDBCBridgeBlockInputStream : public IBlockInputStream
{
public:
XDBCBridgeBlockInputStream(
const Poco::URI & uri,
std::function<void(std::ostream &)> callback,
const Block & sample_block,
ContextPtr context,
UInt64 max_block_size,
const ConnectionTimeouts & timeouts,
const String name_)
: name(name_)
{
read_buf = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, callback, timeouts);
auto format = FormatFactory::instance().getInput(IXDBCBridgeHelper::DEFAULT_FORMAT, *read_buf, sample_block, context, max_block_size);
reader = std::make_shared<InputStreamFromInputFormat>(format);
}
Block getHeader() const override { return reader->getHeader(); }
String getName() const override { return name; }
private:
Block readImpl() override { return reader->read(); }
String name;
std::unique_ptr<ReadWriteBufferFromHTTP> read_buf;
BlockInputStreamPtr reader;
};
ExternalQueryBuilder makeExternalQueryBuilder(const DictionaryStructure & dict_struct_, ExternalQueryBuilder makeExternalQueryBuilder(const DictionaryStructure & dict_struct_,
const std::string & db_, const std::string & db_,
const std::string & schema_, const std::string & schema_,
@ -155,14 +124,14 @@ std::string XDBCDictionarySource::getUpdateFieldAndDate()
} }
BlockInputStreamPtr XDBCDictionarySource::loadAll() Pipe XDBCDictionarySource::loadAll()
{ {
LOG_TRACE(log, load_all_query); LOG_TRACE(log, load_all_query);
return loadFromQuery(bridge_url, sample_block, load_all_query); return loadFromQuery(bridge_url, sample_block, load_all_query);
} }
BlockInputStreamPtr XDBCDictionarySource::loadUpdatedAll() Pipe XDBCDictionarySource::loadUpdatedAll()
{ {
std::string load_query_update = getUpdateFieldAndDate(); std::string load_query_update = getUpdateFieldAndDate();
@ -171,14 +140,14 @@ BlockInputStreamPtr XDBCDictionarySource::loadUpdatedAll()
} }
BlockInputStreamPtr XDBCDictionarySource::loadIds(const std::vector<UInt64> & ids) Pipe XDBCDictionarySource::loadIds(const std::vector<UInt64> & ids)
{ {
const auto query = query_builder.composeLoadIdsQuery(ids); const auto query = query_builder.composeLoadIdsQuery(ids);
return loadFromQuery(bridge_url, sample_block, query); return loadFromQuery(bridge_url, sample_block, query);
} }
BlockInputStreamPtr XDBCDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) Pipe XDBCDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{ {
const auto query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::AND_OR_CHAIN); const auto query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::AND_OR_CHAIN);
return loadFromQuery(bridge_url, sample_block, query); return loadFromQuery(bridge_url, sample_block, query);
@ -236,11 +205,11 @@ std::string XDBCDictionarySource::doInvalidateQuery(const std::string & request)
for (const auto & [name, value] : url_params) for (const auto & [name, value] : url_params)
invalidate_url.addQueryParameter(name, value); invalidate_url.addQueryParameter(name, value);
return readInvalidateQuery(*loadFromQuery(invalidate_url, invalidate_sample_block, request)); return readInvalidateQuery(loadFromQuery(invalidate_url, invalidate_sample_block, request));
} }
BlockInputStreamPtr XDBCDictionarySource::loadFromQuery(const Poco::URI & url, const Block & required_sample_block, const std::string & query) const Pipe XDBCDictionarySource::loadFromQuery(const Poco::URI & url, const Block & required_sample_block, const std::string & query) const
{ {
bridge_helper->startBridgeSync(); bridge_helper->startBridgeSync();
@ -251,16 +220,12 @@ BlockInputStreamPtr XDBCDictionarySource::loadFromQuery(const Poco::URI & url, c
os << "query=" << escapeForFileName(query); os << "query=" << escapeForFileName(query);
}; };
return std::make_shared<XDBCBridgeBlockInputStream>( auto read_buf = std::make_unique<ReadWriteBufferFromHTTP>(url, Poco::Net::HTTPRequest::HTTP_POST, write_body_callback, timeouts);
url, auto format = FormatFactory::instance().getInput(IXDBCBridgeHelper::DEFAULT_FORMAT, *read_buf, sample_block, getContext(), max_block_size);
write_body_callback, format->addBuffer(std::move(read_buf));
required_sample_block,
getContext(),
max_block_size,
timeouts,
bridge_helper->getName() + "BlockInputStream");
}
return Pipe(std::move(format));
}
void registerDictionarySourceXDBC(DictionarySourceFactory & factory) void registerDictionarySourceXDBC(DictionarySourceFactory & factory)
{ {

View File

@ -49,13 +49,13 @@ public:
XDBCDictionarySource(const XDBCDictionarySource & other); XDBCDictionarySource(const XDBCDictionarySource & other);
XDBCDictionarySource & operator=(const XDBCDictionarySource &) = delete; XDBCDictionarySource & operator=(const XDBCDictionarySource &) = delete;
BlockInputStreamPtr loadAll() override; Pipe loadAll() override;
BlockInputStreamPtr loadUpdatedAll() override; Pipe loadUpdatedAll() override;
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override; Pipe loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override; Pipe loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
bool isModified() const override; bool isModified() const override;
@ -73,7 +73,7 @@ private:
// execute invalidate_query. expects single cell in result // execute invalidate_query. expects single cell in result
std::string doInvalidateQuery(const std::string & request) const; std::string doInvalidateQuery(const std::string & request) const;
BlockInputStreamPtr loadFromQuery(const Poco::URI & url, const Block & required_sample_block, const std::string & query) const; Pipe loadFromQuery(const Poco::URI & url, const Block & required_sample_block, const std::string & query) const;
Poco::Logger * log; Poco::Logger * log;

View File

@ -40,7 +40,7 @@ StreamSettings::StreamSettings(const Settings & settings, bool auto_close_, bool
{ {
} }
MySQLBlockInputStream::Connection::Connection( MySQLSource::Connection::Connection(
const mysqlxx::PoolWithFailover::Entry & entry_, const mysqlxx::PoolWithFailover::Entry & entry_,
const std::string & query_str) const std::string & query_str)
: entry(entry_) : entry(entry_)
@ -50,12 +50,13 @@ MySQLBlockInputStream::Connection::Connection(
} }
/// Used in MaterializedMySQL and in doInvalidateQuery for dictionary source. /// Used in MaterializedMySQL and in doInvalidateQuery for dictionary source.
MySQLBlockInputStream::MySQLBlockInputStream( MySQLSource::MySQLSource(
const mysqlxx::PoolWithFailover::Entry & entry, const mysqlxx::PoolWithFailover::Entry & entry,
const std::string & query_str, const std::string & query_str,
const Block & sample_block, const Block & sample_block,
const StreamSettings & settings_) const StreamSettings & settings_)
: log(&Poco::Logger::get("MySQLBlockInputStream")) : SourceWithProgress(sample_block.cloneEmpty())
, log(&Poco::Logger::get("MySQLBlockInputStream"))
, connection{std::make_unique<Connection>(entry, query_str)} , connection{std::make_unique<Connection>(entry, query_str)}
, settings{std::make_unique<StreamSettings>(settings_)} , settings{std::make_unique<StreamSettings>(settings_)}
{ {
@ -64,26 +65,27 @@ MySQLBlockInputStream::MySQLBlockInputStream(
} }
/// For descendant MySQLWithFailoverBlockInputStream /// For descendant MySQLWithFailoverSource
MySQLBlockInputStream::MySQLBlockInputStream(const Block &sample_block_, const StreamSettings & settings_) MySQLSource::MySQLSource(const Block &sample_block_, const StreamSettings & settings_)
: log(&Poco::Logger::get("MySQLBlockInputStream")) : SourceWithProgress(sample_block_.cloneEmpty())
, log(&Poco::Logger::get("MySQLBlockInputStream"))
, settings(std::make_unique<StreamSettings>(settings_)) , settings(std::make_unique<StreamSettings>(settings_))
{ {
description.init(sample_block_); description.init(sample_block_);
} }
/// Used by MySQL storage / table function and dictionary source. /// Used by MySQL storage / table function and dictionary source.
MySQLWithFailoverBlockInputStream::MySQLWithFailoverBlockInputStream( MySQLWithFailoverSource::MySQLWithFailoverSource(
mysqlxx::PoolWithFailoverPtr pool_, mysqlxx::PoolWithFailoverPtr pool_,
const std::string & query_str_, const std::string & query_str_,
const Block & sample_block_, const Block & sample_block_,
const StreamSettings & settings_) const StreamSettings & settings_)
: MySQLBlockInputStream(sample_block_, settings_) : MySQLSource(sample_block_, settings_)
, pool(pool_) , pool(pool_)
, query_str(query_str_) , query_str(query_str_)
{ {
} }
void MySQLWithFailoverBlockInputStream::readPrefix() void MySQLWithFailoverSource::onStart()
{ {
size_t count_connect_attempts = 0; size_t count_connect_attempts = 0;
@ -110,6 +112,18 @@ void MySQLWithFailoverBlockInputStream::readPrefix()
initPositionMappingFromQueryResultStructure(); initPositionMappingFromQueryResultStructure();
} }
/// Produces the next chunk, running the one-time onStart() step before the first read.
Chunk MySQLWithFailoverSource::generate()
{
    /// Fast path: start-up already done, just delegate to the base source.
    if (is_initialized)
        return MySQLSource::generate();

    /// First call: run onStart() and only then mark the source as initialized,
    /// so the flag stays false if onStart() throws.
    onStart();
    is_initialized = true;

    return MySQLSource::generate();
}
namespace namespace
{ {
using ValueType = ExternalResultDescription::ValueType; using ValueType = ExternalResultDescription::ValueType;
@ -213,7 +227,7 @@ namespace
} }
Block MySQLBlockInputStream::readImpl() Chunk MySQLSource::generate()
{ {
auto row = connection->result.fetch(); auto row = connection->result.fetch();
if (!row) if (!row)
@ -272,10 +286,10 @@ Block MySQLBlockInputStream::readImpl()
row = connection->result.fetch(); row = connection->result.fetch();
} }
return description.sample_block.cloneWithColumns(std::move(columns)); return Chunk(std::move(columns), num_rows);
} }
void MySQLBlockInputStream::initPositionMappingFromQueryResultStructure() void MySQLSource::initPositionMappingFromQueryResultStructure()
{ {
position_mapping.resize(description.sample_block.columns()); position_mapping.resize(description.sample_block.columns());

View File

@ -2,7 +2,7 @@
#include <string> #include <string>
#include <Core/Block.h> #include <Core/Block.h>
#include <DataStreams/IBlockInputStream.h> #include <Processors/Sources/SourceWithProgress.h>
#include <mysqlxx/PoolWithFailover.h> #include <mysqlxx/PoolWithFailover.h>
#include <mysqlxx/Query.h> #include <mysqlxx/Query.h>
#include <Core/ExternalResultDescription.h> #include <Core/ExternalResultDescription.h>
@ -25,10 +25,10 @@ struct StreamSettings
}; };
/// Allows processing results of a MySQL query as a sequence of Blocks, simplifies chaining /// Allows processing results of a MySQL query as a sequence of Blocks, simplifies chaining
class MySQLBlockInputStream : public IBlockInputStream class MySQLSource : public SourceWithProgress
{ {
public: public:
MySQLBlockInputStream( MySQLSource(
const mysqlxx::PoolWithFailover::Entry & entry, const mysqlxx::PoolWithFailover::Entry & entry,
const std::string & query_str, const std::string & query_str,
const Block & sample_block, const Block & sample_block,
@ -36,11 +36,9 @@ public:
String getName() const override { return "MySQL"; } String getName() const override { return "MySQL"; }
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
protected: protected:
MySQLBlockInputStream(const Block & sample_block_, const StreamSettings & settings); MySQLSource(const Block & sample_block_, const StreamSettings & settings);
Block readImpl() override; Chunk generate() override;
void initPositionMappingFromQueryResultStructure(); void initPositionMappingFromQueryResultStructure();
struct Connection struct Connection
@ -63,21 +61,24 @@ protected:
/// Like MySQLBlockInputStream, but allocates connection only when reading is starting. /// Like MySQLSource, but allocates a connection only when reading starts.
/// It allows to create a lot of stream objects without occupation of all connection pool. /// This allows creating many source objects without occupying the whole connection pool.
/// Also makes attempts to reconnect in case of connection failures. /// Also makes attempts to reconnect in case of connection failures.
class MySQLWithFailoverBlockInputStream final : public MySQLBlockInputStream class MySQLWithFailoverSource final : public MySQLSource
{ {
public: public:
MySQLWithFailoverBlockInputStream( MySQLWithFailoverSource(
mysqlxx::PoolWithFailoverPtr pool_, mysqlxx::PoolWithFailoverPtr pool_,
const std::string & query_str_, const std::string & query_str_,
const Block & sample_block_, const Block & sample_block_,
const StreamSettings & settings_); const StreamSettings & settings_);
Chunk generate() override;
private: private:
void readPrefix() override; void onStart();
mysqlxx::PoolWithFailoverPtr pool; mysqlxx::PoolWithFailoverPtr pool;
std::string query_str; std::string query_str;
bool is_initialized = false;
}; };
} }