Revert "Parallel distributed insert select from *Cluster table functions (#39107)"

This reverts commit d3cc234986.
This commit is contained in:
Alexander Tokmakov 2022-08-24 15:17:15 +03:00 committed by GitHub
parent 0781e8b4f7
commit f9f85a0e8b
12 changed files with 88 additions and 526 deletions

View File

@ -326,7 +326,7 @@ BlockIO InterpreterInsertQuery::execute()
if (!query.table_function)
getContext()->checkAccess(AccessType::INSERT, query.table_id, query_sample_block.getNames());
if (query.select && settings.parallel_distributed_insert_select)
if (query.select && table->isRemote() && settings.parallel_distributed_insert_select)
// Distributed INSERT SELECT
distributed_pipeline = table->distributedWrite(query, getContext());

View File

@ -41,7 +41,7 @@ StorageHDFSCluster::StorageHDFSCluster(
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & compression_method_)
: IStorageCluster(table_id_)
: IStorage(table_id_)
, cluster_name(cluster_name_)
, uri(uri_)
, format_name(format_name_)
@ -74,7 +74,13 @@ Pipe StorageHDFSCluster::read(
size_t /*max_block_size*/,
unsigned /*num_streams*/)
{
createIteratorAndCallback(context);
auto cluster = context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
auto iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(context, uri);
auto callback = std::make_shared<HDFSSource::IteratorWrapper>([iterator]() mutable -> String
{
return iterator->next();
});
/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
Block header =
@ -134,29 +140,6 @@ QueryProcessingStage::Enum StorageHDFSCluster::getQueryProcessingStage(
}
void StorageHDFSCluster::createIteratorAndCallback(ContextPtr context) const
{
cluster = context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(context, uri);
callback = std::make_shared<HDFSSource::IteratorWrapper>([iter = this->iterator]() mutable -> String { return iter->next(); });
}
RemoteQueryExecutor::Extension StorageHDFSCluster::getTaskIteratorExtension(ContextPtr context) const
{
createIteratorAndCallback(context);
return RemoteQueryExecutor::Extension{.task_iterator = callback};
}
ClusterPtr StorageHDFSCluster::getCluster(ContextPtr context) const
{
createIteratorAndCallback(context);
return cluster;
}
NamesAndTypesList StorageHDFSCluster::getVirtuals() const
{
return NamesAndTypesList{

View File

@ -9,7 +9,6 @@
#include <Client/Connection.h>
#include <Interpreters/Cluster.h>
#include <Storages/IStorageCluster.h>
#include <Storages/HDFS/StorageHDFS.h>
namespace DB
@ -17,7 +16,7 @@ namespace DB
class Context;
class StorageHDFSCluster : public IStorageCluster
class StorageHDFSCluster : public IStorage
{
public:
StorageHDFSCluster(
@ -40,20 +39,11 @@ public:
NamesAndTypesList getVirtuals() const override;
ClusterPtr getCluster(ContextPtr context) const override;
RemoteQueryExecutor::Extension getTaskIteratorExtension(ContextPtr context) const override;
private:
String cluster_name;
String uri;
String format_name;
String compression_method;
mutable ClusterPtr cluster;
mutable std::shared_ptr<HDFSSource::DisclosedGlobIterator> iterator;
mutable std::shared_ptr<HDFSSource::IteratorWrapper> callback;
void createIteratorAndCallback(ContextPtr context) const;
};

View File

@ -1,28 +0,0 @@
#pragma once
#include <Storages/IStorage.h>
#include <Interpreters/Cluster.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
namespace DB
{
/**
* Base cluster for Storages used in table functions like s3Cluster and hdfsCluster
* Needed for code simplification around parallel_distributed_insert_select
*/
class IStorageCluster: public IStorage
{
public:
explicit IStorageCluster(const StorageID & table_id_) : IStorage(table_id_) {}
virtual ClusterPtr getCluster(ContextPtr context) const = 0;
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(ContextPtr context) const = 0;
bool isRemote() const override { return true; }
};
}

View File

@ -59,8 +59,6 @@
#include <TableFunctions/TableFunctionView.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Storages/IStorageCluster.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
@ -761,16 +759,33 @@ SinkToStoragePtr StorageDistributed::write(const ASTPtr &, const StorageMetadata
}
std::optional<QueryPipeline> StorageDistributed::distributedWriteBetweenDistributedTables(const StorageDistributed & src_distributed, const ASTInsertQuery & query, ContextPtr local_context) const
std::optional<QueryPipeline> StorageDistributed::distributedWrite(const ASTInsertQuery & query, ContextPtr local_context)
{
const auto & settings = local_context->getSettingsRef();
auto new_query = std::dynamic_pointer_cast<ASTInsertQuery>(query.clone());
QueryPipeline pipeline;
const Settings & settings = local_context->getSettingsRef();
if (settings.max_distributed_depth && local_context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
std::shared_ptr<StorageDistributed> storage_src;
auto & select = query.select->as<ASTSelectWithUnionQuery &>();
auto new_query = std::dynamic_pointer_cast<ASTInsertQuery>(query.clone());
if (select.list_of_selects->children.size() == 1)
{
if (auto * select_query = select.list_of_selects->children.at(0)->as<ASTSelectQuery>())
{
JoinedTables joined_tables(Context::createCopy(local_context), *select_query);
if (joined_tables.tablesCount() == 1)
{
storage_src = std::dynamic_pointer_cast<StorageDistributed>(joined_tables.getLeftTableStorage());
if (storage_src)
{
/// Unwrap view() function.
if (src_distributed.remote_table_function_ptr)
if (storage_src->remote_table_function_ptr)
{
const TableFunctionPtr src_table_function =
TableFunctionFactory::instance().get(src_distributed.remote_table_function_ptr, local_context);
TableFunctionFactory::instance().get(storage_src->remote_table_function_ptr, local_context);
const TableFunctionView * view_function =
assert_cast<const TableFunctionView *>(src_table_function.get());
new_query->select = view_function->getSelectQuery().clone();
@ -780,16 +795,19 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteBetweenDistribu
const auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>();
select_with_union_query->list_of_selects = std::make_shared<ASTExpressionList>();
auto * select = query.select->as<ASTSelectWithUnionQuery &>().list_of_selects->children.at(0)->as<ASTSelectQuery>();
auto new_select_query = std::dynamic_pointer_cast<ASTSelectQuery>(select->clone());
auto new_select_query = std::dynamic_pointer_cast<ASTSelectQuery>(select_query->clone());
select_with_union_query->list_of_selects->children.push_back(new_select_query);
new_select_query->replaceDatabaseAndTable(src_distributed.getRemoteDatabaseName(), src_distributed.getRemoteTableName());
new_select_query->replaceDatabaseAndTable(storage_src->getRemoteDatabaseName(), storage_src->getRemoteTableName());
new_query->select = select_with_union_query;
}
}
}
}
}
const Cluster::AddressesWithFailover & src_addresses = src_distributed.getCluster()->getShardsAddresses();
const Cluster::AddressesWithFailover & src_addresses = storage_src ? storage_src->getCluster()->getShardsAddresses() : Cluster::AddressesWithFailover{};
const Cluster::AddressesWithFailover & dst_addresses = getCluster()->getShardsAddresses();
/// Compare addresses instead of cluster name, to handle remote()/cluster().
/// (since for remote()/cluster() the getClusterName() is empty string)
@ -804,7 +822,7 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteBetweenDistribu
LOG_WARNING(log,
"Parallel distributed INSERT SELECT is not possible "
"(source cluster={} ({} addresses), destination cluster={} ({} addresses))",
src_distributed.getClusterName(),
storage_src ? storage_src->getClusterName() : "<not a Distributed table>",
src_addresses.size(),
getClusterName(),
dst_addresses.size());
@ -831,7 +849,6 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteBetweenDistribu
new_query_str = buf.str();
}
QueryPipeline pipeline;
ContextMutablePtr query_context = Context::createCopy(local_context);
++query_context->getClientInfo().distributed_depth;
@ -865,114 +882,6 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteBetweenDistribu
}
std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStorage(const IStorageCluster & src_storage_cluster, const ASTInsertQuery & query, ContextPtr local_context) const
{
const auto & settings = local_context->getSettingsRef();
auto extension = src_storage_cluster.getTaskIteratorExtension(local_context);
auto dst_cluster = getCluster();
auto new_query = std::dynamic_pointer_cast<ASTInsertQuery>(query.clone());
if (settings.parallel_distributed_insert_select == PARALLEL_DISTRIBUTED_INSERT_SELECT_ALL)
{
new_query->table_id = StorageID(getRemoteDatabaseName(), getRemoteTableName());
/// Reset table function for INSERT INTO remote()/cluster()
new_query->table_function.reset();
}
String new_query_str;
{
WriteBufferFromOwnString buf;
IAST::FormatSettings ast_format_settings(buf, /*one_line*/ true);
ast_format_settings.always_quote_identifiers = true;
new_query->IAST::format(ast_format_settings);
new_query_str = buf.str();
}
QueryPipeline pipeline;
ContextMutablePtr query_context = Context::createCopy(local_context);
++query_context->getClientInfo().distributed_depth;
/// Here we take addresses from destination cluster and assume source table exists on these nodes
for (const auto & replicas : getCluster()->getShardsAddresses())
{
/// There will be only one replica, because we consider each replica as a shard
for (const auto & node : replicas)
{
auto connection = std::make_shared<Connection>(
node.host_name, node.port, query_context->getGlobalContext()->getCurrentDatabase(),
node.user, node.password, node.quota_key, node.cluster, node.cluster_secret,
"ParallelInsertSelectInititiator",
node.compression,
node.secure
);
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
connection,
new_query_str,
Block{},
query_context,
/*throttler=*/nullptr,
Scalars{},
Tables{},
QueryProcessingStage::Complete,
extension);
QueryPipeline remote_pipeline(std::make_shared<RemoteSource>(remote_query_executor, false, settings.async_socket_for_remote));
remote_pipeline.complete(std::make_shared<EmptySink>(remote_query_executor->getHeader()));
pipeline.addCompletedPipeline(std::move(remote_pipeline));
}
}
return pipeline;
}
std::optional<QueryPipeline> StorageDistributed::distributedWrite(const ASTInsertQuery & query, ContextPtr local_context)
{
const Settings & settings = local_context->getSettingsRef();
if (settings.max_distributed_depth && local_context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
auto & select = query.select->as<ASTSelectWithUnionQuery &>();
StoragePtr src_storage;
if (select.list_of_selects->children.size() == 1)
{
if (auto * select_query = select.list_of_selects->children.at(0)->as<ASTSelectQuery>())
{
JoinedTables joined_tables(Context::createCopy(local_context), *select_query);
if (joined_tables.tablesCount() == 1)
{
src_storage = joined_tables.getLeftTableStorage();
}
}
}
if (!src_storage)
return {};
if (auto src_distributed = std::dynamic_pointer_cast<StorageDistributed>(src_storage))
{
return distributedWriteBetweenDistributedTables(*src_distributed, query, local_context);
}
else if (auto src_storage_cluster = std::dynamic_pointer_cast<IStorageCluster>(src_storage))
{
return distributedWriteFromClusterStorage(*src_storage_cluster, query, local_context);
}
else if (local_context->getClientInfo().distributed_depth == 0)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parallel distributed INSERT SELECT is not possible. "\
"Reason: distributed reading is supported only from Distributed engine or *Cluster table functions, but got {} storage", src_storage->getName());
}
return {};
}
void StorageDistributed::checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const
{
auto name_deps = getDependentViewsByColumn(local_context);

View File

@ -1,7 +1,6 @@
#pragma once
#include <Storages/IStorage.h>
#include <Storages/IStorageCluster.h>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <Storages/Distributed/DistributedSettings.h>
#include <Storages/getStructureOfRemoteTable.h>
@ -208,9 +207,6 @@ private:
void delayInsertOrThrowIfNeeded() const;
std::optional<QueryPipeline> distributedWriteFromClusterStorage(const IStorageCluster & src_storage_cluster, const ASTInsertQuery & query, ContextPtr context) const;
std::optional<QueryPipeline> distributedWriteBetweenDistributedTables(const StorageDistributed & src_distributed, const ASTInsertQuery & query, ContextPtr context) const;
String remote_database;
String remote_table;
ASTPtr remote_table_function_ptr;

View File

@ -45,13 +45,11 @@
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTOptimizeQuery.h>
#include <Parsers/ASTPartition.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTCheckQuery.h>
#include <Parsers/ExpressionListParsers.h>
@ -61,7 +59,6 @@
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/Sinks/EmptySink.h>
#include <IO/ReadBufferFromString.h>
#include <IO/Operators.h>
@ -76,7 +73,6 @@
#include <Interpreters/InterserverCredentials.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/JoinedTables.h>
#include <Backups/BackupEntriesCollector.h>
#include <Backups/IBackup.h>
@ -163,7 +159,6 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int CONCURRENT_ACCESS_NOT_SUPPORTED;
extern const int CHECKSUM_DOESNT_MATCH;
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
}
namespace ActionLocks
@ -4472,106 +4467,6 @@ SinkToStoragePtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, con
}
std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClusterStorage(const std::shared_ptr<IStorageCluster> & src_storage_cluster, const ASTInsertQuery & query, ContextPtr local_context)
{
const auto & settings = local_context->getSettingsRef();
auto extension = src_storage_cluster->getTaskIteratorExtension(local_context);
/// Here we won't check that the cluster formed from table replicas is a subset of a cluster specified in s3Cluster/hdfsCluster table function
auto src_cluster = src_storage_cluster->getCluster(local_context);
/// Actually the query doesn't change, we just serialize it to string
String query_str;
{
WriteBufferFromOwnString buf;
IAST::FormatSettings ast_format_settings(buf, /*one_line*/ true);
ast_format_settings.always_quote_identifiers = true;
query.IAST::format(ast_format_settings);
query_str = buf.str();
}
QueryPipeline pipeline;
ContextMutablePtr query_context = Context::createCopy(local_context);
++query_context->getClientInfo().distributed_depth;
for (const auto & replicas : src_cluster->getShardsAddresses())
{
/// There will be only one replica, because we consider each replica as a shard
for (const auto & node : replicas)
{
auto connection = std::make_shared<Connection>(
node.host_name, node.port, query_context->getGlobalContext()->getCurrentDatabase(),
node.user, node.password, node.quota_key, node.cluster, node.cluster_secret,
"ParallelInsertSelectInititiator",
node.compression,
node.secure
);
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
connection,
query_str,
Block{},
query_context,
/*throttler=*/nullptr,
Scalars{},
Tables{},
QueryProcessingStage::Complete,
extension);
QueryPipeline remote_pipeline(std::make_shared<RemoteSource>(remote_query_executor, false, settings.async_socket_for_remote));
remote_pipeline.complete(std::make_shared<EmptySink>(remote_query_executor->getHeader()));
pipeline.addCompletedPipeline(std::move(remote_pipeline));
}
}
return pipeline;
}
std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWrite(const ASTInsertQuery & query, ContextPtr local_context)
{
/// Do not enable parallel distributed INSERT SELECT in case when query probably comes from another server
if (local_context->getClientInfo().query_kind != ClientInfo::QueryKind::INITIAL_QUERY)
return {};
const Settings & settings = local_context->getSettingsRef();
if (settings.max_distributed_depth && local_context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
auto & select = query.select->as<ASTSelectWithUnionQuery &>();
StoragePtr src_storage;
if (select.list_of_selects->children.size() == 1)
{
if (auto * select_query = select.list_of_selects->children.at(0)->as<ASTSelectQuery>())
{
JoinedTables joined_tables(Context::createCopy(local_context), *select_query);
if (joined_tables.tablesCount() == 1)
{
src_storage = joined_tables.getLeftTableStorage();
}
}
}
if (!src_storage)
return {};
if (auto src_distributed = std::dynamic_pointer_cast<IStorageCluster>(src_storage))
{
return distributedWriteFromClusterStorage(src_distributed, query, local_context);
}
else if (local_context->getClientInfo().distributed_depth == 0)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parallel distributed INSERT SELECT is not possible. Reason: distributed "\
"reading into Replicated table is supported only from *Cluster table functions, but got {} storage", src_storage->getName());
}
return {};
}
bool StorageReplicatedMergeTree::optimize(
const ASTPtr &,
const StorageMetadataPtr &,

View File

@ -4,7 +4,6 @@
#include <atomic>
#include <pcg_random.hpp>
#include <Storages/IStorage.h>
#include <Storages/IStorageCluster.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/MergeTreePartsMover.h>
@ -137,8 +136,6 @@ public:
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
std::optional<QueryPipeline> distributedWrite(const ASTInsertQuery & /*query*/, ContextPtr /*context*/) override;
bool optimize(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
@ -468,8 +465,6 @@ private:
std::mutex last_broken_disks_mutex;
std::set<String> last_broken_disks;
static std::optional<QueryPipeline> distributedWriteFromClusterStorage(const std::shared_ptr<IStorageCluster> & src_storage_cluster, const ASTInsertQuery & query, ContextPtr context);
template <class Func>
void foreachActiveParts(Func && func, bool select_sequential_consistency) const;

View File

@ -56,7 +56,7 @@ StorageS3Cluster::StorageS3Cluster(
const ConstraintsDescription & constraints_,
ContextPtr context_,
const String & compression_method_)
: IStorageCluster(table_id_)
: IStorage(table_id_)
, s3_configuration{S3::URI{Poco::URI{filename_}}, access_key_id_, secret_access_key_, {}, {}, S3Settings::ReadWriteSettings(context_->getSettingsRef())}
, filename(filename_)
, cluster_name(cluster_name_)
@ -105,7 +105,12 @@ Pipe StorageS3Cluster::read(
unsigned /*num_streams*/)
{
StorageS3::updateS3Configuration(context, s3_configuration);
createIteratorAndCallback(query_info.query, context);
auto cluster = context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
auto iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.uri, query_info.query, virtual_block, context);
auto callback = std::make_shared<StorageS3Source::IteratorWrapper>([iterator]() mutable -> String { return iterator->next(); });
/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
Block header =
@ -165,29 +170,6 @@ QueryProcessingStage::Enum StorageS3Cluster::getQueryProcessingStage(
}
void StorageS3Cluster::createIteratorAndCallback(ASTPtr query, ContextPtr context) const
{
cluster = context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.uri, query, virtual_block, context);
callback = std::make_shared<StorageS3Source::IteratorWrapper>([iter = this->iterator]() mutable -> String { return iter->next(); });
}
RemoteQueryExecutor::Extension StorageS3Cluster::getTaskIteratorExtension(ContextPtr context) const
{
createIteratorAndCallback(/*query=*/nullptr, context);
return RemoteQueryExecutor::Extension{.task_iterator = callback};
}
ClusterPtr StorageS3Cluster::getCluster(ContextPtr context) const
{
createIteratorAndCallback(/*query=*/nullptr, context);
return cluster;
}
NamesAndTypesList StorageS3Cluster::getVirtuals() const
{
return virtual_columns;

View File

@ -10,7 +10,6 @@
#include "Client/Connection.h"
#include <Interpreters/Cluster.h>
#include <IO/S3Common.h>
#include <Storages/IStorageCluster.h>
#include <Storages/StorageS3.h>
namespace DB
@ -18,7 +17,7 @@ namespace DB
class Context;
class StorageS3Cluster : public IStorageCluster
class StorageS3Cluster : public IStorage
{
public:
StorageS3Cluster(
@ -43,22 +42,15 @@ public:
NamesAndTypesList getVirtuals() const override;
RemoteQueryExecutor::Extension getTaskIteratorExtension(ContextPtr context) const override;
ClusterPtr getCluster(ContextPtr context) const override;
private:
StorageS3::S3Configuration s3_configuration;
String filename;
String cluster_name;
String format_name;
String compression_method;
NamesAndTypesList virtual_columns;
Block virtual_block;
mutable ClusterPtr cluster;
mutable std::shared_ptr<StorageS3Source::DisclosedGlobIterator> iterator;
mutable std::shared_ptr<StorageS3Source::IteratorWrapper> callback;
void createIteratorAndCallback(ASTPtr query, ContextPtr context) const;
};

View File

@ -20,21 +20,6 @@
</shard>
</cluster_simple>
<!-- A part of the cluster above, represents only one shard-->
<first_shard>
<shard>
<replica>
<host>s0_0_0</host>
<port>9000</port>
</replica>
<replica>
<host>s0_0_1</host>
<port>9000</port>
</replica>
</shard>
</first_shard>
</remote_servers>
<macros>
<default_cluster_macro>cluster_simple</default_cluster_macro>

View File

@ -34,24 +34,10 @@ def started_cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"s0_0_0",
main_configs=["configs/cluster.xml"],
macros={"replica": "node1", "shard": "shard1"},
with_minio=True,
with_zookeeper=True,
)
cluster.add_instance(
"s0_0_1",
main_configs=["configs/cluster.xml"],
macros={"replica": "replica2", "shard": "shard1"},
with_zookeeper=True,
)
cluster.add_instance(
"s0_1_0",
main_configs=["configs/cluster.xml"],
macros={"replica": "replica1", "shard": "shard2"},
with_zookeeper=True,
"s0_0_0", main_configs=["configs/cluster.xml"], with_minio=True
)
cluster.add_instance("s0_0_1", main_configs=["configs/cluster.xml"])
cluster.add_instance("s0_1_0", main_configs=["configs/cluster.xml"])
logging.info("Starting cluster...")
cluster.start()
@ -209,126 +195,3 @@ def test_ambiguous_join(started_cluster):
"""
)
assert "AMBIGUOUS_COLUMN_NAME" not in result
def test_distributed_insert_select(started_cluster):
first_replica_first_shard = started_cluster.instances["s0_0_0"]
second_replica_first_shard = started_cluster.instances["s0_0_1"]
first_replica_second_shard = started_cluster.instances["s0_1_0"]
first_replica_first_shard.query(
"""
CREATE TABLE insert_select_local ON CLUSTER 'cluster_simple' (a String, b UInt64)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/insert_select', '{replica}')
ORDER BY (a, b);
"""
)
first_replica_first_shard.query(
"""
CREATE TABLE insert_select_distributed ON CLUSTER 'cluster_simple' as insert_select_local
ENGINE = Distributed('cluster_simple', default, insert_select_local, b % 2);
"""
)
for file_number in range(100):
first_replica_first_shard.query(
"""
INSERT INTO TABLE FUNCTION s3('http://minio1:9001/root/data/generated/file_{}.csv', 'minio', 'minio123', 'CSV','a String, b UInt64')
SELECT repeat('{}', 10), number from numbers(100);
""".format(
file_number, file_number
)
)
first_replica_first_shard.query(
"""
INSERT INTO insert_select_distributed SELECT * FROM s3Cluster(
'cluster_simple',
'http://minio1:9001/root/data/generated/*.csv', 'minio', 'minio123', 'CSV','a String, b UInt64'
) SETTINGS parallel_distributed_insert_select=1;
"""
)
for line in (
first_replica_first_shard.query("""SELECT * FROM insert_select_local;""")
.strip()
.split("\n")
):
_, b = line.split()
assert int(b) % 2 == 0
for line in (
second_replica_first_shard.query("""SELECT * FROM insert_select_local;""")
.strip()
.split("\n")
):
_, b = line.split()
assert int(b) % 2 == 0
for line in (
first_replica_second_shard.query("""SELECT * FROM insert_select_local;""")
.strip()
.split("\n")
):
_, b = line.split()
assert int(b) % 2 == 1
def test_distributed_insert_select_with_replicated(started_cluster):
first_replica_first_shard = started_cluster.instances["s0_0_0"]
second_replica_first_shard = started_cluster.instances["s0_0_1"]
first_replica_first_shard.query(
"""
CREATE TABLE insert_select_replicated_local ON CLUSTER 'first_shard' (a String, b UInt64)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/insert_select_with_replicated', '{replica}')
ORDER BY (a, b);
"""
)
for replica in [first_replica_first_shard, second_replica_first_shard]:
replica.query(
"""
SYSTEM STOP FETCHES;
"""
)
replica.query(
"""
SYSTEM STOP MERGES;
"""
)
for file_number in range(100):
first_replica_first_shard.query(
"""
INSERT INTO TABLE FUNCTION s3('http://minio1:9001/root/data/generated_replicated/file_{}.csv', 'minio', 'minio123', 'CSV','a String, b UInt64')
SELECT repeat('{}', 10), number from numbers(100);
""".format(
file_number, file_number
)
)
first_replica_first_shard.query(
"""
INSERT INTO insert_select_replicated_local SELECT * FROM s3Cluster(
'first_shard',
'http://minio1:9001/root/data/generated_replicated/*.csv', 'minio', 'minio123', 'CSV','a String, b UInt64'
) SETTINGS parallel_distributed_insert_select=1;
"""
)
first = int(
first_replica_first_shard.query(
"""SELECT count(*) FROM insert_select_replicated_local"""
).strip()
)
second = int(
second_replica_first_shard.query(
"""SELECT count(*) FROM insert_select_replicated_local"""
).strip()
)
assert first != 0
assert second != 0
assert first + second == 100 * 100