Streams -> Processors for dicts, part 3.

This commit is contained in:
Nikolai Kochetov 2021-08-06 11:41:45 +03:00
parent 8546df13c2
commit 13f95f3fdf
58 changed files with 170 additions and 98 deletions

View File

@ -180,4 +180,4 @@ Pipe LibraryBridgeHelper::loadBase(const Poco::URI & uri, ReadWriteBufferFromHTT
return Pipe(std::move(source));
}
}
}

View File

@ -85,4 +85,4 @@ private:
size_t bridge_port;
};
}
}

View File

@ -366,4 +366,4 @@ Chunk MongoDBSource::generate()
return Chunk(std::move(columns), num_rows);
}
}
}

View File

@ -50,4 +50,4 @@ private:
bool strict_check_names;
};
}
}

View File

@ -170,4 +170,4 @@ class PostgreSQLSource<pqxx::ReadTransaction>;
}
#endif
#endif

View File

@ -44,11 +44,11 @@ protected:
Status prepare() override;
private:
void onStart();
Chunk generate() override;
void onFinish();
private:
void init(const Block & sample_block);
const UInt64 max_block_size;
@ -63,26 +63,34 @@ private:
};
/// Passes transaction object into PostgreSQLBlockInputStream and does not close transaction after read is finished.
/// Passes transaction object into PostgreSQLSource and does not close transaction after read is finished.
template <typename T>
class PostgreSQLTransactionBlockInputStream : public PostgreSQLBlockInputStream<T>
class PostgreSQLTransactionSource : public PostgreSQLSource<T>
{
public:
using Base = PostgreSQLBlockInputStream<T>;
using Base = PostgreSQLSource<T>;
PostgreSQLTransactionBlockInputStream(
PostgreSQLTransactionSource(
std::shared_ptr<T> tx_,
const std::string & query_str_,
const Block & sample_block_,
const UInt64 max_block_size_)
: PostgreSQLBlockInputStream<T>(tx_, query_str_, sample_block_, max_block_size_, false) {}
: PostgreSQLSource<T>(tx_, query_str_, sample_block_, max_block_size_, false) {}
void readPrefix() override
Chunk generate() override
{
Base::stream = std::make_unique<pqxx::stream_from>(*Base::tx, pqxx::from_query, std::string_view(Base::query_str));
if (!is_initialized)
{
Base::stream = std::make_unique<pqxx::stream_from>(*Base::tx, pqxx::from_query, std::string_view(Base::query_str));
is_initialized = true;
}
return Base::generate();
}
bool is_initialized = false;
};
}
#endif
#endif

View File

