Streams -> Processors for dicts, part 1.

Nikolai Kochetov 2021-08-04 20:58:18 +03:00
parent 9d580732cc
commit 8d14f2ef8f
31 changed files with 407 additions and 370 deletions

View File

@ -13,6 +13,9 @@
#include <Dictionaries/DictionaryBlockInputStream.h>
#include <Dictionaries/HierarchyDictionariesUtils.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPipeline.h>
namespace ProfileEvents
{
extern const Event DictCacheKeysRequested;
@ -481,24 +484,28 @@ MutableColumns CacheDictionary<dictionary_key_type>::aggregateColumns(
}
template <DictionaryKeyType dictionary_key_type>
BlockInputStreamPtr CacheDictionary<dictionary_key_type>::getBlockInputStream(const Names & column_names, size_t max_block_size) const
Pipe CacheDictionary<dictionary_key_type>::read(const Names & column_names, size_t max_block_size) const
{
std::shared_ptr<DictionaryBlockInputStream> stream;
Pipe pipe;
{
/// Write lock on storage
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
if constexpr (dictionary_key_type == DictionaryKeyType::simple)
stream = std::make_shared<DictionaryBlockInputStream>(shared_from_this(), max_block_size, cache_storage_ptr->getCachedSimpleKeys(), column_names);
pipe = Pipe(std::make_shared<DictionarySource>(
DictionarySourceData(shared_from_this(), cache_storage_ptr->getCachedSimpleKeys(), column_names),
max_block_size));
else
{
auto keys = cache_storage_ptr->getCachedComplexKeys();
stream = std::make_shared<DictionaryBlockInputStream>(shared_from_this(), max_block_size, keys, column_names);
pipe = Pipe(std::make_shared<DictionarySource>(
DictionarySourceData(shared_from_this(), keys, column_names),
max_block_size));
}
}
return stream;
return pipe;
}
template <DictionaryKeyType dictionary_key_type>
@ -567,21 +574,21 @@ void CacheDictionary<dictionary_key_type>::update(CacheDictionaryUpdateUnitPtr<d
auto current_source_ptr = getSourceAndUpdateIfNeeded();
Stopwatch watch;
BlockInputStreamPtr stream;
QueryPipeline pipeline;
if constexpr (dictionary_key_type == DictionaryKeyType::simple)
stream = current_source_ptr->loadIds(requested_keys_vector);
pipeline.init(current_source_ptr->loadIds(requested_keys_vector));
else
stream = current_source_ptr->loadKeys(update_unit_ptr->key_columns, requested_complex_key_rows);
stream->readPrefix();
pipeline.init(current_source_ptr->loadKeys(update_unit_ptr->key_columns, requested_complex_key_rows));
size_t skip_keys_size_offset = dict_struct.getKeysSize();
PaddedPODArray<KeyType> found_keys_in_source;
Columns fetched_columns_during_update = fetch_request.makeAttributesResultColumnsNonMutable();
while (Block block = stream->read())
PullingPipelineExecutor executor(pipeline);
Block block;
while (executor.pull(block))
{
Columns key_columns;
key_columns.reserve(skip_keys_size_offset);
@ -625,8 +632,6 @@ void CacheDictionary<dictionary_key_type>::update(CacheDictionaryUpdateUnitPtr<d
for (const auto & fetched_column : fetched_columns_during_update)
update_unit_ptr_mutable_columns.emplace_back(fetched_column->assumeMutable());
stream->readSuffix();
{
/// Lock for cache modification
ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
@ -686,4 +691,4 @@ void CacheDictionary<dictionary_key_type>::update(CacheDictionaryUpdateUnitPtr<d
template class CacheDictionary<DictionaryKeyType::simple>;
template class CacheDictionary<DictionaryKeyType::complex>;
}
}
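
The hunk above shows the commit's recurring pattern: the readPrefix()/read()/readSuffix() loop over an IBlockInputStream becomes a QueryPipeline initialized from a Pipe and drained through a PullingPipelineExecutor. A minimal sketch of the new loop, assuming a hypothetical processBlock consumer (not part of this commit):

#include <Processors/Pipe.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Executors/PullingPipelineExecutor.h>

/// Sketch only (assumes namespace DB); processBlock is hypothetical.
void drainPipe(Pipe pipe)
{
    QueryPipeline pipeline;
    pipeline.init(std::move(pipe));

    PullingPipelineExecutor executor(pipeline);
    Block block;
    while (executor.pull(block))
        processBlock(block);   /// replaces the old `while (Block block = stream->read())` loop
}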

View File

@ -137,7 +137,7 @@ public:
ColumnUInt8::Ptr hasKeys(const Columns & key_columns, const DataTypes & key_types) const override;
BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override;
Pipe read(const Names & column_names, size_t max_block_size) const override;
std::exception_ptr getLastException() const override;

View File

@ -22,12 +22,13 @@ namespace ErrorCodes
extern const int UNKNOWN_TYPE;
}
CassandraBlockInputStream::CassandraBlockInputStream(
CassandraSource::CassandraSource(
const CassSessionShared & session_,
const String & query_str,
const Block & sample_block,
size_t max_block_size_)
: session(session_)
: SourceWithProgress(sample_block)
, session(session_)
, statement(query_str.c_str(), /*parameters count*/ 0)
, max_block_size(max_block_size_)
, has_more_pages(cass_true)
@ -36,7 +37,7 @@ CassandraBlockInputStream::CassandraBlockInputStream(
cassandraCheck(cass_statement_set_paging_size(statement, max_block_size));
}
void CassandraBlockInputStream::insertValue(IColumn & column, ValueType type, const CassValue * cass_value)
void CassandraSource::insertValue(IColumn & column, ValueType type, const CassValue * cass_value)
{
switch (type)
{
@ -148,13 +149,15 @@ void CassandraBlockInputStream::insertValue(IColumn & column, ValueType type, co
}
}
void CassandraBlockInputStream::readPrefix()
{
result_future = cass_session_execute(*session, statement);
}
Block CassandraBlockInputStream::readImpl()
Chunk CassandraSource::generate()
{
if (!is_initialized)
{
result_future = cass_session_execute(*session, statement);
is_initialized = true;
}
if (!has_more_pages)
return {};
@ -194,12 +197,13 @@ Block CassandraBlockInputStream::readImpl()
}
}
assert(cass_result_row_count(result) == columns.front()->size());
size_t num_rows = columns.front()->size();
assert(cass_result_row_count(result) == num_rows);
return description.sample_block.cloneWithColumns(std::move(columns));
return Chunk(std::move(columns), num_rows);
}
void CassandraBlockInputStream::assertTypes(const CassResultPtr & result)
void CassandraSource::assertTypes(const CassResultPtr & result)
{
if (!assert_types)
return;

View File

@ -4,17 +4,17 @@
#if USE_CASSANDRA
#include <Core/Block.h>
#include <DataStreams/IBlockInputStream.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Core/ExternalResultDescription.h>
namespace DB
{
class CassandraBlockInputStream final : public IBlockInputStream
class CassandraSource final : public SourceWithProgress
{
public:
CassandraBlockInputStream(
CassandraSource(
const CassSessionShared & session_,
const String & query_str,
const Block & sample_block,
@ -24,12 +24,11 @@ public:
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
void readPrefix() override;
private:
using ValueType = ExternalResultDescription::ValueType;
Block readImpl() override;
Chunk generate() override;
static void insertValue(IColumn & column, ValueType type, const CassValue * cass_value);
void assertTypes(const CassResultPtr & result);
@ -40,6 +39,7 @@ private:
ExternalResultDescription description;
cass_bool_t has_more_pages;
bool assert_types = true;
bool is_initialized = false;
};
}
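
Migrated sources all take this shape: the constructor passes the header to SourceWithProgress, the work formerly done in readPrefix() moves behind an is_initialized flag inside generate(), and generate() returns a Chunk (an empty one when exhausted). A minimal sketch with illustrative names, not part of this commit:

#include <Processors/Sources/SourceWithProgress.h>

/// Sketch only; MySource and its internals are illustrative.
class MySource final : public SourceWithProgress
{
public:
    explicit MySource(const Block & sample_block)
        : SourceWithProgress(sample_block) {}

    String getName() const override { return "MySource"; }

protected:
    Chunk generate() override
    {
        if (!is_initialized)
        {
            /// Work formerly done in readPrefix(), e.g. issuing the query.
            is_initialized = true;
        }

        MutableColumns columns = getPort().getHeader().cloneEmptyColumns();
        /// ... fill columns; return {} once the source is exhausted ...
        size_t num_rows = columns.front()->size();
        return Chunk(std::move(columns), num_rows);
    }

private:
    bool is_initialized = false;
};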

View File

@ -40,7 +40,6 @@ void registerDictionarySourceCassandra(DictionarySourceFactory & factory)
#include <Common/SipHash.h>
#include "CassandraBlockInputStream.h"
#include <common/logger_useful.h>
#include <DataStreams/UnionBlockInputStream.h>
namespace DB
{
@ -132,12 +131,12 @@ void CassandraDictionarySource::maybeAllowFiltering(String & query) const
query += " ALLOW FILTERING;";
}
BlockInputStreamPtr CassandraDictionarySource::loadAll()
Pipe CassandraDictionarySource::loadAll()
{
String query = query_builder.composeLoadAllQuery();
maybeAllowFiltering(query);
LOG_INFO(log, "Loading all using query: {}", query);
return std::make_shared<CassandraBlockInputStream>(getSession(), query, sample_block, max_block_size);
return Pipe(std::make_shared<CassandraSource>(getSession(), query, sample_block, max_block_size));
}
std::string CassandraDictionarySource::toString() const
@ -145,15 +144,15 @@ std::string CassandraDictionarySource::toString() const
return "Cassandra: " + settings.db + '.' + settings.table;
}
BlockInputStreamPtr CassandraDictionarySource::loadIds(const std::vector<UInt64> & ids)
Pipe CassandraDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
String query = query_builder.composeLoadIdsQuery(ids);
maybeAllowFiltering(query);
LOG_INFO(log, "Loading ids using query: {}", query);
return std::make_shared<CassandraBlockInputStream>(getSession(), query, sample_block, max_block_size);
return Pipe(std::make_shared<CassandraSource>(getSession(), query, sample_block, max_block_size));
}
BlockInputStreamPtr CassandraDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
Pipe CassandraDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
if (requested_rows.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No rows requested");
@ -168,22 +167,19 @@ BlockInputStreamPtr CassandraDictionarySource::loadKeys(const Columns & key_colu
partitions[partition_key.get64()].push_back(row);
}
BlockInputStreams streams;
Pipes pipes;
for (const auto & partition : partitions)
{
String query = query_builder.composeLoadKeysQuery(key_columns, partition.second, ExternalQueryBuilder::CASSANDRA_SEPARATE_PARTITION_KEY, settings.partition_key_prefix);
maybeAllowFiltering(query);
LOG_INFO(log, "Loading keys for partition hash {} using query: {}", partition.first, query);
streams.push_back(std::make_shared<CassandraBlockInputStream>(getSession(), query, sample_block, max_block_size));
pipes.push_back(Pipe(std::make_shared<CassandraSource>(getSession(), query, sample_block, max_block_size)));
}
if (streams.size() == 1)
return streams.front();
return std::make_shared<UnionBlockInputStream>(streams, nullptr, settings.max_threads);
return Pipe::unitePipes(std::move(pipes));
}
BlockInputStreamPtr CassandraDictionarySource::loadUpdatedAll()
Pipe CassandraDictionarySource::loadUpdatedAll()
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for CassandraDictionarySource");
}
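
Note the simplification in loadKeys above: Pipe::unitePipes handles the single-pipe case itself, so both the old `streams.size() == 1` branch and UnionBlockInputStream disappear. Condensed, with partition_queries as an illustrative stand-in:

/// Sketch only; partition_queries is illustrative.
Pipes pipes;
for (const auto & query : partition_queries)
    pipes.push_back(Pipe(std::make_shared<CassandraSource>(getSession(), query, sample_block, max_block_size)));
return Pipe::unitePipes(std::move(pipes));   /// one pipe or many — no special case needed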

View File

@ -49,7 +49,7 @@ public:
const String & config_prefix,
Block & sample_block);
BlockInputStreamPtr loadAll() override;
Pipe loadAll() override;
bool supportsSelectiveLoad() const override { return true; }
@ -62,11 +62,11 @@ public:
return std::make_unique<CassandraDictionarySource>(dict_struct, settings, sample_block);
}
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
Pipe loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
Pipe loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
BlockInputStreamPtr loadUpdatedAll() override;
Pipe loadUpdatedAll() override;
String toString() const override;

View File

@ -1,8 +1,11 @@
#include "ClickHouseDictionarySource.h"
#include <memory>
#include <Client/ConnectionPool.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <Processors/Sources/RemoteSource.h>
#include <DataStreams/RemoteQueryExecutor.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ExpressionActions.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/executeQuery.h>
#include <Common/isLocalAddress.h>
@ -105,29 +108,29 @@ std::string ClickHouseDictionarySource::getUpdateFieldAndDate()
}
}
BlockInputStreamPtr ClickHouseDictionarySource::loadAllWithSizeHint(std::atomic<size_t> * result_size_hint)
Pipe ClickHouseDictionarySource::loadAllWithSizeHint(std::atomic<size_t> * result_size_hint)
{
return createStreamForQuery(load_all_query, result_size_hint);
}
BlockInputStreamPtr ClickHouseDictionarySource::loadAll()
Pipe ClickHouseDictionarySource::loadAll()
{
return createStreamForQuery(load_all_query);
}
BlockInputStreamPtr ClickHouseDictionarySource::loadUpdatedAll()
Pipe ClickHouseDictionarySource::loadUpdatedAll()
{
String load_update_query = getUpdateFieldAndDate();
return createStreamForQuery(load_update_query);
}
BlockInputStreamPtr ClickHouseDictionarySource::loadIds(const std::vector<UInt64> & ids)
Pipe ClickHouseDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
return createStreamForQuery(query_builder.composeLoadIdsQuery(ids));
}
BlockInputStreamPtr ClickHouseDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
Pipe ClickHouseDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
String query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::IN_WITH_TUPLES);
return createStreamForQuery(query);
@ -157,32 +160,41 @@ std::string ClickHouseDictionarySource::toString() const
return "ClickHouse: " + configuration.db + '.' + configuration.table + (where.empty() ? "" : ", where: " + where);
}
BlockInputStreamPtr ClickHouseDictionarySource::createStreamForQuery(const String & query, std::atomic<size_t> * result_size_hint)
Pipe ClickHouseDictionarySource::createStreamForQuery(const String & query, std::atomic<size_t> * result_size_hint)
{
BlockInputStreamPtr stream;
QueryPipeline pipeline;
/// Sample block should not contain first row default values
auto empty_sample_block = sample_block.cloneEmpty();
if (configuration.is_local)
{
stream = executeQuery(query, context, true).getInputStream();
stream = std::make_shared<ConvertingBlockInputStream>(stream, empty_sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
pipeline = executeQuery(query, context, true).pipeline;
auto converting = ActionsDAG::makeConvertingActions(
pipeline.getHeader().getColumnsWithTypeAndName(),
empty_sample_block.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Position);
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, std::make_shared<ExpressionActions>(converting));
});
}
else
{
stream = std::make_shared<RemoteBlockInputStream>(pool, query, empty_sample_block, context);
pipeline.init(Pipe(std::make_shared<RemoteSource>(
std::make_shared<RemoteQueryExecutor>(pool, query, empty_sample_block, context), false, false)));
}
if (result_size_hint)
{
stream->setProgressCallback([result_size_hint](const Progress & progress)
pipeline.setProgressCallback([result_size_hint](const Progress & progress)
{
*result_size_hint += progress.total_rows_to_read;
});
}
return stream;
return QueryPipeline::getPipe(std::move(pipeline));
}
std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & request) const
@ -191,15 +203,16 @@ std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & re
if (configuration.is_local)
{
auto query_context = Context::createCopy(context);
auto input_block = executeQuery(request, query_context, true).getInputStream();
return readInvalidateQuery(*input_block);
auto pipe = QueryPipeline::getPipe(executeQuery(request, query_context, true).pipeline);
return readInvalidateQuery(std::move(pipe));
}
else
{
/// We pass empty block to RemoteBlockInputStream, because we don't know the structure of the result.
Block invalidate_sample_block;
RemoteBlockInputStream invalidate_stream(pool, request, invalidate_sample_block, context);
return readInvalidateQuery(invalidate_stream);
Pipe pipe(std::make_shared<RemoteSource>(
std::make_shared<RemoteQueryExecutor>(pool, request, invalidate_sample_block, context), false, false));
return readInvalidateQuery(std::move(pipe));
}
}
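
Both branches of createStreamForQuery follow one template: build a Pipe-backed QueryPipeline (local execution or a RemoteSource), then, if the header must match a target structure, append an ExpressionTransform built from converting actions — the Processors analogue of the removed ConvertingBlockInputStream. The conversion step in isolation, with expected_header as an illustrative target:

/// Sketch only; expected_header is illustrative.
auto converting = ActionsDAG::makeConvertingActions(
    pipeline.getHeader().getColumnsWithTypeAndName(),
    expected_header.getColumnsWithTypeAndName(),
    ActionsDAG::MatchColumnsMode::Position);

pipeline.addSimpleTransform([&](const Block & header)
{
    return std::make_shared<ExpressionTransform>(header, std::make_shared<ExpressionActions>(converting));
});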

View File

@ -44,15 +44,15 @@ public:
ClickHouseDictionarySource(const ClickHouseDictionarySource & other);
ClickHouseDictionarySource & operator=(const ClickHouseDictionarySource &) = delete;
BlockInputStreamPtr loadAllWithSizeHint(std::atomic<size_t> * result_size_hint) override;
Pipe loadAllWithSizeHint(std::atomic<size_t> * result_size_hint) override;
BlockInputStreamPtr loadAll() override;
Pipe loadAll() override;
BlockInputStreamPtr loadUpdatedAll() override;
Pipe loadUpdatedAll() override;
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
Pipe loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
Pipe loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
bool isModified() const override;
bool supportsSelectiveLoad() const override { return true; }
@ -70,7 +70,7 @@ public:
private:
std::string getUpdateFieldAndDate();
BlockInputStreamPtr createStreamForQuery(const String & query, std::atomic<size_t> * result_size_hint = nullptr);
Pipe createStreamForQuery(const String & query, std::atomic<size_t> * result_size_hint = nullptr);
std::string doInvalidateQuery(const std::string & request) const;

View File

@ -8,9 +8,9 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
DictionaryBlockInputStream::DictionaryBlockInputStream(
std::shared_ptr<const IDictionary> dictionary_, UInt64 max_block_size_, PaddedPODArray<UInt64> && ids_, const Names & column_names_)
: DictionaryBlockInputStreamBase(ids_.size(), max_block_size_)
DictionarySourceData::DictionarySourceData(
std::shared_ptr<const IDictionary> dictionary_, PaddedPODArray<UInt64> && ids_, const Names & column_names_)
: num_rows(ids_.size())
, dictionary(dictionary_)
, column_names(column_names_)
, ids(std::move(ids_))
@ -18,12 +18,11 @@ DictionaryBlockInputStream::DictionaryBlockInputStream(
{
}
DictionaryBlockInputStream::DictionaryBlockInputStream(
DictionarySourceData::DictionarySourceData(
std::shared_ptr<const IDictionary> dictionary_,
UInt64 max_block_size_,
const PaddedPODArray<StringRef> & keys,
const Names & column_names_)
: DictionaryBlockInputStreamBase(keys.size(), max_block_size_)
: num_rows(keys.size())
, dictionary(dictionary_)
, column_names(column_names_)
, key_type(DictionaryInputStreamKeyType::ComplexKey)
@ -32,14 +31,13 @@ DictionaryBlockInputStream::DictionaryBlockInputStream(
fillKeyColumns(keys, 0, keys.size(), dictionary_structure, key_columns);
}
DictionaryBlockInputStream::DictionaryBlockInputStream(
DictionarySourceData::DictionarySourceData(
std::shared_ptr<const IDictionary> dictionary_,
UInt64 max_block_size_,
const Columns & data_columns_,
const Names & column_names_,
GetColumnsFunction && get_key_columns_function_,
GetColumnsFunction && get_view_columns_function_)
: DictionaryBlockInputStreamBase(data_columns_.front()->size(), max_block_size_)
: num_rows(data_columns_.front()->size())
, dictionary(dictionary_)
, column_names(column_names_)
, data_columns(data_columns_)
@ -49,7 +47,7 @@ DictionaryBlockInputStream::DictionaryBlockInputStream(
{
}
Block DictionaryBlockInputStream::getBlock(size_t start, size_t length) const
Block DictionarySourceData::getBlock(size_t start, size_t length) const
{
/// TODO: Rewrite
switch (key_type)
@ -98,7 +96,7 @@ Block DictionaryBlockInputStream::getBlock(size_t start, size_t length) const
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected DictionaryInputStreamKeyType.");
}
Block DictionaryBlockInputStream::fillBlock(
Block DictionarySourceData::fillBlock(
const PaddedPODArray<UInt64> & ids_to_fill,
const Columns & keys,
const DataTypes & types,
@ -161,14 +159,14 @@ Block DictionaryBlockInputStream::fillBlock(
return Block(block_columns);
}
ColumnPtr DictionaryBlockInputStream::getColumnFromIds(const PaddedPODArray<UInt64> & ids_to_fill)
ColumnPtr DictionarySourceData::getColumnFromIds(const PaddedPODArray<UInt64> & ids_to_fill)
{
auto column_vector = ColumnVector<UInt64>::create();
column_vector->getData().assign(ids_to_fill);
return column_vector;
}
void DictionaryBlockInputStream::fillKeyColumns(
void DictionarySourceData::fillKeyColumns(
const PaddedPODArray<StringRef> & keys,
size_t start,
size_t size,

View File

@ -20,18 +20,16 @@ namespace DB
/* BlockInputStream implementation for external dictionaries
* read() returns blocks consisting of the in-memory contents of the dictionaries
*/
class DictionaryBlockInputStream : public DictionaryBlockInputStreamBase
class DictionarySourceData
{
public:
DictionaryBlockInputStream(
DictionarySourceData(
std::shared_ptr<const IDictionary> dictionary,
UInt64 max_block_size,
PaddedPODArray<UInt64> && ids,
const Names & column_names);
DictionaryBlockInputStream(
DictionarySourceData(
std::shared_ptr<const IDictionary> dictionary,
UInt64 max_block_size,
const PaddedPODArray<StringRef> & keys,
const Names & column_names);
@ -41,18 +39,15 @@ public:
// Calls get_key_columns_function to get key column for dictionary get function call
// and get_view_columns_function to get key representation.
// Now used in trie dictionary, where columns are stored as ip and mask, and are shown as strings
DictionaryBlockInputStream(
DictionarySourceData(
std::shared_ptr<const IDictionary> dictionary,
UInt64 max_block_size,
const Columns & data_columns,
const Names & column_names,
GetColumnsFunction && get_key_columns_function,
GetColumnsFunction && get_view_columns_function);
String getName() const override { return "Dictionary"; }
protected:
Block getBlock(size_t start, size_t length) const override;
Block getBlock(size_t start, size_t length) const;
size_t getNumRows() const { return num_rows; }
private:
Block fillBlock(
@ -70,6 +65,7 @@ private:
const DictionaryStructure & dictionary_structure,
ColumnsWithTypeAndName & result);
const size_t num_rows;
std::shared_ptr<const IDictionary> dictionary;
Names column_names;
PaddedPODArray<UInt64> ids;
@ -89,4 +85,18 @@ private:
DictionaryInputStreamKeyType key_type;
};
class DictionarySource final : public DictionarySourceBase
{
public:
DictionarySource(DictionarySourceData data_, UInt64 max_block_size)
: DictionarySourceBase(data_.getBlock(0, 0), data_.getNumRows(), max_block_size)
, data(std::move(data_))
{}
String getName() const override { return "DictionarySource"; }
Block getBlock(size_t start, size_t length) const override { return data.getBlock(start, length); }
DictionarySourceData data;
};
}
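
A detail worth noting in the wrapper above: DictionarySourceBase needs a header block, and data_.getBlock(0, 0) supplies it — cutting zero rows yields an empty block with the correct column names and types. Construction then reduces to the pattern used throughout this commit (keys and column_names illustrative):

/// Sketch only.
Pipe pipe(std::make_shared<DictionarySource>(
    DictionarySourceData(shared_from_this(), std::move(keys), column_names),
    max_block_size));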

View File

@ -2,25 +2,20 @@
namespace DB
{
DictionaryBlockInputStreamBase::DictionaryBlockInputStreamBase(size_t rows_count_, size_t max_block_size_)
: rows_count(rows_count_), max_block_size(max_block_size_)
DictionarySourceBase::DictionarySourceBase(const Block & header, size_t rows_count_, size_t max_block_size_)
: SourceWithProgress(header), rows_count(rows_count_), max_block_size(max_block_size_)
{
}
Block DictionaryBlockInputStreamBase::readImpl()
Chunk DictionarySourceBase::generate()
{
if (next_row == rows_count)
return Block();
return {};
size_t block_size = std::min(max_block_size, rows_count - next_row);
Block block = getBlock(next_row, block_size);
next_row += block_size;
return block;
}
Block DictionaryBlockInputStreamBase::getHeader() const
{
return getBlock(0, 0);
size_t size = std::min(max_block_size, rows_count - next_row);
auto chunk = getChunk(next_row, size);
next_row += size;
return chunk;
}
}

View File

@ -1,24 +1,22 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Processors/Sources/SourceWithProgress.h>
namespace DB
{
class DictionaryBlockInputStreamBase : public IBlockInputStream
class DictionarySourceBase : public SourceWithProgress
{
protected:
DictionaryBlockInputStreamBase(size_t rows_count_, size_t max_block_size_);
DictionarySourceBase(const Block & header, size_t rows_count_, size_t max_block_size_);
virtual Block getBlock(size_t start, size_t length) const = 0;
Block getHeader() const override;
private:
const size_t rows_count;
const size_t max_block_size;
size_t next_row = 0;
Block readImpl() override;
Chunk generate() override;
};
}

View File

@ -15,7 +15,8 @@
#include <Core/Block.h>
#include <Dictionaries/IDictionary.h>
#include <Dictionaries/DictionaryStructure.h>
#include <DataStreams/IBlockInputStream.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPipeline.h>
namespace DB
@ -501,10 +502,10 @@ private:
* Note: the pipe will be fully consumed during this function's execution.
*/
template <DictionaryKeyType dictionary_key_type>
void mergeBlockWithStream(
void mergeBlockWithPipe(
size_t key_columns_size,
Block & block_to_update,
BlockInputStreamPtr & stream)
Pipe pipe)
{
using KeyType = std::conditional_t<dictionary_key_type == DictionaryKeyType::simple, UInt64, StringRef>;
static_assert(dictionary_key_type != DictionaryKeyType::range, "Range key type is not supported by updatePreviouslyLoadedBlockWithStream");
@ -555,9 +556,13 @@ void mergeBlockWithStream(
auto result_fetched_columns = block_to_update.cloneEmptyColumns();
stream->readPrefix();
QueryPipeline pipeline;
pipeline.init(std::move(pipe));
while (Block block = stream->read())
PullingPipelineExecutor executor(pipeline);
Block block;
while (executor.pull(block))
{
Columns block_key_columns;
block_key_columns.reserve(key_columns_size);
@ -591,8 +596,6 @@ void mergeBlockWithStream(
}
}
stream->readSuffix();
size_t result_fetched_rows = result_fetched_columns.front()->size();
size_t filter_hint = filter.size() - indexes_to_remove_count;
@ -645,4 +648,4 @@ static const PaddedPODArray<T> & getColumnVectorData(
}
}
}
}

View File

@ -85,62 +85,44 @@ ContextMutablePtr copyContextAndApplySettings(
return local_context;
}
BlockInputStreamWithAdditionalColumns::BlockInputStreamWithAdditionalColumns(
Block block_to_add_, std::unique_ptr<IBlockInputStream> && stream_)
: block_to_add(std::move(block_to_add_))
, stream(std::move(stream_))
static Block transformHeader(Block header, Block block_to_add)
{
}
Block BlockInputStreamWithAdditionalColumns::getHeader() const
{
auto header = stream->getHeader();
if (header)
{
for (Int64 i = static_cast<Int64>(block_to_add.columns() - 1); i >= 0; --i)
header.insert(0, block_to_add.getByPosition(i).cloneEmpty());
}
for (Int64 i = static_cast<Int64>(block_to_add.columns() - 1); i >= 0; --i)
header.insert(0, block_to_add.getByPosition(i).cloneEmpty());
return header;
}
Block BlockInputStreamWithAdditionalColumns::readImpl()
TransformWithAdditionalColumns::TransformWithAdditionalColumns(
Block block_to_add_, const Block & header)
: ISimpleTransform(header, transformHeader(header, block_to_add_), true)
, block_to_add(std::move(block_to_add_))
{
auto block = stream->read();
}
if (block)
void TransformWithAdditionalColumns::transform(Chunk & chunk)
{
if (chunk)
{
auto block_rows = block.rows();
auto num_rows = chunk.getNumRows();
auto columns = chunk.detachColumns();
auto cut_block = block_to_add.cloneWithCutColumns(current_range_index, block_rows);
auto cut_block = block_to_add.cloneWithCutColumns(current_range_index, num_rows);
if (cut_block.rows() != block_rows)
if (cut_block.rows() != num_rows)
throw Exception(ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH,
"Number of rows in block to add after cut must equal to number of rows in block from inner stream");
for (Int64 i = static_cast<Int64>(cut_block.columns() - 1); i >= 0; --i)
block.insert(0, cut_block.getByPosition(i));
columns.insert(columns.begin(), cut_block.getByPosition(i).column);
current_range_index += block_rows;
current_range_index += num_rows;
chunk.setColumns(std::move(columns), num_rows);
}
return block;
}
void BlockInputStreamWithAdditionalColumns::readPrefix()
String TransformWithAdditionalColumns::getName() const
{
stream->readPrefix();
}
void BlockInputStreamWithAdditionalColumns::readSuffix()
{
stream->readSuffix();
}
String BlockInputStreamWithAdditionalColumns::getName() const
{
return "BlockInputStreamWithAdditionalColumns";
}
return "TransformWithAdditionalColumns";
}
}
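
For reference, the minimal shape of an ISimpleTransform like the one above: the input and output headers go to the base constructor (here transformHeader computes the output header), and transform() rewrites one Chunk in place. A stripped-down sketch with an illustrative name, not part of this commit:

#include <Processors/ISimpleTransform.h>

/// Sketch only; PassThroughTransform is illustrative.
class PassThroughTransform final : public ISimpleTransform
{
public:
    explicit PassThroughTransform(const Block & header)
        : ISimpleTransform(header, header, /*skip_empty_chunks=*/true) {}

    String getName() const override { return "PassThroughTransform"; }

    void transform(Chunk & chunk) override
    {
        /// Inspect or rewrite the chunk's columns here;
        /// TransformWithAdditionalColumns prepends cut columns instead.
    }
};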

View File

@ -4,7 +4,7 @@
#include <common/types.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <DataStreams/IBlockInputStream.h>
#include <Processors/ISimpleTransform.h>
#include <Columns/IColumn.h>
#include <Core/Block.h>
#include <Interpreters/Context_fwd.h>
@ -38,25 +38,18 @@ ContextMutablePtr copyContextAndApplySettings(
*
* The number of rows in block_to_add must equal the total number of rows across all blocks from the inner stream.
*/
class BlockInputStreamWithAdditionalColumns final : public IBlockInputStream
class TransformWithAdditionalColumns final : public ISimpleTransform
{
public:
BlockInputStreamWithAdditionalColumns(Block block_to_add_, std::unique_ptr<IBlockInputStream> && stream_);
TransformWithAdditionalColumns(Block block_to_add_, const Block & header);
Block getHeader() const override;
Block readImpl() override;
void readPrefix() override;
void readSuffix() override;
void transform(Chunk & chunk) override;
String getName() const override;
private:
Block block_to_add;
std::unique_ptr<IBlockInputStream> stream;
size_t current_range_index = 0;
};
}
}

View File

@ -8,6 +8,8 @@
#include <Dictionaries/DictionaryFactory.h>
#include <Dictionaries/HierarchyDictionariesUtils.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
namespace DB
{
@ -66,11 +68,13 @@ Columns DirectDictionary<dictionary_key_type>::getColumns(
size_t dictionary_keys_size = dict_struct.getKeysNames().size();
block_key_columns.reserve(dictionary_keys_size);
BlockInputStreamPtr stream = getSourceBlockInputStream(key_columns, requested_keys);
QueryPipeline pipeline;
pipeline.init(getSourceBlockInputStream(key_columns, requested_keys));
stream->readPrefix();
PullingPipelineExecutor executor(pipeline);
while (const auto block = stream->read())
Block block;
while (executor.pull(block))
{
/// Split into keys columns and attribute columns
for (size_t i = 0; i < dictionary_keys_size; ++i)
@ -98,8 +102,6 @@ Columns DirectDictionary<dictionary_key_type>::getColumns(
block_key_columns.clear();
}
stream->readSuffix();
Field value_to_insert;
size_t requested_keys_size = requested_keys.size();
@ -183,13 +185,14 @@ ColumnUInt8::Ptr DirectDictionary<dictionary_key_type>::hasKeys(
size_t dictionary_keys_size = dict_struct.getKeysNames().size();
block_key_columns.reserve(dictionary_keys_size);
BlockInputStreamPtr stream = getSourceBlockInputStream(key_columns, requested_keys);
QueryPipeline pipeline;
pipeline.init(getSourceBlockInputStream(key_columns, requested_keys));
stream->readPrefix();
PullingPipelineExecutor executor(pipeline);
size_t keys_found = 0;
while (const auto block = stream->read())
Block block;
while (executor.pull(block))
{
/// Split into keys columns and attribute columns
for (size_t i = 0; i < dictionary_keys_size; ++i)
@ -216,8 +219,6 @@ ColumnUInt8::Ptr DirectDictionary<dictionary_key_type>::hasKeys(
block_key_columns.clear();
}
stream->readSuffix();
query_count.fetch_add(requested_keys_size, std::memory_order_relaxed);
found_count.fetch_add(keys_found, std::memory_order_relaxed);
@ -260,13 +261,13 @@ ColumnUInt8::Ptr DirectDictionary<dictionary_key_type>::isInHierarchy(
}
template <DictionaryKeyType dictionary_key_type>
BlockInputStreamPtr DirectDictionary<dictionary_key_type>::getSourceBlockInputStream(
Pipe DirectDictionary<dictionary_key_type>::getSourceBlockInputStream(
const Columns & key_columns [[maybe_unused]],
const PaddedPODArray<KeyType> & requested_keys [[maybe_unused]]) const
{
size_t requested_keys_size = requested_keys.size();
BlockInputStreamPtr stream;
Pipe pipe;
if constexpr (dictionary_key_type == DictionaryKeyType::simple)
{
@ -276,7 +277,7 @@ BlockInputStreamPtr DirectDictionary<dictionary_key_type>::getSourceBlockInputSt
for (auto key : requested_keys)
ids.emplace_back(key);
stream = source_ptr->loadIds(ids);
pipe = source_ptr->loadIds(ids);
}
else
{
@ -285,14 +286,14 @@ BlockInputStreamPtr DirectDictionary<dictionary_key_type>::getSourceBlockInputSt
for (size_t i = 0; i < requested_keys_size; ++i)
requested_rows.emplace_back(i);
stream = source_ptr->loadKeys(key_columns, requested_rows);
pipe = source_ptr->loadKeys(key_columns, requested_rows);
}
return stream;
return pipe;
}
template <DictionaryKeyType dictionary_key_type>
BlockInputStreamPtr DirectDictionary<dictionary_key_type>::getBlockInputStream(const Names & /* column_names */, size_t /* max_block_size */) const
Pipe DirectDictionary<dictionary_key_type>::read(const Names & /* column_names */, size_t /* max_block_size */) const
{
return source_ptr->loadAll();
}
@ -353,4 +354,4 @@ void registerDictionaryDirect(DictionaryFactory & factory)
}
}
}

View File

@ -97,10 +97,10 @@ public:
ColumnPtr in_key_column,
const DataTypePtr & key_type) const override;
BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override;
Pipe read(const Names & column_names, size_t max_block_size) const override;
private:
BlockInputStreamPtr getSourceBlockInputStream(const Columns & key_columns, const PaddedPODArray<KeyType> & requested_keys) const;
Pipe getSourceBlockInputStream(const Columns & key_columns, const PaddedPODArray<KeyType> & requested_keys) const;
const DictionaryStructure dict_struct;
const DictionarySourcePtr source_ptr;
@ -113,4 +113,4 @@ private:
extern template class DirectDictionary<DictionaryKeyType::simple>;
extern template class DirectDictionary<DictionaryKeyType::complex>;
}
}

View File

@ -2,13 +2,17 @@
#include <functional>
#include <common/scope_guard.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <DataStreams/formatBlock.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Formats/IInputFormat.h>
#include <Interpreters/Context.h>
#include <Formats/FormatFactory.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/copyData.h>
#include <Common/ShellCommand.h>
#include <Common/ThreadPool.h>
#include <common/logger_useful.h>
@ -34,26 +38,34 @@ namespace ErrorCodes
namespace
{
/// Owns ShellCommand and calls wait for it.
class ShellCommandOwningBlockInputStream : public OwningBlockInputStream<ShellCommand>
class ShellCommandOwningTransform final : public ISimpleTransform
{
private:
Poco::Logger * log;
std::unique_ptr<ShellCommand> command;
public:
ShellCommandOwningBlockInputStream(Poco::Logger * log_, const BlockInputStreamPtr & impl, std::unique_ptr<ShellCommand> command_)
: OwningBlockInputStream(std::move(impl), std::move(command_)), log(log_)
ShellCommandOwningTransform(const Block & header, Poco::Logger * log_, std::unique_ptr<ShellCommand> command_)
: ISimpleTransform(header, header, true), log(log_), command(std::move(command_))
{
}
void readSuffix() override
String getName() const override { return "ShellCommandOwningTransform"; }
void transform(Chunk &) override {}
Status prepare() override
{
OwningBlockInputStream<ShellCommand>::readSuffix();
auto status = ISimpleTransform::prepare();
if (status == Status::Finished)
{
std::string err;
readStringUntilEOF(err, command->err);
if (!err.empty())
LOG_ERROR(log, "Having stderr: {}", err);
std::string err;
readStringUntilEOF(err, own->err);
if (!err.empty())
LOG_ERROR(log, "Having stderr: {}", err);
command->wait();
}
own->wait();
return status;
}
};
@ -94,18 +106,19 @@ ExecutableDictionarySource::ExecutableDictionarySource(const ExecutableDictionar
{
}
BlockInputStreamPtr ExecutableDictionarySource::loadAll()
Pipe ExecutableDictionarySource::loadAll()
{
if (configuration.implicit_key)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutableDictionarySource with implicit_key does not support loadAll method");
LOG_TRACE(log, "loadAll {}", toString());
auto process = ShellCommand::execute(configuration.command);
auto input_stream = context->getInputFormat(configuration.format, process->out, sample_block, max_block_size);
return std::make_shared<ShellCommandOwningBlockInputStream>(log, input_stream, std::move(process));
Pipe pipe(FormatFactory::instance().getInput(configuration.format, process->out, sample_block, context, max_block_size));
pipe.addTransform(std::make_shared<ShellCommandOwningTransform>(pipe.getHeader(), log, std::move(process)));
return pipe;
}
BlockInputStreamPtr ExecutableDictionarySource::loadUpdatedAll()
Pipe ExecutableDictionarySource::loadUpdatedAll()
{
if (configuration.implicit_key)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutableDictionarySource with implicit_key does not support loadUpdatedAll method");
@ -119,81 +132,86 @@ BlockInputStreamPtr ExecutableDictionarySource::loadUpdatedAll()
LOG_TRACE(log, "loadUpdatedAll {}", command_with_update_field);
auto process = ShellCommand::execute(command_with_update_field);
auto input_stream = context->getInputFormat(configuration.format, process->out, sample_block, max_block_size);
return std::make_shared<ShellCommandOwningBlockInputStream>(log, input_stream, std::move(process));
Pipe pipe(FormatFactory::instance().getInput(configuration.format, process->out, sample_block, context, max_block_size));
pipe.addTransform(std::make_shared<ShellCommandOwningTransform>(pipe.getHeader(), log, std::move(process)));
return pipe;
}
namespace
{
/** A source that runs a child process, sends data to its stdin in a background thread,
* and receives data from its stdout.
*
* TODO: implement without background thread.
*/
class BlockInputStreamWithBackgroundThread final : public IBlockInputStream
class SourceWithBackgroundThread final : public SourceWithProgress
{
public:
BlockInputStreamWithBackgroundThread(
SourceWithBackgroundThread(
ContextPtr context,
const std::string & format,
const Block & sample_block,
const std::string & command_str,
Poco::Logger * log_,
std::function<void(WriteBufferFromFile &)> && send_data_)
: log(log_),
command(ShellCommand::execute(command_str)),
send_data(std::move(send_data_)),
thread([this] { send_data(command->in); })
: SourceWithProgress(sample_block)
, log(log_)
, command(ShellCommand::execute(command_str))
, send_data(std::move(send_data_))
, thread([this] { send_data(command->in); })
{
stream = context->getInputFormat(format, command->out, sample_block, max_block_size);
pipeline.init(Pipe(FormatFactory::instance().getInput(format, command->out, sample_block, context, max_block_size)));
executor = std::make_unique<PullingPipelineExecutor>(pipeline);
}
~BlockInputStreamWithBackgroundThread() override
~SourceWithBackgroundThread() override
{
if (thread.joinable())
thread.join();
}
Block getHeader() const override
protected:
Chunk generate() override
{
return stream->getHeader();
Chunk chunk;
executor->pull(chunk);
return chunk;
}
private:
Block readImpl() override
public:
Status prepare() override
{
return stream->read();
auto status = SourceWithProgress::prepare();
if (status == Status::Finished)
{
std::string err;
readStringUntilEOF(err, command->err);
if (!err.empty())
LOG_ERROR(log, "Having stderr: {}", err);
if (thread.joinable())
thread.join();
command->wait();
}
return status;
}
void readPrefix() override
{
stream->readPrefix();
}
void readSuffix() override
{
stream->readSuffix();
std::string err;
readStringUntilEOF(err, command->err);
if (!err.empty())
LOG_ERROR(log, "Having stderr: {}", err);
if (thread.joinable())
thread.join();
command->wait();
}
String getName() const override { return "WithBackgroundThread"; }
String getName() const override { return "SourceWithBackgroundThread"; }
Poco::Logger * log;
BlockInputStreamPtr stream;
QueryPipeline pipeline;
std::unique_ptr<PullingPipelineExecutor> executor;
std::unique_ptr<ShellCommand> command;
std::function<void(WriteBufferFromFile &)> send_data;
ThreadFromGlobalPool thread;
};
}
BlockInputStreamPtr ExecutableDictionarySource::loadIds(const std::vector<UInt64> & ids)
Pipe ExecutableDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size());
@ -201,7 +219,7 @@ BlockInputStreamPtr ExecutableDictionarySource::loadIds(const std::vector<UInt64
return getStreamForBlock(block);
}
BlockInputStreamPtr ExecutableDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
Pipe ExecutableDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
LOG_TRACE(log, "loadKeys {} size = {}", toString(), requested_rows.size());
@ -209,21 +227,21 @@ BlockInputStreamPtr ExecutableDictionarySource::loadKeys(const Columns & key_col
return getStreamForBlock(block);
}
BlockInputStreamPtr ExecutableDictionarySource::getStreamForBlock(const Block & block)
Pipe ExecutableDictionarySource::getStreamForBlock(const Block & block)
{
auto stream = std::make_unique<BlockInputStreamWithBackgroundThread>(
Pipe pipe(std::make_unique<SourceWithBackgroundThread>(
context, configuration.format, sample_block, configuration.command, log,
[block, this](WriteBufferFromFile & out) mutable
{
auto output_stream = context->getOutputStream(configuration.format, out, block.cloneEmpty());
formatBlock(output_stream, block);
out.close();
});
}));
if (configuration.implicit_key)
return std::make_shared<BlockInputStreamWithAdditionalColumns>(block, std::move(stream));
else
return std::shared_ptr<BlockInputStreamWithBackgroundThread>(stream.release());
pipe.addTransform(std::make_shared<TransformWithAdditionalColumns>(block, pipe.getHeader()));
return pipe;
}
bool ExecutableDictionarySource::isModified() const
@ -289,4 +307,4 @@ void registerDictionarySourceExecutable(DictionarySourceFactory & factory)
factory.registerSource("executable", create_table_source);
}
}
}
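
Both wrappers in this file rely on the same replacement for readSuffix(): override prepare(), let the base class run first, and perform the finishing work once it reports Status::Finished. The fragment in isolation (method fragment only; the base is ISimpleTransform or SourceWithProgress depending on the wrapper):

/// Sketch only.
Status prepare() override
{
    auto status = ISimpleTransform::prepare();
    if (status == Status::Finished)
    {
        /// Formerly readSuffix(): drain the child's stderr, join the
        /// feeding thread, wait for the process.
    }
    return status;
}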

View File

@ -36,17 +36,17 @@ public:
ExecutableDictionarySource(const ExecutableDictionarySource & other);
ExecutableDictionarySource & operator=(const ExecutableDictionarySource &) = delete;
BlockInputStreamPtr loadAll() override;
Pipe loadAll() override;
/** The logic of this method is flawed, absolutely incorrect and ignorant.
* It may lead to skipping some values due to clock sync or timezone changes.
* The intended usage of "update_field" is totally different.
*/
BlockInputStreamPtr loadUpdatedAll() override;
Pipe loadUpdatedAll() override;
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
Pipe loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
Pipe loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
bool isModified() const override;
@ -58,7 +58,7 @@ public:
std::string toString() const override;
BlockInputStreamPtr getStreamForBlock(const Block & block);
Pipe getStreamForBlock(const Block & block);
private:
Poco::Logger * log;
@ -69,4 +69,4 @@ private:
ContextPtr context;
};
}
}

View File

@ -2,12 +2,15 @@
#include <functional>
#include <common/scope_guard.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPipeline.h>
#include <DataStreams/formatBlock.h>
#include <Interpreters/Context.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/copyData.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/IInputFormat.h>
#include <Common/ShellCommand.h>
#include <Common/ThreadPool.h>
#include <common/logger_useful.h>
@ -69,12 +72,12 @@ ExecutablePoolDictionarySource::ExecutablePoolDictionarySource(const ExecutableP
{
}
BlockInputStreamPtr ExecutablePoolDictionarySource::loadAll()
Pipe ExecutablePoolDictionarySource::loadAll()
{
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutablePoolDictionarySource does not support loadAll method");
}
BlockInputStreamPtr ExecutablePoolDictionarySource::loadUpdatedAll()
Pipe ExecutablePoolDictionarySource::loadUpdatedAll()
{
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutablePoolDictionarySource does not support loadUpdatedAll method");
}
@ -84,19 +87,19 @@ namespace
/** A source that runs a child process, sends data to its stdin in a background thread,
* and receives data from its stdout.
*/
class PoolBlockInputStreamWithBackgroundThread final : public IBlockInputStream
class PoolSourceWithBackgroundThread final : public SourceWithProgress
{
public:
PoolBlockInputStreamWithBackgroundThread(
PoolSourceWithBackgroundThread(
std::shared_ptr<ProcessPool> process_pool_,
std::unique_ptr<ShellCommand> && command_,
BlockInputStreamPtr && stream_,
Pipe pipe,
size_t read_rows_,
Poco::Logger * log_,
std::function<void(WriteBufferFromFile &)> && send_data_)
: process_pool(process_pool_)
: SourceWithProgress(pipe.getHeader())
, process_pool(process_pool_)
, command(std::move(command_))
, stream(std::move(stream_))
, rows_to_read(read_rows_)
, log(log_)
, send_data(std::move(send_data_))
@ -112,9 +115,12 @@ namespace
exception_during_read = std::current_exception();
}
})
{}
{
pipeline.init(std::move(pipe));
executor = std::make_unique<PullingPipelineExecutor>(pipeline);
}
~PoolBlockInputStreamWithBackgroundThread() override
~PoolSourceWithBackgroundThread() override
{
if (thread.joinable())
thread.join();
@ -123,25 +129,22 @@ namespace
process_pool->returnObject(std::move(command));
}
Block getHeader() const override
{
return stream->getHeader();
}
private:
Block readImpl() override
protected:
Chunk generate() override
{
rethrowExceptionDuringReadIfNeeded();
if (current_read_rows == rows_to_read)
return Block();
return {};
Block block;
Chunk chunk;
try
{
block = stream->read();
current_read_rows += block.rows();
if (!executor->pull(chunk))
return {};
current_read_rows += chunk.getNumRows();
}
catch (...)
{
@ -150,22 +153,23 @@ namespace
throw;
}
return block;
return chunk;
}
void readPrefix() override
public:
Status prepare() override
{
rethrowExceptionDuringReadIfNeeded();
stream->readPrefix();
}
auto status = SourceWithProgress::prepare();
void readSuffix() override
{
if (thread.joinable())
thread.join();
if (status == Status::Finished)
{
if (thread.joinable())
thread.join();
rethrowExceptionDuringReadIfNeeded();
stream->readSuffix();
rethrowExceptionDuringReadIfNeeded();
}
return status;
}
void rethrowExceptionDuringReadIfNeeded()
@ -182,7 +186,8 @@ namespace
std::shared_ptr<ProcessPool> process_pool;
std::unique_ptr<ShellCommand> command;
BlockInputStreamPtr stream;
QueryPipeline pipeline;
std::unique_ptr<PullingPipelineExecutor> executor;
size_t rows_to_read;
Poco::Logger * log;
std::function<void(WriteBufferFromFile &)> send_data;
@ -194,7 +199,7 @@ namespace
}
BlockInputStreamPtr ExecutablePoolDictionarySource::loadIds(const std::vector<UInt64> & ids)
Pipe ExecutablePoolDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size());
@ -202,7 +207,7 @@ BlockInputStreamPtr ExecutablePoolDictionarySource::loadIds(const std::vector<UI
return getStreamForBlock(block);
}
BlockInputStreamPtr ExecutablePoolDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
Pipe ExecutablePoolDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
LOG_TRACE(log, "loadKeys {} size = {}", toString(), requested_rows.size());
@ -210,7 +215,7 @@ BlockInputStreamPtr ExecutablePoolDictionarySource::loadKeys(const Columns & key
return getStreamForBlock(block);
}
BlockInputStreamPtr ExecutablePoolDictionarySource::getStreamForBlock(const Block & block)
Pipe ExecutablePoolDictionarySource::getStreamForBlock(const Block & block)
{
std::unique_ptr<ShellCommand> process;
bool result = process_pool->tryBorrowObject(process, [this]()
@ -227,20 +232,20 @@ BlockInputStreamPtr ExecutablePoolDictionarySource::getStreamForBlock(const Bloc
configuration.max_command_execution_time);
size_t rows_to_read = block.rows();
auto read_stream = context->getInputFormat(configuration.format, process->out, sample_block, rows_to_read);
auto format = FormatFactory::instance().getInput(configuration.format, process->out, sample_block, context, rows_to_read);
auto stream = std::make_unique<PoolBlockInputStreamWithBackgroundThread>(
process_pool, std::move(process), std::move(read_stream), rows_to_read, log,
Pipe pipe(std::make_unique<PoolSourceWithBackgroundThread>(
process_pool, std::move(process), Pipe(std::move(format)), rows_to_read, log,
[block, this](WriteBufferFromFile & out) mutable
{
auto output_stream = context->getOutputStream(configuration.format, out, block.cloneEmpty());
formatBlock(output_stream, block);
});
}));
if (configuration.implicit_key)
return std::make_shared<BlockInputStreamWithAdditionalColumns>(block, std::move(stream));
else
return std::shared_ptr<PoolBlockInputStreamWithBackgroundThread>(stream.release());
pipe.addTransform(std::make_shared<TransformWithAdditionalColumns>(block, pipe.getHeader()));
return pipe;
}
bool ExecutablePoolDictionarySource::isModified() const
@ -320,4 +325,4 @@ void registerDictionarySourceExecutablePool(DictionarySourceFactory & factory)
factory.registerSource("executable_pool", create_table_source);
}
}
}

View File

@ -48,17 +48,17 @@ public:
ExecutablePoolDictionarySource(const ExecutablePoolDictionarySource & other);
ExecutablePoolDictionarySource & operator=(const ExecutablePoolDictionarySource &) = delete;
BlockInputStreamPtr loadAll() override;
Pipe loadAll() override;
/** The logic of this method is flawed, absolutely incorrect and ignorant.
* It may lead to skipping some values due to clock sync or timezone changes.
* The intended usage of "update_field" is totally different.
*/
BlockInputStreamPtr loadUpdatedAll() override;
Pipe loadUpdatedAll() override;
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
Pipe loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
Pipe loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
bool isModified() const override;
@ -70,7 +70,7 @@ public:
std::string toString() const override;
BlockInputStreamPtr getStreamForBlock(const Block & block);
Pipe getStreamForBlock(const Block & block);
private:
Poco::Logger * log;
@ -83,4 +83,4 @@ private:
std::shared_ptr<ProcessPool> process_pool;
};
}
}

View File

@ -5,6 +5,8 @@
#include <DataStreams/OwningBlockInputStream.h>
#include <IO/ReadBufferFromFile.h>
#include <Interpreters/Context.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/IInputFormat.h>
#include "DictionarySourceFactory.h"
#include "DictionaryStructure.h"
#include "registerDictionaries.h"
@ -45,14 +47,15 @@ FileDictionarySource::FileDictionarySource(const FileDictionarySource & other)
}
BlockInputStreamPtr FileDictionarySource::loadAll()
Pipe FileDictionarySource::loadAll()
{
LOG_TRACE(&Poco::Logger::get("FileDictionary"), "loadAll {}", toString());
auto in_ptr = std::make_unique<ReadBufferFromFile>(filepath);
auto stream = context->getInputFormat(format, *in_ptr, sample_block, max_block_size);
auto source = FormatFactory::instance().getInput(format, *in_ptr, sample_block, context, max_block_size);
source->addBuffer(std::move(in_ptr));
last_modification = getLastModification();
return std::make_shared<OwningBlockInputStream<ReadBuffer>>(stream, std::move(in_ptr));
return Pipe(std::move(source));
}
@ -92,4 +95,4 @@ void registerDictionarySourceFile(DictionarySourceFactory & factory)
factory.registerSource("file", create_table_source);
}
}
}

View File

@ -21,19 +21,19 @@ public:
FileDictionarySource(const FileDictionarySource & other);
BlockInputStreamPtr loadAll() override;
Pipe loadAll() override;
BlockInputStreamPtr loadUpdatedAll() override
Pipe loadUpdatedAll() override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for FileDictionarySource");
}
BlockInputStreamPtr loadIds(const std::vector<UInt64> & /*ids*/) override
Pipe loadIds(const std::vector<UInt64> & /*ids*/) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadIds is unsupported for FileDictionarySource");
}
BlockInputStreamPtr loadKeys(const Columns & /*key_columns*/, const std::vector<size_t> & /*requested_rows*/) override
Pipe loadKeys(const Columns & /*key_columns*/, const std::vector<size_t> & /*requested_rows*/) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadKeys is unsupported for FileDictionarySource");
}
@ -65,4 +65,4 @@ private:
Poco::Timestamp last_modification;
};
}
}

View File

@ -10,6 +10,9 @@
#include <Columns/ColumnNullable.h>
#include <Functions/FunctionHelpers.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Dictionaries/DictionaryBlockInputStream.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Dictionaries/HierarchyDictionariesUtils.h>
@ -319,10 +322,12 @@ void FlatDictionary::updateData()
{
if (!update_field_loaded_block || update_field_loaded_block->rows() == 0)
{
auto stream = source_ptr->loadUpdatedAll();
stream->readPrefix();
QueryPipeline pipeline;
pipeline.init(source_ptr->loadUpdatedAll());
while (const auto block = stream->read())
PullingPipelineExecutor executor(pipeline);
Block block;
while (executor.pull(block))
{
/// We are using this to keep saved data if the input consists of multiple blocks
if (!update_field_loaded_block)
@ -335,15 +340,14 @@ void FlatDictionary::updateData()
saved_column->insertRangeFrom(update_column, 0, update_column.size());
}
}
stream->readSuffix();
}
else
{
auto stream = source_ptr->loadUpdatedAll();
mergeBlockWithStream<DictionaryKeyType::simple>(
Pipe pipe(source_ptr->loadUpdatedAll());
mergeBlockWithPipe<DictionaryKeyType::simple>(
dict_struct.getKeysSize(),
*update_field_loaded_block,
stream);
std::move(pipe));
}
if (update_field_loaded_block)
@ -354,13 +358,13 @@ void FlatDictionary::loadData()
{
if (!source_ptr->hasUpdateField())
{
auto stream = source_ptr->loadAll();
stream->readPrefix();
QueryPipeline pipeline;
pipeline.init(source_ptr->loadAll());
PullingPipelineExecutor executor(pipeline);
while (const auto block = stream->read())
Block block;
while (executor.pull(block))
blockToAttributes(block);
stream->readSuffix();
}
else
updateData();
@ -531,7 +535,7 @@ void FlatDictionary::setAttributeValue(Attribute & attribute, const UInt64 key,
callOnDictionaryAttributeType(attribute.type, type_call);
}
BlockInputStreamPtr FlatDictionary::getBlockInputStream(const Names & column_names, size_t max_block_size) const
Pipe FlatDictionary::read(const Names & column_names, size_t max_block_size) const
{
const auto keys_count = loaded_keys.size();
@ -542,7 +546,8 @@ BlockInputStreamPtr FlatDictionary::getBlockInputStream(const Names & column_nam
if (loaded_keys[key_index])
keys.push_back(key_index);
return std::make_shared<DictionaryBlockInputStream>(shared_from_this(), max_block_size, std::move(keys), column_names);
return Pipe(std::make_shared<DictionarySource>(
DictionarySourceData(shared_from_this(), std::move(keys), column_names), max_block_size));
}
void registerDictionaryFlat(DictionaryFactory & factory)
@ -586,4 +591,4 @@ void registerDictionaryFlat(DictionaryFactory & factory)
}
}
}

View File

@ -97,7 +97,7 @@ public:
const DataTypePtr & key_type,
size_t level) const override;
BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override;
Pipe read(const Names & column_names, size_t max_block_size) const override;
private:
template <typename Value>
@ -178,4 +178,4 @@ private:
BlockPtr update_field_loaded_block;
};
}
}

View File

@ -637,7 +637,7 @@ void HashedDictionary<dictionary_key_type, sparse>::calculateBytesAllocated()
}
template <DictionaryKeyType dictionary_key_type, bool sparse>
BlockInputStreamPtr HashedDictionary<dictionary_key_type, sparse>::getBlockInputStream(const Names & column_names, size_t max_block_size) const
Pipe HashedDictionary<dictionary_key_type, sparse>::read(const Names & column_names, size_t max_block_size) const
{
PaddedPODArray<HashedDictionary::KeyType> keys;
@ -667,9 +667,9 @@ BlockInputStreamPtr HashedDictionary<dictionary_key_type, sparse>::getBlockInput
}
if constexpr (dictionary_key_type == DictionaryKeyType::simple)
return std::make_shared<DictionaryBlockInputStream>(shared_from_this(), max_block_size, std::move(keys), column_names);
return Pipe(std::make_shared<DictionarySource>(DictionarySourceData(shared_from_this(), std::move(keys), column_names), max_block_size));
else
return std::make_shared<DictionaryBlockInputStream>(shared_from_this(), max_block_size, keys, column_names);
return Pipe(std::make_shared<DictionarySource>(DictionarySourceData(shared_from_this(), keys, column_names), max_block_size));
}
template <DictionaryKeyType dictionary_key_type, bool sparse>
@ -767,4 +767,4 @@ void registerDictionaryHashed(DictionaryFactory & factory)
}
}
}

View File

@ -116,7 +116,7 @@ public:
const DataTypePtr & key_type,
size_t level) const override;
BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override;
Pipe read(const Names & column_names, size_t max_block_size) const override;
private:
template <typename Value>
@ -225,4 +225,4 @@ extern template class HashedDictionary<DictionaryKeyType::simple, true>;
extern template class HashedDictionary<DictionaryKeyType::complex, false>;
extern template class HashedDictionary<DictionaryKeyType::complex, true>;
}
}

View File

@ -191,7 +191,7 @@ struct IDictionary : public IExternalLoadable
getDictionaryID().getNameForLogs());
}
virtual BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const = 0;
virtual Pipe read(const Names & column_names, size_t max_block_size) const = 0;
bool supportUpdates() const override { return true; }

View File

@ -1,7 +1,7 @@
#pragma once
#include <Columns/IColumn.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Processors/Pipe.h>
#include <vector>
#include <atomic>
@ -21,28 +21,30 @@ class IDictionarySource
{
public:
/// Returns an input stream with all the data available from this source.
virtual BlockInputStreamPtr loadAll() = 0;
/// Returns a pipe with all the data available from this source.
virtual Pipe loadAll() = 0;
/// Returns an input stream with updated data available from this source.
virtual BlockInputStreamPtr loadUpdatedAll() = 0;
/// Returns a pipe with updated data available from this source.
virtual Pipe loadUpdatedAll() = 0;
/**
* result_size_hint - approx number of rows in the stream.
* Returns an input stream with all the data available from this source.
* Returns a pipe with all the data available from this source.
*
* NOTE: result_size_hint may be changed while you are reading (usually it
* will be non-zero for the first block and zero for the others, since it uses
* Progress::total_rows_approx) from the pipe, and may be updated
* Progress::total_rows_approx,) from the pipe, and may be called
* in parallel, so you should use something like this:
*
* ...
* std::atomic<uint64_t> new_size = 0;
*
* auto stream = source->loadAll(&new_size);
* stream->readPrefix();
* QueryPipeline pipeline;
* pipeline.init(source->loadAllWithSizeHint(&new_size));
* PullingPipelineExecutor executor(pipeline);
*
* while (const auto block = stream->read())
* Block block;
* while (executor.pull(block))
* {
* if (new_size)
* {
@ -56,10 +58,9 @@ public:
* }
* }
*
* stream->readSuffix();
* ...
*/
virtual BlockInputStreamPtr loadAllWithSizeHint(std::atomic<size_t> * /* result_size_hint */)
virtual Pipe loadAllWithSizeHint(std::atomic<size_t> * /* result_size_hint */)
{
return loadAll();
}
@ -72,13 +73,13 @@ public:
/** Returns a pipe with the data for a collection of identifiers.
* It must be guaranteed that the 'ids' array lives at least until all data has been read from the returned pipe.
*/
virtual BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) = 0;
virtual Pipe loadIds(const std::vector<UInt64> & ids) = 0;
/** Returns a pipe with the data for a collection of composite keys.
* `requested_rows` contains indices of all rows containing unique keys.
* It must be guaranteed that the 'requested_rows' array lives at least until all data has been read from the returned pipe.
*/
virtual BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) = 0;
virtual Pipe loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) = 0;
/// indicates whether the source has been modified since last load* operation
virtual bool isModified() const = 0;
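
For a source that already has its data in memory, the simplest conforming loadAll() wraps a single ready chunk. A hypothetical sketch, assuming SourceFromSingleChunk from Processors/Sources and an illustrative MySource with a sample_block member (neither is part of this commit):

#include <Processors/Sources/SourceFromSingleChunk.h>

Pipe MySource::loadAll()
{
    MutableColumns columns = sample_block.cloneEmptyColumns();
    /// ... fill columns with the source's data ...
    size_t num_rows = columns.empty() ? 0 : columns.front()->size();
    return Pipe(std::make_shared<SourceFromSingleChunk>(
        sample_block.cloneEmpty(), Chunk(std::move(columns), num_rows)));
}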

View File

@ -1,5 +1,6 @@
#include "readInvalidateQuery.h"
#include <DataStreams/IBlockInputStream.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <IO/WriteBufferFromString.h>
#include <Formats/FormatSettings.h>
@ -14,11 +15,18 @@ namespace ErrorCodes
extern const int RECEIVED_EMPTY_DATA;
}
std::string readInvalidateQuery(IBlockInputStream & block_input_stream)
std::string readInvalidateQuery(Pipe pipe)
{
block_input_stream.readPrefix();
QueryPipeline pipeline;
pipeline.init(std::move(pipe));
PullingPipelineExecutor executor(pipeline);
Block block;
while (executor.pull(block))
if (block)
break;
Block block = block_input_stream.read();
if (!block)
throw Exception(ErrorCodes::RECEIVED_EMPTY_DATA, "Empty response");
@ -36,11 +44,10 @@ std::string readInvalidateQuery(IBlockInputStream & block_input_stream)
auto & column_type = block.getByPosition(0);
column_type.type->getDefaultSerialization()->serializeTextQuoted(*column_type.column->convertToFullColumnIfConst(), 0, out, FormatSettings());
while ((block = block_input_stream.read()))
while (executor.pull(block))
if (block.rows() > 0)
throw Exception(ErrorCodes::TOO_MANY_ROWS, "Expected single row in resultset, got at least {}", std::to_string(rows + 1));
block_input_stream.readSuffix();
return out.str();
}
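
A call-site sketch matching the ClickHouseDictionarySource change earlier in this commit (pool, request and context as in that file; the empty header means the result structure is unknown up front):

/// Sketch only.
Block invalidate_sample_block;
Pipe pipe(std::make_shared<RemoteSource>(
    std::make_shared<RemoteQueryExecutor>(pool, request, invalidate_sample_block, context),
    /*add_aggregation_info=*/false, /*async_read=*/false));
std::string invalidate_value = readInvalidateQuery(std::move(pipe));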

View File

@ -1,13 +1,13 @@
#pragma once
#include <DataStreams/IBlockStream_fwd.h>
#include <string>
namespace DB
{
class Pipe;
/// Used in MySQLDictionarySource and XDBCDictionarySource after processing invalidate_query.
std::string readInvalidateQuery(IBlockInputStream & block_input_stream);
std::string readInvalidateQuery(Pipe pipe);
}