Merge pull request #27575 from kitaisreal/removed-some-data-streams

Removed some data streams
This commit is contained in:
Nikolai Kochetov 2021-08-13 12:59:00 +03:00 committed by GitHub
commit ad00aaa18c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 66 additions and 81 deletions

View File

@ -6,7 +6,7 @@
namespace DB
{
/** Common part for implementation of MySQLBlockInputStream, MongoDBBlockInputStream and others.
/** Common part for implementation of MySQLSource, MongoDBSource and others.
*/
struct ExternalResultDescription
{

View File

@ -1,3 +1,5 @@
#include "MongoDBSource.h"
#include <string>
#include <vector>
@ -15,7 +17,6 @@
#include <Common/assert_cast.h>
#include <Common/quoteString.h>
#include <common/range.h>
#include <DataStreams/MongoDBBlockInputStream.h>
#include <Poco/URI.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Version.h>

View File

@ -1,4 +1,4 @@
#include "PostgreSQLBlockInputStream.h"
#include "PostgreSQLSource.h"
#if USE_LIBPQXX
#include <Columns/ColumnNullable.h>

View File

@ -1,4 +1,4 @@
#include "SQLiteBlockInputStream.h"
#include "SQLiteSource.h"
#if USE_SQLITE
#include <common/range.h>
@ -22,21 +22,18 @@ namespace ErrorCodes
extern const int SQLITE_ENGINE_ERROR;
}
SQLiteBlockInputStream::SQLiteBlockInputStream(
SQLitePtr sqlite_db_,
const String & query_str_,
const Block & sample_block,
const UInt64 max_block_size_)
: query_str(query_str_)
SQLiteSource::SQLiteSource(
SQLitePtr sqlite_db_,
const String & query_str_,
const Block & sample_block,
const UInt64 max_block_size_)
: SourceWithProgress(sample_block.cloneEmpty())
, query_str(query_str_)
, max_block_size(max_block_size_)
, sqlite_db(std::move(sqlite_db_))
{
description.init(sample_block);
}
void SQLiteBlockInputStream::readPrefix()
{
sqlite3_stmt * compiled_stmt = nullptr;
int status = sqlite3_prepare_v2(sqlite_db.get(), query_str.c_str(), query_str.size() + 1, &compiled_stmt, nullptr);
@ -48,11 +45,10 @@ void SQLiteBlockInputStream::readPrefix()
compiled_statement = std::unique_ptr<sqlite3_stmt, StatementDeleter>(compiled_stmt, StatementDeleter());
}
Block SQLiteBlockInputStream::readImpl()
Chunk SQLiteSource::generate()
{
if (!compiled_statement)
return Block();
return {};
MutableColumns columns = description.sample_block.cloneEmptyColumns();
size_t num_rows = 0;
@ -73,30 +69,30 @@ Block SQLiteBlockInputStream::readImpl()
else if (status != SQLITE_ROW)
{
throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR,
"Expected SQLITE_ROW status, but got status {}. Error: {}, Message: {}",
status, sqlite3_errstr(status), sqlite3_errmsg(sqlite_db.get()));
"Expected SQLITE_ROW status, but got status {}. Error: {}, Message: {}",
status, sqlite3_errstr(status), sqlite3_errmsg(sqlite_db.get()));
}
int column_count = sqlite3_column_count(compiled_statement.get());
for (const auto idx : collections::range(0, column_count))
{
const auto & sample = description.sample_block.getByPosition(idx);
if (sqlite3_column_type(compiled_statement.get(), idx) == SQLITE_NULL)
for (int column_index = 0; column_index < column_count; ++column_index)
{
if (sqlite3_column_type(compiled_statement.get(), column_index) == SQLITE_NULL)
{
insertDefaultSQLiteValue(*columns[idx], *sample.column);
columns[column_index]->insertDefault();
continue;
}
if (description.types[idx].second)
auto & [type, is_nullable] = description.types[column_index];
if (is_nullable)
{
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[idx]);
insertValue(column_nullable.getNestedColumn(), description.types[idx].first, idx);
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[column_index]);
insertValue(column_nullable.getNestedColumn(), type, column_index);
column_nullable.getNullMapData().emplace_back(0);
}
else
{
insertValue(*columns[idx], description.types[idx].first, idx);
insertValue(*columns[column_index], type, column_index);
}
}
@ -104,18 +100,16 @@ Block SQLiteBlockInputStream::readImpl()
break;
}
return description.sample_block.cloneWithColumns(std::move(columns));
}
void SQLiteBlockInputStream::readSuffix()
{
if (compiled_statement)
if (num_rows == 0)
{
compiled_statement.reset();
return {};
}
return Chunk(std::move(columns), num_rows);
}
void SQLiteBlockInputStream::insertValue(IColumn & column, const ExternalResultDescription::ValueType type, size_t idx)
void SQLiteSource::insertValue(IColumn & column, ExternalResultDescription::ValueType type, size_t idx)
{
switch (type)
{

View File

@ -6,32 +6,28 @@
#if USE_SQLITE
#include <Core/ExternalResultDescription.h>
#include <DataStreams/IBlockInputStream.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <sqlite3.h> // Y_IGNORE
namespace DB
{
class SQLiteBlockInputStream : public IBlockInputStream
class SQLiteSource : public SourceWithProgress
{
using SQLitePtr = std::shared_ptr<sqlite3>;
public:
SQLiteBlockInputStream(SQLitePtr sqlite_db_,
SQLiteSource(SQLitePtr sqlite_db_,
const String & query_str_,
const Block & sample_block,
UInt64 max_block_size_);
String getName() const override { return "SQLite"; }
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
private:
void insertDefaultSQLiteValue(IColumn & column, const IColumn & sample_column)
{
column.insertFrom(sample_column, 0);
}
using ValueType = ExternalResultDescription::ValueType;
@ -40,19 +36,14 @@ private:
void operator()(sqlite3_stmt * stmt) { sqlite3_finalize(stmt); }
};
void readPrefix() override;
Chunk generate() override;
Block readImpl() override;
void readSuffix() override;
void insertValue(IColumn & column, const ExternalResultDescription::ValueType type, size_t idx);
void insertValue(IColumn & column, ExternalResultDescription::ValueType type, size_t idx);
String query_str;
UInt64 max_block_size;
ExternalResultDescription description;
SQLitePtr sqlite_db;
std::unique_ptr<sqlite3_stmt, StatementDeleter> compiled_statement;
};

View File

@ -29,7 +29,7 @@ SRCS(
ITTLAlgorithm.cpp
InternalTextLogsRowOutputStream.cpp
MaterializingBlockInputStream.cpp
MongoDBBlockInputStream.cpp
MongoDBSource.cpp
NativeBlockInputStream.cpp
NativeBlockOutputStream.cpp
PushingToViewsBlockOutputStream.cpp
@ -37,7 +37,7 @@ SRCS(
RemoteBlockOutputStream.cpp
RemoteQueryExecutor.cpp
RemoteQueryExecutorReadContext.cpp
SQLiteBlockInputStream.cpp
SQLiteSource.cpp
SizeLimits.cpp
SquashingBlockInputStream.cpp
SquashingBlockOutputStream.cpp

View File

@ -11,7 +11,7 @@
# include <DataTypes/convertMySQLDataType.h>
# include <Databases/MySQL/DatabaseMySQL.h>
# include <Databases/MySQL/FetchTablesColumnsList.h>
# include <Formats/MySQLBlockInputStream.h>
# include <Formats/MySQLSource.h>
# include <Processors/Executors/PullingPipelineExecutor.h>
# include <Processors/QueryPipeline.h>
# include <IO/Operators.h>

View File

@ -10,7 +10,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPipeline.h>
#include <Formats/MySQLBlockInputStream.h>
#include <Formats/MySQLSource.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>

View File

@ -5,7 +5,7 @@
#include <Core/Block.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Formats/MySQLBlockInputStream.h>
#include <Formats/MySQLSource.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPipeline.h>
#include <IO/ReadBufferFromFile.h>

View File

@ -16,7 +16,7 @@
# include <DataStreams/copyData.h>
# include <Databases/MySQL/DatabaseMaterializedMySQL.h>
# include <Databases/MySQL/MaterializeMetadata.h>
# include <Formats/MySQLBlockInputStream.h>
# include <Formats/MySQLSource.h>
# include <IO/ReadBufferFromString.h>
# include <Interpreters/Context.h>
# include <Interpreters/executeQuery.h>

View File

@ -36,10 +36,10 @@ void registerDictionarySourceCassandra(DictionarySourceFactory & factory)
#if USE_CASSANDRA
#include <IO/WriteHelpers.h>
#include <Common/SipHash.h>
#include "CassandraBlockInputStream.h"
#include <common/logger_useful.h>
#include <Common/SipHash.h>
#include <IO/WriteHelpers.h>
#include <Dictionaries/CassandraSource.h>
namespace DB
{

View File

@ -10,7 +10,7 @@
#include <Columns/ColumnsNumber.h>
#include <Core/ExternalResultDescription.h>
#include <IO/ReadHelpers.h>
#include "CassandraBlockInputStream.h"
#include "CassandraSource.h"
namespace DB

View File

@ -50,7 +50,7 @@ void registerDictionarySourceMongoDB(DictionarySourceFactory & factory)
// Poco/MongoDB/BSONWriter.h:54: void writeCString(const std::string & value);
// src/IO/WriteHelpers.h:146 #define writeCString(s, buf)
#include <IO/WriteHelpers.h>
#include <DataStreams/MongoDBBlockInputStream.h>
#include <DataStreams/MongoDBSource.h>
namespace DB

View File

@ -12,7 +12,7 @@
# include "DictionaryStructure.h"
# include "ExternalQueryBuilder.h"
# include "IDictionarySource.h"
# include <Formats/MySQLBlockInputStream.h>
# include <Formats/MySQLSource.h>
namespace Poco
{

View File

@ -7,7 +7,7 @@
#if USE_LIBPQXX
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <DataStreams/PostgreSQLBlockInputStream.h>
#include <DataStreams/PostgreSQLSource.h>
#include "readInvalidateQuery.h"
#include <Interpreters/Context.h>
#endif

View File

@ -31,7 +31,7 @@ void registerDictionarySourceRedis(DictionarySourceFactory & factory)
#include <IO/WriteHelpers.h>
#include "RedisBlockInputStream.h"
#include "RedisSource.h"
namespace DB

View File

@ -1,4 +1,4 @@
#include "RedisBlockInputStream.h"
#include "RedisSource.h"
#include <string>
#include <vector>

View File

@ -22,9 +22,9 @@ NO_COMPILER_WARNINGS()
SRCS(
CacheDictionary.cpp
CacheDictionaryUpdateQueue.cpp
CassandraBlockInputStream.cpp
CassandraDictionarySource.cpp
CassandraHelpers.cpp
CassandraSource.cpp
ClickHouseDictionarySource.cpp
DictionaryFactory.cpp
DictionarySource.cpp
@ -57,8 +57,8 @@ SRCS(
PolygonDictionaryImplementations.cpp
PolygonDictionaryUtils.cpp
RangeHashedDictionary.cpp
RedisBlockInputStream.cpp
RedisDictionarySource.cpp
RedisSource.cpp
XDBCDictionarySource.cpp
getDictionaryConfigurationFromAST.cpp
readInvalidateQuery.cpp

View File

@ -19,7 +19,7 @@
#include <Common/assert_cast.h>
#include <common/range.h>
#include <common/logger_useful.h>
#include "MySQLBlockInputStream.h"
#include "MySQLSource.h"
namespace DB

View File

@ -58,7 +58,7 @@ protected:
ExternalResultDescription description;
};
/// Like MySQLBlockInputStream, but allocates connection only when reading is starting.
/// Like MySQLSource, but allocates connection only when reading is starting.
/// It allows to create a lot of stream objects without occupation of all connection pool.
/// Also makes attempts to reconnect in case of connection failures.
class MySQLWithFailoverSource final : public MySQLSource

View File

@ -14,7 +14,7 @@ SRCS(
FormatFactory.cpp
FormatSchemaInfo.cpp
JSONEachRowUtils.cpp
MySQLBlockInputStream.cpp
MySQLSource.cpp
NativeFormat.cpp
NullFormat.cpp
ParsedTemplateFormatString.cpp

View File

@ -1,6 +1,6 @@
#include "PostgreSQLReplicationHandler.h"
#include <DataStreams/PostgreSQLBlockInputStream.h>
#include <DataStreams/PostgreSQLSource.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>

View File

@ -15,7 +15,7 @@
#include <Parsers/ASTLiteral.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Pipe.h>
#include <DataStreams/MongoDBBlockInputStream.h>
#include <DataStreams/MongoDBSource.h>
namespace DB
{

View File

@ -4,7 +4,7 @@
#include <Storages/StorageFactory.h>
#include <Storages/transformQueryForExternalDatabase.h>
#include <Formats/MySQLBlockInputStream.h>
#include <Formats/MySQLSource.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h>

View File

@ -1,7 +1,7 @@
#include "StoragePostgreSQL.h"
#if USE_LIBPQXX
#include <DataStreams/PostgreSQLBlockInputStream.h>
#include <DataStreams/PostgreSQLSource.h>
#include <Storages/StorageFactory.h>
#include <Storages/transformQueryForExternalDatabase.h>

View File

@ -2,7 +2,7 @@
#if USE_SQLITE
#include <common/range.h>
#include <DataStreams/SQLiteBlockInputStream.h>
#include <DataStreams/SQLiteSource.h>
#include <Databases/SQLite/SQLiteUtils.h>
#include <DataTypes/DataTypeString.h>
#include <Interpreters/Context.h>
@ -78,8 +78,7 @@ Pipe StorageSQLite::read(
sample_block.insert({column_data.type, column_data.name});
}
return Pipe(std::make_shared<SourceFromInputStream>(
std::make_shared<SQLiteBlockInputStream>(sqlite_db, query, sample_block, max_block_size)));
return Pipe(std::make_shared<SQLiteSource>(sqlite_db, query, sample_block, max_block_size));
}

View File

@ -8,7 +8,7 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/convertMySQLDataType.h>
#include <Formats/MySQLBlockInputStream.h>
#include <Formats/MySQLSource.h>
#include <IO/Operators.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>