@ -12,6 +12,8 @@
# include <Databases/MySQL/DatabaseMySQL.h>
# include <Databases/MySQL/FetchTablesColumnsList.h>
# include <Formats/MySQLBlockInputStream.h>
# include <Processors/Executors/PullingPipelineExecutor.h>
# include <Processors/QueryPipeline.h>
# include <IO/Operators.h>
# include <Interpreters/Context.h>
# include <Parsers/ASTCreateQuery.h>
@ -281,9 +283,13 @@ std::map<String, UInt64> DatabaseMySQL::fetchTablesWithModificationTime(ContextP
std::map<String, UInt64> tables_with_modification_time;
StreamSettings mysql_input_stream_settings(local_context->getSettingsRef());
MySQLBlockInputStream result(mysql_pool.get(), query.str(), tables_status_sample_block, mysql_input_stream_settings);
auto result = std::make_unique<MySQLSource>(mysql_pool.get(), query.str(), tables_status_sample_block, mysql_input_stream_settings);
QueryPipeline pipeline;
pipeline.init(Pipe(std::move(result)));
while (Block block = result.read())
Block block;
PullingPipelineExecutor executor(pipeline);
while (executor.pull(block))
{
size_t rows = block.rows();
for (size_t index = 0; index < rows; ++index)

View File

@ -8,6 +8,8 @@
#include <DataTypes/convertMySQLDataType.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPipeline.h>
#include <Formats/MySQLBlockInputStream.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
@ -85,8 +87,13 @@ std::map<String, ColumnsDescription> fetchTablesColumnsList(
query << " TABLE_NAME IN " << toQueryStringWithQuote(tables_name) << " ORDER BY ORDINAL_POSITION";
StreamSettings mysql_input_stream_settings(settings);
MySQLBlockInputStream result(pool.get(), query.str(), tables_columns_sample_block, mysql_input_stream_settings);
while (Block block = result.read())
auto result = std::make_unique<MySQLSource>(pool.get(), query.str(), tables_columns_sample_block, mysql_input_stream_settings);
QueryPipeline pipeline;
pipeline.init(Pipe(std::move(result)));
Block block;
PullingPipelineExecutor executor(pipeline);
while (executor.pull(block))
{
const auto & table_name_col = *block.getByPosition(0).column;
const auto & column_name_col = *block.getByPosition(1).column;

View File

@ -6,6 +6,8 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Formats/MySQLBlockInputStream.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPipeline.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <Common/quoteString.h>
@ -38,11 +40,16 @@ static std::unordered_map<String, String> fetchTablesCreateQuery(
};
StreamSettings mysql_input_stream_settings(global_settings, false, true);
MySQLBlockInputStream show_create_table(
auto show_create_table = std::make_unique<MySQLSource>(
connection, "SHOW CREATE TABLE " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(fetch_table_name),
show_create_table_header, mysql_input_stream_settings);
Block create_query_block = show_create_table.read();
QueryPipeline pipeline;
pipeline.init(Pipe(std::move(show_create_table)));
Block create_query_block;
PullingPipelineExecutor executor(pipeline);
executor.pull(create_query_block);
if (!create_query_block || create_query_block.rows() != 1)
throw Exception("LOGICAL ERROR mysql show create return more rows.", ErrorCodes::LOGICAL_ERROR);
@ -60,9 +67,14 @@ static std::vector<String> fetchTablesInDB(const mysqlxx::PoolWithFailover::Entr
std::vector<String> tables_in_db;
StreamSettings mysql_input_stream_settings(global_settings);
MySQLBlockInputStream input(connection, query, header, mysql_input_stream_settings);
auto input = std::make_unique<MySQLSource>(connection, query, header, mysql_input_stream_settings);
while (Block block = input.read())
QueryPipeline pipeline;
pipeline.init(Pipe(std::move(input)));
Block block;
PullingPipelineExecutor executor(pipeline);
while (executor.pull(block))
{
tables_in_db.reserve(tables_in_db.size() + block.rows());
for (size_t index = 0; index < block.rows(); ++index)
@ -83,8 +95,14 @@ void MaterializeMetadata::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & c
};
StreamSettings mysql_input_stream_settings(settings, false, true);
MySQLBlockInputStream input(connection, "SHOW MASTER STATUS;", header, mysql_input_stream_settings);
Block master_status = input.read();
auto input = std::make_unique<MySQLSource>(connection, "SHOW MASTER STATUS;", header, mysql_input_stream_settings);
QueryPipeline pipeline;
pipeline.init(Pipe(std::move(input)));
Block master_status;
PullingPipelineExecutor executor(pipeline);
executor.pull(master_status);
if (!master_status || master_status.rows() != 1)
throw Exception("Unable to get master status from MySQL.", ErrorCodes::LOGICAL_ERROR);
@ -106,9 +124,13 @@ void MaterializeMetadata::fetchMasterVariablesValue(const mysqlxx::PoolWithFailo
const String & fetch_query = "SHOW VARIABLES WHERE Variable_name = 'binlog_checksum'";
StreamSettings mysql_input_stream_settings(settings, false, true);
MySQLBlockInputStream variables_input(connection, fetch_query, variables_header, mysql_input_stream_settings);
auto variables_input = std::make_unique<MySQLSource>(connection, fetch_query, variables_header, mysql_input_stream_settings);
QueryPipeline pipeline;
pipeline.init(Pipe(std::move(variables_input)));
while (Block variables_block = variables_input.read())
Block variables_block;
PullingPipelineExecutor executor(pipeline);
while (executor.pull(variables_block))
{
ColumnPtr variables_name = variables_block.getByName("Variable_name").column;
ColumnPtr variables_value = variables_block.getByName("Value").column;
@ -130,8 +152,13 @@ static bool checkSyncUserPrivImpl(const mysqlxx::PoolWithFailover::Entry & conne
String grants_query, sub_privs;
StreamSettings mysql_input_stream_settings(global_settings);
MySQLBlockInputStream input(connection, "SHOW GRANTS FOR CURRENT_USER();", sync_user_privs_header, mysql_input_stream_settings);
while (Block block = input.read())
auto input = std::make_unique<MySQLSource>(connection, "SHOW GRANTS FOR CURRENT_USER();", sync_user_privs_header, mysql_input_stream_settings);
QueryPipeline pipeline;
pipeline.init(Pipe(std::move(input)));
Block block;
PullingPipelineExecutor executor(pipeline);
while (executor.pull(block))
{
for (size_t index = 0; index < block.rows(); ++index)
{
@ -176,9 +203,13 @@ bool MaterializeMetadata::checkBinlogFileExists(const mysqlxx::PoolWithFailover:
};
StreamSettings mysql_input_stream_settings(settings, false, true);
MySQLBlockInputStream input(connection, "SHOW MASTER LOGS", logs_header, mysql_input_stream_settings);
auto input = std::make_unique<MySQLSource>(connection, "SHOW MASTER LOGS", logs_header, mysql_input_stream_settings);
QueryPipeline pipeline;
pipeline.init(Pipe(std::move(input)));
while (Block block = input.read())
Block block;
PullingPipelineExecutor executor(pipeline);
while (executor.pull(block))
{
for (size_t index = 0; index < block.rows(); ++index)
{

View File

@ -9,6 +9,8 @@
# include <random>
# include <Columns/ColumnTuple.h>
# include <Columns/ColumnDecimal.h>
# include <Processors/QueryPipeline.h>
# include <Processors/Executors/PullingPipelineExecutor.h>
# include <DataStreams/CountingBlockOutputStream.h>
# include <DataStreams/OneBlockInputStream.h>
# include <DataStreams/copyData.h>
@ -100,7 +102,7 @@ static void checkMySQLVariables(const mysqlxx::Pool::Entry & connection, const S
const String & check_query = "SHOW VARIABLES;";
StreamSettings mysql_input_stream_settings(settings, false, true);
MySQLBlockInputStream variables_input(connection, check_query, variables_header, mysql_input_stream_settings);
auto variables_input = std::make_unique<MySQLSource>(connection, check_query, variables_header, mysql_input_stream_settings);
std::unordered_map<String, String> variables_error_message{
{"log_bin", "ON"},
@ -110,7 +112,12 @@ static void checkMySQLVariables(const mysqlxx::Pool::Entry & connection, const S
{"log_bin_use_v1_row_events", "OFF"}
};
while (Block variables_block = variables_input.read())
QueryPipeline pipeline;
pipeline.init(Pipe(std::move(variables_input)));
PullingPipelineExecutor executor(pipeline);
Block variables_block;
while (executor.pull(variables_block))
{
ColumnPtr variable_name_column = variables_block.getByName("Variable_name").column;
ColumnPtr variable_value_column = variables_block.getByName("Value").column;
@ -327,12 +334,22 @@ static inline void dumpDataForTables(
auto out = std::make_shared<CountingBlockOutputStream>(getTableOutput(database_name, table_name, query_context));
StreamSettings mysql_input_stream_settings(context->getSettingsRef());
MySQLBlockInputStream input(
auto input = std::make_unique<MySQLSource>(
connection, "SELECT * FROM " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name),
out->getHeader(), mysql_input_stream_settings);
QueryPipeline pipeline;
pipeline.init(Pipe(std::move(input)));
PullingPipelineExecutor executor(pipeline);
Stopwatch watch;
copyData(input, *out, is_cancelled);
out->writePrefix();
Block block;
while (executor.pull(block))
out->write(block);
out->writeSuffix();
const Progress & progress = out->getProgress();
LOG_INFO(&Poco::Logger::get("MaterializedMySQLSyncThread(" + database_name + ")"),
"Materialize MySQL step 1: dump {}, {} rows, {} in {} sec., {} rows/sec., {}/sec."

View File

@ -691,4 +691,4 @@ void CacheDictionary<dictionary_key_type>::update(CacheDictionaryUpdateUnitPtr<d
template class CacheDictionary<DictionaryKeyType::simple>;
template class CacheDictionary<DictionaryKeyType::complex>;
}
}

View File

@ -22,9 +22,6 @@ public:
String getName() const override { return "Cassandra"; }
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
private:
using ValueType = ExternalResultDescription::ValueType;

View File

@ -13,9 +13,9 @@ Chunk DictionarySourceBase::generate()
return {};
size_t size = std::min(max_block_size, rows_count - next_row);
auto chunk = getChunk(next_row, size);
auto block = getBlock(next_row, size);
next_row += size;
return chunk;
return Chunk(block.getColumns(), size);
}
}

View File

@ -648,4 +648,6 @@ static const PaddedPODArray<T> & getColumnVectorData(
}
}
}
}

View File

@ -125,4 +125,4 @@ String TransformWithAdditionalColumns::getName() const
{
return "TransformWithAdditionalColumns";
}
}
}

View File

@ -52,4 +52,4 @@ private:
size_t current_range_index = 0;
};
}
}

View File

@ -354,4 +354,4 @@ void registerDictionaryDirect(DictionaryFactory & factory)
}
}
}

View File

@ -113,4 +113,4 @@ private:
extern template class DirectDictionary<DictionaryKeyType::simple>;
extern template class DirectDictionary<DictionaryKeyType::complex>;
}
}

View File

@ -307,4 +307,4 @@ void registerDictionarySourceExecutable(DictionarySourceFactory & factory)
factory.registerSource("executable", create_table_source);
}
}
}

View File

@ -69,4 +69,4 @@ private:
ContextPtr context;
};
}
}

View File

@ -325,4 +325,4 @@ void registerDictionarySourceExecutablePool(DictionarySourceFactory & factory)
factory.registerSource("executable_pool", create_table_source);
}
}
}

View File

@ -83,4 +83,4 @@ private:
std::shared_ptr<ProcessPool> process_pool;
};
}
}

View File

@ -95,4 +95,4 @@ void registerDictionarySourceFile(DictionarySourceFactory & factory)
factory.registerSource("file", create_table_source);
}
}
}

View File

@ -65,4 +65,4 @@ private:
Poco::Timestamp last_modification;
};
}
}

View File

@ -591,4 +591,4 @@ void registerDictionaryFlat(DictionaryFactory & factory)
}
}
}

View File

@ -178,4 +178,4 @@ private:
BlockPtr update_field_loaded_block;
};
}
}

View File

@ -263,4 +263,4 @@ void registerDictionarySourceHTTP(DictionarySourceFactory & factory)
factory.registerSource("http", create_table_source);
}
}
}

View File

@ -80,4 +80,4 @@ private:
ConnectionTimeouts timeouts;
};
}
}

View File

@ -766,4 +766,4 @@ void registerDictionaryHashed(DictionaryFactory & factory)
}
}
}

View File

@ -225,4 +225,4 @@ extern template class HashedDictionary<DictionaryKeyType::simple, true>;
extern template class HashedDictionary<DictionaryKeyType::complex, false>;
extern template class HashedDictionary<DictionaryKeyType::complex, true>;
}
}

View File

@ -970,4 +970,4 @@ void registerDictionaryTrie(DictionaryFactory & factory)
factory.registerLayout("ip_trie", create_layout, true);
}
}
}

View File

@ -224,4 +224,4 @@ private:
Poco::Logger * logger;
};
}
}

View File

@ -183,4 +183,4 @@ void registerDictionarySourceLibrary(DictionarySourceFactory & factory)
}
}
}

View File

@ -92,4 +92,4 @@ private:
ExternalResultDescription description;
};
}
}

View File

@ -238,4 +238,4 @@ std::string MongoDBDictionarySource::toString() const
return "MongoDB: " + db + '.' + collection + ',' + (user.empty() ? " " : " " + user + '@') + host + ':' + DB::toString(port);
}
}
}

View File

@ -84,4 +84,4 @@ private:
std::shared_ptr<Poco::MongoDB::Connection> connection;
};
}
}

View File

@ -285,10 +285,9 @@ std::string MySQLDictionarySource::doInvalidateQuery(const std::string & request
Block invalidate_sample_block;
ColumnPtr column(ColumnString::create());
invalidate_sample_block.insert(ColumnWithTypeAndName(column, std::make_shared<DataTypeString>(), "Sample Block"));
MySQLBlockInputStream block_input_stream(pool->get(), request, invalidate_sample_block, settings);
return readInvalidateQuery(block_input_stream);
return readInvalidateQuery(Pipe(std::make_unique<MySQLSource>(pool->get(), request, invalidate_sample_block, settings)));
}
}
#endif
#endif

View File

@ -6,7 +6,7 @@
# include "config_core.h"
#endif
//#if USE_MYSQL
#if USE_MYSQL
# include <common/LocalDateTime.h>
# include <mysqlxx/PoolWithFailover.h>
# include "DictionaryStructure.h"
@ -99,4 +99,4 @@ private:
}
#endif
#endif

View File

@ -516,4 +516,4 @@ void IPolygonDictionary::extractPolygons(const ColumnPtr & column)
}
}
}
}

View File

@ -166,4 +166,4 @@ private:
static std::vector<Point> extractPoints(const Columns &key_columns);
};
}
}

View File

@ -107,7 +107,7 @@ Pipe PostgreSQLDictionarySource::loadKeys(const Columns & key_columns, const std
Pipe PostgreSQLDictionarySource::loadBase(const String & query)
{
return std::make_shared<PostgreSQLBlockInputStream<>>(pool->get(), query, sample_block, max_block_size);
return Pipe(std::make_shared<PostgreSQLSource<>>(pool->get(), query, sample_block, max_block_size));
}
@ -129,8 +129,7 @@ std::string PostgreSQLDictionarySource::doInvalidateQuery(const std::string & re
Block invalidate_sample_block;
ColumnPtr column(ColumnString::create());
invalidate_sample_block.insert(ColumnWithTypeAndName(column, std::make_shared<DataTypeString>(), "Sample Block"));
PostgreSQLBlockInputStream<> block_input_stream(pool->get(), request, invalidate_sample_block, 1);
return readInvalidateQuery(block_input_stream);
return readInvalidateQuery(Pipe(std::make_unique<PostgreSQLSource<>>(pool->get(), request, invalidate_sample_block, 1)));
}
@ -220,4 +219,4 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
factory.registerSource("postgresql", create_table_source);
}
}
}

View File

@ -72,4 +72,4 @@ private:
};
}
#endif
#endif

View File

@ -195,4 +195,4 @@ Block RangeDictionarySource<RangeType>::getBlock(size_t start, size_t length) co
return data.getBlock(start, length);
}
}
}

View File

@ -677,4 +677,4 @@ void registerDictionaryRangeHashed(DictionaryFactory & factory)
factory.registerLayout("range_hashed", create_layout, false);
}
}
}

View File

@ -197,4 +197,4 @@ private:
mutable std::atomic<size_t> found_count{0};
};
}
}

View File

@ -222,4 +222,4 @@ namespace DB
size_t num_rows = columns.at(0)->size();
return Chunk(std::move(columns), num_rows);
}
}
}

View File

@ -48,4 +48,5 @@ namespace DB
bool all_read = false;
};
}
}

View File

@ -276,4 +276,4 @@ namespace DB
return RedisStorageType::SIMPLE;
}
}
}

View File

@ -94,4 +94,4 @@ namespace ErrorCodes
std::shared_ptr<Poco::Redis::Client> client;
};
}
}

View File

@ -285,4 +285,4 @@ void registerDictionarySourceJDBC(DictionarySourceFactory & factory)
factory.registerSource("jdbc", create_table_source);
}
}
}

View File

@ -90,4 +90,4 @@ private:
ConnectionTimeouts timeouts;
};
}
}

View File

@ -338,4 +338,4 @@ void MySQLSource::initPositionMappingFromQueryResultStructure()
}
#endif
#endif

View File

@ -81,4 +81,4 @@ private:
bool is_initialized = false;
};
}
}

View File

@ -1,6 +1,8 @@
#include "PostgreSQLReplicationHandler.h"
#include <DataStreams/PostgreSQLBlockInputStream.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
#include <Storages/PostgreSQL/StorageMaterializedPostgreSQL.h>
#include <Interpreters/InterpreterDropQuery.h>
@ -226,9 +228,17 @@ StoragePtr PostgreSQLReplicationHandler::loadFromSnapshot(String & snapshot_name
const StorageInMemoryMetadata & storage_metadata = nested_storage->getInMemoryMetadata();
auto sample_block = storage_metadata.getSampleBlockNonMaterialized();
PostgreSQLTransactionBlockInputStream<pqxx::ReplicationTransaction> input(tx, query_str, sample_block, DEFAULT_BLOCK_SIZE);
assertBlocksHaveEqualStructure(input.getHeader(), block_io.out->getHeader(), "postgresql replica load from snapshot");
copyData(input, *block_io.out);
auto input = std::make_unique<PostgreSQLTransactionSource<pqxx::ReplicationTransaction>>(tx, query_str, sample_block, DEFAULT_BLOCK_SIZE);
QueryPipeline pipeline;
pipeline.init(Pipe(std::move(input)));
assertBlocksHaveEqualStructure(pipeline.getHeader(), block_io.out->getHeader(), "postgresql replica load from snapshot");
PullingPipelineExecutor executor(pipeline);
Block block;
block_io.out->writePrefix();
while (executor.pull(block))
block_io.out->write(block);
block_io.out->writeSuffix();
nested_storage = materialized_storage->prepare();
auto nested_table_id = nested_storage->getStorageID();

View File

@ -169,9 +169,7 @@ Pipe StorageDictionary::read(
{
auto registered_dictionary_name = location == Location::SameDatabaseAndNameAsDictionary ? getStorageID().getInternalDictionaryName() : dictionary_name;
auto dictionary = getContext()->getExternalDictionariesLoader().getDictionary(registered_dictionary_name, local_context);
auto stream = dictionary->getBlockInputStream(column_names, max_block_size);
/// TODO: update dictionary interface for processors.
return Pipe(std::make_shared<SourceFromInputStream>(stream));
return dictionary->read(column_names, max_block_size);
}
void StorageDictionary::shutdown()

View File

@ -99,8 +99,7 @@ Pipe StorageMongoDB::read(
sample_block.insert({ column_data.type, column_data.name });
}
return Pipe(std::make_shared<SourceFromInputStream>(
std::make_shared<MongoDBBlockInputStream>(connection, createCursor(database_name, collection_name, sample_block), sample_block, max_block_size, true)));
return Pipe(std::make_shared<MongoDBSource>(connection, createCursor(database_name, collection_name, sample_block), sample_block, max_block_size, true));
}
void registerStorageMongoDB(StorageFactory & factory)

View File

@ -104,8 +104,7 @@ Pipe StorageMySQL::read(
StreamSettings mysql_input_stream_settings(context_->getSettingsRef(),
mysql_settings.connection_auto_close);
return Pipe(std::make_shared<SourceFromInputStream>(
std::make_shared<MySQLWithFailoverBlockInputStream>(pool, query, sample_block, mysql_input_stream_settings)));
return Pipe(std::make_shared<MySQLWithFailoverSource>(pool, query, sample_block, mysql_input_stream_settings));
}

View File

@ -90,8 +90,7 @@ Pipe StoragePostgreSQL::read(
sample_block.insert({ column_data.type, column_data.name });
}
return Pipe(std::make_shared<SourceFromInputStream>(
std::make_shared<PostgreSQLBlockInputStream<>>(pool->get(), query, sample_block, max_block_size_)));
return Pipe(std::make_shared<PostgreSQLSource<>>(pool->get(), query, sample_block, max_block_size_));
}